# Durable Streams
Streaming storage for large exports, audit logs, and real-time data
Durable streams provide streaming storage for large data exports, audit logs, and real-time processing. Streams follow a write-once, read-many pattern: once data is written and the stream is closed, the content is immutable and can be accessed via a permanent public URL.
## When to Use Durable Streams
| Storage Type | Best For |
|---|---|
| Durable Streams | Large exports, audit logs, streaming data, batch processing |
| Key-Value | Fast lookups, caching, configuration |
| Vector | Semantic search, embeddings, RAG |
| Object (S3) | Files, images, documents, media |
| Database | Structured data, complex queries, transactions |
Use streams when you need to:
- Export large datasets incrementally
- Create audit logs while streaming to clients
- Process data in chunks without holding everything in memory
- Return a URL immediately while data writes in the background
## Access Patterns
| Context | Access | Details |
|---|---|---|
| Agents | `ctx.stream` | See examples below |
| Routes | `c.var.stream` | See Using in Routes |
| Standalone | `createAgentContext()` | See Standalone Usage |
| External backends | HTTP routes | SDK Utilities for External Apps |
| Frontend | Via routes | React Hooks; see the sketch below |
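As a concrete example of the frontend path, a browser client can call a route (like the `/export` route defined later in Using in Routes) and then read the returned public stream URL directly. This is a sketch using plain `fetch` rather than the React Hooks; the endpoint path and payload shape are assumptions taken from that routes example, not a fixed API:

```typescript
// Hypothetical browser-side consumer of the /export route shown
// later in Using in Routes. The route returns the stream URL right
// away while the export is still being written in the background.
async function downloadExport(rows: { name: string; email: string }[]) {
  const res = await fetch('/export', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ data: rows }),
  });
  const { url } = (await res.json()) as { url: string };

  // Fetch the stream's public URL to read the exported CSV
  const contents = await fetch(url);
  return contents.text();
}
```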
### Same API Everywhere

The Stream API is identical in all contexts: `ctx.stream.create()` and `c.var.stream.create()` work the same way. See Accessing Services for the full reference.
## Creating Streams
```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('StreamCreator', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('export', {
      contentType: 'text/csv',
      compress: true, // optional gzip compression
      metadata: { userId: input.userId },
    });

    // Stream is ready immediately
    ctx.logger.info('Stream created', {
      id: stream.id,
      url: stream.url,
    });

    return { streamId: stream.id, streamUrl: stream.url };
  },
});
```

Options:
- `contentType`: MIME type for the stream content (e.g., `text/csv`, `application/json`)
- `compress`: Enable gzip compression for smaller storage and faster transfers
- `metadata`: Key-value pairs for tracking stream context (user IDs, timestamps, etc.)
## Writing Data
Write data incrementally, then close the stream:
```typescript
const agent = createAgent('CSVExporter', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('export', {
      contentType: 'text/csv',
    });

    // Write header
    await stream.write('Name,Email,Created\n');

    // Write rows
    for (const user of input.users) {
      await stream.write(`${user.name},${user.email},${user.created}\n`);
    }

    // Close when done
    await stream.close();

    ctx.logger.info('Export complete', {
      bytesWritten: stream.bytesWritten,
    });

    return { url: stream.url };
  },
});
```

### Always Close Streams
Streams must be closed manually with `stream.close()`; they do not auto-close. Failing to close a stream leaves it in an incomplete state.
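One way to make that guarantee explicit is a `try`/`finally` around the writes, so the stream is closed even if a write throws. A minimal sketch using the same API as the examples above (the `SafeExporter` name and input shape are illustrative):

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('SafeExporter', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('export', {
      contentType: 'text/csv',
    });

    try {
      await stream.write('Name,Email\n');
      for (const user of input.users) {
        await stream.write(`${user.name},${user.email}\n`);
      }
    } finally {
      // Runs whether the writes succeeded or threw, so the
      // stream is never left in an incomplete state
      await stream.close();
    }

    return { url: stream.url };
  },
});
```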
## Background Processing
Use `ctx.waitUntil()` to write data in the background while returning immediately:
```typescript
const agent = createAgent('BackgroundExporter', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('report', {
      contentType: 'application/json',
    });

    // Return URL immediately
    const response = { streamUrl: stream.url };

    // Process in background
    ctx.waitUntil(async () => {
      const data = await fetchLargeDataset(input.query);
      await stream.write(JSON.stringify(data, null, 2));
      await stream.close();
      ctx.logger.info('Background export complete');
    });

    return response;
  },
});
```

## Managing Streams
### Listing Streams
```typescript
const agent = createAgent('StreamManager', {
  handler: async (ctx, input) => {
    // List streams with optional filtering
    const result = await ctx.stream.list({
      name: 'export', // filter by stream name
      metadata: { userId: input.userId }, // filter by metadata
      limit: 100, // max 1000, default 100
      offset: 0, // pagination offset
    });

    ctx.logger.info('Found streams', {
      total: result.total,
      count: result.streams.length,
    });

    return { streams: result.streams };
  },
});
```

### Reading Stream Content
```typescript
const agent = createAgent('StreamReader', {
  handler: async (ctx, input) => {
    // Get stream metadata by ID
    const info = await ctx.stream.get(input.streamId);
    ctx.logger.info('Stream info', { name: info.name, sizeBytes: info.sizeBytes });

    // Download stream content as a ReadableStream
    const content = await ctx.stream.download(input.streamId);

    // Process the stream chunk by chunk
    const reader = content.getReader();
    const chunks: Uint8Array[] = [];
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      chunks.push(value);
    }

    return { data: Buffer.concat(chunks).toString() };
  },
});
```

### Deleting Streams
```typescript
const agent = createAgent('StreamCleaner', {
  handler: async (ctx, input) => {
    await ctx.stream.delete(input.streamId);
    return { deleted: true };
  },
});
```

## Dual Stream Pattern
Create two streams simultaneously: one for the client response, one for audit logging.
```typescript
import { createAgent } from '@agentuity/runtime';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';

const agent = createAgent('DualStreamWriter', {
  handler: async (ctx, input) => {
    // Create two streams
    const mainStream = await ctx.stream.create('output', {
      contentType: 'text/plain',
    });
    const auditStream = await ctx.stream.create('audit', {
      contentType: 'application/json',
      metadata: { userId: input.userId },
    });

    // Return main stream URL immediately
    const response = { streamUrl: mainStream.url };

    // Process both streams in background
    ctx.waitUntil(async () => {
      const { textStream } = streamText({
        model: openai('gpt-5-mini'),
        prompt: input.message,
      });

      const chunks: string[] = [];
      for await (const chunk of textStream) {
        // Write to client stream
        await mainStream.write(chunk);
        chunks.push(chunk);
      }

      // Write audit log
      await auditStream.write(JSON.stringify({
        timestamp: new Date().toISOString(),
        userId: input.userId,
        prompt: input.message,
        response: chunks.join(''),
      }));

      await mainStream.close();
      await auditStream.close();
    });

    return response;
  },
});
```

## Content Moderation While Streaming
Buffer and evaluate content in real time using an LLM-as-a-judge pattern:
```typescript
import { createAgent } from '@agentuity/runtime';
import { generateObject, streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { groq } from '@ai-sdk/groq';
import { s } from '@agentuity/schema';

const agent = createAgent('ModeratedStreamer', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('moderated', {
      contentType: 'text/plain',
    });

    ctx.waitUntil(async () => {
      const { textStream } = streamText({
        model: openai('gpt-5-mini'),
        prompt: input.message,
      });

      let buffer = '';
      const sentenceEnd = /[.!?]\s/;

      for await (const chunk of textStream) {
        buffer += chunk;

        // Check complete sentences
        if (sentenceEnd.test(buffer)) {
          const isAppropriate = await moderateContent(buffer);
          if (isAppropriate) {
            await stream.write(buffer);
          } else {
            ctx.logger.warn('Content blocked', { content: buffer });
            await stream.write('[Content filtered]');
          }
          buffer = '';
        }
      }

      // Flush remaining buffer
      if (buffer) {
        await stream.write(buffer);
      }

      await stream.close();
    });

    return { streamUrl: stream.url };
  },
});

// Use Groq for low-latency moderation
async function moderateContent(text: string): Promise<boolean> {
  const { object } = await generateObject({
    model: groq('openai/gpt-oss-20b'),
    schema: s.object({
      safe: s.boolean(),
      reason: s.optional(s.string()),
    }),
    prompt: `Is this content appropriate? Respond with safe=true if appropriate, safe=false if it contains harmful content.\n\nContent: "${text}"`,
  });

  return object.safe;
}
```

## Using in Routes
Routes have the same stream access via `c.var.stream`:
```typescript
import { createRouter } from '@agentuity/runtime';

const router = createRouter();

router.post('/export', async (c) => {
  const { data } = await c.req.json();

  const stream = await c.var.stream.create('export', {
    contentType: 'text/csv',
  });

  // Return URL immediately, write in background
  c.waitUntil(async () => {
    await stream.write('Name,Email\n');
    for (const row of data) {
      await stream.write(`${row.name},${row.email}\n`);
    }
    await stream.close();
  });

  return c.json({ url: stream.url });
});

export default router;
```

## External Backend Access
Need to create streams from a Next.js backend or another external service? Create authenticated routes that expose stream operations, then call them over HTTP. See SDK Utilities for External Apps.
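As a sketch of what that HTTP call might look like from the external side, assuming the `/export` route above is deployed at a hypothetical `AGENTUITY_APP_URL` and protected by a bearer token you configure yourself (both names are illustrative, not part of the SDK):

```typescript
// Hypothetical external client (e.g., a Next.js API route).
// AGENTUITY_APP_URL and AGENTUITY_API_KEY are assumed environment
// variables; adjust to however your routes are deployed and secured.
export async function startExport(data: { name: string; email: string }[]) {
  const res = await fetch(`${process.env.AGENTUITY_APP_URL}/export`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Authorization: `Bearer ${process.env.AGENTUITY_API_KEY}`,
    },
    body: JSON.stringify({ data }),
  });

  // The route returns the stream URL immediately; writing continues
  // in the background on the Agentuity side
  const { url } = (await res.json()) as { url: string };
  return url;
}
```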
## Standalone Usage
Create streams from background processes with `createAgentContext()`:
```typescript
import { createApp, createAgentContext } from '@agentuity/runtime';

await createApp();

// Background job to generate reports
async function generateDailyReport() {
  const ctx = createAgentContext({ trigger: 'cron' });

  await ctx.invoke(async () => {
    const stream = await ctx.stream.create('report', {
      contentType: 'text/csv',
      metadata: { date: new Date().toISOString() },
    });

    await stream.write('Date,Metric,Value\n');
    // ... write report data
    await stream.close();

    ctx.logger.info('Report generated', { url: stream.url });
  });
}
```

See Running Agents Without HTTP for more patterns, including Discord bots, CLI tools, and queue workers.
## Stream Properties
| Property | Description |
|---|---|
| `stream.id` | Unique stream identifier |
| `stream.url` | Public URL to access the stream |
| `stream.bytesWritten` | Total bytes written so far |
## Best Practices

- **Use compression**: Enable `compress: true` for large text exports to reduce storage and transfer time
- **Return URLs early**: Use `ctx.waitUntil()` to return the stream URL while writing in the background
- **Clean up**: Delete streams after they're no longer needed to free storage (see the sketch below)
- **Set content types**: Always specify the correct MIME type for proper browser handling
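A cleanup sketch combining the list and delete calls shown earlier; it assumes each entry in `result.streams` exposes an `id` field, which this page doesn't spell out, and the `ExportJanitor` name is illustrative:

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('ExportJanitor', {
  handler: async (ctx, input) => {
    // Find this user's finished exports using the filters
    // from Listing Streams
    const result = await ctx.stream.list({
      name: 'export',
      metadata: { userId: input.userId },
    });

    // Delete each one to free storage. Assumes each entry
    // carries an `id` usable with ctx.stream.delete()
    for (const entry of result.streams) {
      await ctx.stream.delete(entry.id);
    }

    return { deleted: result.streams.length };
  },
});
```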
## Next Steps
- Key-Value Storage: Fast caching and configuration
- Vector Storage: Semantic search and embeddings
- Object Storage (S3): File and media storage
- Database: Relational data with queries and transactions
- Streaming Responses: HTTP streaming patterns
## Need Help?
Join our Community for assistance or just to hang with other humans building agents.
Send us an email at hi@agentuity.com if you'd like to get in touch.
If you haven't already, please Signup for your free account now and start building your first agent!