Agent Streaming
How to use streaming in your agents
Streaming lets your users start reading the response before the AI has finished generating it. Nothing feels faster than a response that is already happening.
Why Streaming?
- Latency hiding by showing results instantly instead of after the whole response is ready.
- Large inputs and outputs without hitting payload limits.
- Agent chains can forward chunks to the next agent as soon as they arrive.
- Snappier UX so users see progress in milliseconds instead of waiting for the full payload.
- Resource efficiency by not holding entire responses in memory; chunks flow straight through.
- Composable pipelines by allowing agents, functions, and external services to hand off work in a continuous stream.
A simple visualization of the difference between traditional request/response and streaming:
┌─────────────────────────── traditional request/response ───────────────────────────────────┐
| client waiting ... ██████████████████████████████████████████ full payload display |
└────────────────────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────── streaming request/response ─────────────────────────────────────┐
| c l i e n t r e a d s c h u n k 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 … |
└────────────────────────────────────────────────────────────────────────────────────────────┘
Real-World Use Cases
- Live chat / customer support. Stream the assistant's words as they are generated for a more natural feel.
- Speech-to-text. Pipe microphone audio into a transcription agent and forward captions to the UI in real time.
- Streaming search results. Show the first relevant hits immediately while the rest are still processing.
- Agent chains. One agent can translate, the next can summarize, the third can analyze – all in a single flowing stream.
How Streaming Works in Agentuity
Agentuity provides multiple approaches for streaming data:
- High-level Streaming: resp.stream(source) – where source can be:
  - An async iterator (e.g. OpenAI SDK stream)
  - A ReadableStream
  - Another agent's stream
- Low-level Stream Control: context.stream.create(name, props) – create and manage server-side streams directly
- Inbound: await request.data.stream() – consume the client's incoming stream.
Under the hood Agentuity handles the details of the streaming input and output for you.
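To make the source types listed above concrete, here is a minimal sketch that uses only the standard Web Streams API, not the Agentuity SDK. The helper names tokens, toReadableStream, and readAll are hypothetical. It shows how an async iterator can be wrapped in a ReadableStream and how a consumer can drain a stream chunk by chunk, which is roughly the shape of data that resp.stream() accepts and request.data.stream() hands back.

```typescript
// Hypothetical helpers for illustration; only standard Web Streams APIs are used.

// An async generator standing in for an SDK stream (for example, LLM tokens).
async function* tokens(): AsyncGenerator<string> {
  for (const word of ["Hello", ", ", "world", "!"]) {
    yield word;
  }
}

// Wrap any async iterable in a ReadableStream, pulling one chunk at a time.
function toReadableStream<T>(source: AsyncIterable<T>): ReadableStream<T> {
  const iterator = source[Symbol.asyncIterator]();
  return new ReadableStream<T>({
    async pull(controller) {
      const result = await iterator.next();
      if (result.done) {
        controller.close();
      } else {
        controller.enqueue(result.value);
      }
    },
    async cancel() {
      // Let the source clean up if the consumer stops early.
      await iterator.return?.();
    },
  });
}

// Drain a stream chunk by chunk, as a consumer of an inbound stream might.
async function readAll(stream: ReadableStream<string>): Promise<string[]> {
  const reader = stream.getReader();
  const chunks: string[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    chunks.push(result.value);
  }
  return chunks;
}
```

Calling readAll(toReadableStream(tokens())) resolves to the four chunks in order. In a real agent the chunks would be forwarded as they arrive rather than collected into an array.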
OpenAI Streaming Example
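In this example, we stream a model response from the OpenAI API back to the caller. The TypeScript version uses the Vercel AI SDK's streamText with the OpenAI provider; the Python version uses the OpenAI SDK directly.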
import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const { textStream } = streamText({
    model: openai("gpt-4o"),
    prompt: "Invent a new holiday and describe its traditions.",
  });
  return resp.stream(textStream);
}

from openai import OpenAI
from agentuity import AgentRequest, AgentResponse, AgentContext

client = OpenAI()

async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    chat_completion = client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"},
            {"role": "user", "content": (await request.data.text()) or "Why is the sky blue?"},
        ],
        model="gpt-4o",
        stream=True,
    )
    return response.stream(chat_completion, lambda chunk: chunk.choices[0].delta.content)
Structured Object Streaming with Vercel AI SDK
The stream method now supports transformer functions that can filter and transform stream items. This is particularly useful when working with structured data from AI SDKs like Vercel AI SDK's streamObject.
import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";
import { openai } from '@ai-sdk/openai';
import { streamObject } from 'ai';
import { z } from 'zod';

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const { elementStream } = streamObject({
    model: openai('gpt-4o'),
    output: 'array',
    schema: z.object({
      name: z.string(),
      class: z
        .string()
        .describe('Character class, e.g. warrior, mage, or thief.'),
      description: z.string(),
    }),
    prompt: 'Generate 3 hero descriptions for a fantasy role playing game.',
  });
  return resp.stream(elementStream);
}
The SDK automatically detects object streams and converts them to JSON newline format with the appropriate application/json content type.
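As an illustration of what a JSON newline body looks like, the sketch below serializes each object from a stream as one JSON document per line. This is not the SDK's implementation; toNdjson, heroes, and collect are hypothetical names, and the sample objects are made up for the example.

```typescript
// Hypothetical helper: serialize each object as one JSON document per line.
async function* toNdjson(items: AsyncIterable<unknown>): AsyncGenerator<string> {
  for await (const item of items) {
    yield JSON.stringify(item) + "\n";
  }
}

// Sample data standing in for the elements emitted by an object stream.
async function* heroes(): AsyncGenerator<{ name: string; class: string }> {
  yield { name: "Aria", class: "mage" };
  yield { name: "Borin", class: "warrior" };
}

// Concatenate the lines into the body a client would receive.
async function collect(lines: AsyncIterable<string>): Promise<string> {
  let body = "";
  for await (const line of lines) {
    body += line;
  }
  return body;
}
```

A client can split the received body on newlines and parse each line independently, so it can act on the first object before the last one has been generated.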
Stream Transformers
You can provide transformer functions to filter and transform stream data:
import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  // Get stream from another source (getDataStream is a placeholder for your own data source)
  const dataStream = getDataStream();
  // Transform and filter items
  const transformer = (item: any) => {
    // Filter out items (return null/undefined to skip)
    if (!item.active) return null;
    // Transform the item
    return {
      id: item.id,
      name: item.name.toUpperCase(),
      timestamp: Date.now()
    };
  };
  return resp.stream(dataStream, undefined, {}, transformer);
}
You can also use generator functions for more complex transformations:
// Generator transformer that can yield multiple items or filter
function* transformer(item: any) {
  if (item.type === 'batch') {
    // Yield multiple items from a batch
    for (const subItem of item.items) {
      yield { ...subItem, processed: true };
    }
  } else if (item.valid) {
    // Yield single transformed item
    yield { ...item, enhanced: true };
  }
  // Return nothing to filter out invalid items
}
Agent-to-Agent Streaming
In this example, we use the Agentuity SDK to stream the response from one agent to another.
import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  // [1] Call another agent
  const expert = await ctx.getAgent({ name: "HistoryExpert" });
  const expertResp = await expert.run({ prompt: "What engine did a P-51D Mustang use?" });
  // [2] Grab its stream
  const stream = await expertResp.data.stream();
  // [3] Pipe straight through
  return resp.stream(stream);
}
Chain as many agents as you like; each one can inspect, transform, or just relay the chunks.
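The idea of each stage inspecting, transforming, or relaying chunks can be sketched with the standard TransformStream API. This example does not call any agents; upperCaseStage, dropEmptyStage, sourceStream, and runChain are hypothetical names, and each stage stands in for one agent in a chain.

```typescript
// A stage that transforms every chunk; a hypothetical stand-in for one agent.
function upperCaseStage(): TransformStream<string, string> {
  return new TransformStream<string, string>({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

// A stage that inspects chunks, drops empty ones, and relays the rest untouched.
function dropEmptyStage(): TransformStream<string, string> {
  return new TransformStream<string, string>({
    transform(chunk, controller) {
      if (chunk.length > 0) {
        controller.enqueue(chunk);
      }
    },
  });
}

// A fixed list of chunks standing in for an upstream agent's output.
function sourceStream(chunks: string[]): ReadableStream<string> {
  return new ReadableStream<string>({
    start(controller) {
      for (const chunk of chunks) {
        controller.enqueue(chunk);
      }
      controller.close();
    },
  });
}

// Pipe the source through both stages and collect what comes out the far end.
async function runChain(chunks: string[]): Promise<string[]> {
  const reader = sourceStream(chunks)
    .pipeThrough(dropEmptyStage())
    .pipeThrough(upperCaseStage())
    .getReader();
  const out: string[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    out.push(result.value);
  }
  return out;
}
```

Because each stage handles one chunk at a time, the last stage can start emitting output while the first is still receiving input.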
Low-Level Stream Control
For advanced use cases, you can create and manage streams directly with context.stream.create(), giving you fine-grained control over stream creation, data flow, and background processing.
Creating Streams with context.stream.create
This method returns a Promise that resolves to a named, writable stream. The stream is created immediately, enabling background data processing without blocking the response:
import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const { prompt } = await req.data.json();
  // Create a stream
  const stream = await ctx.stream.create('llm-response', {
    contentType: 'text/plain',
    metadata: {
      requestId: ctx.sessionId,
      type: 'llm-generation',
      model: 'gpt-4o'
    }
  });
  // Use waitUntil to handle streaming in the background
  ctx.waitUntil(async () => {
    const { textStream } = streamText({
      model: openai('gpt-4o'),
      prompt
    });
    // Pipe the LLM stream to our created stream
    await textStream.pipeTo(stream);
  });
  // Return stream information immediately
  return resp.json({
    streamId: stream.id,
    streamUrl: stream.url,
    status: 'streaming'
  });
}
Manual Stream Writing
You can write to streams manually for complete control over the data flow. The Stream API provides a simple write() method that handles writer management automatically:
import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const stream = await ctx.stream.create('progress-updates', {
    contentType: 'application/json',
    metadata: { type: 'progress-tracking' }
  });
  ctx.waitUntil(async () => {
    try {
      // Send progress updates
      const steps = ['Initializing', 'Processing', 'Analyzing', 'Finalizing'];
      for (let i = 0; i < steps.length; i++) {
        await stream.write(JSON.stringify({
          step: i + 1,
          total: steps.length,
          message: steps[i],
          progress: ((i + 1) / steps.length) * 100,
          timestamp: new Date().toISOString()
        }) + '\n');
        // Simulate work
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
      // Track progress with bytesWritten
      ctx.logger.info(`Stream complete: ${stream.bytesWritten} bytes written`);
    } finally {
      // Always close the stream, even if a write fails
      await stream.close();
    }
  });
  return resp.json({
    streamId: stream.id,
    streamUrl: stream.url
  });
}
Writing Patterns
The Stream API supports two writing patterns:
- stream.write() - Simplified API that handles writer acquisition and locking automatically. Recommended for most use cases.
- stream.getWriter() - Direct access to the underlying WritableStream writer for scenarios requiring precise control over the writer lifecycle (e.g., sharing a single writer across multiple operations or custom error handling).
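For readers unfamiliar with the writer lifecycle that stream.write() hides, here is a minimal sketch using a plain standard WritableStream rather than an Agentuity stream. The names recordingStream and writeWithSharedWriter are hypothetical. It shows one writer being acquired, shared across several writes, closed, and released.

```typescript
// A plain WritableStream that records chunks; a stand-in for a created stream.
function recordingStream(sink: string[]): WritableStream<string> {
  return new WritableStream<string>({
    write(chunk) {
      sink.push(chunk);
    },
  });
}

// Share a single writer across several writes, then close and release it.
async function writeWithSharedWriter(lines: string[]): Promise<string[]> {
  const received: string[] = [];
  const stream = recordingStream(received);

  // Acquiring a writer locks the stream until releaseLock() is called.
  const writer = stream.getWriter();
  try {
    for (const line of lines) {
      await writer.ready; // wait until the stream can accept more data
      await writer.write(line);
    }
    await writer.close();
  } finally {
    // Release the lock even if a write or the close fails.
    writer.releaseLock();
  }
  return received;
}
```

Holding the writer for the whole loop avoids acquiring and releasing the lock on every chunk, at the cost of having to manage the lock yourself.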
Tracking Stream Progress
The bytesWritten property provides real-time visibility into stream progress:
await stream.write(chunk1);
ctx.logger.info(`Progress: ${stream.bytesWritten} bytes`); // e.g., "Progress: 1024 bytes"
await stream.write(chunk2);
ctx.logger.info(`Progress: ${stream.bytesWritten} bytes`); // e.g., "Progress: 2048 bytes"
Stream Compression
Stream compression reduces bandwidth usage by automatically applying gzip encoding to outbound data. It is disabled by default; set compress: true to enable it:
import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const { dataset } = await req.data.json();
  const stream = await ctx.stream.create('dataset-export', {
    contentType: 'application/json',
    compress: true, // Automatic gzip compression
    metadata: {
      type: 'data-export',
      recordCount: String(dataset.length),
      timestamp: String(Date.now())
    }
  });
  ctx.waitUntil(async () => {
    try {
      for (const record of dataset) {
        await stream.write(JSON.stringify(record) + '\n');
        // Log progress periodically (roughly every 100,000 bytes)
        if (stream.bytesWritten % 100000 < 1000) {
          ctx.logger.info(`Export progress: ${stream.bytesWritten} bytes written`);
        }
      }
      ctx.logger.info(
        `Export complete: ${stream.bytesWritten} bytes (uncompressed), ` +
        `compression: ${stream.compressed ? 'enabled' : 'disabled'}`
      );
    } finally {
      await stream.close();
    }
  });
  // The export continues in the background, so bytesWritten here
  // reflects only what has been written at the time of the response
  return resp.json({
    streamUrl: stream.url,
    compressed: stream.compressed,
    totalBytes: stream.bytesWritten
  });
}
When to Use Compression
Enable compression for highly compressible data formats:
- JSON APIs and structured data streams
- Text-based logs and documents
- CSV files and tabular data
The bytesWritten property tracks uncompressed bytes, while clients receive gzip-encoded responses that are automatically decompressed by standard HTTP clients.
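To see why repetitive text formats benefit, the sketch below gzips a made-up newline-delimited JSON payload with Node's built-in zlib module and compares byte counts. It is independent of the Agentuity SDK; buildPayload and measure are hypothetical names, and the record shape is invented for the example.

```typescript
import { gzipSync, gunzipSync } from "node:zlib";

// Build a repetitive newline-delimited JSON payload, typical of an export stream.
function buildPayload(records: number): string {
  let body = "";
  for (let i = 0; i < records; i++) {
    body += JSON.stringify({ id: i, status: "active", region: "us-east" }) + "\n";
  }
  return body;
}

// Compare the uncompressed and gzip-compressed sizes of the payload.
function measure(records: number): {
  uncompressedBytes: number;
  compressedBytes: number;
  roundTrips: boolean;
} {
  const raw = Buffer.from(buildPayload(records), "utf8");
  const compressed = gzipSync(raw);
  // Decompressing must give back the original bytes exactly.
  const restored = gunzipSync(compressed);
  return {
    uncompressedBytes: raw.byteLength,
    compressedBytes: compressed.byteLength,
    roundTrips: restored.equals(raw),
  };
}
```

In this sketch, uncompressedBytes corresponds to what bytesWritten would report, while compressedBytes approximates what travels over the wire. Data that is already compressed, such as images or video, usually gains little from gzip.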
Stream as Direct Response
You can return a stream directly from your agent handler, which will automatically redirect clients to the stream URL:
import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const { prompt } = await req.data.json();
  const stream = await ctx.stream.create('direct-stream', {
    contentType: 'text/plain'
  });
  ctx.waitUntil(async () => {
    const { textStream } = streamText({
      model: openai('gpt-4o'),
      prompt
    });
    await textStream.pipeTo(stream);
  });
  // Return the stream directly - client gets redirected to stream URL
  return stream;
}
Benefits of Low-Level Stream Control
- Non-blocking: Stream creation returns immediately, allowing instant responses
- Background Processing: Use ctx.waitUntil() to handle data streaming without blocking
- Rich Metadata: Associate custom metadata with streams for tracking and debugging
- Direct Access: Clients can access streams via direct URLs
- Flexible Content Types: Support any content type (text, JSON, binary, etc.)
- Direct Writing: Use stream.write() for simple data writing without manual lock management
- Progress Monitoring: Track bytes written in real-time with stream.bytesWritten
- Automatic Compression: Enable optional compression to reduce bandwidth for compressible content
Managing Streams
Beyond creating streams, you can list and delete streams to manage your stream lifecycle.
Listing Streams
Use context.stream.list() to search and filter through your streams:
import type { AgentHandler } from '@agentuity/sdk';

const handler: AgentHandler = async (req, resp, ctx) => {
  // List all streams
  const allStreams = await ctx.stream.list();
  console.log(`Total streams: ${allStreams.total}`);
  // Filter by name
  const llmStreams = await ctx.stream.list({
    name: 'llm-response'
  });
  // Filter by metadata
  const userStreams = await ctx.stream.list({
    metadata: { userId: 'user-123', sessionId: 'session-456' }
  });
  // Paginate results
  const page1 = await ctx.stream.list({ limit: 10, offset: 0 });
  const page2 = await ctx.stream.list({ limit: 10, offset: 10 });
  return resp.json({ streams: userStreams.streams });
};

export default handler;
Deleting Streams
Clean up streams when they're no longer needed:
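import type { AgentHandler } from '@agentuity/sdk';

const handler: AgentHandler = async (req, resp, ctx) => {
  const { streamId } = await req.data.json();
  try {
    await ctx.stream.delete(streamId);
    return resp.json({ success: true });
  } catch (error) {
    // A caught value is not guaranteed to be an Error, so narrow it first
    if (error instanceof Error && error.message.includes('not found')) {
      return resp.json({ error: 'Stream not found' }, { status: 404 });
    }
    throw error;
  }
};

export default handler;
Complete Stream Lifecycle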
Here's a complete example showing stream creation, listing, and cleanup:
import type { AgentHandler } from '@agentuity/sdk';
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';

const handler: AgentHandler = async (req, resp, ctx) => {
  const { prompt, userId } = await req.data.json();
  // Create a new stream with metadata
  const stream = await ctx.stream.create('llm-response', {
    contentType: 'text/plain',
    metadata: {
      userId,
      sessionId: ctx.sessionId,
      // Stored as a string, matching the other metadata examples; parsed with Number() below
      timestamp: String(Date.now()),
      type: 'llm-generation'
    }
  });
  // Stream LLM response in the background
  ctx.waitUntil(async () => {
    const { textStream } = streamText({
      model: openai('gpt-4o'),
      prompt
    });
    await textStream.pipeTo(stream);
  });
  // List all streams for this user
  const userStreams = await ctx.stream.list({
    metadata: { userId }
  });
  // Clean up old streams (older than 24 hours)
  const oneDayAgo = Date.now() - 24 * 60 * 60 * 1000;
  ctx.waitUntil(async () => {
    for (const oldStream of userStreams.streams) {
      const timestamp = Number(oldStream.metadata?.timestamp);
      if (timestamp && timestamp < oneDayAgo) {
        await ctx.stream.delete(oldStream.id);
        ctx.logger.info(`Deleted old stream: ${oldStream.id}`);
      }
    }
  });
  return resp.json({
    streamId: stream.id,
    streamUrl: stream.url,
    totalStreams: userStreams.total
  });
};

export default handler;
Further Reading
- Blog Post: Agents just want to have streams
- SDK Examples: JavaScript · Python
- Streaming Video Demo: Watch on YouTube