Skip to content

Projections

Status: Implementiert (Sprint B.1 — 2026-04-16). Multi-Stream-Variante ergänzt Sprint E (2026-04-17). Siehe auch: event-sourcing-pivot.md §2.1 Level 3 / §3.3


Was ist das

Eine Projection ist ein user-definiertes Read-Model, gefüttert aus den Events einer oder mehrerer Entities. Das Framework liefert zwei “Projection-Arten” — die Auto-Projection (die tasks/units/… Entity-Tabelle, die r.crud() automatisch generiert) und die Custom Projection, die User via r.projection() registrieren.

Beispiel: units_per_property — ein Counter pro Property, gefüttert aus unit.created/updated/deleted/restored. Der Counter landet in einer eigenen Drizzle-Tabelle, gelesen direkt per Query.


API

import { integer, table as pgTable, uuid } from "@kumiko/framework/db";
import type { ProjectionDefinition } from "@kumiko/framework/engine";
import { sql } from "drizzle-orm";
// 1. User besitzt die Drizzle-Tabelle
export const unitsPerPropertyTable = pgTable("mn_units_per_property", {
propertyId: uuid("property_id").primaryKey(),
tenantId: uuid("tenant_id").notNull(),
unitCount: integer("unit_count").notNull().default(0),
});
// 2. User schreibt die apply-Handler
export const unitsPerProperty: ProjectionDefinition = {
name: "units-per-property",
source: "unit",
table: unitsPerPropertyTable,
apply: {
"unit.created": async (event, tx) => {
const propertyId = event.payload["propertyId"] as string;
await tx
.insert(unitsPerPropertyTable)
.values({ propertyId, tenantId: event.tenantId, unitCount: 1 })
.onConflictDoUpdate({
target: unitsPerPropertyTable.propertyId,
set: { unitCount: sql`${unitsPerPropertyTable.unitCount} + 1` },
});
},
// ... "unit.updated" / "unit.deleted" / "unit.restored"
},
};
// 3. Feature registriert die Projection
defineFeature("masterdata", (r) => {
r.entity("unit", unitEntity);
r.projection(unitsPerProperty);
});

Design-Entscheidungen

Imperativ, nicht funktional

apply(event, tx) gibt dem User die TX-scoped Drizzle-Connection. User schreibt INSERT ... ON CONFLICT DO UPDATE, Postgres garantiert Atomicity.

Funktional ((state, event) => newState) wäre eleganter für 1-Row-pro-Aggregate Projektionen, scheitert aber an cross-aggregate-Races: zwei gleichzeitige unit.created-Events für dieselbe Property müssten SELECT-FOR-UPDATE oder optimistic-retry bauen — Framework-Code, den SQL-UPSERT kostenlos erledigt.

Same-TX, nicht eventual

Projections laufen im Dispatcher, in derselben TX wie das Event-Append, direkt nach dem handler-return und vor den postSave-Hooks. Wirft ein apply() → TX rollback → Event weg, Entity-Tabelle unangetastet, alle Projections unangetastet.

Event-Shape: previous ist Pflicht

Für cross-aggregate-Projections (wie units_per_property) braucht apply() für update/delete den vorherigen Zustand. Wenn sich propertyId ändert, muss der Counter der alten Property dekrementiert werden — ohne previous wäre das unlösbar.

Deshalb tragen *.updated Events payload: { changes, previous }, *.deleted Events payload: { previous }, *.restored Events payload: { previous }. Payloads werden größer, Rebuild aus Events allein bleibt möglich — das ist der Deal.

Source-Validation zur Boot-Zeit

r.projection({ source: "unti" }) (Typo) wäre ohne Guard ein silent no-op. createRegistry() lehnt das ab — Fail-Fast beim Boot.

Kein Opt-In pro Aufruf

Projections werden automatisch vom Dispatcher getriggert, sobald ein write-handler ein SaveContext/DeleteContext mit event: StoredEvent zurückgibt. Der Executor füllt das immer aus. Manuelle Handler müssen nichts durchreichen, keine registry-Option vergessen.

Tabellen-Setup

setupTestStack und createApp pushen automatisch alle registrierten Projection-Tabellen via pushTables(). Tests müssen die Tabellen nicht separat anlegen. Zwei Projections auf derselben Drizzle-Tabelle werden per Referenz dedupliziert.

Rebuild

Projections sind per Definition wegwerfbar: DB drop → yarn kumiko project rebuild <name> → Zustand ist wieder da. Events sind die Wahrheit.

CLI

Terminal window
yarn kumiko project list # alle registrierten Projections + Status
yarn kumiko project status <name> # Detail-Status (lastRebuildAt, Errors)
yarn kumiko project rebuild <name> # TRUNCATE + Events replay in einer TX

Die CLI lädt Features aus ./kumiko.config.ts im current working directory:

kumiko.config.ts
import { masterdataFeature } from "./src/features/masterdata";
export default { features: [masterdataFeature] };

Ohne diese Datei bekommt der User eine hilfreiche Fehlermeldung mit dem erwarteten Pfad.

Mechanik

  1. State-Row für Projection upsert mit status: "rebuilding"
  2. TRUNCATE der Projection-Tabelle
  3. Events chronologisch lesen (gefiltert auf source-Entities + subscribed apply-Keys)
  4. Pro Event: passende apply()-Function aufrufen, mit der aktiven TX
  5. State-Row auf status: "idle", lastProcessedEventId, lastRebuildAt updaten

Alles in einer einzigen TX. Wirft apply() → Postgres rollbackt → alte Projection bleibt intakt. State-Row wird in separatem Write auf status: "failed" mit Error-Message gesetzt.

Grenzen / Ops-Regeln

  • Während Rebuild blockieren Writes auf die gleiche Projection-Tabelle. Rebuild ist ein Ops-Operation — lauf ihn auf quiet Entities oder in einem geplanten Write-Pause-Fenster.
  • Scale-Grenze: Single-TX TRUNCATE + Replay läuft, solange dein Maintenance-Window die Replay-Laufzeit aufnimmt. Messen, bevor du’s ernst nimmst — die effektive Grenze hängt an Event-Payload-Größe, apply()-Kosten und DB-Last. Wenn Replay deine akzeptable Downtime überschreitet, ist ein Shadow-Swap-Variante fällig (zwei Tabellen + atomic rename) — nicht in v1.
  • Pro Projection rebuildet, nicht pro Tenant: globaler Rebuild, keine tenant-wise Einschränkung. Bei 1000+ Tenants im System ist das bewusste Limitierung.
  • Apply muss idempotent sein — wie in der Live-Pipeline. onConflictDoUpdate statt blindes INSERT.

State-Tabelle

kumiko_projections (vom Framework gepflegt):

SpalteTypZweck
nametext (PK)Fully-qualified Projection-Name
lastProcessedEventIdbigintCursor — ID des letzten replayed Events
statustextidle / rebuilding / failed
lastRebuildAttimestampWanduhr-Zeit des letzten erfolgreichen Rebuilds
lastErrortextLetzte Fehlermeldung (wenn status = failed)
updatedAttimestampAuto-updated bei jeder Status-Änderung

Observability

  • kumiko_projection_rebuild_duration_seconds (Histogram, Labels: projection, success)
  • kumiko_projection_rebuild_events_total (Counter, Label: projection)

Emittiert nur beim Rebuild — nicht kontinuierlich. Eine Lag-Metrik gibt’s nicht, weil Apply synchron in der Schreib-TX läuft (Lag ist by-definition 0). Kommt erst mit async-Apply.

Multi-Stream-Projections

Seit Sprint E: r.multiStreamProjection(def) für cross-aggregate, async Read-Models. Marten-aligned. Eine MSP aggregiert Events aus vielen Streams in einer View, gruppiert über einen Identity-Key, den das apply() aus dem Payload extrahiert.

r.multiStreamProjection({
name: "customer-revenue",
table: customerRevenueTable, // optional — ohne `table` reine Side-Effect-MSP
apply: {
"invoice:event:paid": async (event, tx) => {
// Identity (Customer-UUID) kommt aus dem Event-Payload, nicht aus aggregate_id.
const p = event.payload as { customer: string; amountCents: number };
await tx.insert(customerRevenueTable).values(...).onConflictDoUpdate(...);
},
},
});

Unterschiede zu r.projection():

r.projectionr.multiStreamProjection
Lifecycleinline (Write-TX)async (event-dispatcher-cursor)
Sourcesingle entitycross-aggregate, event-type-driven
Deliveryexactly-once (TX-atomar)at-least-once, strictly-ordered per MSP
Latenz0 (same TX)TCP-roundtrip (LISTEN/NOTIFY)
Einsatzconsistent read-after-writeSaga, cross-feature-view, audit, webhooks

Table-less MSPs sind der Ersatz für den klassischen Pub/Sub-Subscriber: Mail senden, Webhook posten, externes System synchronisieren. Die apply bekommt weiter ihre TX, kann aber auch nur Side-Effects produzieren.

Naming: teilt den Namensraum mit r.projection — beide kebab-case-qualifiziert als <feature>:projection:<name>. Doppel-Registration desselben Namens (egal ob single- oder multi-stream) wird beim Boot abgelehnt.

Cursor: jede MSP bekommt eine eigene Row in kumiko_event_consumers. Handler-Fehler setzen den Consumer bei maxAttempts auf dead — andere MSPs laufen ungehindert weiter. Ops-Tools (restartConsumer, skipPoisonEvent) gelten 1:1.

Was (noch) nicht da ist

  • Async Apply für Single-Stream Projections: mode: "async" für hochvolumige Analytics-Projektionen. MSP ist der Work-around; echte async-single-stream ist B.3+.
  • Shadow-Swap Rebuild: für Replay-Zeiten jenseits des akzeptablen Maintenance-Windows. Dokumentierte Scale-Grenze.
  • Per-Tenant Rebuild: aktuell global-only.
  • MSP-Rebuild: aktuell nur live-apply — eine MSP nach Schema-Änderung zurückrollen + reseeden ist manuell (TRUNCATE + Cursor zurücksetzen).
  • r.reducerProjection() Zucker: Funktionaler Reducer-Stil über der imperative API. Wenn echter Use-Case auftaucht.

Contracts

  1. apply() läuft in der TX. Werfen = Rollback.
  2. event.tenantId ist die Wahrheit für Tenant-Scoping. apply() MUSS sie in jede geschriebene Row übernehmen, sonst leakt cross-tenant.
  3. Event-Schema ist stabil. unit.updated.payload.previous ist nicht optional, auch nicht zur Migration — falls sich das ändert, ist’s ein Upcaster (Phase 4, r.eventMigration()).
  4. Projection-Namen sind kebab-case. Wird zur Registrierungs-Zeit validiert.
  5. source muss existierende Entity referenzieren. Boot-Zeit-Check.
  6. apply-Keys müssen bekannten Event-Typen entsprechen (<source>.created|updated|deleted|restored). Boot-Zeit-Check verhindert Typos als Silent-No-Ops.

Sensitive Fields — GDPR / Compliance

Events sind ein append-only, immutable Log. Ohne Schutz landet jedes Feld-Update bei jeder Änderung permanent in der Historie — das kollidiert mit Right-to-be-Forgotten, Secrets-Rotation, und Datenschutz-Audits.

Feld-Flag sensitive: true:

const userEntity = createEntity({
fields: {
email: createTextField({ required: true }),
passwordHash: createTextField({ sensitive: true }), // nie im Event-Log
apiToken: createTextField({ sensitive: true }),
iban: createTextField({ sensitive: true }),
},
});

Wirkung: Sensitive Felder werden aus allen Event-Payloads gestrippt — create.data, update.changes, update.previous, delete.previous, restore.previous. Die Entity-Tabelle bekommt sie weiterhin vollständig; nur das immutable Event-Log ist bereinigt.

Konsequenzen die der Feature-Autor akzeptiert:

  • Custom-Projections können sensitive Felder nicht lesen — ein user.password taucht nie in einem apply()-Event auf.
  • Replay / Rebuild aus Events rekonstruiert sensitive Felder nicht. Projektionen über diese Felder sind nicht möglich.
  • Historie enthält keine sensitive-Änderungen. Wenn der Audit-Pfad “wer hat das Passwort wann geändert” braucht, reicht der Event-Zeitpunkt (event.createdAt + event.createdBy), aber nicht der alte/neue Wert.

Faustregel: Ein Feld ist sensitive, wenn seine Historie löschbar sein muss oder sein Wert nie in Log-Files auftauchen darf. Passwort-Hashes, API-Tokens, PII (unverschlüsselt), Bank-Daten, Steuer-IDs.