
Background Jobs and Queues with @hectoday/http

Why Background Jobs

  • The Request Cycle Problem
  • Project Setup

Building a Queue

  • Database-Backed Queues
  • The Worker Loop
  • Job Serialization

Reliability

  • Retries and Backoff
  • Dead Letter Queues
  • Idempotent Jobs
  • Job Timeouts and Stale Jobs

Scheduling

  • Delayed Jobs
  • Recurring Jobs (Cron)

Scaling

  • Concurrency and Locking
  • Job Priorities
  • Rate-Limiting Jobs

Patterns

  • Job Chaining and Workflows
  • Monitoring and Observability

Putting It All Together

  • Capstone: Order Processing Pipeline

Capstone: Order Processing Pipeline

What we built

A complete background job system for an e-commerce API:

| Component | What it does | Lesson |
| --- | --- | --- |
| Jobs table | Stores pending, processing, completed, failed jobs | Database-Backed Queues |
| enqueue / dequeue | Produce and consume jobs atomically | Database-Backed Queues |
| Worker loop | Poll for jobs, process, mark complete | The Worker Loop |
| Zod schemas | Validate job payloads at enqueue and dequeue time | Job Serialization |
| Retries + backoff | Retry failed jobs with increasing delays | Retries and Backoff |
| Dead letter queue | Store permanently failed jobs for investigation | Dead Letter Queues |
| Idempotency | Safe-to-repeat handlers with tracking tables | Idempotent Jobs |
| Delayed jobs | scheduled_at for future execution | Delayed Jobs |
| Cron scheduling | Recurring jobs via cron_schedules table | Recurring Jobs |
| Multiple workers | Atomic claiming prevents duplicate processing | Multiple Workers |
| Priorities | Urgent jobs first, fair FIFO within priority | Job Priorities |
| Rate limiting | Control processing speed per service | Rate Limiting Jobs |
| Job chaining | Multi-step workflows with fan-out | Job Chaining |
| Monitoring | Queue depth, failure rate, DLQ health | Monitoring |
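The contract behind most of those rows is small: enqueue inserts a row, and dequeue claims the oldest eligible job so that no other worker can. Here is a condensed, in-memory sketch of that contract; the course's queue.ts keeps this state in SQLite and performs the claim with a single atomic `UPDATE ... WHERE id = (SELECT ... LIMIT 1) RETURNING *` statement, while the Map below just keeps the sketch runnable anywhere:

```typescript
// In-memory stand-in for the jobs table. In the real queue.ts the claim in
// dequeue() is one atomic SQL statement, which is what makes it safe to run
// multiple workers against the same database.
type JobStatus = "pending" | "processing" | "completed" | "failed";

interface Job {
  id: number;
  type: string;
  payload: string; // JSON-serialized; validated with Zod in the course
  status: JobStatus;
  priority: number;
  lockedBy: string | null;
}

const jobs = new Map<number, Job>();
let nextId = 1;

function enqueue(type: string, payload: unknown, priority = 0): number {
  const id = nextId++;
  jobs.set(id, {
    id,
    type,
    payload: JSON.stringify(payload),
    status: "pending",
    priority,
    lockedBy: null,
  });
  return id;
}

function dequeue(workerId: string): Job | undefined {
  // Highest priority first, FIFO within a priority:
  // mirrors ORDER BY priority DESC, id ASC in the SQL version.
  const candidate = [...jobs.values()]
    .filter((j) => j.status === "pending")
    .sort((a, b) => b.priority - a.priority || a.id - b.id)[0];
  if (!candidate) return undefined;
  candidate.status = "processing"; // the claim: no other worker sees this job now
  candidate.lockedBy = workerId;
  return candidate;
}
```

The key property is that dequeue both selects and marks the job in one step; everything else in the course builds on that guarantee.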

The order processing pipeline

POST /orders
│
├─ Save order to database (synchronous — in request)
├─ Charge payment (synchronous — user needs the result)
│
└─ Background jobs (enqueued, processed by worker):
     │
     ├─ send_order_confirmation (HIGH priority)
     │   ├─ generate_invoice
     │   └─ sync_inventory
     │
     ├─ send_followup_email (delayed 24 hours)
     │
     ├─ (cron) daily_sales_report (midnight)
     └─ (cron) cleanup_completed_jobs (3 AM)

The complete route handler

route.post("/orders", {
  request: { body: CreateOrderBody },
  resolve: async (c) => {
    if (!c.input.ok) {
      throw new ValidationError(
        c.input.issues.map((i) => ({ field: i.path.join("."), message: i.message })),
      );
    }

    // Synchronous: save and charge
    const order = createOrder(c.input.body);
    await chargeCard(order.total, c.input.body.paymentToken);
    updateOrderStatus(order.id, "paid");

    // Background: everything else
    enqueue(
      "send_order_confirmation",
      {
        orderId: order.id,
        userId: c.input.body.userId,
      },
      { priority: PRIORITY.HIGH },
    );

    enqueueDelayed(
      "send_followup_email",
      {
        orderId: order.id,
        userId: c.input.body.userId,
      },
      24 * 60 * 60 * 1000,
    ); // 24 hours

    return Response.json(order, { status: 201 });
  },
});
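enqueueDelayed needs nothing beyond the normal enqueue path: it stores a scheduled_at in the future, and the worker's "scheduled_at <= now" filter does the rest. A sketch of the timestamp arithmetic, assuming (as the queue's SQL suggests) that timestamps are stored in SQLite's UTC "YYYY-MM-DD HH:MM:SS" form:

```typescript
// Hypothetical helper behind enqueueDelayed: compute a future scheduled_at
// string that compares correctly against SQLite's datetime('now').
function futureTimestamp(delayMs: number, now: Date = new Date()): string {
  return new Date(now.getTime() + delayMs)
    .toISOString()
    .slice(0, 19) // "2026-01-02T00:00:00"
    .replace("T", " "); // match SQLite's "YYYY-MM-DD HH:MM:SS"
}
```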

The user waits only for the order save and the payment charge (about two seconds). Everything else happens in the background: the confirmation email arrives within a few seconds, the follow-up email after 24 hours, and the reports run at midnight.

The complete worker

// src/worker.ts
import { db } from "./db.js"; // used below to release rate-limited jobs
import { dequeue, completeJob, failJob, releaseStaleJobs } from "./queue.js";
import { processJob } from "./job-handlers.js";
import { runDueCronJobs } from "./cron-scheduler.js";
import { JobRateLimiter } from "./rate-limiter.js";

const WORKER_ID = `worker-${process.pid}-${crypto.randomUUID().slice(0, 8)}`;
const POLL_INTERVAL_MS = 1000;
let running = true;

const rateLimiter = new JobRateLimiter();
const RATE_LIMITS: Record<string, number> = {
  send_email: 120,
  send_webhook: 30,
};

function shutdown(signal: string): void {
  console.log(`[${WORKER_ID}] ${signal} received. Finishing current job...`);
  running = false;
}

process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));

async function run(): Promise<void> {
  console.log(`[${WORKER_ID}] Worker started.`);

  // Release stale locks from crashed workers
  const released = releaseStaleJobs(10);
  if (released > 0) console.log(`[${WORKER_ID}] Released ${released} stale jobs.`);

  let lastCronCheck = 0;

  while (running) {
    // Cron check every 60 seconds
    if (Date.now() - lastCronCheck > 60_000) {
      const count = runDueCronJobs(WORKER_ID);
      if (count > 0) console.log(`[CRON] Enqueued ${count} scheduled jobs`);
      lastCronCheck = Date.now();
    }

    const job = dequeue(WORKER_ID);

    if (!job) {
      await new Promise((r) => setTimeout(r, POLL_INTERVAL_MS));
      continue;
    }

    // Rate limit check
    const limit = RATE_LIMITS[job.type];
    if (limit && !rateLimiter.canProcess(job.type, limit)) {
      db.prepare(
        `
        UPDATE jobs SET status = 'pending', locked_by = NULL, locked_at = NULL,
        scheduled_at = datetime('now', '+5 seconds') WHERE id = ?
      `,
      ).run(job.id);
      continue;
    }

    try {
      await processJob(job);
      completeJob(job.id);
      if (limit) rateLimiter.record(job.type);
    } catch (err) {
      failJob(job.id, err instanceof Error ? err.message : String(err));
    }
  }

  console.log(`[${WORKER_ID}] Worker stopped.`);
  process.exit(0);
}

run();
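The worker only calls JobRateLimiter through canProcess and record. One plausible implementation (an assumption, not necessarily the course's exact code in rate-limiter.ts) is a sliding one-minute window of timestamps per job type:

```typescript
// Sliding-window limiter sketch: canProcess counts runs recorded in the
// last 60 seconds for a job type; record logs a run. The optional `now`
// parameter exists so the behavior is easy to test deterministically.
class JobRateLimiter {
  private runs = new Map<string, number[]>();

  canProcess(jobType: string, limitPerMinute: number, now: number = Date.now()): boolean {
    const cutoff = now - 60_000;
    const recent = (this.runs.get(jobType) ?? []).filter((t) => t > cutoff);
    this.runs.set(jobType, recent); // drop timestamps outside the window
    return recent.length < limitPerMinute;
  }

  record(jobType: string, now: number = Date.now()): void {
    const recent = this.runs.get(jobType) ?? [];
    recent.push(now);
    this.runs.set(jobType, recent);
  }
}
```

Because the worker releases a rate-limited job back to pending with a short scheduled_at delay rather than dropping it, bursts are smoothed out without losing work.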

Project structure

src/
  app.ts                    # Hectoday HTTP setup, routes
  server.ts                 # HTTP server
  worker.ts                 # Job processing loop
  db.ts                     # Database schema (jobs, dead_letter_jobs, cron_schedules)
  queue.ts                  # enqueue, dequeue, completeJob, failJob, releaseStaleJobs
  job-handlers.ts           # Handler registry, processJob
  job-schemas.ts            # Zod schemas for job payloads
  priorities.ts             # PRIORITY constants
  cron.ts                   # computeNextRun
  cron-scheduler.ts         # runDueCronJobs, registerCron
  rate-limiter.ts           # JobRateLimiter
  services/
    email.ts                # Email service
    pdf.ts                  # PDF generation
    inventory.ts            # Inventory sync
  routes/
    orders.ts               # Order creation, enqueueing jobs
    admin.ts                # Queue stats, DLQ management

The reliability stack

Job enqueued
│
├─ Worker claims atomically (no duplicates)
│   ├─ Rate limiter checks throughput
│   ├─ Zod validates payload
│   ├─ Handler processes (idempotently)
│   ├─ Success → completeJob
│   └─ Failure → failJob
│       ├─ Retries remaining → pending (delayed)
│       └─ Max retries → dead letter queue
│
├─ Stale lock cleanup (crashed workers)
├─ Cron scheduler (recurring jobs)
├─ Monitoring (queue depth, failure rate)
└─ Cleanup (purge old completed jobs)
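The "Retries remaining → pending (delayed)" branch is where the delay rule lives. A sketch of the usual policy, with the base delay, cap, and max attempts as assumptions rather than the course's exact numbers:

```typescript
// Exponential backoff with a cap, then the dead letter queue once attempts
// are exhausted. failJob would apply this decision to the jobs row.
type FailureAction =
  | { kind: "retry"; delayMs: number }
  | { kind: "dead_letter" };

function backoffDelayMs(attempt: number, baseMs = 1_000, maxMs = 60_000): number {
  // attempt 1 -> 1s, 2 -> 2s, 3 -> 4s, ... capped at maxMs
  return Math.min(baseMs * 2 ** (attempt - 1), maxMs);
}

function onFailure(attempts: number, maxAttempts = 3): FailureAction {
  if (attempts >= maxAttempts) return { kind: "dead_letter" };
  return { kind: "retry", delayMs: backoffDelayMs(attempts) };
}
```

Doubling the wait after each failure gives a struggling downstream service room to recover instead of hammering it at the polling interval.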

Challenges

Challenge 1: Add batch processing. Some job types process better in batches. Instead of dequeuing one email job at a time, dequeue 10 and send them in one API call. Implement a batch dequeue function.
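One possible shape for that batch dequeue, sketched against a plain array so it runs anywhere. In SQL it can stay a single atomic statement, e.g. `UPDATE jobs SET status = 'processing', locked_by = ? WHERE id IN (SELECT id FROM jobs WHERE status = 'pending' AND type = ? ORDER BY id LIMIT ?) RETURNING *`:

```typescript
// Batch claim sketch (names hypothetical): take up to `max` pending jobs of
// one type, oldest first, and mark them all as claimed in one step.
interface PendingJob {
  id: number;
  type: string;
  status: "pending" | "processing";
  lockedBy?: string;
}

function dequeueBatch(
  jobs: PendingJob[],
  workerId: string,
  type: string,
  max: number,
): PendingJob[] {
  const claimed = jobs
    .filter((j) => j.status === "pending" && j.type === type)
    .sort((a, b) => a.id - b.id) // FIFO within the type
    .slice(0, max);
  for (const job of claimed) {
    job.status = "processing";
    job.lockedBy = workerId;
  }
  return claimed;
}
```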

Challenge 2: Add job dependencies. Job B should only run after Job A completes. Add a depends_on column. The worker skips Job B until Job A’s status is completed.
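The dependency check itself is small. A hedged sketch (field and function names are hypothetical): a job with depends_on set is runnable only once its parent has completed, which the worker would fold into its dequeue filter.

```typescript
// Eligibility rule for Challenge 2: pending jobs with no dependency run as
// usual; a dependent job waits until its parent row reads 'completed'.
interface JobRow {
  id: number;
  status: "pending" | "processing" | "completed" | "failed";
  depends_on: number | null;
}

function isRunnable(job: JobRow, byId: Map<number, JobRow>): boolean {
  if (job.status !== "pending") return false;
  if (job.depends_on === null) return true;
  const parent = byId.get(job.depends_on);
  return parent?.status === "completed";
}
```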

Challenge 3: Add a web dashboard. Build an HTML page showing real-time queue stats, recent failures, DLQ items, and cron schedule status. Auto-refresh every 5 seconds.

Challenge 4: Add webhook delivery. When an order is completed, deliver a webhook to registered URLs. Implement with retries, exponential backoff, and signature verification (from the Securing Your API course).
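The signing side of that challenge can be sketched with Node's built-in crypto module; the header name and signing scheme below are assumptions, not something this course specifies:

```typescript
// HMAC-SHA256 webhook signatures: the sender signs the raw payload with a
// shared secret, the receiver recomputes and compares in constant time.
import { createHmac, timingSafeEqual } from "node:crypto";

function signWebhook(payload: string, secret: string): string {
  return createHmac("sha256", secret).update(payload).digest("hex");
}

function verifyWebhook(payload: string, secret: string, signature: string): boolean {
  const expected = Buffer.from(signWebhook(payload, secret), "hex");
  const given = Buffer.from(signature, "hex");
  // timingSafeEqual throws on length mismatch, so check length first
  return expected.length === given.length && timingSafeEqual(expected, given);
}
```

Delivery itself is just another job type, so the retry, backoff, and DLQ machinery above applies to webhooks unchanged.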

What happens to a job if the worker crashes after processing it but before calling completeJob?

Why does the order processing pipeline charge the payment synchronously but send the email as a background job?


© 2026 hectoday. All rights reserved.