From d800d98574e369947f87f19ca00daa66811552fa Mon Sep 17 00:00:00 2001 From: Danijel Martinek Date: Fri, 8 May 2026 16:09:57 +0200 Subject: [PATCH] feat(web-next): bindAll resolves IEventBus + IJobQueue per env resolveEventsAndJobsProduction wires PayloadJobsEventBus + PayloadJobQueue against the bootstrapped Payload instance; resolveEventsAndJobsDevSeed wires InMemoryEventBus + InMemoryJobQueue. Each per-feature binder now receives (bus, queue) so Phase 7 generators can subscribe handlers and register job tasks at the / anchors. --- apps/web-next/package.json | 1 + .../src/server/bind-production.test.ts | 36 ++++++++++ apps/web-next/src/server/bind-production.ts | 72 ++++++++++++++++--- pnpm-lock.yaml | 3 + 4 files changed, 102 insertions(+), 10 deletions(-) diff --git a/apps/web-next/package.json b/apps/web-next/package.json index b1162dd..5a005c7 100644 --- a/apps/web-next/package.json +++ b/apps/web-next/package.json @@ -17,6 +17,7 @@ "@repo/blog": "workspace:*", "@repo/core-api": "workspace:*", "@repo/core-cms": "workspace:*", + "@repo/core-events": "workspace:*", "@repo/core-shared": "workspace:*", "@repo/core-trpc": "workspace:*", "@repo/core-ui": "workspace:*", diff --git a/apps/web-next/src/server/bind-production.test.ts b/apps/web-next/src/server/bind-production.test.ts index ebfff8e..838f0e1 100644 --- a/apps/web-next/src/server/bind-production.test.ts +++ b/apps/web-next/src/server/bind-production.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; vi.mock("@repo/core-cms", () => ({ default: Promise.resolve({}) })); +vi.mock("payload", () => ({ getPayload: vi.fn(async () => ({ jobs: { queue: vi.fn() } })) })); vi.mock("@repo/blog/di/bind-production", () => ({ bindProductionBlog: vi.fn() })); vi.mock("@repo/auth/di/bind-production", () => ({ bindProductionAuth: vi.fn() })); vi.mock("@repo/marketing-pages/di/bind-production", () => ({ bindProductionMarketingPages: vi.fn() })); @@ -50,6 +51,41 @@ describe("bindAllProduction", () => { await bindAllProduction(); expect(bindProductionBlog).toHaveBeenCalledOnce(); }); + + it("passes a Payload-backed bus + queue to each per-feature binder", async () => { + const { bindAllProduction } = await import("./bind-production"); + const { bindProductionAuth } = await import("@repo/auth/di/bind-production"); + const { PayloadJobsEventBus } = await import("@repo/core-events"); + const { PayloadJobQueue } = await import("@repo/core-shared/jobs"); + + await bindAllProduction(); + + const args = vi.mocked(bindProductionAuth).mock.calls[0]!; + // Args: (config, tracer, logger, bus, queue) + expect(args[3]).toBeInstanceOf(PayloadJobsEventBus); + expect(args[4]).toBeInstanceOf(PayloadJobQueue); + }); +}); + +describe("bindAllDevSeed", () => { + beforeEach(() => { + vi.resetModules(); + vi.clearAllMocks(); + }); + + it("passes an in-memory bus + queue to each per-feature dev-seed binder", async () => { + const { bindAllDevSeed } = await import("./bind-production"); + const { bindDevSeedAuth } = await import("@repo/auth/di/bind-dev-seed"); + const { InMemoryEventBus } = await import("@repo/core-events"); + const { InMemoryJobQueue } = await import("@repo/core-shared/jobs"); + + await bindAllDevSeed(); + + const args = vi.mocked(bindDevSeedAuth).mock.calls[0]!; + // Args: (tracer, logger, bus, queue) + expect(args[2]).toBeInstanceOf(InMemoryEventBus); + expect(args[3]).toBeInstanceOf(InMemoryJobQueue); + }); }); describe("bindAll dispatcher", () => { diff --git a/apps/web-next/src/server/bind-production.ts b/apps/web-next/src/server/bind-production.ts index d5d2b3d..7808de5 100644 --- a/apps/web-next/src/server/bind-production.ts +++ b/apps/web-next/src/server/bind-production.ts @@ -2,6 +2,7 @@ // SERVER-ONLY: this module imports Payload config and must never be bundled into the browser. import "reflect-metadata"; import { Container } from "inversify"; +import { getPayload } from "payload"; import config from "@repo/core-cms"; import { bindNoopInstrumentation, @@ -9,6 +10,16 @@ import { type ITracer, type ILogger, } from "@repo/core-shared/instrumentation"; +import { + InMemoryEventBus, + PayloadJobsEventBus, + type IEventBus, +} from "@repo/core-events"; +import { + InMemoryJobQueue, + PayloadJobQueue, + type IJobQueue, +} from "@repo/core-shared/jobs"; import { bindProductionBlog } from "@repo/blog/di/bind-production"; import { bindProductionAuth } from "@repo/auth/di/bind-production"; import { bindProductionMarketingPages } from "@repo/marketing-pages/di/bind-production"; @@ -29,6 +40,8 @@ const sharedContainer = new Container(); let resolvedTracer: ITracer | null = null; let resolvedLogger: ILogger | null = null; +let resolvedBus: IEventBus | null = null; +let resolvedQueue: IJobQueue | null = null; /** Rule 0: pick instrumentation backend from DSN env (orthogonal to repo mode). */ function resolveInstrumentation(): { tracer: ITracer; logger: ILogger } { @@ -44,6 +57,41 @@ function resolveInstrumentation(): { tracer: ITracer; logger: ILogger } { return result; } +/** + * Production-mode event bus + job queue: backed by Payload's job system so + * events fan out via durable Payload tasks (`PayloadJobsEventBus` enqueues + * `__events...` per subscribed handler) and ad-hoc + * jobs go through `PayloadJobQueue.enqueue`. Cached after first resolution. + */ +async function resolveEventsAndJobsProduction(): Promise<{ + bus: IEventBus; + queue: IJobQueue; +}> { + if (resolvedBus && resolvedQueue) return { bus: resolvedBus, queue: resolvedQueue }; + const resolvedConfig = await config; + const payload = await getPayload({ config: resolvedConfig }); + const queue = new PayloadJobQueue(payload); + const bus = new PayloadJobsEventBus(queue); + resolvedBus = bus; + resolvedQueue = queue; + return { bus, queue }; +} + +/** + * Dev-seed mode: in-process bus + queue. Per-feature binders register their + * job handlers via `queue.register(slug, handler)` and subscribe their event + * handlers via `bus.subscribe(...)` at bind time, so dev/test exercises the + * full publish → handler → enqueue path without booting Payload. + */ +function resolveEventsAndJobsDevSeed(): { bus: IEventBus; queue: IJobQueue } { + if (resolvedBus && resolvedQueue) return { bus: resolvedBus, queue: resolvedQueue }; + const queue = new InMemoryJobQueue(); + const bus = new InMemoryEventBus(); + resolvedBus = bus; + resolvedQueue = queue; + return { bus, queue }; +} + /** * Production path: swap each feature's mock repository binding for the real * Payload-backed one. Constructs `new XRepository(config, tracer, logger)` per @@ -53,12 +101,13 @@ export async function bindAllProduction(): Promise { if (bound) return; bound = true; const { tracer, logger } = resolveInstrumentation(); // Rule 0 + const { bus, queue } = await resolveEventsAndJobsProduction(); const resolvedConfig = await config; - bindProductionAuth(resolvedConfig, tracer, logger); // Phase E task 19 - bindProductionBlog(resolvedConfig, tracer, logger); // Phase E task 18 - bindProductionMarketingPages(resolvedConfig, tracer, logger); // Phase E task 20 - bindProductionNavigation(resolvedConfig, tracer, logger); // Phase E task 21 - bindProductionMedia(resolvedConfig, tracer, logger); // Phase E task 22 + bindProductionAuth(resolvedConfig, tracer, logger, bus, queue); // Phase E task 19 + bindProductionBlog(resolvedConfig, tracer, logger, bus, queue); // Phase E task 18 + bindProductionMarketingPages(resolvedConfig, tracer, logger, bus, queue); // Phase E task 20 + bindProductionNavigation(resolvedConfig, tracer, logger, bus, queue); // Phase E task 21 + bindProductionMedia(resolvedConfig, tracer, logger, bus, queue); // Phase E task 22 } /** @@ -70,11 +119,12 @@ export async function bindAllDevSeed(): Promise { if (bound) return; bound = true; const { tracer, logger } = resolveInstrumentation(); // Rule 0 - await bindDevSeedAuth(tracer, logger); // Phase E task 19 - await bindDevSeedBlog(tracer, logger); // Phase E task 18 - await bindDevSeedMarketingPages(tracer, logger); // Phase E task 20 - await bindDevSeedNavigation(tracer, logger); // Phase E task 21 - await bindDevSeedMedia(tracer, logger); // Phase E task 22 + const { bus, queue } = resolveEventsAndJobsDevSeed(); + await bindDevSeedAuth(tracer, logger, bus, queue); // Phase E task 19 + await bindDevSeedBlog(tracer, logger, bus, queue); // Phase E task 18 + await bindDevSeedMarketingPages(tracer, logger, bus, queue); // Phase E task 20 + await bindDevSeedNavigation(tracer, logger, bus, queue); // Phase E task 21 + await bindDevSeedMedia(tracer, logger, bus, queue); // Phase E task 22 } /** @@ -106,6 +156,8 @@ export function __resetBindStateForTests(): void { bound = false; resolvedTracer = null; resolvedLogger = null; + resolvedBus = null; + resolvedQueue = null; } /** Test-only accessor for resolved instrumentation. */ diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e17dc33..e7ed3b4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -154,6 +154,9 @@ importers: '@repo/core-cms': specifier: workspace:* version: link:../../packages/core-cms + '@repo/core-events': + specifier: workspace:* + version: link:../../packages/core-events '@repo/core-shared': specifier: workspace:* version: link:../../packages/core-shared