# Database-Backed Queues
## Why a database queue
Most job queue tutorials start with Redis or RabbitMQ. But you already have a database. SQLite is on every machine, requires no extra infrastructure, and handles thousands of jobs per second.
A database-backed queue is a table where each row is a job. The HTTP server inserts rows (enqueue). The worker reads and deletes them (dequeue). The same database you already use for users, orders, and products.
## The jobs table
The Project Setup lesson created this table:
```sql
CREATE TABLE IF NOT EXISTS jobs (
  id TEXT PRIMARY KEY,
  type TEXT NOT NULL,
  payload TEXT NOT NULL DEFAULT '{}',
  status TEXT NOT NULL DEFAULT 'pending',
  priority INTEGER NOT NULL DEFAULT 0,
  attempts INTEGER NOT NULL DEFAULT 0,
  max_attempts INTEGER NOT NULL DEFAULT 5,
  last_error TEXT,
  scheduled_at TEXT NOT NULL DEFAULT (datetime('now')),
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
  updated_at TEXT NOT NULL DEFAULT (datetime('now')),
  completed_at TEXT,
  locked_by TEXT,
  locked_at TEXT
);
```

Each column has a purpose:
- `type` — What kind of job: `"send_email"`, `"generate_invoice"`, `"sync_inventory"`. The worker uses this to decide which function to call.
- `payload` — The data the job needs, stored as a JSON string. For a `send_email` job: `{"to": "[email protected]", "subject": "Order confirmed", "orderId": "order-1"}`.
- `status` — The job's lifecycle: `"pending"` (waiting to run), `"processing"` (a worker picked it up), `"completed"` (finished successfully), `"failed"` (gave up after max attempts).
- `priority` — Higher-priority jobs run first. Default 0. Payment jobs might be priority 10 (urgent), report jobs priority 0 (whenever).
- `attempts` / `max_attempts` — How many times the job has been tried and the maximum allowed. When `attempts >= max_attempts`, the job moves to `"failed"` status.
- `scheduled_at` — When the job should run. Default is now (run immediately). For delayed jobs, set a future time.
- `locked_by` / `locked_at` — Which worker is processing this job and when it started. Prevents two workers from picking up the same job.
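Taken together, the columns map onto a row type that the queue code can work with. Here is a sketch of that shape in TypeScript (the lesson's own `Job` type may differ in detail):

```ts
// A row from the jobs table, as the queue functions see it.
// Sketch: mirrors the columns above; nullable columns become `string | null`.
interface Job {
  id: string;
  type: string;
  payload: string; // JSON string; parse with JSON.parse before use
  status: "pending" | "processing" | "completed" | "failed";
  priority: number;
  attempts: number;
  max_attempts: number;
  last_error: string | null;
  scheduled_at: string;
  created_at: string;
  updated_at: string;
  completed_at: string | null;
  locked_by: string | null;
  locked_at: string | null;
}
```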
## Enqueue: adding a job
```ts
// src/queue.ts
import db from "./db.js";

export function enqueue(
  type: string,
  payload: Record<string, unknown> = {},
  options: { priority?: number; scheduledAt?: string; maxAttempts?: number } = {},
): string {
  const id = crypto.randomUUID();
  const { priority = 0, scheduledAt, maxAttempts = 5 } = options;
  db.prepare(
    `
    INSERT INTO jobs (id, type, payload, priority, max_attempts, scheduled_at)
    VALUES (?, ?, ?, ?, ?, COALESCE(?, datetime('now')))
    `,
  ).run(
    id,
    type,
    JSON.stringify(payload),
    priority,
    maxAttempts,
    // Pass null to fall back to datetime('now'). An ISO string from
    // toISOString() contains a "T" and would not compare consistently
    // against the datetime('now') text the other timestamp columns use.
    scheduledAt ?? null,
  );
  return id;
}
```

`enqueue` inserts a row into the `jobs` table with status `"pending"` (the column default). The job sits in the queue until a worker picks it up.
```ts
// In a route handler
enqueue("send_email", {
  to: "[email protected]",
  subject: "Order Confirmed",
  orderId: "order-1",
});
```

This is the same `enqueue` function the Error Handling course used in its Fallbacks lesson — now you see how it works internally.
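One detail worth watching when you pass `scheduledAt`: SQLite's `datetime('now')` produces text like `2024-01-01 12:00:00`, and the queue compares timestamps as strings, so a delayed job should use the same format. A small helper (hypothetical — `toSqliteDatetime` is not part of the lesson's code) might look like:

```ts
// Format a JS Date in SQLite's datetime('now') text format (UTC),
// so string comparison against datetime('now') in the dequeue query works.
function toSqliteDatetime(d: Date): string {
  return d.toISOString().replace("T", " ").slice(0, 19);
}

// Delayed, high-priority job: generate the invoice 10 minutes from now.
// enqueue("generate_invoice", { orderId: "order-1" }, {
//   priority: 10,
//   scheduledAt: toSqliteDatetime(new Date(Date.now() + 10 * 60_000)),
// });
```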
## Dequeue: claiming a job
```ts
export function dequeue(workerId: string): Job | null {
  const job = db
    .prepare(
      `
      UPDATE jobs
      SET status = 'processing', locked_by = ?, locked_at = datetime('now'), updated_at = datetime('now')
      WHERE id = (
        SELECT id FROM jobs
        WHERE status = 'pending' AND scheduled_at <= datetime('now')
        ORDER BY priority DESC, scheduled_at ASC
        LIMIT 1
      )
      RETURNING *
      `,
    )
    .get(workerId) as Job | undefined;
  return job ?? null; // .get() returns undefined when no row matches
}
```

This is the critical query. It does three things atomically (in one statement):
- Finds the highest-priority pending job whose scheduled time has arrived.
- Claims it by setting `status = 'processing'` and `locked_by = workerId`.
- Returns the job data so the worker can process it.
The `UPDATE ... WHERE id = (SELECT ...)` pattern is atomic — no other worker can claim the same job between the SELECT and the UPDATE. This prevents two workers from processing the same job.
> [!NOTE]
> `RETURNING *` is a SQLite feature (3.35+) that returns the updated row in the same statement. Without it, you would need a separate SELECT after the UPDATE, which creates a race condition window.
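With `dequeue` in hand, a worker is just a polling loop: claim a job, dispatch on its `type`, then record success or failure. The sketch below injects the queue operations as a parameter so it stands alone; names like `runWorker`, `QueueOps`, and the `handlers` map are illustrative, not from the lesson's code.

```ts
// A minimal slice of a jobs row, enough for dispatching.
type ClaimedJob = { id: string; type: string; payload: string };

// The queue operations the worker depends on. Injected so this sketch is
// self-contained; in the lesson they live in src/queue.ts.
interface QueueOps {
  dequeue(workerId: string): ClaimedJob | null;
  completeJob(jobId: string): void;
  failJob(jobId: string, error: string): void;
}

// Poll the queue: claim a job, run the handler registered for its type,
// then record success or failure. maxIterations exists so tests and demos
// can stop the loop; a real worker would run until shut down.
async function runWorker(
  workerId: string,
  queue: QueueOps,
  handlers: Record<string, (payload: unknown) => Promise<void>>,
  opts: { pollMs?: number; maxIterations?: number } = {},
): Promise<void> {
  const { pollMs = 1000, maxIterations = Number.POSITIVE_INFINITY } = opts;
  for (let i = 0; i < maxIterations; i++) {
    const job = queue.dequeue(workerId);
    if (!job) {
      // Queue empty: sleep, then poll again.
      await new Promise((resolve) => setTimeout(resolve, pollMs));
      continue;
    }
    try {
      const handler = handlers[job.type];
      if (!handler) throw new Error(`no handler for job type "${job.type}"`);
      await handler(JSON.parse(job.payload));
      queue.completeJob(job.id);
    } catch (err) {
      queue.failJob(job.id, err instanceof Error ? err.message : String(err));
    }
  }
}
```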
## Complete: marking a job done
```ts
export function completeJob(jobId: string): void {
  db.prepare(
    `
    UPDATE jobs
    SET status = 'completed', completed_at = datetime('now'), updated_at = datetime('now'),
        locked_by = NULL, locked_at = NULL
    WHERE id = ?
    `,
  ).run(jobId);
}
```

## Fail: recording an error
```ts
export function failJob(jobId: string, error: string): void {
  db.prepare(
    `
    UPDATE jobs
    SET status = CASE
          WHEN attempts + 1 >= max_attempts THEN 'failed'
          ELSE 'pending'
        END,
        attempts = attempts + 1,
        last_error = ?,
        locked_by = NULL,
        locked_at = NULL,
        updated_at = datetime('now'),
        scheduled_at = CASE
          WHEN attempts + 1 >= max_attempts THEN scheduled_at
          ELSE datetime('now', '+' || ((attempts + 1) * 30) || ' seconds')
        END
    WHERE id = ?
    `,
  ).run(error, jobId);
}
```

When a job fails, two things happen. If it has retries remaining (`attempts + 1 < max_attempts`), the status goes back to `"pending"` with a delayed `scheduled_at` — the job will be retried later. If it has exhausted all retries, the status becomes `"failed"` permanently. (The parentheses around `(attempts + 1) * 30` matter: in SQLite, `||` binds tighter than `*`, so without them the expression would not concatenate in the intended order.)
The backoff delay is `(attempts + 1) * 30` seconds: 30s, 60s, 90s, 120s. This is a linear backoff rather than the exponential backoff from the Error Handling course's Retries lesson, but it serves the same purpose: each retry gives the failing service more time to recover.
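To see the schedule concretely, here is the 30-second-step delay formula pulled out as a standalone sketch, with a doubling variant shown for comparison (`exponentialDelay` is illustrative only; `failJob` uses the linear form):

```ts
// Delay before the next retry, in seconds. `attempts` is the value stored
// in the row before the current failure is counted.
const linearDelay = (attempts: number): number => (attempts + 1) * 30;

// A doubling (exponential) variant, shown for comparison only.
const exponentialDelay = (attempts: number): number => 30 * 2 ** attempts;

console.log([0, 1, 2, 3].map(linearDelay)); // → [30, 60, 90, 120]
console.log([0, 1, 2, 3].map(exponentialDelay)); // → [30, 60, 120, 240]
```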
## The job lifecycle

```
enqueue() → pending → dequeue() → processing → completeJob() → completed
                                      ↓
                                  failJob()
                                      ↓
                              attempts < max?
                              ├─ yes → pending (retry later)
                              └─ no  → failed (permanent)
```

## Exercises
- **Exercise 1:** Implement `enqueue`. Insert 5 jobs. Verify they appear in the database with status `"pending"`.
- **Exercise 2:** Implement `dequeue`. Call it once. Verify it returns the first pending job and sets status to `"processing"`.
- **Exercise 3:** Call `dequeue` again. Verify it returns the second job (the first is already processing).
- **Exercise 4:** Implement `completeJob` and `failJob`. Complete a job. Fail a job. Verify the statuses and attempt counts.
Why does the dequeue query use `UPDATE ... WHERE id = (SELECT ...)` instead of a separate SELECT then UPDATE?