Agent Streaming

How to use streaming in your agents

Streaming lets your users start reading the response before the AI finishes thinking. Nothing feels faster than a response that is already happening.

Why Streaming?

  • Latency hiding by showing results instantly instead of after the whole response is ready.
  • Large inputs and outputs without hitting payload limits.
  • Agent chains can forward chunks to the next agent as soon as they arrive.
  • Snappier UX so users see progress in milliseconds instead of waiting for the full payload.
  • Resource efficiency by not holding entire responses in memory; chunks flow straight through.
  • Composable pipelines by allowing agents, functions, and external services to hand off work in a continuous stream.

A simple visualization of the difference between traditional request/response and streaming:

┌─────────────────────────── traditional request/response ───────────────────────────────────┐
|  client waiting ...   ██████████████████████████████████████████  full payload   display   |
└────────────────────────────────────────────────────────────────────────────────────────────┘
 
┌─────────────────────────── streaming request/response ─────────────────────────────────────┐
|  c  l  i  e  n  t  r  e  a  d  s  c  h  u  n  k  1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 |
└────────────────────────────────────────────────────────────────────────────────────────────┘

Real-World Use Cases

  • Live chat / customer support. Stream the assistant's words as they are generated for a more natural feel.
  • Speech-to-text. Pipe microphone audio into a transcription agent and forward captions to the UI in real time.
  • Streaming search results. Show the first relevant hits immediately while the rest are still processing.
  • Agent chains. One agent can translate, the next can summarize, the third can analyze – all in a single flowing stream.

How Streaming Works in Agentuity

Agentuity provides multiple approaches for streaming data:

  1. High-level Streaming: resp.stream(source) – where source can be:
    • An async iterator (e.g. OpenAI SDK stream)
    • A ReadableStream
    • Another agent's stream
  2. Low-level Stream Control: context.stream.create(name, props) – create and manage server-side streams directly
  3. Inbound: await request.data.stream() – consume the client's incoming stream.

Under the hood, Agentuity handles the details of streaming input and output for you.
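
The inbound case (3) doesn't appear in the examples below, so here is a minimal sketch. It assumes request.data.stream() resolves to a standard web ReadableStream of bytes, so the loop is plain Web Streams API usage rather than anything Agentuity-specific:

import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  // Consume the client's incoming stream chunk by chunk
  const inbound = await req.data.stream();
  const reader = inbound.getReader();
  let received = 0;

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    received += value.byteLength; // assumes byte chunks (Uint8Array)
  }

  return resp.text(`Consumed ${received} bytes from the inbound stream`);
}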

OpenAI Streaming Example

In this example, we stream a model's response back to the caller: the TypeScript agent uses the Vercel AI SDK's streamText with the OpenAI provider, while the Python agent calls the OpenAI SDK directly.

import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const { textStream } = streamText({
    model: openai("gpt-4o"),
    prompt: "Invent a new holiday and describe its traditions.",
  });

  return resp.stream(textStream);
}

from openai import OpenAI
from agentuity import AgentRequest, AgentResponse, AgentContext

client = OpenAI()

async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    chat_completion = client.chat.completions.create(
        messages=[
            {"role": "system", "content": "You are a friendly assistant!"},
            {"role": "user", "content": (await request.data.text()) or "Why is the sky blue?"},
        ],
        model="gpt-4o",
        stream=True,
    )
    return response.stream(chat_completion, lambda chunk: chunk.choices[0].delta.content)

Structured Object Streaming with Vercel AI SDK

The stream method accepts transformer functions that can filter and transform stream items. This is particularly useful when working with structured data from AI SDKs such as the Vercel AI SDK's streamObject.

import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";
import { openai } from '@ai-sdk/openai';
import { streamObject } from 'ai';
import { z } from 'zod';

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const { elementStream } = streamObject({
    model: openai('gpt-4o'),
    output: 'array',
    schema: z.object({
      name: z.string(),
      class: z
        .string()
        .describe('Character class, e.g. warrior, mage, or thief.'),
      description: z.string(),
    }),
    prompt: 'Generate 3 hero descriptions for a fantasy role playing game.',
  });

  return resp.stream(elementStream);
}

The SDK automatically detects object streams and converts them to newline-delimited JSON (one object per line) with the appropriate application/json content type.
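
On the client side, newline-delimited JSON is easy to consume incrementally. The sketch below uses only the standard fetch and Web Streams APIs; agentUrl is a placeholder for your agent's endpoint:

// Parse the NDJSON response line by line as chunks arrive
const res = await fetch(agentUrl); // agentUrl is a placeholder
const reader = res.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() ?? ''; // keep a trailing partial line for the next chunk
  for (const line of lines) {
    if (line.trim()) {
      const hero = JSON.parse(line); // one complete object per line
      console.log(hero.name, hero.class);
    }
  }
}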

Stream Transformers

You can provide transformer functions to filter and transform stream data:

import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  // Get stream from another source
  const dataStream = getDataStream();

  // Transform and filter items
  const transformer = (item: any) => {
    // Filter out items (return null/undefined to skip)
    if (!item.active) return null;

    // Transform the item
    return {
      id: item.id,
      name: item.name.toUpperCase(),
      timestamp: Date.now()
    };
  };

  return resp.stream(dataStream, undefined, {}, transformer);
}

You can also use generator functions for more complex transformations:

// Generator transformer that can yield multiple items or filter
function* transformer(item: any) {
  if (item.type === 'batch') {
    // Yield multiple items from a batch
    for (const subItem of item.items) {
      yield { ...subItem, processed: true };
    }
  } else if (item.valid) {
    // Yield single transformed item
    yield { ...item, enhanced: true };
  }
  // Return nothing to filter out invalid items
}
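
A generator transformer is passed the same way as a plain function transformer, e.g. resp.stream(dataStream, undefined, {}, transformer).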

Agent-to-Agent Streaming

In this example, we use the Agentuity SDK to stream the response from one agent to another.

import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";
 
export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  // [1] Call another agent
  const expert = await ctx.getAgent({ name: "HistoryExpert" });
  const expertResp = await expert.run({ prompt: "What engine did a P-51D Mustang use?" });
 
  // [2] Grab its stream
  const stream = await expertResp.data.stream();
 
  // [3] Pipe straight through
  return resp.stream(stream);
}

Chain as many agents as you like; each one can inspect, transform, or just relay the chunks.
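
For instance, a middle agent can transform chunks as they pass through by reusing the transformer parameter shown earlier. A sketch, assuming the upstream agent emits text chunks:

import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const expert = await ctx.getAgent({ name: "HistoryExpert" });
  const expertResp = await expert.run({ prompt: "What engine did a P-51D Mustang use?" });
  const stream = await expertResp.data.stream();

  // Uppercase each chunk on its way through
  // (assumes text chunks; binary chunks would need decoding first)
  return resp.stream(stream, undefined, {}, (chunk: string) => chunk.toUpperCase());
}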

Low-Level Stream Control

For advanced use cases, you can create and manage streams directly with context.stream.create(), giving you fine-grained control over stream creation, data flow, and background processing.

Creating Streams with context.stream.create

This method returns a Promise that resolves to a named, writable stream. The stream is created immediately, enabling background data processing without blocking the response:

import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';
 
export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const { prompt } = await req.data.json();
 
  // Create a stream
  const stream = await ctx.stream.create('llm-response', {
    contentType: 'text/plain',
    metadata: {
      requestId: ctx.sessionId,
      type: 'llm-generation',
      model: 'gpt-4o'
    }
  });
 
  // Use waitUntil to handle streaming in the background
  ctx.waitUntil(async () => {
    const { textStream } = streamText({
      model: openai('gpt-4o'),
      prompt
    });
 
    // Pipe the LLM stream to our created stream
    await textStream.pipeTo(stream);
  });
 
  // Return stream information immediately
  return resp.json({
    streamId: stream.id,
    streamUrl: stream.url,
    status: 'streaming'
  });
}

Manual Stream Writing

You can write to streams manually for complete control over the data flow. The Stream API provides a simple write() method that handles writer management automatically:

import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const stream = await ctx.stream.create('progress-updates', {
    contentType: 'application/json',
    metadata: { type: 'progress-tracking' }
  });
 
  ctx.waitUntil(async () => {
    try {
      // Send progress updates
      const steps = ['Initializing', 'Processing', 'Analyzing', 'Finalizing'];
 
      for (let i = 0; i < steps.length; i++) {
        await stream.write(JSON.stringify({
          step: i + 1,
          total: steps.length,
          message: steps[i],
          progress: ((i + 1) / steps.length) * 100,
          timestamp: new Date().toISOString()
        }) + '\n');
 
        // Simulate work
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
 
      // Track progress with bytesWritten
      ctx.logger.info(`Stream complete: ${stream.bytesWritten} bytes written`);
    } finally {
      await stream.close();
    }
  });
 
  return resp.json({
    streamId: stream.id,
    streamUrl: stream.url
  });
}

Writing Patterns

The Stream API supports two writing patterns:

  • stream.write() - Simplified API that handles writer acquisition and locking automatically. Recommended for most use cases.
  • stream.getWriter() - Direct access to the underlying WritableStream writer for scenarios requiring precise control over the writer lifecycle (e.g., sharing a single writer across multiple operations or custom error handling).
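
For the getWriter() pattern, here is a minimal sketch. It assumes the writer behaves like a standard WritableStream writer and accepts the same string chunks as stream.write(); the stream name 'manual-writer' is just illustrative:

const stream = await ctx.stream.create('manual-writer', {
  contentType: 'text/plain'
});

const writer = stream.getWriter();
try {
  // Hold a single writer across several writes instead of
  // acquiring and releasing a lock on every call
  await writer.write('first chunk\n');
  await writer.write('second chunk\n');
  await writer.close(); // closing the writer ends the stream
} finally {
  writer.releaseLock();
}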

Tracking Stream Progress

The bytesWritten property provides real-time visibility into stream progress:

await stream.write(chunk1);
ctx.logger.info(`Progress: ${stream.bytesWritten} bytes`); // e.g., "Progress: 1024 bytes"
 
await stream.write(chunk2);
ctx.logger.info(`Progress: ${stream.bytesWritten} bytes`); // e.g., "Progress: 2048 bytes"

Stream Compression

Stream compression reduces bandwidth usage by applying gzip encoding to outbound data automatically; it is disabled by default:

import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const { dataset } = await req.data.json();
 
  const stream = await ctx.stream.create('dataset-export', {
    contentType: 'application/json',
    compress: true,  // Automatic gzip compression
    metadata: {
      type: 'data-export',
      recordCount: String(dataset.length),
      timestamp: String(Date.now())
    }
  });
 
  ctx.waitUntil(async () => {
    try {
      for (const record of dataset) {
        await stream.write(JSON.stringify(record) + '\n');
 
        // Log progress periodically
        if (stream.bytesWritten % 100000 < 1000) {
          ctx.logger.info(`Export progress: ${stream.bytesWritten} bytes written`);
        }
      }
 
      ctx.logger.info(
        `Export complete: ${stream.bytesWritten} bytes (uncompressed), ` +
        `compression: ${stream.compressed ? 'enabled' : 'disabled'}`
      );
    } finally {
      await stream.close();
    }
  });
 
  return resp.json({
    streamUrl: stream.url,
    compressed: stream.compressed,
    totalBytes: stream.bytesWritten
  });
}

When to Use Compression

Enable compression for highly compressible data formats:

  • JSON APIs and structured data streams
  • Text-based logs and documents
  • CSV files and tabular data

The bytesWritten property tracks uncompressed bytes, while clients receive gzip-encoded responses that are automatically decompressed by standard HTTP clients.
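
Because decoding happens in the HTTP client, reading a compressed stream looks exactly like reading an uncompressed one. A sketch using fetch, where streamUrl stands in for the URL returned by the export agent above:

// fetch() exposes the already-decoded body; gzip handling is transparent
const res = await fetch(streamUrl); // streamUrl is a placeholder
const text = await res.text();      // decoded, uncompressed content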

Stream as Direct Response

You can return a stream directly from your agent handler, which will automatically redirect clients to the stream URL:

import type { AgentRequest, AgentResponse, AgentContext } from "@agentuity/sdk";
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';

export default async function Agent(
  req: AgentRequest,
  resp: AgentResponse,
  ctx: AgentContext,
) {
  const { prompt } = await req.data.json();
 
  const stream = await ctx.stream.create('direct-stream', {
    contentType: 'text/plain'
  });
 
  ctx.waitUntil(async () => {
    const { textStream } = streamText({
      model: openai('gpt-4o'),
      prompt
    });
    await textStream.pipeTo(stream);
  });
 
  // Return the stream directly - client gets redirected to stream URL
  return stream;
}

Benefits of Low-Level Stream Control

  • Non-blocking: Stream creation returns immediately, allowing instant responses
  • Background Processing: Use ctx.waitUntil() to handle data streaming without blocking
  • Rich Metadata: Associate custom metadata with streams for tracking and debugging
  • Direct Access: Clients can access streams via direct URLs
  • Flexible Content Types: Support any content type (text, JSON, binary, etc.)
  • Direct Writing: Use stream.write() for simple data writing without manual lock management
  • Progress Monitoring: Track bytes written in real-time with stream.bytesWritten
  • Automatic Compression: Enable optional compression to reduce bandwidth for compressible content

Managing Streams

Beyond creating streams, you can list and delete streams to manage your stream lifecycle.

Listing Streams

Use context.stream.list() to search and filter through your streams:

import { AgentHandler } from '@agentuity/sdk';

const handler: AgentHandler = async (req, resp, ctx) => {
  // List all streams
  const allStreams = await ctx.stream.list();
  ctx.logger.info(`Total streams: ${allStreams.total}`);
 
  // Filter by name
  const llmStreams = await ctx.stream.list({
    name: 'llm-response'
  });
 
  // Filter by metadata
  const userStreams = await ctx.stream.list({
    metadata: { userId: 'user-123', sessionId: 'session-456' }
  });
 
  // Paginate results
  const page1 = await ctx.stream.list({ limit: 10, offset: 0 });
  const page2 = await ctx.stream.list({ limit: 10, offset: 10 });
 
  return resp.json({ streams: userStreams.streams });
}

Deleting Streams

Clean up streams when they're no longer needed:

import { AgentHandler } from '@agentuity/sdk';

const handler: AgentHandler = async (req, resp, ctx) => {
  const { streamId } = await req.data.json();

  try {
    await ctx.stream.delete(streamId);
    return resp.json({ success: true });
  } catch (error) {
    if (error instanceof Error && error.message.includes('not found')) {
      return resp.json({ error: 'Stream not found' }, { status: 404 });
    }
    throw error;
  }
}

Complete Stream Lifecycle

Here's a complete example showing stream creation, listing, and cleanup:

import { AgentHandler } from '@agentuity/sdk';
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';
 
const handler: AgentHandler = async (req, resp, ctx) => {
  const { prompt, userId } = await req.data.json();
 
  // Create a new stream with metadata
  const stream = await ctx.stream.create('llm-response', {
    contentType: 'text/plain',
    metadata: {
      userId,
      sessionId: ctx.sessionId,
      timestamp: Date.now(),
      type: 'llm-generation'
    }
  });
 
  // Stream LLM response in the background
  ctx.waitUntil(async () => {
    const { textStream } = streamText({
      model: openai('gpt-4o'),
      prompt
    });
    await textStream.pipeTo(stream);
  });
 
  // List all streams for this user
  const userStreams = await ctx.stream.list({
    metadata: { userId }
  });
 
  // Clean up old streams (older than 24 hours)
  const oneDayAgo = Date.now() - 24 * 60 * 60 * 1000;
  ctx.waitUntil(async () => {
    for (const oldStream of userStreams.streams) {
      const timestamp = Number(oldStream.metadata?.timestamp);
      if (timestamp && timestamp < oneDayAgo) {
        await ctx.stream.delete(oldStream.id);
        ctx.logger.info(`Deleted old stream: ${oldStream.id}`);
      }
    }
  });
 
  return resp.json({
    streamId: stream.id,
    streamUrl: stream.url,
    totalStreams: userStreams.total
  });
}

Need Help?

Join our Discord community for assistance or just to hang out with other humans building agents.

Send us an email at hi@agentuity.com if you'd like to get in touch.

If you haven't already, please sign up for your free account now and start building your first agent!