Using Message Queues — Agentuity Documentation

Using Message Queues

Publish messages and manage queues with ctx.queue

Publish messages for async processing, webhooks, and event-driven workflows. The queue service is available via ctx.queue in agents and c.var.queue in routes.

For concepts, patterns, and consumer configuration, see Queues.

publish(queueName, payload, params?)

Publish a message to a queue. Objects are automatically JSON-stringified before publishing.

ParamTypeRequiredDescription
queueNamestringyesQueue name (lowercase letters, digits, underscores, hyphens)
payloadstring | objectyesMessage payload (max 1 MB)
paramsQueuePublishParamsnoOptional publish configuration (see below)

QueuePublishParams fields (all optional):

FieldTypeDescription
metadataRecord<string, unknown>Key-value metadata for message routing or filtering
partitionKeystringPartition key for ordered processing (max 256 chars)
idempotencyKeystringDeduplication key (max 256 chars)
ttlnumberTime-to-live in seconds
projectIdstringCross-project publishing target
agentIdstringAgent ID for attribution
syncbooleanWait for full persistence before returning (default false)

Returns: Promise<QueuePublishResult>

interface QueuePublishResult {
  id: string;          // Message ID (prefixed with msg_)
  offset: number;      // Sequential offset in the queue
  publishedAt: string; // ISO 8601 timestamp
}

Example

import { createAgent } from '@agentuity/runtime';
import { z } from 'zod';
 
const agent = createAgent('OrderProcessor', {
  schema: {
    input: z.object({ orderId: z.string(), email: z.string() }),
    output: z.object({ messageId: z.string() }),
  },
  handler: async (ctx, input) => {
    const result = await ctx.queue.publish('order-notifications', {
      orderId: input.orderId,
      action: 'send-confirmation',
      email: input.email,
    });
 
    ctx.logger.info('Published message %s at offset %d', result.id, result.offset);
    return { messageId: result.id };
  },
});

Example with publish options

// Ordered delivery for the same customer
const result = await ctx.queue.publish(
  'invoices',
  { invoiceId: 'inv-789', amount: 99.99 },
  {
    partitionKey: 'customer-123',
    idempotencyKey: 'inv-789-v1',
    ttl: 86400,
    metadata: { priority: 'high' },
  }
);

createQueue(queueName, params?)

Create a queue with idempotent semantics. If the queue already exists, this returns successfully without error. Safe to call multiple times.

ParamTypeRequiredDescription
queueNamestringyesQueue name (lowercase, starts with letter or underscore)
paramsQueueCreateParamsnoOptional creation parameters (see below)

QueueCreateParams fields (all optional):

FieldTypeDescription
queueType'worker' | 'pubsub'Queue type (default 'worker')
descriptionstringDescription of the queue's purpose
settingsobjectQueue behavior configuration (see below)

settings fields (all optional):

FieldTypeDescription
defaultTtlSecondsnumber | nullDefault TTL for messages (null = no expiration)
defaultVisibilityTimeoutSecondsnumberHow long a message is invisible after being received
defaultMaxRetriesnumberMax delivery attempts before moving to DLQ
maxInFlightPerClientnumberMax concurrent messages per consumer
retentionSecondsnumberHow long to retain acknowledged messages

Returns: Promise<QueueCreateResult>

interface QueueCreateResult {
  name: string;      // The queue name
  queueType: string; // 'worker' or 'pubsub'
}

Example

// Create a worker queue (default)
await ctx.queue.createQueue('task-queue');
 
// Create a pubsub queue with custom settings
await ctx.queue.createQueue('events', {
  queueType: 'pubsub',
  description: 'Broadcast events to all subscribers',
  settings: {
    defaultTtlSeconds: 86400,
    defaultMaxRetries: 5,
  },
});

deleteQueue(queueName)

Permanently delete a queue and all its messages. This operation cannot be undone.

ParamTypeRequiredDescription
queueNamestringyesThe queue name to delete

Returns: Promise<void>

Throws: QueueNotFoundError if the queue does not exist.

Example

await ctx.queue.deleteQueue('old-notifications');
ctx.logger.info('Queue deleted');

Error Handling

The queue service throws typed errors for common failure cases:

import { QueueNotFoundError, QueueValidationError, QueuePublishError } from '@agentuity/core';
 
try {
  await ctx.queue.publish('my-queue', payload);
} catch (error) {
  if (error instanceof QueueNotFoundError) {
    ctx.logger.error('Queue does not exist');
  } else if (error instanceof QueueValidationError) {
    ctx.logger.error('Validation failed: %s', error.message);
  } else if (error instanceof QueuePublishError) {
    ctx.logger.error('Publish failed: %s', error.message);
  }
}

Validation rules:

  • Queue names must start with a letter or underscore, contain only lowercase letters, digits, underscores, and hyphens, and be at most 256 characters
  • Payloads cannot be empty and must not exceed 1 MB
  • Partition keys and idempotency keys must not exceed 256 characters
  • TTL cannot be negative