Building an SSE Endpoint
The SSE response
An SSE endpoint returns a response with Content-Type: text/event-stream and a body that is a ReadableStream. The stream stays open, and the server pushes events by writing to it.
// src/sse.ts
type SSEClient = {
  boardId: string;
  controller: ReadableStreamDefaultController;
};

const clients: SSEClient[] = [];

export function addClient(boardId: string): ReadableStream {
  const stream = new ReadableStream({
    start(controller) {
      const client: SSEClient = { boardId, controller };
      clients.push(client);
      // Send initial connection event
      controller.enqueue(formatSSE({ type: "connected" }));
    },
    cancel() {
      // Client disconnected: clean up
      const index = clients.findIndex((c) => c.controller === this);
      if (index !== -1) clients.splice(index, 1);
    },
  });
  return stream;
}

export function broadcast(boardId: string, data: any): void {
  const event = formatSSE(data);
  const matching = clients.filter((c) => c.boardId === boardId);
  for (const client of matching) {
    try {
      client.controller.enqueue(event);
    } catch {
      // Client disconnected: remove
      const index = clients.indexOf(client);
      if (index !== -1) clients.splice(index, 1);
    }
  }
}
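
// Encode as UTF-8 bytes: fetch-style Response bodies expect Uint8Array chunks,
// and some runtimes reject plain strings.
const encoder = new TextEncoder();

function formatSSE(data: unknown): Uint8Array {
  return encoder.encode(`data: ${JSON.stringify(data)}\n\n`);
}

addClient creates a ReadableStream and registers the client. broadcast sends an event to all clients watching a specific board. formatSSE encodes data in the SSE format (the data: prefix and a double newline that terminates the event).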
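The SSE wire format also defines optional event: and id: fields alongside data:. As a rough sketch only, a formatter that supports them might look like the following; formatSSEMessage and the SSEMessage type are hypothetical names that are not part of this project's code.

```typescript
// Hypothetical helper, not part of src/sse.ts.
type SSEMessage = { data: unknown; event?: string; id?: string };

// Returns the frame as text; pass it through TextEncoder if your runtime expects bytes.
function formatSSEMessage({ data, event, id }: SSEMessage): string {
  const lines: string[] = [];
  if (event) lines.push(`event: ${event}`); // named event type
  if (id) lines.push(`id: ${id}`); // lets a reconnecting client report the last event it saw
  // JSON.stringify (without an indent argument) never emits raw newlines,
  // so a single data: line is enough.
  lines.push(`data: ${JSON.stringify(data)}`);
  // A blank line terminates the event.
  return lines.join("\n") + "\n\n";
}

console.log(formatSSEMessage({ event: "task_created", id: "7", data: { title: "Demo" } }));
```

One thing to keep in mind with this design: once an event carries an event: field, browser EventSource clients receive it through addEventListener with that name rather than through onmessage.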
The cleanup problem
When a client disconnects (closes the browser tab, network drops), the cancel callback fires. But it does not always fire immediately — especially behind proxies. The try/catch in broadcast handles the case where the controller is closed but not yet removed.
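To see why that try/catch is needed, here is a small sketch showing that enqueue throws once the stream has been cancelled, which is what a disconnect looks like from the server side. The function name is made up for illustration, and the sketch assumes a runtime with a global ReadableStream (a recent Node, Deno, Bun, or a browser).

```typescript
// Hypothetical helper: reports what happens when we write to a stream
// whose consumer has already gone away.
function enqueueAfterCancel(): string {
  // Keep a way to enqueue from outside the stream, much like the clients array does.
  let enqueue: (chunk: string) => void = () => {};
  const stream = new ReadableStream<string>({
    start(controller) {
      enqueue = (chunk) => controller.enqueue(chunk);
    },
  });

  // Simulate the client disconnecting. The stream is marked closed right away,
  // even though the returned promise settles later.
  void stream.cancel();

  try {
    enqueue('data: {"type":"task_created"}\n\n');
    return "enqueued";
  } catch (err) {
    return err instanceof TypeError ? "TypeError" : "other error";
  }
}

console.log(enqueueAfterCancel());
```

Under the Streams standard, enqueueing on a stream that is no longer readable should raise a TypeError, so without the try/catch a single stale client would abort the broadcast loop for everyone after it.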
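The first version of addClient also has a subtler flaw: inside cancel, this refers to the underlying source object passed to the ReadableStream constructor, not to the controller, so the findIndex lookup never matches and the client is only removed when a later broadcast fails. A more robust approach keeps a reference to the client in a closure and removes that exact object in cancel: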
export function addClient(boardId: string): ReadableStream {
  let clientRef: SSEClient;
  const stream = new ReadableStream({
    start(controller) {
      clientRef = { boardId, controller };
      clients.push(clientRef);
      controller.enqueue(formatSSE({ type: "connected" }));
    },
    cancel() {
      const index = clients.indexOf(clientRef);
      if (index !== -1) clients.splice(index, 1);
    },
  });
  return stream;
}
The SSE route
// In src/routes/events.ts
import { route, group } from "@hectoday/http";
import { addClient } from "../sse.js";

export const eventRoutes = group([
  route.get("/boards/:boardId/events", {
    resolve: (c) => {
      const stream = addClient(c.params.boardId);
      return new Response(stream, {
        headers: {
          "content-type": "text/event-stream",
          "cache-control": "no-cache",
          connection: "keep-alive",
        },
      });
    },
  }),
]);
Three headers matter:
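content-type: text/event-stream tells the browser this is an SSE stream.
cache-control: no-cache prevents caching the stream (each event is unique).
connection: keep-alive asks for the connection to stay open. It only applies to HTTP/1.1, where persistent connections are already the default; HTTP/2 does not use this header.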
Wiring broadcast into task routes
Update the task routes to broadcast events when data changes:
import { broadcast } from "../sse.js";

// In POST /tasks, after creating:
const task = db.prepare("SELECT * FROM tasks WHERE id = ?").get(id);
const list = db.prepare("SELECT board_id FROM lists WHERE id = ?").get(listId) as any;
broadcast(list.board_id, { type: "task_created", task });
return Response.json(task, { status: 201 });

// In PATCH /tasks/:id/move, after moving:
const task = db.prepare("SELECT * FROM tasks WHERE id = ?").get(c.params.id) as any;
const list = db.prepare("SELECT board_id FROM lists WHERE id = ?").get(task.list_id) as any;
broadcast(list.board_id, { type: "task_moved", task });
return Response.json(task);

// In DELETE /tasks/:id, after deleting.
// boardId has to be looked up before the row is deleted (task -> list -> board_id).
broadcast(boardId, { type: "task_deleted", taskId: c.params.id });
return new Response(null, { status: 204 });
Try it
# Terminal 1: Connect to the event stream
curl -N http://localhost:3000/boards/board-1/events
# Output: data: {"type":"connected"}

# Terminal 2: Create a task
curl -X POST http://localhost:3000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title":"New task from SSE","listId":"list-todo"}'

# Terminal 1 immediately shows:
# data: {"type":"task_created","task":{...}}

The -N flag on curl disables buffering so events appear immediately.
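What curl prints is the raw wire format. In a browser, EventSource parses it for you, but a script that reads the stream with fetch has to split the frames itself. The following is a minimal sketch of that parsing step, assuming every event carries JSON in its data: lines as the ones in this lesson do; parseSSE is a hypothetical helper name.

```typescript
// Hypothetical helper: turns raw SSE text into the decoded JSON payloads.
function parseSSE(raw: string): unknown[] {
  const events: unknown[] = [];
  // Events are separated by a blank line.
  for (const block of raw.split(/\r?\n\r?\n/)) {
    const dataLines = block
      .split(/\r?\n/)
      .filter((line) => line.startsWith("data:"))
      // Drop the "data:" prefix and the single optional space after it.
      .map((line) => line.slice(5).replace(/^ /, ""));
    if (dataLines.length === 0) continue; // trailing empty block or comment-only block
    // Multiple data: lines in one event are joined with newlines.
    events.push(JSON.parse(dataLines.join("\n")));
  }
  return events;
}

const sample =
  'data: {"type":"connected"}\n\n' +
  'data: {"type":"task_deleted","taskId":"t1"}\n\n';
console.log(parseSSE(sample));
```

This sketch expects complete events. A real streaming client would also need to buffer partial chunks until the terminating blank line arrives.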
Exercises
Exercise 1: Build the SSE endpoint. Connect with curl. Create, move, and delete tasks in another terminal. Verify events arrive in real time.
Exercise 2: Open two SSE connections to the same board. Create a task. Both connections should receive the event.
Exercise 3: Open an SSE connection to board-1 and another to a different board. Create a task on board-1. Only the board-1 connection should receive the event.
Why does the broadcast function use try/catch when writing to the controller?