Returning Streaming Responses — Agentuity Documentation

Returning Streaming Responses

Return real-time LLM output with streaming agents

Show LLM output as it's generated instead of waiting for the full response. Streaming reduces perceived latency and creates a more responsive experience.

Streaming Types

Agentuity supports two streaming patterns:

Ephemeral Streaming

Returns a ReadableStream directly to the HTTP client. Data flows through and is not stored. Use this for real-time chat responses.

// In src/api/index.ts
import { Hono } from 'hono';
import type { Env } from '@agentuity/runtime';
import chatAgent from '@agent/chat';
 
const api = new Hono<Env>()
  .post('/chat', chatAgent.validator(), async (c) => {
    const body = c.req.valid('json');
    return c.body(await chatAgent.run(body));
  });
 
export default api;

Persistent Streaming

Uses ctx.stream.create() to create stored streams with public URLs. Data persists and can be accessed after the connection closes. Use this for batch processing, exports, or content that needs to be accessed later.

// In agent.ts
const stream = await ctx.stream.create('my-export', {
  contentType: 'text/csv',
});
await stream.write('data');
await stream.close();

For stored streams with public URLs, see the Storage documentation.
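As a rough sketch of the batch-export case, the helper below writes rows to a stream as CSV and always closes it afterwards. It relies only on the write() and close() calls shown in the snippet above. The WritableExport interface and the toCsvLine and writeCsvExport helpers are hypothetical names introduced for illustration and are not part of the Agentuity API.

```typescript
// Assumption: a minimal shape covering only the write() and close() calls shown above.
interface WritableExport {
  write(chunk: string): Promise<void>;
  close(): Promise<void>;
}

// Hypothetical helper: quote fields that contain commas, quotes, or line breaks.
function toCsvLine(fields: string[]): string {
  return fields
    .map((field) =>
      /[",\r\n]/.test(field) ? `"${field.replace(/"/g, '""')}"` : field,
    )
    .join(',');
}

// Hypothetical helper: write one CSV line per row, closing the stream even on failure.
async function writeCsvExport(
  stream: WritableExport,
  rows: string[][],
): Promise<void> {
  try {
    for (const row of rows) {
      await stream.write(toCsvLine(row) + '\n');
    }
  } finally {
    await stream.close();
  }
}
```

Closing in a finally block means a failed write does not leave the stored stream open.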

Basic Streaming

Enable streaming by setting stream: true in your schema and returning a textStream:

import { createAgent } from '@agentuity/runtime';
import { streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
import { s } from '@agentuity/schema';
 
const agent = createAgent('ChatStream', {
  schema: {
    input: s.object({ message: s.string() }),
    stream: true,
  },
  handler: async (ctx, input) => {
    const { textStream } = streamText({
      model: anthropic('claude-sonnet-4-6'),
      prompt: input.message,
    });
 
    return textStream;
  },
});
 
export default agent;

Route Configuration

For streaming agents, validate the request, run the agent, and return the resulting stream:

// src/api/index.ts
import { Hono } from 'hono';
import type { Env } from '@agentuity/runtime';
import chatAgent from '@agent/chat';
 
const api = new Hono<Env>()
  .post('/chat', chatAgent.validator(), async (c) => {
    const body = c.req.valid('json');
    return c.body(await chatAgent.run(body));
  });
 
export type ApiRouter = typeof api;
 
export default api;

Use c.body(await agent.run(data)) when the agent already returns a stream. Use stream() when the route itself creates the ReadableStream.
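When the route produces the data itself, the stream has to be built by hand. The sketch below uses only the web-standard ReadableStream API to emit a fixed list of text chunks. The helper name createTextStream is hypothetical, and how the result is handed to stream() is left out because that signature is not covered here.

```typescript
// Hypothetical helper: builds a ReadableStream that emits each string as UTF-8 bytes.
function createTextStream(chunks: string[]): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  let index = 0;

  return new ReadableStream<Uint8Array>({
    // pull() runs whenever the consumer is ready for more data
    pull(controller) {
      if (index < chunks.length) {
        controller.enqueue(encoder.encode(chunks[index]));
        index += 1;
      } else {
        controller.close();
      }
    },
  });
}
```

Using pull() rather than enqueueing everything in start() lets the consumer's read rate pace the producer.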

Consuming Streams

With Fetch API

Read the stream using the Fetch API:

const response = await fetch('http://localhost:3500/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: 'Tell me a story' }),
});
 
if (!response.ok) {
  throw new Error(`Streaming request failed with ${response.status}`);
}
 
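const reader = response.body?.getReader();
if (!reader) {
  throw new Error('Response has no body to stream');
}
const decoder = new TextDecoder();
 
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
 
  // stream: true buffers multi-byte characters that are split across chunks
  const text = decoder.decode(value, { stream: true });
  // Process each chunk as it arrives
  appendToUI(text);
}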
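Chunk boundaries do not line up with character boundaries, so a multi-byte UTF-8 character can arrive split across two reads. The following sketch wraps the reader loop in a reusable function that passes { stream: true } to TextDecoder.decode, so incomplete byte sequences are held back until the rest arrives. The name readAllText and its onChunk parameter are hypothetical.

```typescript
// Hypothetical helper: reads a byte stream to completion, reporting each decoded piece.
async function readAllText(
  stream: ReadableStream<Uint8Array>,
  onChunk: (text: string) => void = () => {},
): Promise<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let full = '';

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // stream: true keeps incomplete multi-byte sequences buffered for the next call
      const text = decoder.decode(value, { stream: true });
      full += text;
      onChunk(text);
    }
    // Flush anything the decoder is still holding
    const tail = decoder.decode();
    if (tail) {
      full += tail;
      onChunk(tail);
    }
  } finally {
    reader.releaseLock();
  }

  return full;
}
```

In the browser this could be called as readAllText(response.body, appendToUI) once the response has been checked for a body.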

With React

Use hc<ApiRouter>() when you want typed request bodies and direct access to the Response stream:

import { hc } from 'hono/client';
import { useState } from 'react';
import type { ApiRouter } from '../api';
 
const client = hc<ApiRouter>('/api');
 
function Chat() {
  const [data, setData] = useState('');
  const [isLoading, setIsLoading] = useState(false);
 
  const handleSubmit = async (message: string) => {
    setIsLoading(true);
    setData('');
 
    try {
      const res = await client.chat.$post({ json: { message } });
      if (!res.ok) {
        throw new Error(`Streaming request failed with ${res.status}`);
      }
 
      const reader = res.body?.getReader();
      const decoder = new TextDecoder();
 
      if (!reader) return;
 
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        setData((prev) => prev + decoder.decode(value, { stream: true }));
      }
    } finally {
      setIsLoading(false);
    }
  };
 
  return (
    <div>
      {isLoading && <p>Generating...</p>}
      {data && <p>{data}</p>}
      <button onClick={() => handleSubmit('Hello!')}>Send</button>
    </div>
  );
}

For type-safe API calls, see RPC Client.

Streaming with System Prompts

Set a system prompt to steer the model's behavior while still streaming the response:

handler: async (ctx, input) => {
  const { textStream } = streamText({
    model: anthropic('claude-sonnet-4-6'),
    system: 'You are a helpful assistant. Be concise.',
    messages: [
      { role: 'user', content: input.message },
    ],
  });
 
  return textStream;
}

Streaming with Conversation History

Combine streaming with thread state for multi-turn conversations:

handler: async (ctx, input) => {
  // Get existing messages from thread state (async)
  const messages = (await ctx.thread.state.get('messages')) || [];
 
  // Add new user message
  messages.push({ role: 'user', content: input.message });
 
  const { textStream, text } = streamText({
    model: anthropic('claude-sonnet-4-6'),
    messages,
  });
 
  // Save assistant response after streaming completes
  ctx.waitUntil(async () => {
    const fullText = await text;
    messages.push({ role: 'assistant', content: fullText });
    await ctx.thread.state.set('messages', messages);
  });
 
  return textStream;
}
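The handler above depends on having both a live stream and a promise for the complete text. To illustrate that idea without the AI SDK, the sketch below forwards chunks unchanged while accumulating them, and resolves a promise once the stream ends. The helper collectWhileStreaming is hypothetical and is not an Agentuity or AI SDK API.

```typescript
// Hypothetical helper: pass chunks through untouched and expose the full text as a promise.
function collectWhileStreaming(source: ReadableStream<string>): {
  stream: ReadableStream<string>;
  text: Promise<string>;
} {
  let collected = '';
  let resolveText!: (value: string) => void;
  const text = new Promise<string>((resolve) => {
    resolveText = resolve;
  });

  const collector = new TransformStream<string, string>({
    transform(chunk, controller) {
      collected += chunk; // remember the chunk
      controller.enqueue(chunk); // and forward it to the consumer
    },
    flush() {
      // Called once the source has finished; in this sketch the promise
      // stays pending if the source errors instead.
      resolveText(collected);
    },
  });

  return { stream: source.pipeThrough(collector), text };
}
```

The text promise only settles after the consumer has read the stream to the end, which is why saving the assistant message belongs in background work rather than before the return.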

When to Stream

Scenario | Recommendation
Chat interfaces | Stream for better UX
Long-form content | Stream to show progress
Quick classifications | Buffer (faster overall, consider Groq for speed)
Structured data | Buffer (use AI SDK Output.object())

Error Handling

In the AI SDK, streamText does not throw when generation fails; errors are reported through the stream instead. Pass an onError callback to log them:

const { textStream } = streamText({
  model: anthropic('claude-sonnet-4-6'),
  prompt: input.message,
  onError({ error }) {
    ctx.logger.error('Stream error', { error });
  },
});
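On the consuming side, a stream that fails partway rejects the pending read(). As a minimal sketch, the reader loop can be wrapped in try/catch so the caller keeps whatever text arrived before the failure. The StreamResult type and readWithFallback helper are hypothetical names used for illustration.

```typescript
// Hypothetical result shape: the text received so far, plus the error if one occurred.
interface StreamResult {
  text: string;
  error?: unknown;
}

// Hypothetical helper: read a byte stream, returning partial text if it fails midway.
async function readWithFallback(
  stream: ReadableStream<Uint8Array>,
): Promise<StreamResult> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let text = '';

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      text += decoder.decode(value, { stream: true });
    }
    return { text };
  } catch (error) {
    // The stream errored; keep what was already decoded
    return { text, error };
  } finally {
    reader.releaseLock();
  }
}
```

A UI can then show the partial text alongside a retry prompt instead of discarding the response.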
