Publish messages for async processing, webhooks, and event-driven workflows. The queue service is available via ctx.queue in agents and c.var.queue in routes.
Use the @agentuity/queue standalone package to access this service from any Node.js or Bun app without the runtime.
For concepts, patterns, and consumer configuration, see Queues.
publish(queueName, payload, params?)
Publish a message to a queue. Objects are automatically JSON-stringified before publishing.
| Param | Type | Required | Description |
|---|---|---|---|
queueName | string | yes | Queue name (lowercase letters, digits, underscores, hyphens) |
payload | string | object | yes | Message payload (max 1 MB) |
params | QueuePublishParams | no | Optional publish configuration (see below) |
QueuePublishParams fields (all optional):
| Field | Type | Description |
|---|---|---|
metadata | Record<string, unknown> | Key-value metadata for message routing or filtering |
partitionKey | string | Partition key for ordered processing (max 256 chars) |
idempotencyKey | string | Deduplication key (max 256 chars) |
ttl | number | Time-to-live in seconds |
projectId | string | Cross-project publishing target |
agentId | string | Agent ID for attribution |
sync | boolean | Wait for full persistence before returning (default false) |
Returns: Promise<QueuePublishResult>
interface QueuePublishResult {
id: string; // Message ID (prefixed with msg_)
offset: number; // Sequential offset in the queue
publishedAt: string; // ISO 8601 timestamp
}Example
import { createAgent } from '@agentuity/runtime';
import { z } from 'zod';
const agent = createAgent('OrderProcessor', {
schema: {
input: z.object({ orderId: z.string(), email: z.string() }),
output: z.object({ messageId: z.string() }),
},
handler: async (ctx, input) => {
const result = await ctx.queue.publish('order-notifications', {
orderId: input.orderId,
action: 'send-confirmation',
email: input.email,
});
ctx.logger.info('Published message %s at offset %d', result.id, result.offset);
return { messageId: result.id };
},
});Example with publish options
// Ordered delivery for the same customer
const result = await ctx.queue.publish(
'invoices',
{ invoiceId: 'inv-789', amount: 99.99 },
{
partitionKey: 'customer-123',
idempotencyKey: 'inv-789-v1',
ttl: 86400,
metadata: { priority: 'high' },
}
);createQueue(queueName, params?)
Create a queue with idempotent semantics. If the queue already exists, this returns successfully without error. Safe to call multiple times.
| Param | Type | Required | Description |
|---|---|---|---|
queueName | string | yes | Queue name (lowercase, starts with letter or underscore) |
params | QueueCreateParams | no | Optional creation parameters (see below) |
QueueCreateParams fields (all optional):
| Field | Type | Description |
|---|---|---|
queueType | 'worker' | 'pubsub' | Queue type (default 'worker') |
description | string | Description of the queue's purpose |
settings | object | Queue behavior configuration (see below) |
settings fields (all optional):
| Field | Type | Description |
|---|---|---|
defaultTtlSeconds | number | null | Default TTL for messages (null = no expiration) |
defaultVisibilityTimeoutSeconds | number | How long a message is invisible after being received |
defaultMaxRetries | number | Max delivery attempts before moving to DLQ |
maxInFlightPerClient | number | Max concurrent messages per consumer |
retentionSeconds | number | How long to retain acknowledged messages |
Returns: Promise<QueueCreateResult>
interface QueueCreateResult {
name: string; // The queue name
queueType: string; // 'worker' or 'pubsub'
}Example
// Create a worker queue (default)
await ctx.queue.createQueue('task-queue');
// Create a pubsub queue with custom settings
await ctx.queue.createQueue('events', {
queueType: 'pubsub',
description: 'Broadcast events to all subscribers',
settings: {
defaultTtlSeconds: 86400,
defaultMaxRetries: 5,
},
});deleteQueue(queueName)
Permanently delete a queue and all its messages. This operation cannot be undone.
| Param | Type | Required | Description |
|---|---|---|---|
queueName | string | yes | The queue name to delete |
Returns: Promise<void>
Throws: QueueNotFoundError if the queue does not exist.
Example
await ctx.queue.deleteQueue('old-notifications');
ctx.logger.info('Queue deleted');Error Handling
The queue service throws typed errors for common failure cases:
import { QueueNotFoundError, QueueValidationError, QueuePublishError } from '@agentuity/core';
try {
await ctx.queue.publish('my-queue', payload);
} catch (error) {
if (error instanceof QueueNotFoundError) {
ctx.logger.error('Queue does not exist');
} else if (error instanceof QueueValidationError) {
ctx.logger.error('Validation failed: %s', error.message);
} else if (error instanceof QueuePublishError) {
ctx.logger.error('Publish failed: %s', error.message);
}
}Validation rules:
- Queue names must start with a letter or underscore, contain only lowercase letters, digits, underscores, and hyphens, and be at most 256 characters
- Payloads cannot be empty and must not exceed 1 MB
- Partition keys and idempotency keys must not exceed 256 characters
- TTL cannot be negative