feat(core-realtime): InMemoryRealtimeBroadcaster (test/dev impl)
This commit is contained in:
@@ -0,0 +1,34 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { z } from "zod";
|
||||
import { InMemoryRealtimeBroadcaster } from "@/in-memory-realtime-broadcaster";
|
||||
import { defineRealtimeChannel } from "@/realtime-channel";
|
||||
|
||||
const ch = defineRealtimeChannel(
|
||||
"a.b",
|
||||
z.object({ x: z.number() }).strict(),
|
||||
{ scope: "public" },
|
||||
);
|
||||
|
||||
describe("InMemoryRealtimeBroadcaster", () => {
|
||||
it("validates payload via the descriptor schema", async () => {
|
||||
const b = new InMemoryRealtimeBroadcaster();
|
||||
await expect(
|
||||
b.broadcast(ch, { x: "not a number" } as never),
|
||||
).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("delivers to subscribers in order", async () => {
|
||||
const b = new InMemoryRealtimeBroadcaster();
|
||||
const got: number[] = [];
|
||||
b.subscribe(ch, async (p) => { got.push(p.x); });
|
||||
await b.broadcast(ch, { x: 1 });
|
||||
await b.broadcast(ch, { x: 2 });
|
||||
expect(got).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("does nothing when no subscribers", async () => {
|
||||
const b = new InMemoryRealtimeBroadcaster();
|
||||
await b.broadcast(ch, { x: 1 });
|
||||
// does not throw
|
||||
});
|
||||
});
|
||||
28
packages/core-realtime/src/in-memory-realtime-broadcaster.ts
Normal file
28
packages/core-realtime/src/in-memory-realtime-broadcaster.ts
Normal file
@@ -0,0 +1,28 @@
|
||||
import type { z } from "zod";
|
||||
import type { IRealtimeBroadcaster } from "./realtime-broadcaster.interface";
|
||||
import type { RealtimeChannelDescriptor } from "./realtime-channel";
|
||||
|
||||
type Listener<T> = (payload: T) => Promise<void> | void;
|
||||
|
||||
export class InMemoryRealtimeBroadcaster implements IRealtimeBroadcaster {
|
||||
private readonly listeners = new Map<string, Listener<unknown>[]>();
|
||||
|
||||
async broadcast<T>(
|
||||
descriptor: RealtimeChannelDescriptor<string, z.ZodType<T>>,
|
||||
payload: T,
|
||||
): Promise<void> {
|
||||
descriptor.schema.parse(payload);
|
||||
const arr = this.listeners.get(descriptor.name) ?? [];
|
||||
for (const l of arr) await l(payload);
|
||||
}
|
||||
|
||||
// Test-friendly: lets unit tests subscribe directly without a Socket.IO server.
|
||||
subscribe<T>(
|
||||
descriptor: RealtimeChannelDescriptor<string, z.ZodType<T>>,
|
||||
listener: Listener<T>,
|
||||
): void {
|
||||
const arr = this.listeners.get(descriptor.name) ?? [];
|
||||
arr.push(listener as Listener<unknown>);
|
||||
this.listeners.set(descriptor.name, arr);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user