The Worker Loop
What a worker does
A worker is a long-running process that repeatedly checks the queue for pending jobs, processes them, and marks them complete. It is the consumer side of the producer-consumer pattern established in the previous lesson.
The simplest worker
```ts
// src/worker.ts
import { dequeue, completeJob, failJob } from "./queue.js";
import { processJob } from "./job-handlers.js";

const WORKER_ID = `worker-${crypto.randomUUID().slice(0, 8)}`;
const POLL_INTERVAL_MS = 1000;

async function run(): Promise<void> {
  console.log(`[${WORKER_ID}] Worker started. Polling every ${POLL_INTERVAL_MS}ms...`);
  while (true) {
    const job = dequeue(WORKER_ID);
    if (!job) {
      // No pending jobs — wait before polling again
      await new Promise((r) => setTimeout(r, POLL_INTERVAL_MS));
      continue;
    }
    console.log(`[${WORKER_ID}] Processing job ${job.id} (${job.type})`);
    try {
      await processJob(job);
      completeJob(job.id);
      console.log(`[${WORKER_ID}] Completed job ${job.id}`);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      failJob(job.id, message);
      console.error(`[${WORKER_ID}] Failed job ${job.id}: ${message}`);
    }
  }
}

run();
```

The worker loops forever: dequeue a job, process it, mark it complete or failed. If no jobs are pending, it sleeps for one second and checks again.
The poll interval tradeoff
Short interval (100ms): Jobs are picked up quickly. But the worker queries the database 10 times per second, even when the queue is empty. With SQLite’s WAL mode this is fine for low-to-moderate volume, but wasteful at scale.
Long interval (10s): Less database load. But a job sits in the queue for up to 10 seconds before a worker picks it up. For time-sensitive jobs (password reset emails), this is too slow.
A 1-second interval is a good default. Most background jobs are not time-critical (the user is not waiting), and a one-second delay is imperceptible for notifications and reports.
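One way to soften the tradeoff is adaptive polling: poll quickly right after finding a job, and back off while the queue stays empty. A minimal sketch of the backoff calculation; the `nextDelay` helper is hypothetical, not part of the lesson's queue module:

```ts
// Hypothetical helper: grow the wait after each empty poll,
// clamped to a [minMs, maxMs] range.
function nextDelay(current: number, minMs = 100, maxMs = 10_000): number {
  // Double the delay, but never below minMs or above maxMs.
  return Math.max(minMs, Math.min(current * 2, maxMs));
}

// In the loop you would reset to minMs whenever a job is found:
//   delay = job ? 100 : nextDelay(delay);
let delay = 100;
delay = nextDelay(delay); // 200
delay = nextDelay(delay); // 400
```

This keeps pickup latency low when the queue is busy while approaching the light database load of a long interval when it is idle.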
Job handlers
The processJob function dispatches to the right handler based on the job type:
```ts
// src/job-handlers.ts
import { sendEmail } from "./services/email.js";
import { generatePDF } from "./services/pdf.js";
import { syncInventory } from "./services/inventory.js";

export interface Job {
  id: string;
  type: string;
  payload: string;
  status: string;
  attempts: number;
  max_attempts: number;
}

type JobHandler = (payload: any) => Promise<void>;

const handlers: Record<string, JobHandler> = {
  send_email: async (payload) => {
    await sendEmail(payload.to, payload.subject, payload.body);
  },
  generate_invoice: async (payload) => {
    await generatePDF(payload.orderId);
  },
  sync_inventory: async (payload) => {
    await syncInventory(payload.productId, payload.quantity);
  },
};

export async function processJob(job: Job): Promise<void> {
  const handler = handlers[job.type];
  if (!handler) {
    throw new Error(`Unknown job type: ${job.type}`);
  }
  const payload = JSON.parse(job.payload);
  await handler(payload);
}
```

Each job type maps to a function. The worker does not know about emails or PDFs — it just looks up the handler and calls it. Adding a new job type means adding one entry to the handlers object.
Running the worker
```bash
# Terminal 1: HTTP server
npm run dev

# Terminal 2: Worker
npm run worker
```

Now enqueue a job from the server and watch the worker process it:
```ts
// In a route handler
route.post("/orders", {
  resolve: async (c) => {
    const order = createOrder(c.input.body);
    enqueue("send_email", {
      to: c.input.body.email,
      subject: "Order Confirmed",
      body: `Your order ${order.id} has been placed.`,
    });
    enqueue("generate_invoice", { orderId: order.id });
    return Response.json(order, { status: 201 });
  },
});
```

The server responds immediately after enqueueing. A second later, the worker picks up the email job, processes it, then picks up the invoice job.
Graceful shutdown for the worker
The worker needs graceful shutdown too — just like the HTTP server from the Error Handling course’s Graceful Shutdown lesson:
```ts
let running = true;

function shutdown(signal: string): void {
  console.log(`\n[${WORKER_ID}] ${signal} received. Finishing current job...`);
  running = false;
}

process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));

async function run(): Promise<void> {
  while (running) {
    const job = dequeue(WORKER_ID);
    if (!job) {
      await new Promise((r) => setTimeout(r, POLL_INTERVAL_MS));
      continue;
    }
    // Process the job (same as before)
    try {
      await processJob(job);
      completeJob(job.id);
    } catch (err) {
      failJob(job.id, err instanceof Error ? err.message : String(err));
    }
  }
  console.log(`[${WORKER_ID}] Worker stopped.`);
  process.exit(0);
}
```

When SIGTERM arrives, running is set to false. The current job finishes processing, the loop exits, and the process shuts down cleanly. No job is left half-processed. (If the signal arrives while the worker is asleep between polls, shutdown takes at most one extra poll interval.)
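That sleep-then-exit delay can be removed with an interruptible sleep. A sketch, assuming the signal handler shares an `AbortController` with the loop (the controller and `sleep` helper are illustrative, not part of the lesson's code):

```ts
// Assumed shared between shutdown() and the worker loop.
const shutdownController = new AbortController();

// Resolves after `ms`, or immediately once `signal` is aborted.
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }
    const timer = setTimeout(resolve, ms);
    signal?.addEventListener(
      "abort",
      () => {
        clearTimeout(timer);
        resolve();
      },
      { once: true }
    );
  });
}

// In shutdown(): set running = false, then shutdownController.abort().
// In the loop:   await sleep(POLL_INTERVAL_MS, shutdownController.signal);
```

With this in place, a SIGTERM during the idle wait wakes the loop at once; the `running` check then exits cleanly without waiting out the poll interval.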
Exercises
Exercise 1: Implement the worker loop. Enqueue 3 jobs from the server. Watch the worker process them in order.
Exercise 2: Enqueue a job that fails (use the simulated email service’s 10% failure rate). Verify the worker logs the error and the job’s attempt count increases.
Exercise 3: Add graceful shutdown. Start the worker. Start a long-running job (add a 5-second delay). Press Ctrl+C. Verify the job completes before the worker exits.
Why does the worker use polling instead of being notified when a new job is available?