Zum Hauptinhalt springen

Streaming + Asynchron

FeatureLiteLLM SDKLiteLLM Proxy
Streaming✅ start here✅ start here
Async✅ start here✅ start here
Async Streaming✅ start here✅ start here

Streaming Responses​

LiteLLM unterstützt das Streamen der Modellantwort, indem stream=True als Argument an die Completion-Funktion übergeben wird.

Verwendung​

from litellm import completion
messages = [{"role": "user", "content": "Hey, how's it going?"}]
response = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
for part in response:
print(part.choices[0].delta.content or "")

Hilfsfunktion​

LiteLLM stellt auch eine Hilfsfunktion zur Verfügung, um die vollständige Streaming-Antwort aus der Liste der Chunks wiederherzustellen.

from litellm import completion
messages = [{"role": "user", "content": "Hey, how's it going?"}]
response = completion(model="gpt-3.5-turbo", messages=messages, stream=True)

for chunk in response:
chunks.append(chunk)

print(litellm.stream_chunk_builder(chunks, messages=messages))

Async Completion​

Asynchrone Completion mit LiteLLM. LiteLLM bietet eine asynchrone Version der Completion-Funktion namens acompletion.

Verwendung​

from litellm import acompletion
import asyncio

async def test_get_response():
user_message = "Hello, how are you?"
messages = [{"content": user_message, "role": "user"}]
response = await acompletion(model="gpt-3.5-turbo", messages=messages)
return response

response = asyncio.run(test_get_response())
print(response)

Async Streaming​

Wir haben eine Funktion __anext__() im zurückgegebenen Streaming-Objekt implementiert. Dies ermöglicht die asynchrone Iteration über das Streaming-Objekt.

Verwendung​

Hier ist ein Beispiel für die Verwendung mit OpenAI.

from litellm import acompletion
import asyncio, os, traceback

async def completion_call():
try:
print("test acompletion + streaming")
response = await acompletion(
model="gpt-3.5-turbo",
messages=[{"content": "Hello, how are you?", "role": "user"}],
stream=True
)
print(f"response: {response}")
async for chunk in response:
print(chunk)
except:
print(f"error occurred: {traceback.format_exc()}")
pass

asyncio.run(completion_call())

Fehlerbehandlung - Endlosschleifen​

Manchmal kann ein Modell in eine Endlosschleife geraten und dieselben Chunks wiederholen - z.B. Issue.

Brechen Sie diese mit

litellm.REPEATED_STREAMING_CHUNK_LIMIT = 100 # # catch if model starts looping the same chunk while streaming. Uses high default to prevent false positives.

LiteLLM bietet eine Fehlerbehandlung dafür, indem geprüft wird, ob ein Chunk 'n' Mal wiederholt wird (Standard ist 100). Wenn dieses Limit überschritten wird, wird eine litellm.InternalServerError ausgelöst, damit Wiederholungslogik stattfinden kann.

import litellm 
import os

litellm.set_verbose = False
loop_amount = litellm.REPEATED_STREAMING_CHUNK_LIMIT + 1
chunks = [
litellm.ModelResponse(**{
"id": "chatcmpl-123",
"object": "chat.completion.chunk",
"created": 1694268190,
"model": "gpt-3.5-turbo-0125",
"system_fingerprint": "fp_44709d6fcb",
"choices": [
{"index": 0, "delta": {"content": "How are you?"}, "finish_reason": "stop"}
],
}, stream=True)
] * loop_amount
completion_stream = litellm.ModelResponseListIterator(model_responses=chunks)

response = litellm.CustomStreamWrapper(
completion_stream=completion_stream,
model="gpt-3.5-turbo",
custom_llm_provider="cached_response",
logging_obj=litellm.Logging(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey"}],
stream=True,
call_type="completion",
start_time=time.time(),
litellm_call_id="12345",
function_id="1245",
),
)

for chunk in response:
continue # expect to raise InternalServerError