Streaming + Asynchron
| Feature | LiteLLM SDK | LiteLLM Proxy |
|---|---|---|
| Streaming | ✅ start here | ✅ start here |
| Async | ✅ start here | ✅ start here |
| Async Streaming | ✅ start here | ✅ start here |
Streaming Responses​
LiteLLM unterstützt das Streamen der Modellantwort, indem stream=True als Argument an die Completion-Funktion übergeben wird.
Verwendung​
from litellm import completion
messages = [{"role": "user", "content": "Hey, how's it going?"}]
response = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
for part in response:
print(part.choices[0].delta.content or "")
Hilfsfunktion​
LiteLLM stellt auch eine Hilfsfunktion zur Verfügung, um die vollständige Streaming-Antwort aus der Liste der Chunks wiederherzustellen.
from litellm import completion
messages = [{"role": "user", "content": "Hey, how's it going?"}]
response = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
for chunk in response:
chunks.append(chunk)
print(litellm.stream_chunk_builder(chunks, messages=messages))
Async Completion​
Asynchrone Completion mit LiteLLM. LiteLLM bietet eine asynchrone Version der Completion-Funktion namens acompletion.
Verwendung​
from litellm import acompletion
import asyncio
async def test_get_response():
user_message = "Hello, how are you?"
messages = [{"content": user_message, "role": "user"}]
response = await acompletion(model="gpt-3.5-turbo", messages=messages)
return response
response = asyncio.run(test_get_response())
print(response)
Async Streaming​
Wir haben eine Funktion __anext__() im zurückgegebenen Streaming-Objekt implementiert. Dies ermöglicht die asynchrone Iteration über das Streaming-Objekt.
Verwendung​
Hier ist ein Beispiel für die Verwendung mit OpenAI.
from litellm import acompletion
import asyncio, os, traceback
async def completion_call():
try:
print("test acompletion + streaming")
response = await acompletion(
model="gpt-3.5-turbo",
messages=[{"content": "Hello, how are you?", "role": "user"}],
stream=True
)
print(f"response: {response}")
async for chunk in response:
print(chunk)
except:
print(f"error occurred: {traceback.format_exc()}")
pass
asyncio.run(completion_call())
Fehlerbehandlung - Endlosschleifen​
Manchmal kann ein Modell in eine Endlosschleife geraten und dieselben Chunks wiederholen - z.B. Issue.
Brechen Sie diese mit
litellm.REPEATED_STREAMING_CHUNK_LIMIT = 100 # # catch if model starts looping the same chunk while streaming. Uses high default to prevent false positives.
LiteLLM bietet eine Fehlerbehandlung dafür, indem geprüft wird, ob ein Chunk 'n' Mal wiederholt wird (Standard ist 100). Wenn dieses Limit überschritten wird, wird eine litellm.InternalServerError ausgelöst, damit Wiederholungslogik stattfinden kann.
- SDK
- PROXY
import litellm
import os
litellm.set_verbose = False
loop_amount = litellm.REPEATED_STREAMING_CHUNK_LIMIT + 1
chunks = [
litellm.ModelResponse(**{
"id": "chatcmpl-123",
"object": "chat.completion.chunk",
"created": 1694268190,
"model": "gpt-3.5-turbo-0125",
"system_fingerprint": "fp_44709d6fcb",
"choices": [
{"index": 0, "delta": {"content": "How are you?"}, "finish_reason": "stop"}
],
}, stream=True)
] * loop_amount
completion_stream = litellm.ModelResponseListIterator(model_responses=chunks)
response = litellm.CustomStreamWrapper(
completion_stream=completion_stream,
model="gpt-3.5-turbo",
custom_llm_provider="cached_response",
logging_obj=litellm.Logging(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey"}],
stream=True,
call_type="completion",
start_time=time.time(),
litellm_call_id="12345",
function_id="1245",
),
)
for chunk in response:
continue # expect to raise InternalServerError
Definieren Sie dies in Ihrer config.yaml auf dem Proxy.
litellm_settings:
REPEATED_STREAMING_CHUNK_LIMIT: 100 # this overrides the litellm default
Der Proxy verwendet das LiteLLM SDK. Um zu überprüfen, ob dies funktioniert, versuchen Sie den 'SDK'-Code-Schnipsel.