Loading
Implement an event store with projections, snapshots, replay, and CQRS read models in Node.js.
Event sourcing stores every state change as an immutable event rather than overwriting current state. Instead of a users table with the latest data, you have an events table that records every action: UserCreated, EmailChanged, AccountDeactivated. Current state is derived by replaying events from the beginning.
This pattern powers systems at LinkedIn, Netflix, and most financial platforms where auditability and temporal queries are essential. In this tutorial, you will build a complete event sourcing system with an append-only event store, projections that compute read models, snapshots for performance, and full replay capability.
We use Node.js with SQLite (via the built-in node:sqlite module in Node 22+, or better-sqlite3 for older versions). Everything runs cross-platform with no external services.
Events are the foundation. Each event is immutable, timestamped, and belongs to an aggregate.
The event store is an append-only log. Events can never be modified or deleted.
Aggregates encapsulate domain logic. They accept commands and produce events.
Build a BankAccount aggregate that handles deposits, withdrawals, and tracks balance.
Projections transform events into read-optimized views. They subscribe to events and maintain denormalized state.
Replaying thousands of events on every load is slow. Snapshots save aggregate state at a point in time.
The repository combines the event store and snapshot store to load and save aggregates.
Replay lets you rebuild all projections from scratch. This is essential for adding new projections to existing data.
CQRS separates write operations (commands) from read operations (queries).
Wire everything together and exercise the full event sourcing pipeline.
Run with npx tsx src/main.ts. You will see the accounts created via commands, queried via projections, and verified by replaying the full event history. The event log in data/events.json provides a complete audit trail of every state change.
From here, you could add event versioning for schema evolution, async projection handlers, or a real database backend like PostgreSQL with its built-in JSON support.
// src/events.ts
export interface DomainEvent {
id: string;
aggregateId: string;
aggregateType: string;
eventType: string;
payload: Record<string, unknown>;
version: number;
timestamp: string;
metadata: Record<string, unknown>;
}
export function createEvent(
aggregateId: string,
aggregateType: string,
eventType: string,
payload: Record<string, unknown>,
version: number,
metadata: Record<string, unknown> = {}
): DomainEvent {
return {
id: crypto.randomUUID(),
aggregateId,
aggregateType,
eventType,
payload,
version,
timestamp: new Date().toISOString(),
metadata,
};
}// src/event-store.ts
import { readFileSync } from "node:fs";
import { writeFileSync, mkdirSync } from "node:fs";
import { join } from "node:path";
import { DomainEvent } from "./events.js";
export class EventStore {
private events: DomainEvent[] = [];
private storagePath: string;
constructor(dataDir: string = "./data") {
mkdirSync(dataDir, { recursive: true });
this.storagePath = join(dataDir, "events.json");
try {
const data = readFileSync(this.storagePath, "utf-8");
this.events = JSON.parse(data);
} catch {
this.events = [];
}
}
append(event: DomainEvent): void {
const existing = this.getEventsForAggregate(event.aggregateId);
const expectedVersion = existing.length;
if (event.version !== expectedVersion) {
throw new Error(
`Concurrency conflict: expected version ${expectedVersion} but got ${event.version}`
);
}
this.events.push(Object.freeze(event));
this.persist();
}
appendBatch(events: DomainEvent[]): void {
for (const event of events) this.append(event);
}
getEventsForAggregate(aggregateId: string): DomainEvent[] {
return this.events.filter((e) => e.aggregateId === aggregateId);
}
getEventsByType(eventType: string): DomainEvent[] {
return this.events.filter((e) => e.eventType === eventType);
}
getAllEvents(): DomainEvent[] {
return [...this.events];
}
getEventsAfter(timestamp: string): DomainEvent[] {
return this.events.filter((e) => e.timestamp > timestamp);
}
private persist(): void {
writeFileSync(this.storagePath, JSON.stringify(this.events, null, 2));
}
}// src/aggregate.ts
import { DomainEvent, createEvent } from "./events.js";
export abstract class Aggregate {
protected version = 0;
private uncommittedEvents: DomainEvent[] = [];
constructor(
public readonly id: string,
public readonly type: string
) {}
protected addEvent(eventType: string, payload: Record<string, unknown>): void {
const event = createEvent(this.id, this.type, eventType, payload, this.version);
this.apply(event);
this.uncommittedEvents.push(event);
this.version++;
}
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.apply(event);
this.version++;
}
}
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents];
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
protected abstract apply(event: DomainEvent): void;
}// src/bank-account.ts
import { Aggregate } from "./aggregate.js";
import { DomainEvent } from "./events.js";
export class BankAccount extends Aggregate {
private balance = 0;
private ownerName = "";
private isActive = true;
constructor(id: string) {
super(id, "BankAccount");
}
static create(id: string, ownerName: string): BankAccount {
const account = new BankAccount(id);
account.addEvent("AccountOpened", { ownerName, initialBalance: 0 });
return account;
}
deposit(amount: number): void {
if (amount <= 0) throw new Error("Deposit amount must be positive");
if (!this.isActive) throw new Error("Account is closed");
this.addEvent("MoneyDeposited", { amount });
}
withdraw(amount: number): void {
if (amount <= 0) throw new Error("Withdrawal amount must be positive");
if (!this.isActive) throw new Error("Account is closed");
if (amount > this.balance) throw new Error("Insufficient funds");
this.addEvent("MoneyWithdrawn", { amount });
}
close(): void {
if (!this.isActive) throw new Error("Account already closed");
this.addEvent("AccountClosed", { finalBalance: this.balance });
}
getBalance(): number {
return this.balance;
}
getOwnerName(): string {
return this.ownerName;
}
protected apply(event: DomainEvent): void {
switch (event.eventType) {
case "AccountOpened":
this.ownerName = event.payload.ownerName as string;
this.balance = 0;
this.isActive = true;
break;
case "MoneyDeposited":
this.balance += event.payload.amount as number;
break;
case "MoneyWithdrawn":
this.balance -= event.payload.amount as number;
break;
case "AccountClosed":
this.isActive = false;
break;
}
}
}// src/projections.ts
import { DomainEvent } from "./events.js";
export interface AccountSummary {
id: string;
ownerName: string;
balance: number;
isActive: boolean;
transactionCount: number;
lastActivity: string;
}
export class AccountSummaryProjection {
private summaries: Map<string, AccountSummary> = new Map();
handle(event: DomainEvent): void {
switch (event.eventType) {
case "AccountOpened": {
this.summaries.set(event.aggregateId, {
id: event.aggregateId,
ownerName: event.payload.ownerName as string,
balance: 0,
isActive: true,
transactionCount: 0,
lastActivity: event.timestamp,
});
break;
}
case "MoneyDeposited": {
const s = this.summaries.get(event.aggregateId);
if (s) {
s.balance += event.payload.amount as number;
s.transactionCount++;
s.lastActivity = event.timestamp;
}
break;
}
case "MoneyWithdrawn": {
const s = this.summaries.get(event.aggregateId);
if (s) {
s.balance -= event.payload.amount as number;
s.transactionCount++;
s.lastActivity = event.timestamp;
}
break;
}
case "AccountClosed": {
const s = this.summaries.get(event.aggregateId);
if (s) s.isActive = false;
break;
}
}
}
getAll(): AccountSummary[] {
return Array.from(this.summaries.values());
}
getById(id: string): AccountSummary | undefined {
return this.summaries.get(id);
}
getTotalBalance(): number {
return this.getAll().reduce((sum, a) => sum + a.balance, 0);
}
}// src/snapshots.ts
import { readFileSync, writeFileSync, mkdirSync } from "node:fs";
import { join } from "node:path";
export interface Snapshot {
aggregateId: string;
version: number;
state: Record<string, unknown>;
timestamp: string;
}
export class SnapshotStore {
private snapshots: Map<string, Snapshot> = new Map();
private storagePath: string;
constructor(dataDir: string = "./data") {
mkdirSync(dataDir, { recursive: true });
this.storagePath = join(dataDir, "snapshots.json");
try {
const data = JSON.parse(readFileSync(this.storagePath, "utf-8"));
for (const s of data) this.snapshots.set(s.aggregateId, s);
} catch {
// No snapshots yet
}
}
save(aggregateId: string, version: number, state: Record<string, unknown>): void {
const snapshot: Snapshot = {
aggregateId,
version,
state,
timestamp: new Date().toISOString(),
};
this.snapshots.set(aggregateId, snapshot);
this.persist();
}
get(aggregateId: string): Snapshot | undefined {
return this.snapshots.get(aggregateId);
}
private persist(): void {
writeFileSync(this.storagePath, JSON.stringify(Array.from(this.snapshots.values()), null, 2));
}
}// src/repository.ts
import { EventStore } from "./event-store.js";
import { SnapshotStore } from "./snapshots.js";
import { BankAccount } from "./bank-account.js";
const SNAPSHOT_INTERVAL = 10;
export class BankAccountRepository {
constructor(
private eventStore: EventStore,
private snapshotStore: SnapshotStore
) {}
save(account: BankAccount): void {
const events = account.getUncommittedEvents();
this.eventStore.appendBatch(events);
account.clearUncommittedEvents();
if (account["version"] % SNAPSHOT_INTERVAL === 0) {
this.snapshotStore.save(account.id, account["version"], {
balance: account.getBalance(),
ownerName: account.getOwnerName(),
});
}
}
load(id: string): BankAccount {
const account = new BankAccount(id);
const snapshot = this.snapshotStore.get(id);
let events = this.eventStore.getEventsForAggregate(id);
if (snapshot) {
events = events.filter((e) => e.version >= snapshot.version);
}
account.loadFromHistory(events);
return account;
}
}// src/replay.ts
import { EventStore } from "./event-store.js";
import { AccountSummaryProjection } from "./projections.js";
export function replayAllEvents(eventStore: EventStore): AccountSummaryProjection {
const projection = new AccountSummaryProjection();
const events = eventStore.getAllEvents();
console.log(`Replaying ${events.length} events...`);
const start = performance.now();
for (const event of events) {
projection.handle(event);
}
const elapsed = (performance.now() - start).toFixed(2);
console.log(`Replay complete in ${elapsed}ms`);
console.log(`Accounts: ${projection.getAll().length}`);
console.log(`Total balance: $${projection.getTotalBalance()}`);
return projection;
}// src/command-bus.ts
import { BankAccountRepository } from "./repository.js";
import { BankAccount } from "./bank-account.js";
import { AccountSummaryProjection } from "./projections.js";
interface OpenAccountCommand {
type: "OpenAccount";
id: string;
ownerName: string;
}
interface DepositCommand {
type: "Deposit";
accountId: string;
amount: number;
}
interface WithdrawCommand {
type: "Withdraw";
accountId: string;
amount: number;
}
interface CloseAccountCommand {
type: "CloseAccount";
accountId: string;
}
type Command = OpenAccountCommand | DepositCommand | WithdrawCommand | CloseAccountCommand;
export class CommandBus {
constructor(
private repository: BankAccountRepository,
private projection: AccountSummaryProjection
) {}
execute(command: Command): void {
switch (command.type) {
case "OpenAccount": {
const account = BankAccount.create(command.id, command.ownerName);
this.repository.save(account);
for (const e of account.getUncommittedEvents()) this.projection.handle(e);
break;
}
case "Deposit": {
const account = this.repository.load(command.accountId);
account.deposit(command.amount);
const events = account.getUncommittedEvents();
this.repository.save(account);
for (const e of events) this.projection.handle(e);
break;
}
case "Withdraw": {
const account = this.repository.load(command.accountId);
account.withdraw(command.amount);
const events = account.getUncommittedEvents();
this.repository.save(account);
for (const e of events) this.projection.handle(e);
break;
}
case "CloseAccount": {
const account = this.repository.load(command.accountId);
account.close();
const events = account.getUncommittedEvents();
this.repository.save(account);
for (const e of events) this.projection.handle(e);
break;
}
}
}
}// src/main.ts
import { EventStore } from "./event-store.js";
import { SnapshotStore } from "./snapshots.js";
import { BankAccountRepository } from "./repository.js";
import { AccountSummaryProjection } from "./projections.js";
import { CommandBus } from "./command-bus.js";
import { replayAllEvents } from "./replay.js";
const eventStore = new EventStore("./data");
const snapshotStore = new SnapshotStore("./data");
const repository = new BankAccountRepository(eventStore, snapshotStore);
const projection = new AccountSummaryProjection();
const bus = new CommandBus(repository, projection);
// Execute commands
bus.execute({ type: "OpenAccount", id: "acc-001", ownerName: "Alice" });
bus.execute({ type: "Deposit", accountId: "acc-001", amount: 1000 });
bus.execute({ type: "Withdraw", accountId: "acc-001", amount: 250 });
bus.execute({ type: "OpenAccount", id: "acc-002", ownerName: "Bob" });
bus.execute({ type: "Deposit", accountId: "acc-002", amount: 500 });
// Query the read model
console.log("\n--- Account Summaries ---");
for (const summary of projection.getAll()) {
console.log(`${summary.ownerName}: $${summary.balance} (${summary.transactionCount} txns)`);
}
// Replay from scratch to verify consistency
console.log("\n--- Replaying All Events ---");
const replayed = replayAllEvents(eventStore);
for (const summary of replayed.getAll()) {
console.log(`${summary.ownerName}: $${summary.balance}`);
}