Durable streams provide streaming storage for large data exports, audit logs, and real-time processing. Streams follow a write-once, read-many pattern: once data is written and the stream is closed, the content is immutable and accessible via URL until deleted.
Why Durable Streams?
WebSocket and SSE connections are straightforward to set up, but they're fragile in practice. Tabs get suspended, networks disconnect, pages get refreshed. When the connection drops, any in-flight data is lost—unless you build custom reconnection logic on top.
This becomes a real problem when you're streaming LLM responses. Token streaming is often the primary UI for chat applications, and agentic apps stream tool outputs and progress events over long-running sessions. If someone refreshes mid-generation, you're faced with two bad options: re-run an expensive inference call, or lose the response entirely.
Durable streams solve this by decoupling the data from the connection. Instead of streaming directly to the client, you write to persistent storage and return a URL. The stream continues writing in the background regardless of what happens on the client side.
What this gives you:
- Refresh-safe: If someone refreshes the page mid-stream, the URL still works and the content is preserved. The expensive work you already did isn't wasted.
- Background processing: Return the stream URL immediately and write data asynchronously with ctx.waitUntil(). Your handler responds fast while the stream continues writing.
- Shareable URLs: A stream is just a URL. You can share it, open it on another device, or have multiple viewers access the same output.
- Durable artifacts: Once closed, streams are immutable and remain accessible via their public URL. Useful for audit logs, exports, and generated reports.
When to Use Durable Streams
| Storage Type | Best For |
|---|---|
| Durable Streams | Large exports, audit logs, streaming data, batch processing |
| Key-Value | Fast lookups, caching, configuration |
| Vector | Semantic search, embeddings, RAG |
| Object (S3) | Files, images, documents, media |
| Database | Structured data, complex queries, transactions |
Use streams when you need to:
- Export large datasets incrementally
- Create audit logs while streaming to clients
- Process data in chunks without holding everything in memory
- Return a URL immediately while data writes in the background
Access Patterns
| Context | Access | Details |
|---|---|---|
| Agents | ctx.stream | See examples below |
| Routes | c.var.stream | See Using in Routes |
| Standalone | createAgentContext() | See Standalone Usage |
| External backends | HTTP routes | SDK Utilities for External Apps |
| Frontend | Via routes | React Hooks |
The Stream API is identical in all contexts. ctx.stream.create() and c.var.stream.create() work the same way. See Accessing Services for the full reference.
Creating Streams
import { createAgent } from '@agentuity/runtime';
const agent = createAgent('StreamCreator', {
handler: async (ctx, input) => {
const stream = await ctx.stream.create('export', {
contentType: 'text/csv',
compress: true, // optional gzip compression
metadata: { userId: input.userId },
ttl: 86400 * 7, // expires in 7 days
});
// Stream is ready immediately
ctx.logger.info('Stream created', {
id: stream.id,
url: stream.url,
});
return { streamId: stream.id, streamUrl: stream.url };
},
});

Options:
- contentType: MIME type for the stream content (e.g., text/csv, application/json)
- compress: Enable gzip compression for smaller storage and faster transfers
- metadata: Key-value pairs for tracking stream context (user IDs, timestamps, etc.)
- ttl: Time-to-live in seconds (see TTL semantics below)
TTL semantics:
| Value | Behavior |
|---|---|
| undefined | Streams expire after 30 days (default) |
| null or 0 | Streams never expire |
| >= 60 | Custom TTL in seconds (minimum 60 seconds, maximum 90 days) |
TTL is enforced only in cloud deployments. During local development, streams persist indefinitely regardless of the TTL value. The expiresAt field will not be populated in local stream metadata.
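The table above can be read as a small normalization function. This is purely an illustration of the documented semantics, not SDK code; resolveTtl is a hypothetical name:

```typescript
const DAY = 86_400;

// Resolve a ttl option to an effective lifetime in seconds, following the
// documented semantics: undefined → 30-day default, null/0 → never expires,
// otherwise a custom value clamped to 60 seconds–90 days.
function resolveTtl(ttl: number | null | undefined): number | 'never' {
  if (ttl === undefined) return 30 * DAY; // default expiry
  if (ttl === null || ttl === 0) return 'never';
  if (ttl < 60 || ttl > 90 * DAY) {
    throw new RangeError('ttl must be between 60 seconds and 90 days');
  }
  return ttl;
}
```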
Compression
Enable compress: true to reduce storage size and transfer time for text-heavy streams:
const stream = await ctx.stream.create('export', {
contentType: 'text/csv',
compress: true,
});

Compression uses server-side gzip. Individual write() calls send raw data; compression is applied when the stream is closed. The resulting URL serves the data with Content-Encoding: gzip, so browsers and HTTP clients decompress it automatically.
Compressed streams require the client to accept gzip encoding. Browsers handle this natively. If you're downloading streams programmatically with a client that doesn't support gzip, you'll need to decompress the data manually.
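In Node.js, for example, manual decompression is a one-liner with the built-in zlib module. The sketch below simulates a stored compressed payload; in practice the raw bytes would come from your HTTP client's response body:

```typescript
import { gzipSync, gunzipSync } from 'node:zlib';

// What the server keeps for a compressed stream: gzipped bytes.
const stored = gzipSync('Name,Email\nAda,ada@example.com\n');

// A client without gzip support decodes the raw response body itself.
const text = gunzipSync(stored).toString('utf8');
```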
The stream.compressed property indicates whether compression is active:
const stream = await ctx.stream.create('data', { compress: true });
ctx.logger.info('Compressed:', { compressed: stream.compressed });

Writing Data
Write data incrementally, then close the stream:
import { createAgent } from '@agentuity/runtime';
const agent = createAgent('CSVExporter', {
handler: async (ctx, input) => {
const stream = await ctx.stream.create('export', {
contentType: 'text/csv',
});
// Write header
await stream.write('Name,Email,Created\n');
// Write rows
for (const user of input.users) {
await stream.write(`${user.name},${user.email},${user.created}\n`);
}
// Close when done
await stream.close();
ctx.logger.info('Export complete', {
bytesWritten: stream.bytesWritten,
});
return { url: stream.url };
},
});

Streams must be closed manually with stream.close(). They do not auto-close. Failing to close a stream leaves it in an incomplete state.
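To guarantee the close happens even when a write throws, you can wrap the write phase in try/finally. A minimal sketch; WritableStreamLike and withStream are hypothetical names, with the interface reduced to the two methods used here:

```typescript
// Minimal shape of a durable stream handle, reduced to what this helper uses.
interface WritableStreamLike {
  write(chunk: string): Promise<void>;
  close(): Promise<void>;
}

// Run a write callback and always close the stream, even on error,
// so it is never left in an incomplete state.
async function withStream<T>(
  stream: WritableStreamLike,
  fn: (s: WritableStreamLike) => Promise<T>,
): Promise<T> {
  try {
    return await fn(stream);
  } finally {
    await stream.close();
  }
}
```

Usage: `await withStream(stream, async (s) => { await s.write('Name,Email\n'); })`.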
Background Processing
Use ctx.waitUntil() to write data in the background while returning immediately:
import { createAgent } from '@agentuity/runtime';
const agent = createAgent('BackgroundExporter', {
handler: async (ctx, input) => {
const stream = await ctx.stream.create('report', {
contentType: 'application/json',
});
// Return URL immediately
const response = { streamUrl: stream.url };
// Process in background
ctx.waitUntil(async () => {
const data = await fetchLargeDataset(input.query);
await stream.write(JSON.stringify(data, null, 2));
await stream.close();
ctx.logger.info('Background export complete');
});
return response;
},
});

Managing Streams
Listing Streams
import { createAgent } from '@agentuity/runtime';
const agent = createAgent('StreamManager', {
handler: async (ctx, input) => {
// List streams with optional filtering
const result = await ctx.stream.list({
namespace: 'export', // filter by stream namespace
metadata: { userId: input.userId }, // filter by metadata
limit: 100, // max 1000, default 100
offset: 0, // pagination offset
});
ctx.logger.info('Found streams', {
total: result.total,
count: result.streams.length,
});
return { streams: result.streams };
},
});

Reading Stream Content
import { createAgent } from '@agentuity/runtime';
const agent = createAgent('StreamReader', {
handler: async (ctx, input) => {
// Get stream metadata by ID
const info = await ctx.stream.get(input.streamId);
ctx.logger.info('Stream info', {
namespace: info.namespace,
sizeBytes: info.sizeBytes,
expiresAt: info.expiresAt, // ISO timestamp when stream expires
});
// Download stream content as ReadableStream
const content = await ctx.stream.download(input.streamId);
// Process the stream
const reader = content.getReader();
const chunks: Uint8Array[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
return { data: Buffer.concat(chunks).toString() };
},
});

Deleting Streams
import { createAgent } from '@agentuity/runtime';
const agent = createAgent('StreamCleaner', {
handler: async (ctx, input) => {
await ctx.stream.delete(input.streamId);
return { deleted: true };
},
});

Dual Stream Pattern
Create two streams simultaneously: one for the client response, one for audit logging.
import { createAgent } from '@agentuity/runtime';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
const agent = createAgent('DualStreamWriter', {
handler: async (ctx, input) => {
// Create two streams
const mainStream = await ctx.stream.create('output', {
contentType: 'text/plain',
});
const auditStream = await ctx.stream.create('audit', {
contentType: 'application/json',
metadata: { userId: input.userId },
});
// Return main stream URL immediately
const response = { streamUrl: mainStream.url };
// Process both streams in background
ctx.waitUntil(async () => {
const { textStream } = streamText({
model: openai('gpt-5-mini'),
prompt: input.message,
});
const chunks: string[] = [];
for await (const chunk of textStream) {
// Write to client stream
await mainStream.write(chunk);
chunks.push(chunk);
}
// Write audit log
await auditStream.write(JSON.stringify({
timestamp: new Date().toISOString(),
userId: input.userId,
prompt: input.message,
response: chunks.join(''),
}));
await mainStream.close();
await auditStream.close();
});
return response;
},
});

Content Moderation While Streaming
Buffer and evaluate content in real-time using an LLM-as-a-judge pattern:
import { createAgent } from '@agentuity/runtime';
import { generateObject, streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { groq } from '@ai-sdk/groq';
import { s } from '@agentuity/schema';
const agent = createAgent('ModeratedStreamer', {
handler: async (ctx, input) => {
const stream = await ctx.stream.create('moderated', {
contentType: 'text/plain',
});
ctx.waitUntil(async () => {
const { textStream } = streamText({
model: openai('gpt-5-mini'),
prompt: input.message,
});
let buffer = '';
const sentenceEnd = /[.!?]\s/;
for await (const chunk of textStream) {
buffer += chunk;
// Check complete sentences
if (sentenceEnd.test(buffer)) {
const isAppropriate = await moderateContent(buffer);
if (isAppropriate) {
await stream.write(buffer);
} else {
ctx.logger.warn('Content blocked', { content: buffer });
await stream.write('[Content filtered]');
}
buffer = '';
}
}
// Flush remaining buffer
if (buffer) {
await stream.write(buffer);
}
await stream.close();
});
return { streamUrl: stream.url };
},
});
// Use Groq for low-latency moderation
async function moderateContent(text: string): Promise<boolean> {
const { object } = await generateObject({
model: groq('openai/gpt-oss-120b'),
schema: s.object({
safe: s.boolean(),
reason: s.optional(s.string()),
}),
prompt: `Is this content appropriate? Respond with safe=true if appropriate, safe=false if it contains harmful content.\n\nContent: "${text}"`,
});
return object.safe;
}

Using in Routes
Routes have the same stream access via c.var.stream:
import { createRouter } from '@agentuity/runtime';
const router = createRouter();
router.post('/export', async (c) => {
const { data } = await c.req.json();
const stream = await c.var.stream.create('export', {
contentType: 'text/csv',
});
// Return URL immediately, write in background
c.waitUntil(async () => {
await stream.write('Name,Email\n');
for (const row of data) {
await stream.write(`${row.name},${row.email}\n`);
}
await stream.close();
});
return c.json({ url: stream.url });
});
export default router;

Need to create streams from a Next.js backend or other external service? Create authenticated routes that expose stream operations, then call them via HTTP. See SDK Utilities for External Apps.
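From the external service's side, calling such a route is plain HTTP. A sketch against the /export route above; the base URL, bearer-token auth scheme, and requestExport name are hypothetical:

```typescript
// Call an authenticated route that creates a stream and returns its URL.
// The /export path matches the router example above; auth details are
// placeholders for whatever your deployment uses.
async function requestExport(
  baseUrl: string,
  apiKey: string,
  rows: { name: string; email: string }[],
): Promise<string> {
  const res = await fetch(`${baseUrl}/export`, {
    method: 'POST',
    headers: {
      'content-type': 'application/json',
      authorization: `Bearer ${apiKey}`,
    },
    body: JSON.stringify({ data: rows }),
  });
  if (!res.ok) throw new Error(`export failed: ${res.status}`);
  const { url } = (await res.json()) as { url: string };
  return url;
}
```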
Standalone Usage
Create streams from background processes with createAgentContext():
import { createApp, createAgentContext } from '@agentuity/runtime';
await createApp();
// Background job to generate reports
async function generateDailyReport() {
const ctx = createAgentContext({ trigger: 'cron' });
await ctx.invoke(async () => {
const stream = await ctx.stream.create('report', {
contentType: 'text/csv',
metadata: { date: new Date().toISOString() },
});
await stream.write('Date,Metric,Value\n');
// ... write report data
await stream.close();
ctx.logger.info('Report generated', { url: stream.url });
});
}

See Running Agents Without HTTP for more patterns including Discord bots, CLI tools, and queue workers.
Stream Properties
| Property | Description |
|---|---|
| stream.id | Unique stream identifier |
| stream.url | Public URL to access the stream |
| stream.bytesWritten | Total bytes written so far |
Best Practices
- Use compression: Enable compress: true for text-heavy streams (CSV, JSON, logs). Compression is applied server-side when the stream is closed, so it works with any write pattern
- Return URLs early: Use ctx.waitUntil() to return the stream URL while writing in the background
- Clean up: Delete streams after they're no longer needed to free storage
- Set content types: Always specify the correct MIME type for proper browser handling
Next Steps
- Key-Value Storage: Fast caching and configuration
- Vector Storage: Semantic search and embeddings
- Object Storage (S3): File and media storage
- Database: Relational data with queries and transactions
- Streaming Responses: HTTP streaming patterns