Background Work

Use queues, status records, and durable streams for work that should outlive a request

Use background work when a request starts something slow: report generation, webhook fan-out, email preparation, model evaluation, or file processing. The pattern is: the route enqueues the job and returns a handle; the worker route does the work and writes output to a durable stream; KV holds a status record the client can poll at any time.

npm install hono @agentuity/queue @agentuity/keyvalue @agentuity/stream arktype

Set Up the Queue

Create the worker queue once, outside the request path. The --visibility-timeout controls how long a message stays invisible after delivery before it is retried if your worker never responds.

agentuity cloud queue create worker \
  --name report-jobs \
  --max-retries 3 \
  --visibility-timeout 120

After your app has a public URL, add an HTTP destination so the platform calls your worker route automatically on every new message. Use a tunnel like ngrok or cloudflared to expose your local dev server, or use the deployed app URL.

agentuity cloud queue destinations create report-jobs \
  --type http \
  --name report-worker \
  --url https://<your-app-host>/api/workers/report-jobs

Status Type

Define the discriminated union for the status record first. Both the request route and the worker route will write it; the status route and the client will read it.

typescriptsrc/index.ts
import { Hono } from 'hono';
import { QueueClient } from '@agentuity/queue';
import { KeyValueClient } from '@agentuity/keyvalue';
import { StreamClient } from '@agentuity/stream';
import { type } from 'arktype';
 
type ReportStatus =
  | { readonly kind: 'queued'; readonly messageId: string }
  | { readonly kind: 'processing' }
  | { readonly kind: 'done'; readonly streamUrl: string }
  | { readonly kind: 'failed'; readonly message: string };
 
const requestSchema = type({
  accountId: 'string',
  reportMonth: 'string',
});
 
const jobSchema = type({
  jobId: 'string',
  accountId: 'string',
  reportMonth: 'string',
});
 
type ReportJob = typeof jobSchema.infer;
 
const queue = new QueueClient();
const kv = new KeyValueClient();
const streams = new StreamClient();
const app = new Hono();

Request Route

The request route validates input, publishes the job, writes an initial status record, and returns 202 Accepted with a status URL the client can poll.

typescriptsrc/index.ts
app.post('/api/reports', async (c) => {
  const body: unknown = await c.req.json();
  const input = requestSchema(body);
  if (input instanceof type.errors) {
    return c.json({ error: 'accountId and reportMonth are required' }, 400);
  }
  const jobId = crypto.randomUUID();
 
  const job: ReportJob = {
    jobId,
    accountId: input.accountId,
    reportMonth: input.reportMonth,
  };
 
  // idempotencyKey ties the message to this jobId so retries don't duplicate
  const message = await queue.publish('report-jobs', job, {
    idempotencyKey: jobId, 
  });
 
  await kv.set<ReportStatus>('report-status', jobId, { 
    kind: 'queued',
    messageId: message.id,
  });
 
  return c.json({ jobId, statusUrl: `/api/reports/${jobId}` }, 202);
});

queue.publish() returns { id, offset, publishedAt }. The id is the message ID stored in the status record for tracing.

Status Route

The status route reads the KV record written by the request route and later updated by the worker. Keep it thin.

typescriptsrc/index.ts
app.get('/api/reports/:jobId', async (c) => {
  const result = await kv.get<ReportStatus>('report-status', c.req.param('jobId'));
 
  if (!result.exists) {
    return c.json({ error: 'Report job not found' }, 404);
  }
 
  return c.json(result.data);
});

Worker Route

The worker route receives the queue message payload from the HTTP destination, updates the status record, writes output to a durable stream, and marks the job done.

typescriptsrc/index.ts
async function writeReport(job: ReportJob): Promise<string> {
  const stream = await streams.create('monthly-reports', {
    contentType: 'text/csv',
    metadata: {
      accountId: job.accountId,
      reportMonth: job.reportMonth,
    },
    ttl: 60 * 60 * 24 * 90, // keep for 90 days
  });
 
  try {
    await stream.write('account_id,report_month,total\n');
    await stream.write(`${job.accountId},${job.reportMonth},42817\n`);
  } finally {
    await stream.close(); // must always close to finalize
  }
 
  return stream.url;
}
 
app.post('/api/workers/report-jobs', async (c) => {
  const body: unknown = await c.req.json();
  const job = jobSchema(body);
  if (job instanceof type.errors) {
    return c.json({ ok: false }, 400);
  }
 
  await kv.set<ReportStatus>('report-status', job.jobId, { kind: 'processing' });
 
  try {
    const streamUrl = await writeReport(job);
    await kv.set<ReportStatus>('report-status', job.jobId, {
      kind: 'done',
      streamUrl,
    });
 
    return c.json({ ok: true });
  } catch (error) {
    const message = error instanceof Error ? error.message : 'Report generation failed';
 
    await kv.set<ReportStatus>('report-status', job.jobId, {
      kind: 'failed',
      message,
    });
 
    return c.json({ ok: false }, 500);
  }
});
 
export default app;

Poll from the Client

Poll the status URL until the worker writes done or failed. Validate the response at the boundary so TypeScript knows which fields are present.

import { z } from 'zod';
 
const reportStatusSchema = z.discriminatedUnion('kind', [
  z.object({ kind: z.literal('queued'), messageId: z.string() }),
  z.object({ kind: z.literal('processing') }),
  z.object({ kind: z.literal('done'), streamUrl: z.string() }),
  z.object({ kind: z.literal('failed'), message: z.string() }),
]);
 
type ReportStatus = z.infer<typeof reportStatusSchema>;
 
async function waitForReport(statusUrl: string): Promise<ReportStatus> {
  for (;;) {
    const response = await fetch(statusUrl);
 
    if (!response.ok) {
      throw new Error(`Status check failed: ${response.status}`);
    }
 
    const body: unknown = await response.json();
    const status = reportStatusSchema.parse(body); 
 
    if (status.kind === 'done' || status.kind === 'failed') {
      return status;
    }
 
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
}

Pick the Right Service

NeedService
Async handoff between routes or workersQueues
User-visible job status, fast readsKey-Value Storage
Large generated output, shareable URLDurable Streams
Recurring work on a timerSchedules
External event intake from Stripe, GitHub, etc.Webhooks

Common Gotchas

SymptomCheck
Route returns 202 but the worker never runsThe queue has no enabled HTTP destination; run agentuity cloud queue destinations list report-jobs to verify
Local worker never receives messagesThe destination URL points at a public address, not 127.0.0.1; use a tunnel for local testing
Duplicate button clicks enqueue duplicate jobsPass an idempotencyKey derived from a stable job ID
Status record disappears before the user checks itSet a KV TTL that outlives the queue retry window
Stream URL returns empty contentThe worker threw before calling stream.close(); always close in a finally block

Next Steps

  • Queues: full publish API with partitioning, metadata, DLQ, and WebSocket subscriptions
  • Key-Value Storage: TTL, namespaces, and key search
  • Durable Streams: compression, background writes, and shareable URLs
  • Schedules: start recurring work without an incoming user request