SDK Utilities for External Apps — Agentuity Documentation

SDK Utilities for External Apps

Use storage, queues, logging, and error handling utilities from external backends like Next.js or Express

Use @agentuity/server and @agentuity/core utilities in external apps, scripts, or backends that integrate with Agentuity.

Storage Access

Access Agentuity storage (KV, Vector, Stream) directly from your Next.js or Express backend using your SDK key.

Add these environment variables to your .env.local file (or .env, depending on your framework):

bash.env.local
AGENTUITY_SDK_KEY=your_sdk_key    # From Agentuity console or project .env
AGENTUITY_REGION=use              # From agentuity.json (region field)
typescriptlib/agentuity-storage.ts
import {
  KeyValueStorageService,
  VectorStorageService,
  StreamStorageService,
} from '@agentuity/core';
import { createServerFetchAdapter, getServiceUrls, createLogger } from '@agentuity/server';
 
// Fail fast when configuration is missing. Without these checks the adapter
// would send "Bearer undefined" and getServiceUrls would receive undefined.
const region = process.env.AGENTUITY_REGION;
const sdkKey = process.env.AGENTUITY_SDK_KEY;

if (!region) {
  throw new Error('AGENTUITY_REGION environment variable is required');
}
if (!sdkKey) {
  throw new Error('AGENTUITY_SDK_KEY environment variable is required');
}

const logger = createLogger('info');

// Fetch adapter that attaches the SDK key as a bearer token to its requests.
const adapter = createServerFetchAdapter({
  headers: {
    Authorization: `Bearer ${sdkKey}`,
  },
}, logger);

const urls = getServiceUrls(region);

/** Key-value storage client, backed by the catalyst service URL. */
export const kv = new KeyValueStorageService(urls.catalyst, adapter);
/** Vector storage client, backed by the catalyst service URL. */
export const vector = new VectorStorageService(urls.catalyst, adapter);
/** Stream storage client, backed by the stream service URL. */
export const stream = new StreamStorageService(urls.stream, adapter);

Use the client in your Next.js backend:

typescriptapp/api/sessions/route.ts
import { type NextRequest } from 'next/server';
import { kv } from '@/lib/agentuity-storage';
 
/** Value type read from KV under 'sessions' by the GET handler in this file. */
interface ChatSession {
  /** Conversation entries; each pairs a role with its text content. */
  messages: Array<{ role: string; content: string }>;
  /** Creation time; the POST handler in this file writes `new Date().toISOString()` here. */
  createdAt: string;
}
 
/**
 * Returns the chat session identified by the `id` query parameter.
 *
 * Responds with 400 when `id` is absent or empty, 404 when KV reports the
 * entry does not exist, and the stored session as JSON otherwise.
 */
export async function GET(request: NextRequest) {
  const sessionId = request.nextUrl.searchParams.get('id');

  if (sessionId) {
    const lookup = await kv.get<ChatSession>('sessions', sessionId);

    return lookup.exists
      ? Response.json(lookup.data)
      : Response.json({ error: 'Not found' }, { status: 404 });
  }

  return Response.json({ error: 'Missing session ID' }, { status: 400 });
}
 
/**
 * Stores a chat session under `sessionId` in the 'sessions' KV store.
 *
 * Expects a JSON body of `{ sessionId: string, messages: array }`. The request
 * body is untrusted, so it is validated before anything is written:
 * malformed JSON, a missing/empty `sessionId`, or a non-array `messages`
 * all produce a 400 response instead of reaching `kv.set`.
 *
 * The entry is written with `ttl: 86400` (presumably seconds, i.e. 24 hours —
 * confirm against the KV API).
 */
export async function POST(request: NextRequest) {
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return Response.json({ error: 'Invalid JSON body' }, { status: 400 });
  }

  // `body` may be null or a primitive; fall back to an empty object so the
  // destructuring below cannot throw.
  const { sessionId, messages } = (body ?? {}) as {
    sessionId?: unknown;
    messages?: unknown;
  };

  if (typeof sessionId !== 'string' || sessionId.length === 0) {
    return Response.json({ error: 'Missing session ID' }, { status: 400 });
  }
  if (!Array.isArray(messages)) {
    return Response.json({ error: 'messages must be an array' }, { status: 400 });
  }

  await kv.set('sessions', sessionId, {
    messages,
    createdAt: new Date().toISOString(),
  }, { ttl: 86400 });

  return Response.json({ success: true });
}

Vector Storage

typescriptapp/api/search/route.ts
import { type NextRequest } from 'next/server';
import { vector } from '@/lib/agentuity-storage';
 
/**
 * Searches the 'documents' vector storage for `query`.
 *
 * Expects a JSON body of `{ query: string, limit?: number }`; `limit`
 * defaults to 10 when omitted. The body is untrusted, so malformed JSON, a
 * missing/blank `query`, or a `limit` that is not a positive integer are
 * rejected with a 400 response before the search is issued.
 */
export async function POST(request: NextRequest) {
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return Response.json({ error: 'Invalid JSON body' }, { status: 400 });
  }

  // `body` may be null or a primitive; fall back to an empty object so the
  // destructuring below cannot throw.
  const { query, limit = 10 } = (body ?? {}) as {
    query?: unknown;
    limit?: unknown;
  };

  if (typeof query !== 'string' || query.trim() === '') {
    return Response.json({ error: 'Missing query' }, { status: 400 });
  }
  if (typeof limit !== 'number' || !Number.isInteger(limit) || limit < 1) {
    return Response.json({ error: 'limit must be a positive integer' }, { status: 400 });
  }

  const results = await vector.search('documents', {
    query,
    limit,
  });

  return Response.json(results);
}

Stream Storage

Use streams for large data exports or file transfers:

typescriptapp/api/export/route.ts
import { type NextRequest } from 'next/server';
import { stream } from '@/lib/agentuity-storage';
 
/**
 * Downloads the stream identified by the `id` query parameter and returns it
 * as an `application/octet-stream` response.
 *
 * Responds with 400 when `id` is absent or empty.
 */
export async function GET(request: NextRequest) {
  const streamId = request.nextUrl.searchParams.get('id');

  if (streamId === null || streamId === '') {
    return Response.json({ error: 'Missing stream ID' }, { status: 400 });
  }

  const payload = await stream.download(streamId);
  const headers = { 'Content-Type': 'application/octet-stream' };

  return new Response(payload, { headers });
}

Queue Management

Manage queues programmatically from external apps or scripts using APIClient:

typescriptlib/agentuity-queues.ts
import { APIClient, createLogger, getServiceUrls } from '@agentuity/server';
 
const { AGENTUITY_REGION: region, AGENTUITY_SDK_KEY: sdkKey } = process.env;

// Both settings are mandatory; abort module load if either one is absent.
if (!region) throw new Error('AGENTUITY_REGION environment variable is required');
if (!sdkKey) throw new Error('AGENTUITY_SDK_KEY environment variable is required');

/** Info-level logger; also passed to the API client below. */
export const logger = createLogger('info');

const { catalyst: catalystUrl } = getServiceUrls(region);

/** API client built from the catalyst service URL, the logger, and the SDK key. */
export const client = new APIClient(catalystUrl, logger, sdkKey);

Creating and Managing Queues

import {
  createQueue,
  listQueues,
  deleteQueue,
  pauseQueue,
  resumeQueue,
} from '@agentuity/server';
import { client } from '@/lib/agentuity-queues';
 
const ORDER_QUEUE = 'order-processing';

// Provision a queue of type 'worker' with per-queue default settings
const workerSettings = {
  default_max_retries: 5,
  default_visibility_timeout_seconds: 60,
};
const queue = await createQueue(client, {
  name: ORDER_QUEUE,
  queue_type: 'worker',
  settings: workerSettings,
});

// Fetch every queue visible to this client
const { queues } = await listQueues(client);

// Temporarily halt the queue, then bring it back
await pauseQueue(client, ORDER_QUEUE);
await resumeQueue(client, ORDER_QUEUE);

// Remove a queue that is no longer needed
await deleteQueue(client, 'old-queue');

Dead Letter Queue Operations

import {
  listDeadLetterMessages,
  replayDeadLetterMessage,
  purgeDeadLetter,
} from '@agentuity/server';
import { client, logger } from '@/lib/agentuity-queues';
 
const ORDER_QUEUE = 'order-processing';

// Fetch the messages currently in the dead letter queue
const { messages: failed } = await listDeadLetterMessages(client, ORDER_QUEUE);

// Log each failure, then replay it; replays run one at a time, in order
for (const { id, failure_reason: reason } of failed) {
  logger.warn('Failed message', { id, reason });
  await replayDeadLetterMessage(client, ORDER_QUEUE, id);
}

// Finally, clear out the dead letter queue
await purgeDeadLetter(client, ORDER_QUEUE);

Webhook Destinations

import { createDestination } from '@agentuity/server';
import { client } from '@/lib/agentuity-queues';
 
// Validate the secret up front: a non-null assertion would only silence the
// compiler and let an undefined X-API-Key header value through at runtime.
const webhookApiKey = process.env.WEBHOOK_API_KEY;
if (!webhookApiKey) {
  throw new Error('WEBHOOK_API_KEY environment variable is required');
}

// Register an HTTP destination on the 'order-processing' queue.
await createDestination(client, 'order-processing', {
  destination_type: 'http',
  config: {
    url: 'https://api.example.com/webhook/orders',
    method: 'POST',
    headers: { 'X-API-Key': webhookApiKey },
    timeout_ms: 30000,
    retry_policy: {
      max_attempts: 5,
      initial_backoff_ms: 1000,
      max_backoff_ms: 60000,
      backoff_multiplier: 2.0,
    },
  },
});

HTTP Ingestion Sources

import { createSource } from '@agentuity/server';
import { client, logger } from '@/lib/agentuity-queues';
 
// Validate the secret up front: interpolating an unset variable would
// configure the source with the literal value "Bearer undefined".
const stripeWebhookSecret = process.env.STRIPE_WEBHOOK_SECRET;
if (!stripeWebhookSecret) {
  throw new Error('STRIPE_WEBHOOK_SECRET environment variable is required');
}

// Create a header-authenticated ingestion source on 'webhook-queue'.
const source = await createSource(client, 'webhook-queue', {
  name: 'stripe-webhooks',
  description: 'Receives Stripe payment events',
  auth_type: 'header',
  auth_value: `Bearer ${stripeWebhookSecret}`,
});

// External services POST to this URL
logger.info('Source created', { url: source.url });

Pull-Based Consumption

For workers that pull and acknowledge messages:

import { receiveMessage, ackMessage, nackMessage } from '@agentuity/server';
import { client, logger } from '@/lib/agentuity-queues';
 
// Receive a message (blocks until available or timeout)
const message = await receiveMessage(client, 'order-processing');

if (message) {
  try {
    // processOrder is the application's own handler (not defined in this snippet).
    await processOrder(message.payload);
    await ackMessage(client, 'order-processing', message.id);
  } catch (error) {
    // Record why processing failed before handing the message back;
    // otherwise the cause is lost and retries fail silently.
    logger.error('Order processing failed', {
      id: message.id,
      error: error instanceof Error ? error.message : String(error),
    });

    // Message returns to queue for retry
    await nackMessage(client, 'order-processing', message.id);
  }
}

Alternative: HTTP Routes

If you want to centralize storage logic in your Agentuity project (for middleware, sharing across multiple apps, or avoiding SDK key distribution), use HTTP routes instead.

typescriptsrc/api/sessions/route.ts
import { createRouter } from '@agentuity/runtime';
 
const router = createRouter();
 
/** Value type stored in KV under 'sessions' by the routes in this file. */
interface ChatSession {
  /** Conversation entries; each pairs a role with its text content. */
  messages: Array<{ role: string; content: string }>;
  /** Creation time; the POST route writes `new Date().toISOString()` for new sessions. */
  createdAt: string;
  /** Last-modified time; the POST route writes `new Date().toISOString()` on each update. */
  updatedAt: string;
}
 
// Get a session by ID
router.get('/:id', async (c) => {
  const sessionId = c.req.param('id');
  const result = await c.var.kv.get<ChatSession>('sessions', sessionId); 
 
  if (!result.exists) {
    return c.json({ error: 'Session not found' }, 404);
  }
 
  return c.json(result.data);
});
 
// Create or update a session
router.post('/:id', async (c) => {
  const sessionId = c.req.param('id');
  const body = await c.req.json<Partial<ChatSession>>();
 
  const existing = await c.var.kv.get<ChatSession>('sessions', sessionId);
  const session: ChatSession = existing.exists
    ? { ...existing.data, ...body, updatedAt: new Date().toISOString() }
    : { messages: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), ...body };
 
  await c.var.kv.set('sessions', sessionId, session, { ttl: 86400 * 7 }); 
 
  c.var.logger.info('Session updated', { sessionId });
  return c.json(session);
});
 
// Delete a session
router.delete('/:id', async (c) => {
  const sessionId = c.req.param('id');
  await c.var.kv.delete('sessions', sessionId); 
 
  c.var.logger.info('Session deleted', { sessionId });
  return c.json({ success: true });
});
 
export default router;

Securing Storage Routes

Add authentication middleware to protect storage endpoints:

typescriptsrc/api/sessions/route.ts
import { createRouter } from '@agentuity/runtime';
import { createMiddleware } from 'hono/factory';
 
const router = createRouter();
 
// Middleware: lets the request through only when the x-api-key header is
// present and equals STORAGE_API_KEY; otherwise logs a warning and replies 401.
// NOTE(review): this is a plain string comparison — consider a constant-time
// comparison if timing attacks are a concern.
const requireAuth = createMiddleware(async (c, next) => {
  const presentedKey = c.req.header('x-api-key');
  const authorized = Boolean(presentedKey) && presentedKey === process.env.STORAGE_API_KEY;

  if (authorized) {
    await next();
    return;
  }

  c.var.logger.warn('Unauthorized storage access attempt');
  return c.json({ error: 'Unauthorized' }, 401);
});
 
router.use('/*', requireAuth); 
 
// Routes now require x-api-key header
router.get('/:id', async (c) => {
  // ... same as above
});
 
export default router;

Routes also have access to c.var.vector and c.var.stream for Vector and Stream storage.

Logging with createLogger

Create structured loggers for scripts or external services:

import { createLogger } from '@agentuity/server';
 
// Create a logger with info level, no timestamps, dark color scheme
// (arguments in order: level, showTimestamp, colorScheme)
const logger = createLogger('info', false, 'dark');
 
// Each call takes a message plus an optional object of structured fields.
// `err` stands for an error caught elsewhere; it is not defined in this snippet.
logger.info('Processing request', { userId: '123' });
logger.error('Failed to connect', { error: err.message });
 
// Child logger with persistent context
const requestLogger = logger.child({ requestId: 'req_abc' });
requestLogger.info('Starting'); // Includes requestId in all logs

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| level | `'trace' \| 'debug' \| 'info' \| 'warn' \| 'error'` | `'info'` | Minimum log level |
| showTimestamp | `boolean` | `false` | Include ISO timestamps |
| colorScheme | `'light' \| 'dark'` | `'dark'` | Terminal color scheme |
| context | `Record<string, unknown>` | `{}` | Default context for all logs |

Safe JSON Serialization

safeStringify handles circular references and BigInt values that break JSON.stringify:

import { safeStringify } from '@agentuity/core';
 
// Circular references become "[Circular]"
// `self` is declared on the type up front: assigning an undeclared property
// to an object literal's inferred type is a compile error in TypeScript.
const obj: { name: string; self?: unknown } = { name: 'Alice' };
obj.self = obj;
safeStringify(obj); // '{"name":"Alice","self":"[Circular]"}'

// BigInt converts to string
safeStringify({ id: 9007199254740991n }); // '{"id":"9007199254740991"}'

// Pretty-print with indentation
// (`data` stands for any value of your own; it is not defined in this snippet)
safeStringify(data, 2);

Error Handling Utilities

RichError

Enhanced errors with context and pretty printing:

import { RichError } from '@agentuity/core';
 
// Wrap a lower-level failure with a message plus extra context fields.
// `originalError` stands for an error caught elsewhere; it is not defined in this snippet.
const error = new RichError({
  message: 'Request failed',
  statusCode: 500,
  endpoint: '/api/users',
  cause: originalError,
});
 
// Accessors available on the resulting error:
error.plainArgs;     // { statusCode: 500, endpoint: '/api/users' }
error.cause;         // originalError (the cause chain)
error.prettyPrint(); // Formatted multi-line output with stack traces
error.toJSON();      // Serializable object for logging
error.toString();    // Same as prettyPrint()

StructuredError

Type-safe, discriminated errors with a _tag for pattern matching:

import { StructuredError, isStructuredError } from '@agentuity/core';
 
// Create error types
// The string argument is the tag; the generic on ValidationError declares the
// extra typed fields (field, code) that instances carry.
const NotFoundError = StructuredError('NotFound');
const ValidationError = StructuredError('ValidationError')<{
  field: string;
  code: string;
}>();
 
// With a default message (message is preset, cannot be overridden)
const UpgradeRequired = StructuredError('UpgradeRequired', 'Upgrade required to access this feature');
 
// Throw with context
// NOTE: the throw and the handling below are separate illustrations; run as a
// single script, nothing after this throw would execute.
throw new ValidationError({
  field: 'email',
  code: 'INVALID_FORMAT',
  message: 'Email format is invalid',
});
 
// Type-safe handling
// `err` stands for a value caught elsewhere (e.g. in a catch clause); checking
// `_tag` selects the ValidationError case so `field` and `code` can be read.
if (isStructuredError(err) && err._tag === 'ValidationError') {
  logger.error('Validation failed', { field: err.field, code: err.code });
}

Schema Conversion

Convert Zod or Agentuity schemas to JSON Schema for tooling or OpenAPI generation:

import { toJSONSchema } from '@agentuity/server';
import { z } from 'zod';
 
const schema = z.object({ name: z.string(), age: z.number() });
const jsonSchema = toJSONSchema(schema);
// { type: 'object', properties: { name: { type: 'string' }, age: { type: 'number' } }, ... }

Key Points

  • Direct SDK is the simplest way to access storage from external backends
  • Required env vars: AGENTUITY_SDK_KEY (from the Agentuity console or your project .env) and AGENTUITY_REGION (the region field in agentuity.json)
  • HTTP routes are an alternative for centralized logic, middleware, or sharing across apps
  • Works with any framework: Next.js, Express, Remix, etc.

See Also