Capstone: Order Processing Pipeline
What we built
A complete background job system for an e-commerce API:
| Component | What it does | Lesson |
|---|---|---|
| Jobs table | Stores pending, processing, completed, failed jobs | Database-Backed Queues |
| enqueue / dequeue | Produce and consume jobs atomically | Database-Backed Queues |
| Worker loop | Poll for jobs, process, mark complete | The Worker Loop |
| Zod schemas | Validate job payloads at enqueue and dequeue time | Job Serialization |
| Retries + backoff | Retry failed jobs with increasing delays | Retries and Backoff |
| Dead letter queue | Store permanently failed jobs for investigation | Dead Letter Queues |
| Idempotency | Safe-to-repeat handlers with tracking tables | Idempotent Jobs |
| Delayed jobs | scheduled_at for future execution | Delayed Jobs |
| Cron scheduling | Recurring jobs via cron_schedules table | Recurring Jobs |
| Multiple workers | Atomic claiming prevents duplicate processing | Multiple Workers |
| Priorities | Urgent jobs first, fair FIFO within priority | Job Priorities |
| Rate limiting | Control processing speed per service | Rate Limiting Jobs |
| Job chaining | Multi-step workflows with fan-out | Job Chaining |
| Monitoring | Queue depth, failure rate, DLQ health | Monitoring |
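All of these components hang off a single jobs table. As a reference, here is a minimal sketch of a schema that supports every row in the table above; the column names are illustrative assumptions and may differ from the actual db.ts in the project:

```typescript
// Illustrative SQLite schema for the jobs table (column names are assumptions).
// Each column maps to a lesson: scheduled_at (Delayed Jobs), locked_by/locked_at
// (Multiple Workers + stale-lock recovery), priority (Job Priorities),
// attempts/max_attempts (Retries and Backoff).
const JOBS_SCHEMA = `
CREATE TABLE IF NOT EXISTS jobs (
  id            INTEGER PRIMARY KEY AUTOINCREMENT,
  type          TEXT NOT NULL,                     -- handler name, e.g. 'send_email'
  payload       TEXT NOT NULL,                     -- JSON, validated with Zod
  status        TEXT NOT NULL DEFAULT 'pending',   -- pending | processing | completed | failed
  priority      INTEGER NOT NULL DEFAULT 0,        -- higher runs first
  attempts      INTEGER NOT NULL DEFAULT 0,
  max_attempts  INTEGER NOT NULL DEFAULT 3,
  scheduled_at  TEXT NOT NULL DEFAULT (datetime('now')),  -- future = delayed job
  locked_by     TEXT,                              -- worker id that claimed the job
  locked_at     TEXT,                              -- used by releaseStaleJobs
  last_error    TEXT,
  created_at    TEXT NOT NULL DEFAULT (datetime('now'))
);
`;
```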
The order processing pipeline
POST /orders
│
├─ Save order to database (synchronous — in request)
├─ Charge payment (synchronous — user needs the result)
│
└─ Background jobs (enqueued, processed by worker):
│
├─ send_order_confirmation (HIGH priority)
│ ├─ generate_invoice
│ └─ sync_inventory
│
├─ send_followup_email (delayed 24 hours)
│
└─ (cron) daily_sales_report (midnight)
      (cron) cleanup_completed_jobs (3 AM)

The complete route handler
route.post("/orders", {
request: { body: CreateOrderBody },
resolve: async (c) => {
if (!c.input.ok) {
throw new ValidationError(
c.input.issues.map((i) => ({ field: i.path.join("."), message: i.message })),
);
}
// Synchronous: save and charge
const order = createOrder(c.input.body);
await chargeCard(order.total, c.input.body.paymentToken);
updateOrderStatus(order.id, "paid");
// Background: everything else
enqueue(
"send_order_confirmation",
{
orderId: order.id,
userId: c.input.body.userId,
},
{ priority: PRIORITY.HIGH },
);
enqueueDelayed(
"send_followup_email",
{
orderId: order.id,
userId: c.input.body.userId,
},
24 * 60 * 60 * 1000,
); // 24 hours
return Response.json(order, { status: 201 });
},
});

The user waits only for the order and the payment (about two seconds). Everything else happens in the background: the confirmation email arrives within a few seconds, the follow-up email arrives in 24 hours, and the reports run at midnight.
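The enqueueDelayed call above takes a delay in milliseconds. One way to turn that relative delay into the absolute scheduled_at timestamp the worker checks (a sketch; the real queue.ts may compute this differently, e.g. with SQLite's datetime modifiers):

```typescript
// Convert a relative delay into an absolute scheduled_at timestamp.
// dequeue only claims jobs whose scheduled_at is in the past, so writing
// a future timestamp is all it takes to delay a job.
function scheduledAtFromDelay(delayMs: number, now: Date = new Date()): string {
  return new Date(now.getTime() + delayMs).toISOString();
}

// The 24-hour follow-up email from the route handler:
scheduledAtFromDelay(24 * 60 * 60 * 1000);
```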
The complete worker
// src/worker.ts
import { db } from "./db.js";
import { dequeue, completeJob, failJob, releaseStaleJobs } from "./queue.js";
import { processJob } from "./job-handlers.js";
import { runDueCronJobs } from "./cron-scheduler.js";
import { JobRateLimiter } from "./rate-limiter.js";
const WORKER_ID = `worker-${process.pid}-${crypto.randomUUID().slice(0, 8)}`;
const POLL_INTERVAL_MS = 1000;
let running = true;
const rateLimiter = new JobRateLimiter();
const RATE_LIMITS: Record<string, number> = {
send_email: 120,
send_webhook: 30,
};
function shutdown(signal: string): void {
console.log(`[${WORKER_ID}] ${signal} received. Finishing current job...`);
running = false;
}
process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));
async function run(): Promise<void> {
console.log(`[${WORKER_ID}] Worker started.`);
// Release stale locks from crashed workers
const released = releaseStaleJobs(10);
if (released > 0) console.log(`[${WORKER_ID}] Released ${released} stale jobs.`);
let lastCronCheck = 0;
while (running) {
// Cron check every 60 seconds
if (Date.now() - lastCronCheck > 60_000) {
const count = runDueCronJobs(WORKER_ID);
if (count > 0) console.log(`[CRON] Enqueued ${count} scheduled jobs`);
lastCronCheck = Date.now();
}
const job = dequeue(WORKER_ID);
if (!job) {
await new Promise((r) => setTimeout(r, POLL_INTERVAL_MS));
continue;
}
// Rate limit check
const limit = RATE_LIMITS[job.type];
if (limit && !rateLimiter.canProcess(job.type, limit)) {
db.prepare(
`
UPDATE jobs SET status = 'pending', locked_by = NULL, locked_at = NULL,
scheduled_at = datetime('now', '+5 seconds') WHERE id = ?
`,
).run(job.id);
continue;
}
try {
await processJob(job);
completeJob(job.id);
if (limit) rateLimiter.record(job.type);
} catch (err) {
failJob(job.id, err instanceof Error ? err.message : String(err));
}
}
console.log(`[${WORKER_ID}] Worker stopped.`);
process.exit(0);
}
run();

Project structure
src/
app.ts # Hectoday HTTP setup, routes
server.ts # HTTP server
worker.ts # Job processing loop
db.ts # Database schema (jobs, dead_letter_jobs, cron_schedules)
queue.ts # enqueue, dequeue, completeJob, failJob, releaseStaleJobs
job-handlers.ts # Handler registry, processJob
job-schemas.ts # Zod schemas for job payloads
priorities.ts # PRIORITY constants
cron.ts # computeNextRun
cron-scheduler.ts # runDueCronJobs, registerCron
rate-limiter.ts # JobRateLimiter
services/
email.ts # Email service
pdf.ts # PDF generation
inventory.ts # Inventory sync
routes/
orders.ts # Order creation, enqueueing jobs
admin.ts # Queue stats, DLQ management

The reliability stack
Job enqueued
│
├─ Worker claims atomically (no duplicates)
│ ├─ Rate limiter checks throughput
│ ├─ Zod validates payload
│ ├─ Handler processes (idempotently)
│ ├─ Success → completeJob
│ └─ Failure → failJob
│ ├─ Retries remaining → pending (delayed)
│ └─ Max retries → dead letter queue
│
├─ Stale lock cleanup (crashed workers)
├─ Cron scheduler (recurring jobs)
├─ Monitoring (queue depth, failure rate)
└─ Cleanup (purge old completed jobs)

Challenges
Challenge 1: Add batch processing. Some job types process better in batches. Instead of dequeuing one email job at a time, dequeue 10 and send them in one API call. Implement a batch dequeue function.
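To make the batch-claiming semantics concrete, here is an in-memory sketch only (the function name and Job shape are hypothetical, not part of the project): it claims up to `limit` pending jobs of one type for a worker, mirroring what a single atomic UPDATE ... WHERE id IN (SELECT ...) would do in SQLite.

```typescript
interface Job {
  id: number;
  type: string;
  status: string;
  lockedBy: string | null;
}

// In-memory sketch of batch claiming: mark up to `limit` pending jobs of one
// type as processing under one worker id. In SQLite this whole loop would be
// a single atomic UPDATE, so two workers can never claim the same job.
function dequeueBatch(jobs: Job[], type: string, limit: number, workerId: string): Job[] {
  const claimed: Job[] = [];
  for (const job of jobs) {
    if (claimed.length >= limit) break;
    if (job.status === "pending" && job.type === type) {
      job.status = "processing";
      job.lockedBy = workerId;
      claimed.push(job);
    }
  }
  return claimed;
}
```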
Challenge 2: Add job dependencies. Job B should only run after Job A completes. Add a depends_on column. The worker skips Job B until Job A’s status is completed.
Challenge 3: Add a web dashboard. Build an HTML page showing real-time queue stats, recent failures, DLQ items, and cron schedule status. Auto-refresh every 5 seconds.
Challenge 4: Add webhook delivery. When an order is completed, deliver a webhook to registered URLs. Implement with retries, exponential backoff, and signature verification (from the Securing Your API course).
What happens to a job if the worker crashes after processing it but before calling completeJob?
Why does the order processing pipeline charge the payment synchronously but send the email as a background job?