Sync vs Async

InfraPrism tracks your LLM calls asynchronously in the background, so tracking adds no latency to the calls themselves. This page explains how tracking works and when to use the sync or async client.

How Tracking Works

When you make an LLM call through InfraPrism:

  1. Your call executes normally - The request goes directly to OpenAI/Anthropic
  2. Metadata is captured - Token counts, latency, model, and your tags
  3. Background upload - Metadata is batched and sent asynchronously
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Your Code   │────▶│ OpenAI API  │────▶│  Response   │
└─────────────┘     └─────────────┘     └─────────────┘
       │
       ▼ (async, non-blocking)
┌─────────────┐
│ InfraPrism  │
│  (metadata) │
└─────────────┘
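Conceptually, the background upload step resembles a queue drained by a worker thread: the hot path only enqueues metadata, and a daemon thread batches and uploads it. The sketch below is illustrative only and not the SDK's actual internals (the `_upload` stand-in records batches instead of making a network call):

```python
import queue
import threading


class MetadataBatcher:
    """Illustrative sketch of a background uploader: callers enqueue
    metadata records; a daemon thread flushes them in batches."""

    def __init__(self, batch_size: int = 10):
        self.batch_size = batch_size
        self._queue = queue.Queue()
        self.uploaded = []  # stands in for the network upload
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def track(self, record: dict) -> None:
        # Called on the hot path: just enqueue, never block on I/O.
        self._queue.put(record)

    def _run(self) -> None:
        batch = []
        while True:
            record = self._queue.get()
            if record is None:  # shutdown sentinel
                break
            batch.append(record)
            if len(batch) >= self.batch_size:
                self._upload(batch)
                batch = []
        if batch:  # flush the partial batch on shutdown
            self._upload(batch)

    def _upload(self, batch: list) -> None:
        # A real uploader would POST the batch here.
        self.uploaded.append(list(batch))

    def close(self) -> None:
        # Signal the worker to flush and stop, then wait for it.
        self._queue.put(None)
        self._worker.join()
```

Because `track()` only appends to an in-memory queue, the calling thread never waits on the network, which is what keeps the LLM call's latency unchanged.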

Zero Latency Impact

Because tracking happens asynchronously:

  • No added latency to your LLM calls
  • No blocking while metadata uploads
  • Graceful degradation if InfraPrism is unreachable
# This call takes the same time with or without InfraPrism
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    entity_type="customer",
    entity_id="acme-corp",
)
# Metadata upload happens in background after response

Sync vs Async Clients

Synchronous Client

Use for traditional synchronous code:

from infraprism import InfraPrismOpenAI

client = InfraPrismOpenAI()

# Blocking call - waits for response
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    entity_type="customer",
    entity_id="acme-corp",
)
print(response.choices[0].message.content)

Asynchronous Client

Use for async/await code (FastAPI, async Django, etc.):

from infraprism import AsyncInfraPrismOpenAI
import asyncio

client = AsyncInfraPrismOpenAI()

async def main():
    # Non-blocking call - can await
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        entity_type="customer",
        entity_id="acme-corp",
    )
    print(response.choices[0].message.content)

asyncio.run(main())

FastAPI Example

from fastapi import FastAPI
from infraprism import AsyncInfraPrismOpenAI

app = FastAPI()
client = AsyncInfraPrismOpenAI()

@app.post("/chat")
async def chat(message: str, customer_id: str):
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        entity_type="customer",
        entity_id=customer_id,
    )
    return {"response": response.choices[0].message.content}

Django Async Example

from django.http import JsonResponse
from infraprism import AsyncInfraPrismOpenAI

client = AsyncInfraPrismOpenAI()

async def chat_view(request):
    message = request.POST.get("message")
    customer_id = request.POST.get("customer_id")

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        entity_type="customer",
        entity_id=customer_id,
    )

    return JsonResponse({
        "response": response.choices[0].message.content
    })

Concurrent Requests

With the async client, you can make concurrent requests:

import asyncio
from infraprism import AsyncInfraPrismOpenAI

client = AsyncInfraPrismOpenAI()

async def process_batch(messages: list[str], customer_id: str):
    tasks = [
        client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": msg}],
            entity_type="customer",
            entity_id=customer_id,
        )
        for msg in messages
    ]

    # All requests run concurrently
    responses = await asyncio.gather(*tasks)
    return [r.choices[0].message.content for r in responses]

# Process 10 messages concurrently
results = asyncio.run(process_batch(
    messages=["Question 1", "Question 2", "...", "Question 10"],
    customer_id="acme-corp",
))

Batch Processing

For large batch jobs, use async with rate limiting:

import asyncio
from infraprism import AsyncInfraPrismOpenAI

client = AsyncInfraPrismOpenAI()

async def process_with_rate_limit(items: list, customer_id: str, max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(item):
        async with semaphore:
            response = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": item}],
                entity_type="customer",
                entity_id=customer_id,
            )
            return response.choices[0].message.content

    return await asyncio.gather(*[process_one(item) for item in items])
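To see the semaphore cap in action without making API calls, the same pattern can be exercised with any coroutine standing in for the LLM call. This sketch is a generic version of the function above (`run_limited` and its `peak` counter are illustrative, not part of the SDK):

```python
import asyncio


async def run_limited(items, worker, max_concurrent: int = 10):
    """Run worker(item) for each item, at most max_concurrent at a time.
    Returns the results (in input order) and the peak concurrency seen."""
    semaphore = asyncio.Semaphore(max_concurrent)
    active = 0
    peak = 0

    async def one(item):
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            result = await worker(item)
            active -= 1
            return result

    # gather() preserves input order regardless of completion order
    results = await asyncio.gather(*[one(i) for i in items])
    return results, peak
```

The semaphore guarantees that `peak` never exceeds `max_concurrent`, no matter how many items are queued.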

Streaming with Async

Async streaming works as expected:

async def stream_response(message: str, customer_id: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True,
        entity_type="customer",
        entity_id=customer_id,
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
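Consuming the generator follows the usual `async for` pattern. The stand-in stream below mimics the chunk shape so the consumer is runnable without a network call (`fake_stream` is illustrative, not an SDK function):

```python
import asyncio


async def fake_stream(text: str):
    # Stand-in for the SDK stream: yields one chunk of text at a time.
    for word in text.split():
        await asyncio.sleep(0)  # yield control, as a real stream would
        yield word + " "


async def collect(gen) -> str:
    # Typical consumer: accumulate streamed chunks into the full response.
    parts = []
    async for chunk in gen:
        parts.append(chunk)
    return "".join(parts)
```

The same `collect`-style loop works on `stream_response` above, or the generator can be passed straight to a framework streaming response.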

Shutdown Handling

The SDK automatically flushes pending metadata on shutdown. For graceful shutdown in long-running processes:

import atexit
from infraprism import InfraPrismOpenAI

client = InfraPrismOpenAI()

def cleanup():
    # Flush any pending metadata
    client.infraprism_flush()

atexit.register(cleanup)
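For async apps, the same flush can be wired into the framework's shutdown hook instead, e.g. a FastAPI lifespan. A minimal sketch, assuming the async client exposes the same `infraprism_flush()` hook as the sync one (check your SDK version; `make_lifespan` is an illustrative helper, not part of the SDK):

```python
import asyncio
from contextlib import asynccontextmanager


def make_lifespan(flush):
    """Build a lifespan context manager that calls flush() on shutdown.
    Pass the result to FastAPI(lifespan=make_lifespan(...))."""

    @asynccontextmanager
    async def lifespan(app):
        yield  # the application serves requests while suspended here
        flush()  # e.g. client.infraprism_flush() -- assumed hook

    return lifespan
```

This ensures pending metadata is uploaded before the server process exits, mirroring what `atexit` does for sync programs.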

Choosing Sync vs Async

| Use Sync When | Use Async When |
| --- | --- |
| Scripts and CLI tools | Web servers (FastAPI, async Django) |
| Jupyter notebooks | High-concurrency applications |
| Simple integrations | Batch processing |
| Existing sync codebases | New async codebases |

Next Steps