Loading
Build a persistent task queue with the producer/consumer pattern, retry with exponential backoff, dead letter queues, and a status dashboard.
Task queues are essential infrastructure for any backend system that handles work too slow or too unreliable for synchronous request-response cycles. Sending emails, processing images, generating reports, calling external APIs — all of these belong in a queue rather than blocking your API handler.
In this tutorial, you will build a complete task queue system from scratch in Node.js. You will implement the producer/consumer pattern, persist jobs to SQLite so they survive restarts, retry failed jobs with exponential backoff, route permanently failing jobs to a dead letter queue, and build a simple dashboard to monitor queue health.
The goal is not to replace production systems like BullMQ or AWS SQS — it is to deeply understand how they work by building the core mechanics yourself.
Initialize the project with SQLite for persistence.
Define the types that model your queue:
Each job has a status lifecycle: pending → processing → completed (success) or failed (retriable) → dead (permanently failed after max retries).
Build the persistence layer using SQLite. SQLite is a perfect fit for a task queue — it handles concurrent reads, supports transactions, and requires zero infrastructure.
WAL mode (Write-Ahead Logging) allows concurrent readers while a write is in progress, which is critical when workers are claiming jobs while the producer is inserting new ones.
The producer enqueues jobs for workers to process.
The optional delay parameter lets you schedule jobs for future execution — useful for things like "send a follow-up email in 24 hours."
The worker polls for jobs, executes handlers, and manages retries with exponential backoff.
Exponential backoff with jitter is the gold standard for retry strategies. The formula base * 2^attempt + random_jitter spaces retries further apart with each failure (1s, 2s, 4s, 8s...) and the jitter prevents thundering herd problems when many jobs fail simultaneously.
Jobs that exhaust their retries land in the dead letter queue. Build utilities to inspect and replay them.
Allow multiple jobs to process simultaneously with configurable concurrency.
Expose queue metrics and job data through an API.
Create example handlers that simulate real work.
Add support for job priorities so urgent work gets processed first.
Register workers for each priority level. The worker for queue:high gets polled first, then queue:normal, then queue:low. This ensures high-priority jobs are always processed before lower-priority ones.
Wire everything together.
Run with npx tsx src/index.ts. Watch the terminal as jobs are claimed, processed, retried on failure, and eventually completed or moved to the dead letter queue. Hit http://localhost:3000/dashboard/stats/email to see live queue metrics.
You now understand the fundamentals behind every production task queue. The patterns here — polling, claiming, backoff, dead letters, priority — are the same patterns used by Redis-backed queues, SQS, and every other job processing system at scale.
mkdir task-queue && cd task-queue
npm init -y
npm install better-sqlite3 express
npm install -D typescript @types/node @types/better-sqlite3 @types/express tsx
npx tsc --init --target ES2022 --module NodeNext --moduleResolution NodeNext// src/types.ts
export type JobStatus = "pending" | "processing" | "completed" | "failed" | "dead";
export interface Job {
id: string;
queue: string;
payload: Record<string, unknown>;
status: JobStatus;
attempts: number;
maxAttempts: number;
lastError: string | null;
createdAt: string;
updatedAt: string;
scheduledFor: string;
}
export type JobHandler = (payload: Record<string, unknown>) => Promise<void>;// src/database.ts
import Database from "better-sqlite3";
import { Job, JobStatus } from "./types.js";
export class JobDatabase {
private db: Database.Database;
constructor(dbPath = "queue.db") {
this.db = new Database(dbPath);
this.db.pragma("journal_mode = WAL");
this.initialize();
}
private initialize(): void {
this.db.exec(`
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
queue TEXT NOT NULL,
payload TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
last_error TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
scheduled_for TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, scheduled_for);
CREATE INDEX IF NOT EXISTS idx_jobs_queue ON jobs(queue, status);
`);
}
insert(job: Omit<Job, "createdAt" | "updatedAt">): void {
this.db
.prepare(
`
INSERT INTO jobs (id, queue, payload, status, attempts, max_attempts, scheduled_for)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
)
.run(
job.id,
job.queue,
JSON.stringify(job.payload),
job.status,
job.attempts,
job.maxAttempts,
job.scheduledFor
);
}
claimNextJob(queue: string): Job | null {
const row = this.db
.prepare(
`
SELECT * FROM jobs
WHERE queue = ? AND status = 'pending' AND scheduled_for <= datetime('now')
ORDER BY created_at ASC
LIMIT 1
`
)
.get(queue) as Record<string, unknown> | undefined;
if (!row) return null;
this.db
.prepare(
`
UPDATE jobs SET status = 'processing', updated_at = datetime('now')
WHERE id = ?
`
)
.run(row.id);
return this.rowToJob(row);
}
updateStatus(id: string, status: JobStatus, error?: string): void {
this.db
.prepare(
`
UPDATE jobs SET status = ?, last_error = ?, attempts = attempts + 1, updated_at = datetime('now')
WHERE id = ?
`
)
.run(status, error ?? null, id);
}
reschedule(id: string, scheduledFor: string): void {
this.db
.prepare(
`
UPDATE jobs SET status = 'pending', scheduled_for = ?, updated_at = datetime('now')
WHERE id = ?
`
)
.run(scheduledFor, id);
}
getStats(queue: string): Record<JobStatus, number> {
const rows = this.db
.prepare(
`
SELECT status, COUNT(*) as count FROM jobs WHERE queue = ? GROUP BY status
`
)
.all(queue) as Array<{ status: JobStatus; count: number }>;
const stats: Record<string, number> = {
pending: 0,
processing: 0,
completed: 0,
failed: 0,
dead: 0,
};
for (const row of rows) {
stats[row.status] = row.count;
}
return stats as Record<JobStatus, number>;
}
getRecentJobs(queue: string, limit = 20): Job[] {
const rows = this.db
.prepare(
`
SELECT * FROM jobs WHERE queue = ? ORDER BY updated_at DESC LIMIT ?
`
)
.all(queue, limit) as Array<Record<string, unknown>>;
return rows.map((row) => this.rowToJob(row));
}
private rowToJob(row: Record<string, unknown>): Job {
return {
id: row.id as string,
queue: row.queue as string,
payload: JSON.parse(row.payload as string),
status: row.status as JobStatus,
attempts: row.attempts as number,
maxAttempts: row.max_attempts as number,
lastError: row.last_error as string | null,
createdAt: row.created_at as string,
updatedAt: row.updated_at as string,
scheduledFor: row.scheduled_for as string,
};
}
}// src/producer.ts
import { JobDatabase } from "./database.js";
export class Producer {
constructor(private db: JobDatabase) {}
enqueue(
queue: string,
payload: Record<string, unknown>,
options: { maxAttempts?: number; delay?: number } = {}
): string {
const id = crypto.randomUUID();
const scheduledFor = options.delay
? new Date(Date.now() + options.delay).toISOString()
: new Date().toISOString();
this.db.insert({
id,
queue,
payload,
status: "pending",
attempts: 0,
maxAttempts: options.maxAttempts ?? 3,
lastError: null,
scheduledFor,
});
return id;
}
enqueueBatch(queue: string, payloads: Array<Record<string, unknown>>): string[] {
return payloads.map((payload) => this.enqueue(queue, payload));
}
}// src/worker.ts
import { JobDatabase } from "./database.js";
import { JobHandler, Job } from "./types.js";
export class Worker {
private handlers = new Map<string, JobHandler>();
private running = false;
private pollIntervalMs: number;
constructor(
private db: JobDatabase,
pollIntervalMs = 1000
) {
this.pollIntervalMs = pollIntervalMs;
}
register(queue: string, handler: JobHandler): void {
this.handlers.set(queue, handler);
}
async start(): Promise<void> {
this.running = true;
console.log(`Worker started. Polling every ${this.pollIntervalMs}ms.`);
while (this.running) {
let processedAny = false;
for (const [queue, handler] of this.handlers) {
const job = this.db.claimNextJob(queue);
if (job) {
processedAny = true;
await this.processJob(job, handler);
}
}
if (!processedAny) {
await this.sleep(this.pollIntervalMs);
}
}
}
stop(): void {
this.running = false;
}
private async processJob(job: Job, handler: JobHandler): Promise<void> {
console.log(`Processing job ${job.id} (attempt ${job.attempts + 1}/${job.maxAttempts})`);
try {
await handler(job.payload);
this.db.updateStatus(job.id, "completed");
console.log(`Job ${job.id} completed.`);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
const nextAttempt = job.attempts + 1;
if (nextAttempt >= job.maxAttempts) {
this.db.updateStatus(job.id, "dead", errorMessage);
console.error(`Job ${job.id} moved to dead letter queue: ${errorMessage}`);
} else {
const backoffMs = Math.min(
this.calculateBackoff(nextAttempt),
300000 // max 5 minutes
);
const scheduledFor = new Date(Date.now() + backoffMs).toISOString();
this.db.updateStatus(job.id, "failed", errorMessage);
this.db.reschedule(job.id, scheduledFor);
console.warn(`Job ${job.id} failed. Retrying in ${backoffMs}ms.`);
}
}
}
private calculateBackoff(attempt: number): number {
const base = 1000;
const jitter = Math.random() * 1000;
return base * Math.pow(2, attempt) + jitter;
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}// src/dead-letter.ts
import { JobDatabase } from "./database.js";
import { Job } from "./types.js";
export class DeadLetterQueue {
constructor(private db: JobDatabase) {}
getDeadJobs(queue: string): Job[] {
return this.db.getRecentJobs(queue).filter((j) => j.status === "dead");
}
replay(jobId: string): void {
this.db.reschedule(jobId, new Date().toISOString());
console.log(`Replayed dead job ${jobId}`);
}
replayAll(queue: string): number {
const dead = this.getDeadJobs(queue);
for (const job of dead) {
this.replay(job.id);
}
return dead.length;
}
}// src/concurrent-worker.ts
import { JobDatabase } from "./database.js";
import { JobHandler, Job } from "./types.js";
export class ConcurrentWorker {
private handlers = new Map<string, JobHandler>();
private running = false;
private activeJobs = 0;
private concurrency: number;
constructor(
private db: JobDatabase,
concurrency = 5
) {
this.concurrency = concurrency;
}
register(queue: string, handler: JobHandler): void {
this.handlers.set(queue, handler);
}
async start(): Promise<void> {
this.running = true;
while (this.running) {
if (this.activeJobs >= this.concurrency) {
await this.sleep(100);
continue;
}
for (const [queue, handler] of this.handlers) {
if (this.activeJobs >= this.concurrency) break;
const job = this.db.claimNextJob(queue);
if (job) {
this.activeJobs++;
this.processJob(job, handler).finally(() => {
this.activeJobs--;
});
}
}
await this.sleep(200);
}
}
stop(): void {
this.running = false;
}
private async processJob(job: Job, handler: JobHandler): Promise<void> {
try {
await handler(job.payload);
this.db.updateStatus(job.id, "completed");
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
this.db.updateStatus(job.id, "failed", message);
}
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}// src/dashboard.ts
import express from "express";
import { JobDatabase } from "./database.js";
import { DeadLetterQueue } from "./dead-letter.js";
export function createDashboard(db: JobDatabase): express.Router {
const router = express.Router();
const dlq = new DeadLetterQueue(db);
router.get("/stats/:queue", (req, res) => {
const stats = db.getStats(req.params.queue);
res.json(stats);
});
router.get("/jobs/:queue", (req, res) => {
const limit = Number(req.query.limit) || 20;
const jobs = db.getRecentJobs(req.params.queue, limit);
res.json(jobs);
});
router.get("/dead/:queue", (req, res) => {
const dead = dlq.getDeadJobs(req.params.queue);
res.json(dead);
});
router.post("/dead/:jobId/replay", (req, res) => {
dlq.replay(req.params.jobId);
res.json({ replayed: true });
});
router.post("/dead/:queue/replay-all", (req, res) => {
const count = dlq.replayAll(req.params.queue);
res.json({ replayed: count });
});
return router;
}// src/handlers.ts
import { JobHandler } from "./types.js";
export const sendEmail: JobHandler = async (payload) => {
const { to, subject } = payload as { to: string; subject: string };
console.log(`Sending email to ${to}: ${subject}`);
// Simulate network latency
await new Promise((resolve) => setTimeout(resolve, 500));
// Simulate 20% failure rate for testing retries
if (Math.random() < 0.2) {
throw new Error("SMTP connection timeout");
}
};
export const processImage: JobHandler = async (payload) => {
const { url, size } = payload as { url: string; size: string };
console.log(`Processing image ${url} at size ${size}`);
await new Promise((resolve) => setTimeout(resolve, 2000));
};// src/priority.ts
import { JobDatabase } from "./database.js";
export class PriorityProducer {
constructor(private db: JobDatabase) {}
enqueueWithPriority(
queue: string,
payload: Record<string, unknown>,
priority: "high" | "normal" | "low"
): string {
const priorityQueues: Record<string, string> = {
high: `${queue}:high`,
normal: `${queue}:normal`,
low: `${queue}:low`,
};
const id = crypto.randomUUID();
this.db.insert({
id,
queue: priorityQueues[priority],
payload,
status: "pending",
attempts: 0,
maxAttempts: 3,
lastError: null,
scheduledFor: new Date().toISOString(),
});
return id;
}
}// src/index.ts
import express from "express";
import { JobDatabase } from "./database.js";
import { Producer } from "./producer.js";
import { Worker } from "./worker.js";
import { createDashboard } from "./dashboard.js";
import { sendEmail, processImage } from "./handlers.js";
const db = new JobDatabase();
const producer = new Producer(db);
const worker = new Worker(db);
worker.register("email", sendEmail);
worker.register("images", processImage);
// Enqueue some test jobs
for (let i = 0; i < 10; i++) {
producer.enqueue("email", { to: `user${i}@example.com`, subject: `Welcome #${i}` });
}
producer.enqueue("images", { url: "https://example.com/photo.jpg", size: "thumbnail" });
// Start dashboard
const app = express();
app.use("/dashboard", createDashboard(db));
app.listen(3000, () => console.log("Dashboard on port 3000"));
// Start processing
worker.start().catch(console.error);