Publish messages for async processing, webhooks, and event-driven workflows. The queue service is available via ctx.queue in agents and c.var.queue in routes.
Use the @agentuity/queue standalone package to access this service from any Node.js or Bun app without the runtime.
For concepts, patterns, and consumer configuration, see Queues.
In local development, publish() stores messages in SQLite. createQueue() returns a queue descriptor and deleteQueue() is a no-op, because local queues are created implicitly as messages are written.
publish(queueName, payload, params?)
Publish a message to a queue. Objects are automatically JSON-stringified before publishing.

| Param | Type | Required | Description |
|---|---|---|---|
| queueName | string | yes | Queue name (lowercase letters, digits, underscores, hyphens) |
| payload | string \| object | yes | Message payload (max 1 MB) |
| params | QueuePublishParams | no | Optional publish configuration (see below) |

QueuePublishParams fields (all optional):

| Field | Type | Description |
|---|---|---|
| metadata | Record<string, unknown> | Key-value metadata for message routing or filtering |
| partitionKey | string | Partition key for ordered processing (max 256 chars) |
| idempotencyKey | string | Deduplication key (max 256 chars) |
| ttl | number | Time-to-live in seconds |
| projectId | string | Cross-project publishing target |
| agentId | string | Agent ID for attribution |
| sync | boolean | Wait for full persistence before returning (default false) |

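The partitionKey and idempotencyKey fields are easiest to reason about when they are derived deterministically from business identifiers, so that publishing the same logical event twice produces the same deduplication key. The following is a minimal sketch under that assumption; buildPublishParams and PublishOptionsSketch are hypothetical names for illustration and are not part of the SDK. The 256-character limit is taken from the table above.

```typescript
// Hypothetical helper (not part of the Agentuity SDK): derives publish
// options from business identifiers so that repeated publishes of the same
// logical event reuse the same idempotency key.
interface PublishOptionsSketch {
  partitionKey: string;
  idempotencyKey: string;
  ttl: number;
}

const MAX_KEY_LENGTH = 256; // limit stated in the QueuePublishParams table

function buildPublishParams(
  customerId: string,
  invoiceId: string,
  version: number,
  ttlSeconds = 86400, // one day
): PublishOptionsSketch {
  const partitionKey = `customer-${customerId}`;
  const idempotencyKey = `${invoiceId}-v${version}`;

  // Reject keys that would exceed the documented length limit.
  for (const key of [partitionKey, idempotencyKey]) {
    if (key.length > MAX_KEY_LENGTH) {
      throw new RangeError(`key exceeds ${MAX_KEY_LENGTH} characters`);
    }
  }
  if (ttlSeconds < 0) {
    throw new RangeError('ttl cannot be negative');
  }
  return { partitionKey, idempotencyKey, ttl: ttlSeconds };
}
```

The returned object has the shape of the optional third argument to publish(), so it could be passed as `ctx.queue.publish('invoices', payload, buildPublishParams('123', 'inv-789', 1))`.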
Returns: Promise<QueuePublishResult>

    interface QueuePublishResult {
      id: string; // Message ID (prefixed with qmsg_)
      offset: number; // Sequential offset in the queue
      publishedAt: string; // ISO 8601 timestamp
    }

Example

    import { createAgent } from '@agentuity/runtime';
    import { z } from 'zod';

    const agent = createAgent('OrderProcessor', {
      schema: {
        input: z.object({ orderId: z.string(), email: z.string() }),
        output: z.object({ messageId: z.string() }),
      },
      handler: async (ctx, input) => {
        const result = await ctx.queue.publish('order-notifications', {
          orderId: input.orderId,
          action: 'send-confirmation',
          email: input.email,
        });
        ctx.logger.info('Published message %s at offset %d', result.id, result.offset);
        return { messageId: result.id };
      },
    });

Example with publish options

    // Ordered delivery for the same customer
    const result = await ctx.queue.publish(
      'invoices',
      { invoiceId: 'inv-789', amount: 99.99 },
      {
        partitionKey: 'customer-123',
        idempotencyKey: 'inv-789-v1',
        ttl: 86400,
        metadata: { priority: 'high' },
      }
    );

createQueue(queueName, params?)
Create a queue with idempotent semantics. If the queue already exists, this returns successfully without error. Safe to call multiple times.

| Param | Type | Required | Description |
|---|---|---|---|
| queueName | string | yes | Queue name (lowercase, starts with letter or underscore) |
| params | QueueCreateParams | no | Optional creation parameters (see below) |

QueueCreateParams fields (all optional):

| Field | Type | Description |
|---|---|---|
| queueType | 'worker' \| 'pubsub' | Queue type (default 'worker') |
| description | string | Description of the queue's purpose |
| settings | object | Queue behavior configuration (see below) |

settings fields (all optional):

| Field | Type | Description |
|---|---|---|
| defaultTtlSeconds | number \| null | Default TTL for messages, in seconds (null = no expiration) |
| defaultVisibilityTimeoutSeconds | number | How long, in seconds, a message stays invisible after being received |
| defaultMaxRetries | number | Max delivery attempts before a message is moved to the dead-letter queue (DLQ) |
| maxInFlightPerClient | number | Max concurrent messages per consumer |
| retentionSeconds | number | How long, in seconds, to retain acknowledged messages |

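Because createQueue() returns successfully when the queue already exists, queue setup can run on every startup rather than in a one-off migration. The sketch below illustrates that pattern under a few assumptions: ensureQueues, QueueCreator, and the QUEUES list are hypothetical names for illustration, and QueueCreator is a structural stand-in that covers only the one call used here, which ctx.queue is assumed to satisfy.

```typescript
// Structural type covering only the call used in this sketch; the real
// service object (ctx.queue) is assumed to be compatible with it.
interface QueueCreator {
  createQueue(
    name: string,
    params?: {
      queueType?: 'worker' | 'pubsub';
      description?: string;
      settings?: Record<string, number | null>;
    },
  ): Promise<{ name: string; queueType: string }>;
}

// Hypothetical queue definitions for an application.
const QUEUES = [
  { name: 'task-queue' },
  {
    name: 'events',
    params: {
      queueType: 'pubsub' as const,
      settings: { defaultTtlSeconds: 24 * 60 * 60, defaultMaxRetries: 5 },
    },
  },
];

// Creates every queue in QUEUES. Since createQueue is documented as
// idempotent, calling this repeatedly should be safe.
async function ensureQueues(queue: QueueCreator): Promise<string[]> {
  const ensured: string[] = [];
  for (const def of QUEUES) {
    const result = await queue.createQueue(def.name, def.params);
    ensured.push(`${result.name}:${result.queueType}`);
  }
  return ensured;
}
```

Keeping the definitions in one list makes the expected queue topology visible in code review; in an agent this might be called once as `await ensureQueues(ctx.queue)` before the first publish.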
Returns: Promise<QueueCreateResult>

    interface QueueCreateResult {
      name: string; // The queue name
      queueType: string; // 'worker' or 'pubsub'
    }

Example

    // Create a worker queue (default)
    await ctx.queue.createQueue('task-queue');

    // Create a pubsub queue with custom settings
    await ctx.queue.createQueue('events', {
      queueType: 'pubsub',
      description: 'Broadcast events to all subscribers',
      settings: {
        defaultTtlSeconds: 86400,
        defaultMaxRetries: 5,
      },
    });

deleteQueue(queueName)
Permanently delete a queue and all its messages. This operation cannot be undone.

| Param | Type | Required | Description |
|---|---|---|---|
| queueName | string | yes | The queue name to delete |

Returns: Promise<void>
Throws: QueueNotFoundError if the queue does not exist.
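Since deleteQueue() throws when the queue is missing, cleanup code that may run more than once often wants a "delete if it exists" wrapper. The following is a minimal sketch of that idea, not an SDK feature: deleteQueueIfExists and QueueDeleter are hypothetical names, and the QueueNotFoundError class is declared locally only to keep the example self-contained (in an application it would be imported from '@agentuity/core').

```typescript
// Local stand-in so the sketch runs on its own; in an application this
// class would come from '@agentuity/core'.
class QueueNotFoundError extends Error {}

// Structural type covering only the call used here; ctx.queue is assumed
// to be compatible with it.
interface QueueDeleter {
  deleteQueue(name: string): Promise<void>;
}

// Resolves to true if the queue was deleted and false if it did not exist.
// Any other error is rethrown to the caller.
async function deleteQueueIfExists(queue: QueueDeleter, name: string): Promise<boolean> {
  try {
    await queue.deleteQueue(name);
    return true;
  } catch (error) {
    if (error instanceof QueueNotFoundError) {
      return false;
    }
    throw error;
  }
}
```

Returning a boolean instead of swallowing the error lets the caller log whether anything was actually removed.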
Example

    await ctx.queue.deleteQueue('old-notifications');
    ctx.logger.info('Queue deleted');

Error Handling
The queue service throws typed errors for common failure cases:

    import { QueueNotFoundError, QueueValidationError, QueuePublishError } from '@agentuity/core';

    try {
      await ctx.queue.publish('my-queue', payload);
    } catch (error) {
      if (error instanceof QueueNotFoundError) {
        ctx.logger.error('Queue does not exist');
      } else if (error instanceof QueueValidationError) {
        ctx.logger.error('Validation failed: %s', error.message);
      } else if (error instanceof QueuePublishError) {
        ctx.logger.error('Publish failed: %s', error.message);
      } else {
        // Not a queue error: rethrow so it is not silently swallowed
        throw error;
      }
    }

Validation rules:
- Queue names must start with a letter or underscore, contain only lowercase letters, digits, underscores, and hyphens, and be at most 256 characters
- Payloads cannot be empty and must not exceed 1 MB
- Partition keys and idempotency keys must not exceed 256 characters
- TTL cannot be negative
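
The rules above can also be checked locally before calling publish(), which gives earlier and more specific feedback than waiting for a QueueValidationError. This is a rough sketch only: checkPublishInput and PublishCheck are hypothetical names, the regular expression is one reading of the naming rule, and treating "1 MB" as 1,048,576 UTF-8 bytes is an assumption that the service may not share.

```typescript
const MAX_NAME_LENGTH = 256;
const MAX_KEY_LENGTH = 256;
// Assumption: "1 MB" is treated as 1,048,576 bytes of UTF-8 text.
const MAX_PAYLOAD_BYTES = 1024 * 1024;
// Starts with a lowercase letter or underscore, then lowercase letters,
// digits, underscores, or hyphens.
const QUEUE_NAME_PATTERN = /^[a-z_][a-z0-9_-]*$/;

interface PublishCheck {
  queueName: string;
  payload: string | object;
  partitionKey?: string;
  idempotencyKey?: string;
  ttl?: number;
}

// Returns a list of human-readable problems; an empty list means the input
// passes these local checks (the service remains the final authority).
function checkPublishInput(input: PublishCheck): string[] {
  const problems: string[] = [];
  if (input.queueName.length > MAX_NAME_LENGTH || !QUEUE_NAME_PATTERN.test(input.queueName)) {
    problems.push('invalid queue name');
  }
  // Objects are measured after JSON serialization, mirroring publish().
  const body = typeof input.payload === 'string' ? input.payload : JSON.stringify(input.payload);
  const bytes = new TextEncoder().encode(body).length;
  if (bytes === 0) problems.push('payload is empty');
  if (bytes > MAX_PAYLOAD_BYTES) problems.push('payload exceeds 1 MB');
  if ((input.partitionKey?.length ?? 0) > MAX_KEY_LENGTH) problems.push('partition key too long');
  if ((input.idempotencyKey?.length ?? 0) > MAX_KEY_LENGTH) problems.push('idempotency key too long');
  if (input.ttl !== undefined && input.ttl < 0) problems.push('ttl is negative');
  return problems;
}
```

Collecting all problems in one pass, rather than throwing on the first, tends to be more useful when the input comes from a webhook or form where several fields can be wrong at once.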