Pub/Sub
The coupling problem
Right now, the task routes directly call broadcast (for SSE) and broadcastToRoom (for WebSockets). The route handler knows about both delivery mechanisms:
// In POST /tasks: coupled to both SSE and WebSocket
broadcast(boardId, { type: "task_created", task }); // SSE
broadcastToRoom(boardId, { type: "task_created", task }); // WebSocket
If you add a third mechanism (push notifications, webhook delivery), you add a third call. Every route handler grows. The route code is coupled to the delivery infrastructure.
The pub/sub model
Publish-subscribe decouples the producer (the route that creates a task) from the consumers (SSE, WebSocket, push notifications). The producer publishes an event to an event bus. Consumers subscribe to the bus and handle the events they care about. The bus delivers each published event to its subscribers.
// src/event-bus.ts
type EventHandler = (event: { boardId: string; type: string; data: any }) => void;

const subscribers: EventHandler[] = [];

// Register a handler. The returned function removes it again.
export function subscribe(handler: EventHandler): () => void {
  subscribers.push(handler);
  return () => {
    const index = subscribers.indexOf(handler);
    if (index !== -1) subscribers.splice(index, 1);
  };
}

// Deliver an event to every handler. One failing handler does not stop the rest.
export function publish(boardId: string, type: string, data: any): void {
  const event = { boardId, type, data };
  // Iterate over a copy so a handler that unsubscribes mid-publish does not skip others.
  for (const handler of [...subscribers]) {
    try {
      handler(event);
    } catch (err) {
      console.error("Event handler error:", err);
    }
  }
}
Registering consumers
Each delivery mechanism subscribes to the event bus:
// src/sse.ts: subscribe the SSE consumer
import { subscribe } from "./event-bus.js";

subscribe((event) => {
  broadcast(event.boardId, { type: event.type, ...event.data }, event.type);
});

// src/rooms.ts: subscribe the WebSocket consumer
import { subscribe } from "./event-bus.js";

subscribe((event) => {
  broadcastToRoom(event.boardId, { type: event.type, ...event.data });
});
Simplified route handlers
The route handlers now publish to the event bus. They do not know or care about SSE, WebSocket, or any other delivery mechanism:
import { publish } from "../event-bus.js";

// POST /tasks
route.post("/tasks", {
  request: { body: CreateTaskBody },
  resolve: (c) => {
    if (!c.input.ok) return Response.json({ error: c.input.issues }, { status: 400 });
    const { title, description, listId, assigneeId } = c.input.body;
    const id = crypto.randomUUID();
    db.prepare(
      "INSERT INTO tasks (id, list_id, title, description, assignee_id, position) VALUES (?, ?, ?, ?, ?, ?)",
    ).run(id, listId, title, description ?? null, assigneeId ?? null, 0);
    const task = db.prepare("SELECT * FROM tasks WHERE id = ?").get(id);
    // Look up the board that owns the list so the event reaches the right board
    const list = db.prepare("SELECT board_id FROM lists WHERE id = ?").get(listId) as { board_id: string };
    publish(list.board_id, "task_created", { task });
    return Response.json(task, { status: 201 });
  },
});
One line replaces both broadcast calls: publish(list.board_id, "task_created", { task }). The event bus routes it to SSE, WebSocket, and any future consumer.
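To make the fan-out concrete, here is a minimal self-contained sketch. It repeats a small copy of the bus so it runs on its own, and the two recording subscribers (sseLog, wsLog) are hypothetical stand-ins for the real SSE and WebSocket consumers, not part of the project code.

```typescript
// Minimal in-memory bus, mirroring src/event-bus.ts so the sketch runs on its own.
type BusEvent = { boardId: string; type: string; data: unknown };
type EventHandler = (event: BusEvent) => void;

const subscribers: EventHandler[] = [];

function subscribe(handler: EventHandler): () => void {
  subscribers.push(handler);
  return () => {
    const index = subscribers.indexOf(handler);
    if (index !== -1) subscribers.splice(index, 1);
  };
}

function publish(boardId: string, type: string, data: unknown): void {
  const event: BusEvent = { boardId, type, data };
  for (const handler of [...subscribers]) {
    try {
      handler(event);
    } catch (err) {
      console.error("Event handler error:", err);
    }
  }
}

// Hypothetical stand-ins for the SSE and WebSocket consumers:
// they only record what they receive.
const sseLog: string[] = [];
const wsLog: string[] = [];

const stopSse = subscribe((event) => {
  sseLog.push(`${event.type}@${event.boardId}`);
});
subscribe((event) => {
  wsLog.push(`${event.type}@${event.boardId}`);
});

// One publish call reaches both consumers.
publish("board-1", "task_created", { task: { id: "t1" } });

// After unsubscribing, only the remaining consumer sees later events.
stopSse();
publish("board-1", "task_updated", { task: { id: "t1" } });
```

After both calls, sseLog holds one entry and wsLog holds two. The publisher never referenced either consumer by name.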
Adding a new consumer
Want to log every event? Add one subscriber:
subscribe((event) => {
  console.log(`[${event.type}] board=${event.boardId}`, JSON.stringify(event.data));
});
Want to send webhook notifications? Add another:
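// Not async: nothing is awaited, and a synchronous throw (for example from
// the query) is then caught by the try/catch in publish.
subscribe((event) => {
  const webhooks = db
    .prepare("SELECT url FROM webhooks WHERE board_id = ?")
    .all(event.boardId) as { url: string }[];
  for (const wh of webhooks) {
    // Fire and forget: do not wait for the response, ignore delivery failures
    fetch(wh.url, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify(event),
    }).catch(() => {});
  }
});
The task routes do not change. Adding a consumer means adding one subscribe call.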
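One caveat with asynchronous consumers: a try/catch around handler(event) only catches synchronous throws. A handler declared async returns a promise, and a rejection from that promise bypasses the catch. The sketch below shows one possible way to cover both cases. The names AsyncEventHandler, subscribeAsync, publishSafe, and handlerErrors are hypothetical and chosen for illustration; they are not part of the event bus above.

```typescript
type BusEvent = { boardId: string; type: string; data: unknown };

// Assumption: handlers may be synchronous or asynchronous.
type AsyncEventHandler = (event: BusEvent) => void | Promise<void>;

const asyncSubscribers: AsyncEventHandler[] = [];

// Hypothetical error sink, standing in for real logging.
const handlerErrors: unknown[] = [];

function subscribeAsync(handler: AsyncEventHandler): () => void {
  asyncSubscribers.push(handler);
  return () => {
    const index = asyncSubscribers.indexOf(handler);
    if (index !== -1) asyncSubscribers.splice(index, 1);
  };
}

function publishSafe(boardId: string, type: string, data: unknown): void {
  const event: BusEvent = { boardId, type, data };
  for (const handler of [...asyncSubscribers]) {
    try {
      const result = handler(event);
      // Wrap the result so a rejected promise is recorded instead of
      // becoming an unhandled rejection. The publisher does not await it.
      Promise.resolve(result).catch((err) => {
        handlerErrors.push(err);
      });
    } catch (err) {
      // Synchronous throws land here.
      handlerErrors.push(err);
    }
  }
}

// Usage: one handler throws synchronously, one rejects, one succeeds.
const delivered: string[] = [];
subscribeAsync(() => {
  throw new Error("sync failure");
});
subscribeAsync(async () => {
  throw new Error("async failure");
});
subscribeAsync((event) => {
  delivered.push(event.type);
});

publishSafe("board-1", "task_created", { task: { id: "t1" } });
```

The third handler still runs even though the first two fail. The synchronous error is recorded immediately. The asynchronous one is recorded once the rejected promise's callback runs, shortly after publishSafe returns.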
Exercises
Exercise 1: Implement the event bus. Refactor the task routes to use publish instead of calling broadcast and broadcastToRoom directly.
Exercise 2: Add a logging subscriber. Verify every task event is logged.
Exercise 3: Add a subscriber that counts events per board. Expose the count via GET /admin/event-counts.
What is the main benefit of the pub/sub model for real-time delivery?