Chat and Streaming

Stream model output from framework routes and persist chat history with KV storage

Use streaming when the user should see tokens as they arrive rather than waiting for the full response. Persist chat state separately in KV, writing it only after the stream finishes.

Minimal Route

The smallest complete route: read stored history, call the Gateway with stream: true, and return the SSE stream. It does not save any turns; the next section adds that.

import { Hono } from 'hono';
import { AIGatewayClient } from '@agentuity/aigateway';
import { KeyValueClient } from '@agentuity/keyvalue';
 
interface ChatMessage {
  role: 'user' | 'assistant';
  content: string;
}
 
const kv = new KeyValueClient();
const gateway = new AIGatewayClient();
const app = new Hono();
const DEFAULT_MODEL = 'deepseek/deepseek-v4-flash';
 
app.post('/api/chat', async (c) => {
  const { conversationId, message, model = DEFAULT_MODEL } = await c.req.json<{
    conversationId: string;
    message: string;
    model?: string;
  }>();
 
  const stored = await kv.get<ChatMessage[]>('chat-history', conversationId);
  const messages: ChatMessage[] = stored.exists ? (stored.data ?? []) : [];
  const next = [...messages, { role: 'user' as const, content: message }];
 
  const { stream } = await gateway.streamRequest({ 
    path: '/',
    body: { model, stream: true, messages: next },
  });
 
  return new Response(stream, { 
    headers: { 'content-type': 'text/event-stream' },
  });
});
 
export default app;

streamRequest() returns the Gateway SSE stream, and the route passes it straight to the client. The SDK key authenticates the request; the model ID is ordinary request input or configuration.
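A Web stream can only be read once, so a route that both forwards the reply and saves it has to split the stream. The complete route in the next section does this with stream.tee(). The standalone sketch below shows why that is safe: tee() cancels the source only after both branches are cancelled, so the collector still receives every chunk if the client disconnects early. teeAndCollect() and demo() are hypothetical names, and the plain-text chunks stand in for Gateway output.

```typescript
// Splits a byte stream so one branch can go to the client while the other is
// drained in full. fullText resolves only after the source ends, which is the
// point at which it is safe to persist history.
function teeAndCollect(
  source: ReadableStream<Uint8Array>,
): { clientStream: ReadableStream<Uint8Array>; fullText: Promise<string> } {
  const [clientStream, collectorStream] = source.tee();
  return { clientStream, fullText: new Response(collectorStream).text() };
}

// The client cancels after the first chunk, but the collector still sees
// everything, because tee() cancels the source only when both branches cancel.
async function demo(): Promise<{ clientSaw: string; collected: string }> {
  const encoder = new TextEncoder();
  const source = new ReadableStream<Uint8Array>({
    start(controller) {
      for (const piece of ['Hello', ', ', 'world']) controller.enqueue(encoder.encode(piece));
      controller.close();
    },
  });

  const { clientStream, fullText } = teeAndCollect(source);
  const reader = clientStream.getReader();
  const first = await reader.read();
  await reader.cancel(); // simulate the browser tab closing
  const clientSaw = first.done ? '' : new TextDecoder().decode(first.value);
  return { clientSaw, collected: await fullText };
}

demo().then(({ clientSaw, collected }) => console.log(clientSaw, '|', collected));
// Hello | Hello, world
```

In the route, the collected text is raw SSE, which readOpenAICompatibleStreamText() then parses into assistant text.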

Complete Hono Route with KV History

This route validates the request body, reads stored history, streams the next answer to the client, and, once the stream finishes, saves the user and assistant turns to KV.

npm install hono @agentuity/aigateway @agentuity/keyvalue valibot @agentuity/telemetry
src/index.ts
import { Hono } from 'hono';
import { AIGatewayClient } from '@agentuity/aigateway';
import { KeyValueClient } from '@agentuity/keyvalue';
import { logger } from '@agentuity/telemetry';
import * as v from 'valibot';
 
const inputSchema = v.object({
  conversationId: v.string(),
  message: v.string(),
  model: v.optional(v.string()),
});
 
const messageSchema = v.object({
  role: v.picklist(['user', 'assistant']),
  content: v.string(),
});
 
const historySchema = v.array(messageSchema);
 
type ChatMessage = v.InferOutput<typeof messageSchema>;
 
const kv = new KeyValueClient();
const gateway = new AIGatewayClient();
const app = new Hono();
const DEFAULT_MODEL = 'deepseek/deepseek-v4-flash';
 
app.post('/api/chat', async (c) => {
  const body: unknown = await c.req.json();
  const input = v.parse(inputSchema, body);
  const model = input.model ?? DEFAULT_MODEL;
 
  const stored = await kv.get<unknown>('chat-history', input.conversationId);
  const history: ChatMessage[] = stored.exists
    ? v.parse(historySchema, stored.data)
    : [];
 
  const userMessage: ChatMessage = { role: 'user', content: input.message };
  const nextHistory = [...history, userMessage];
 
  const { stream } = await gateway.streamRequest({
    path: '/',
    body: {
      model,
      stream: true,
      messages: [
        { role: 'system', content: 'You are a concise product support assistant.' },
        ...nextHistory,
      ],
    },
  });
 
  const [clientStream, historyStream] = stream.tee(); 
 
  void saveAssistantTurn(input.conversationId, nextHistory, historyStream).catch((error) => {
    logger.error('conversation save failed', { conversationId: input.conversationId, error });
  });
 
  return new Response(clientStream, { 
    headers: { 'content-type': 'text/event-stream' },
  });
});
 
async function saveAssistantTurn(
  conversationId: string,
  nextHistory: readonly ChatMessage[],
  stream: ReadableStream<Uint8Array>,
): Promise<void> {
  const text = await readOpenAICompatibleStreamText(stream);
  if (!text) return;
 
  await kv.set(
    'chat-history',
    conversationId,
    [...nextHistory, { role: 'assistant', content: text }],
    { ttl: 60 * 60 * 24 * 30 }, // 30-day TTL
  );
 
  logger.info('conversation saved', {
    conversationId,
    turns: nextHistory.length + 1,
  });
}
 
async function readOpenAICompatibleStreamText(
  stream: ReadableStream<Uint8Array>,
): Promise<string> {
  const reader = stream.pipeThrough(new TextDecoderStream()).getReader();
  let buffer = '';
  let text = '';
 
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
 
    buffer += value;
    const frames = buffer.split(/\r?\n\r?\n/);
    buffer = frames.pop() ?? '';
    text += frames.map(readFrameText).join('');
  }
 
  return text + readFrameText(buffer);
}
 
function readFrameText(frame: string): string {
  const data = frame
    .split(/\r?\n/)
    .filter((line) => line.startsWith('data:'))
    .map((line) => line.slice(5).trimStart())
    .join('\n')
    .trim();
 
  if (!data || data === '[DONE]') return '';
 
  try {
    return readDeltaText(JSON.parse(data));
  } catch {
    return '';
  }
}
 
function readDeltaText(event: unknown): string {
  if (!isRecord(event)) return '';
  const choices = event.choices;
  if (!Array.isArray(choices)) return '';
 
  return choices
    .map((choice) => {
      if (!isRecord(choice)) return '';
      const delta = choice.delta;
      if (!isRecord(delta)) return '';
      const content = delta.content;
      return typeof content === 'string' ? content : '';
    })
    .join('');
}
 
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}
 
export default app;

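streamRequest() returns a standard Web ReadableStream, so this pattern works in Hono and in any framework that accepts Web Response objects. stream.tee() splits it into two branches: one is returned to the client, and saveAssistantTurn() drains the other, writing history only after the model finishes. If the upstream stream errors, the collector throws, the error is logged, and nothing is written. KeyValueClient and AIGatewayClient can both authenticate from AGENTUITY_SDK_KEY. The parser above extracts text from OpenAI-compatible SSE frames (choices[].delta.content); if you choose a model with a different stream event shape, parse that provider's frame format before writing assistant history.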
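When a model streams a different event shape, usually only the delta extractor needs to change. As a hedged example, the sketch below assumes the Gateway passes Anthropic Messages API events through unchanged, where text arrives in content_block_delta events whose delta has type text_delta and a text field; confirm the frames your model actually returns before relying on it. readAnthropicDeltaText() is a hypothetical replacement for readDeltaText(). readFrameText() can stay as it is, because it already ignores the event: lines these frames carry.

```typescript
// Hypothetical drop-in for readDeltaText(), assuming Anthropic Messages API
// streaming events reach the route unchanged. Only text_delta content is kept;
// tool-input deltas (input_json_delta) and other event types yield ''.
function readAnthropicDeltaText(event: unknown): string {
  if (!isRecord(event) || event.type !== 'content_block_delta') return '';
  const delta = event.delta;
  if (!isRecord(delta) || delta.type !== 'text_delta') return '';
  const text = delta.text;
  return typeof text === 'string' ? text : '';
}

// Repeated so the sketch runs standalone; src/index.ts already defines it.
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

// Parsed data payloads in the Anthropic shape (abbreviated).
const events: unknown[] = [
  { type: 'message_start', message: { role: 'assistant' } },
  { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Hello' } },
  { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: ', world' } },
  { type: 'message_stop' },
];

const assembled = events.map(readAnthropicDeltaText).join('');
console.log(assembled); // Hello, world
```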

Read the Stream in the Browser

For a custom UI, read the response body and append chunks as they arrive. This is the lowest-level browser shape.

// onDelta receives each new piece of assistant text so the UI can render it immediately.
async function sendMessage(
  conversationId: string,
  message: string,
  onDelta: (text: string) => void,
): Promise<string> {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify({ conversationId, message }),
  });

  if (!response.ok) {
    throw new Error(`Chat request failed with ${response.status}`);
  }

  if (!response.body) {
    throw new Error('Chat response did not include a body');
  }

  const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
  let assistantMessage = '';
  let buffer = '';

  const append = (text: string): void => {
    if (!text) return;
    assistantMessage += text;
    onDelta(text);
  };

  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += value;
    // SSE frames end with a blank line; keep any incomplete frame in the buffer.
    const frames = buffer.split(/\r?\n\r?\n/);
    buffer = frames.pop() ?? '';
    append(frames.map(readGatewayFrameText).join(''));
  }

  // Handle a final frame that arrived without a trailing blank line.
  append(readGatewayFrameText(buffer));
  return assistantMessage;
}
 
function readGatewayFrameText(frame: string): string {
  const data = frame
    .split(/\r?\n/)
    .filter((line) => line.startsWith('data:'))
    .map((line) => line.slice(5).trimStart())
    .join('\n')
    .trim();
 
  if (!data || data === '[DONE]') return '';
 
  try {
    return readGatewayDeltaText(JSON.parse(data));
  } catch {
    return '';
  }
}
 
function readGatewayDeltaText(event: unknown): string {
  if (!isRecord(event)) return '';
  const choices = event.choices;
  if (!Array.isArray(choices)) return '';
 
  return choices
    .map((choice) => {
      if (!isRecord(choice)) return '';
      const delta = choice.delta;
      if (!isRecord(delta)) return '';
      const content = delta.content;
      return typeof content === 'string' ? content : '';
    })
    .join('');
}
 
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

The Gateway returns provider-compatible SSE frames, not plain text chunks, so the browser has to parse them too. readGatewayFrameText() and readGatewayDeltaText() mirror readFrameText() and readDeltaText() from the route example; for OpenAI-compatible frames, you can keep a single copy in a module shared by the server and the browser. If you use AI SDK UI helpers on the frontend, use an AI SDK route with toUIMessageStreamResponse() so the stream format matches the UI package.
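Because the server collector and the browser reader split frames the same way, one option is to move that logic into a single incremental parser that both sides import. The sketch below is a hypothetical SseTextAccumulator class, not part of any Agentuity package, and it assumes OpenAI-compatible choices[].delta.content frames. push() takes decoded text, keeps any partial frame buffered, and returns only the new assistant text; flush() processes a final frame that arrived without a trailing blank line.

```typescript
// Hypothetical shared helper; export it from a module used by both the
// server collector and the browser reader. Assumes OpenAI-compatible frames.
class SseTextAccumulator {
  private buffer = '';
  private fullText = '';

  /** Feeds decoded text; returns the assistant text from any completed frames. */
  push(chunk: string): string {
    this.buffer += chunk;
    const frames = this.buffer.split(/\r?\n\r?\n/);
    // The last element may be an incomplete frame; keep it for the next push.
    this.buffer = frames.pop() ?? '';
    return this.record(frames.map(frameText).join(''));
  }

  /** Call once the stream ends to process a trailing frame with no blank line. */
  flush(): string {
    const rest = this.buffer;
    this.buffer = '';
    return this.record(frameText(rest));
  }

  /** All assistant text seen so far. */
  get text(): string {
    return this.fullText;
  }

  private record(delta: string): string {
    this.fullText += delta;
    return delta;
  }
}

// Extracts choices[].delta.content from one SSE frame; ignores [DONE],
// non-data lines, and malformed JSON.
function frameText(frame: string): string {
  const data = frame
    .split(/\r?\n/)
    .filter((line) => line.startsWith('data:'))
    .map((line) => line.slice(5).trimStart())
    .join('\n')
    .trim();
  if (!data || data === '[DONE]') return '';

  let event: unknown;
  try {
    event = JSON.parse(data);
  } catch {
    return '';
  }
  const choices = (event as { choices?: unknown } | null)?.choices;
  if (!Array.isArray(choices)) return '';
  return choices
    .map((choice: unknown) => {
      const content = (choice as { delta?: { content?: unknown } | null } | null)?.delta?.content;
      return typeof content === 'string' ? content : '';
    })
    .join('');
}

// Example: one frame split across two network chunks, with CRLF line endings.
const acc = new SseTextAccumulator();
const first = acc.push('data: {"choices":[{"delta":{"content":"Hel"}}]}\r\n\r\ndata: {"choi');
const second = acc.push('ces":[{"delta":{"content":"lo"}}]}\r\n\r\ndata: [DONE]');
const last = acc.flush();
console.log(first, second, last === '', acc.text); // Hel lo true Hello
```

In sendMessage() you would call push() with each decoded value and render what it returns; in saveAssistantTurn() you would read acc.text after flush().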

Stream with an AI SDK Provider

The direct Gateway stream above is the default path for app-owned streaming routes. When the frontend uses AI SDK, build the route with AI SDK and a single provider package. This example uses the Anthropic provider and returns plain text with toTextStreamResponse(); return toUIMessageStreamResponse() instead when the frontend uses AI SDK UI message helpers. Under agentuity dev, the Anthropic SDK environment wiring can route requests through AI Gateway when no provider key override is set.

npm install ai@latest @ai-sdk/anthropic@latest
import { anthropic } from '@ai-sdk/anthropic';
import { streamText } from 'ai';
 
export async function streamWithAnthropic(message: string): Promise<Response> {
  const model = process.env.ANTHROPIC_MODEL;
  if (!model) {
    throw new Error('Set ANTHROPIC_MODEL to the model this route should use.');
  }
 
  const result = streamText({
    model: anthropic(model),
    system: 'You are a concise product support assistant.',
    prompt: message,
  });
 
  return result.toTextStreamResponse();
}

Keep model IDs in configuration so each route can use the provider and tier that fits the task. Use the AI Gateway model catalog for direct Gateway calls, or the provider's model docs for provider SDK calls.
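One way to keep model IDs in configuration is a per-route lookup with a fallback. In the sketch below, the environment variable names CHAT_MODEL and SUMMARY_MODEL, the resolveModel() helper, and the provider/small-model ID are placeholders; the default is the model ID from the route examples. Passing env in explicitly keeps the function easy to test; in Node you would pass process.env.

```typescript
// Hypothetical route-to-variable mapping; rename to match your deployment.
const MODEL_ENV_VARS = {
  chat: 'CHAT_MODEL',
  summary: 'SUMMARY_MODEL',
} as const;

type ModelRoute = keyof typeof MODEL_ENV_VARS;

const DEFAULT_MODEL = 'deepseek/deepseek-v4-flash';

// Returns the configured model ID for a route, or DEFAULT_MODEL when the
// variable is unset or blank.
function resolveModel(
  route: ModelRoute,
  env: Record<string, string | undefined>,
): string {
  const value = env[MODEL_ENV_VARS[route]]?.trim();
  return value ? value : DEFAULT_MODEL;
}

// Example: only the summary route is overridden (placeholder model ID).
const env = { SUMMARY_MODEL: 'provider/small-model' };
console.log(resolveModel('chat', env)); // deepseek/deepseek-v4-flash
console.log(resolveModel('summary', env)); // provider/small-model
```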

Smoke Test with curl

curl -N http://127.0.0.1:3000/api/chat \
  -H "content-type: application/json" \
  -d '{"conversationId":"demo","message":"Summarize Agentuity in one sentence"}'

-N disables output buffering so you see chunks as they arrive.

Choose the Right Stream Shape

| Stream shape | Use it when |
| --- | --- |
| Gateway SSE stream | The route calls AIGatewayClient.streamRequest() directly |
| toTextStreamResponse() | An AI SDK route returns plain text chunks |
| toUIMessageStreamResponse() | The AI SDK frontend needs tool calls, usage, or finish reason |
| Durable Streams | Output must survive a page refresh or be replayed later |
| Custom server-sent events | You need named events such as status, chunk, and done |

Durable streams persist to storage and return a URL. Use them when the generated content is large, long-running, or must remain available after the HTTP connection closes.
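The server-sent events row covers streams you format yourself with named events. The sketch below is not an Agentuity API: encodeSseEvent() writes each event in the standard SSE wire format (an event: line, a data: line, then a blank line), and toNamedEventStream() wraps an async iterable of text pieces in a ReadableStream that emits status, chunk, and done events. The event names and payload shapes are illustrative.

```typescript
type ChatEvent =
  | { event: 'status'; data: { state: string } }
  | { event: 'chunk'; data: { text: string } }
  | { event: 'done'; data: { totalChars: number } };

// Encodes one event in SSE wire format. JSON.stringify without indentation
// never emits raw newlines, so the payload always fits on one data: line.
function encodeSseEvent({ event, data }: ChatEvent): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

// One status event, one chunk event per text piece, then a done event.
function toNamedEventStream(pieces: AsyncIterable<string>): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  const send = (controller: ReadableStreamDefaultController<Uint8Array>, e: ChatEvent): void =>
    controller.enqueue(encoder.encode(encodeSseEvent(e)));

  return new ReadableStream<Uint8Array>({
    async start(controller) {
      let totalChars = 0;
      send(controller, { event: 'status', data: { state: 'started' } });
      try {
        for await (const text of pieces) {
          totalChars += text.length;
          send(controller, { event: 'chunk', data: { text } });
        }
        send(controller, { event: 'done', data: { totalChars } });
        controller.close();
      } catch (error) {
        // Surface upstream failures to the reader instead of closing cleanly.
        controller.error(error);
      }
    },
  });
}
```

A Hono handler could return new Response(toNamedEventStream(pieces), { headers: { 'content-type': 'text/event-stream' } }). The client then reads the event: line of each frame to decide how to handle its data.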

Keep History Bounded

Sending the full, ever-growing transcript to the model on every request increases cost and latency, and long conversations eventually exceed the model's context window. Keep a rolling summary plus the most recent turns instead.

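// Continues src/index.ts: reuses kv and ChatMessage from the route above.
interface ChatMemory {
  readonly summary: string;
  readonly recent: readonly ChatMessage[];
}

const RECENT_MESSAGE_LIMIT = 8;

// Call after the stream collector finishes, in place of writing the flat history.
async function saveChatMemory(
  conversationId: string,
  summary: string, // refresh periodically with a separate summarization call
  allMessages: readonly ChatMessage[],
): Promise<void> {
  const memory: ChatMemory = {
    summary,
    recent: allMessages.slice(-RECENT_MESSAGE_LIMIT), // keep the 8 most recent messages
  };

  await kv.set('chat-memory', conversationId, memory, {
    ttl: 60 * 60 * 24 * 30, // 30-day TTL
  });
}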
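On the next request, the stored memory has to be turned back into model messages. A minimal sketch, assuming the summary is appended to the single system message (some providers accept only one system message) and the recent turns follow it; buildPromptMessages() is a hypothetical helper, and the types are repeated so the block runs on its own. The result can be passed as messages in the streamRequest() body.

```typescript
interface ChatMessage {
  role: 'user' | 'assistant';
  content: string;
}

interface ChatMemory {
  readonly summary: string;
  readonly recent: readonly ChatMessage[];
}

interface PromptMessage {
  role: 'system' | 'user' | 'assistant';
  content: string;
}

// Builds the Gateway messages array: one system message (base instructions
// plus the rolling summary, if any), the recent turns, then the new message.
function buildPromptMessages(
  memory: ChatMemory | undefined,
  userMessage: string,
  systemPrompt = 'You are a concise product support assistant.',
): PromptMessage[] {
  const system = memory?.summary
    ? `${systemPrompt}\n\nSummary of the earlier conversation:\n${memory.summary}`
    : systemPrompt;

  return [
    { role: 'system', content: system },
    ...(memory?.recent ?? []),
    { role: 'user', content: userMessage },
  ];
}

const memory: ChatMemory = {
  summary: 'The user is comparing plans.',
  recent: [
    { role: 'user', content: 'Which plan has SSO?' },
    { role: 'assistant', content: 'The team plan.' },
  ],
};

console.log(buildPromptMessages(memory, 'How much is it?').map((m) => m.role).join(','));
// system,user,assistant,user
```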

Common Gotchas

| Symptom | Cause | Fix |
| --- | --- | --- |
| History written even when the stream fails | Saving to KV before the stream drains | Tee the stream and write history after the collector finishes |
| Context window errors after long conversations | Sending the full history on every request | Use bounded memory (summary + recent turns) |
| Client sees no chunks | Response buffered by middleware or a proxy | Verify Content-Type: text/event-stream and that no layer buffers the response |
| Tool calls missing from the client | Reading the Gateway SSE stream as plain text | Parse the provider-compatible stream event shape, or use an AI SDK route when the frontend expects AI SDK UI messages |
| KV TTL errors | TTL below the minimum (60 s) or above the maximum (365 days) | Use ttl: null or ttl: 0 to never expire; otherwise pass a value in [60, 31_536_000] seconds |
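To surface TTL mistakes before the KV call, a route can check the value against the limits in the last row of the table. A minimal sketch: assertValidTtl() is a hypothetical helper, and the bounds come from that row (60 seconds to 365 days, with null or 0 meaning never expire).

```typescript
const MIN_TTL_SECONDS = 60;
const MAX_TTL_SECONDS = 365 * 24 * 60 * 60; // 31_536_000

// Throws a RangeError if ttl is outside the KV limits listed in the table.
// null and 0 both mean "never expire" and are accepted as-is.
function assertValidTtl(ttl: number | null): void {
  if (ttl === null || ttl === 0) return;
  if (Number.isNaN(ttl) || ttl < MIN_TTL_SECONDS || ttl > MAX_TTL_SECONDS) {
    throw new RangeError(
      `TTL must be null, 0, or within [${MIN_TTL_SECONDS}, ${MAX_TTL_SECONDS}] seconds; got ${ttl}`,
    );
  }
}

assertValidTtl(60 * 60 * 24 * 30); // 30 days (2_592_000 s) passes
```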

Next Steps

  • Durable Streams: persist large or long-running output outside the HTTP response
  • Key-Value Storage: compact conversation state keyed by conversation ID
  • Agents: wrap model calls in typed, reusable app functions
  • Tool Calling: stream tool-aware chat responses with AI SDK UI messages