From 9d04bdc65b31729a133c94d55da04f8024e79cc8 Mon Sep 17 00:00:00 2001 From: Danijel Martinek Date: Fri, 8 May 2026 21:15:07 +0200 Subject: [PATCH] feat(core-realtime): SocketIORealtimeBroadcaster --- .../socket-io-realtime-broadcaster.test.ts | 33 +++++++++++++++++++ .../src/socket-io-realtime-broadcaster.ts | 16 +++++++++ 2 files changed, 49 insertions(+) create mode 100644 packages/core-realtime/src/socket-io-realtime-broadcaster.test.ts create mode 100644 packages/core-realtime/src/socket-io-realtime-broadcaster.ts diff --git a/packages/core-realtime/src/socket-io-realtime-broadcaster.test.ts b/packages/core-realtime/src/socket-io-realtime-broadcaster.test.ts new file mode 100644 index 0000000..ec25685 --- /dev/null +++ b/packages/core-realtime/src/socket-io-realtime-broadcaster.test.ts @@ -0,0 +1,33 @@ +import { describe, it, expect, vi } from "vitest"; +import { z } from "zod"; +import { SocketIORealtimeBroadcaster } from "@/socket-io-realtime-broadcaster"; +import { defineRealtimeChannel } from "@/realtime-channel"; + +const ch = defineRealtimeChannel( + "a.b", + z.object({ x: z.number() }).strict(), + { scope: "public" }, +); + +describe("SocketIORealtimeBroadcaster", () => { + it("emits to the channel's room with the channel name as event", async () => { + const emit = vi.fn(); + const to = vi.fn(() => ({ emit })); + const io = { to } as never; + const b = new SocketIORealtimeBroadcaster(io); + await b.broadcast(ch, { x: 1 }); + expect(to).toHaveBeenCalledWith("ch:a.b"); + expect(emit).toHaveBeenCalledWith("a.b", { x: 1 }); + }); + + it("validates payload before emitting", async () => { + const emit = vi.fn(); + const to = vi.fn(() => ({ emit })); + const io = { to } as never; + const b = new SocketIORealtimeBroadcaster(io); + await expect( + b.broadcast(ch, { x: "not a number" } as never), + ).rejects.toThrow(); + expect(emit).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/core-realtime/src/socket-io-realtime-broadcaster.ts b/packages/core-realtime/src/socket-io-realtime-broadcaster.ts new file mode 100644 index 0000000..e1af68d --- /dev/null +++ b/packages/core-realtime/src/socket-io-realtime-broadcaster.ts @@ -0,0 +1,16 @@ +import type { Server as IOServer } from "socket.io"; +import type { z } from "zod"; +import type { IRealtimeBroadcaster } from "./realtime-broadcaster.interface"; +import type { RealtimeChannelDescriptor } from "./realtime-channel"; + +export class SocketIORealtimeBroadcaster implements IRealtimeBroadcaster { + constructor(private readonly io: IOServer) {} + + async broadcast( + descriptor: RealtimeChannelDescriptor>, + payload: T, + ): Promise { + descriptor.schema.parse(payload); + this.io.to(`ch:${descriptor.name}`).emit(descriptor.name, payload); + } +}