
Background Queues

Background queues let agent hooks and long-running work execute outside the main request path, with retries, persistence, and worker-based consumption. The interface is uniform across all backends.

ts
import {
  InMemoryBackgroundQueue,
  BullMQBackgroundQueue,
  KafkaBackgroundQueue,
  RabbitMQBackgroundQueue,
  RedisPubSubBackgroundQueue,
  SQSBackgroundQueue,
  queueHook,
} from 'confused-ai';

Quick start

ts
import { createAgent, InMemoryBackgroundQueue, queueHook } from 'confused-ai';

// In-memory queue: no dependencies, good for dev/test
const queue = new InMemoryBackgroundQueue({ concurrency: 5 });

const agent = createAgent({
  name: 'analytics-agent',
  instructions: 'Help users with their questions.',
  model: 'gpt-4o-mini',
  apiKey: process.env.OPENAI_API_KEY!,
  hooks: {
    // Dispatch post-run analytics to the queue without blocking the response
    afterRun: queueHook(queue, 'analytics', (result) => ({
      steps:  result.steps,
      tokens: result.usage?.totalTokens,
      runId:  result.runId,
    })),
  },
});

// Register the worker handler (same or separate process).
// `analyticsService` is a placeholder for your own analytics client.
await queue.consume('analytics', async (task) => {
  await analyticsService.track('agent.run', task.payload);
});

const result = await agent.run('Help me track my order.');
// The afterRun hook fires the task to the queue and returns immediately

Queue backends

InMemoryBackgroundQueue (development / testing)

ts
import { InMemoryBackgroundQueue } from 'confused-ai';

const queue = new InMemoryBackgroundQueue({ concurrency: 3 });

BullMQBackgroundQueue: Redis-backed, durable

bash
bun add bullmq
ts
import { BullMQBackgroundQueue } from 'confused-ai';

const queue = new BullMQBackgroundQueue({
  queueName: 'agent-hooks',
  connection: { host: 'localhost', port: 6379 },
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1_000 },
    removeOnComplete: 100,
    removeOnFail: 500,
  },
});

// Worker (same or separate process); `saveToDb` is a placeholder for your own persistence code
await queue.consume('afterRun', async (task) => {
  await saveToDb(task.payload);
}, { concurrency: 5 });
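
To make the retry settings above concrete, the sketch below lists the waits between attempts. It assumes the options are passed to BullMQ unchanged and that BullMQ's built-in exponential strategy applies without jitter, where the wait before a retry is delay × 2^(attemptsMade − 1). `retryDelays` is a hypothetical helper for illustration, not part of confused-ai or BullMQ.

```typescript
// Hypothetical helper: lists the wait (in ms) before each retry under an
// exponential backoff of the form delay * 2^(attemptsMade - 1).
// Assumption: this mirrors BullMQ's built-in 'exponential' strategy, no jitter.
function retryDelays(attempts: number, delay: number): number[] {
  const delays: number[] = [];
  // `attempts` includes the first try, so there are at most attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(delay * Math.pow(2, attemptsMade - 1)));
  }
  return delays;
}

// attempts: 3 with delay: 1_000 gives two retries, after 1000 ms and then 2000 ms.
console.log(retryDelays(3, 1_000));
```

Under these assumptions a job that keeps failing is given up on roughly three seconds after its first attempt, plus handler run time.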

KafkaBackgroundQueue: high-throughput, ordered, replay

bash
bun add kafkajs
ts
import { KafkaBackgroundQueue } from 'confused-ai';

const queue = new KafkaBackgroundQueue({
  brokers: ['kafka:9092'],
  topic: 'agent-hooks',
  clientId: 'my-agent-app',
  groupId: 'agent-workers',
});

RabbitMQBackgroundQueue: AMQP, dead-letter exchanges

bash
bun add amqplib
ts
import { RabbitMQBackgroundQueue } from 'confused-ai';

const queue = new RabbitMQBackgroundQueue({
  url: process.env.RABBITMQ_URL!,
  queue: 'agent-hooks',
  exchange: 'agent',
  routingKey: 'hook',
});

RedisPubSubBackgroundQueue: lightweight fanout

ts
import { RedisPubSubBackgroundQueue } from 'confused-ai';

const queue = new RedisPubSubBackgroundQueue({
  redis: process.env.REDIS_URL!,
  channel: 'agent-hooks',
});

SQSBackgroundQueue: AWS managed, serverless

bash
bun add @aws-sdk/client-sqs
ts
import { SQSBackgroundQueue } from 'confused-ai';

const queue = new SQSBackgroundQueue({
  queueUrl: process.env.SQS_QUEUE_URL!,
  region: 'us-east-1',
});

BackgroundQueue interface

Implement this to add any backend:

ts
interface BackgroundQueue {
  readonly name: string;

  enqueue<T>(type: string, payload: T, options?: EnqueueOptions): Promise<BackgroundTask<T>>;

  consume<T>(
    type: string,
    handler: (task: BackgroundTask<T>) => Promise<void> | void,
    options?: WorkerOptions,
  ): Promise<void>;

  close?(): Promise<void>;
}
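
As a rough illustration of implementing the interface, here is a toy in-process backend. The shapes of `BackgroundTask`, `EnqueueOptions`, and `WorkerOptions` are assumptions pieced together from the examples on this page (`task.payload`, `delay`, `retries`, `concurrency`); the `id` and `type` fields and the `ArrayBackgroundQueue` class are hypothetical. The sketch ignores delays, retries, and concurrency limits, and is not durable.

```typescript
// Assumed shapes: only `payload`, `delay`, `retries`, and `concurrency`
// appear in the docs; `id` and `type` are guesses for illustration.
interface BackgroundTask<T> { id: string; type: string; payload: T }
interface EnqueueOptions { delay?: number; retries?: number }
interface WorkerOptions { concurrency?: number }

interface BackgroundQueue {
  readonly name: string;
  enqueue<T>(type: string, payload: T, options?: EnqueueOptions): Promise<BackgroundTask<T>>;
  consume<T>(
    type: string,
    handler: (task: BackgroundTask<T>) => Promise<void> | void,
    options?: WorkerOptions,
  ): Promise<void>;
  close?(): Promise<void>;
}

type Handler = (task: BackgroundTask<any>) => Promise<void> | void;

// Hypothetical toy backend: buffers tasks until a consumer registers,
// then delivers them in-process.
class ArrayBackgroundQueue implements BackgroundQueue {
  readonly name = 'array';
  private handlers = new Map<string, Handler>();
  private buffered = new Map<string, BackgroundTask<any>[]>();
  private inFlight = new Set<Promise<void>>();
  private nextId = 0;

  async enqueue<T>(type: string, payload: T, _options?: EnqueueOptions): Promise<BackgroundTask<T>> {
    const task: BackgroundTask<T> = { id: String(++this.nextId), type, payload };
    const handler = this.handlers.get(type);
    if (handler) {
      this.dispatch(handler, task);
    } else {
      // No consumer yet: keep the task until consume() is called for this type.
      const list = this.buffered.get(type) ?? [];
      list.push(task);
      this.buffered.set(type, list);
    }
    return task;
  }

  async consume<T>(
    type: string,
    handler: (task: BackgroundTask<T>) => Promise<void> | void,
    _options?: WorkerOptions,
  ): Promise<void> {
    this.handlers.set(type, handler as Handler);
    // Flush tasks that arrived before the consumer registered.
    for (const task of this.buffered.get(type) ?? []) {
      this.dispatch(handler as Handler, task);
    }
    this.buffered.delete(type);
  }

  async close(): Promise<void> {
    // Graceful shutdown: wait for handlers that are still running.
    await Promise.all(Array.from(this.inFlight));
  }

  private dispatch(handler: Handler, task: BackgroundTask<any>): void {
    // Run on a later microtask so enqueue() never waits on the handler.
    const run: Promise<void> = Promise.resolve()
      .then(() => handler(task))
      .catch((err) => console.error(`task ${task.id} failed`, err))
      .then(() => { this.inFlight.delete(run); });
    this.inFlight.add(run);
  }
}
```

A real backend would replace the maps with calls to its broker client and decide how to honor `delay`, `retries`, and `concurrency`.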

queueHook: hook → queue dispatch

queueHook turns any agent lifecycle hook into a fire-and-forget queue dispatch:

ts
import { queueHook } from 'confused-ai';

const hooks = {
  afterRun:     queueHook(queue, 'run-complete',   (result) => ({ text: result.text, tokens: result.usage?.totalTokens })),
  afterToolCall: queueHook(queue, 'tool-call-log', (name, result, args) => ({ name, args, result })),
};
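
One way to picture the fire-and-forget behavior is the sketch below. It is not the library's implementation: `sketchQueueHook` and `Enqueuer` are hypothetical names, and the error handling (log and continue) is an assumption about what "fire-and-forget" should mean here.

```typescript
// Assumption: only the enqueue() part of a queue matters for this sketch.
interface Enqueuer {
  enqueue<T>(type: string, payload: T): Promise<unknown>;
}

// Hypothetical stand-in for queueHook: wraps a payload mapper into a hook
// that enqueues the mapped payload and returns without awaiting the queue.
function sketchQueueHook<Args extends unknown[], P>(
  queue: Enqueuer,
  type: string,
  toPayload: (...args: Args) => P,
): (...args: Args) => void {
  return (...args) => {
    try {
      // Not awaited: the agent run continues while the queue does its work.
      queue.enqueue(type, toPayload(...args)).catch((err) => {
        console.error(`enqueue of '${type}' failed`, err);
      });
    } catch (err) {
      // A throwing mapper (or a synchronously throwing queue) is logged, not rethrown.
      console.error(`could not dispatch '${type}'`, err);
    }
  };
}

// Usage with a fake queue that records what it receives.
const recorded: Array<{ type: string; payload: unknown }> = [];
const fakeQueue: Enqueuer = {
  async enqueue<T>(type: string, payload: T) {
    recorded.push({ type, payload });
    return undefined;
  },
};

const afterToolCall = sketchQueueHook(
  fakeQueue,
  'tool-call-log',
  (name: string, result: unknown, args: unknown) => ({ name, args, result }),
);
afterToolCall('search', { hits: 3 }, { q: 'order status' });
```

Because the hook returns `void` rather than a promise, a slow or failing queue cannot delay or fail the agent run in this sketch.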

Enqueue manually

ts
// Fire a background task directly (without a hook)
const task = await queue.enqueue('send-report', {
  userId: 'user-42',
  reportType: 'weekly',
}, {
  delay: 5_000,     // delay 5 seconds (BullMQ, Kafka support this)
  retries: 3,
});
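
The exact semantics of `delay` and `retries` depend on the backend. As a sketch of one plausible reading, the helper below waits `delay` milliseconds before the first attempt and treats `retries` as the number of extra attempts after the first failure. Both interpretations are assumptions, and `runTask` is a hypothetical helper, not a confused-ai API.

```typescript
// Assumption: `retries` counts extra attempts after the first failure,
// and `delay` postpones only the first attempt.
interface EnqueueOptions { delay?: number; retries?: number }

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical worker-side helper: resolves with the number of attempts made,
// or rejects with the last error once the retries are used up.
async function runTask<T>(
  payload: T,
  handler: (payload: T) => Promise<void> | void,
  options: EnqueueOptions = {},
): Promise<number> {
  const { delay = 0, retries = 0 } = options;
  if (delay > 0) await sleep(delay);
  for (let attempt = 1; ; attempt++) {
    try {
      await handler(payload);
      return attempt;
    } catch (err) {
      // Out of retries: surface the last error to the caller.
      if (attempt > retries) throw err;
    }
  }
}
```

With `retries: 3`, a handler that always fails runs four times in total under this reading before the error is surfaced.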

Where to go next

  • Scheduler: time-based recurring execution.
  • Hooks: lifecycle hooks where queue dispatch originates.
  • Production: circuit breakers and graceful shutdown.

Released under the MIT License.