← Week 3: Event Sourcing & CQRS

Day 17: Event Store Design

Phase 4 · Aug 7, 2026

Agenda (2–3 hours)

  • Read (45 min): EventStoreDB documentation; Greg Young "Building an Event Store" blog post
  • Study (45 min): Design the schema for an event store; what indexes do you need? What is "wrong version" optimistic concurrency?
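  • Practice (45 min): Implement a simple append-only event store on top of PostgreSQL; use an expected-version check backed by the UNIQUE (stream_id, stream_seq) constraint for optimistic concurrency (row-level locking would be pessimistic, not optimistic)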
  • Challenge (30 min): Why does EventStoreDB implement its own storage engine instead of building on an external database? What advantages does purpose-built storage give?

Event Store Schema

CREATE TABLE events (
    global_seq   BIGSERIAL PRIMARY KEY,    -- global ordering across all streams
    stream_id    TEXT NOT NULL,            -- e.g., "account-42", "order-uuid"
    stream_seq   BIGINT NOT NULL,          -- per-stream ordering (for optimistic lock)
    event_type   TEXT NOT NULL,            -- e.g., "Deposited", "OrderPlaced"
    data         JSONB NOT NULL,           -- event payload
    metadata     JSONB,                    -- causation ID, correlation ID, user
    created_at   TIMESTAMPTZ DEFAULT now(),
    UNIQUE (stream_id, stream_seq)         -- enforce per-stream ordering
);

-- No additional indexes are required: PRIMARY KEY (global_seq) and
-- UNIQUE (stream_id, stream_seq) each create a B-tree index implicitly,
-- covering global reads and per-stream reads respectively.
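
The two sequence columns serve two read paths: loading one stream in order (to rebuild an aggregate) and scanning the whole log in order (to feed projections). The following is a minimal in-memory sketch of those two queries, not a database client; `EventRow`, `read_stream`, and `read_all_after` are hypothetical names chosen for illustration, and the SQL in the comments is the assumed equivalent against the `events` table.

```rust
// In-memory stand-in for rows of the `events` table (illustration only).
#[derive(Debug, Clone, PartialEq)]
struct EventRow {
    global_seq: i64,
    stream_id: String,
    stream_seq: i64,
    event_type: String,
}

/// Per-stream read, roughly:
///   SELECT * FROM events
///   WHERE stream_id = $1 AND stream_seq >= $2
///   ORDER BY stream_seq;
/// In PostgreSQL this is served by the index behind UNIQUE (stream_id, stream_seq).
fn read_stream<'a>(rows: &'a [EventRow], stream_id: &str, from_seq: i64) -> Vec<&'a EventRow> {
    let mut out: Vec<&EventRow> = rows
        .iter()
        .filter(|r| r.stream_id == stream_id && r.stream_seq >= from_seq)
        .collect();
    out.sort_by_key(|r| r.stream_seq);
    out
}

/// Global read, roughly:
///   SELECT * FROM events
///   WHERE global_seq > $1
///   ORDER BY global_seq LIMIT $2;
/// In PostgreSQL this is served by the primary-key index on global_seq.
fn read_all_after<'a>(rows: &'a [EventRow], after: i64, limit: usize) -> Vec<&'a EventRow> {
    let mut out: Vec<&EventRow> = rows.iter().filter(|r| r.global_seq > after).collect();
    out.sort_by_key(|r| r.global_seq);
    out.truncate(limit);
    out
}
```

Both functions filter and order on exactly the columns that already carry an index, which is why the schema needs nothing beyond its two constraints for these access patterns.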

Optimistic Concurrency Control

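The "wrong version" check rejects an append when the stream has advanced past the version the writer last read, so two concurrent writers cannot both append at the same position: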

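async fn append_event(
    pool: &PgPool,
    stream_id: &str,
    expected_version: i64,  // expected current max stream_seq; -1 for a new stream
    event: &Event,
) -> Result<(), ConcurrencyError> {
    // INSERT ... VALUES cannot take a WHERE clause, so use INSERT ... SELECT:
    // the SELECT yields zero rows when the stream's current version differs
    // from expected_version.
    let result = sqlx::query!(
        r#"
        INSERT INTO events (stream_id, stream_seq, event_type, data)
        SELECT $1::TEXT, $4::BIGINT + 1, $2::TEXT, $3::JSONB
        WHERE (SELECT COALESCE(MAX(stream_seq), -1)
               FROM events WHERE stream_id = $1) = $4
        "#,
        stream_id, event.event_type, event.data, expected_version
    )
    .execute(pool)
    .await;

    match result {
        // Version check failed: another writer already advanced the stream.
        Ok(done) if done.rows_affected() == 0 => Err(ConcurrencyError::WrongVersion),
        Ok(_) => Ok(()),
        // Two writers can pass the check concurrently; UNIQUE (stream_id, stream_seq)
        // rejects the loser with SQLSTATE 23505 (unique_violation).
        Err(sqlx::Error::Database(e)) if e.code().as_deref() == Some("23505") => {
            Err(ConcurrencyError::WrongVersion)
        }
        // Other failures: assumes ConcurrencyError implements From<sqlx::Error>.
        Err(e) => Err(e.into()),
    }
}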
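
To see the version check in isolation, here is a minimal in-memory sketch of the same rule, without a database. `InMemoryStore` and `AppendError` are hypothetical names for illustration; the position in each stream's `Vec` plays the role of `stream_seq`, and `-1` is assumed to mean "stream does not exist yet", matching the `COALESCE(MAX(stream_seq), -1)` convention.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum AppendError {
    /// The stream was at `actual`, but the writer expected `expected`.
    WrongVersion { expected: i64, actual: i64 },
}

#[derive(Default)]
struct InMemoryStore {
    // stream_id -> event payloads; the Vec index plays the role of stream_seq
    streams: HashMap<String, Vec<String>>,
}

impl InMemoryStore {
    /// Current version = highest stream_seq, or -1 for a stream with no events.
    fn version(&self, stream_id: &str) -> i64 {
        self.streams.get(stream_id).map_or(-1, |s| s.len() as i64 - 1)
    }

    /// Appends only if `expected` matches the current version.
    /// Returns the stream_seq assigned to the new event.
    fn append(&mut self, stream_id: &str, expected: i64, event: &str) -> Result<i64, AppendError> {
        let actual = self.version(stream_id);
        if actual != expected {
            return Err(AppendError::WrongVersion { expected, actual });
        }
        self.streams
            .entry(stream_id.to_string())
            .or_default()
            .push(event.to_string());
        Ok(actual + 1)
    }
}
```

If two writers both read version 0 and then call `append("account-42", 0, ...)`, the first succeeds and the second receives `WrongVersion`. The usual response is to reload the stream, re-run the business decision against the new state, and retry with the new version.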

Event Subscriptions (Push-Based)

Projectors need to be notified when new events arrive:

PostgreSQL LISTEN/NOTIFY:

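-- After each INSERT into events (from a trigger, pg_notify() can build the payload):
NOTIFY new_events, '{"global_seq": 42}';

// Projector subscribes
let mut listener = PgListener::connect(database_url).await?;
listener.listen("new_events").await?;
loop {
    // recv() returns Result<PgNotification, sqlx::Error>, not an Option
    let notification = listener.recv().await?;
    // The payload is a JSON object, so it cannot be parsed directly as an i64
    let payload: serde_json::Value = serde_json::from_str(notification.payload())?;
    if let Some(seq) = payload["global_seq"].as_i64() {
        project_events_since(seq).await?;
    }
}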

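NOTIFY is not durable: a projector that is disconnected when an event is written never receives that notification, so on startup it should catch up by reading events after its last processed global_seq. For large-scale event distribution, replace NOTIFY with Kafka or Kinesis.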
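
A common way to make a push-based projector robust is to treat each notification only as a wake-up signal and let the projector track its own checkpoint, so that a missed or duplicate notification is harmless. The sketch below assumes a hypothetical `BalanceProjector` and models the log as `(global_seq, amount)` pairs already sorted by `global_seq`; in a real projector the slice would come from a query such as `SELECT ... WHERE global_seq > $1 ORDER BY global_seq`.

```rust
/// Hypothetical read model: a running total plus the last applied global_seq.
#[derive(Debug, Default)]
struct BalanceProjector {
    checkpoint: i64, // last applied global_seq; 0 means nothing applied yet
    total: i64,
}

impl BalanceProjector {
    /// Applies every event whose global_seq is beyond the checkpoint.
    /// `log` must be ordered by global_seq. Returns how many events were applied.
    fn catch_up(&mut self, log: &[(i64, i64)]) -> usize {
        let mut applied = 0;
        for &(global_seq, amount) in log {
            if global_seq <= self.checkpoint {
                continue; // already applied; makes redelivery idempotent
            }
            self.total += amount;
            self.checkpoint = global_seq;
            applied += 1;
        }
        applied
    }
}
```

Calling `catch_up` once at startup and again on every notification gives the same result regardless of how many notifications were dropped in between. In a durable projector the checkpoint would be stored alongside the read model, ideally in the same transaction.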

Key Takeaways

  • Event store schema: (global_seq, stream_id, stream_seq, event_type, data)
  • Optimistic concurrency: expected_version check prevents lost updates without locks
  • Subscriptions: LISTEN/NOTIFY for low-volume; Kafka/Kinesis for high-throughput projection
  • EventStoreDB: purpose-built with optimized storage for sequential event log access

Tomorrow: Outbox pattern — reliable event publishing with database transactions.