hectoday

Background Jobs and Queues with @hectoday/http

Project Setup

The domain

The project is the same e-commerce API from the Error Handling course, extended with background processing: order confirmations, invoice generation, inventory sync, and daily sales reports. None of these operations needs to happen during the request.

Create the project

mkdir job-queue-api
cd job-queue-api
npm init -y
npm install @hectoday/http zod srvx better-sqlite3
npm install -D typescript @types/node @types/better-sqlite3 tsx

Create tsconfig.json:

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "ES2022",
    "moduleResolution": "bundler",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "rootDir": "./src",
    "outDir": "dist",
    "types": ["node"]
  },
  "include": ["src"]
}

Add "type": "module" and the scripts to package.json:

{
  "type": "module",
  "scripts": {
    "dev": "tsx watch src/server.ts",
    "worker": "tsx watch src/worker.ts"
  }
}

Two scripts: dev runs the HTTP server, worker runs the job processor. They are separate processes that share the same database.

The database

// src/db.ts
import Database from "better-sqlite3";

const db = new Database("shop.db");
db.pragma("journal_mode = WAL");
db.pragma("foreign_keys = ON");
db.pragma("busy_timeout = 5000");

db.exec(`
  CREATE TABLE IF NOT EXISTS products (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    price REAL NOT NULL CHECK (price > 0),
    stock INTEGER NOT NULL DEFAULT 0,
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
  );

  CREATE TABLE IF NOT EXISTS orders (
    id TEXT PRIMARY KEY,
    user_id TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    total REAL NOT NULL,
    created_at TEXT NOT NULL DEFAULT (datetime('now'))
  );

  CREATE TABLE IF NOT EXISTS order_items (
    order_id TEXT NOT NULL,
    product_id TEXT NOT NULL,
    quantity INTEGER NOT NULL CHECK (quantity > 0),
    price REAL NOT NULL,
    PRIMARY KEY (order_id, product_id),
    FOREIGN KEY (order_id) REFERENCES orders(id),
    FOREIGN KEY (product_id) REFERENCES products(id)
  );

  CREATE TABLE IF NOT EXISTS jobs (
    id TEXT PRIMARY KEY,
    type TEXT NOT NULL,
    payload TEXT NOT NULL DEFAULT '{}',
    status TEXT NOT NULL DEFAULT 'pending',
    priority INTEGER NOT NULL DEFAULT 0,
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 5,
    last_error TEXT,
    scheduled_at TEXT NOT NULL DEFAULT (datetime('now')),
    created_at TEXT NOT NULL DEFAULT (datetime('now')),
    updated_at TEXT NOT NULL DEFAULT (datetime('now')),
    completed_at TEXT,
    locked_by TEXT,
    locked_at TEXT
  );

  CREATE INDEX IF NOT EXISTS idx_jobs_status_scheduled
    ON jobs(status, scheduled_at);
  CREATE INDEX IF NOT EXISTS idx_jobs_type
    ON jobs(type);
`);

// Seed products (only when the table is empty)
const existing = db.prepare("SELECT id FROM products LIMIT 1").get();
if (!existing) {
  // Prepare the statement once and reuse it for every row
  const insertProduct = db.prepare(
    "INSERT INTO products (id, name, price, stock) VALUES (?, ?, ?, ?)",
  );
  insertProduct.run("prod-1", "Mechanical Keyboard", 89.99, 50);
  insertProduct.run("prod-2", "USB-C Hub", 34.99, 100);
  insertProduct.run("prod-3", "Monitor Stand", 49.99, 25);
}

export default db;

The jobs table is the queue. Each row is one job. The columns track everything a job queue needs: what to do (type, payload), when to run it (scheduled_at), how many times it has been tried (attempts, max_attempts), and what went wrong (last_error).
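
To make the column roles concrete, here is a minimal sketch of one row as seen from TypeScript. The JobRow type, the newJob and sqliteDatetime helpers, the "send-email" job type, and the use of a UUID for id are all assumptions for illustration, not part of the course code. The defaults mirror the DEFAULT clauses in the schema, and the timestamp format follows what SQLite's datetime('now') returns (YYYY-MM-DD HH:MM:SS in UTC).

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical row shape mirroring the jobs table columns.
interface JobRow {
  id: string;
  type: string; // what to do
  payload: string; // JSON text with the job's arguments
  status: string;
  priority: number;
  attempts: number; // how many times it has been tried
  max_attempts: number;
  last_error: string | null; // what went wrong on the last failure
  scheduled_at: string; // when to run it
  created_at: string;
  updated_at: string;
  completed_at: string | null;
  locked_by: string | null;
  locked_at: string | null;
}

// Format a Date like SQLite's datetime('now'): "YYYY-MM-DD HH:MM:SS" in UTC.
function sqliteDatetime(date: Date): string {
  return date.toISOString().slice(0, 19).replace("T", " ");
}

// Build a row with the same defaults the schema applies on INSERT.
// Using a UUID for the id is an assumption; the schema only requires unique TEXT.
function newJob(type: string, payload: unknown = {}, now: Date = new Date()): JobRow {
  const timestamp = sqliteDatetime(now);
  return {
    id: randomUUID(),
    type,
    payload: JSON.stringify(payload),
    status: "pending",
    priority: 0,
    attempts: 0,
    max_attempts: 5,
    last_error: null,
    scheduled_at: timestamp,
    created_at: timestamp,
    updated_at: timestamp,
    completed_at: null,
    locked_by: null,
    locked_at: null,
  };
}

// "send-email" is a made-up job type for illustration.
const exampleJob = newJob("send-email", { orderId: "order-1" });
console.log(exampleJob.type, exampleJob.status, exampleJob.scheduled_at);
```

The payload is stored as JSON text because the column is TEXT; whatever reads the row would need to parse it back before use.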

Note: The composite index on (status, scheduled_at) is critical for performance. The worker queries WHERE status = 'pending' AND scheduled_at <= datetime('now') on every poll. Without this index, every poll scans the entire table. The Database Design course’s Indexes lesson explains why column order matters in composite indexes.
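
The poll filter can be modelled in plain TypeScript to see what the query selects. This is an in-memory stand-in, not the worker's query code: PendingJob, dueJobs, the sample rows, and the 'completed' status value are hypothetical. It relies on the fact that YYYY-MM-DD HH:MM:SS strings compare correctly as plain text, which is what lets scheduled_at <= datetime('now') work on a TEXT column. The ordering follows the ORDER BY priority DESC, scheduled_at ASC clause from Exercise 3.

```typescript
// Hypothetical subset of the jobs columns that the poll query touches.
interface PendingJob {
  id: string;
  status: string;
  priority: number;
  scheduled_at: string; // "YYYY-MM-DD HH:MM:SS", UTC
}

// In-memory equivalent of:
//   WHERE status = 'pending' AND scheduled_at <= :now
//   ORDER BY priority DESC, scheduled_at ASC
function dueJobs(jobs: PendingJob[], now: string): PendingJob[] {
  return jobs
    .filter((job) => job.status === "pending" && job.scheduled_at <= now)
    .sort((a, b) => {
      // Higher priority first.
      if (a.priority !== b.priority) return b.priority - a.priority;
      // Then the earliest scheduled time; plain string comparison is enough here.
      if (a.scheduled_at < b.scheduled_at) return -1;
      if (a.scheduled_at > b.scheduled_at) return 1;
      return 0;
    });
}

// Sample rows, made up for illustration.
const sampleJobs: PendingJob[] = [
  { id: "a", status: "pending", priority: 0, scheduled_at: "2026-01-01 09:00:00" },
  { id: "b", status: "pending", priority: 5, scheduled_at: "2026-01-01 10:00:00" },
  { id: "c", status: "pending", priority: 0, scheduled_at: "2026-01-01 13:00:00" }, // not due yet
  { id: "d", status: "completed", priority: 9, scheduled_at: "2026-01-01 08:00:00" }, // wrong status
];

console.log(dueJobs(sampleJobs, "2026-01-01 12:00:00").map((job) => job.id));
```

The filter is an equality test on status followed by a range test on scheduled_at, which matches the index's column order: equality column first, range column second.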

The app shell

// src/app.ts
import { setup, route } from "@hectoday/http";

export const app = setup({
  routes: [route.get("/health", { resolve: () => Response.json({ status: "ok" }) })],
});

// src/server.ts
import { serve } from "srvx";
import { app } from "./app.js";
import "./db.js"; // side-effect import: creates shop.db and the tables on startup

serve({ fetch: app.fetch, port: 3000 });

External services (simulated)

// src/services/email.ts
export async function sendEmail(to: string, subject: string, body: string): Promise<void> {
  await new Promise((r) => setTimeout(r, 500));
  if (Math.random() < 0.1) throw new Error("Email service unavailable");
  console.log(`[EMAIL] To: ${to} | Subject: ${subject}`);
}

// src/services/pdf.ts
export async function generatePDF(orderId: string): Promise<string> {
  await new Promise((r) => setTimeout(r, 2000));
  if (Math.random() < 0.05) throw new Error("PDF generation failed");
  const path = `/invoices/${orderId}.pdf`;
  console.log(`[PDF] Generated: ${path}`);
  return path;
}

// src/services/inventory.ts
export async function syncInventory(productId: string, quantity: number): Promise<void> {
  await new Promise((r) => setTimeout(r, 300));
  console.log(`[INVENTORY] Synced: ${productId} -= ${quantity}`);
}

These simulate the kind of work that runs in background jobs: slow, occasionally failing, and not something the user should wait for.
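
Random failures are realistic, but they make the services hard to exercise in a predictable way. One option, sketched here under assumptions, is to inject the random source and the delay. The makeFlakyService factory, its option names, and sendEmailSim are hypothetical and not part of the course code; the 500 ms delay and 10% failure rate mirror the values hard-coded in sendEmail.

```typescript
// Hypothetical options for a simulated service.
interface FlakyOptions {
  delayMs: number; // simulated latency
  failureRate: number; // probability of failure, between 0 and 1
  errorMessage: string;
  random?: () => number; // injectable for deterministic runs; defaults to Math.random
}

// Wrap a piece of work so it is slow and occasionally fails.
function makeFlakyService<Args extends unknown[]>(
  options: FlakyOptions,
  work: (...args: Args) => void,
): (...args: Args) => Promise<void> {
  const random = options.random ?? Math.random;
  return async (...args: Args): Promise<void> => {
    await new Promise((resolve) => setTimeout(resolve, options.delayMs));
    if (random() < options.failureRate) throw new Error(options.errorMessage);
    work(...args);
  };
}

// Same behaviour as sendEmail, minus the unused body argument.
const sendEmailSim = makeFlakyService(
  { delayMs: 500, failureRate: 0.1, errorMessage: "Email service unavailable" },
  (to: string, subject: string) => console.log(`[EMAIL] To: ${to} | Subject: ${subject}`),
);
```

Passing random: () => 0 forces a failure on every call and random: () => 0.99 forces success, which can be handy when checking how later code reacts to each outcome.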

Exercises

Exercise 1: Start the server. Verify the database has the jobs table with the correct columns.

Exercise 2: Look at the jobs table schema. What does each column do? Which ones are for the queue system vs for the actual job?

Exercise 3: Run EXPLAIN QUERY PLAN SELECT * FROM jobs WHERE status = 'pending' AND scheduled_at <= datetime('now') ORDER BY priority DESC, scheduled_at ASC. Verify it uses the index.

Why do the HTTP server and the worker run as separate processes?


© 2026 hectoday. All rights reserved.