Projections
Status: Implementiert (Sprint B.1 — 2026-04-16). Multi-Stream-Variante ergänzt Sprint E (2026-04-17). Siehe auch: event-sourcing-pivot.md §2.1 Level 3 / §3.3
Was ist das
Eine Projection ist ein user-definiertes Read-Model, gefüttert aus den Events einer oder mehrerer Entities. Das Framework liefert zwei “Projection-Arten” — die Auto-Projection (die tasks/units/… Entity-Tabelle, die r.crud() automatisch generiert) und die Custom Projection, die User via r.projection() registrieren.
Beispiel: units_per_property — ein Counter pro Property, gefüttert aus unit.created/updated/deleted/restored. Der Counter landet in einer eigenen Drizzle-Tabelle, gelesen direkt per Query.
API
import { integer, table as pgTable, uuid } from "@kumiko/framework/db";import type { ProjectionDefinition } from "@kumiko/framework/engine";import { sql } from "drizzle-orm";
// 1. User besitzt die Drizzle-Tabelleexport const unitsPerPropertyTable = pgTable("mn_units_per_property", { propertyId: uuid("property_id").primaryKey(), tenantId: uuid("tenant_id").notNull(), unitCount: integer("unit_count").notNull().default(0),});
// 2. User schreibt die apply-Handlerexport const unitsPerProperty: ProjectionDefinition = { name: "units-per-property", source: "unit", table: unitsPerPropertyTable, apply: { "unit.created": async (event, tx) => { const propertyId = event.payload["propertyId"] as string; await tx .insert(unitsPerPropertyTable) .values({ propertyId, tenantId: event.tenantId, unitCount: 1 }) .onConflictDoUpdate({ target: unitsPerPropertyTable.propertyId, set: { unitCount: sql`${unitsPerPropertyTable.unitCount} + 1` }, }); }, // ... "unit.updated" / "unit.deleted" / "unit.restored" },};
// 3. Feature registriert die ProjectiondefineFeature("masterdata", (r) => { r.entity("unit", unitEntity); r.projection(unitsPerProperty);});Design-Entscheidungen
Imperativ, nicht funktional
apply(event, tx) gibt dem User die TX-scoped Drizzle-Connection. User schreibt INSERT ... ON CONFLICT DO UPDATE, Postgres garantiert Atomicity.
Funktional ((state, event) => newState) wäre eleganter für 1-Row-pro-Aggregate Projektionen, scheitert aber an cross-aggregate-Races: zwei gleichzeitige unit.created-Events für dieselbe Property müssten SELECT-FOR-UPDATE oder optimistic-retry bauen — Framework-Code, den SQL-UPSERT kostenlos erledigt.
Same-TX, nicht eventual
Projections laufen im Dispatcher, in derselben TX wie das Event-Append, direkt nach dem handler-return und vor den postSave-Hooks. Wirft ein apply() → TX rollback → Event weg, Entity-Tabelle unangetastet, alle Projections unangetastet.
Event-Shape: previous ist Pflicht
Für cross-aggregate-Projections (wie units_per_property) braucht apply() für update/delete den vorherigen Zustand. Wenn sich propertyId ändert, muss der Counter der alten Property dekrementiert werden — ohne previous wäre das unlösbar.
Deshalb tragen *.updated Events payload: { changes, previous }, *.deleted Events payload: { previous }, *.restored Events payload: { previous }. Payloads werden größer, Rebuild aus Events allein bleibt möglich — das ist der Deal.
Source-Validation zur Boot-Zeit
r.projection({ source: "unti" }) (Typo) wäre ohne Guard ein silent no-op. createRegistry() lehnt das ab — Fail-Fast beim Boot.
Kein Opt-In pro Aufruf
Projections werden automatisch vom Dispatcher getriggert, sobald ein write-handler ein SaveContext/DeleteContext mit event: StoredEvent zurückgibt. Der Executor füllt das immer aus. Manuelle Handler müssen nichts durchreichen, keine registry-Option vergessen.
Tabellen-Setup
setupTestStack und createApp pushen automatisch alle registrierten Projection-Tabellen via pushTables(). Tests müssen die Tabellen nicht separat anlegen. Zwei Projections auf derselben Drizzle-Tabelle werden per Referenz dedupliziert.
Rebuild
Projections sind per Definition wegwerfbar: DB drop → yarn kumiko project rebuild <name> → Zustand ist wieder da. Events sind die Wahrheit.
CLI
yarn kumiko project list # alle registrierten Projections + Statusyarn kumiko project status <name> # Detail-Status (lastRebuildAt, Errors)yarn kumiko project rebuild <name> # TRUNCATE + Events replay in einer TXDie CLI lädt Features aus ./kumiko.config.ts im current working directory:
import { masterdataFeature } from "./src/features/masterdata";
export default { features: [masterdataFeature] };Ohne diese Datei bekommt der User eine hilfreiche Fehlermeldung mit dem erwarteten Pfad.
Mechanik
- State-Row für Projection upsert mit
status: "rebuilding" TRUNCATEder Projection-Tabelle- Events chronologisch lesen (gefiltert auf
source-Entities + subscribed apply-Keys) - Pro Event: passende
apply()-Function aufrufen, mit der aktiven TX - State-Row auf
status: "idle",lastProcessedEventId,lastRebuildAtupdaten
Alles in einer einzigen TX. Wirft apply() → Postgres rollbackt → alte Projection bleibt intakt. State-Row wird in separatem Write auf status: "failed" mit Error-Message gesetzt.
Grenzen / Ops-Regeln
- Während Rebuild blockieren Writes auf die gleiche Projection-Tabelle. Rebuild ist ein Ops-Operation — lauf ihn auf quiet Entities oder in einem geplanten Write-Pause-Fenster.
- Scale-Grenze: Single-TX TRUNCATE + Replay läuft, solange dein Maintenance-Window die Replay-Laufzeit aufnimmt. Messen, bevor du’s ernst nimmst — die effektive Grenze hängt an Event-Payload-Größe, apply()-Kosten und DB-Last. Wenn Replay deine akzeptable Downtime überschreitet, ist ein Shadow-Swap-Variante fällig (zwei Tabellen + atomic rename) — nicht in v1.
- Pro Projection rebuildet, nicht pro Tenant: globaler Rebuild, keine tenant-wise Einschränkung. Bei 1000+ Tenants im System ist das bewusste Limitierung.
- Apply muss idempotent sein — wie in der Live-Pipeline.
onConflictDoUpdatestatt blindesINSERT.
State-Tabelle
kumiko_projections (vom Framework gepflegt):
| Spalte | Typ | Zweck |
|---|---|---|
name | text (PK) | Fully-qualified Projection-Name |
lastProcessedEventId | bigint | Cursor — ID des letzten replayed Events |
status | text | idle / rebuilding / failed |
lastRebuildAt | timestamp | Wanduhr-Zeit des letzten erfolgreichen Rebuilds |
lastError | text | Letzte Fehlermeldung (wenn status = failed) |
updatedAt | timestamp | Auto-updated bei jeder Status-Änderung |
Observability
kumiko_projection_rebuild_duration_seconds(Histogram, Labels: projection, success)kumiko_projection_rebuild_events_total(Counter, Label: projection)
Emittiert nur beim Rebuild — nicht kontinuierlich. Eine Lag-Metrik gibt’s nicht, weil Apply synchron in der Schreib-TX läuft (Lag ist by-definition 0). Kommt erst mit async-Apply.
Multi-Stream-Projections
Seit Sprint E: r.multiStreamProjection(def) für cross-aggregate, async Read-Models. Marten-aligned. Eine MSP aggregiert Events aus vielen Streams in einer View, gruppiert über einen Identity-Key, den das apply() aus dem Payload extrahiert.
r.multiStreamProjection({ name: "customer-revenue", table: customerRevenueTable, // optional — ohne `table` reine Side-Effect-MSP apply: { "invoice:event:paid": async (event, tx) => { // Identity (Customer-UUID) kommt aus dem Event-Payload, nicht aus aggregate_id. const p = event.payload as { customer: string; amountCents: number }; await tx.insert(customerRevenueTable).values(...).onConflictDoUpdate(...); }, },});Unterschiede zu r.projection():
r.projection | r.multiStreamProjection | |
|---|---|---|
| Lifecycle | inline (Write-TX) | async (event-dispatcher-cursor) |
| Source | single entity | cross-aggregate, event-type-driven |
| Delivery | exactly-once (TX-atomar) | at-least-once, strictly-ordered per MSP |
| Latenz | 0 (same TX) | TCP-roundtrip (LISTEN/NOTIFY) |
| Einsatz | consistent read-after-write | Saga, cross-feature-view, audit, webhooks |
Table-less MSPs sind der Ersatz für den klassischen Pub/Sub-Subscriber: Mail senden, Webhook posten, externes System synchronisieren. Die apply bekommt weiter ihre TX, kann aber auch nur Side-Effects produzieren.
Naming: teilt den Namensraum mit r.projection — beide kebab-case-qualifiziert als <feature>:projection:<name>. Doppel-Registration desselben Namens (egal ob single- oder multi-stream) wird beim Boot abgelehnt.
Cursor: jede MSP bekommt eine eigene Row in kumiko_event_consumers. Handler-Fehler setzen den Consumer bei maxAttempts auf dead — andere MSPs laufen ungehindert weiter. Ops-Tools (restartConsumer, skipPoisonEvent) gelten 1:1.
Was (noch) nicht da ist
- Async Apply für Single-Stream Projections:
mode: "async"für hochvolumige Analytics-Projektionen. MSP ist der Work-around; echte async-single-stream ist B.3+. - Shadow-Swap Rebuild: für Replay-Zeiten jenseits des akzeptablen Maintenance-Windows. Dokumentierte Scale-Grenze.
- Per-Tenant Rebuild: aktuell global-only.
- MSP-Rebuild: aktuell nur live-apply — eine MSP nach Schema-Änderung zurückrollen + reseeden ist manuell (TRUNCATE + Cursor zurücksetzen).
r.reducerProjection()Zucker: Funktionaler Reducer-Stil über der imperative API. Wenn echter Use-Case auftaucht.
Contracts
- apply() läuft in der TX. Werfen = Rollback.
event.tenantIdist die Wahrheit für Tenant-Scoping. apply() MUSS sie in jede geschriebene Row übernehmen, sonst leakt cross-tenant.- Event-Schema ist stabil.
unit.updated.payload.previousist nicht optional, auch nicht zur Migration — falls sich das ändert, ist’s ein Upcaster (Phase 4,r.eventMigration()). - Projection-Namen sind kebab-case. Wird zur Registrierungs-Zeit validiert.
- source muss existierende Entity referenzieren. Boot-Zeit-Check.
- apply-Keys müssen bekannten Event-Typen entsprechen (
<source>.created|updated|deleted|restored). Boot-Zeit-Check verhindert Typos als Silent-No-Ops.
Sensitive Fields — GDPR / Compliance
Events sind ein append-only, immutable Log. Ohne Schutz landet jedes Feld-Update bei jeder Änderung permanent in der Historie — das kollidiert mit Right-to-be-Forgotten, Secrets-Rotation, und Datenschutz-Audits.
Feld-Flag sensitive: true:
const userEntity = createEntity({ fields: { email: createTextField({ required: true }), passwordHash: createTextField({ sensitive: true }), // nie im Event-Log apiToken: createTextField({ sensitive: true }), iban: createTextField({ sensitive: true }), },});Wirkung: Sensitive Felder werden aus allen Event-Payloads gestrippt — create.data, update.changes, update.previous, delete.previous, restore.previous. Die Entity-Tabelle bekommt sie weiterhin vollständig; nur das immutable Event-Log ist bereinigt.
Konsequenzen die der Feature-Autor akzeptiert:
- Custom-Projections können sensitive Felder nicht lesen — ein
user.passwordtaucht nie in einem apply()-Event auf. - Replay / Rebuild aus Events rekonstruiert sensitive Felder nicht. Projektionen über diese Felder sind nicht möglich.
- Historie enthält keine sensitive-Änderungen. Wenn der Audit-Pfad “wer hat das Passwort wann geändert” braucht, reicht der Event-Zeitpunkt (
event.createdAt+event.createdBy), aber nicht der alte/neue Wert.
Faustregel: Ein Feld ist sensitive, wenn seine Historie löschbar sein muss oder sein Wert nie in Log-Files auftauchen darf. Passwort-Hashes, API-Tokens, PII (unverschlüsselt), Bank-Daten, Steuer-IDs.