Queues
Publish messages for async processing, webhooks, and event-driven workflows
Queues enable asynchronous message processing for background tasks, webhooks, and event-driven workflows. Publish messages from agents and consume them with workers or webhook destinations.
When to Use Queues
| Pattern | Best For |
|---|---|
| Queues | Background jobs, webhooks, event-driven processing, decoupled services |
| Durable Streams | Large exports, audit logs, streaming data |
| Key-Value | Fast lookups, caching, configuration |
Use queues when you need to:
- Process tasks asynchronously (email sending, report generation)
- Decouple services with reliable message delivery
- Deliver webhooks to external endpoints
- Handle bursty workloads with rate limiting
- Retry failed operations with exponential backoff
Access Patterns
| Context | Access | Details |
|---|---|---|
| Agents | ctx.queue | See examples below |
| Routes | c.var.queue | See Using in Routes |
| Standalone | createAgentContext() | See Standalone Usage |
Same API Everywhere
The Queue API is identical in all contexts. ctx.queue.publish() and c.var.queue.publish() work the same way.
Queue Types
| Type | Behavior |
|---|---|
| worker | Each message is processed by exactly one consumer. Use for background jobs. |
| pubsub | Messages are broadcast to all subscribers. Use for event notifications. |
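The difference between the two types can be modeled in a few lines. This is only an illustrative sketch of the delivery semantics described in the table, not the service's implementation: the `recipients` function is hypothetical, and round-robin selection for `worker` queues is an assumption (the table only says that exactly one consumer receives each message).

```typescript
type QueueType = 'worker' | 'pubsub';

// Returns the consumers that would receive message number `messageIndex`.
// Assumption: worker queues pick one consumer round-robin.
function recipients(type: QueueType, consumers: string[], messageIndex: number): string[] {
  if (consumers.length === 0) return [];
  if (type === 'pubsub') return consumers.slice(); // broadcast to every subscriber
  const chosen = messageIndex % consumers.length;
  return consumers.slice(chosen, chosen + 1); // exactly one consumer
}

console.log(recipients('worker', ['a', 'b', 'c'], 4)); // [ 'b' ]
console.log(recipients('pubsub', ['a', 'b', 'c'], 4)); // [ 'a', 'b', 'c' ]
```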
Publishing Messages
Publish messages from agents using ctx.queue.publish():
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('OrderProcessor', {
  handler: async (ctx, input) => {
    // Queue an email to be sent asynchronously
    const result = await ctx.queue.publish('email-queue', {
      to: input.customerEmail,
      subject: 'Order Confirmed',
      orderId: input.orderId,
    });
    ctx.logger.info('Email queued', { messageId: result.id });
    return { success: true, messageId: result.id };
  },
});

Publish Options
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('TaskScheduler', {
  handler: async (ctx, input) => {
    await ctx.queue.publish('task-queue', input.task, {
      // Attach metadata for filtering or routing
      metadata: { priority: 'high', region: 'us-west' },
      // Guarantee ordering for messages with the same key
      partitionKey: input.customerId,
      // Prevent duplicate messages
      idempotencyKey: `task-${input.taskId}-v1`,
      // Auto-expire after 1 hour (ttl is in seconds)
      ttl: 3600,
    });
    return { queued: true };
  },
});

| Option | Description |
|---|---|
| metadata | Key-value pairs for routing or filtering |
| partitionKey | Messages with the same key are processed in order |
| idempotencyKey | Prevents duplicate messages if the same key is published again |
| ttl | Time-to-live in seconds before the message expires |
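To make the idempotencyKey and ttl semantics concrete, here is a small in-memory model. It is a sketch under stated assumptions, not the queue service itself: the `InMemoryQueueModel` class, the `msg_1`-style IDs, and the choice to return the original message ID on a duplicate publish are all hypothetical. The source only states that a repeated key prevents a duplicate message and that ttl is expressed in seconds.

```typescript
interface ModelPublishOptions {
  idempotencyKey?: string;
  ttl?: number; // seconds, as in the options table
}

interface ModelMessage {
  id: string;
  payload: unknown;
  expiresAtMs: number | null; // null = never expires
}

class InMemoryQueueModel {
  private messages: ModelMessage[] = [];
  private idsByKey = new Map<string, string>(); // idempotencyKey -> message id
  private nextOffset = 1;

  // `nowMs` is passed in explicitly so the behavior is easy to test.
  publish(payload: unknown, nowMs: number, options: ModelPublishOptions = {}): string {
    const key = options.idempotencyKey;
    if (key !== undefined) {
      const existingId = this.idsByKey.get(key);
      // Assumption: a duplicate publish is a no-op that returns the first ID.
      if (existingId !== undefined) return existingId;
    }
    const id = `msg_${this.nextOffset++}`;
    const expiresAtMs = options.ttl === undefined ? null : nowMs + options.ttl * 1000;
    this.messages.push({ id, payload, expiresAtMs });
    if (key !== undefined) this.idsByKey.set(key, id);
    return id;
  }

  // Messages that have not yet expired at `nowMs`.
  pending(nowMs: number): ModelMessage[] {
    return this.messages.filter((m) => m.expiresAtMs === null || m.expiresAtMs > nowMs);
  }
}
```

In this model, publishing twice with the key `task-42-v1` stores one message, and a message published at time 0 with `ttl: 3600` stops being pending once 3,600,000 ms have elapsed.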
Publish Result
interface QueuePublishResult {
  id: string; // Unique message ID (msg_...)
  offset: number; // Sequential position in the queue
  publishedAt: string; // ISO 8601 timestamp
}

Synchronous Publishing
Use sync: true when you need to wait for the message to be persisted before returning:
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('CriticalProcessor', {
  handler: async (ctx, input) => {
    // Wait for message to be persisted
    const result = await ctx.queue.publish('critical-tasks', {
      taskId: input.taskId,
      payload: input.data,
    }, {
      sync: true,
    });
    ctx.logger.info('Task queued synchronously', { taskId: input.taskId });
    return { status: 'queued', messageId: result.id };
  },
});

Sync Mode Performance
Synchronous publishing blocks until the message is persisted. Use it only when you need confirmation that the message was accepted. For most use cases, async publishing is faster and more resilient.
Using in Routes
Routes have the same queue access via c.var.queue:
import { createRouter } from '@agentuity/runtime';

const router = createRouter();

router.post('/webhook/stripe', async (c) => {
  const event = await c.req.json();
  // Queue webhook for async processing
  await c.var.queue.publish('stripe-webhooks', {
    type: event.type,
    data: event.data,
  });
  // Return 200 immediately (Stripe expects fast responses)
  return c.json({ received: true });
});

export default router;

Webhook Pattern
Use queues for webhooks that need quick acknowledgment. Return 200 immediately, then process the payload asynchronously.
Standalone Usage
Use queues from background jobs with createAgentContext():
import { createApp, createAgentContext } from '@agentuity/runtime';

await createApp();

// Scheduled job to send daily reports
async function sendDailyReports() {
  const ctx = createAgentContext({ trigger: 'cron' });
  await ctx.invoke(async () => {
    // getActiveUsers() stands in for your own data-access helper (not shown)
    const users = await getActiveUsers();
    for (const user of users) {
      await ctx.queue.publish('email-reports', {
        userId: user.id,
        reportType: 'daily',
      });
    }
    ctx.logger.info('Queued daily reports', { count: users.length });
  });
}

See Running Agents Without HTTP for more patterns.
Error Handling
import { createAgent } from '@agentuity/runtime';
import { QueueNotFoundError, QueueValidationError } from '@agentuity/core';

const agent = createAgent('SafePublisher', {
  handler: async (ctx, input) => {
    try {
      await ctx.queue.publish('notifications', input.notification);
      return { success: true };
    } catch (error) {
      // The target queue has not been created
      if (error instanceof QueueNotFoundError) {
        ctx.logger.error('Queue does not exist', { queue: 'notifications' });
        return { success: false, error: 'Queue not found' };
      }
      // The message failed validation (see Validation Limits)
      if (error instanceof QueueValidationError) {
        ctx.logger.error('Invalid message', { field: error.field });
        return { success: false, error: 'Validation failed' };
      }
      // Anything else is unexpected: rethrow
      throw error;
    }
  },
});

Queue Management
Create and manage queues using the CLI or @agentuity/server package.
CLI Commands
# Create a worker queue
agentuity cloud queue create --name order-processing --type worker
# Create a pubsub queue for broadcasting
agentuity cloud queue create --name events --type pubsub
# List all queues
agentuity cloud queue list
# Get queue details and stats
agentuity cloud queue get order-processing
# Pause/resume processing
agentuity cloud queue pause order-processing
agentuity cloud queue resume order-processing
# Delete a queue
agentuity cloud queue delete order-processing

For programmatic queue management, see SDK Utilities for External Apps.
Consuming Messages
Webhook Destinations
Configure webhook destinations to automatically deliver messages to HTTP endpoints. Set these up in the App or programmatically.
Webhook destinations support:
- Custom headers and authentication
- Configurable timeouts (up to 30 seconds)
- Retry policies with exponential backoff
Pull-Based Consumption
For workers that pull and process messages, see Pull-Based Consumption. This pattern is useful for long-running workers that need fine-grained control over message processing.
Dead Letter Queue
Messages that exceed the retry limit are moved to the dead letter queue (DLQ). Inspect and replay failed messages:
# List failed messages
agentuity cloud queue dlq order-processing
# Replay a message back to the queue
agentuity cloud queue dlq replay order-processing --message-id msg_abc123
# Purge all DLQ messages
agentuity cloud queue dlq purge order-processing

For programmatic DLQ access, see Dead Letter Queue Operations.
HTTP Ingestion Sources
Create public HTTP endpoints to ingest data into queues from external services. Configure these in the App or programmatically.
| Auth Type | Description |
|---|---|
| none | No authentication |
| basic | HTTP Basic Auth (username:password) |
| header | Custom header value (Bearer token) |
Queue Settings
Configure queue behavior when creating or updating:
| Setting | Default | Description |
|---|---|---|
| default_ttl_seconds | null | Message expiration (null = never) |
| default_visibility_timeout_seconds | 30 | Processing timeout before message returns to queue |
| default_max_retries | 5 | Attempts before moving to DLQ |
| default_retry_backoff_ms | 1000 | Initial retry delay |
| default_retry_max_backoff_ms | 60000 | Maximum retry delay |
| default_retry_multiplier | 2.0 | Exponential backoff multiplier |
| max_in_flight_per_client | 10 | Concurrent messages per consumer |
| retention_seconds | 2592000 | How long to keep acknowledged messages (30 days) |
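The retry settings combine into a delay schedule. The sketch below computes one under the common exponential backoff formula, delay = initial × multiplier^(n − 1) capped at the maximum. That exact formula, the absence of jitter, and the helper names `retryDelayMs` and `retrySchedule` are assumptions for illustration; only the setting names and defaults come from the table above.

```typescript
interface RetrySettings {
  default_max_retries: number;
  default_retry_backoff_ms: number;
  default_retry_max_backoff_ms: number;
  default_retry_multiplier: number;
}

// Defaults copied from the settings table.
const DEFAULT_RETRY_SETTINGS: RetrySettings = {
  default_max_retries: 5,
  default_retry_backoff_ms: 1000,
  default_retry_max_backoff_ms: 60000,
  default_retry_multiplier: 2.0,
};

// Delay before retry number `retryNumber` (1-based), assuming plain
// exponential growth with a cap and no jitter.
function retryDelayMs(retryNumber: number, s: RetrySettings = DEFAULT_RETRY_SETTINGS): number {
  const uncapped = s.default_retry_backoff_ms * Math.pow(s.default_retry_multiplier, retryNumber - 1);
  return Math.min(uncapped, s.default_retry_max_backoff_ms);
}

// Full list of delays before a message would move to the DLQ.
function retrySchedule(s: RetrySettings = DEFAULT_RETRY_SETTINGS): number[] {
  const delays: number[] = [];
  for (let n = 1; n <= s.default_max_retries; n++) {
    delays.push(retryDelayMs(n, s));
  }
  return delays;
}

console.log(retrySchedule()); // [ 1000, 2000, 4000, 8000, 16000 ]
```

Under these assumptions the default settings never reach the 60-second cap; it would first apply on the seventh retry (64,000 ms capped to 60,000 ms) if default_max_retries were raised.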
Validation Limits
| Limit | Value |
|---|---|
| Queue name length | 1-256 characters |
| Queue name format | Lowercase letters, digits, underscores, hyphens. Must start with letter or underscore. |
| Payload size | 1 MB max |
| Partition key length | 256 characters max |
| Idempotency key length | 256 characters max |
| Batch size | 1000 messages max |
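Checking these limits before publishing can save a round trip. The following client-side validator is a minimal sketch: `validatePublish`, `chunkForBatch`, and `utf8ByteLength` are hypothetical helpers, not part of the SDK. It assumes that 1 MB means 1,048,576 bytes of the JSON-serialized payload and that key lengths are counted in UTF-16 code units; the service may measure either differently.

```typescript
const LIMITS = {
  queueNameMaxLength: 256,
  payloadMaxBytes: 1024 * 1024, // assumption: 1 MB = 1,048,576 bytes
  keyMaxLength: 256,
  batchMaxMessages: 1000,
};

// Lowercase letters, digits, underscores, hyphens; starts with a letter or underscore.
const QUEUE_NAME_PATTERN = /^[a-z_][a-z0-9_-]*$/;

// Byte length of a string when encoded as UTF-8.
function utf8ByteLength(text: string): number {
  let bytes = 0;
  for (let i = 0; i < text.length; i++) {
    const unit = text.charCodeAt(i);
    const next = text.charCodeAt(i + 1); // NaN past the end of the string
    if (unit < 0x80) bytes += 1;
    else if (unit < 0x800) bytes += 2;
    else if (unit >= 0xd800 && unit <= 0xdbff && next >= 0xdc00 && next <= 0xdfff) {
      bytes += 4; // surrogate pair encodes one 4-byte code point
      i++;
    } else bytes += 3;
  }
  return bytes;
}

interface PublishRequest {
  queueName: string;
  payload: Record<string, unknown>;
  partitionKey?: string;
  idempotencyKey?: string;
}

// Returns a list of human-readable problems; an empty list means the request looks valid.
function validatePublish(req: PublishRequest): string[] {
  const problems: string[] = [];
  if (req.queueName.length < 1 || req.queueName.length > LIMITS.queueNameMaxLength) {
    problems.push('queue name must be 1-256 characters');
  } else if (!QUEUE_NAME_PATTERN.test(req.queueName)) {
    problems.push('queue name has invalid characters or an invalid first character');
  }
  if (utf8ByteLength(JSON.stringify(req.payload)) > LIMITS.payloadMaxBytes) {
    problems.push('payload exceeds 1 MB');
  }
  if (req.partitionKey !== undefined && req.partitionKey.length > LIMITS.keyMaxLength) {
    problems.push('partition key exceeds 256 characters');
  }
  if (req.idempotencyKey !== undefined && req.idempotencyKey.length > LIMITS.keyMaxLength) {
    problems.push('idempotency key exceeds 256 characters');
  }
  return problems;
}

// Splits a large list of messages into batches that respect the batch size limit.
function chunkForBatch<T>(messages: T[], size: number = LIMITS.batchMaxMessages): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < messages.length; i += size) {
    batches.push(messages.slice(i, i + size));
  }
  return batches;
}
```

For example, `validatePublish({ queueName: 'email-queue', payload: { to: 'a@example.com' } })` returns an empty list, while a name such as `9lives` is rejected because it starts with a digit.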
Best Practices
- Use idempotency keys for operations that shouldn't be duplicated (payments, emails)
- Set appropriate TTLs for time-sensitive messages
- Use partition keys when message ordering matters within a group
- Monitor DLQ regularly to catch and fix processing failures
- Configure webhook retry policies to handle transient failures gracefully
Next Steps
- Durable Streams: Streaming storage for large exports
- Key-Value Storage: Fast caching and configuration
- Background Tasks: Patterns for async processing
- Webhook Handler: Receiving external webhooks