- Published on
AI Background Processing — Async LLM Jobs, Queues, and Webhook Callbacks
- Authors

- Name
- Sanjeev Sharma
- @webcoderspeed1
Introduction
LLM requests take 2-30 seconds. Synchronous API calls kill user experience. This guide covers production patterns for async LLM processing with job queues, idempotency, and status tracking.
- Why LLM Calls Must Be Async
- BullMQ Job Queue Setup
- Job Status Polling vs Webhooks
- Idempotency for AI Jobs
- Retry Strategy for Rate Limits
- Streaming Progress Updates via SSE
- Storing and Versioning AI Outputs
- Checklist
- Conclusion
Why LLM Calls Must Be Async
LLM latency makes synchronous requests impractical for user-facing features. The example below uses the legacy Bull library; the BullMQ section later in this guide shows the modern successor.
import { createHash } from 'node:crypto';
import Bull from 'bull';
import Redis from 'ioredis';
// Lifecycle record for a single asynchronous LLM request.
interface LLMJob {
  id: string; // unique job identifier, returned to the caller at submit time
  input: string; // raw prompt text sent to the model
  model: string; // model name forwarded to the OpenAI API (e.g. 'gpt-4')
  userId: string; // owner of the job
  createdAt: Date; // set when the job is submitted
  startedAt?: Date; // set when a worker picks the job up
  completedAt?: Date; // set on successful completion
  status: 'pending' | 'processing' | 'completed' | 'failed';
  result?: string; // model output; present only when status === 'completed'
  error?: string; // failure message; present only when status === 'failed'
}
/**
 * Accepts LLM requests, runs them on a Bull queue (concurrency 5), and
 * tracks status in an in-memory map plus a 7-day Redis copy of completed jobs.
 */
class AsyncLLMProcessor {
  private queue: Bull.Queue<LLMJob>;
  private redis: Redis.Redis;
  // In-memory index of jobs this process has seen.
  // NOTE(review): lost on restart — only completed jobs survive in Redis.
  private jobs = new Map<string, LLMJob>();

  constructor(redisUrl: string) {
    this.redis = new Redis(redisUrl);
    this.queue = new Bull('llm-processing', redisUrl);
    // Up to 5 LLM calls in flight at once.
    this.queue.process(5, async (job) => {
      const data = job.data as LLMJob;
      return this.processLLMJob(data);
    });
  }

  /**
   * Enqueue a request and return its job ID immediately (no waiting).
   * Bull retries failed attempts up to 3 times with exponential backoff.
   */
  async submitJob(input: string, model: string, userId: string): Promise<string> {
    // NOTE(review): Date.now + Math.random is collision-prone at scale;
    // consider crypto.randomUUID() for production.
    const jobId = `job_${Date.now()}_${Math.random()}`;
    const job: LLMJob = {
      id: jobId,
      input,
      model,
      userId,
      createdAt: new Date(),
      status: 'pending',
    };
    this.jobs.set(jobId, job);
    // Add to queue without waiting for result
    await this.queue.add(job, { jobId, attempts: 3, backoff: { type: 'exponential', delay: 2000 } });
    // Return immediately
    return jobId;
  }

  /** Worker body: call OpenAI, record the outcome, persist success to Redis. */
  private async processLLMJob(job: LLMJob): Promise<string> {
    // Fix: this was `const stored` followed by `stored = job`, which does not
    // compile. `let` is required — the job may be unknown locally if it was
    // enqueued by another process instance.
    let stored = this.jobs.get(job.id);
    if (!stored) {
      stored = job;
      this.jobs.set(job.id, job);
    }
    stored.status = 'processing';
    stored.startedAt = new Date();
    try {
      const response = await fetch('https://api.openai.com/v1/chat/completions', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
        },
        body: JSON.stringify({
          model: job.model,
          messages: [{ role: 'user', content: job.input }],
          temperature: 0.7,
        }),
      });
      // Fix: a non-2xx response previously fell through to choices[0] and
      // crashed with an unhelpful TypeError.
      if (!response.ok) {
        throw new Error(`OpenAI API returned ${response.status}`);
      }
      const data = (await response.json()) as { choices: Array<{ message: { content: string } }> };
      const result = data.choices[0].message.content;
      stored.result = result;
      stored.status = 'completed';
      stored.completedAt = new Date();
      await this.redis.setex(
        `job:${job.id}`,
        86400 * 7, // 7 days retention
        JSON.stringify(stored)
      );
      return result;
    } catch (error) {
      stored.error = error instanceof Error ? error.message : String(error);
      stored.status = 'failed';
      // Rethrow so Bull's retry/backoff policy sees the failure.
      throw error;
    }
  }

  /** In-memory lookup only; does not consult Redis. */
  async getJobStatus(jobId: string): Promise<LLMJob | null> {
    return this.jobs.get(jobId) || null;
  }

  /** Poll every 500ms until the job completes, fails, or the timeout lapses. */
  async waitForCompletion(jobId: string, timeoutMs: number = 60000): Promise<string> {
    const startTime = Date.now();
    while (Date.now() - startTime < timeoutMs) {
      const job = this.jobs.get(jobId);
      if (job?.status === 'completed') {
        return job.result || '';
      }
      if (job?.status === 'failed') {
        throw new Error(job.error || 'Job failed');
      }
      await new Promise((resolve) => setTimeout(resolve, 500));
    }
    throw new Error(`Job ${jobId} timed out`);
  }
}
const processor = new AsyncLLMProcessor(process.env.REDIS_URL!);
const jobId = await processor.submitJob('Summarize this text...', 'gpt-4', 'user123');
console.log(`Job submitted: ${jobId}`);
const status = await processor.getJobStatus(jobId);
console.log(`Current status: ${status?.status}`);
BullMQ Job Queue Setup
Use BullMQ for reliable job processing with retries and scheduling.
import { Queue, Worker, QueueScheduler } from 'bullmq';
import IORedis from 'ioredis';
// Payload for one queued AI task; `type` selects the handler in handleJob().
interface AIProcessingJob {
  type: 'summarize' | 'classify' | 'extract'; // dispatched on by the worker
  content: string; // text the selected handler operates on
  options?: Record<string, unknown>; // NOTE(review): not read by any visible handler — reserved?
  userId: string; // requesting user
}
/**
 * BullMQ-backed processor for AI jobs: one queue, one worker (concurrency 5),
 * plus a QueueScheduler for delayed and retried jobs.
 * NOTE(review): QueueScheduler was removed in BullMQ v2+ (its duties moved
 * into Worker) — confirm the installed bullmq version still ships it.
 */
class BullMQProcessor {
  private queue: Queue<AIProcessingJob>;
  private worker: Worker<AIProcessingJob>;
  private scheduler: QueueScheduler;

  constructor(redisUrl: string) {
    const connection = new IORedis(redisUrl);
    this.queue = new Queue<AIProcessingJob>('ai-processing', { connection });
    this.worker = new Worker<AIProcessingJob>(
      'ai-processing',
      async (job) => this.handleJob(job),
      { connection, concurrency: 5 }
    );
    this.scheduler = new QueueScheduler('ai-processing', { connection });
    this.worker.on('completed', (job) => {
      console.log(`Job ${job.id} completed`);
    });
    this.worker.on('failed', (job, err) => {
      console.log(`Job ${job?.id} failed: ${err.message}`);
    });
  }

  /**
   * Enqueue a job. Retries up to `attempts` times (default 3) with exponential
   * backoff; completed jobs are kept for 1 hour so results stay queryable.
   */
  async addJob(
    job: AIProcessingJob,
    options?: { delay?: number; priority?: number; attempts?: number }
  ): Promise<string> {
    const bullJob = await this.queue.add(job.type, job, {
      attempts: options?.attempts || 3,
      backoff: { type: 'exponential', delay: 2000 },
      priority: options?.priority || 0,
      delay: options?.delay,
      removeOnComplete: { age: 3600 }, // Remove after 1 hour
    });
    return bullJob.id!;
  }

  /** BullMQ state ('waiting' | 'active' | 'completed' | ...) or 'not_found'. */
  async getJobStatus(jobId: string): Promise<string> {
    const job = await this.queue.getJob(jobId);
    if (!job) return 'not_found';
    return job.getState();
  }

  /** Return value of a finished job; throws if the job is unknown. */
  async getJobResult(jobId: string): Promise<unknown> {
    const job = await this.queue.getJob(jobId);
    if (!job) throw new Error(`Job ${jobId} not found`);
    return job.returnvalue;
  }

  /** Dispatch on job type. Fix: was typed `any`; structural type keeps it honest. */
  private async handleJob(job: { data: AIProcessingJob }): Promise<unknown> {
    const data = job.data;
    switch (data.type) {
      case 'summarize':
        return this.summarize(data.content);
      case 'classify':
        return this.classify(data.content);
      case 'extract':
        return this.extract(data.content);
      default:
        throw new Error(`Unknown job type: ${(data as AIProcessingJob).type}`);
    }
  }

  /** Call OpenAI for a summary of `content`. */
  private async summarize(content: string): Promise<string> {
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
      },
      body: JSON.stringify({
        model: 'gpt-3.5-turbo',
        messages: [{ role: 'user', content: `Summarize: ${content}` }],
      }),
    });
    // Fix: a non-2xx response previously crashed on choices[0].
    if (!response.ok) {
      throw new Error(`OpenAI API returned ${response.status}`);
    }
    const result = (await response.json()) as { choices: Array<{ message: { content: string } }> };
    return result.choices[0].message.content;
  }

  // Placeholder handler: echoes a truncated classification label.
  private async classify(content: string): Promise<string> {
    return `Classification of: ${content.slice(0, 50)}`;
  }

  // Placeholder handler: echoes a truncated extraction payload.
  private async extract(content: string): Promise<unknown> {
    return { extracted: content.slice(0, 100) };
  }

  /** Graceful shutdown: drain worker, then scheduler, then queue. */
  async close(): Promise<void> {
    await this.worker.close();
    await this.scheduler.close();
    await this.queue.close();
  }
}
const processor = new BullMQProcessor(process.env.REDIS_URL!);
const jobId = await processor.addJob({ type: 'summarize', content: 'Long text...', userId: 'user1' });
const status = await processor.getJobStatus(jobId);
console.log(`Job status: ${status}`);
Job Status Polling vs Webhooks
Implement both polling and webhook delivery for job completion.
/**
 * Tracks job-completion delivery over two channels: long-poll promises and
 * webhook POSTs with linear-backoff retries.
 */
class JobStatusTracker {
  // Total webhook delivery retries allowed after the first failure.
  private static readonly WEBHOOK_RETRIES = 3;
  // jobId -> webhook registration
  private completionCallbacks = new Map<string, { url: string; retries: number }>();
  // jobId -> pending long-poll promise handles.
  // Fix: was typed with the banned bare `Function` type.
  private pollingClients = new Map<
    string,
    {
      resolve: (value: unknown) => void;
      reject: (reason?: unknown) => void;
      timeoutId: NodeJS.Timeout;
    }
  >();

  /** Register a URL to POST to when `jobId` completes. */
  registerWebhook(jobId: string, webhookUrl: string): void {
    this.completionCallbacks.set(jobId, { url: webhookUrl, retries: JobStatusTracker.WEBHOOK_RETRIES });
  }

  /** Promise that resolves with the job result, or rejects after 5 minutes. */
  setupPolling(jobId: string): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const timeoutId = setTimeout(() => {
        this.pollingClients.delete(jobId);
        reject(new Error(`Polling timeout for ${jobId}`));
      }, 300000); // 5 minute timeout
      this.pollingClients.set(jobId, { resolve, reject, timeoutId });
    });
  }

  /** Resolve any pending poll, then deliver the webhook (if registered). */
  async notifyCompletion(jobId: string, result: unknown): Promise<void> {
    // Notify polling clients
    const polling = this.pollingClients.get(jobId);
    if (polling) {
      clearTimeout(polling.timeoutId);
      polling.resolve(result);
      this.pollingClients.delete(jobId);
    }
    // Send webhook
    const webhook = this.completionCallbacks.get(jobId);
    if (webhook) {
      await this.sendWebhook(webhook.url, result, jobId, webhook.retries);
    }
  }

  /**
   * POST the result; on failure retry with linear backoff (5s, 10s, 15s).
   * Fix: the first retry previously waited 0ms (`5000 * (3 - retriesLeft)`).
   */
  private async sendWebhook(
    url: string,
    data: unknown,
    jobId: string,
    retriesLeft: number
  ): Promise<void> {
    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ jobId, data, timestamp: new Date().toISOString() }),
      });
      if (!response.ok) {
        throw new Error(`Webhook returned ${response.status}`);
      }
      this.completionCallbacks.delete(jobId);
    } catch (error) {
      if (retriesLeft > 0) {
        console.log(`Webhook failed, retrying... (${retriesLeft} left)`);
        // attempt #1 -> 5s, #2 -> 10s, #3 -> 15s
        const delay = 5000 * (JobStatusTracker.WEBHOOK_RETRIES - retriesLeft + 1);
        await new Promise((resolve) => setTimeout(resolve, delay));
        await this.sendWebhook(url, data, jobId, retriesLeft - 1);
      } else {
        console.error(`Webhook failed permanently for ${jobId}`);
        // Fix: drop the registration so the map does not leak dead entries.
        this.completionCallbacks.delete(jobId);
      }
    }
  }
}
const tracker = new JobStatusTracker();
tracker.registerWebhook('job_123', 'https://example.com/webhook');
await tracker.notifyCompletion('job_123', { result: 'Success' });
Idempotency for AI Jobs
Ensure same input produces same job ID to prevent duplicate processing.
/**
 * Deduplicates job submissions: identical (input, userId, model) requests
 * map to one job ID, so the expensive work runs at most once.
 */
class IdempotentJobManager {
  // sha256 fingerprint -> previously issued job ID
  private jobIdCache = new Map<string, string>();
  // job ID -> stored result for completed jobs
  private processedJobs = new Map<string, unknown>();

  /**
   * Deterministic fingerprint of a request. Fields are joined with a unit
   * separator so ("ab","c") and ("a","bc") cannot collide (the original
   * concatenation allowed exactly that).
   */
  private hashInput(input: string, userId: string, model: string): string {
    return createHash('sha256').update(`${input}\u001f${userId}\u001f${model}`).digest('hex');
  }

  /**
   * Submit a job at most once per unique request. Returns the existing job ID
   * on a duplicate; otherwise mints a new ID, caches it, and runs `submitter`.
   */
  async submitIdempotent(
    input: string,
    userId: string,
    model: string,
    submitter: (jobId: string) => Promise<void>
  ): Promise<string> {
    const hash = this.hashInput(input, userId, model);
    // Check if we've already submitted this.
    // (Removed dead code: the original re-scanned jobIdCache for the same
    // hash after this lookup, a branch that could never be reached.)
    const existingId = this.jobIdCache.get(hash);
    if (existingId !== undefined) {
      console.log('Duplicate request detected, returning existing job ID');
      return existingId;
    }
    // New job — cache before submitting so concurrent duplicates see it.
    const jobId = `job_${Date.now()}_${Math.random()}`;
    this.jobIdCache.set(hash, jobId);
    await submitter(jobId);
    return jobId;
  }

  /** Record the final result so later callers can fetch it without re-running. */
  recordCompletion(jobId: string, result: unknown): void {
    this.processedJobs.set(jobId, result);
  }

  getResult(jobId: string): unknown | undefined {
    return this.processedJobs.get(jobId);
  }

  isAlreadyProcessed(jobId: string): boolean {
    return this.processedJobs.has(jobId);
  }
}
// Demo: two identical submissions resolve to the same job ID.
const idempotenceManager = new IdempotentJobManager();
const request = { input: 'Summarize this document', userId: 'user1', model: 'gpt-4' };
const firstId = await idempotenceManager.submitIdempotent(
  request.input,
  request.userId,
  request.model,
  async (id) => console.log(`Processing job ${id}`)
);
const secondId = await idempotenceManager.submitIdempotent(
  request.input,
  request.userId,
  request.model,
  async (id) => console.log(`Processing job ${id}`)
);
console.log(`Same job: ${firstId === secondId}`); // true
Retry Strategy for Rate Limits
Handle 429 responses with exponential backoff and jitter.
/** Retries HTTP-429 failures with exponential backoff + jitter. */
class RateLimitHandler {
  /**
   * Run `fn`, retrying only on 429 responses (read from `statusCode`/`status`
   * on the thrown error). Any other error is rethrown immediately.
   */
  async executeWithRateLimit<T>(
    fn: () => Promise<T>,
    maxRetries: number = 5
  ): Promise<T> {
    for (let attempt = 0; ; attempt++) {
      try {
        return await fn();
      } catch (error) {
        const code = (error as any).statusCode || (error as any).status;
        if (code !== 429) {
          throw error;
        }
        if (attempt >= maxRetries) {
          throw new Error(`Rate limited after ${maxRetries} retries`);
        }
        const waitMs = this.calculateBackoff(attempt);
        console.log(`Rate limited (429), retrying in ${waitMs}ms...`);
        await new Promise((resolve) => setTimeout(resolve, waitMs));
      }
    }
  }

  /** Exponential backoff: 2^attempt seconds plus up to 1s jitter, capped at 60s. */
  private calculateBackoff(attempt: number): number {
    const base = 2 ** attempt * 1000;
    const jitter = Math.random() * 1000;
    return Math.min(base + jitter, 60000);
  }

  /**
   * Process `items` in chunks of `batchSize`, each item wrapped in
   * executeWithRateLimit, pausing between chunks to stay under rate limits.
   */
  async batchWithRateLimit<T>(
    items: T[],
    processor: (item: T) => Promise<void>,
    batchSize: number = 5,
    delayBetweenBatches: number = 5000
  ): Promise<void> {
    for (let start = 0; start < items.length; start += batchSize) {
      const chunk = items.slice(start, start + batchSize);
      await Promise.all(chunk.map((entry) => this.executeWithRateLimit(() => processor(entry))));
      const hasMore = start + batchSize < items.length;
      if (hasMore) {
        console.log(`Batch processed, waiting ${delayBetweenBatches}ms before next batch...`);
        await new Promise((resolve) => setTimeout(resolve, delayBetweenBatches));
      }
    }
  }
}
// Demo: wrap an OpenAI call so 429 responses become typed errors that
// executeWithRateLimit recognizes and retries.
const rateLimiter = new RateLimitHandler();
await rateLimiter.executeWithRateLimit(async () => {
  const response = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    body: JSON.stringify({}),
  });
  if (response.status !== 429) {
    return response.json();
  }
  const rateLimitError = new Error('Rate limited');
  (rateLimitError as any).status = 429;
  throw rateLimitError;
});
Streaming Progress Updates via SSE
Send real-time progress updates to clients using Server-Sent Events.
import { EventEmitter } from 'events';
/**
 * Typed progress bus for background jobs. Every record* helper emits a
 * payload carrying the jobId so subscribers can route events per job.
 * (Removed a no-op `emit` override that only delegated to super.emit.)
 */
class ProgressEmitter extends EventEmitter {
  /** Announce intermediate progress (0-100) for a job. */
  recordProgress(jobId: string, message: string, progress: number): void {
    this.emit('progress', { jobId, message, progress, timestamp: new Date() });
  }

  /** Announce successful completion with the job's result. */
  recordCompletion(jobId: string, result: unknown): void {
    this.emit('completed', { jobId, result, timestamp: new Date() });
  }

  /**
   * Announce failure. NOTE: EventEmitter treats 'error' specially — it throws
   * if nothing is listening; SSEServer's constructor always subscribes.
   */
  recordError(jobId: string, error: string): void {
    this.emit('error', { jobId, error, timestamp: new Date() });
  }
}

/**
 * Fans ProgressEmitter events out to connected SSE clients. Each client is
 * scoped to one jobId and only receives that job's events.
 */
class SSEServer {
  private emitter: ProgressEmitter;
  // clientId -> open response stream + the job it is watching
  private clients = new Map<string, { res: any; jobId: string }>();

  constructor() {
    this.emitter = new ProgressEmitter();
    this.emitter.on('progress', (data) => this.broadcast('progress', data));
    this.emitter.on('completed', (data) => this.broadcast('completed', data));
    this.emitter.on('error', (data) => this.broadcast('error', data));
  }

  /** Attach an HTTP response as an SSE stream scoped to one job. */
  createSSEEndpoint(jobId: string, res: any): void {
    const clientId = `client_${Date.now()}_${Math.random()}`;
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    this.clients.set(clientId, { res, jobId });
    res.write(`: Connected to job ${jobId}\n\n`);
    res.on('close', () => {
      this.clients.delete(clientId);
    });
  }

  /**
   * Send an event to the clients watching the payload's jobId.
   * Fix: previously every client received every job's events regardless of
   * the jobId it registered for.
   */
  private broadcast(eventType: string, data: unknown): void {
    const targetJobId = (data as { jobId?: string } | null)?.jobId;
    for (const [clientId, client] of this.clients) {
      if (targetJobId !== undefined && client.jobId !== targetJobId) continue;
      try {
        client.res.write(`event: ${eventType}\n`);
        client.res.write(`data: ${JSON.stringify(data)}\n\n`);
      } catch {
        // Write failed: client disconnected — drop it so we stop retrying.
        this.clients.delete(clientId);
      }
    }
  }

  getEmitter(): ProgressEmitter {
    return this.emitter;
  }
}
// Demo: drive a fake job through progress -> completion over ~3.5 seconds.
const sseServer = new SSEServer();
const emitter = sseServer.getEmitter();
const pause = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
emitter.recordProgress('job_123', 'Starting processing...', 0);
await pause(1000);
emitter.recordProgress('job_123', 'Sending to LLM...', 30);
await pause(2000);
emitter.recordProgress('job_123', 'Processing response...', 80);
await pause(500);
emitter.recordCompletion('job_123', { result: 'Done' });
Storing and Versioning AI Outputs
Persist job outputs with versioning and audit trails.
// One versioned record of an AI job's output, with retention metadata.
interface StoredOutput {
  jobId: string; // job this output belongs to
  version: number; // 1-based, incremented per store() call for the same jobId
  input: string; // prompt/content that produced the output
  output: string; // model output text
  model: string; // model that produced the output
  tokens: { input: number; output: number }; // rough estimate: chars / 4 (see store())
  userId: string; // owner of the job
  createdAt: Date; // when this version was stored
  expiresAt: Date; // createdAt + retention window; pruneExpired() drops past this
  metadata?: Record<string, unknown>; // NOTE(review): never set by store() — reserved for callers?
}
/**
 * In-memory store of versioned AI outputs with time-based retention.
 * NOTE(review): process-local only — swap for a database in production.
 */
class OutputStorage {
  // jobId -> append-only list of versions (oldest first)
  private outputs = new Map<string, StoredOutput[]>();
  private readonly defaultRetentionDays = 90;

  /** Append a new version of the output for `jobId`. */
  store(jobId: string, input: string, output: string, model: string, userId: string): void {
    const existing = this.outputs.get(jobId) || [];
    const stored: StoredOutput = {
      jobId,
      version: existing.length + 1, // 1-based version counter
      input,
      output,
      model,
      // Rough token estimate: ~4 characters per token.
      tokens: { input: Math.ceil(input.length / 4), output: Math.ceil(output.length / 4) },
      userId,
      createdAt: new Date(),
      expiresAt: new Date(Date.now() + this.defaultRetentionDays * 86400000),
    };
    existing.push(stored);
    this.outputs.set(jobId, existing);
  }

  /** Most recent version, or undefined if nothing is stored. */
  getLatest(jobId: string): StoredOutput | undefined {
    const all = this.outputs.get(jobId);
    return all?.[all.length - 1];
  }

  /** Specific version by its 1-based number. */
  getVersion(jobId: string, version: number): StoredOutput | undefined {
    return this.outputs.get(jobId)?.find((o) => o.version === version);
  }

  /** All versions, oldest first. Returns a copy so callers cannot mutate the store. */
  getHistory(jobId: string): StoredOutput[] {
    return [...(this.outputs.get(jobId) || [])];
  }

  /**
   * Drop every output whose expiresAt has passed.
   * @returns the number of individual outputs removed. (Fix: previously only
   *          jobIds whose entire history expired were counted.)
   */
  pruneExpired(): number {
    let removed = 0;
    const now = new Date(); // hoisted: one clock read per prune pass
    for (const [jobId, outputs] of this.outputs.entries()) {
      const alive = outputs.filter((o) => o.expiresAt > now);
      removed += outputs.length - alive.length;
      if (alive.length === 0) {
        this.outputs.delete(jobId);
      } else {
        this.outputs.set(jobId, alive);
      }
    }
    return removed;
  }
}
const storage = new OutputStorage();
storage.store('job_123', 'Summarize this...', 'Summary result', 'gpt-4', 'user1');
const latest = storage.getLatest('job_123');
console.log('Latest output:', latest?.output);
const history = storage.getHistory('job_123');
console.log('Output history:', history.length, 'versions');
Checklist
- Submit LLM requests to background queues immediately
- Use BullMQ or similar for reliable job processing
- Implement idempotency using input hashing
- Support both webhook callbacks and polling
- Handle 429 rate limits with exponential backoff
- Stream progress updates via Server-Sent Events
- Persist all outputs with retention policies
- Version outputs for audit trails
- Retry failed jobs up to 3 times
- Monitor queue depth and job latency
- Prune expired outputs regularly
- Log all job state transitions for debugging
Conclusion
Async job processing is essential for LLM workloads. Use BullMQ with exponential backoff for reliable processing, idempotency hashing to prevent duplicates, webhooks or SSE for status updates, and persistent storage with versioning. This layered approach provides reliability, observability, and scalability for background AI tasks at any scale.