Queues — Agentuity Documentation

Queues

Publish messages for async processing, webhooks, and event-driven workflows

Queues enable asynchronous message processing for background tasks, webhooks, and event-driven workflows. Publish messages from agents and consume them with workers or webhook destinations.

When to Use Queues

| Pattern | Best For |
| --- | --- |
| Queues | Background jobs, webhooks, event-driven processing, decoupled services |
| Durable Streams | Large exports, audit logs, streaming data |
| Key-Value | Fast lookups, caching, configuration |

Use queues when you need to:

  • Process tasks asynchronously (email sending, report generation)
  • Decouple services with reliable message delivery
  • Deliver webhooks to external endpoints
  • Handle bursty workloads with rate limiting
  • Retry failed operations with exponential backoff

Access Patterns

| Context | Access | Details |
| --- | --- | --- |
| Agents | ctx.queue | See examples below |
| Routes | c.var.queue | See Using in Routes |
| Standalone | createAgentContext() | See Standalone Usage |

Queue Types

| Type | Behavior |
| --- | --- |
| worker | Each message is processed by exactly one consumer. Use for background jobs. |
| pubsub | Messages are broadcast to all subscribers. Use for event notifications. |
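The difference between the two types can be pictured with a small dispatch sketch. This is illustrative only, not the platform's actual routing logic, and `selectConsumers` is a hypothetical helper:

```typescript
type QueueType = 'worker' | 'pubsub';

// For a worker queue, exactly one consumer receives the message
// (round-robin by offset here); for a pubsub queue, every
// subscriber receives a copy.
function selectConsumers<T>(
  type: QueueType,
  consumers: T[],
  offset: number,
): T[] {
  if (consumers.length === 0) return [];
  if (type === 'worker') {
    return [consumers[offset % consumers.length]];
  }
  return [...consumers];
}
```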

Publishing Messages

Publish messages from agents using ctx.queue.publish():

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('OrderProcessor', {
  handler: async (ctx, input) => {
    // Queue an email to be sent asynchronously
    const result = await ctx.queue.publish('email-queue', {
      to: input.customerEmail,
      subject: 'Order Confirmed',
      orderId: input.orderId,
    });
 
    ctx.logger.info('Email queued', { messageId: result.id });
 
    return { success: true, messageId: result.id };
  },
});

Publish Options

const agent = createAgent('TaskScheduler', {
  handler: async (ctx, input) => {
    await ctx.queue.publish('task-queue', input.task, {
      // Attach metadata for filtering or routing
      metadata: { priority: 'high', region: 'us-west' },
 
      // Guarantee ordering for messages with the same key
      partitionKey: input.customerId,
 
      // Prevent duplicate messages
      idempotencyKey: `task-${input.taskId}-v1`,
 
      // Auto-expire after 1 hour
      ttl: 3600,
    });
 
    return { queued: true };
  },
});

| Option | Description |
| --- | --- |
| metadata | Key-value pairs for routing or filtering |
| partitionKey | Messages with the same key are processed in order |
| idempotencyKey | Prevents duplicate messages if the same key is published again |
| ttl | Time-to-live in seconds before the message expires |
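The effect of an idempotency key can be modeled with a small sketch. This is a client-side illustration of the semantics only (the actual deduplication happens server-side, and `IdempotentPublisher` is a hypothetical class):

```typescript
// Publishing twice with the same idempotency key returns the
// original message ID instead of enqueuing a duplicate.
class IdempotentPublisher {
  private seen = new Map<string, string>();
  private counter = 0;

  publish(
    payload: unknown,
    idempotencyKey?: string,
  ): { id: string; duplicate: boolean } {
    if (idempotencyKey && this.seen.has(idempotencyKey)) {
      return { id: this.seen.get(idempotencyKey)!, duplicate: true };
    }
    const id = `msg_${++this.counter}`;
    if (idempotencyKey) this.seen.set(idempotencyKey, id);
    return { id, duplicate: false };
  }
}
```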

Publish Result

interface QueuePublishResult {
  id: string;         // Unique message ID (msg_...)
  offset: number;     // Sequential position in the queue
  publishedAt: string; // ISO 8601 timestamp
}

Synchronous Publishing

Use sync: true when you need to wait for the message to be persisted before returning:

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('CriticalProcessor', {
  handler: async (ctx, input) => {
    // Wait for message to be persisted
    const result = await ctx.queue.publish('critical-tasks', {
      taskId: input.taskId,
      payload: input.data,
    }, {
      sync: true,
    });
 
    ctx.logger.info('Task queued synchronously', { taskId: input.taskId });
    return { status: 'queued', messageId: result.id };
  },
});

Using in Routes

Routes have the same queue access via c.var.queue:

import { createRouter } from '@agentuity/runtime';
 
const router = createRouter();
 
router.post('/webhook/stripe', async (c) => {
  const event = await c.req.json();
 
  // Queue webhook for async processing
  await c.var.queue.publish('stripe-webhooks', {
    type: event.type,
    data: event.data,
  });
 
  // Return 200 immediately (Stripe expects fast responses)
  return c.json({ received: true });
});
 
export default router;

Standalone Usage

Use queues from background jobs with createAgentContext():

import { createApp, createAgentContext } from '@agentuity/runtime';
 
await createApp();
 
// Scheduled job to send daily reports
async function sendDailyReports() {
  const ctx = createAgentContext({ trigger: 'cron' });
 
  await ctx.invoke(async () => {
    const users = await getActiveUsers();
 
    for (const user of users) {
      await ctx.queue.publish('email-reports', {
        userId: user.id,
        reportType: 'daily',
      });
    }
 
    ctx.logger.info('Queued daily reports', { count: users.length });
  });
}

See Running Agents Without HTTP for more patterns.

Error Handling

import { QueueNotFoundError, QueueValidationError } from '@agentuity/core';
 
const agent = createAgent('SafePublisher', {
  handler: async (ctx, input) => {
    try {
      await ctx.queue.publish('notifications', input.notification);
      return { success: true };
    } catch (error) {
      if (error instanceof QueueNotFoundError) {
        ctx.logger.error('Queue does not exist', { queue: 'notifications' });
        return { success: false, error: 'Queue not found' };
      }
      if (error instanceof QueueValidationError) {
        ctx.logger.error('Invalid message', { field: error.field });
        return { success: false, error: 'Validation failed' };
      }
      throw error;
    }
  },
});

Queue Management

Create and manage queues using the CLI or the @agentuity/server package.

CLI Commands

# Create a worker queue
agentuity cloud queue create worker --name order-processing
 
# Create a pubsub queue for broadcasting
agentuity cloud queue create pubsub --name events
 
# List all queues
agentuity cloud queue list
 
# Filter by type and status
agentuity cloud queue list --queue-type worker --status active
 
# Filter by name
agentuity cloud queue list --name order-processing
 
# Sort by message count, descending
agentuity cloud queue list --sort message_count --direction desc
 
# Paginate results
agentuity cloud queue list --limit 10 --offset 0

| Option | Description |
| --- | --- |
| --name <name> | Filter by queue name |
| --queue-type <type> | Filter by type: worker or pubsub |
| --status <status> | Filter by status: active or paused |
| --org-id <id> | Filter by organization |
| --sort <field> | Sort by name, created, updated, message_count, or dlq_count (default: created) |
| --direction <dir> | Sort direction: asc or desc (default: desc) |
| --limit <n> | Maximum number of results |
| --offset <n> | Pagination offset |

# Get queue details and stats
agentuity cloud queue get order-processing
 
# Pause/resume processing
agentuity cloud queue pause order-processing
agentuity cloud queue resume order-processing
 
# Delete a queue
agentuity cloud queue delete order-processing

For programmatic queue management, see SDK Utilities for External Apps.

Consuming Messages

Webhook Destinations

Configure webhook destinations to automatically deliver messages to HTTP endpoints. Set these up in the App or programmatically.

Webhook destinations support:

  • Custom headers and authentication
  • Configurable timeouts (up to 30 seconds)
  • Retry policies with exponential backoff

Pull-Based Consumption

For workers that pull and process messages, see Pull-Based Consumption. This pattern is useful for long-running workers that need fine-grained control over message processing.

Real-Time Subscriptions

When you need to process messages as they arrive rather than polling, use the WebSocket subscription API from @agentuity/server.

Callback-Based API

import { createQueueWebSocket } from '@agentuity/server';
 
const connection = createQueueWebSocket({
  queueName: 'order-processing',
  baseUrl: 'https://catalyst.agentuity.cloud',
  onMessage: (message) => {
    console.log('Received:', message.id, message.payload);
  },
  onOpen: () => console.log('Connected'),
  onClose: (code, reason) => console.log('Closed:', code, reason),
  onError: (error) => console.error('Error:', error),
});
 
// Later: close the connection
connection.close();

The connection handle exposes state, clientId, and lastOffset properties for monitoring and session resumption.

Async Iterator API

For for await...of consumption, use subscribeToQueue:

import { subscribeToQueue } from '@agentuity/server';
 
const controller = new AbortController();
 
for await (const message of subscribeToQueue({
  queueName: 'order-processing',
  baseUrl: 'https://catalyst.agentuity.cloud',
  signal: controller.signal,
})) {
  console.log('Received:', message.id, message.payload);
}
 
// To stop: controller.abort()

Session Resumption

Save clientId and lastOffset from a connection and pass them back on reconnect to avoid reprocessing messages:

import { createQueueWebSocket } from '@agentuity/server';
 
// Save state from a previous connection
const savedClientId = previousConnection.clientId;
const savedOffset = previousConnection.lastOffset;
 
// Resume from where you left off
const connection = createQueueWebSocket({
  queueName: 'order-processing',
  baseUrl: 'https://catalyst.agentuity.cloud',
  clientId: savedClientId,     // Resume this subscription
  lastOffset: savedOffset,     // Skip already-processed messages
  onMessage: (message) => {
    console.log('Received:', message.id, message.payload);
  },
});

Connection Options

| Option | Default | Description |
| --- | --- | --- |
| autoReconnect | true | Automatically reconnect on disconnection |
| maxReconnectAttempts | Infinity | Maximum reconnection attempts before giving up |
| reconnectDelayMs | 1000 | Initial reconnection delay (uses exponential backoff with jitter) |
| maxReconnectDelayMs | 30000 | Maximum reconnection delay cap |

Connection States

The connection transitions through these states:

connecting → authenticating → connected

On disconnection with autoReconnect enabled, it cycles through reconnecting → connecting → authenticating → connected. Authentication failures are terminal and do not trigger reconnection.
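The transitions described above can be sketched as a small reducer. This is an illustrative model of the documented behavior, not the library's internal implementation; `nextState` and the event names are hypothetical:

```typescript
type ConnState =
  | 'connecting' | 'authenticating' | 'connected'
  | 'reconnecting' | 'closed';
type ConnEvent = 'socket_open' | 'auth_ok' | 'auth_failed' | 'disconnect';

// Auth failure is terminal (no reconnect attempt); other
// disconnects go to 'reconnecting' when autoReconnect is enabled.
function nextState(
  state: ConnState,
  event: ConnEvent,
  autoReconnect = true,
): ConnState {
  switch (event) {
    case 'socket_open':
      return 'authenticating';
    case 'auth_ok':
      return 'connected';
    case 'auth_failed':
      return 'closed';
    case 'disconnect':
      return autoReconnect ? 'reconnecting' : 'closed';
  }
}
```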

Dead Letter Queue

Messages that exceed the retry limit are moved to the dead letter queue (DLQ). Inspect and replay failed messages:

# List failed messages
agentuity cloud queue dlq order-processing
 
# Replay a message back to the queue
agentuity cloud queue dlq replay order-processing --message-id msg_abc123
 
# Purge all DLQ messages
agentuity cloud queue dlq purge order-processing

For programmatic DLQ access, see Dead Letter Queue Operations.

HTTP Ingestion Sources

Create public HTTP endpoints to ingest data into queues from external services. Configure these in the App or programmatically.

| Auth Type | Description |
| --- | --- |
| none | No authentication |
| basic | HTTP Basic Auth (username:password) |
| header | Custom header value (Bearer token) |

Queue Settings

Configure queue behavior when creating or updating:

| Setting | Default | Description |
| --- | --- | --- |
| default_ttl_seconds | null | Message expiration (null = never) |
| default_visibility_timeout_seconds | 30 | Processing timeout before message returns to queue |
| default_max_retries | 5 | Attempts before moving to DLQ |
| default_retry_backoff_ms | 1000 | Initial retry delay |
| default_retry_max_backoff_ms | 60000 | Maximum retry delay |
| default_retry_multiplier | 2.0 | Exponential backoff multiplier |
| max_in_flight_per_client | 10 | Concurrent messages per consumer |
| retention_seconds | 2592000 | How long to keep acknowledged messages (30 days) |
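The retry settings combine into a delay schedule like the one sketched below. This is an illustrative computation only (the server's actual scheduling may also apply jitter, and `retryDelays` is a hypothetical helper):

```typescript
// Compute the delay before each retry attempt: start at backoffMs,
// multiply by the backoff multiplier each attempt, and cap at
// maxBackoffMs. After maxRetries attempts the message moves to the DLQ.
function retryDelays(opts: {
  maxRetries: number;
  backoffMs: number;
  maxBackoffMs: number;
  multiplier: number;
}): number[] {
  const delays: number[] = [];
  let delay = opts.backoffMs;
  for (let i = 0; i < opts.maxRetries; i++) {
    delays.push(Math.min(delay, opts.maxBackoffMs));
    delay *= opts.multiplier;
  }
  return delays;
}
```

With the defaults above (5 retries, 1000 ms initial delay, 2.0 multiplier), the schedule is 1s, 2s, 4s, 8s, 16s.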

Validation Limits

| Limit | Value |
| --- | --- |
| Queue name length | 1-256 characters |
| Queue name format | Lowercase letters, digits, underscores, hyphens. Must start with a letter or underscore. |
| Payload size | 1 MB max |
| Partition key length | 256 characters max |
| Idempotency key length | 256 characters max |
| Batch size | 1000 messages max |
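The queue name rules above can be checked client-side before publishing. This is a sketch based on the documented limits (the service performs the authoritative validation; `isValidQueueName` is a hypothetical helper):

```typescript
// 1-256 characters; lowercase letters, digits, underscores, hyphens;
// must start with a letter or underscore.
function isValidQueueName(name: string): boolean {
  return /^[a-z_][a-z0-9_-]{0,255}$/.test(name);
}
```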

Best Practices

  • Use idempotency keys for operations that shouldn't be duplicated (payments, emails)
  • Set appropriate TTLs for time-sensitive messages
  • Use partition keys when message ordering matters within a group
  • Monitor DLQ regularly to catch and fix processing failures
  • Configure webhook retry policies to handle transient failures gracefully

Next Steps