The dottxt API can stream a schema-constrained response one field at a time instead of returning a single complete JSON object at the end. Each field arrives as a JSON Patch add operation, so downstream work (routing, dispatching, UI updates) can begin the moment the relevant field lands.

The model generates fields in schema order, so the field order in your schema decides when each field becomes available. Put routing keys, classifications, and gates first.

For a broader discussion of JSON Patch streaming, see The closing brace. This page covers the Python SDK helper. For the underlying wire format (stream: "patch", NDJSON, SSE), see JSON Patch streaming on /chat/completions.

Quickstart

AsyncDotTxt.stream(...) yields PatchEvent objects as the model fills in your schema:
import asyncio
from typing import Literal

from pydantic import BaseModel
from dottxt import AsyncDotTxt


class SupportTicket(BaseModel):
    # Field order = arrival order. Put what unblocks downstream work first.
    intent: Literal["billing", "technical", "account"]
    urgency: Literal["low", "medium", "high", "critical"]
    reply: str


async def main():
    client = AsyncDotTxt()
    stream = client.stream(
        model="openai/gpt-oss-20b",
        response_format=SupportTicket,
        input="I was charged twice this month, please refund the duplicate.",
    )
    async for event in stream:
        match event.field:
            case "intent":
                print(f"dispatching to {event.value} queue")
            case "urgency" if event.value == "critical":
                print("paging oncall")
            case "reply":
                print(f"reply: {event.value}")


asyncio.run(main())
The routing decision fires the moment intent arrives — typically tens of milliseconds into generation — while reply continues to stream.

The PatchEvent object

Each yielded event carries:
  • event.op — the raw RFC 6902 operation: {"op": "add", "path": ..., "value": ...}.
  • event.snapshot — an independent deep copy of the document built up to and including this op. Safe to stash; later events do not mutate earlier snapshots.
  • event.field — the JSON Pointer with the leading / stripped. Top-level keys read as "intent", array items as "steps/0", nested fields as "address/city".
  • event.value — the op’s value.
event.field and event.value give you a clean match site for the common case; event.op and event.snapshot are available when you want the raw patch or the partial document so far.
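To make the relationship between event.op, event.field, and event.snapshot concrete, here is a minimal sketch of how a sequence of add operations builds up a document. It is not the SDK's implementation: parse_pointer, apply_add, and field_name are hypothetical helpers, and the sample ops are hand-written to resemble a stream for a schema with intent and steps fields.

```python
import copy


def parse_pointer(path):
    """Split an RFC 6901 JSON Pointer into unescaped reference tokens."""
    if path == "":
        return []  # the empty pointer addresses the whole document
    # "~1" decodes to "/" and "~0" to "~"; decode "~1" first.
    return [t.replace("~1", "/").replace("~0", "~") for t in path[1:].split("/")]


def apply_add(doc, op):
    """Apply one RFC 6902 "add" op and return the updated document."""
    tokens = parse_pointer(op["path"])
    if not tokens:
        return copy.deepcopy(op["value"])  # adding at the root replaces the document
    parent = doc
    for token in tokens[:-1]:
        parent = parent[int(token)] if isinstance(parent, list) else parent[token]
    last = tokens[-1]
    if isinstance(parent, list):
        # "-" appends; a numeric token inserts before that index.
        index = len(parent) if last == "-" else int(last)
        parent.insert(index, copy.deepcopy(op["value"]))
    else:
        parent[last] = copy.deepcopy(op["value"])
    return doc


def field_name(path):
    """Mirror the description of event.field: the pointer minus its leading "/"."""
    return path[1:] if path.startswith("/") else path


# Hand-written ops (illustrative only, not captured from the API).
ops = [
    {"op": "add", "path": "", "value": {}},
    {"op": "add", "path": "/intent", "value": "billing"},
    {"op": "add", "path": "/steps", "value": []},
    {"op": "add", "path": "/steps/0", "value": "check invoice"},
    {"op": "add", "path": "/steps/1", "value": "issue refund"},
]

doc = None
snapshots = []
for op in ops:
    doc = apply_add(doc, op)
    snapshots.append(copy.deepcopy(doc))  # independent copy per op
    print(f"{field_name(op['path'])!r:>10} -> {doc}")
```

The seed ops for the root object and the steps array carry empty containers as values, which is why the examples on this page filter them out before acting on leaf fields.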

Parameters

AsyncDotTxt.stream(...) mirrors generate(...):
  • model (str) — model identifier.
  • input (str | list[dict]) — prompt string or chat-message list.
  • response_format (Any) — any schema input accepted by generate(...): Pydantic model, JSON Schema dict/string, TypedDict, dataclass, etc.
  • temperature, max_tokens, seed, timeout — optional.
  • extra (dict | None) — additional chat-completions body fields.

Examples

The smallest possible patch-stream consumer: iterate, print. No buffering, no closing brace.
import asyncio
from typing import Literal

from pydantic import BaseModel, Field
from dottxt import AsyncDotTxt


class Engineer(BaseModel):
    name: str = Field(max_length=32)
    role: Literal["backend", "frontend", "ml", "infra"]
    years_experience: int = Field(ge=0, le=50)
    favorite_languages: list[str] = Field(min_length=1, max_length=4)


async def main():
    client = AsyncDotTxt()
    stream = client.stream(
        model="openai/gpt-oss-20b",
        response_format=Engineer,
        input="Generate a profile for a senior backend engineer.",
    )
    async for event in stream:
        # Skip the structural seed ops (root `{}`, array `[]`) that arrive
        # before their contents; print only populated leaf fields. Compare
        # against empty containers explicitly so falsy leaves such as 0 are kept.
        if event.value in ({}, []):
            continue
        print(f"{event.field:>24} = {event.value!r}")


asyncio.run(main())

Route on a classification field before the long field finishes

Order the schema so the routing key (here, intent) comes before the long-form reply. The dispatch decision fires tens of milliseconds in; the reply lands seconds later. The elapsed-time prefix on the reply is the punchline — how much later the full message lands compared to when routing was already settled.
import asyncio
import time
from typing import Literal

from pydantic import BaseModel, Field
from dottxt import AsyncDotTxt


class SupportTicket(BaseModel):
    intent: Literal["billing", "technical", "account", "feedback"]
    urgency: Literal["low", "medium", "high", "critical"]
    reply: str = Field(max_length=400)


async def route_to_billing(ticket_id):
    print(f"  -> dispatched {ticket_id} to billing queue")


async def route_to_technical(ticket_id):
    print(f"  -> dispatched {ticket_id} to technical queue")


async def page_oncall(ticket_id):
    print(f"  -> paged oncall for {ticket_id}")


async def main():
    client = AsyncDotTxt()
    ticket_id = "TKT-8821"
    started = time.monotonic()
    stream = client.stream(
        model="openai/gpt-oss-20b",
        response_format=SupportTicket,
        input="I was charged twice this month, please refund the duplicate.",
        max_tokens=400,
    )
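    # Keep references to fire-and-forget tasks so they are not
    # garbage-collected and can be awaited before the event loop closes.
    background: set[asyncio.Task] = set()
    async for event in stream:
        match event.field:
            # Fire-and-forget: routing kicks off while /reply is still streaming.
            case "intent" if event.value == "billing":
                background.add(asyncio.create_task(route_to_billing(ticket_id)))
            case "intent" if event.value == "technical":
                background.add(asyncio.create_task(route_to_technical(ticket_id)))
            case "urgency" if event.value == "critical":
                background.add(asyncio.create_task(page_oncall(ticket_id)))
            case "reply":
                elapsed = int((time.monotonic() - started) * 1000)
                print(f"reply ({elapsed}ms): {event.value}")
    # Let any dispatch still in flight finish before asyncio.run() tears down.
    await asyncio.gather(*background)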


asyncio.run(main())

Fan out work on each array item

When the schema has a top-level array, each item streams in as a separate field (steps/0, steps/1, …). Launch a coroutine the moment each one arrives, so step 0’s work is already underway while step 1 is still being generated. Total wall-clock time tends to be roughly one research interval longer than generation, not the sum of all research times.
import asyncio
import time
from typing import Any

from pydantic import BaseModel, Field
from dottxt import AsyncDotTxt


class Plan(BaseModel):
    topic: str = Field(max_length=80)
    steps: list[str] = Field(min_length=3, max_length=5)


async def research(index, step):
    started = time.monotonic()
    print(f"  [step {index}] started: {step!r}")
    # Pretend to do real work.
    await asyncio.sleep(1.0 + 0.2 * index)
    elapsed_ms = int((time.monotonic() - started) * 1000)
    print(f"  [step {index}] done in {elapsed_ms}ms")
    return {"step": step, "elapsed_ms": elapsed_ms}


async def main():
    client = AsyncDotTxt()
    tasks: list[asyncio.Task[dict[str, Any]]] = []
    started = time.monotonic()
    stream = client.stream(
        model="openai/gpt-oss-20b",
        response_format=Plan,
        input=(
            "Plan three to five research steps to answer the question: "
            "'What are the trade-offs between RAG and fine-tuning for "
            "domain-specific assistants?'"
        ),
        max_tokens=400,
    )
    async for event in stream:
        if event.field.startswith("steps/") and event.value:
            index = int(event.field.split("/", 1)[1])
            tasks.append(asyncio.create_task(research(index, event.value)))

    results = await asyncio.gather(*tasks)
    total_ms = int((time.monotonic() - started) * 1000)
    sum_research_ms = sum(r["elapsed_ms"] for r in results)
    print(f"\nall {len(results)} steps researched in {total_ms}ms total")
    print(
        f"sum of per-step research times: {sum_research_ms}ms "
        f"(overlap saved {max(0, sum_research_ms - total_ms)}ms)"
    )


asyncio.run(main())
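The timing claim can be checked without the API by simulating arrivals. In this sketch everything is a made-up stand-in: fake_steps plays the role of the stream, and the gap and duration values are arbitrary. It compares the measured total against what generation followed by sequential research would cost.

```python
import asyncio
import time


async def fake_steps(n, gap):
    """Yield n steps, one every `gap` seconds, like items arriving mid-generation."""
    for i in range(n):
        await asyncio.sleep(gap)
        yield i, f"step {i}"


async def research(index, step, duration):
    await asyncio.sleep(duration)  # placeholder for real work
    return index


async def main(n=4, gap=0.05, duration=0.1):
    started = time.monotonic()
    tasks = []
    async for index, step in fake_steps(n, gap):
        # Start work as soon as the item arrives; do not wait for the rest.
        tasks.append(asyncio.create_task(research(index, step, duration)))
    generation = time.monotonic() - started
    results = await asyncio.gather(*tasks)
    total = time.monotonic() - started
    return results, generation, total, n * duration


results, generation, total, sequential_research = asyncio.run(main())
print(
    f"generation {generation:.2f}s, total {total:.2f}s, "
    f"generation then sequential research: {generation + sequential_research:.2f}s"
)
```

Only the last item's research runs entirely after generation ends, so the total exceeds generation by about one research interval, provided each research call takes a similar amount of time.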

Mid-stream human approval

Order high-risk decisions ahead of their effects. The proposed action arrives before the reply; prompt the operator between the two. If they decline, the rest of the stream is still consumed, but the reply is never sent.
import asyncio
from typing import Literal

from pydantic import BaseModel, Field
from dottxt import AsyncDotTxt


class AgentDecision(BaseModel):
    # ``action`` precedes ``reply`` so the operator can approve or reject
    # while the reply text is still streaming.
    action: Literal["answer_only", "open_ticket", "issue_refund", "delete_account"]
    reply: str = Field(max_length=300)


HIGH_RISK_ACTIONS = {"issue_refund", "delete_account"}


async def ask_human(question):
    answer = await asyncio.to_thread(input, f"{question} [y/N]: ")
    return answer.strip().lower() in {"y", "yes"}


async def send_reply(reply):
    print(f"sent reply: {reply}")


async def main():
    client = AsyncDotTxt()
    approved = True
    proposed_action = None
    stream = client.stream(
        model="openai/gpt-oss-20b",
        response_format=AgentDecision,
        input="Please close my account permanently. I am leaving.",
        max_tokens=300,
    )
    async for event in stream:
        match event.field:
            case "action":
                proposed_action = event.value
                if event.value in HIGH_RISK_ACTIONS:
                    approved = await ask_human(f"Approve action '{event.value}'?")
            case "reply" if approved:
                await send_reply(event.value)
            case "reply":
                print(f"discarded reply (action '{proposed_action}' declined)")


asyncio.run(main())

The full object so far

If you need the document built so far (e.g. to log progress or hand a partial object to another service), use event.snapshot. Each snapshot is an independent deep copy, so you can stash events without later ops mutating earlier views:
async for event in stream:
    log.info("partial document: %s", event.snapshot)
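What an independent deep copy buys you can be shown in plain Python, without the SDK. The sketch below is illustrative only: it contrasts stashing a reference to a mutable document with stashing a copy.deepcopy of it, the latter being the behaviour described above for event.snapshot.

```python
import copy

doc = {"intent": "billing", "steps": []}

shared_views = []    # stores the same object each time
snapshot_views = []  # stores a deep copy each time

for step in ["check invoice", "issue refund"]:
    doc["steps"].append(step)
    shared_views.append(doc)
    snapshot_views.append(copy.deepcopy(doc))

# The first shared "view" was changed by the later append.
print(shared_views[0]["steps"])
# The first deep copy still holds only the first step.
print(snapshot_views[0]["steps"])
```

With shared references, every stashed view ends up showing the final document. With deep copies, each stashed view preserves the state at the time it was taken.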