Use background work when a request starts something slow: report generation, webhook fan-out, email preparation, model evaluation, or file processing. The pattern is: the route enqueues the job and returns a handle; the worker route does the work and writes output to a durable stream; KV holds a status record the client can poll at any time.
npm install hono @agentuity/queue @agentuity/keyvalue @agentuity/stream arktypeSet Up the Queue
Create the worker queue once, outside the request path. The --visibility-timeout controls how long a message stays invisible after delivery before it is retried if your worker never responds.
agentuity cloud queue create worker \
--name report-jobs \
--max-retries 3 \
--visibility-timeout 120After your app has a public URL, add an HTTP destination so the platform calls your worker route automatically on every new message. Use a tunnel like ngrok or cloudflared to expose your local dev server, or use the deployed app URL.
agentuity cloud queue destinations create report-jobs \
--type http \
--name report-worker \
--url https://<your-app-host>/api/workers/report-jobsqueue.publish() stores the message. The platform only calls your worker route when a queue destination is configured and points at a reachable public URL. 127.0.0.1 and localhost are not reachable from the queue; use a tunnel for local testing or the deployed URL.
Status Type
Define the discriminated union for the status record first. Both the request route and the worker route will write it; the status route and the client will read it.
import { Hono } from 'hono';
import { QueueClient } from '@agentuity/queue';
import { KeyValueClient } from '@agentuity/keyvalue';
import { StreamClient } from '@agentuity/stream';
import { type } from 'arktype';
type ReportStatus =
| { readonly kind: 'queued'; readonly messageId: string }
| { readonly kind: 'processing' }
| { readonly kind: 'done'; readonly streamUrl: string }
| { readonly kind: 'failed'; readonly message: string };
const requestSchema = type({
accountId: 'string',
reportMonth: 'string',
});
const jobSchema = type({
jobId: 'string',
accountId: 'string',
reportMonth: 'string',
});
type ReportJob = typeof jobSchema.infer;
const queue = new QueueClient();
const kv = new KeyValueClient();
const streams = new StreamClient();
const app = new Hono();Request Route
The request route validates input, publishes the job, writes an initial status record, and returns 202 Accepted with a status URL the client can poll.
app.post('/api/reports', async (c) => {
const body: unknown = await c.req.json();
const input = requestSchema(body);
if (input instanceof type.errors) {
return c.json({ error: 'accountId and reportMonth are required' }, 400);
}
const jobId = crypto.randomUUID();
const job: ReportJob = {
jobId,
accountId: input.accountId,
reportMonth: input.reportMonth,
};
// idempotencyKey ties the message to this jobId so retries don't duplicate
const message = await queue.publish('report-jobs', job, {
idempotencyKey: jobId,
});
await kv.set<ReportStatus>('report-status', jobId, {
kind: 'queued',
messageId: message.id,
});
return c.json({ jobId, statusUrl: `/api/reports/${jobId}` }, 202);
});queue.publish() returns { id, offset, publishedAt }. The id is the message ID stored in the status record for tracing.
Treat the queue message as the retry boundary. Use a stable jobId as the idempotencyKey, make the worker safe to run more than once, and write status only after each durable step completes.
Status Route
The status route reads the KV record written by the request route and later updated by the worker. Keep it thin.
app.get('/api/reports/:jobId', async (c) => {
const result = await kv.get<ReportStatus>('report-status', c.req.param('jobId'));
if (!result.exists) {
return c.json({ error: 'Report job not found' }, 404);
}
return c.json(result.data);
});Worker Route
The worker route receives the queue message payload from the HTTP destination, updates the status record, writes output to a durable stream, and marks the job done.
async function writeReport(job: ReportJob): Promise<string> {
const stream = await streams.create('monthly-reports', {
contentType: 'text/csv',
metadata: {
accountId: job.accountId,
reportMonth: job.reportMonth,
},
ttl: 60 * 60 * 24 * 90, // keep for 90 days
});
try {
await stream.write('account_id,report_month,total\n');
await stream.write(`${job.accountId},${job.reportMonth},42817\n`);
} finally {
await stream.close(); // must always close to finalize
}
return stream.url;
}
app.post('/api/workers/report-jobs', async (c) => {
const body: unknown = await c.req.json();
const job = jobSchema(body);
if (job instanceof type.errors) {
return c.json({ ok: false }, 400);
}
await kv.set<ReportStatus>('report-status', job.jobId, { kind: 'processing' });
try {
const streamUrl = await writeReport(job);
await kv.set<ReportStatus>('report-status', job.jobId, {
kind: 'done',
streamUrl,
});
return c.json({ ok: true });
} catch (error) {
const message = error instanceof Error ? error.message : 'Report generation failed';
await kv.set<ReportStatus>('report-status', job.jobId, {
kind: 'failed',
message,
});
return c.json({ ok: false }, 500);
}
});
export default app;The queue decouples the request from the worker. KV holds user-visible status so any route or client can check it without coupling to the worker. Durable streams hold output that is too large or too useful to fit in a response body.
Poll from the Client
Poll the status URL until the worker writes done or failed. Validate the response at the boundary so TypeScript knows which fields are present.
import { z } from 'zod';
const reportStatusSchema = z.discriminatedUnion('kind', [
z.object({ kind: z.literal('queued'), messageId: z.string() }),
z.object({ kind: z.literal('processing') }),
z.object({ kind: z.literal('done'), streamUrl: z.string() }),
z.object({ kind: z.literal('failed'), message: z.string() }),
]);
type ReportStatus = z.infer<typeof reportStatusSchema>;
async function waitForReport(statusUrl: string): Promise<ReportStatus> {
for (;;) {
const response = await fetch(statusUrl);
if (!response.ok) {
throw new Error(`Status check failed: ${response.status}`);
}
const body: unknown = await response.json();
const status = reportStatusSchema.parse(body);
if (status.kind === 'done' || status.kind === 'failed') {
return status;
}
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}Pick the Right Service
| Need | Service |
|---|---|
| Async handoff between routes or workers | Queues |
| User-visible job status, fast reads | Key-Value Storage |
| Large generated output, shareable URL | Durable Streams |
| Recurring work on a timer | Schedules |
| External event intake from Stripe, GitHub, etc. | Webhooks |
Common Gotchas
| Symptom | Check |
|---|---|
Route returns 202 but the worker never runs | The queue has no enabled HTTP destination; run agentuity cloud queue destinations list report-jobs to verify |
| Local worker never receives messages | The destination URL points at a public address, not 127.0.0.1; use a tunnel for local testing |
| Duplicate button clicks enqueue duplicate jobs | Pass an idempotencyKey derived from a stable job ID |
| Status record disappears before the user checks it | Set a KV TTL that outlives the queue retry window |
| Stream URL returns empty content | The worker threw before calling stream.close(); always close in a finally block |
Next Steps
- Queues: full publish API with partitioning, metadata, DLQ, and WebSocket subscriptions
- Key-Value Storage: TTL, namespaces, and key search
- Durable Streams: compression, background writes, and shareable URLs
- Schedules: start recurring work without an incoming user request