Streaming with SSE — Agentuity Documentation

Streaming with SSE

Stream updates from server to client using SSE middleware

Server-Sent Events (SSE) provide efficient one-way streaming from server to client over HTTP. Use them for progress indicators, live feeds, notifications, and LLM response streaming.

When to Use SSE

ProtocolBest For
SSELLM streaming, progress updates, server-to-client feeds
WebSocketsChat, collaboration, real-time bidirectional data
WebRTCVideo/audio calls, P2P data transfer, low-latency gaming

Basic Example

import { Hono } from 'hono';
import { type Env, sse } from '@agentuity/runtime';
 
const router = new Hono<Env>();
 
router.get('/updates', sse(async (c, stream) => {
  await stream.write('Connected!');
 
  // Stream data to client
  for (let i = 0; i < 5; i++) {
    await stream.write(`Update ${i + 1}`);
    await new Promise((r) => setTimeout(r, 1000));
  }
 
  stream.close();
}));
 
export default router;

Handler Structure

The sse() middleware wraps your handler and provides the stream object:

import { Hono } from 'hono';
import { type Env, sse } from '@agentuity/runtime';
 
router.get('/path', sse(async (c, stream) => {
  // c - Route context (logger, agents, storage)
  // stream - SSE stream object
 
  await stream.write('data');
  await stream.writeSSE({ event, data, id });
  stream.onAbort(() => { /* cleanup */ });
  stream.close();  // Optional: stream closes automatically when handler returns
}));

With Middleware

Apply authentication or logging before streaming:

import { Hono } from 'hono';
import { type Env, sse } from '@agentuity/runtime';
import { createMiddleware } from 'hono/factory';
 
const router = new Hono<Env>();
 
const authMiddleware = createMiddleware(async (c, next) => {
  const apiKey = c.req.header('X-API-Key');
  if (!apiKey) {
    return c.json({ error: 'API key required' }, 401);
  }
  await next();
});
 
router.get('/events', authMiddleware, sse(async (c, stream) => {
  await stream.writeSSE({ event: 'connected', data: 'Authenticated!' });
 
  // Stream events...
  stream.close();
}));
 
export default router;

Two Write APIs

Simple Write

await stream.write('Hello');
await stream.write(JSON.stringify({ status: 'ok' }));

Automatically formats data as SSE.

Full SSE Format

await stream.writeSSE({
  event: 'status',        // Event type for client filtering
  data: 'Processing...',  // The payload
  id: '1',                // Optional event ID
});

Use this for named events that clients can filter.

Named Events

Clients can listen for specific event types:

Server:

await stream.writeSSE({ event: 'progress', data: '50%' });
await stream.writeSSE({ event: 'complete', data: JSON.stringify({ success: true }) });

Client:

const source = new EventSource('/agent-name');
 
source.addEventListener('progress', (e) => {
  console.log('Progress:', e.data);
});
 
source.addEventListener('complete', (e) => {
  console.log('Done:', JSON.parse(e.data));
  source.close();
});

Full Example

A job progress tracker that streams status updates:

import { Hono } from 'hono';
import { type Env, sse } from '@agentuity/runtime';
 
const router = new Hono<Env>();
 
router.get('/', sse(async (c, stream) => {
  c.var.logger.info('Client connected');
 
  const steps = [
    'Loading resources...',
    'Processing data...',
    'Generating report...',
    'Finalizing...',
  ];
 
  let stepIndex = 0;
 
  const interval = setInterval(async () => {
    try {
      if (stepIndex < steps.length) {
        const progress = ((stepIndex + 1) / steps.length * 100).toFixed(0);
 
        await stream.writeSSE({
          event: 'status',
          data: `[${progress}%] ${steps[stepIndex]}`,
          id: String(stepIndex),
        });
 
        stepIndex++;
      } else {
        await stream.write(JSON.stringify({ success: true }));
        clearInterval(interval);
        stream.close();
      }
    } catch (error) {
      c.var.logger.error('Stream error', { error });
      clearInterval(interval);
    }
  }, 1000);
 
  stream.onAbort(() => {
    c.var.logger.info('Client disconnected');
    clearInterval(interval);
  });
 
  // Keep connection open
  await new Promise(() => {});
}));
 
export default router;

Client Disconnection

Handle early client disconnection with onAbort:

stream.onAbort(() => {
  clearInterval(interval);
  // Cancel any pending work
});

Always clean up resources to prevent memory leaks.

Keeping the Connection Open

SSE connections stay open until closed. Use a pending promise to keep the handler alive:

router.get('/stream', sse(async (c, stream) => {
  // Set up intervals, subscriptions, etc.
 
  // Keep connection open until client disconnects or stream.close()
  await new Promise(() => {});
}));

Client Connection

Connect from JavaScript using the EventSource API:

const source = new EventSource('https://your-project.agentuity.cloud/agent-name');
 
source.onmessage = (event) => {
  console.log('Received:', event.data);
};
 
source.onerror = () => {
  console.log('Connection error or closed');
  source.close();
};

Or with cURL:

curl -N https://your-project.agentuity.cloud/agent-name

Streaming LLM Responses

Use SSE to stream AI SDK responses to clients:

import { Hono } from 'hono';
import { type Env, sse } from '@agentuity/runtime';
import { streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
 
const router = new Hono<Env>();
 
router.post('/chat', sse(async (c, stream) => {
  const { message } = await c.req.json();
 
  const result = streamText({
    model: anthropic('claude-sonnet-4-6'),
    prompt: message,
  });
 
  for await (const chunk of result.textStream) {
    await stream.write(chunk);
  }
 
  stream.close();
}));
 
export default router;

Standalone Usage

SSE handlers work without agents. This example streams build logs from storage:

import { Hono } from 'hono';
import { type Env, sse } from '@agentuity/runtime';
 
const router = new Hono<Env>();
 
router.get('/builds/:id/logs', sse(async (c, stream) => {
  const buildId = c.req.param('id');
  const logs = await c.var.kv.get<string[]>('builds', `${buildId}:logs`);
 
  if (logs.exists) {
    for (const line of logs.data) {
      await stream.writeSSE({ event: 'log', data: line });
    }
  }
 
  await stream.writeSSE({ event: 'complete', data: 'Build finished' });
  stream.close();
}));
 
export default router;

Next Steps

  • WebSockets: Bidirectional real-time communication
  • HTTP Routes: Standard request/response endpoints
  • Frontend Utilities: Connect from the browser using EventStreamManager from @agentuity/frontend or the native EventSource API