Durable Streams — Agentuity Documentation

Durable Streams

Streaming storage for large exports, audit logs, and real-time data

Durable streams provide streaming storage for large data exports, audit logs, and real-time processing. Streams follow a write-once, read-many pattern: once data is written and the stream is closed, the content is immutable and accessible via URL until deleted.

Why Durable Streams?

WebSocket and SSE connections are straightforward to set up, but they're fragile in practice. Tabs get suspended, networks disconnect, pages get refreshed. When the connection drops, any in-flight data is lost—unless you build custom reconnection logic on top.

This becomes a real problem when you're streaming LLM responses. Token streaming is often the primary UI for chat applications, and agentic apps stream tool outputs and progress events over long-running sessions. If someone refreshes mid-generation, you're faced with two bad options: re-run an expensive inference call, or lose the response entirely.

Durable streams solve this by decoupling the data from the connection. Instead of streaming directly to the client, you write to persistent storage and return a URL. The stream continues writing in the background regardless of what happens on the client side.

What this gives you:

  • Refresh-safe: If someone refreshes the page mid-stream, the URL still works and the content is preserved. The expensive work you already did isn't wasted.
  • Background processing: Return the stream URL immediately and write data asynchronously with ctx.waitUntil(). Your handler responds fast while the stream continues writing.
  • Shareable URLs: A stream is just a URL. You can share it, open it on another device, or have multiple viewers access the same output.
  • Durable artifacts: Once closed, streams are immutable and remain accessible via their public URL. Useful for audit logs, exports, and generated reports.

When to Use Durable Streams

Storage Type     Best For
Durable Streams  Large exports, audit logs, streaming data, batch processing
Key-Value        Fast lookups, caching, configuration
Vector           Semantic search, embeddings, RAG
Object (S3)      Files, images, documents, media
Database         Structured data, complex queries, transactions

Use streams when you need to:

  • Export large datasets incrementally
  • Create audit logs while streaming to clients
  • Process data in chunks without holding everything in memory
  • Return a URL immediately while data writes in the background

Access Patterns

Context            Access                Details
Agents             ctx.stream            See examples below
Routes             c.var.stream          See Using in Routes
Standalone         createAgentContext()  See Standalone Usage
External backends  HTTP routes           SDK Utilities for External Apps
Frontend           Via routes            React Hooks

Creating Streams

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('StreamCreator', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('export', {
      contentType: 'text/csv',
      compress: true, // optional gzip compression
      metadata: { userId: input.userId },
      ttl: 86400 * 7, // expires in 7 days
    });
 
    // Stream is ready immediately
    ctx.logger.info('Stream created', {
      id: stream.id,
      url: stream.url,
    });
 
    return { streamId: stream.id, streamUrl: stream.url };
  },
});

Options:

  • contentType: MIME type for the stream content (e.g., text/csv, application/json)
  • compress: Enable gzip compression for smaller storage and faster transfers
  • metadata: Key-value pairs for tracking stream context (user IDs, timestamps, etc.)
  • ttl: Time-to-live in seconds (see TTL semantics below)

TTL semantics:

Value      Behavior
undefined  Streams expire after 30 days (default)
null or 0  Streams never expire
>= 60      Custom TTL in seconds (minimum 60 seconds, maximum 90 days)
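The rules above can be sketched as a small validation helper (hypothetical; the platform enforces its own limits server-side, this just mirrors the table):

```typescript
// Mirrors the TTL table: undefined -> 30-day default, null/0 -> no
// expiry, otherwise a custom TTL in the [60s, 90d] range.
const MIN_TTL = 60;          // 1 minute
const MAX_TTL = 86400 * 90;  // 90 days

function describeTtl(ttl?: number | null): string {
  if (ttl === undefined) return 'expires in 30 days (default)';
  if (ttl === null || ttl === 0) return 'never expires';
  if (ttl < MIN_TTL || ttl > MAX_TTL) {
    throw new RangeError(`ttl must be between ${MIN_TTL} and ${MAX_TTL} seconds`);
  }
  return `expires in ${ttl} seconds`;
}
```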

Compression

Enable compress: true to reduce storage size and transfer time for text-heavy streams:

const stream = await ctx.stream.create('export', {
  contentType: 'text/csv',
  compress: true,
});

Compression uses server-side gzip. Individual write() calls send raw data; compression is applied when the stream is closed. The resulting URL serves the data with Content-Encoding: gzip, so browsers and HTTP clients decompress it automatically.
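To get a feel for what gzip saves on text-heavy content, here is a quick round-trip with Node's built-in zlib (illustrative only; the platform compresses server-side, so you never call zlib yourself):

```typescript
import { gzipSync, gunzipSync } from 'node:zlib';

// Repetitive text (CSV rows, JSON keys, log lines) compresses well.
const csv = 'Name,Email\n' + 'alice,alice@example.com\n'.repeat(1_000);
const gzipped = gzipSync(csv);

console.log(`raw: ${Buffer.byteLength(csv)} bytes, gzipped: ${gzipped.length} bytes`);

// Clients never see the compressed bytes: Content-Encoding: gzip tells
// the HTTP layer to decompress transparently, recovering the original.
const restored = gunzipSync(gzipped).toString('utf8');
```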

The stream.compressed property indicates whether compression is active:

const stream = await ctx.stream.create('data', { compress: true });
ctx.logger.info('Compressed:', { compressed: stream.compressed });

Writing Data

Write data incrementally, then close the stream:

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('CSVExporter', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('export', {
      contentType: 'text/csv',
    });
 
    // Write header
    await stream.write('Name,Email,Created\n');
 
    // Write rows
    for (const user of input.users) {
      await stream.write(`${user.name},${user.email},${user.created}\n`);
    }
 
    // Close when done
    await stream.close();
 
    ctx.logger.info('Export complete', {
      bytesWritten: stream.bytesWritten,
    });
 
    return { url: stream.url };
  },
});

Background Processing

Use ctx.waitUntil() to write data in the background while returning immediately:

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('BackgroundExporter', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('report', {
      contentType: 'application/json',
    });
 
    // Return URL immediately
    const response = { streamUrl: stream.url };
 
    // Process in background
    ctx.waitUntil(async () => {
      const data = await fetchLargeDataset(input.query);
 
      await stream.write(JSON.stringify(data, null, 2));
      await stream.close();
 
      ctx.logger.info('Background export complete');
    });
 
    return response;
  },
});

Managing Streams

Listing Streams

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('StreamManager', {
  handler: async (ctx, input) => {
    // List streams with optional filtering
    const result = await ctx.stream.list({
      namespace: 'export',  // filter by stream namespace
      metadata: { userId: input.userId },  // filter by metadata
      limit: 100,  // max 1000, default 100
      offset: 0,   // pagination offset
    });
 
    ctx.logger.info('Found streams', {
      total: result.total,
      count: result.streams.length,
    });
 
    return { streams: result.streams };
  },
});
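When a namespace holds more streams than one page can return, the limit and offset parameters support a simple pagination loop. A sketch (the ctx type below is narrowed to just what the loop needs; adapt it to your handler's actual context):

```typescript
// Page through every stream in a namespace, up to 1000 per request.
type StreamLister = {
  stream: {
    list(opts: { namespace: string; limit: number; offset: number }):
      Promise<{ streams: unknown[]; total: number }>;
  };
};

async function listAllStreams(ctx: StreamLister, namespace: string) {
  const all: unknown[] = [];
  let offset = 0;
  const limit = 1000; // maximum page size

  while (true) {
    const page = await ctx.stream.list({ namespace, limit, offset });
    all.push(...page.streams);
    offset += page.streams.length;
    // Stop once everything has been seen or the server returns an empty page.
    if (all.length >= page.total || page.streams.length === 0) break;
  }
  return all;
}
```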

Reading Stream Content

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('StreamReader', {
  handler: async (ctx, input) => {
    // Get stream metadata by ID
    const info = await ctx.stream.get(input.streamId);
    ctx.logger.info('Stream info', {
      namespace: info.namespace,
      sizeBytes: info.sizeBytes,
      expiresAt: info.expiresAt,  // ISO timestamp when stream expires
    });
 
    // Download stream content as ReadableStream
    const content = await ctx.stream.download(input.streamId);
 
    // Process the stream
    const reader = content.getReader();
    const chunks: Uint8Array[] = [];
 
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      chunks.push(value);
    }
 
    return { data: Buffer.concat(chunks).toString() };
  },
});
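If you don't need chunk-by-chunk control, the web-standard Response wrapper collapses the read loop above into a single call (works in any runtime with the Fetch API, such as Node 18+ or Bun):

```typescript
// Drain a ReadableStream<Uint8Array> to a string without a manual
// reader loop, using the web-standard Response wrapper.
async function streamToText(body: ReadableStream<Uint8Array>): Promise<string> {
  return new Response(body).text();
}

// e.g. const data = await streamToText(await ctx.stream.download(streamId));
```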

Deleting Streams

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('StreamCleaner', {
  handler: async (ctx, input) => {
    await ctx.stream.delete(input.streamId);
    return { deleted: true };
  },
});

Dual Stream Pattern

Create two streams simultaneously: one for the client response, one for audit logging.

import { createAgent } from '@agentuity/runtime';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
 
const agent = createAgent('DualStreamWriter', {
  handler: async (ctx, input) => {
    // Create two streams
    const mainStream = await ctx.stream.create('output', {
      contentType: 'text/plain',
    });
    const auditStream = await ctx.stream.create('audit', {
      contentType: 'application/json',
      metadata: { userId: input.userId },
    });
 
    // Return main stream URL immediately
    const response = { streamUrl: mainStream.url };
 
    // Process both streams in background
    ctx.waitUntil(async () => {
      const { textStream } = streamText({
        model: openai('gpt-5-mini'),
        prompt: input.message,
      });
 
      const chunks: string[] = [];
      for await (const chunk of textStream) {
        // Write to client stream
        await mainStream.write(chunk);
        chunks.push(chunk);
      }
 
      // Write audit log
      await auditStream.write(JSON.stringify({
        timestamp: new Date().toISOString(),
        userId: input.userId,
        prompt: input.message,
        response: chunks.join(''),
      }));
 
      await mainStream.close();
      await auditStream.close();
    });
 
    return response;
  },
});

Content Moderation While Streaming

Buffer and evaluate content in real-time using an LLM-as-a-judge pattern:

import { createAgent } from '@agentuity/runtime';
import { generateObject, streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { groq } from '@ai-sdk/groq';
import { s } from '@agentuity/schema';
 
const agent = createAgent('ModeratedStreamer', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('moderated', {
      contentType: 'text/plain',
    });
 
    ctx.waitUntil(async () => {
      const { textStream } = streamText({
        model: openai('gpt-5-mini'),
        prompt: input.message,
      });
 
      let buffer = '';
      const sentenceEnd = /[.!?]\s/;
 
      for await (const chunk of textStream) {
        buffer += chunk;
 
        // Check complete sentences
        if (sentenceEnd.test(buffer)) {
          const isAppropriate = await moderateContent(buffer);
 
          if (isAppropriate) {
            await stream.write(buffer);
          } else {
            ctx.logger.warn('Content blocked', { content: buffer });
            await stream.write('[Content filtered]');
          }
          buffer = '';
        }
      }
 
      // Flush remaining buffer
      if (buffer) {
        await stream.write(buffer);
      }
 
      await stream.close();
    });
 
    return { streamUrl: stream.url };
  },
});
 
// Use Groq for low-latency moderation
async function moderateContent(text: string): Promise<boolean> {
  const { object } = await generateObject({
    model: groq('openai/gpt-oss-120b'),
    schema: s.object({
      safe: s.boolean(),
      reason: s.optional(s.string()),
    }),
    prompt: `Is this content appropriate? Respond with safe=true if appropriate, safe=false if it contains harmful content.\n\nContent: "${text}"`,
  });
 
  return object.safe;
}

Using in Routes

Routes have the same stream access via c.var.stream:

import { createRouter } from '@agentuity/runtime';
 
const router = createRouter();
 
router.post('/export', async (c) => {
  const { data } = await c.req.json();
 
  const stream = await c.var.stream.create('export', {
    contentType: 'text/csv',
  });
 
  // Return URL immediately, write in background
  c.waitUntil(async () => {
    await stream.write('Name,Email\n');
    for (const row of data) {
      await stream.write(`${row.name},${row.email}\n`);
    }
    await stream.close();
  });
 
  return c.json({ url: stream.url });
});
 
export default router;

Standalone Usage

Create streams from background processes with createAgentContext():

import { createApp, createAgentContext } from '@agentuity/runtime';
 
await createApp();
 
// Background job to generate reports
async function generateDailyReport() {
  const ctx = createAgentContext({ trigger: 'cron' });
 
  await ctx.invoke(async () => {
    const stream = await ctx.stream.create('report', {
      contentType: 'text/csv',
      metadata: { date: new Date().toISOString() },
    });
 
    await stream.write('Date,Metric,Value\n');
    // ... write report data
    await stream.close();
 
    ctx.logger.info('Report generated', { url: stream.url });
  });
}

See Running Agents Without HTTP for more patterns including Discord bots, CLI tools, and queue workers.

Stream Properties

Property             Description
stream.id            Unique stream identifier
stream.url           Public URL to access the stream
stream.bytesWritten  Total bytes written so far

Best Practices

  • Use compression: Enable compress: true for text-heavy streams (CSV, JSON, logs). Compression is applied server-side when the stream is closed, so it works with any write pattern
  • Return URLs early: Use ctx.waitUntil() to return the stream URL while writing in the background
  • Clean up: Delete streams after they're no longer needed to free storage
  • Set content types: Always specify the correct MIME type for proper browser handling

Next Steps