From 5e57c661402a39aa7bff6cf98ccead705d9273d3 Mon Sep 17 00:00:00 2001 From: Danijel Martinek Date: Fri, 8 May 2026 21:14:33 +0200 Subject: [PATCH] feat(core-realtime): InMemoryRealtimeBroadcaster (test/dev impl) --- .../in-memory-realtime-broadcaster.test.ts | 34 +++++++++++++++++++ .../src/in-memory-realtime-broadcaster.ts | 28 +++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 packages/core-realtime/src/in-memory-realtime-broadcaster.test.ts create mode 100644 packages/core-realtime/src/in-memory-realtime-broadcaster.ts diff --git a/packages/core-realtime/src/in-memory-realtime-broadcaster.test.ts b/packages/core-realtime/src/in-memory-realtime-broadcaster.test.ts new file mode 100644 index 0000000..66de9b8 --- /dev/null +++ b/packages/core-realtime/src/in-memory-realtime-broadcaster.test.ts @@ -0,0 +1,34 @@ +import { describe, it, expect } from "vitest"; +import { z } from "zod"; +import { InMemoryRealtimeBroadcaster } from "@/in-memory-realtime-broadcaster"; +import { defineRealtimeChannel } from "@/realtime-channel"; + +const ch = defineRealtimeChannel( + "a.b", + z.object({ x: z.number() }).strict(), + { scope: "public" }, +); + +describe("InMemoryRealtimeBroadcaster", () => { + it("validates payload via the descriptor schema", async () => { + const b = new InMemoryRealtimeBroadcaster(); + await expect( + b.broadcast(ch, { x: "not a number" } as never), + ).rejects.toThrow(); + }); + + it("delivers to subscribers in order", async () => { + const b = new InMemoryRealtimeBroadcaster(); + const got: number[] = []; + b.subscribe(ch, async (p) => { got.push(p.x); }); + await b.broadcast(ch, { x: 1 }); + await b.broadcast(ch, { x: 2 }); + expect(got).toEqual([1, 2]); + }); + + it("does nothing when no subscribers", async () => { + const b = new InMemoryRealtimeBroadcaster(); + await b.broadcast(ch, { x: 1 }); + // does not throw + }); +}); diff --git a/packages/core-realtime/src/in-memory-realtime-broadcaster.ts b/packages/core-realtime/src/in-memory-realtime-broadcaster.ts new file mode 100644 index 0000000..516fc08 --- /dev/null +++ b/packages/core-realtime/src/in-memory-realtime-broadcaster.ts @@ -0,0 +1,28 @@ +import type { z } from "zod"; +import type { IRealtimeBroadcaster } from "./realtime-broadcaster.interface"; +import type { RealtimeChannelDescriptor } from "./realtime-channel"; + +type Listener = (payload: T) => Promise | void; + +export class InMemoryRealtimeBroadcaster implements IRealtimeBroadcaster { + private readonly listeners = new Map[]>(); + + async broadcast( + descriptor: RealtimeChannelDescriptor>, + payload: T, + ): Promise { + descriptor.schema.parse(payload); + const arr = this.listeners.get(descriptor.name) ?? []; + for (const l of arr) await l(payload); + } + + // Test-friendly: lets unit tests subscribe directly without a Socket.IO server. + subscribe( + descriptor: RealtimeChannelDescriptor>, + listener: Listener, + ): void { + const arr = this.listeners.get(descriptor.name) ?? []; + arr.push(listener as Listener); + this.listeners.set(descriptor.name, arr); + } +}