Sync vs Async
InfraPrism tracks your LLM calls asynchronously in the background, ensuring zero impact on your application’s performance. This page explains how it works and how to use async clients.
How Tracking Works
When you make an LLM call through InfraPrism:
1. Your call executes normally: the request goes directly to OpenAI/Anthropic.
2. Metadata is captured: token counts, latency, model, and your tags.
3. Background upload: the metadata is batched and sent asynchronously.
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Your Code  │────▶│ OpenAI API  │────▶│  Response   │
└─────────────┘     └─────────────┘     └─────────────┘
       │
       ▼  (async, non-blocking)
┌─────────────┐
│ InfraPrism  │
│ (metadata)  │
└─────────────┘
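The three steps above can be sketched as a queue plus a background worker thread. This is only an illustration of the batching pattern, not InfraPrism's actual implementation; all names here are made up:

```python
import queue
import threading

# Enqueueing is O(1) and never blocks the request path; a daemon
# thread drains the queue and "uploads" batches in the background.
metadata_queue: "queue.Queue[dict]" = queue.Queue()
uploaded_batches: list[list[dict]] = []

def worker(batch_size: int = 3) -> None:
    batch: list[dict] = []
    while True:
        item = metadata_queue.get()
        if item is None:  # sentinel: flush remaining items and exit
            break
        batch.append(item)
        if len(batch) >= batch_size:
            uploaded_batches.append(batch)
            batch = []
    if batch:
        uploaded_batches.append(batch)

t = threading.Thread(target=worker, daemon=True)
t.start()

for i in range(7):
    metadata_queue.put({"tokens": 10 * i, "model": "gpt-4o"})
metadata_queue.put(None)  # flush on shutdown
t.join()

print(len(uploaded_batches))  # 3 batches: 3 + 3 + 1
```

The sentinel-and-join at the end mirrors the flush-on-shutdown behavior described later on this page.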
Zero Latency Impact
Because tracking happens asynchronously:
- No added latency to your LLM calls
- No blocking while metadata uploads
- Graceful degradation if InfraPrism is unreachable
# This call takes the same time with or without InfraPrism
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    entity_type="customer",
    entity_id="acme-corp",
)
# Metadata upload happens in background after response
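The "graceful degradation" point can be illustrated with a small sketch (again, not InfraPrism's actual code): the background upload is wrapped so that a network failure is logged and dropped rather than raised into application code.

```python
import logging

logger = logging.getLogger("metadata-uploader")

def unreachable_upload(batch: list[dict]) -> None:
    # Stand-in for the real upload; simulates InfraPrism being down.
    raise ConnectionError("metadata endpoint unreachable")

def safe_upload(batch: list[dict]) -> bool:
    # A tracking failure must never bubble up into the request path.
    try:
        unreachable_upload(batch)
        return True
    except Exception as exc:
        logger.warning("dropping %d metadata records: %s", len(batch), exc)
        return False

# The LLM response was already returned to the caller;
# this failure is invisible to the application.
ok = safe_upload([{"tokens": 42, "model": "gpt-4o"}])
print(ok)  # False: upload failed, but no exception escaped
```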
Sync vs Async Clients
Synchronous Client
Use for traditional synchronous code:
from infraprism import InfraPrismOpenAI

client = InfraPrismOpenAI()

# Blocking call - waits for response
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    entity_type="customer",
    entity_id="acme-corp",
)
print(response.choices[0].message.content)
Asynchronous Client
Use for async/await code (FastAPI, async Django, etc.):
import asyncio

from infraprism import AsyncInfraPrismOpenAI

client = AsyncInfraPrismOpenAI()

async def main():
    # Non-blocking call - can await
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        entity_type="customer",
        entity_id="acme-corp",
    )
    print(response.choices[0].message.content)

asyncio.run(main())
FastAPI Example
from fastapi import FastAPI
from infraprism import AsyncInfraPrismOpenAI

app = FastAPI()
client = AsyncInfraPrismOpenAI()

@app.post("/chat")
async def chat(message: str, customer_id: str):
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        entity_type="customer",
        entity_id=customer_id,
    )
    return {"response": response.choices[0].message.content}
Django Async Example
from django.http import JsonResponse
from infraprism import AsyncInfraPrismOpenAI

client = AsyncInfraPrismOpenAI()

async def chat_view(request):
    message = request.POST.get("message")
    customer_id = request.POST.get("customer_id")
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        entity_type="customer",
        entity_id=customer_id,
    )
    return JsonResponse({
        "response": response.choices[0].message.content
    })
Concurrent Requests
With the async client, you can make concurrent requests:
import asyncio

from infraprism import AsyncInfraPrismOpenAI

client = AsyncInfraPrismOpenAI()

async def process_batch(messages: list[str], customer_id: str):
    tasks = [
        client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": msg}],
            entity_type="customer",
            entity_id=customer_id,
        )
        for msg in messages
    ]
    # All requests run concurrently
    responses = await asyncio.gather(*tasks)
    return [r.choices[0].message.content for r in responses]

# Process 10 messages concurrently
results = asyncio.run(process_batch(
    messages=["Question 1", "Question 2", "...", "Question 10"],
    customer_id="acme-corp",
))
Batch Processing
For large batch jobs, cap concurrency with a semaphore so you stay under provider rate limits:
import asyncio

from infraprism import AsyncInfraPrismOpenAI

client = AsyncInfraPrismOpenAI()

async def process_with_rate_limit(items: list, customer_id: str, max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(item):
        async with semaphore:
            response = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": item}],
                entity_type="customer",
                entity_id=customer_id,
            )
            return response.choices[0].message.content

    return await asyncio.gather(*[process_one(item) for item in items])
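To see the concurrency cap in action without making real API calls, here is a self-contained sketch (stubbed call, hypothetical names) that records how many coroutines ever run at once:

```python
import asyncio

async def demo(max_concurrent: int = 3, n_items: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)
    active = 0   # coroutines currently inside the semaphore
    peak = 0     # highest value `active` ever reached

    async def fake_call(i: int) -> int:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the API round trip
            active -= 1
            return i

    results = await asyncio.gather(*[fake_call(i) for i in range(n_items)])
    return results, peak

results, peak = asyncio.run(demo())
print(results)       # gather preserves input order
print(peak <= 3)     # True: never more than max_concurrent at once
```

The same structure applies to `process_with_rate_limit` above: `gather` returns results in input order even though completion order varies.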
Streaming with Async
Async streaming works as expected:
async def stream_response(message: str, customer_id: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True,
        entity_type="customer",
        entity_id=customer_id,
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
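Consuming such an async generator follows the usual `async for` pattern. The sketch below substitutes a stubbed stream for `stream_response` so it runs without network access:

```python
import asyncio
from typing import AsyncIterator

async def fake_stream() -> AsyncIterator[str]:
    # Stand-in for stream_response(); yields text deltas like the real stream.
    for piece in ["Hel", "lo, ", "world"]:
        await asyncio.sleep(0)  # simulate waiting for the next chunk
        yield piece

async def collect(stream: AsyncIterator[str]) -> str:
    # Accumulate streamed deltas into the full completion text.
    parts: list[str] = []
    async for piece in stream:
        parts.append(piece)
    return "".join(parts)

print(asyncio.run(collect(fake_stream())))  # Hello, world
```

In a web app you would typically pass the generator straight to your framework's streaming response type instead of collecting it.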
Shutdown Handling
The SDK automatically flushes pending metadata on shutdown. For graceful shutdown in long-running processes:
import atexit

from infraprism import InfraPrismOpenAI

client = InfraPrismOpenAI()

def cleanup():
    # Flush any pending metadata
    client.infraprism_flush()

atexit.register(cleanup)
Choosing Sync vs Async
| Use Sync When | Use Async When |
|---|---|
| Scripts and CLI tools | Web servers (FastAPI, async Django) |
| Jupyter notebooks | High-concurrency applications |
| Simple integrations | Batch processing |
| Existing sync codebases | New async codebases |
Next Steps
- Configuration - SDK configuration options
- OpenAI Provider - OpenAI-specific features
- Anthropic Provider - Anthropic-specific features