Background Queues โ
Background queues let agent hooks and long-running work execute outside the main request path โ with retries, persistence, and worker-based consumption. The interface is uniform across all backends.
ts
import {
InMemoryBackgroundQueue,
BullMQBackgroundQueue,
KafkaBackgroundQueue,
RabbitMQBackgroundQueue,
RedisPubSubBackgroundQueue,
SQSBackgroundQueue,
queueHook,
} from 'confused-ai';Quick start โ
ts
import { createAgent } from 'confused-ai';
import { InMemoryBackgroundQueue, queueHook } from 'confused-ai';
// In-memory queue โ no dependencies, good for dev/test
const queue = new InMemoryBackgroundQueue({ concurrency: 5 });
const agent = createAgent({
name: 'analytics-agent',
instructions: 'Help users with their questions.',
model: 'gpt-4o-mini',
apiKey: process.env.OPENAI_API_KEY!,
hooks: {
// Dispatch post-run analytics to the queue without blocking the response
afterRun: queueHook(queue, 'analytics', (result) => ({
steps: result.steps,
tokens: result.usage?.totalTokens,
runId: result.runId,
})),
},
});
// Register the worker handler (same or separate process)
await queue.consume('analytics', async (task) => {
await analyticsService.track('agent.run', task.payload);
});
const result = await agent.run('Help me track my order.');
// The afterRun hook fires the task to the queue and returns immediatelyQueue backends โ
InMemoryBackgroundQueue (development / testing) โ
ts
import { InMemoryBackgroundQueue } from 'confused-ai';
const queue = new InMemoryBackgroundQueue({ concurrency: 3 });BullMQBackgroundQueue โ Redis-backed, durable โ
bash
bun add bullmqts
import { BullMQBackgroundQueue } from 'confused-ai';
const queue = new BullMQBackgroundQueue({
queueName: 'agent-hooks',
connection: { host: 'localhost', port: 6379 },
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1_000 },
removeOnComplete: 100,
removeOnFail: 500,
},
});
// Worker (same or separate process)
await queue.consume('afterRun', async (task) => {
await saveToDb(task.payload);
}, { concurrency: 5 });KafkaBackgroundQueue โ high-throughput, ordered, replay โ
bash
bun add kafkajsts
import { KafkaBackgroundQueue } from 'confused-ai';
const queue = new KafkaBackgroundQueue({
brokers: ['kafka:9092'],
topic: 'agent-hooks',
clientId: 'my-agent-app',
groupId: 'agent-workers',
});RabbitMQBackgroundQueue โ AMQP, dead-letter exchanges โ
bash
bun add amqplibts
import { RabbitMQBackgroundQueue } from 'confused-ai';
const queue = new RabbitMQBackgroundQueue({
url: process.env.RABBITMQ_URL!,
queue: 'agent-hooks',
exchange: 'agent',
routingKey: 'hook',
});RedisPubSubBackgroundQueue โ lightweight fanout โ
ts
import { RedisPubSubBackgroundQueue } from 'confused-ai';
const queue = new RedisPubSubBackgroundQueue({
redis: process.env.REDIS_URL!,
channel: 'agent-hooks',
});SQSBackgroundQueue โ AWS managed, serverless โ
bash
bun add @aws-sdk/client-sqsts
import { SQSBackgroundQueue } from 'confused-ai';
const queue = new SQSBackgroundQueue({
queueUrl: process.env.SQS_QUEUE_URL!,
region: 'us-east-1',
});BackgroundQueue interface โ
Implement this to add any backend:
ts
interface BackgroundQueue {
readonly name: string;
enqueue<T>(type: string, payload: T, options?: EnqueueOptions): Promise<BackgroundTask<T>>;
consume<T>(
type: string,
handler: (task: BackgroundTask<T>) => Promise<void> | void,
options?: WorkerOptions,
): Promise<void>;
close?(): Promise<void>;
}queueHook โ hook โ queue dispatch โ
queueHook turns any agent lifecycle hook into a fire-and-forget queue dispatch:
ts
import { queueHook } from 'confused-ai';
const hooks = {
afterRun: queueHook(queue, 'run-complete', (result) => ({ text: result.text, tokens: result.usage?.totalTokens })),
afterToolCall: queueHook(queue, 'tool-call-log', (name, result, args) => ({ name, args, result })),
};Enqueue manually โ
ts
// Fire a background task directly (without a hook)
const task = await queue.enqueue('send-report', {
userId: 'user-42',
reportType: 'weekly',
}, {
delay: 5_000, // delay 5 seconds (BullMQ, Kafka support this)
retries: 3,
});Where to go next โ
- Scheduler โ time-based recurring execution.
- Hooks โ lifecycle hooks where queue dispatch originates.
- Production โ circuit breakers and graceful shutdown.