hectoday

Real-Time APIs with @hectoday/http

Pub/Sub

The coupling problem

Right now, the task routes directly call broadcast (for SSE) and broadcastToRoom (for WebSockets). The route handler knows about both delivery mechanisms:

// In POST /tasks — coupled to both SSE and WebSocket
broadcast(boardId, { type: "task_created", task }); // SSE
broadcastToRoom(boardId, { type: "task_created", task }); // WebSocket

If you add a third mechanism (push notifications, webhook delivery), you add a third call. Every route handler grows. The route code is coupled to the delivery infrastructure.

The pub/sub model

Publish-subscribe decouples the producer (the route that creates a task) from the consumers (SSE, WebSocket, push notifications). The producer publishes an event. Consumers subscribe to events they care about. The event bus routes events from publishers to subscribers.

// src/event-bus.ts

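// data is intentionally loose: each event type carries its own payload.
type BusEvent = { boardId: string; type: string; data: any };
type EventHandler = (event: BusEvent) => void | Promise<void>;

const subscribers: EventHandler[] = [];

// Registers a handler and returns a function that removes it again.
export function subscribe(handler: EventHandler): () => void {
  subscribers.push(handler);
  return () => {
    const index = subscribers.indexOf(handler);
    if (index !== -1) subscribers.splice(index, 1);
  };
}

// Delivers the event to every subscriber. A failing handler does not stop the others.
export function publish(boardId: string, type: string, data: any): void {
  const event = { boardId, type, data };
  // Iterate over a copy so a handler can unsubscribe during delivery.
  for (const handler of [...subscribers]) {
    try {
      // Async handlers return a promise, so catch rejections as well as sync throws.
      Promise.resolve(handler(event)).catch((err) => {
        console.error("Event handler error:", err);
      });
    } catch (err) {
      console.error("Event handler error:", err);
    }
  }
}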
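
To see the bus in action without the rest of the project, here is a small standalone sketch. It inlines a compact copy of the bus so it runs on its own, and the three consumers are hypothetical stand-ins for SSE, WebSocket, and a broken subscriber. It illustrates two properties of the design: a throwing handler does not block the others, and the function returned by subscribe removes a consumer.

```typescript
// Compact copy of the event bus so this example runs on its own.
type BusEvent = { boardId: string; type: string; data: unknown };
type EventHandler = (event: BusEvent) => void;

const subscribers: EventHandler[] = [];

function subscribe(handler: EventHandler): () => void {
  subscribers.push(handler);
  return () => {
    const index = subscribers.indexOf(handler);
    if (index !== -1) subscribers.splice(index, 1);
  };
}

function publish(boardId: string, type: string, data: unknown): void {
  const event = { boardId, type, data };
  for (const handler of subscribers) {
    try {
      handler(event);
    } catch (err) {
      console.error("Event handler error:", err);
    }
  }
}

// Hypothetical consumers, standing in for real delivery mechanisms.
const received: string[] = [];

const unsubscribeA = subscribe((event) => {
  received.push(`A:${event.type}`);
});

subscribe(() => {
  // publish logs this error and carries on with the next subscriber.
  throw new Error("consumer B is broken");
});

subscribe((event) => {
  received.push(`C:${event.type}`);
});

publish("board-1", "task_created", { task: { id: "t1" } });
// received is now ["A:task_created", "C:task_created"]

unsubscribeA();
publish("board-1", "task_updated", { task: { id: "t1" } });
// received gains only "C:task_updated", because A has unsubscribed
```

Subscribers run in registration order, synchronously, inside the publish call. That keeps the bus easy to reason about, but it also means a slow handler delays the route that published the event.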

Registering consumers

Each delivery mechanism subscribes to the event bus:

// src/sse.ts — subscribe SSE
import { subscribe } from "./event-bus.js";

subscribe((event) => {
  broadcast(event.boardId, { type: event.type, ...event.data }, event.type);
});

// src/rooms.ts — subscribe WebSocket
import { subscribe } from "./event-bus.js";

subscribe((event) => {
  broadcastToRoom(event.boardId, { type: event.type, ...event.data });
});
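
Both subscribers above receive every event. If a consumer only cares about some event types, one option is a thin filtering wrapper around subscribe. The sketch below is an assumption, not part of the course code: subscribeTo is a hypothetical helper, and task_moved and task_deleted are made-up event names used for illustration. The bus is inlined so the example runs on its own.

```typescript
// Compact copy of the event bus so this example runs on its own.
type BusEvent = { boardId: string; type: string; data: unknown };
type EventHandler = (event: BusEvent) => void;

const subscribers: EventHandler[] = [];

function subscribe(handler: EventHandler): () => void {
  subscribers.push(handler);
  return () => {
    const index = subscribers.indexOf(handler);
    if (index !== -1) subscribers.splice(index, 1);
  };
}

function publish(boardId: string, type: string, data: unknown): void {
  const event = { boardId, type, data };
  for (const handler of subscribers) {
    try {
      handler(event);
    } catch (err) {
      console.error("Event handler error:", err);
    }
  }
}

// Hypothetical helper: forwards only events whose type is listed in `types`.
// Returns the same unsubscribe function that subscribe returns.
function subscribeTo(types: string[], handler: EventHandler): () => void {
  const wanted = new Set(types);
  return subscribe((event) => {
    if (wanted.has(event.type)) handler(event);
  });
}

const seen: string[] = [];

subscribeTo(["task_created", "task_deleted"], (event) => {
  seen.push(event.type);
});

publish("board-1", "task_created", {});
publish("board-1", "task_moved", {}); // ignored by the filtered consumer
publish("board-1", "task_deleted", {});
// seen is now ["task_created", "task_deleted"]
```

Filtering in a wrapper keeps the bus itself unchanged: publishers still call publish the same way, and consumers that want everything keep using subscribe directly.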

Simplified route handlers

The route handlers now publish to the event bus. They do not know or care about SSE, WebSocket, or any other delivery mechanism:

import { publish } from "../event-bus.js";

// POST /tasks
route.post("/tasks", {
  request: { body: CreateTaskBody },
  resolve: (c) => {
    if (!c.input.ok) return Response.json({ error: c.input.issues }, { status: 400 });

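    const { title, description, listId, assigneeId } = c.input.body;

    // Look up the board first so an unknown listId fails before anything is written.
    const list = db.prepare("SELECT board_id FROM lists WHERE id = ?").get(listId) as
      | { board_id: string }
      | undefined;
    if (!list) return Response.json({ error: "List not found" }, { status: 404 });

    const id = crypto.randomUUID();
    db.prepare(
      "INSERT INTO tasks (id, list_id, title, description, assignee_id, position) VALUES (?, ?, ?, ?, ?, ?)",
    ).run(id, listId, title, description ?? null, assigneeId ?? null, 0);

    const task = db.prepare("SELECT * FROM tasks WHERE id = ?").get(id);

    // Publish once; every subscriber (SSE, WebSocket, ...) receives the event.
    publish(list.board_id, "task_created", { task });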

    return Response.json(task, { status: 201 });
  },
});

One line: publish(list.board_id, "task_created", { task }). The event bus delivers it to SSE, WebSocket, and any future consumer.

Adding a new consumer

Want to log every event? Add one subscriber:

subscribe((event) => {
  console.log(`[${event.type}] board=${event.boardId}`, JSON.stringify(event.data));
});

Want to send webhook notifications? Add another:

subscribe((event) => {
  const webhooks = db
    .prepare("SELECT url FROM webhooks WHERE board_id = ?")
    .all(event.boardId) as { url: string }[];
  for (const wh of webhooks) {
    // Fire and forget: do not await delivery, but log failures instead of hiding them.
    fetch(wh.url, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify(event),
    }).catch((err) => console.error(`Webhook to ${wh.url} failed:`, err));
  }
});

No changes to the task routes. With pub/sub, adding a consumer is a single subscribe call.

Exercises

Exercise 1: Implement the event bus. Refactor the task routes to use publish instead of calling broadcast and broadcastToRoom directly.

Exercise 2: Add a logging subscriber. Verify every task event is logged.

Exercise 3: Add a subscriber that counts events per board. Expose the count via GET /admin/event-counts.

What is the main benefit of the pub/sub model for real-time delivery?


© 2026 hectoday. All rights reserved.