Router - Lastenausgleich
LiteLLM verwaltet
- Lastverteilung über mehrere Deployments (z. B. Azure/OpenAI)
- Priorisierung wichtiger Anfragen, um sicherzustellen, dass sie nicht fehlschlagen (d. h. Warteschlangenbildung)
- Grundlegende Zuverlässigkeitslogik - Cooldowns, Fallbacks, Timeouts und Wiederholungsversuche (fest + exponentielle Rückfallverzögerung) über mehrere Deployments/Anbieter.
In der Produktion unterstützt litellm die Verwendung von Redis zur Nachverfolgung von Cooldown-Servern und zur Nutzung (Verwaltung von TPM/RPM-Limits).
Wenn Sie einen Server zur Lastverteilung über verschiedene LLM-APIs wünschen, verwenden Sie unseren LiteLLM Proxy Server
Lastverteilung
(s/o @paulpierre und Sweep Proxy für ihre Beiträge zu dieser Implementierung) Code anzeigen
Schnellstart
Lastverteilung über mehrere Azure/Bedrock/Anbieter-Deployments. LiteLLM kümmert sich um Wiederholungsversuche in verschiedenen Regionen, falls ein Aufruf fehlschlägt.
- SDK
- PROXY
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias -> loadbalance between models with same `model_name`
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
}
}, {
"model_name": "gpt-4",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/gpt-4",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION"),
}
}, {
"model_name": "gpt-4",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-4",
"api_key": os.getenv("OPENAI_API_KEY"),
}
},
]
router = Router(model_list=model_list)
# openai.ChatCompletion.create replacement
# requests with model="gpt-3.5-turbo" will pick a deployment where model_name="gpt-3.5-turbo"
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
# openai.ChatCompletion.create replacement
# requests with model="gpt-4" will pick a deployment where model_name="gpt-4"
response = await router.acompletion(model="gpt-4",
messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
Detaillierte Proxy-Lastverteilungs-/Fallback-Dokumentation finden Sie hier
- Modellliste mit mehreren Deployments einrichten
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/<your-deployment-name>
api_base: <your-azure-endpoint>
api_key: <your-azure-api-key>
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/gpt-turbo-small-ca
api_base: https://my-endpoint-canada-berri992.openai.azure.com/
api_key: <your-azure-api-key>
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/gpt-turbo-large
api_base: https://openai-france-1234.openai.azure.com/
api_key: <your-azure-api-key>
- Proxy starten
litellm --config /path/to/config.yaml
- Testen Sie es!
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-1234' \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{"role": "user", "content": "Hi there!"}
],
"mock_testing_rate_limit_error": true
}'
Verfügbare Endpunkte
router.completion()- Chat-Completion-Endpunkt für über 100 LLMsrouter.acompletion()- asynchrone Chat-Completion-Aufruferouter.embedding()- Embedding-Endpunkt für Azure, OpenAI, Huggingface-Endpunkterouter.aembedding()- asynchrone Embedding-Aufruferouter.text_completion()- Completion-Aufrufe im alten OpenAI/v1/completions-Endpunktformatrouter.atext_completion()- asynchrone Text-Completion-Aufruferouter.image_generation()- Completion-Aufrufe im OpenAI/v1/images/generations-Endpunktformatrouter.aimage_generation()- asynchrone Bildgenerierungsaufrufe
Erweitert - Routing-Strategien ⭐️
Routing-Strategien - Gewichtet, Ratenbegrenzung bewusst, am wenigsten beschäftigt, Latenz-basiert, Kosten-basiert
Der Router bietet 4 Strategien zum Routen Ihrer Aufrufe über mehrere Deployments.
- Ratenbegrenzung-bewusst V2 (ASYNC)
- Latenz-basiert
- (Standard) Gewichtete Auswahl (Async)
- Ratenbegrenzung-bewusst
- Am wenigsten beschäftigt
- Benutzerdefinierte Routing-Strategie
- Niedrigste Kosten-Routing (Async)
🎉 NEU Dies ist eine asynchrone Implementierung von nutzungsbasierter Routenplanung.
Filtert Deployment heraus, wenn TPM/RPM-Limit überschritten wird - Wenn Sie die TPM/RPM-Limits des Deployments angeben.
Routet zum **Deployment mit der geringsten TPM-Auslastung** für diese Minute.
In der Produktion verwenden wir Redis, um die Auslastung (TPM/RPM) über mehrere Deployments hinweg zu verfolgen. Diese Implementierung verwendet **asynchrone Redis-Aufrufe** (redis.incr und redis.mget).
Für Azure erhalten Sie 6 RPM pro 1000 TPM
- SDK
- Proxy
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
"tpm": 100000,
"rpm": 10000,
},
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
"tpm": 100000,
"rpm": 1000,
},
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
"tpm": 100000,
"rpm": 1000,
},
}]
router = Router(model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
routing_strategy="usage-based-routing-v2" # 👈 KEY CHANGE
enable_pre_call_checks=True, # enables router rate limits for concurrent calls
)
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
print(response)
1. Strategie in der Konfiguration festlegen
model_list:
- model_name: gpt-3.5-turbo # model alias
litellm_params: # params for litellm completion/embedding call
model: azure/chatgpt-v-2 # actual model name
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
tpm: 100000
rpm: 10000
- model_name: gpt-3.5-turbo
litellm_params: # params for litellm completion/embedding call
model: gpt-3.5-turbo
api_key: os.getenv(OPENAI_API_KEY)
tpm: 100000
rpm: 1000
router_settings:
routing_strategy: usage-based-routing-v2 # 👈 KEY CHANGE
redis_host: <your-redis-host>
redis_password: <your-redis-password>
redis_port: <your-redis-port>
enable_pre_call_check: true
general_settings:
master_key: sk-1234
2. Proxy starten
litellm --config /path/to/config.yaml
3. Testen!
curl --location 'https://:4000/v1/chat/completions' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer sk-1234' \
--data '{
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hey, how's it going?"}]
}'
Wählt das Deployment mit der niedrigsten Antwortzeit aus.
Es speichert und aktualisiert die Antwortzeiten für Deployments basierend darauf, wann eine Anfrage gesendet und von einem Deployment empfangen wurde.
from litellm import Router
import asyncio
model_list = [{ ... }]
# init router
router = Router(model_list=model_list,
routing_strategy="latency-based-routing",# 👈 set routing strategy
enable_pre_call_check=True, # enables router rate limits for concurrent calls
)
## CALL 1+2
tasks = []
response = None
final_response = None
for _ in range(2):
tasks.append(router.acompletion(model=model, messages=messages))
response = await asyncio.gather(*tasks)
if response is not None:
## CALL 3
await asyncio.sleep(1) # let the cache update happen
picked_deployment = router.lowestlatency_logger.get_available_deployments(
model_group=model, healthy_deployments=router.healthy_deployments
)
final_response = await router.acompletion(model=model, messages=messages)
print(f"min deployment id: {picked_deployment}")
print(f"model id: {final_response._hidden_params['model_id']}")
assert (
final_response._hidden_params["model_id"]
== picked_deployment["model_info"]["id"]
)
Zeitfenster festlegen
Legen Sie das Zeitfenster fest, wie weit zurück betrachtet werden soll, wenn die Latenz für ein Deployment gemittelt wird.
Im Router
router = Router(..., routing_strategy_args={"ttl": 10})
Im Proxy
router_settings:
routing_strategy_args: {"ttl": 10}
Niedrigsten Latenz-Puffer festlegen
Legen Sie einen Puffer fest, innerhalb dessen Deployments als Kandidaten für Anrufe in Betracht gezogen werden.
Z.B.
Wenn Sie 5 Deployments haben
https://litellm-prod-1.openai.azure.com/: 0.07s
https://litellm-prod-2.openai.azure.com/: 0.1s
https://litellm-prod-3.openai.azure.com/: 0.1s
https://litellm-prod-4.openai.azure.com/: 0.1s
https://litellm-prod-5.openai.azure.com/: 4.66s
Um zunächst prod-1 mit allen Anfragen zu überlasten, können wir einen Puffer von 50 % festlegen, um die Deployments prod-2, prod-3, prod-4 zu berücksichtigen.
Im Router
router = Router(..., routing_strategy_args={"lowest_latency_buffer": 0.5})
Im Proxy
router_settings:
routing_strategy_args: {"lowest_latency_buffer": 0.5}
Standard Wählt ein Deployment basierend auf den angegebenen **Anfragen pro Minute (rpm) oder Token pro Minute (tpm)** aus.
Wenn rpm oder tpm nicht angegeben ist, wählt es zufällig ein Deployment aus.
Sie können auch einen weight-Parameter festlegen, um anzugeben, welches Modell wann ausgewählt werden soll.
- RPM-basierte Mischung
- Gewichtsbasierte Mischung
LiteLLM Proxy Config.yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-v-2
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
rpm: 900
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-functioncalling
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
rpm: 10
Python SDK
from litellm import Router
import asyncio
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"rpm": 900, # requests per minute for this API
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"rpm": 10,
}
},]
# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
LiteLLM Proxy Config.yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-v-2
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
weight: 9
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-functioncalling
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
weight: 1
Python SDK
from litellm import Router
import asyncio
model_list = [{
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": {
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"weight": 9, # pick this 90% of the time
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"weight": 1,
}
}]
# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
Dies leitet an das Deployment mit der niedrigsten TPM-Auslastung für diese Minute weiter.
In der Produktion verwenden wir Redis, um die Auslastung (TPM/RPM) über mehrere Deployments hinweg zu verfolgen.
Wenn Sie die TPM/RPM-Limits des Deployments angeben, wird dies auch dagegen geprüft und alle herausgefiltert, deren Limits überschritten würden.
Für Azure ist Ihr RPM = TPM/6.
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 100000,
"rpm": 10000,
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 100000,
"rpm": 1000,
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 100000,
"rpm": 1000,
}]
router = Router(model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
routing_strategy="usage-based-routing"
enable_pre_call_check=True, # enables router rate limits for concurrent calls
)
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
print(response)
Wählt ein Deployment mit der geringsten Anzahl laufender Anrufe aus, das es bearbeitet.
from litellm import Router
import asyncio
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
}
}]
# init router
router = Router(model_list=model_list, routing_strategy="least-busy")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
Eine benutzerdefinierte Routing-Strategie einbinden, um Deployments auszuwählen.
Schritt 1. Definieren Sie Ihre benutzerdefinierte Routing-Strategie
from litellm.router import CustomRoutingStrategyBase
class CustomRoutingStrategy(CustomRoutingStrategyBase):
async def async_get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Asynchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
Returns an element from litellm.router.model_list
"""
print("In CUSTOM async get available deployment")
model_list = router.model_list
print("router model list=", model_list)
for model in model_list:
if isinstance(model, dict):
if model["litellm_params"]["model"] == "openai/very-special-endpoint":
return model
pass
def get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Synchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
Returns an element from litellm.router.model_list
"""
pass
Schritt 2. Initialisieren Sie den Router mit der benutzerdefinierten Routing-Strategie
from litellm import Router
router = Router(
model_list=[
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/very-special-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/", # If you are Krrish, this is OpenAI Endpoint3 on our Railway endpoint :)
"api_key": "fake-key",
},
"model_info": {"id": "very-special-endpoint"},
},
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/fast-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
"api_key": "fake-key",
},
"model_info": {"id": "fast-endpoint"},
},
],
set_verbose=True,
debug_level="DEBUG",
timeout=1,
) # type: ignore
router.set_custom_routing_strategy(CustomRoutingStrategy()) # 👈 Set your routing strategy here
Schritt 3. Testen Sie Ihre Routing-Strategie. Erwarten Sie, dass Ihre benutzerdefinierte Routing-Strategie aufgerufen wird, wenn Sie router.acompletion-Anfragen ausführen.
for _ in range(10):
response = await router.acompletion(
model="azure-model", messages=[{"role": "user", "content": "hello"}]
)
print(response)
_picked_model_id = response._hidden_params["model_id"]
print("picked model=", _picked_model_id)
Wählt ein Deployment basierend auf den niedrigsten Kosten aus.
So funktioniert es
- Alle gesunden Deployments abrufen
- Alle Deployments auswählen, die unter ihren angegebenen
rpm/tpm-Limits liegen - Prüfen Sie für jedes Deployment, ob
litellm_param["model"]inlitellm_model_cost_mapvorhanden ist.- Wenn das Deployment nicht in
litellm_model_cost_mapvorhanden ist -> verwenden Sie deployment_cost =$1.
- Wenn das Deployment nicht in
- Deployment mit den niedrigsten Kosten auswählen
from litellm import Router
import asyncio
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "gpt-4"},
"model_info": {"id": "openai-gpt-4"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "groq/llama3-8b-8192"},
"model_info": {"id": "groq-llama"},
},
]
# init router
router = Router(model_list=model_list, routing_strategy="cost-based-routing")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
print(response._hidden_params["model_id"]) # expect groq-llama, since groq/llama has lowest cost
return response
asyncio.run(router_acompletion())
Verwendung benutzerdefinierter Ein- und Ausgabe-Preise
Setzen Sie litellm_params["input_cost_per_token"] und litellm_params["output_cost_per_token"], um benutzerdefinierte Preise beim Routing zu verwenden.
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"input_cost_per_token": 0.00003,
"output_cost_per_token": 0.00003,
},
"model_info": {"id": "chatgpt-v-experimental"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-1",
"input_cost_per_token": 0.000000001,
"output_cost_per_token": 0.00000001,
},
"model_info": {"id": "chatgpt-v-1"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-5",
"input_cost_per_token": 10,
"output_cost_per_token": 12,
},
"model_info": {"id": "chatgpt-v-5"},
},
]
# init router
router = Router(model_list=model_list, routing_strategy="cost-based-routing")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
print(response._hidden_params["model_id"]) # expect chatgpt-v-1, since chatgpt-v-1 has lowest cost
return response
asyncio.run(router_acompletion())
Grundlegende Zuverlässigkeit
Gewichtete Deployments
Setzen Sie weight für ein Deployment, um ein Deployment häufiger als andere auszuwählen.
Dies funktioniert über die Routing-Strategie **simple-shuffle** (dies ist die Standardeinstellung, wenn keine Routing-Strategie ausgewählt ist).
- SDK
- PROXY
from litellm import Router
model_list = [
{
"model_name": "o1",
"litellm_params": {
"model": "o1-preview",
"api_key": os.getenv("OPENAI_API_KEY"),
"weight": 1
},
},
{
"model_name": "o1",
"litellm_params": {
"model": "o1-preview",
"api_key": os.getenv("OPENAI_API_KEY"),
"weight": 2 # 👈 PICK THIS DEPLOYMENT 2x MORE OFTEN THAN o1-preview
},
},
]
router = Router(model_list=model_list, routing_strategy="cost-based-routing")
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
model_list:
- model_name: o1
litellm_params:
model: o1
api_key: os.environ/OPENAI_API_KEY
weight: 1
- model_name: o1
litellm_params:
model: o1-preview
api_key: os.environ/OPENAI_API_KEY
weight: 2 # 👈 PICK THIS DEPLOYMENT 2x MORE OFTEN THAN o1-preview
Maximale parallele Anfragen (ASYNC)
Wird in Semaphore für asynchrone Anfragen an den Router verwendet. Begrenzt die maximalen gleichzeitigen Aufrufe an ein Deployment. Nützlich in Szenarien mit hohem Datenverkehr.
Wenn tpm/rpm gesetzt ist und keine maximale parallele Anfragebegrenzung angegeben ist, verwenden wir RPM oder berechnete RPM (tpm/1000/6) als maximale parallele Anfragebegrenzung.
from litellm import Router
model_list = [{
"model_name": "gpt-4",
"litellm_params": {
"model": "azure/gpt-4",
...
"max_parallel_requests": 10 # 👈 SET PER DEPLOYMENT
}
}]
### OR ###
router = Router(model_list=model_list, default_max_parallel_requests=20) # 👈 SET DEFAULT MAX PARALLEL REQUESTS
# deployment max parallel requests > default max parallel requests
Cooldowns
Legen Sie das Limit fest, wie viele Aufrufe ein Modell pro Minute fehlschlagen darf, bevor es für eine Minute heruntergefahren wird.
- SDK
- PROXY
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
allowed_fails=1, # cooldown model if it fails > 1 call in a minute.
cooldown_time=100 # cooldown the deployment for 100 seconds if it num_fails > allowed_fails
)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Globalen Wert festlegen
router_settings:
allowed_fails: 3 # cooldown model if it fails > 1 call in a minute.
cooldown_time: 30 # (in seconds) how long to cooldown model if fails/min > allowed_fails
Standardwerte
- allowed_fails: 3
- cooldown_time: 5s (
DEFAULT_COOLDOWN_TIME_SECONDSin constants.py)
Pro Modell festlegen
model_list:
- model_name: fake-openai-endpoint
litellm_params:
model: predibase/llama-3-8b-instruct
api_key: os.environ/PREDIBASE_API_KEY
tenant_id: os.environ/PREDIBASE_TENANT_ID
max_new_tokens: 256
cooldown_time: 0 # 👈 KEY CHANGE
Erwartete Antwort
No deployments available for selected model, Try again in 60 seconds. Passed model=claude-3-5-sonnet. pre-call-checks=False, allowed_model_region=n/a.
Cooldowns deaktivieren
- SDK
- PROXY
from litellm import Router
router = Router(..., disable_cooldowns=True)
router_settings:
disable_cooldowns: True
Wiederholungsversuche
Sowohl für asynchrone als auch für synchrone Funktionen unterstützen wir das Wiederholen fehlgeschlagener Anfragen.
Für RateLimitError implementieren wir exponentielle Rückfallverzögerungen.
Für generische Fehler wiederholen wir sofort.
Hier ein kurzer Überblick, wie wir num_retries = 3 einstellen können.
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
num_retries=3)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Wir unterstützen auch die Festlegung der minimalen Wartezeit, bevor ein fehlgeschlagener Versuch wiederholt wird. Dies geschieht über den Parameter retry_after.
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
num_retries=3, retry_after=5) # waits min 5s before retrying request
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
[Erweitert]: Benutzerdefinierte Wiederholungsversuche, Cooldowns basierend auf Fehlertyp
- Verwenden Sie
RetryPolicy, wenn Sienum_retriesbasierend auf der erhaltenen Ausnahme festlegen möchten. - Verwenden Sie
AllowedFailsPolicy, um eine benutzerdefinierte Anzahl vonallowed_failspro Minute festzulegen, bevor ein Deployment heruntergefahren wird.
- SDK
- PROXY
Beispiel
retry_policy = RetryPolicy(
ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
AuthenticationErrorRetries=0, # run 0 retries for AuthenticationErrorRetries
)
allowed_fails_policy = AllowedFailsPolicy(
ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationError before cooling down a deployment
RateLimitErrorAllowedFails=100, # Allow 100 RateLimitErrors before cooling down a deployment
)
Beispielverwendung
from litellm.router import RetryPolicy, AllowedFailsPolicy
retry_policy = RetryPolicy(
ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
AuthenticationErrorRetries=0, # run 0 retries for AuthenticationErrorRetries
BadRequestErrorRetries=1,
TimeoutErrorRetries=2,
RateLimitErrorRetries=3,
)
allowed_fails_policy = AllowedFailsPolicy(
ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationError before cooling down a deployment
RateLimitErrorAllowedFails=100, # Allow 100 RateLimitErrors before cooling down a deployment
)
router = litellm.Router(
model_list=[
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
},
{
"model_name": "bad-model", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": "bad-key",
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
},
],
retry_policy=retry_policy,
allowed_fails_policy=allowed_fails_policy,
)
response = await router.acompletion(
model=model,
messages=messages,
)
router_settings:
retry_policy: {
"BadRequestErrorRetries": 3,
"ContentPolicyViolationErrorRetries": 4
}
allowed_fails_policy: {
"ContentPolicyViolationErrorAllowedFails": 1000, # Allow 1000 ContentPolicyViolationError before cooling down a deployment
"RateLimitErrorAllowedFails": 100 # Allow 100 RateLimitErrors before cooling down a deployment
}
Caching
In der Produktion empfehlen wir die Verwendung eines Redis-Caches. Für schnelles lokales Testen unterstützen wir auch einfaches In-Memory-Caching.
In-Memory-Cache
router = Router(model_list=model_list,
cache_responses=True)
print(response)
Redis-Cache
router = Router(model_list=model_list,
redis_host=os.getenv("REDIS_HOST"),
redis_password=os.getenv("REDIS_PASSWORD"),
redis_port=os.getenv("REDIS_PORT"),
cache_responses=True)
print(response)
Übergeben Sie die Redis-URL, zusätzliche kwargs
router = Router(model_list: Optional[list] = None,
## CACHING ##
redis_url=os.getenv("REDIS_URL")",
cache_kwargs= {}, # additional kwargs to pass to RedisCache (see caching.py)
cache_responses=True)
Vorab-Aufrufprüfungen (Kontextfenster, EU-Regionen)
Aktivieren Sie Vorab-Aufrufprüfungen, um herauszufiltern
- Deployments mit einem Kontextfensterlimit kleiner als die Nachrichten für einen Aufruf.
- Deployments außerhalb der EU-Region
- SDK
- Proxy
1. Vorab-Aufrufprüfungen aktivieren
from litellm import Router
# ...
router = Router(model_list=model_list, enable_pre_call_checks=True) # 👈 Set to True
2. Modellliste einrichten
Für Kontextfensterprüfungen bei Azure-Deployments legen Sie das Basismodell fest. Wählen Sie das Basismodell aus dieser Liste, alle Azure-Modelle beginnen mit azure/.
Für die Filterung nach 'EU-Region' legen Sie den 'region_name' des Deployments fest.
Hinweis: Wir leiten den region_name für Vertex AI, Bedrock und IBM WatsonxAI automatisch basierend auf Ihren LiteLLM-Parametern ab. Für Azure setzen Sie litellm.enable_preview = True.
model_list = [
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"region_name": "eu" # 👈 SET 'EU' REGION NAME
"base_model": "azure/gpt-35-turbo", # 👈 (Azure-only) SET BASE MODEL
},
},
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
{
"model_name": "gemini-pro",
"litellm_params: {
"model": "vertex_ai/gemini-pro-1.5",
"vertex_project": "adroit-crow-1234",
"vertex_location": "us-east1" # 👈 AUTOMATICALLY INFERS 'region_name'
}
}
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
3. Testen!
- Kontextfensterprüfung
- EU-Regionenprüfung
"""
- Give a gpt-3.5-turbo model group with different context windows (4k vs. 16k)
- Send a 5k prompt
- Assert it works
"""
from litellm import Router
import os
model_list = [
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model": "azure/gpt-35-turbo",
},
"model_info": {
"base_model": "azure/gpt-35-turbo",
}
},
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
text = "What is the meaning of 42?" * 5000
response = router.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": text},
{"role": "user", "content": "Who was Alexander?"},
],
)
print(f"response: {response}")
"""
- Give 2 gpt-3.5-turbo deployments, in eu + non-eu regions
- Make a call
- Assert it picks the eu-region model
"""
from litellm import Router
import os
model_list = [
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"region_name": "eu"
},
"model_info": {
"id": "1"
}
},
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"model_info": {
"id": "2"
}
},
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
response = router.completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Who was Alexander?"}],
)
print(f"response: {response}")
print(f"response id: {response._hidden_params['model_id']}")
Gehen Sie hier entlang, um dies im Proxy zu tun.
Caching über Modellgruppen hinweg
Wenn Sie über 2 verschiedene Modellgruppen cachen möchten (z. B. Azure-Deployments und OpenAI), verwenden Sie Caching-Gruppen.
import litellm, asyncio, time
from litellm import Router
# set os env
os.environ["OPENAI_API_KEY"] = ""
os.environ["AZURE_API_KEY"] = ""
os.environ["AZURE_API_BASE"] = ""
os.environ["AZURE_API_VERSION"] = ""
async def test_acompletion_caching_on_router_caching_groups():
# tests acompletion + caching on router
try:
litellm.set_verbose = True
model_list = [
{
"model_name": "openai-gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo-0613",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
{
"model_name": "azure-gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION")
},
}
]
messages = [
{"role": "user", "content": f"write a one sentence poem {time.time()}?"}
]
start_time = time.time()
router = Router(model_list=model_list,
cache_responses=True,
caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")])
response1 = await router.acompletion(model="openai-gpt-3.5-turbo", messages=messages, temperature=1)
print(f"response1: {response1}")
await asyncio.sleep(1) # add cache is async, async sleep for cache to get set
response2 = await router.acompletion(model="azure-gpt-3.5-turbo", messages=messages, temperature=1)
assert response1.id == response2.id
assert len(response1.choices[0].message.content) > 0
assert response1.choices[0].message.content == response2.choices[0].message.content
except Exception as e:
traceback.print_exc()
asyncio.run(test_acompletion_caching_on_router_caching_groups())
Alarmierung 🚨
Senden Sie Alarme an Slack / Ihre Webhook-URL für die folgenden Ereignisse
- LLM API-Ausnahmen
- Langsame LLM-Antworten
Holen Sie sich eine Slack-Webhook-URL von https://api.slack.com/messaging/webhooks
Verwendung
Initialisieren Sie ein AlertingConfig und übergeben Sie es an litellm.Router. Der folgende Code löst einen Alarm aus, da api_key=bad-key ungültig ist.
from litellm.router import AlertingConfig
import litellm
import os
router = litellm.Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "bad_key",
},
}
],
alerting_config= AlertingConfig(
alerting_threshold=10, # threshold for slow / hanging llm responses (in seconds). Defaults to 300 seconds
webhook_url= os.getenv("SLACK_WEBHOOK_URL") # webhook you want to send alerts to
),
)
try:
await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
except:
pass
Kosten für Azure-Deployments verfolgen
Problem: Azure gibt gpt-4 in der Antwort zurück, wenn azure/gpt-4-1106-preview verwendet wird. Dies führt zu ungenauen Kostenverfolgung.
Lösung ✅ : Setzen Sie model_info["base_model"] bei Ihrer Router-Initialisierung, damit litellm das richtige Modell für die Berechnung der Azure-Kosten verwendet.
Schritt 1. Router-Einrichtung
from litellm import Router
model_list = [
{ # list of model deployments
"model_name": "gpt-4-preview", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"model_info": {
"base_model": "azure/gpt-4-1106-preview" # azure/gpt-4-1106-preview will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
}
},
{
"model_name": "gpt-4-32k",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"model_info": {
"base_model": "azure/gpt-4-32k" # azure/gpt-4-32k will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
}
}
]
router = Router(model_list=model_list)
Schritt 2. Greifen Sie auf response_cost im benutzerdefinierten Callback zu. **LiteLLM berechnet die Antwortkosten für Sie**.
import litellm
from litellm.integrations.custom_logger import CustomLogger
class MyCustomHandler(CustomLogger):
def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
response_cost = kwargs.get("response_cost")
print("response_cost=", response_cost)
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
# router completion call
response = router.completion(
model="gpt-4-32k",
messages=[{ "role": "user", "content": "Hi who are you"}]
)
Standardmäßige LiteLLM.completion/embedding-Parameter
Sie können auch Standardparameter für LiteLLM-Completion/Embedding-Aufrufe festlegen. Hier erfahren Sie, wie das geht.
from litellm import Router
fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}
router = Router(model_list=model_list,
default_litellm_params={"context_window_fallback_dict": fallback_dict})
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Benutzerdefinierte Callbacks - API-Schlüssel, API-Endpunkt, verwendetes Modell verfolgen
Wenn Sie den api_key, den api_endpoint, das verwendete Modell und den custom_llm_provider für jeden Completion-Aufruf nachverfolgen müssen, können Sie einen benutzerdefinierten Callback einrichten.
Verwendung
import litellm
from litellm.integrations.custom_logger import CustomLogger
class MyCustomHandler(CustomLogger):
def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
print("kwargs=", kwargs)
litellm_params= kwargs.get("litellm_params")
api_key = litellm_params.get("api_key")
api_base = litellm_params.get("api_base")
custom_llm_provider= litellm_params.get("custom_llm_provider")
response_cost = kwargs.get("response_cost")
# print the values
print("api_key=", api_key)
print("api_base=", api_base)
print("custom_llm_provider=", custom_llm_provider)
print("response_cost=", response_cost)
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Failure")
print("kwargs=")
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
# Init Router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
# router completion call
response = router.completion(
model="gpt-3.5-turbo",
messages=[{ "role": "user", "content": "Hi who are you"}]
)
Router bereitstellen
Wenn Sie einen Server zur Lastverteilung über verschiedene LLM-APIs wünschen, verwenden Sie unseren LiteLLM Proxy Server
Router debuggen
Grundlegendes Debugging
Setzen Sie Router(set_verbose=True)
from litellm import Router
router = Router(
model_list=model_list,
set_verbose=True
)
Detailliertes Debugging
Setzen Sie Router(set_verbose=True,debug_level="DEBUG")
from litellm import Router
router = Router(
model_list=model_list,
set_verbose=True,
debug_level="DEBUG" # defaults to INFO
)
Sehr detailliertes Debugging
Setzen Sie litellm.set_verbose=True und Router(set_verbose=True,debug_level="DEBUG")
from litellm import Router
import litellm
litellm.set_verbose = True
router = Router(
model_list=model_list,
set_verbose=True,
debug_level="DEBUG" # defaults to INFO
)
Router Allgemeine Einstellungen
Verwendung
router = Router(model_list=..., router_general_settings=RouterGeneralSettings(async_only_mode=True))
Spezifikation
class RouterGeneralSettings(BaseModel):
async_only_mode: bool = Field(
default=False
) # this will only initialize async clients. Good for memory utils
pass_through_all_models: bool = Field(
default=False
) # if passed a model not llm_router model list, pass through the request to litellm.acompletion/embedding