WebSocket & Streaming
Agents support three streaming modes out of the box: simple string chunks via agent.stream(), typed events via agent.streamEvents(), and resumable SSE streams via ResumableStreamManager.
ts
import { createAgent } from 'confused-ai';
import { ResumableStreamManager, formatSSE } from 'confused-ai';
agent.stream(): text chunks
The simplest streaming mode yields string chunks as the model generates them:
ts
const agent = createAgent({
  name: 'streamer',
  instructions: 'Be helpful.',
  model: 'gpt-4o-mini',
  apiKey: process.env.OPENAI_API_KEY!,
});
for await (const chunk of agent.stream('Explain async/await in TypeScript')) {
  process.stdout.write(chunk);
}
agent.streamEvents(): typed events
Yields StreamChunk events differentiating text deltas, tool calls, step completions, and the final result:
ts
for await (const event of agent.streamEvents('Research quantum computing')) {
  switch (event.type) {
    case 'text-delta':
      process.stdout.write(event.delta ?? '');
      break;
    case 'tool-call':
      console.log(`Calling tool: ${event.tool?.name}`);
      break;
    case 'tool-result':
      console.log(`Tool result: ${event.tool?.name}`);
      break;
    case 'step-finish':
      console.log(`Step ${event.stepNumber} done`);
      break;
    case 'run-finish':
      console.log(`Completed in ${event.run?.steps} steps`);
      break;
    case 'error':
      console.error('Stream error:', event.error);
      break;
  }
}
StreamChunk fields
| type | Fields | Description |
|---|---|---|
| text-delta | delta | One token or phrase from the model |
| tool-call | tool.name, tool.input | Tool called by the agent |
| tool-result | tool.name, tool.output | Tool returned its result |
| step-finish | stepNumber | Reasoning step completed |
| run-finish | run (AgentRunResult) | Run complete; carries the final result |
| error | error | Error during the run |
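The table maps naturally onto a TypeScript discriminated union, which lets the compiler narrow the available fields inside each case branch. The sketch below is an assumption about the shape, inferred only from the fields listed here; the library's actual StreamChunk and AgentRunResult types may differ (the earlier example reads event.delta and event.tool as optional). SketchChunk, SketchRunResult, and collectText are hypothetical names used for illustration.

```ts
// Hypothetical shapes inferred from the table; not the library's real types.
interface SketchRunResult {
  steps: number;
}

type SketchChunk =
  | { type: 'text-delta'; delta: string }
  | { type: 'tool-call'; tool: { name: string; input: unknown } }
  | { type: 'tool-result'; tool: { name: string; output: unknown } }
  | { type: 'step-finish'; stepNumber: number }
  | { type: 'run-finish'; run: SketchRunResult }
  | { type: 'error'; error: unknown };

// Concatenate only the text deltas, ignoring every other event type.
function collectText(events: SketchChunk[]): string {
  let text = '';
  for (const event of events) {
    if (event.type === 'text-delta') {
      // Narrowed by the discriminant: delta is known to be a string here.
      text += event.delta;
    }
  }
  return text;
}

const sample: SketchChunk[] = [
  { type: 'text-delta', delta: 'Hello' },
  { type: 'tool-call', tool: { name: 'search', input: { q: 'qubits' } } },
  { type: 'text-delta', delta: ', world' },
  { type: 'run-finish', run: { steps: 2 } },
];

console.log(collectText(sample)); // prints "Hello, world"
```

Modeling the events this way means a consumer that forgets to handle a variant can be caught at compile time with an exhaustiveness check, assuming the real type is a closed union.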
SSE endpoint (HTTP)
Serve streaming responses as Server-Sent Events from any HTTP server:
ts
import Fastify from 'fastify';
import { createAgent, formatSSE, ResumableStreamManager } from 'confused-ai';
const app = Fastify();
const agent = createAgent({ ... });
const streams = new ResumableStreamManager({ maxAgeMs: 5 * 60_000 });
app.get('/stream', async (req, reply) => {
  // Fastify types req.query as unknown by default, so narrow it before reading prompt
  const { prompt } = req.query as { prompt: string };
  reply.raw.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });
  const streamId = streams.createStream();
  for await (const event of agent.streamEvents(prompt)) {
    // Stop at terminal events; they carry no text or tool payload to forward
    if (event.type === 'run-finish' || event.type === 'error') break;
    // Forward only text deltas and tool calls; skip other event types
    if (event.type !== 'text-delta' && event.type !== 'tool-call') continue;
    const chunk = streams.saveChunk(streamId, {
      type: event.type === 'text-delta' ? 'text' : 'tool_call',
      content: event.type === 'text-delta' ? (event.delta ?? '') : JSON.stringify(event.tool),
    });
    if (chunk) {
      reply.raw.write(formatSSE(chunk));
    }
  }
  reply.raw.end();
});
Resumable streams (reconnect after disconnect)
ResumableStreamManager checkpoints every chunk so clients can reconnect from where they left off:
ts
import { ResumableStreamManager } from 'confused-ai';
const streams = new ResumableStreamManager({
  maxAgeMs: 5 * 60_000, // keep streams resumable for 5 minutes
  maxStreams: 1000,
});
// Create a stream
const streamId = streams.createStream();
// Save each chunk as it arrives
streams.saveChunk(streamId, { type: 'text', content: 'Hello' });
streams.saveChunk(streamId, { type: 'text', content: ' world!' });
// Client reconnects: resume from position
const checkpoint = streams.getCheckpoint(streamId);
const missedChunks = streams.getChunksSince(streamId, checkpoint.position);
Reconnect endpoint
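Resuming from a position can be pictured as reading the tail of an append-only log. The sketch below is a toy in-memory model of that idea, not ResumableStreamManager's implementation: ChunkLog, append, and since are hypothetical names, and treating positions as 1-based sequence numbers is an assumption made for illustration rather than documented library behavior.

```ts
// Toy model of resumable streaming; every name here is hypothetical.
interface LoggedChunk {
  position: number; // 1-based sequence number assigned on append
  type: 'text' | 'tool_call';
  content: string;
}

class ChunkLog {
  private chunks: LoggedChunk[] = [];

  // Store a chunk and return it together with its assigned position.
  append(type: LoggedChunk['type'], content: string): LoggedChunk {
    const chunk: LoggedChunk = { position: this.chunks.length + 1, type, content };
    this.chunks.push(chunk);
    return chunk;
  }

  // Return every chunk after `position`, the last position the client
  // received (0 means the client has seen nothing yet).
  since(position: number): LoggedChunk[] {
    return this.chunks.filter((chunk) => chunk.position > position);
  }
}

const log = new ChunkLog();
log.append('text', 'Hello');
log.append('text', ' world');
log.append('text', '!');

// The client saw only the first chunk before disconnecting.
const missed = log.since(1).map((chunk) => chunk.content).join('');
console.log(missed); // prints " world!"
```

A client that records the position of the last chunk it received can send that number back when it reconnects, which is the role the position query parameter plays in the route that follows.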
ts
app.get('/stream/:id/resume', async (req, reply) => {
  const { id } = req.params as { id: string };
  // Query values arrive as strings; default to 0 when position is absent
  const { position: rawPosition } = req.query as { position?: string };
  const position = Number(rawPosition ?? 0);
  reply.raw.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
  });
  // Send missed chunks
  for (const chunk of streams.getChunksSince(id, position)) {
    reply.raw.write(formatSSE(chunk));
  }
  reply.raw.end();
});
WebSocket server
Use streamEvents inside a WebSocket handler to push events directly:
ts
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws) => {
  ws.on('message', async (data) => {
    let prompt: string;
    try {
      ({ prompt } = JSON.parse(data.toString()));
    } catch {
      return; // ignore messages that are not valid JSON
    }
    for await (const event of agent.streamEvents(prompt)) {
      // Stop streaming once the client has disconnected
      if (ws.readyState !== ws.OPEN) break;
      ws.send(JSON.stringify(event));
    }
  });
});
Where to go next
- Observability: trace and monitor streaming runs.
- Production: graceful shutdown, rate limiting.
- Voice: voice streaming via VoiceStreamSession.