Show LLM output as it's generated instead of waiting for the full response. Streaming reduces perceived latency and creates a more responsive experience.
Streaming Types
Agentuity supports two streaming patterns:
Ephemeral Streaming
Returns a ReadableStream directly to the HTTP client. Data flows through and is not stored. Use this for real-time chat responses.
// In src/api/index.ts
import { Hono } from 'hono';
import type { Env } from '@agentuity/runtime';
import chatAgent from '@agent/chat';
const api = new Hono<Env>()
  .post('/chat', chatAgent.validator(), async (c) => {
    const body = c.req.valid('json');
    return c.body(await chatAgent.run(body));
  });
export default api;
Persistent Streaming
Uses ctx.stream.create() to create stored streams with public URLs. Data persists and can be accessed after the connection closes. Use this for batch processing, exports, or content that needs to be accessed later.
// In agent.ts
const stream = await ctx.stream.create('my-export', {
  contentType: 'text/csv',
});
await stream.write('data');
await stream.close();
For stored streams with public URLs, see the Storage documentation.
A streaming agent response requires both schema.stream: true in your agent and a route that returns the resulting stream, usually with return c.body(await agent.run(data)). Use agent.validator() as usual; streaming agents skip output validation automatically.
Basic Streaming
Enable streaming by setting stream: true in your schema and returning a textStream:
The textStream from AI SDK's streamText() works directly with Agentuity. Return it from your handler, then return the resulting stream from your route without additional processing.
import { createAgent } from '@agentuity/runtime';
import { streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
import { s } from '@agentuity/schema';
const agent = createAgent('ChatStream', {
  schema: {
    input: s.object({ message: s.string() }),
    stream: true,
  },
  handler: async (ctx, input) => {
    const { textStream } = streamText({
      model: anthropic('claude-sonnet-4-6'),
      prompt: input.message,
    });
    return textStream;
  },
});
export default agent;
Route Configuration
For streaming agents, validate the request, run the agent, and return the resulting stream:
// src/api/index.ts
import { Hono } from 'hono';
import type { Env } from '@agentuity/runtime';
import chatAgent from '@agent/chat';
const api = new Hono<Env>()
  .post('/chat', chatAgent.validator(), async (c) => {
    const body = c.req.valid('json');
    return c.body(await chatAgent.run(body));
  });
export type ApiRouter = typeof api;
export default api;
Use c.body(await agent.run(data)) when the agent already returns a stream, and use stream() when the route itself creates the ReadableStream. For non-agent routes that use validator() with an output schema, pass stream: true to skip output validation on the stream.
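As a rough illustration of a route creating its own ReadableStream, the sketch below builds one with standard web APIs only. The name createProgressStream and its steps parameter are hypothetical, and the wiring into stream() is left out because its signature is not shown on this page.

```typescript
// Hypothetical helper, not part of Agentuity or Hono: produces a byte stream
// that emits one line per step and then closes.
function createProgressStream(steps: string[]): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  let index = 0;
  return new ReadableStream<Uint8Array>({
    // pull() runs whenever the consumer is ready for more data
    pull(controller) {
      if (index < steps.length) {
        controller.enqueue(encoder.encode(`${steps[index]}\n`));
        index += 1;
      } else {
        controller.close();
      }
    },
  });
}
```

A stream like this is what a route would hand to its streaming helper, whereas a streaming agent already produces the stream for you.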
Consuming Streams
With Fetch API
Read the stream using the Fetch API:
const response = await fetch('http://localhost:3500/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: 'Tell me a story' }),
});
if (!response.ok) {
  throw new Error(`Streaming request failed with ${response.status}`);
}
const reader = response.body?.getReader();
if (!reader) {
  throw new Error('Response has no body to stream');
}
const decoder = new TextDecoder();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // stream: true keeps multi-byte characters intact across chunk boundaries
  const text = decoder.decode(value, { stream: true });
  // Process each chunk as it arrives (appendToUI is a placeholder for your own rendering code)
  appendToUI(text);
}
With React
Use hc<ApiRouter>() when you want typed request bodies and direct access to the Response stream:
import { hc } from 'hono/client';
import { useState } from 'react';
import type { ApiRouter } from '../api';
const client = hc<ApiRouter>('/api');
function Chat() {
  const [data, setData] = useState('');
  const [isLoading, setIsLoading] = useState(false);
  const handleSubmit = async (message: string) => {
    setIsLoading(true);
    setData('');
    try {
      const res = await client.chat.$post({ json: { message } });
      if (!res.ok) {
        throw new Error(`Streaming request failed with ${res.status}`);
      }
      const reader = res.body?.getReader();
      const decoder = new TextDecoder();
      if (!reader) return;
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        setData((prev) => prev + decoder.decode(value, { stream: true }));
      }
    } finally {
      setIsLoading(false);
    }
  };
  return (
    <div>
      {isLoading && <p>Generating...</p>}
      {data && <p>{data}</p>}
      <button onClick={() => handleSubmit('Hello!')}>Send</button>
    </div>
  );
}
For type-safe API calls, see RPC Client.
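Both consumers above run the same read loop, so it can be factored into a small helper. The sketch below is one way to do that with standard web APIs; readTextStream and its onChunk callback are hypothetical names, not part of Agentuity or Hono. It decodes with { stream: true } so a multi-byte character split across two chunks is not corrupted, and it flushes the decoder once the stream ends.

```typescript
// Hypothetical helper: decodes a byte stream chunk by chunk, passes each
// decoded piece to onChunk, and resolves with the full text.
async function readTextStream(
  body: ReadableStream<Uint8Array>,
  onChunk: (text: string) => void,
): Promise<string> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let full = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // stream: true buffers an incomplete multi-byte sequence until the next chunk
      const text = decoder.decode(value, { stream: true });
      if (text) {
        full += text;
        onChunk(text);
      }
    }
    // Flush any bytes still buffered in the decoder
    const tail = decoder.decode();
    if (tail) {
      full += tail;
      onChunk(tail);
    }
    return full;
  } finally {
    reader.releaseLock();
  }
}
```

With a fetch response, check that response.body is not null and then call readTextStream(response.body, appendToUI); in React, the callback could be (text) => setData((prev) => prev + text).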
Streaming with System Prompts
Add context to streaming responses:
handler: async (ctx, input) => {
  const { textStream } = streamText({
    model: anthropic('claude-sonnet-4-6'),
    system: 'You are a helpful assistant. Be concise.',
    messages: [
      { role: 'user', content: input.message },
    ],
  });
  return textStream;
}
Streaming with Conversation History
Combine streaming with thread state for multi-turn conversations:
handler: async (ctx, input) => {
  // Get existing messages from thread state (async)
  const messages = (await ctx.thread.state.get('messages')) || [];
  // Add new user message
  messages.push({ role: 'user', content: input.message });
  const { textStream, text } = streamText({
    model: anthropic('claude-sonnet-4-6'),
    messages,
  });
  // Save assistant response after streaming completes
  ctx.waitUntil(async () => {
    const fullText = await text;
    messages.push({ role: 'assistant', content: fullText });
    await ctx.thread.state.set('messages', messages);
  });
  return textStream;
}
Use ctx.waitUntil() to save conversation history without blocking the stream. The response starts immediately while state updates happen in the background.
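To make the "stream now, save later" idea concrete outside the Agentuity runtime, here is a minimal conceptual sketch using only standard web streams. The function streamAndCollect is hypothetical; it mimics the shape of the textStream and text pair by splitting one source with tee(), and it makes no claim about how streamText or ctx.waitUntil() are implemented.

```typescript
// Hypothetical helper: returns one branch for the client right away and a
// promise that resolves with the full text once the source has finished.
function streamAndCollect(source: ReadableStream<string>): {
  stream: ReadableStream<string>;
  text: Promise<string>;
} {
  const [forClient, forStorage] = source.tee();
  // Drain the second branch in the background; the caller is not blocked
  const text = (async () => {
    const reader = forStorage.getReader();
    let full = '';
    while (true) {
      const { done, value } = await reader.read();
      if (done) return full;
      full += value;
    }
  })();
  return { stream: forClient, text };
}
```

The caller can return stream immediately and await text in a background task to persist the completed reply, which is the role ctx.waitUntil() plays in the handler above.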
When to Stream
| Scenario | Recommendation |
|---|---|
| Chat interfaces | Stream for better UX |
| Long-form content | Stream to show progress |
| Quick classifications | Buffer (faster overall, consider Groq for speed) |
| Structured data | Buffer (use AI SDK Output.object()) |
Error Handling
Handle streaming errors with the onError callback:
const { textStream } = streamText({
  model: anthropic('claude-sonnet-4-6'),
  prompt: input.message,
  onError({ error }) {
    ctx.logger.error('Stream error', { error });
  },
});
Errors in streaming are part of the stream, not thrown exceptions. Always provide an onError callback.
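On the consuming side, a failure can arrive after some chunks have already been delivered. How a server-side error reaches an HTTP client depends on the runtime and transport, so the sketch below only assumes that it surfaces as a rejected read() on a standard ReadableStream. The helper name collectWithErrors is hypothetical.

```typescript
// Hypothetical consumer helper: collects text until the stream ends or fails,
// keeping whatever arrived before the failure.
async function collectWithErrors(
  stream: ReadableStream<string>,
): Promise<{ text: string; error?: unknown }> {
  const reader = stream.getReader();
  let text = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) return { text };
      text += value;
    }
  } catch (error) {
    // The failure comes from read(), after earlier chunks were already consumed
    return { text, error };
  } finally {
    reader.releaseLock();
  }
}
```

Keeping the partial text lets a UI show what was generated alongside an error message instead of discarding the whole response.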
Next Steps
- Using the AI SDK: Structured output and non-streaming responses
- State Management: Multi-turn conversations with memory
- Server-Sent Events: Server-push updates without polling