# Returning Streaming Responses

Return real-time LLM output with streaming agents

Show LLM output as it's generated instead of waiting for the full response. Streaming reduces perceived latency and creates a more responsive experience.

## Streaming Types

Agentuity supports two streaming patterns:

### Ephemeral Streaming

Returns a `ReadableStream` directly to the HTTP client. Data flows through and is not stored. Use this for real-time chat responses.

```typescript
// In src/api/index.ts
import { Hono } from 'hono';
import { type Env } from '@agentuity/runtime';
import chatAgent from '@agent/chat';

const api = new Hono<Env>()
  .post('/chat', chatAgent.validator(), async (c) => {
    const body = c.req.valid('json');
    return c.body(await chatAgent.run(body));
  });

export default api;
```

### Persistent Streaming

Uses `ctx.stream.create()` to create stored streams with public URLs. Data persists and can be accessed after the connection closes. Use this for batch processing, exports, or content that needs to be accessed later.

```typescript
// In agent.ts
// Create a named, durable stream; its contents outlive the request
const stream = await ctx.stream.create('my-export', {
  contentType: 'text/csv',
});
await stream.write('id,name\n1,Ada\n');
await stream.close();
```

For stored streams with public URLs, see the [Storage documentation](/services/storage/durable-streams).

> [!NOTE]
> **Two Parts to Streaming**
> Streaming agent responses require both: `schema.stream: true` in your agent and a route that returns the resulting stream, usually with `return c.body(await agent.run(data))`. Use `agent.validator()` as usual. Streaming agents skip output validation automatically.

## Basic Streaming

Enable streaming by setting `stream: true` in your schema and returning a `textStream`:

> [!NOTE]
> **AI SDK Integration**
> The `textStream` from AI SDK's `streamText()` works directly with Agentuity. Return it from your handler, then return the resulting stream from your route without additional processing.

```typescript
import { createAgent } from '@agentuity/runtime';
import { streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
import { s } from '@agentuity/schema';

const agent = createAgent('ChatStream', {
  schema: {
    input: s.object({ message: s.string() }),
    stream: true,
  },
  handler: async (ctx, input) => {
    const { textStream } = streamText({
      model: anthropic('claude-sonnet-4-5'),
      prompt: input.message,
    });

    return textStream;
  },
});

export default agent;
```

## Route Configuration

For streaming agents, validate the request, run the agent, and return the resulting stream:

```typescript
// src/api/index.ts
import { Hono } from 'hono';
import { type Env } from '@agentuity/runtime';
import chatAgent from '@agent/chat';

const api = new Hono<Env>()
  .post('/chat', chatAgent.validator(), async (c) => {
    const body = c.req.valid('json');
    return c.body(await chatAgent.run(body));
  });

export type ApiRouter = typeof api;

export default api;
```

Use `c.body(await agent.run(data))` when the agent already returns a stream; the note and sketch below cover routes that construct their own streams.

> [!NOTE]
> **Route Methods**
> Use `stream()` from `hono/streaming` when the route itself creates the `ReadableStream`; when forwarding a streaming agent response, return it directly with `c.body(...)`. For non-agent routes that use `validator()` with an output schema, pass `stream: true` to skip output validation on the stream.
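
For a route that builds its own stream, here is a minimal sketch using Hono's `stream()` helper (the `/progress` route and its chunks are illustrative, not part of Agentuity):

```typescript
import { Hono } from 'hono';
import { stream } from 'hono/streaming';

const api = new Hono()
  .post('/progress', (c) =>
    // The route constructs the stream itself, so use stream() instead of c.body()
    stream(c, async (s) => {
      for (const step of ['queued', 'processing', 'done']) {
        await s.write(`${step}\n`);
      }
    })
  );
```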

## Consuming Streams

### With Fetch API

Read the stream using the Fetch API:

```typescript
const response = await fetch('http://localhost:3500/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: 'Tell me a story' }),
});

if (!response.ok) {
  throw new Error(`Streaming request failed with ${response.status}`);
}

const reader = response.body?.getReader();
const decoder = new TextDecoder();

if (!reader) {
  throw new Error('Response has no body');
}

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // { stream: true } keeps multi-byte characters split across chunks intact
  const text = decoder.decode(value, { stream: true });
  // Process each chunk as it arrives
  appendToUI(text);
}
```
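
If you prefer `for await` iteration, the reader loop can be wrapped in a small helper (`textChunks` is a hypothetical utility, not an Agentuity API):

```typescript
// Hypothetical helper: yields decoded text chunks from any streaming Response
async function* textChunks(response: Response): AsyncGenerator<string> {
  if (!response.body) throw new Error('Response has no body');
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    yield decoder.decode(value, { stream: true });
  }
}

for await (const chunk of textChunks(response)) {
  appendToUI(chunk);
}
```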

### With React

Use `hc<ApiRouter>()` when you want typed request bodies and direct access to the `Response` stream:

```tsx
import { hc } from 'hono/client';
import { useState } from 'react';
import type { ApiRouter } from '../api';

const client = hc<ApiRouter>('/api');

function Chat() {
  const [data, setData] = useState('');
  const [isLoading, setIsLoading] = useState(false);

  const handleSubmit = async (message: string) => {
    setIsLoading(true);
    setData('');

    try {
      const res = await client.chat.$post({ json: { message } });
      if (!res.ok) {
        throw new Error(`Streaming request failed with ${res.status}`);
      }

      const reader = res.body?.getReader();
      const decoder = new TextDecoder();

      if (!reader) return;

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        setData((prev) => prev + decoder.decode(value, { stream: true }));
      }
    } finally {
      setIsLoading(false);
    }
  };

  return (
    <div>
      {isLoading && <p>Generating...</p>}
      {data && <p>{data}</p>}
      <button onClick={() => handleSubmit('Hello!')}>Send</button>
    </div>
  );
}
```

For type-safe API calls, see [RPC Client](/frontend/rpc-client).

## Streaming with System Prompts

Add context to streaming responses:

```typescript
handler: async (ctx, input) => {
  const { textStream } = streamText({
    model: anthropic('claude-sonnet-4-5'),
    system: 'You are a helpful assistant. Be concise.',
    messages: [
      { role: 'user', content: input.message },
    ],
  });

  return textStream;
}
```

## Streaming with Conversation History

Combine streaming with thread state for multi-turn conversations:

```typescript
handler: async (ctx, input) => {
  // Get existing messages from thread state (async)
  const messages = (await ctx.thread.state.get('messages')) || [];

  // Add new user message
  messages.push({ role: 'user', content: input.message });

  const { textStream, text } = streamText({
    model: anthropic('claude-sonnet-4-5'),
    messages,
  });

  // Save assistant response after streaming completes
  ctx.waitUntil(async () => {
    const fullText = await text;
    messages.push({ role: 'assistant', content: fullText });
    await ctx.thread.state.set('messages', messages);
  });

  return textStream;
}
```

> [!TIP]
> **Background Tasks**
> Use `ctx.waitUntil()` to save conversation history without blocking the stream. The response starts immediately while state updates happen in the background.

## When to Stream

| Scenario | Recommendation |
|----------|----------------|
| Chat interfaces | Stream for better UX |
| Long-form content | Stream to show progress |
| Quick classifications | Buffer (faster overall, consider Groq for speed) |
| Structured data | Buffer (use `generateObject`; see the sketch below) |
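
For contrast with streaming, a minimal buffered sketch using the AI SDK's `generateObject` (the Zod schema and prompt are illustrative):

```typescript
import { generateObject } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
import { z } from 'zod';

// Buffered: the full object is generated and validated before returning
const { object } = await generateObject({
  model: anthropic('claude-sonnet-4-5'),
  schema: z.object({
    sentiment: z.enum(['positive', 'negative', 'neutral']),
  }),
  prompt: 'Classify the sentiment: "I love this product!"',
});
// object.sentiment is typed as 'positive' | 'negative' | 'neutral'
```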

## Error Handling

Handle streaming errors with the `onError` callback:

```typescript
const { textStream } = streamText({
  model: anthropic('claude-sonnet-4-5'),
  prompt: input.message,
  onError: ({ error }) => {
    ctx.logger.error('Stream error', { error });
  },
});
```

> [!WARNING]
> **Stream Errors**
> Errors in streaming are part of the stream, not thrown exceptions. Always provide an `onError` callback.

## Next Steps

- [Using the AI SDK](/agents/ai-sdk-integration): Structured output and non-streaming responses
- [State Management](/agents/state-management): Multi-turn conversations with memory
- [Server-Sent Events](/routes/sse): Server-push updates without polling