# Queues

Publish messages for async processing, webhooks, and event-driven workflows

Queues enable asynchronous message processing for background tasks, webhooks, and event-driven workflows. Publish messages from agents, routes, or standalone contexts, then consume them with webhook destinations, pull-based workers, or real-time subscriptions.

> [!TIP]
> **Not inside an agent or route?**
> Use the [`@agentuity/queue`](/reference/standalone-packages#message-queues) standalone package to access this service from any Node.js or Bun app without the runtime.

## When to Use Queues

| Pattern | Best For |
|---------|----------|
| **Queues** | Background jobs, webhooks, event-driven processing, decoupled services |
| [Durable Streams](/services/storage/durable-streams) | Large exports, audit logs, streaming data |
| [Key-Value](/services/storage/key-value) | Fast lookups, caching, configuration |

**Use queues when you need to:**
- Process tasks asynchronously (email sending, report generation)
- Decouple services with reliable message delivery
- Deliver webhooks to external endpoints
- Handle bursty workloads with rate limiting
- Retry failed operations with exponential backoff

## Access Patterns

| Context | Access | Details |
|---------|--------|---------|
| Agents | `ctx.queue` | See examples below |
| Routes | `c.var.queue` | See [Using in Routes](#using-in-routes) |
| Standalone | `createAgentContext()` | See [Standalone Usage](#standalone-usage) |

> [!NOTE]
> **Same API Everywhere**
> The Queue API is identical in all contexts. `ctx.queue.publish()` and `c.var.queue.publish()` work the same way.

## Queue Types

| Type | Behavior |
|------|----------|
| `worker` | Point-to-point delivery. Each message is consumed by exactly one consumer and requires acknowledgment. Use for background jobs and task processing. |
| `pubsub` | Broadcast delivery. Every subscriber receives every message. Use for event notifications and fan-out patterns. |
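
To make the difference between the two types concrete, here is a small in-memory model of the delivery semantics. It is a conceptual sketch only: the function names are illustrative, and the round-robin choice for `worker` queues is an assumption, not a description of how the service selects a consumer.

```typescript
// Conceptual model of delivery semantics; not the service implementation.

// worker: each message lands in exactly one consumer's inbox.
// Round-robin selection is an assumption made for illustration.
function deliverWorker(messages: string[], consumerCount: number): string[][] {
  const inboxes: string[][] = Array.from({ length: consumerCount }, (): string[] => []);
  messages.forEach((message, index) => {
    inboxes[index % consumerCount].push(message);
  });
  return inboxes;
}

// pubsub: every subscriber receives its own copy of every message.
function deliverPubsub(messages: string[], subscriberCount: number): string[][] {
  return Array.from({ length: subscriberCount }, (): string[] => [...messages]);
}

const sampleMessages = ['m1', 'm2', 'm3', 'm4'];
const workerInboxes = deliverWorker(sampleMessages, 2); // [['m1', 'm3'], ['m2', 'm4']]
const pubsubInboxes = deliverPubsub(sampleMessages, 2); // both inboxes hold all four messages
```

With two consumers, the `worker` model splits the four messages between them, while the `pubsub` model gives each subscriber all four.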

## Creating Queues

Create a queue from an agent or route using `ctx.queue.createQueue()`:

```typescript
// Inside an agent handler or route
const result = await ctx.queue.createQueue('notifications', {
  queueType: 'worker',
  description: 'Order notifications',
  settings: {
    defaultTtlSeconds: 86400,   // messages expire after 24 hours
    defaultMaxRetries: 3,
  },
});
```

```typescript
interface QueueCreateResult {
  name: string;      // Queue name
  queueType: string; // 'worker' or 'pubsub'
}
```

Every field in the options object is optional. Omitting `queueType` defaults to `worker`.

| Option | Type | Description |
|--------|------|-------------|
| `queueType` | `'worker' \| 'pubsub'` | Worker for point-to-point, pubsub for broadcast (optional, defaults to `worker`) |
| `description` | `string` | Human-readable description (optional) |
| `settings.defaultTtlSeconds` | `number` | Message expiration window in seconds (optional) |
| `settings.defaultVisibilityTimeoutSeconds` | `number` | Visibility timeout after a message is received, before it returns to the queue (optional) |
| `settings.defaultMaxRetries` | `number` | Delivery retry limit before moving to DLQ (optional) |
| `settings.maxInFlightPerClient` | `number` | Concurrent message limit per consumer (optional) |
| `settings.retentionSeconds` | `number` | Retention period for acknowledged messages (optional) |

> [!NOTE]
> **Idempotent Creation**
> `createQueue` is safe to call multiple times. If the queue already exists, it returns successfully without error. The SDK caches creation within the current request context, so duplicate calls in the same handler return immediately without additional API requests.
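
Because creation is idempotent, one possible pattern is to call `createQueue` right before publishing instead of provisioning the queue separately. The sketch below assumes a minimal `QueueLike` interface that mirrors only the two calls shown on this page; both `QueueLike` and `publishEnsuringQueue` are hypothetical names, and the real SDK types may differ.

```typescript
// Minimal structural type covering only the calls used here.
// Assumption: it mirrors the shapes shown on this page, not the full SDK types.
interface QueueLike {
  createQueue(
    queueName: string,
    options?: { queueType?: 'worker' | 'pubsub' },
  ): Promise<{ name: string; queueType: string }>;
  publish(queueName: string, payload: unknown): Promise<{ id: string }>;
}

// Hypothetical helper: make sure the queue exists, then publish to it.
// Relies on createQueue succeeding when the queue already exists.
async function publishEnsuringQueue(
  queue: QueueLike,
  queueName: string,
  payload: unknown,
): Promise<string> {
  await queue.createQueue(queueName, { queueType: 'worker' });
  const result = await queue.publish(queueName, payload);
  return result.id; // message ID of the published message
}
```

Inside a handler this would be called as `await publishEnsuringQueue(ctx.queue, 'notifications', payload)`, assuming `ctx.queue` is structurally compatible with `QueueLike`.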

## Deleting Queues

Delete a queue and all its messages with `ctx.queue.deleteQueue()`:

```typescript
await ctx.queue.deleteQueue('old-notifications');
```

> [!WARNING]
> **Permanent Deletion**
> Deleting a queue is irreversible. All pending messages are lost immediately. If the queue does not exist, a `QueueNotFoundError` is thrown.

## Publishing Messages

Publish messages from agents using `ctx.queue.publish()`:

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('OrderProcessor', {
  handler: async (ctx, input) => {
    // Queue an email to be sent asynchronously
    const result = await ctx.queue.publish('email-queue', {
      to: input.customerEmail,
      subject: 'Order Confirmed',
      orderId: input.orderId,
    });

    ctx.logger.info('Email queued', { messageId: result.id });

    return { success: true, messageId: result.id };
  },
});
```

### Publish Options

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('TaskScheduler', {
  handler: async (ctx, input) => {
    await ctx.queue.publish('task-queue', input.task, {
      // Attach metadata for filtering or routing
      metadata: { priority: 'high', region: 'us-west' },

      // Guarantee ordering for messages with the same key
      partitionKey: input.customerId,

      // Prevent duplicate messages
      idempotencyKey: `task-${input.taskId}-v1`,

      // Auto-expire after 1 hour
      ttl: 3600,
    });

    return { queued: true };
  },
});
```

| Option | Description |
|--------|-------------|
| `metadata` | Key-value pairs for routing or filtering |
| `partitionKey` | Messages with the same key are processed in order |
| `idempotencyKey` | Prevents duplicate messages if the same key is published again |
| `ttl` | Time-to-live in seconds before the message expires |
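
An idempotency key only deduplicates if a retried publish produces the same key, so it helps to derive keys from stable business identifiers rather than timestamps or random values. The helper below is a hypothetical sketch (`buildIdempotencyKey` is not part of the SDK); the 256-character cap comes from the Validation Limits table on this page.

```typescript
const MAX_KEY_LENGTH = 256; // idempotency key limit from the Validation Limits table

// Hypothetical helper: derive a stable key from identifiers that do not
// change between retries. The "kind-id-vN" layout is just one convention.
function buildIdempotencyKey(kind: string, entityId: string, version = 1): string {
  const key = `${kind}-${entityId}-v${version}`;
  if (key.length > MAX_KEY_LENGTH) {
    throw new RangeError(
      `Idempotency key is ${key.length} characters; the limit is ${MAX_KEY_LENGTH}`,
    );
  }
  return key;
}

const taskKey = buildIdempotencyKey('task', '42'); // 'task-42-v1'
```

Bumping `version` is one way to publish the same task again on purpose without colliding with the earlier key.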

### Publish Result

```typescript
interface QueuePublishResult {
  id: string;         // Unique message ID (msg_...)
  offset: number;     // Sequential position in the queue
  publishedAt: string; // ISO 8601 timestamp
}
```

### Synchronous Publishing

Use `sync: true` when you need to wait for the message to be persisted before returning:

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('CriticalProcessor', {
  handler: async (ctx, input) => {
    // Wait for message to be persisted
    const result = await ctx.queue.publish('critical-tasks', {
      taskId: input.taskId,
      payload: input.data,
    }, {
      sync: true,
    });

    ctx.logger.info('Task queued synchronously', { taskId: input.taskId });
    return { status: 'queued', messageId: result.id };
  },
});
```

> [!WARNING]
> **Sync Mode Performance**
> Synchronous publishing blocks until the message is persisted. Use it only when you need confirmation that the message was accepted. For most use cases, async publishing is faster and more resilient.

### CLI Publishing

```bash
# Publish a message via CLI
agentuity cloud queue publish order-processing '{"orderId": "123"}'

# With options
agentuity cloud queue publish order-processing '{"orderId": "123"}' \
  --partition-key customer-456 \
  --idempotency-key order-123 \
  --ttl 3600
```

## Using in Routes

Routes have the same queue access via `c.var.queue`:

```typescript
import { Hono } from 'hono';
import type { Env } from '@agentuity/runtime';

const router = new Hono<Env>();

router.post('/webhook/stripe', async (c) => {
  const event = await c.req.json();

  // Queue webhook for async processing
  await c.var.queue.publish('stripe-webhooks', {
    type: event.type,
    data: event.data,
  });

  // Return 200 immediately (Stripe expects fast responses)
  return c.json({ received: true });
});

export default router;
```

> [!TIP]
> **Webhook Pattern**
> Use queues for webhooks that need quick acknowledgment. Return 200 immediately, then process the payload asynchronously.

## Standalone Usage

Use queues from background jobs with `createAgentContext()`:

```typescript
import { createApp, createAgentContext } from '@agentuity/runtime';

const app = await createApp();
export default app;

// Scheduled job to send daily reports
async function sendDailyReports() {
  const ctx = createAgentContext({ trigger: 'cron' });

  await ctx.invoke(async () => {
    const users = await getActiveUsers(); // placeholder for your own data-access function

    for (const user of users) {
      await ctx.queue.publish('email-reports', {
        userId: user.id,
        reportType: 'daily',
      });
    }

    ctx.logger.info('Queued daily reports', { count: users.length });
  });
}
```

See [Running Agents Without HTTP](/agents/standalone-execution) for more patterns.

## Error Handling

```typescript
import { createAgent } from '@agentuity/runtime';
import { QueueNotFoundError, QueueValidationError } from '@agentuity/core';

const agent = createAgent('SafePublisher', {
  handler: async (ctx, input) => {
    try {
      await ctx.queue.publish('notifications', input.notification);
      return { success: true };
    } catch (error) {
      if (error instanceof QueueNotFoundError) {
        ctx.logger.error('Queue does not exist', { queue: 'notifications' });
        return { success: false, error: 'Queue not found' };
      }
      if (error instanceof QueueValidationError) {
        ctx.logger.error('Invalid message', { field: error.field });
        return { success: false, error: 'Validation failed' };
      }
      throw error;
    }
  },
});
```

## Queue Management

Create and manage queues using the CLI or the `@agentuity/server` package.

### CLI Commands

```bash
# Create a worker queue (--name is optional, auto-generated if omitted)
agentuity cloud queue create worker --name order-processing

# Create a pubsub queue for broadcasting
agentuity cloud queue create pubsub --name events
```

| Option | Description |
|--------|-------------|
| `--name <name>` | Queue name (optional, auto-generated if omitted) |
| `--description <desc>` | Queue description |
| `--ttl <seconds>` | Default message TTL in seconds |
| `--visibility-timeout <seconds>` | Default visibility timeout (worker queues) |
| `--max-retries <count>` | Maximum retry attempts before moving to DLQ |

```bash
# List all queues
agentuity cloud queue list

# Filter by type and status
agentuity cloud queue list --queue-type worker --status active

# Filter by name
agentuity cloud queue list --name order-processing

# Sort by message count, descending
agentuity cloud queue list --sort message_count --direction desc

# Paginate results
agentuity cloud queue list --limit 10 --offset 0
```

| Option | Description |
|--------|-------------|
| `--name <name>` | Filter by queue name |
| `--queue-type <type>` | Filter by type: `worker` or `pubsub` |
| `--status <status>` | Filter by status: `active` or `paused` |
| `--org-id <id>` | Filter by organization |
| `--sort <field>` | Sort by `name`, `created`, `updated`, `message_count`, or `dlq_count` (default: `created`) |
| `--direction <dir>` | Sort direction: `asc` or `desc` (default: `desc`) |
| `--limit <n>` | Maximum number of results |
| `--offset <n>` | Pagination offset |

```bash
# Get queue details and stats
agentuity cloud queue get order-processing

# Pause/resume processing
agentuity cloud queue pause order-processing
agentuity cloud queue resume order-processing

# Delete a queue
agentuity cloud queue delete order-processing --confirm
```

For programmatic queue management, see [SDK Utilities for External Apps](/cookbook/patterns/server-utilities#queue-management).

## Consuming Messages

### Webhook Destinations

Configure webhook destinations to automatically deliver messages to HTTP endpoints. Set these up in the [Web App](https://app.agentuity.com/services/queue) or [programmatically](/cookbook/patterns/server-utilities#webhook-destinations).

Webhook destinations support:
- Custom headers and authentication
- Configurable timeouts (up to 30 seconds)
- Retry policies with exponential backoff

```bash
# Add a webhook destination
agentuity cloud queue destinations create order-processing --url https://example.com/webhook

# List destinations
agentuity cloud queue destinations list order-processing

# Delete a destination
agentuity cloud queue destinations delete order-processing <destination_id>
```

### Pull-Based Consumption

For workers that pull and process messages, see [Pull-Based Consumption](/cookbook/patterns/server-utilities#pull-based-consumption). This pattern is useful for long-running workers that need fine-grained control over message processing.

```bash
# Receive a message from a worker queue
agentuity cloud queue receive order-processing

# Acknowledge a processed message
agentuity cloud queue ack order-processing <message_id>

# Return a message to the queue for retry
agentuity cloud queue nack order-processing <message_id>
```
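
The receive, ack, and nack cycle above can be sketched as a worker loop. The `PullClient` interface here is hypothetical and simply named after the CLI verbs; it is not the `@agentuity/server` API, which is documented on the linked cookbook page.

```typescript
// Hypothetical shapes named after the CLI verbs; NOT the real SDK API.
interface PulledMessage {
  id: string;
  payload: unknown;
}

interface PullClient {
  receive(): Promise<PulledMessage | null>; // null when nothing is available
  ack(messageId: string): Promise<void>;
  nack(messageId: string): Promise<void>;
}

// Process messages until the queue reports nothing left.
// Ack on success; nack on failure so the message can be retried.
async function drainQueue(
  client: PullClient,
  handle: (payload: unknown) => Promise<void>,
): Promise<{ acked: number; nacked: number }> {
  let acked = 0;
  let nacked = 0;
  for (;;) {
    const message = await client.receive();
    if (message === null) break;

    let succeeded = true;
    try {
      await handle(message.payload);
    } catch {
      succeeded = false; // only handler failures trigger a nack
    }

    if (succeeded) {
      await client.ack(message.id);
      acked += 1;
    } else {
      await client.nack(message.id);
      nacked += 1;
    }
  }
  return { acked, nacked };
}
```

Only the handler call sits inside the `try`, so a failing `ack` surfaces as an error instead of silently turning into a `nack`. A long-running worker would typically wait and poll again rather than exit when `receive` returns nothing.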

### Real-Time Subscriptions

When you need to process messages as they arrive rather than polling, use the WebSocket subscription API from `@agentuity/server`.

#### Callback-Based API

```typescript
import { createQueueWebSocket } from '@agentuity/server';

const connection = createQueueWebSocket({
  queueName: 'order-processing',
  baseUrl: 'https://catalyst.agentuity.cloud',
  onMessage: (message) => {
    console.log('Received:', message.id, message.payload);
  },
  onOpen: () => console.log('Connected'),
  onClose: (code, reason) => console.log('Closed:', code, reason),
  onError: (error) => console.error('Error:', error),
});

// Later: close the connection
connection.close();
```

The connection handle exposes `state`, `clientId`, and `lastOffset` properties for monitoring and session resumption.

#### Async Iterator API

For `for await...of` consumption, use `subscribeToQueue`:

```typescript
import { subscribeToQueue } from '@agentuity/server';

const controller = new AbortController();

for await (const message of subscribeToQueue({
  queueName: 'order-processing',
  baseUrl: 'https://catalyst.agentuity.cloud',
  signal: controller.signal,
})) {
  console.log('Received:', message.id, message.payload);
}

// To stop: controller.abort()
```

#### Session Resumption

Save `clientId` and `lastOffset` from a connection and pass them back on reconnect to avoid reprocessing messages:

```typescript
import { createQueueWebSocket } from '@agentuity/server';

// Save state from a previous connection
// (`previousConnection` is a handle returned by an earlier createQueueWebSocket call)
const savedClientId = previousConnection.clientId;
const savedOffset = previousConnection.lastOffset;

// Resume from where you left off
const connection = createQueueWebSocket({
  queueName: 'order-processing',
  baseUrl: 'https://catalyst.agentuity.cloud',
  clientId: savedClientId,     // Resume this subscription
  lastOffset: savedOffset,     // Skip already-processed messages
  onMessage: (message) => {
    console.log('Received:', message.id, message.payload);
  },
});
```

#### Connection Options

| Option | Default | Description |
|--------|---------|-------------|
| `autoReconnect` | `true` | Automatically reconnect on disconnection |
| `maxReconnectAttempts` | `Infinity` | Maximum reconnection attempts before giving up |
| `reconnectDelayMs` | `1000` | Initial reconnection delay (uses exponential backoff with jitter) |
| `maxReconnectDelayMs` | `30000` | Maximum reconnection delay cap |
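
To get a feel for how these options interact, the sketch below computes a reconnection delay per attempt. The exact formula the SDK uses is not documented on this page, so the doubling factor and the "full jitter" strategy (a random value between zero and the capped delay) are assumptions, and `reconnectDelay` is a hypothetical function.

```typescript
interface ReconnectOptions {
  reconnectDelayMs: number;
  maxReconnectDelayMs: number;
}

// Defaults taken from the Connection Options table.
const DEFAULT_RECONNECT: ReconnectOptions = {
  reconnectDelayMs: 1000,
  maxReconnectDelayMs: 30000,
};

// Assumed formula: double the initial delay on each attempt (attempt 0 is the
// first retry), cap it at the maximum, then pick a random point below the cap.
function reconnectDelay(
  attempt: number,
  options: ReconnectOptions = DEFAULT_RECONNECT,
  random: () => number = Math.random, // injectable for deterministic tests
): number {
  const exponential = options.reconnectDelayMs * Math.pow(2, attempt);
  const capped = Math.min(exponential, options.maxReconnectDelayMs);
  return Math.floor(random() * capped);
}
```

Under these assumptions the upper bound grows 1000, 2000, 4000, and so on until it reaches the 30000 ms cap, and jitter spreads reconnecting clients out so they do not all retry at the same instant.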

#### Connection States

The connection transitions through these states:

`connecting` → `authenticating` → `connected`

On disconnection with `autoReconnect` enabled, it cycles through `reconnecting` → `connecting` → `authenticating` → `connected`. Authentication failures are terminal and do not trigger reconnection.
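
The transitions described above can be written down as a small state function, which is handy when reasoning about what a monitoring hook might observe. This is an illustrative model: the event names and the terminal `closed` state are assumptions, since this page only names the four states in the diagram.

```typescript
// 'closed' is an assumed name for the terminal state; the page does not name it.
type ConnectionState =
  | 'connecting'
  | 'authenticating'
  | 'connected'
  | 'reconnecting'
  | 'closed';

// Event names are hypothetical labels for the transitions described above.
type ConnectionEvent = 'opened' | 'authenticated' | 'authFailed' | 'disconnected' | 'retry';

function nextState(
  current: ConnectionState,
  event: ConnectionEvent,
  autoReconnect = true,
): ConnectionState {
  if (current === 'closed') return 'closed'; // terminal
  if (event === 'authFailed') return 'closed'; // never triggers reconnection
  if (event === 'disconnected') return autoReconnect ? 'reconnecting' : 'closed';
  if (current === 'connecting' && event === 'opened') return 'authenticating';
  if (current === 'authenticating' && event === 'authenticated') return 'connected';
  if (current === 'reconnecting' && event === 'retry') return 'connecting';
  return current; // ignore events that do not apply to the current state
}

const happyPath: ConnectionEvent[] = ['opened', 'authenticated'];
const finalState = happyPath.reduce<ConnectionState>(
  (state, event) => nextState(state, event),
  'connecting',
); // 'connected'
```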

## Dead Letter Queue

Messages that exceed the retry limit are moved to the dead letter queue (DLQ). Inspect and replay failed messages:

```bash
# List failed messages
agentuity cloud queue dlq list order-processing

# Replay a message back to the queue
agentuity cloud queue dlq replay order-processing msg_abc123

# Purge all DLQ messages
agentuity cloud queue dlq purge order-processing --confirm
```

The `dlq list` command supports `--limit` and `--offset` for pagination.

For programmatic DLQ access, see [Dead Letter Queue Operations](/cookbook/patterns/server-utilities#dead-letter-queue-operations).

## HTTP Ingestion Sources

Create public HTTP endpoints to ingest data into queues from external services. Configure these in the [Web App](https://app.agentuity.com/services/queue) or [programmatically](/cookbook/patterns/server-utilities#http-ingestion-sources).

| Auth Type | Description |
|-----------|-------------|
| `none` | No authentication |
| `basic` | HTTP Basic Auth (`username:password`) |
| `header` | Custom header value (`Bearer token`) |

```bash
# Create an HTTP ingestion source
agentuity cloud queue sources create order-processing \
  --name stripe-ingest \
  --auth-type basic \
  --auth-value user:pass

# List sources
agentuity cloud queue sources list order-processing
```

## Monitoring

```bash
# View queue statistics
agentuity cloud queue stats order-processing

# Live stats with auto-refresh
agentuity cloud queue stats order-processing --live

# List messages in a queue
agentuity cloud queue messages order-processing
```

## Queue Settings

Configure queue behavior when creating or updating a queue:

| Setting | Default | Description |
|---------|---------|-------------|
| `default_ttl_seconds` | null | Message expiration (null = never) |
| `default_visibility_timeout_seconds` | 30 | Processing timeout before message returns to queue |
| `default_max_retries` | 5 | Attempts before moving to DLQ |
| `default_retry_backoff_ms` | 1000 | Initial retry delay |
| `default_retry_max_backoff_ms` | 60000 | Maximum retry delay |
| `default_retry_multiplier` | 2.0 | Exponential backoff multiplier |
| `max_in_flight_per_client` | 10 | Concurrent messages per consumer |
| `retention_seconds` | 2592000 | How long to keep acknowledged messages (30 days) |
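
The three retry settings combine into a delay schedule. As a rough illustration, the sketch below assumes the common formula `min(backoff × multiplier^n, max_backoff)` with `n` starting at 0; the service's exact calculation (including any jitter) is not specified on this page, and `retrySchedule` is a hypothetical helper.

```typescript
interface RetrySettings {
  default_max_retries: number;
  default_retry_backoff_ms: number;
  default_retry_max_backoff_ms: number;
  default_retry_multiplier: number;
}

// Defaults taken from the Queue Settings table.
const DEFAULT_RETRY_SETTINGS: RetrySettings = {
  default_max_retries: 5,
  default_retry_backoff_ms: 1000,
  default_retry_max_backoff_ms: 60000,
  default_retry_multiplier: 2.0,
};

// Assumed formula: delay(n) = min(backoff * multiplier^n, maxBackoff), n = 0, 1, ...
function retrySchedule(settings: RetrySettings): number[] {
  const delays: number[] = [];
  for (let n = 0; n < settings.default_max_retries; n++) {
    const uncapped =
      settings.default_retry_backoff_ms * Math.pow(settings.default_retry_multiplier, n);
    delays.push(Math.min(uncapped, settings.default_retry_max_backoff_ms));
  }
  return delays;
}

const defaultSchedule = retrySchedule(DEFAULT_RETRY_SETTINGS);
// [1000, 2000, 4000, 8000, 16000]
```

Under this assumption the default settings spend about 31 seconds in total backoff before a message moves to the DLQ, and the 60000 ms cap only comes into play from the seventh retry onward.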

## Validation Limits

| Limit | Value |
|-------|-------|
| Queue name length | 1-256 characters |
| Queue name format | Lowercase letters, digits, underscores, and hyphens. Must start with a letter or underscore. |
| Payload size | 1 MB max |
| Partition key length | 256 characters max |
| Idempotency key length | 256 characters max |
| Batch size | 1000 messages max |
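
Checking these limits on the client can surface mistakes before a publish call fails. The following is a minimal sketch: the helper names are hypothetical, "1 MB" is assumed to mean 1,048,576 bytes, and the payload is assumed to be measured as UTF-8 encoded JSON, none of which is confirmed on this page.

```typescript
// Lowercase letters, digits, underscores, hyphens; starts with a letter or
// underscore; 1 to 256 characters in total.
const QUEUE_NAME_PATTERN = /^[a-z_][a-z0-9_-]{0,255}$/;

// Assumption: "1 MB" means 1,048,576 bytes.
const MAX_PAYLOAD_BYTES = 1024 * 1024;

function isValidQueueName(candidate: string): boolean {
  return QUEUE_NAME_PATTERN.test(candidate);
}

// Assumption: size is measured on the UTF-8 encoded JSON form of the payload.
function payloadFits(payload: unknown): boolean {
  const bytes = new TextEncoder().encode(JSON.stringify(payload)).length;
  return bytes <= MAX_PAYLOAD_BYTES;
}

const nameOk = isValidQueueName('order-processing'); // true
const nameBad = isValidQueueName('Orders'); // false: uppercase letter
```

The service remains the source of truth for validation; a local check like this only gives earlier feedback.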

## Best Practices

- **Use idempotency keys** for operations that shouldn't be duplicated (payments, emails)
- **Set appropriate TTLs** for time-sensitive messages
- **Use partition keys** when message ordering matters within a group
- **Monitor DLQ** regularly to catch and fix processing failures
- **Configure webhook retry policies** to handle transient failures gracefully

## Next Steps

- [Durable Streams](/services/storage/durable-streams): Streaming storage for large exports
- [Key-Value Storage](/services/storage/key-value): Fast caching and configuration
- [Background Tasks](/cookbook/patterns/background-tasks): Patterns for async processing
- [Webhook Handler](/cookbook/patterns/webhook-handler): Receiving external webhooks