# Durable Streams

Streaming storage for large exports, audit logs, and real-time data

Durable streams provide streaming storage for large data exports, audit logs, and real-time processing. Streams follow a **write-once, read-many** pattern: once data is written and the stream is closed, the content is immutable and accessible via URL until deleted.

## Why Durable Streams?

WebSocket and SSE connections are straightforward to set up, but they're fragile in practice. Tabs get suspended, networks disconnect, pages get refreshed. When the connection drops, any in-flight data is lost unless you build custom reconnection logic on top.

This becomes a real problem when you're streaming LLM responses. Token streaming is often the primary UI for chat applications, and agentic apps stream tool outputs and progress events over long-running sessions. If someone refreshes mid-generation, you're faced with two bad options: re-run an expensive inference call, or lose the response entirely.

Durable streams solve this by decoupling the data from the connection. Instead of streaming directly to the client, you write to persistent storage and return a URL. The stream continues writing in the background regardless of what happens on the client side.

**What this gives you:**

- **Refresh-safe**: If someone refreshes the page mid-stream, the URL still works and the content is preserved. The expensive work you already did isn't wasted.
- **Background processing**: Return the stream URL immediately and write data asynchronously with `ctx.waitUntil()`. Your handler responds fast while the stream continues writing.
- **Shareable URLs**: A stream is just a URL. You can share it, open it on another device, or have multiple viewers access the same output (see the fetch sketch below).
- **Durable artifacts**: Once closed, streams are immutable and remain accessible via their public URL. Useful for audit logs, exports, and generated reports.
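
Because a stream is served over plain HTTP, any client can consume its URL with `fetch`. A minimal consumption sketch, assuming `streamUrl` is the URL your handler returned:

```typescript
// Read a stream URL incrementally on the client (sketch).
// `streamUrl` is assumed to come from your agent's response.
const response = await fetch(streamUrl);
const reader = response.body!.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // Render each chunk as it arrives
  console.log(decoder.decode(value, { stream: true }));
}
```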

## When to Use Durable Streams

| Storage Type | Best For |
|--------------|----------|
| **Durable Streams** | Large exports, audit logs, streaming data, batch processing |
| [Key-Value](/services/storage/key-value) | Fast lookups, caching, configuration |
| [Vector](/services/storage/vector) | Semantic search, embeddings, RAG |
| [Object (S3)](/services/storage/object) | Files, images, documents, media |
| [Database](/services/database) | Structured data, complex queries, transactions |

**Use streams when you need to:**
- Export large datasets incrementally
- Create audit logs while streaming to clients
- Process data in chunks without holding everything in memory
- Return a URL immediately while data writes in the background

## Access Patterns

| Context | Access | Details |
|---------|--------|---------|
| Agents | `ctx.stream` | See examples below |
| Routes | `c.var.stream` | See [Using in Routes](#using-in-routes) |
| Standalone | `createAgentContext()` | See [Standalone Usage](#standalone-usage) |
| External backends | HTTP routes | [SDK Utilities for External Apps](/cookbook/patterns/server-utilities) |
| Frontend | Via routes | [React Hooks](/frontend/react-hooks) |

> [!NOTE]
> **Same API Everywhere**
> The Stream API is identical in all contexts. `ctx.stream.create()` and `c.var.stream.create()` work the same way. See [Accessing Services](/reference/sdk-reference/router#accessing-services) for the full reference.

## Creating Streams

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('StreamCreator', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('export', {
      contentType: 'text/csv',
      compress: true, // optional gzip compression
      metadata: { userId: input.userId },
      ttl: 86400 * 7, // expires in 7 days
    });

    // Stream is ready immediately
    ctx.logger.info('Stream created', {
      id: stream.id,
      url: stream.url,
    });

    return { streamId: stream.id, streamUrl: stream.url };
  },
});
```

**Options:**
- `contentType`: MIME type for the stream content (e.g., `text/csv`, `application/json`)
- `compress`: Enable gzip compression for smaller storage and faster transfers
- `metadata`: Key-value pairs for tracking stream context (user IDs, timestamps, etc.)
- `ttl`: Time-to-live in seconds (see TTL semantics below)

**TTL semantics:**

| Value | Behavior |
|-------|----------|
| `undefined` | Streams expire after 30 days (default) |
| `null` or `0` | Streams never expire |
| `>= 60` | Custom TTL in seconds (minimum 60 seconds, maximum 90 days) |

> [!NOTE]
> **TTL in Local Development**
> TTL is enforced only in cloud deployments. During local development, streams persist indefinitely regardless of the TTL value. The `expiresAt` field will not be populated in local stream metadata.
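
For example, an audit stream that should never expire can pass `ttl: null` explicitly, per the table above (a sketch inside a handler):

```typescript
// Never-expiring stream (ttl: null), per the TTL semantics table.
const auditLog = await ctx.stream.create('audit', {
  contentType: 'application/json',
  ttl: null,
});
```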

### Compression

Enable `compress: true` to reduce storage size and transfer time for text-heavy streams:

```typescript
const stream = await ctx.stream.create('export', {
  contentType: 'text/csv',
  compress: true,
});
```

Compression uses server-side gzip. Individual `write()` calls send raw data; compression is applied when the stream is closed. The resulting URL serves the data with `Content-Encoding: gzip`, so browsers and HTTP clients decompress it automatically.

> [!NOTE]
> **Client Compatibility**
> Compressed streams require the client to accept gzip encoding. Browsers handle this natively. If you're downloading streams programmatically with a client that doesn't support gzip, you'll need to decompress the data manually, as sketched below.
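
A minimal sketch of manual decompression in Node, assuming `compressedBytes` holds the raw gzip body your client received:

```typescript
import { gunzipSync } from 'node:zlib';

// Decompress a gzip-encoded stream body by hand (sketch).
// `compressedBytes` is assumed to be the raw response body from a
// client that did not negotiate gzip.
function decodeStreamBody(compressedBytes: Uint8Array): string {
  return gunzipSync(compressedBytes).toString('utf8');
}
```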

The `stream.compressed` property indicates whether compression is active:

```typescript
const stream = await ctx.stream.create('data', { compress: true });
ctx.logger.info('Compressed:', { compressed: stream.compressed });
```

## Writing Data

Write data incrementally, then close the stream:

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('CSVExporter', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('export', {
      contentType: 'text/csv',
    });

    // Write header
    await stream.write('Name,Email,Created\n');

    // Write rows
    for (const user of input.users) {
      await stream.write(`${user.name},${user.email},${user.created}\n`);
    }

    // Close when done
    await stream.close();

    ctx.logger.info('Export complete', {
      bytesWritten: stream.bytesWritten,
    });

    return { url: stream.url };
  },
});
```

> [!WARNING]
> **Always Close Streams**
> Streams must be closed manually with `stream.close()`. They do not auto-close. Failing to close a stream leaves it in an incomplete state.
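
One defensive pattern is to wrap writes in `try`/`finally` so the stream is closed even if a write throws (a sketch inside a handler, assuming you want the stream finalized regardless of errors):

```typescript
const stream = await ctx.stream.create('export', { contentType: 'text/csv' });
try {
  await stream.write('Name,Email,Created\n');
  // ... remaining writes
} finally {
  // Guarantee close() runs even when a write throws
  await stream.close();
}
```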

## Background Processing

Use `ctx.waitUntil()` to write data in the background while returning immediately:

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('BackgroundExporter', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('report', {
      contentType: 'application/json',
    });

    // Return URL immediately
    const response = { streamUrl: stream.url };

    // Process in background
    ctx.waitUntil(async () => {
      // fetchLargeDataset is a placeholder for your own data-loading function
      const data = await fetchLargeDataset(input.query);

      await stream.write(JSON.stringify(data, null, 2));
      await stream.close();

      ctx.logger.info('Background export complete');
    });

    return response;
  },
});
```

## Managing Streams

### Listing Streams

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('StreamManager', {
  handler: async (ctx, input) => {
    // List streams with optional filtering
    const result = await ctx.stream.list({
      namespace: 'export',  // filter by stream namespace
      metadata: { userId: input.userId },  // filter by metadata
      limit: 100,  // max 1000, default 100
      offset: 0,   // pagination offset
    });

    ctx.logger.info('Found streams', {
      total: result.total,
      count: result.streams.length,
    });

    return { streams: result.streams };
  },
});
```
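
To walk every page of results, one approach is to advance `offset` until `total` is reached. A sketch that could run inside the handler above, using only the fields shown:

```typescript
// Collect all 'export' streams by paging with limit/offset (sketch).
const first = await ctx.stream.list({ namespace: 'export', limit: 100 });
const all = [...first.streams];
let offset = first.streams.length;

while (offset < first.total) {
  const page = await ctx.stream.list({ namespace: 'export', limit: 100, offset });
  if (page.streams.length === 0) break; // defensive: avoid looping forever
  all.push(...page.streams);
  offset += page.streams.length;
}
```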

### Reading Stream Content

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('StreamReader', {
  handler: async (ctx, input) => {
    // Get stream metadata by ID
    const info = await ctx.stream.get(input.streamId);
    ctx.logger.info('Stream info', {
      namespace: info.namespace,
      sizeBytes: info.sizeBytes,
      expiresAt: info.expiresAt,  // ISO timestamp when stream expires
    });

    // Download stream content as ReadableStream
    const content = await ctx.stream.download(input.streamId);

    // Process the stream
    const reader = content.getReader();
    const chunks: Uint8Array[] = [];

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      chunks.push(value);
    }

    return { data: Buffer.concat(chunks).toString() };
  },
});
```
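
Because `download()` returns a standard `ReadableStream`, you can also drain the whole body with the web `Response` helper instead of a manual reader loop:

```typescript
// Shorthand: collect the ReadableStream into a string via Response.
const text = await new Response(
  await ctx.stream.download(input.streamId)
).text();
```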

### Deleting Streams

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('StreamCleaner', {
  handler: async (ctx, input) => {
    await ctx.stream.delete(input.streamId);
    return { deleted: true };
  },
});
```

## Dual Stream Pattern

Create two streams simultaneously: one for the client response, one for audit logging.

```typescript
import { createAgent } from '@agentuity/runtime';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';

const agent = createAgent('DualStreamWriter', {
  handler: async (ctx, input) => {
    // Create two streams
    const mainStream = await ctx.stream.create('output', {
      contentType: 'text/plain',
    });
    const auditStream = await ctx.stream.create('audit', {
      contentType: 'application/json',
      metadata: { userId: input.userId },
    });

    // Return main stream URL immediately
    const response = { streamUrl: mainStream.url };

    // Process both streams in background
    ctx.waitUntil(async () => {
      const { textStream } = streamText({
        model: openai('gpt-5.4-nano'),
        prompt: input.message,
      });

      const chunks: string[] = [];
      for await (const chunk of textStream) {
        // Write to client stream
        await mainStream.write(chunk);
        chunks.push(chunk);
      }

      // Write audit log
      await auditStream.write(JSON.stringify({
        timestamp: new Date().toISOString(),
        userId: input.userId,
        prompt: input.message,
        response: chunks.join(''),
      }));

      await mainStream.close();
      await auditStream.close();
    });

    return response;
  },
});
```

## Content Moderation While Streaming

Buffer and evaluate content in real time using an LLM-as-a-judge pattern:

```typescript
import { createAgent } from '@agentuity/runtime';
import { generateObject, streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { groq } from '@ai-sdk/groq';
import { s } from '@agentuity/schema';

const agent = createAgent('ModeratedStreamer', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('moderated', {
      contentType: 'text/plain',
    });

    ctx.waitUntil(async () => {
      const { textStream } = streamText({
        model: openai('gpt-5.4-nano'),
        prompt: input.message,
      });

      let buffer = '';
      const sentenceEnd = /[.!?]\s/;

      for await (const chunk of textStream) {
        buffer += chunk;

        // Check complete sentences
        if (sentenceEnd.test(buffer)) {
          const isAppropriate = await moderateContent(buffer);

          if (isAppropriate) {
            await stream.write(buffer);
          } else {
            ctx.logger.warn('Content blocked', { content: buffer });
            await stream.write('[Content filtered]');
          }
          buffer = '';
        }
      }

      // Flush remaining buffer
      if (buffer) {
        await stream.write(buffer);
      }

      await stream.close();
    });

    return { streamUrl: stream.url };
  },
});

// Use Groq for low-latency moderation
async function moderateContent(text: string): Promise<boolean> {
  const { object } = await generateObject({
    model: groq('llama-4-scout-17b-16e'),
    schema: s.object({
      safe: s.boolean(),
      reason: s.optional(s.string()),
    }),
    prompt: `Is this content appropriate? Respond with safe=true if appropriate, safe=false if it contains harmful content.\n\nContent: "${text}"`,
  });

  return object.safe;
}
```

## Using in Routes

Routes have the same stream access via `c.var.stream`:

```typescript
import { Hono } from 'hono';
import type { Env } from '@agentuity/runtime';

const router = new Hono<Env>();

router.post('/export', async (c) => {
  const { data } = await c.req.json();

  const stream = await c.var.stream.create('export', {
    contentType: 'text/csv',
  });

  // Return URL immediately, write in background
  c.waitUntil(async () => {
    await stream.write('Name,Email\n');
    for (const row of data) {
      await stream.write(`${row.name},${row.email}\n`);
    }
    await stream.close();
  });

  return c.json({ url: stream.url });
});

export default router;
```

> [!TIP]
> **External Backend Access**
> Need to create streams from a Next.js backend or other external service? Create authenticated routes that expose stream operations, then call them via HTTP. See [SDK Utilities for External Apps](/cookbook/patterns/server-utilities).
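
For example, an external backend might call the `/export` route above over HTTP. A sketch, where `AGENT_BASE_URL`, `AGENT_API_KEY`, and the bearer-token scheme are illustrative assumptions, not a documented API:

```typescript
// Hypothetical external caller for the /export route above.
// AGENT_BASE_URL and AGENT_API_KEY are illustrative env vars.
const rows = [{ name: 'Ada', email: 'ada@example.com' }];

const res = await fetch(`${process.env.AGENT_BASE_URL}/export`, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Authorization: `Bearer ${process.env.AGENT_API_KEY}`, // assumed auth scheme
  },
  body: JSON.stringify({ data: rows }),
});
const { url } = await res.json();
console.log('Export stream:', url);
```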

## Standalone Usage

Create streams from background processes with `createAgentContext()`:

```typescript
import { createApp, createAgentContext } from '@agentuity/runtime';

const app = await createApp();
export default app;

// Background job to generate reports
async function generateDailyReport() {
  const ctx = createAgentContext({ trigger: 'cron' });

  await ctx.invoke(async () => {
    const stream = await ctx.stream.create('report', {
      contentType: 'text/csv',
      metadata: { date: new Date().toISOString() },
    });

    await stream.write('Date,Metric,Value\n');
    // ... write report data
    await stream.close();

    ctx.logger.info('Report generated', { url: stream.url });
  });
}
```

See [Running Agents Without HTTP](/agents/standalone-execution) for more patterns including Discord bots, CLI tools, and queue workers.

## Stream Properties

| Property | Description |
|----------|-------------|
| `stream.id` | Unique stream identifier |
| `stream.url` | Public URL to access the stream |
| `stream.bytesWritten` | Total bytes written so far |
| `stream.compressed` | Whether gzip compression is enabled (see [Compression](#compression)) |

## Best Practices

- **Use compression**: Enable `compress: true` for text-heavy streams (CSV, JSON, logs). Compression is applied server-side when the stream is closed, so it works with any write pattern
- **Return URLs early**: Use `ctx.waitUntil()` to return the stream URL while writing in the background
- **Clean up**: Delete streams after they're no longer needed to free storage
- **Set content types**: Always specify the correct MIME type for proper browser handling

## Next Steps

- [Key-Value Storage](/services/storage/key-value): Fast caching and configuration
- [Vector Storage](/services/storage/vector): Semantic search and embeddings
- [Object Storage (S3)](/services/storage/object): File and media storage
- [Database](/services/database): Relational data with queries and transactions
- [Streaming Responses](/agents/streaming-responses): HTTP streaming patterns