Event-Dispatcher (Async Post-Commit Delivery)
Ersetzt die ältere Transactional Outbox (Sprint D.5) und das Audit-Feature
(Sprint D.1). Ein einzelner cursor-basierter Dispatcher liefert jedes
committete Event an beliebig viele Subscriber — SSE-Broadcast,
Search-Indexing, Feature-eigene r.multiStreamProjection-Consumer. Pattern:
AsyncDaemon (Marten / EventStoreDB-Persistent-Subscriptions).
Warum überhaupt ein Dispatcher
Pre-D Pfad war drei verschiedene Systeme:
- Outbox-Tabelle + Poller + Redis-Broker für Pub/Sub-Events.
- Synchroner postSave-Hook für SSE-Broadcast.
- Synchroner postSave-Hook für Search-Index.
- Paralleles Audit-Feature mit eigener Tabelle.
Jedes System hatte eigene Retry-, Ordering- und Observability-Semantik. Ein Consumer-Problem (Meili weg, Redis-Reconnect) konnte den HTTP-Request blockieren, mehrere konkurrierende Stores für denselben “was ist passiert”-Datenpunkt.
Jetzt: alles geht über die events-Tabelle. Jedes Event gehört zu genau
einem Aggregate-Stream (Marten-Gold-Standard) — Auto-CRUD-Events via
r.crud + createEventStoreExecutor, Domain-Events via
ctx.appendEvent. Beide landen im Stream des jeweiligen Aggregates,
derselbe Async-Delivery-Pfad an die Consumer.
Mental-Model
HTTP Request → Dispatcher → TX { Business-Write → events-Tabelle INSERT (Auto-CRUD-Event) + optional ctx.appendEvent → events-Tabelle INSERT (Domain-Event) + optional r.projection apply → projection-Tabelle (inline)}-- Commit --
Event-Dispatcher (separater Daemon-Loop): pro Consumer, pro Pass, pro Event: SELECT FOR UPDATE SKIP LOCKED state-row SELECT events WHERE id > cursor LIMIT batchSize requestContext.run({correlationId, causationId: event.id}, ...) handler(event, ctx) – eigene TX UPDATE cursor + status in derselben TXr.multiStreamProjection — User-API
r.defineEvent( "user.invited", z.object({ userId: z.uuid(), invitedBy: z.uuid() }),);
r.multiStreamProjection({ name: "send-welcome-email", apply: { "users:event:user-invited": async (event, tx) => { // tx ist der tenant-scoped DbRunner (per event.tenantId). SYSTEM_TENANT_ID // Events bekommen den raw baseDb — cross-tenant sinks können das. await tx.insert(welcomeLog).values({ userId: event.aggregateId }); }, },});Strikte Semantik:
- Unbekanntes Event → Throw.
ctx.appendEvent("user.typo")ohner.defineEventwirftInternalError. Typos sterben am Emit-Site, nicht drei Consumer-Hops später. - Payload wird per Zod validiert vor
appendEvent. Schema-Mismatch →ValidationError, Event nicht persistiert. - Apply bekommt den tenant-scoped DbRunner. Cross-tenant-Query wird vom Framework verhindert, nicht vom Feature-Autor. SYSTEM_TENANT_ID Events werden als cross-tenant behandelt (raw baseDb).
- MSP ohne
tablesind reine Side-Effect-Consumer — sinnvoll für Mails, Webhooks, externe System-Sync. Mittablematerialisiert die Apply in einem persistenten Read-Model.
Consumer-States
| status | Bedeutung | Advance? |
|---|---|---|
idle | Bereit, nächster Pass liefert neue Events | ✅ |
processing | Aktueller Pass hält die Lock-Row (TX-scoped) | ✅ |
dead | Hit maxAttempts auf demselben Event, pausiert | ❌ bis restartConsumer oder skipPoisonEvent |
disabled | Ops hat den Consumer gezielt geparkt | ❌ bis enableConsumer |
Übergänge gehen über die Ops-CLI oder die exportierten Funktionen:
yarn kumiko consumer listyarn kumiko consumer status <name>yarn kumiko consumer restart <name> # dead → idle, Cursor bleibt (retry)yarn kumiko consumer skip <name> # Cursor vor nächstes Event (poison skip)yarn kumiko consumer disable <name> # pauseyarn kumiko consumer enable <name> # resumeOps-safety: restartConsumer verweigert auf nicht-dead State,
enableConsumer verweigert auf nicht-disabled — kein unabsichtlicher
State-Reset.
Multi-Instance-Garantien
Jeder Pass öffnet eine TX und macht SELECT FOR UPDATE SKIP LOCKED auf
der state-row. Zwei Dispatcher-Prozesse (N Pods) auf derselben DB:
- Der erste bekommt den Lock, advanciert den Cursor, committet.
- Der zweite sieht die Row als gesperrt, bekommt nichts, bricht diesen Pass für diesen Consumer ab. Nächster Pass versucht erneut.
- Keine Duplicate Delivery. Integration-Test:
event-dispatcher-multi-instance.integration.ts.
Kein Jitter auf pollIntervalMs — bei sehr vielen gleichzeitigen
Pollern würde Lock-Contention auf wenige Hot-Rows zunehmen. Prod-Setup
sollte moderate pollIntervalMs wählen (Default 100ms tauglich für ein
paar Instanzen, > 10 Pods → erwägen: 500ms oder PG NOTIFY-Hybrid).
delivery: "shared" vs. "per-instance" (Welle 2.7)
Das bisherige Modell ist shared: ein Cursor pro Consumer-Name,
SKIP-LOCKED serialisiert N Prozesse auf diese eine Row → exactly-once
global. Richtig für persistente Side-Effects (DB-writes, externe
APIs, Notifications, Projection-Tables).
Für lokale Side-Effects (SSE-Broker, In-Memory-Caches) ist das falsch: im Split-Deploy (API + Worker getrennt, mehrere API-Instanzen) hängen SSE-Clients an einer konkreten API-Instanz. Shared-Cursor würde den Event an genau eine API geben, die Clients der anderen Instanz würden nichts sehen.
Lösung: delivery: "per-instance" auf einem Consumer / MSP. Jeder
Dispatcher-Prozess bekommt seine eigene state-row (PK-Spalte
instance_id statt nur name), liest alle Events unabhängig, lokale
Clients kriegen alles.
r.multiStreamProjection({ name: "local-cache-invalidator", delivery: "per-instance", // ← jeder Prozess liefert apply: { ... },});Framework-intern ist der SSE-Broadcast-Consumer
(system:consumer:sse-broadcast) immer per-instance. Feature-MSPs
sind per Default "shared" — Opt-in nur wenn der Handler
ausschließlich Prozess-lokale Effekte produziert (sonst duplizieren
sich DB-Writes pro Instanz).
| Aspekt | shared (default) | per-instance |
|---|---|---|
| Cursor-Rows pro Consumer | 1 | N (1 pro Prozess) |
| Delivery-Zählung | exactly-once global | exactly-once pro Prozess |
| Handler-Constraint | idempotent reicht | zusätzlich: keine shared-storage-Writes |
| Use-case | DB-Writes, Webhooks, Notifications | SSE, lokale Caches, Prozess-interne Trigger |
instance_id + Sentinel __shared__
Die state-table hat eine Composite-PK (name, instance_id):
- Shared-Consumer:
instance_id = '__shared__'— eine Row, alle Prozesse konkurrieren via SKIP-LOCKED. - Per-Instance-Consumer:
instance_id = <ServerOptions.instanceId>— eine Row pro Prozess, voneinander unabhängig.
Postgres erlaubt keine NULL-Spalten in einem PK und behandelt
NULL = NULL als UNKNOWN. Sentinel-Literal löst beide Probleme
sauber. Der Name '__shared__' ist reserviert: Boot-Validator
(createEventDispatcher + buildServer) wirft fail-loud, wenn
ServerOptions.instanceId / KUMIKO_INSTANCE_ID den Wert hätte.
ServerOptions.instanceId (resolved in Prioritätsreihenfolge):
ServerOptions.instanceIdexplizit gesetztprocess.env.KUMIKO_INSTANCE_IDcrypto.randomUUID()beim Boot
Single-Prozess-Deploys brauchen nichts zu tun. Multi-Instance-Deploys
sollten KUMIKO_INSTANCE_ID auf einen stabilen Wert setzen
(Pod-Name, Hostname), damit:
- Metrics pro Instanz korrelierbar sind (Label
instance_id). - Ops beim Scale-Down gezielt stale Rows löschen kann (siehe Warnung unten).
⚠ Retention-Guard + decommissioned instances
Der Retention-Guard (pruneEvents) prüft MIN(lastProcessedEventId)
über alle nicht-disabled Shard-Rows. Eine heruntergefahrene
Instanz lässt ihre Row stehen — der Cursor bleibt an der letzten
gesehenen Event-ID pinnend, und Prune läuft nie mehr über diese ID
hinaus.
Scale-Down-Prozedur:
DELETE FROM kumiko_event_consumers WHERE instance_id = '<decommissioned-instance>';Ausführen bevor die Instanz dauerhaft wegfällt. Ein Auto-Cleanup
(Heartbeat-basierte Liveness) ist als Follow-Up dokumentiert, aber nicht
in v1 enthalten. Fehlt die manuelle Bereinigung, sehen Ops in
kumiko_event_consumer_lag_events{instance_id="...", ...} einen nie
fallenden Lag-Gauge auf der stale Row — das ist das einzige Signal.
Ops-CLI mit instanceId
restart / disable / enable / skip akzeptieren seit Welle 2.7 ein
optionales instanceId-Argument. Default:
SHARED_INSTANCE_SENTINEL — legacy-kompatibel für shared-Consumer.
Per-Instance-Consumer müssen die konkrete ID übergeben:
await restartConsumer(db, "feature:consumer:foo", "instance-A");await disableConsumer(db, "system:consumer:sse-broadcast", "api-pod-3");listConsumersWithState / getAllConsumerProgress liefern eine Row
pro (name, instance_id) Shard; Metric-Labels sind analog
{ consumer, instance_id }. Prometheus-Rules im Prod-Setup bei
Upgrade anpassen.
Delivery-Semantik
- Per-Consumer strict ordering by
events.id. Wir skippen kein fehlschlagendes Event — der Consumer bleibt stehen, andere laufen weiter. - At-least-once. Handler kann mehrfach für dasselbe Event aufgerufen
werden, wenn der cursor-Update-Commit zwischen Handler und COMMIT
crasht. Handlers MÜSSEN idempotent sein. Typisches Pattern:
INSERT … ON CONFLICT DO NOTHINGauf einem natural key aus dem Event-Payload. - Halt-on-poison. Handler-Throw → Counter, Retry im nächsten Pass,
nach
maxAttempts(Default 10) → status=dead, ops muss ran.
Retention
pruneEvents(db, { olderThanDays: N }) pruned nur pub/sub Events
(aggregateType = "pubsub"). Aggregate-Events tragen
loadAggregate / Projection-Rebuild / asOf — sie zu löschen bricht diese
Garantien. Wer es trotzdem will: explizites aggregateTypes-Argument.
Der Consumer-Lag-Guard weigert sich, Events zu löschen die noch
mindestens ein aktiver Consumer (status ≠ disabled) nicht verarbeitet
hat. Ops-Fehler-Meldung verweist auf consumer skip|disable als Fix.
Nicht automatisch scheduled — ops wired in cron / BullMQ / pg_cron.
Retention-Korrektheit hängt an ZWEI Invarianten
Das „Prune löscht nicht, was ein Consumer noch braucht”-Versprechen lehnt sich auf zwei Mechanismen, die beide nötig sind — wer einen entfernt weil er redundant aussieht, bringt die andere zum Kippen:
- Pre-Registrierung in
dispatcher.start()— jeder Consumer bekommt beim Dispatcher-Start einen State-Row (INSERT ON CONFLICT DO NOTHING, cursor=0). Ohne das passiert Bootstrap lazy beim erstenrunOnce(), und ein Prune zwischen Prozess-Start und erstem Pass würde Events unter dem noch unsichtbaren Consumer-Cursor löschen. LOCK TABLE kumiko_event_consumers IN SHARE MODEin der Prune-TX — blockiert INSERTs/UPDATEs auf die Consumer-Tabelle während der Prune läuft. Defence-in-Depth gegen den seltenen Mid-TX- Race (zweitesbuildServerbootet parallel zum Prune).
Allein: Pre-Reg deckt den Haupt-Fall (Deploy + Prune racen), reicht aber nicht wenn zwei Dispatcher-Instanzen gleichzeitig starten. Der Lock deckt das ab. Der Lock allein würde wiederum nicht reichen — nach dem Commit könnte ein ganz neuer Consumer immer noch mit cursor=0 bootstrappen und auf die gerade gelöschten Events zeigen.
Zusammen geschlossen.
Metrics
| Name | Type | Labels | Bedeutung |
|---|---|---|---|
kumiko_event_consumer_lag_events | gauge | consumer | Events zwischen Cursor und Head (ops-Primärsignal) |
kumiko_event_consumer_events_processed_total | counter | consumer | Erfolgreich zugestellt |
kumiko_event_consumer_events_failed_total | counter | consumer | Handler-Throws |
Lag als gauge → instantaner Zustand; Counter werden pro Pass
aggregiert. Registriert in observability/standard-metrics.ts.
Dateien
- Runtime:
packages/framework/src/pipeline/event-dispatcher.ts - State-Tabelle:
packages/framework/src/pipeline/event-consumer-state.ts - System-Consumer (SSE, Search):
packages/framework/src/pipeline/system-hooks.ts - Retention:
packages/framework/src/pipeline/event-retention.ts - CLI:
bin/kumiko.ts(commandsconsumer,events) - Tests:
pipeline/__tests__/event-dispatcher.integration.tspipeline/__tests__/event-dispatcher-wiring.integration.tspipeline/__tests__/event-dispatcher-recovery.integration.tspipeline/__tests__/event-dispatcher-multi-instance.integration.tspipeline/__tests__/event-retention.integration.tspipeline/__tests__/event-define-event-strict.integration.ts
Historie
- Sprint B.1 — event-sourcing-pivot, events-Tabelle
- Sprint D.1 — Audit-Feature entfernt (Events sind Audit)
- Sprint D.2 — Async Dispatcher (AsyncDaemon-Pattern)
- Sprint D.3 — SSE auf Dispatcher umgestellt
- Sprint D.4 — Search auf Dispatcher umgestellt
- Sprint D.5 — Outbox entfernt, pub/sub-Events in events-Tabelle
- Sprint E.1 — Dispatcher in
buildSerververdrahtet + Tenant-Wrapping + Lag-Metric - Sprint E.2 — Retention mit Consumer-Lag-Guard
- Sprint E.3 —
r.defineEventSchema-Validation strikt - Sprint E.5 — Multi-Instance Test-Suite (inkl. versteckter TX-Rollback-Bug gefixt)
- Sprint E.9 — Dead-Letter Recovery CLI + API
- Track C Runde 1 —
r.postEvent+ctx.emit+ PUBSUB_AGGREGATE_TYPE entfernt; MSP ist der einzige async-Consumer-API-Pfad, jedes Event gehört zu seinem Aggregate-Stream - Track C Runde 2 — correlationId + causationId End-to-End (HTTP → Events → MSP-apply → Jobs); MSP
errorMode.continuous.skipApplyErrors - Track C Runde 3 —
ctx.fetchForWriting(Marten FetchForWriting); MSP-apply mitctx.appendEventfür Saga-/Process-Manager-Pattern - Track C Runde 4 — Idempotency-DB-Index entfernt; HTTP-Retry-Idempotency läuft komplett via pipeline/idempotency.ts (Redis)