Skip to content

Event-Dispatcher (Async Post-Commit Delivery)

Ersetzt die ältere Transactional Outbox (Sprint D.5) und das Audit-Feature (Sprint D.1). Ein einzelner cursor-basierter Dispatcher liefert jedes committete Event an beliebig viele Subscriber — SSE-Broadcast, Search-Indexing, Feature-eigene r.multiStreamProjection-Consumer. Pattern: AsyncDaemon (Marten / EventStoreDB-Persistent-Subscriptions).

Warum überhaupt ein Dispatcher

Pre-D Pfad war drei verschiedene Systeme:

  • Outbox-Tabelle + Poller + Redis-Broker für Pub/Sub-Events.
  • Synchroner postSave-Hook für SSE-Broadcast.
  • Synchroner postSave-Hook für Search-Index.
  • Paralleles Audit-Feature mit eigener Tabelle.

Jedes System hatte eigene Retry-, Ordering- und Observability-Semantik. Ein Consumer-Problem (Meili weg, Redis-Reconnect) konnte den HTTP-Request blockieren, mehrere konkurrierende Stores für denselben “was ist passiert”-Datenpunkt.

Jetzt: alles geht über die events-Tabelle. Jedes Event gehört zu genau einem Aggregate-Stream (Marten-Gold-Standard) — Auto-CRUD-Events via r.crud + createEventStoreExecutor, Domain-Events via ctx.appendEvent. Beide landen im Stream des jeweiligen Aggregates, derselbe Async-Delivery-Pfad an die Consumer.

Mental-Model

HTTP Request → Dispatcher → TX {
Business-Write → events-Tabelle INSERT (Auto-CRUD-Event)
+ optional ctx.appendEvent → events-Tabelle INSERT (Domain-Event)
+ optional r.projection apply → projection-Tabelle (inline)
}
-- Commit --
Event-Dispatcher (separater Daemon-Loop):
pro Consumer, pro Pass, pro Event:
SELECT FOR UPDATE SKIP LOCKED state-row
SELECT events WHERE id > cursor LIMIT batchSize
requestContext.run({correlationId, causationId: event.id}, ...)
handler(event, ctx) – eigene TX
UPDATE cursor + status in derselben TX

r.multiStreamProjection — User-API

r.defineEvent(
"user.invited",
z.object({ userId: z.uuid(), invitedBy: z.uuid() }),
);
r.multiStreamProjection({
name: "send-welcome-email",
apply: {
"users:event:user-invited": async (event, tx) => {
// tx ist der tenant-scoped DbRunner (per event.tenantId). SYSTEM_TENANT_ID
// Events bekommen den raw baseDb — cross-tenant sinks können das.
await tx.insert(welcomeLog).values({ userId: event.aggregateId });
},
},
});

Strikte Semantik:

  • Unbekanntes Event → Throw. ctx.appendEvent("user.typo") ohne r.defineEvent wirft InternalError. Typos sterben am Emit-Site, nicht drei Consumer-Hops später.
  • Payload wird per Zod validiert vor appendEvent. Schema-Mismatch → ValidationError, Event nicht persistiert.
  • Apply bekommt den tenant-scoped DbRunner. Cross-tenant-Query wird vom Framework verhindert, nicht vom Feature-Autor. SYSTEM_TENANT_ID Events werden als cross-tenant behandelt (raw baseDb).
  • MSP ohne table sind reine Side-Effect-Consumer — sinnvoll für Mails, Webhooks, externe System-Sync. Mit table materialisiert die Apply in einem persistenten Read-Model.

Consumer-States

statusBedeutungAdvance?
idleBereit, nächster Pass liefert neue Events
processingAktueller Pass hält die Lock-Row (TX-scoped)
deadHit maxAttempts auf demselben Event, pausiert❌ bis restartConsumer oder skipPoisonEvent
disabledOps hat den Consumer gezielt geparkt❌ bis enableConsumer

Übergänge gehen über die Ops-CLI oder die exportierten Funktionen:

Terminal window
yarn kumiko consumer list
yarn kumiko consumer status <name>
yarn kumiko consumer restart <name> # dead → idle, Cursor bleibt (retry)
yarn kumiko consumer skip <name> # Cursor vor nächstes Event (poison skip)
yarn kumiko consumer disable <name> # pause
yarn kumiko consumer enable <name> # resume

Ops-safety: restartConsumer verweigert auf nicht-dead State, enableConsumer verweigert auf nicht-disabled — kein unabsichtlicher State-Reset.

Multi-Instance-Garantien

Jeder Pass öffnet eine TX und macht SELECT FOR UPDATE SKIP LOCKED auf der state-row. Zwei Dispatcher-Prozesse (N Pods) auf derselben DB:

  • Der erste bekommt den Lock, advanciert den Cursor, committet.
  • Der zweite sieht die Row als gesperrt, bekommt nichts, bricht diesen Pass für diesen Consumer ab. Nächster Pass versucht erneut.
  • Keine Duplicate Delivery. Integration-Test: event-dispatcher-multi-instance.integration.ts.

Kein Jitter auf pollIntervalMs — bei sehr vielen gleichzeitigen Pollern würde Lock-Contention auf wenige Hot-Rows zunehmen. Prod-Setup sollte moderate pollIntervalMs wählen (Default 100ms tauglich für ein paar Instanzen, > 10 Pods → erwägen: 500ms oder PG NOTIFY-Hybrid).

delivery: "shared" vs. "per-instance" (Welle 2.7)

Das bisherige Modell ist shared: ein Cursor pro Consumer-Name, SKIP-LOCKED serialisiert N Prozesse auf diese eine Row → exactly-once global. Richtig für persistente Side-Effects (DB-writes, externe APIs, Notifications, Projection-Tables).

Für lokale Side-Effects (SSE-Broker, In-Memory-Caches) ist das falsch: im Split-Deploy (API + Worker getrennt, mehrere API-Instanzen) hängen SSE-Clients an einer konkreten API-Instanz. Shared-Cursor würde den Event an genau eine API geben, die Clients der anderen Instanz würden nichts sehen.

Lösung: delivery: "per-instance" auf einem Consumer / MSP. Jeder Dispatcher-Prozess bekommt seine eigene state-row (PK-Spalte instance_id statt nur name), liest alle Events unabhängig, lokale Clients kriegen alles.

r.multiStreamProjection({
name: "local-cache-invalidator",
delivery: "per-instance", // ← jeder Prozess liefert
apply: { ... },
});

Framework-intern ist der SSE-Broadcast-Consumer (system:consumer:sse-broadcast) immer per-instance. Feature-MSPs sind per Default "shared" — Opt-in nur wenn der Handler ausschließlich Prozess-lokale Effekte produziert (sonst duplizieren sich DB-Writes pro Instanz).

Aspektshared (default)per-instance
Cursor-Rows pro Consumer1N (1 pro Prozess)
Delivery-Zählungexactly-once globalexactly-once pro Prozess
Handler-Constraintidempotent reichtzusätzlich: keine shared-storage-Writes
Use-caseDB-Writes, Webhooks, NotificationsSSE, lokale Caches, Prozess-interne Trigger

instance_id + Sentinel __shared__

Die state-table hat eine Composite-PK (name, instance_id):

  • Shared-Consumer: instance_id = '__shared__' — eine Row, alle Prozesse konkurrieren via SKIP-LOCKED.
  • Per-Instance-Consumer: instance_id = <ServerOptions.instanceId> — eine Row pro Prozess, voneinander unabhängig.

Postgres erlaubt keine NULL-Spalten in einem PK und behandelt NULL = NULL als UNKNOWN. Sentinel-Literal löst beide Probleme sauber. Der Name '__shared__' ist reserviert: Boot-Validator (createEventDispatcher + buildServer) wirft fail-loud, wenn ServerOptions.instanceId / KUMIKO_INSTANCE_ID den Wert hätte.

ServerOptions.instanceId (resolved in Prioritätsreihenfolge):

  1. ServerOptions.instanceId explizit gesetzt
  2. process.env.KUMIKO_INSTANCE_ID
  3. crypto.randomUUID() beim Boot

Single-Prozess-Deploys brauchen nichts zu tun. Multi-Instance-Deploys sollten KUMIKO_INSTANCE_ID auf einen stabilen Wert setzen (Pod-Name, Hostname), damit:

  • Metrics pro Instanz korrelierbar sind (Label instance_id).
  • Ops beim Scale-Down gezielt stale Rows löschen kann (siehe Warnung unten).

⚠ Retention-Guard + decommissioned instances

Der Retention-Guard (pruneEvents) prüft MIN(lastProcessedEventId) über alle nicht-disabled Shard-Rows. Eine heruntergefahrene Instanz lässt ihre Row stehen — der Cursor bleibt an der letzten gesehenen Event-ID pinnend, und Prune läuft nie mehr über diese ID hinaus.

Scale-Down-Prozedur:

DELETE FROM kumiko_event_consumers
WHERE instance_id = '<decommissioned-instance>';

Ausführen bevor die Instanz dauerhaft wegfällt. Ein Auto-Cleanup (Heartbeat-basierte Liveness) ist als Follow-Up dokumentiert, aber nicht in v1 enthalten. Fehlt die manuelle Bereinigung, sehen Ops in kumiko_event_consumer_lag_events{instance_id="...", ...} einen nie fallenden Lag-Gauge auf der stale Row — das ist das einzige Signal.

Ops-CLI mit instanceId

restart / disable / enable / skip akzeptieren seit Welle 2.7 ein optionales instanceId-Argument. Default: SHARED_INSTANCE_SENTINEL — legacy-kompatibel für shared-Consumer. Per-Instance-Consumer müssen die konkrete ID übergeben:

await restartConsumer(db, "feature:consumer:foo", "instance-A");
await disableConsumer(db, "system:consumer:sse-broadcast", "api-pod-3");

listConsumersWithState / getAllConsumerProgress liefern eine Row pro (name, instance_id) Shard; Metric-Labels sind analog { consumer, instance_id }. Prometheus-Rules im Prod-Setup bei Upgrade anpassen.

Delivery-Semantik

  • Per-Consumer strict ordering by events.id. Wir skippen kein fehlschlagendes Event — der Consumer bleibt stehen, andere laufen weiter.
  • At-least-once. Handler kann mehrfach für dasselbe Event aufgerufen werden, wenn der cursor-Update-Commit zwischen Handler und COMMIT crasht. Handlers MÜSSEN idempotent sein. Typisches Pattern: INSERT … ON CONFLICT DO NOTHING auf einem natural key aus dem Event-Payload.
  • Halt-on-poison. Handler-Throw → Counter, Retry im nächsten Pass, nach maxAttempts (Default 10) → status=dead, ops muss ran.

Retention

pruneEvents(db, { olderThanDays: N }) pruned nur pub/sub Events (aggregateType = "pubsub"). Aggregate-Events tragen loadAggregate / Projection-Rebuild / asOf — sie zu löschen bricht diese Garantien. Wer es trotzdem will: explizites aggregateTypes-Argument.

Der Consumer-Lag-Guard weigert sich, Events zu löschen die noch mindestens ein aktiver Consumer (status ≠ disabled) nicht verarbeitet hat. Ops-Fehler-Meldung verweist auf consumer skip|disable als Fix.

Nicht automatisch scheduled — ops wired in cron / BullMQ / pg_cron.

Retention-Korrektheit hängt an ZWEI Invarianten

Das „Prune löscht nicht, was ein Consumer noch braucht”-Versprechen lehnt sich auf zwei Mechanismen, die beide nötig sind — wer einen entfernt weil er redundant aussieht, bringt die andere zum Kippen:

  1. Pre-Registrierung in dispatcher.start() — jeder Consumer bekommt beim Dispatcher-Start einen State-Row (INSERT ON CONFLICT DO NOTHING, cursor=0). Ohne das passiert Bootstrap lazy beim ersten runOnce(), und ein Prune zwischen Prozess-Start und erstem Pass würde Events unter dem noch unsichtbaren Consumer-Cursor löschen.
  2. LOCK TABLE kumiko_event_consumers IN SHARE MODE in der Prune-TX — blockiert INSERTs/UPDATEs auf die Consumer-Tabelle während der Prune läuft. Defence-in-Depth gegen den seltenen Mid-TX- Race (zweites buildServer bootet parallel zum Prune).

Allein: Pre-Reg deckt den Haupt-Fall (Deploy + Prune racen), reicht aber nicht wenn zwei Dispatcher-Instanzen gleichzeitig starten. Der Lock deckt das ab. Der Lock allein würde wiederum nicht reichen — nach dem Commit könnte ein ganz neuer Consumer immer noch mit cursor=0 bootstrappen und auf die gerade gelöschten Events zeigen.

Zusammen geschlossen.

Metrics

NameTypeLabelsBedeutung
kumiko_event_consumer_lag_eventsgaugeconsumerEvents zwischen Cursor und Head (ops-Primärsignal)
kumiko_event_consumer_events_processed_totalcounterconsumerErfolgreich zugestellt
kumiko_event_consumer_events_failed_totalcounterconsumerHandler-Throws

Lag als gauge → instantaner Zustand; Counter werden pro Pass aggregiert. Registriert in observability/standard-metrics.ts.

Dateien

  • Runtime: packages/framework/src/pipeline/event-dispatcher.ts
  • State-Tabelle: packages/framework/src/pipeline/event-consumer-state.ts
  • System-Consumer (SSE, Search): packages/framework/src/pipeline/system-hooks.ts
  • Retention: packages/framework/src/pipeline/event-retention.ts
  • CLI: bin/kumiko.ts (commands consumer, events)
  • Tests:
    • pipeline/__tests__/event-dispatcher.integration.ts
    • pipeline/__tests__/event-dispatcher-wiring.integration.ts
    • pipeline/__tests__/event-dispatcher-recovery.integration.ts
    • pipeline/__tests__/event-dispatcher-multi-instance.integration.ts
    • pipeline/__tests__/event-retention.integration.ts
    • pipeline/__tests__/event-define-event-strict.integration.ts

Historie

  • Sprint B.1 — event-sourcing-pivot, events-Tabelle
  • Sprint D.1 — Audit-Feature entfernt (Events sind Audit)
  • Sprint D.2 — Async Dispatcher (AsyncDaemon-Pattern)
  • Sprint D.3 — SSE auf Dispatcher umgestellt
  • Sprint D.4 — Search auf Dispatcher umgestellt
  • Sprint D.5 — Outbox entfernt, pub/sub-Events in events-Tabelle
  • Sprint E.1 — Dispatcher in buildServer verdrahtet + Tenant-Wrapping + Lag-Metric
  • Sprint E.2 — Retention mit Consumer-Lag-Guard
  • Sprint E.3 — r.defineEvent Schema-Validation strikt
  • Sprint E.5 — Multi-Instance Test-Suite (inkl. versteckter TX-Rollback-Bug gefixt)
  • Sprint E.9 — Dead-Letter Recovery CLI + API
  • Track C Runde 1 — r.postEvent + ctx.emit + PUBSUB_AGGREGATE_TYPE entfernt; MSP ist der einzige async-Consumer-API-Pfad, jedes Event gehört zu seinem Aggregate-Stream
  • Track C Runde 2 — correlationId + causationId End-to-End (HTTP → Events → MSP-apply → Jobs); MSP errorMode.continuous.skipApplyErrors
  • Track C Runde 3 — ctx.fetchForWriting (Marten FetchForWriting); MSP-apply mit ctx.appendEvent für Saga-/Process-Manager-Pattern
  • Track C Runde 4 — Idempotency-DB-Index entfernt; HTTP-Retry-Idempotency läuft komplett via pipeline/idempotency.ts (Redis)