event sourcing and cqrs in practice

Projections: Rebuilding State from Events and the Exactly-Once Problem

Chapter 7 of 17

Projections

Projection pipeline

The projection engine polls the event store for new events, processes them through projection handlers, and writes the results to read model databases. Each projection tracks its position in the global event stream. The lag between the projection’s position and the event store’s latest position measures how far behind the read side is from the write side.

A projection is an event consumer that builds a read model. It reads events from the event store in global position order, deserializes each event, and updates a denormalized table, cache, or search index. The read model serves queries. The projection keeps the read model synchronized with the event store.

This sounds straightforward until you consider three operational realities. The projection process can crash and restart. Events can be processed more than once during recovery. The read model can fall behind the event store during traffic spikes. Each of these realities requires a specific engineering solution.

The Projection Loop

The Problem

The event store is append-only. Events arrive continuously as commands are processed. The read model must be kept up to date. A batch job that runs every hour is not acceptable for a customer-facing order summary. A synchronous update within the command transaction couples the write and read paths. The projection loop is the asynchronous middle ground: a continuously running process that polls the event store for new events and applies them to the read model.

The Mechanism

The projection loop has four steps:

  1. Read the last processed global position from a tracking table.
  2. Query the event store for events after that position, in batches.
  3. For each event, update the read model.
  4. Update the tracking table with the last processed position.

Steps 3 and 4 must be atomic. If the read model is updated but the position is not, the event will be processed again on restart. If the position is updated but the read model is not, the event is skipped forever. Both outcomes are incorrect.

-- FROM SCRATCH
CREATE TABLE projection_positions (
    projection_name VARCHAR(255) PRIMARY KEY,
    last_position   BIGINT NOT NULL DEFAULT 0,
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
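The two failure modes can be reproduced without a database. The following is a minimal in-memory sketch, not part of the engine built in this chapter: the CrashWindowDemo class, its counter read model, and the simulated crash are all hypothetical. It performs steps 3 and 4 as two separate writes and kills the process between them while handling the second of three events.

```java
// SKETCH
/** In-memory stand-in for a read model and its position row (hypothetical, for illustration). */
final class CrashWindowDemo {
    long position = 0;    // last processed global position (step 4)
    long ordersSeen = 0;  // read model: a plain counter (step 3)

    /** Handles one event as two separate, non-atomic writes. */
    void handle(long eventPosition, boolean readModelFirst, boolean crashBetweenWrites) {
        if (readModelFirst) ordersSeen++; else position = eventPosition;
        if (crashBetweenWrites) throw new IllegalStateException("simulated crash");
        if (readModelFirst) position = eventPosition; else ordersSeen++;
    }

    /** Processes events 1..3, crashes once at event 2, then restarts from the stored position. */
    static CrashWindowDemo run(boolean readModelFirst) {
        CrashWindowDemo state = new CrashWindowDemo();
        boolean alreadyCrashed = false;
        for (int attempt = 0; attempt < 2; attempt++) {   // the second attempt is the restart
            try {
                for (long pos = state.position + 1; pos <= 3; pos++) {
                    boolean crashNow = !alreadyCrashed && pos == 2;
                    if (crashNow) alreadyCrashed = true;
                    state.handle(pos, readModelFirst, crashNow);
                }
            } catch (IllegalStateException simulatedCrash) {
                // The process died between the two writes; the next attempt resumes.
            }
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(run(true).ordersSeen);   // 4: event 2 was applied twice
        System.out.println(run(false).ordersSeen);  // 2: event 2 was skipped
    }
}
```

When both writes commit in one transaction, the crash rolls back the half-finished event and the restart applies it once.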

The From-Scratch Implementation

// FROM SCRATCH
public class ProjectionEngine {

    private final EventStore eventStore;
    private final DataSource dataSource;
    private final EventTypeRegistry registry;
    private final Map<String, EventProjection> projections;
    private final int batchSize;
    private final Duration pollInterval;
    private volatile boolean running;

    public ProjectionEngine(
            EventStore eventStore,
            DataSource dataSource,
            EventTypeRegistry registry,
            int batchSize,
            Duration pollInterval) {
        this.eventStore = eventStore;
        this.dataSource = dataSource;
        this.registry = registry;
        this.projections = new LinkedHashMap<>();
        this.batchSize = batchSize;
        this.pollInterval = pollInterval;
    }

    public void register(String name, EventProjection projection) {
        projections.put(name, projection);
    }

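    private final List<Thread> threads = new ArrayList<>();

    public synchronized void start() {
        running = true;
        for (var entry : projections.entrySet()) {
            threads.add(Thread.ofVirtual().name("projection-" + entry.getKey()).start(() ->
                runProjectionLoop(entry.getKey(), entry.getValue())
            ));
        }
    }

    // Signals the loops to exit and waits for them to finish, so a later
    // start() cannot run two loops for the same projection concurrently.
    public synchronized void stop() throws InterruptedException {
        running = false;
        for (Thread thread : threads) {
            thread.join();
        }
        threads.clear();
    }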

    private void runProjectionLoop(String name, EventProjection projection) {
        while (running) {
            try {
                long lastPosition = readPosition(name);
                List<StoredEvent> events = eventStore.readAllFromPosition(lastPosition, batchSize);

                if (events.isEmpty()) {
                    Thread.sleep(pollInterval.toMillis());
                    continue;
                }

                for (StoredEvent stored : events) {
                    processEvent(name, projection, stored);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // Log error and continue after a delay
                try { Thread.sleep(pollInterval.toMillis()); }
                catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; }
            }
        }
    }

    private void processEvent(String projectionName, EventProjection projection, StoredEvent stored) {
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            try {
                projection.process(stored, conn);
                updatePosition(projectionName, stored.globalPosition(), conn);
                conn.commit();
            } catch (Exception e) {
                conn.rollback();
                throw new ProjectionException(
                    "Failed to process event at position " + stored.globalPosition() +
                    " for projection " + projectionName, e
                );
            }
        } catch (SQLException e) {
            throw new ProjectionException("Connection failed", e);
        }
    }

    private long readPosition(String projectionName) {
        String sql = "SELECT last_position FROM projection_positions WHERE projection_name = ?";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, projectionName);
            try (ResultSet rs = stmt.executeQuery()) {
                if (rs.next()) return rs.getLong("last_position");
                return 0;
            }
        } catch (SQLException e) {
            throw new ProjectionException("Failed to read position for " + projectionName, e);
        }
    }

    private void updatePosition(String projectionName, long position, Connection conn)
            throws SQLException {
        String sql = """
            INSERT INTO projection_positions (projection_name, last_position, updated_at)
            VALUES (?, ?, NOW())
            ON CONFLICT (projection_name) DO UPDATE SET
                last_position = EXCLUDED.last_position,
                updated_at = NOW()
            """;
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, projectionName);
            stmt.setLong(2, position);
            stmt.executeUpdate();
        }
    }
}

public interface EventProjection {
    void process(StoredEvent event, Connection conn) throws Exception;
}

What the Implementation Reveals

The processEvent method wraps both the projection update and the position update in a single transaction. This is the key to correctness. If the projection update fails (constraint violation, data error), the exception skips the position update and the transaction rolls back. If the projection update succeeds but the position update or the commit fails (connection drop, disk full), the transaction rolls back as well. In both cases nothing is committed, the read model and the position stay consistent, and the event is reprocessed on the next loop iteration.

This transactional guarantee requires that the projection’s read model and the position tracking table are in the same database. If the read model is in Redis or Elasticsearch, this transactional approach does not work. Chapter 8 covers the alternatives for non-relational projection targets.

The polling loop uses Thread.sleep during idle periods. This is deliberate. Event polling from PostgreSQL is cheap (an indexed query that returns zero rows typically costs well under a millisecond of server time), and the polling interval can be tuned (100ms to 1s depending on latency requirements). The alternative, PostgreSQL LISTEN/NOTIFY, eliminates idle polling but adds complexity: a dedicated connection for notifications and a fallback for notifications missed while disconnected. For most systems, polling is sufficient and simpler.

Virtual threads (Java 21’s Thread.ofVirtual()) are used for the projection loops. Each projection runs in its own virtual thread. Virtual threads are lightweight enough that running dozens of projections concurrently has negligible overhead.

The Exactly-Once Problem

Events must be processed exactly once by each projection. But distributed systems do not provide exactly-once delivery as a primitive. What they provide is at-least-once delivery (every event is delivered at least once, possibly more) combined with idempotent processing (processing an event more than once produces the same result as processing it once).

The position tracking in the previous section provides at-least-once delivery: if a crash occurs before the position is updated, the event is redelivered. Idempotent processing ensures that this redelivery does not corrupt the read model.

Idempotent Projection Updates

// FROM SCRATCH
public class IdempotentOrderSummaryProjection implements EventProjection {

    private final EventTypeRegistry registry;

    public IdempotentOrderSummaryProjection(EventTypeRegistry registry) {
        this.registry = registry;
    }

    @Override
    public void process(StoredEvent stored, Connection conn) throws Exception {
        OrderEvent event = registry.deserialize(stored.eventType(), stored.payload());

        switch (event) {
            case OrderPlaced e -> upsertOrder(e, conn);
            case OrderConfirmed e -> updateOrderStatus(e.orderId(), "CONFIRMED", e.occurredAt(), conn);
            case PaymentAuthorized e -> updateOrderStatus(e.orderId(), "PAYMENT_AUTHORIZED", e.occurredAt(), conn);
            case OrderFulfilled e -> updateOrderStatus(e.orderId(), "FULFILLED", e.occurredAt(), conn);
            case OrderCancelled e -> updateOrderStatus(e.orderId(), "CANCELLED", e.occurredAt(), conn);
            case ShippingAddressChanged e -> updateShippingCity(e, conn);
            case RefundRequested e -> updateRefundTotal(e, conn);
        }
    }

    private void upsertOrder(OrderPlaced event, Connection conn) throws SQLException {
        String sql = """
            INSERT INTO order_summary (order_id, customer_id, status, total, item_count,
                                       shipping_city, placed_at, last_updated_at)
            VALUES (?, ?, 'PLACED', ?, ?, ?, ?, ?)
            ON CONFLICT (order_id) DO UPDATE SET
                status = 'PLACED',
                total = EXCLUDED.total,
                item_count = EXCLUDED.item_count,
                shipping_city = EXCLUDED.shipping_city,
                last_updated_at = EXCLUDED.last_updated_at
            """;
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, event.orderId());
            stmt.setString(2, event.customerId());
            stmt.setBigDecimal(3, event.total());
            stmt.setInt(4, event.lineItems().size());
            stmt.setString(5, event.shippingAddress().city());
            stmt.setTimestamp(6, Timestamp.from(event.occurredAt()));
            stmt.setTimestamp(7, Timestamp.from(event.occurredAt()));
            stmt.executeUpdate();
        }
    }

    private void updateOrderStatus(String orderId, String status,
                                   Instant occurredAt, Connection conn) throws SQLException {
        String sql = "UPDATE order_summary SET status = ?, last_updated_at = ? WHERE order_id = ?";
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, status);
            stmt.setTimestamp(2, Timestamp.from(occurredAt));
            stmt.setString(3, orderId);
            stmt.executeUpdate();
        }
    }

    private void updateShippingCity(ShippingAddressChanged event, Connection conn) throws SQLException {
        String sql = "UPDATE order_summary SET shipping_city = ?, last_updated_at = ? WHERE order_id = ?";
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, event.newAddress().city());
            stmt.setTimestamp(2, Timestamp.from(event.occurredAt()));
            stmt.setString(3, event.orderId());
            stmt.executeUpdate();
        }
    }

    private void updateRefundTotal(RefundRequested event, Connection conn) throws SQLException {
        String sql = """
            UPDATE order_summary
            SET refunded_amount = COALESCE(refunded_amount, 0) + ?,
                last_updated_at = ?
            WHERE order_id = ?
            """;
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setBigDecimal(1, event.amount());
            stmt.setTimestamp(2, Timestamp.from(event.occurredAt()));
            stmt.setString(3, event.orderId());
            stmt.executeUpdate();
        }
    }
}

The upsertOrder method uses ON CONFLICT DO UPDATE. If the event is processed twice, the second processing overwrites the read model with the same values. The result is identical regardless of how many times the event is processed.

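The updateRefundTotal method above is not idempotent, despite the class name: it adds the refund amount to a running total, so processing a RefundRequested event twice adds the amount twice. When the read model and the position commit in one transaction, as in processEvent, a crash rolls back both and the replay is harmless. The risk appears wherever that guarantee is missing, for example when the position is stored separately from the read model or is reset by hand. The event is then redelivered on top of an update that was already applied.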
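The difference between the two kinds of update shows up as soon as every event is delivered twice. The following is a minimal in-memory sketch; the RedeliveryDemo class and its two maps are hypothetical stand-ins for the order_summary table, not code from the projection above.

```java
// SKETCH
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

/** In-memory read model used only to contrast assignment with accumulation. */
final class RedeliveryDemo {
    final Map<String, String> statusByOrder = new HashMap<>();
    final Map<String, BigDecimal> refundedByOrder = new HashMap<>();

    /** Assignment: replaying the same event writes the same value again. */
    void onOrderConfirmed(String orderId) {
        statusByOrder.put(orderId, "CONFIRMED");
    }

    /** Accumulation: every replay adds the amount one more time. */
    void onRefundRequested(String orderId, BigDecimal amount) {
        refundedByOrder.merge(orderId, amount, BigDecimal::add);
    }

    public static void main(String[] args) {
        RedeliveryDemo readModel = new RedeliveryDemo();
        for (int delivery = 0; delivery < 2; delivery++) {   // deliver each event twice
            readModel.onOrderConfirmed("order-1");
            readModel.onRefundRequested("order-1", new BigDecimal("25.00"));
        }
        System.out.println(readModel.statusByOrder.get("order-1"));    // CONFIRMED
        System.out.println(readModel.refundedByOrder.get("order-1"));  // 50.00, not 25.00
    }
}
```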

// NAIVE - Not idempotent for additive operations
String sql = "UPDATE order_summary SET refunded_amount = refunded_amount + ? WHERE order_id = ?";

// PRODUCTION - Idempotent via processed event tracking
String checkSql = "SELECT 1 FROM processed_events WHERE projection_name = ? AND global_position = ?";
String insertSql = "INSERT INTO processed_events (projection_name, global_position) VALUES (?, ?)";

For projections where idempotency is not natural (counters, running totals, aggregations), a processed event tracking table provides the safety net. Before processing an event, check if it has already been processed. After processing, record the event position. Both operations are in the same transaction as the read model update.

-- PRODUCTION
CREATE TABLE processed_events (
    projection_name VARCHAR(255) NOT NULL,
    global_position BIGINT NOT NULL,
    processed_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (projection_name, global_position)
);

This table grows linearly with the number of processed events. For high-throughput systems, the table can be partitioned by time and old partitions dropped periodically, since the position tracking already prevents reprocessing events older than the current position.
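One way to apply the processed-event check to the refund handler is sketched below. It is an assumption-laden variant rather than the chapter's exact code: instead of a SELECT followed by an INSERT, it uses a single INSERT ... ON CONFLICT DO NOTHING and treats an update count of zero as "already processed". The DeduplicatedRefundHandler class and the applyRefundOnce method are hypothetical names. The caller is assumed to own the transaction, with auto-commit off, as processEvent does.

```java
// SKETCH
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class DeduplicatedRefundHandler {

    // Records the event for this projection; inserts nothing if the row already exists.
    private static final String CLAIM_SQL =
        "INSERT INTO processed_events (projection_name, global_position) VALUES (?, ?) "
        + "ON CONFLICT (projection_name, global_position) DO NOTHING";

    private static final String REFUND_SQL =
        "UPDATE order_summary SET refunded_amount = COALESCE(refunded_amount, 0) + ? "
        + "WHERE order_id = ?";

    /**
     * Adds the refund only if this (projection, position) pair has not been recorded yet.
     * Both statements run on the caller's connection, so they commit or roll back together.
     * Returns true if the refund was applied, false if the event was a duplicate.
     */
    static boolean applyRefundOnce(Connection conn, String projectionName, long globalPosition,
                                   String orderId, BigDecimal amount) throws SQLException {
        try (PreparedStatement claim = conn.prepareStatement(CLAIM_SQL)) {
            claim.setString(1, projectionName);
            claim.setLong(2, globalPosition);
            if (claim.executeUpdate() == 0) {
                return false;   // conflict: this position was already processed
            }
        }
        try (PreparedStatement refund = conn.prepareStatement(REFUND_SQL)) {
            refund.setBigDecimal(1, amount);
            refund.setString(2, orderId);
            refund.executeUpdate();
        }
        return true;
    }
}
```

Recording the position before the update, rather than after it, makes no difference to the outcome because neither statement is visible until the shared transaction commits.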

The Production Path

// PRODUCTION
@Component
public class SpringProjectionEngine {

    private final SpringJdbcEventStore eventStore;
    private final JdbcTemplate jdbc;
    private final TransactionTemplate transactions;
    private final List<NamedProjection> projections;

    @Value("${projection.batch-size:100}")
    private int batchSize;

    // Assumes a TransactionTemplate bean is available (Spring Boot auto-configures one).
    public SpringProjectionEngine(SpringJdbcEventStore eventStore,
                                  JdbcTemplate jdbc,
                                  TransactionTemplate transactions,
                                  List<NamedProjection> projections) {
        this.eventStore = eventStore;
        this.jdbc = jdbc;
        this.transactions = transactions;
        this.projections = projections;
    }

    @Scheduled(fixedDelayString = "${projection.poll-interval-ms:200}")
    public void poll() {
        for (NamedProjection projection : projections) {
            // One transaction per projection batch. @Transactional on processProjection
            // would be ignored here: a call on `this` bypasses the Spring proxy.
            transactions.executeWithoutResult(status -> processProjection(projection));
        }
    }

    private void processProjection(NamedProjection projection) {
        // MAX() always yields exactly one row, so a projection without a position
        // row starts at 0 instead of throwing EmptyResultDataAccessException.
        long lastPosition = jdbc.queryForObject(
            "SELECT COALESCE(MAX(last_position), 0) FROM projection_positions WHERE projection_name = ?",
            Long.class,
            projection.name()
        );

        List<StoredEvent> events = eventStore.readAllFromPosition(lastPosition, batchSize);
        if (events.isEmpty()) return;

        for (StoredEvent event : events) {
            projection.process(event);
        }

        long newPosition = events.get(events.size() - 1).globalPosition();
        jdbc.update(
            """
            INSERT INTO projection_positions (projection_name, last_position, updated_at)
            VALUES (?, ?, NOW())
            ON CONFLICT (projection_name) DO UPDATE SET last_position = ?, updated_at = NOW()
            """,
            projection.name(), newPosition, newPosition
        );
    }
}

The Spring @Scheduled annotation replaces the manual polling loop. The TransactionTemplate wraps each projection’s batch in a single transaction. An @Transactional annotation on processProjection would not take effect here, because poll() calls the method on the same instance and the call never passes through Spring’s transactional proxy. If any event processing fails, the entire batch rolls back and is retried on the next poll.

The Test

// FROM SCRATCH
@Testcontainers
class ProjectionEngineTest {

    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16")
        .withDatabaseName("projection_test")
        .withInitScript("projection_schema.sql");

    private EventStore eventStore;
    private ProjectionEngine engine;
    private DataSource dataSource;

    @BeforeEach
    void setUp() {
        var ds = new org.postgresql.ds.PGSimpleDataSource();
        ds.setUrl(postgres.getJdbcUrl());
        ds.setUser(postgres.getUsername());
        ds.setPassword(postgres.getPassword());
        this.dataSource = ds;

        var mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

        eventStore = new EventStore(ds, mapper);
        var registry = new EventTypeRegistry(mapper);

        engine = new ProjectionEngine(eventStore, ds, registry, 50, Duration.ofMillis(100));
        engine.register("order-summary", new IdempotentOrderSummaryProjection(registry));
    }

    @Test
    void projectionCatchesUpWithEventStore() throws Exception {
        // Write events
        for (int i = 0; i < 10; i++) {
            eventStore.append("order-" + i, -1, List.of(
                new OrderPlaced(
                    String.valueOf(i), "customer-1",
                    List.of(new LineItem("p1", "Widget", 1, BigDecimal.TEN)),
                    BigDecimal.TEN,
                    new Address("1", "City", "ST", "00000", "US"),
                    Instant.now()
                )
            ));
        }

        // Start projection engine
        engine.start();

        // Wait for catch-up
        Thread.sleep(2000);

        // Verify read model
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT COUNT(*) FROM order_summary WHERE customer_id = 'customer-1'"
             )) {
            try (ResultSet rs = stmt.executeQuery()) {
                rs.next();
                assertEquals(10, rs.getInt(1));
            }
        }

        engine.stop();
    }

    @Test
    void projectionResumesFromLastPosition() throws Exception {
        // Write 5 events
        for (int i = 0; i < 5; i++) {
            eventStore.append("order-a" + i, -1, List.of(
                new OrderPlaced(
                    "a" + i, "customer-2", List.of(), BigDecimal.ZERO,
                    new Address("1", "C", "S", "0", "US"), Instant.now()
                )
            ));
        }

        // Process first batch
        engine.start();
        Thread.sleep(1000);
        engine.stop();

        // Write 5 more events
        for (int i = 5; i < 10; i++) {
            eventStore.append("order-a" + i, -1, List.of(
                new OrderPlaced(
                    "a" + i, "customer-2", List.of(), BigDecimal.ZERO,
                    new Address("1", "C", "S", "0", "US"), Instant.now()
                )
            ));
        }

        // Restart and catch up
        engine.start();
        Thread.sleep(1000);
        engine.stop();

        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT COUNT(*) FROM order_summary WHERE customer_id = 'customer-2'"
             )) {
            try (ResultSet rs = stmt.executeQuery()) {
                rs.next();
                assertEquals(10, rs.getInt(1));
            }
        }
    }
}
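The fixed Thread.sleep calls in these tests either waste time or, on a slow machine, end before the projection has caught up. A common alternative is to poll for the expected state with a timeout. The helper below is a minimal sketch of that idea; the Await class and its until method are hypothetical and not part of the test suite above.

```java
// SKETCH
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class Await {

    /**
     * Re-checks the condition every pollInterval until it holds or the timeout elapses.
     * Returns true if the condition became true in time, false otherwise.
     */
    static boolean until(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;   // timed out while the condition was still false
            }
            Thread.sleep(pollInterval.toMillis());
        }
        return true;
    }
}
```

In a test, the sleep and the assertion would collapse into a single call such as assertTrue(Await.until(() -> countOrders("customer-1") == 10, Duration.ofSeconds(5), Duration.ofMillis(50))), where countOrders is an assumed helper that runs the COUNT(*) query.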

Projection Lag

Projection lag is the difference between the latest event in the event store and the last event processed by a projection. It can be expressed in events or in time. In normal operation, the lag is a handful of events and roughly one polling interval. During traffic spikes, complex event processing, or projection bugs, the lag grows.

Measuring projection lag is straightforward:

-- Lag in events
SELECT
    p.projection_name,
    (SELECT MAX(global_position) FROM event_store) - p.last_position AS events_behind,
    p.updated_at AS last_updated
FROM projection_positions p;

Alert thresholds depend on the projection’s purpose. A customer-facing order summary with 1000 events of lag means a customer might not see their latest order for seconds. An analytics projection with 1000 events of lag is normal and acceptable. Set thresholds per projection based on the consumer’s tolerance for staleness.
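Per-projection thresholds can be checked with a few lines of code once the lag query has been run. The sketch below is illustrative only: the LagAlerts class, the threshold values, and the default limit are assumptions, and the subtraction equals a true event count only if global positions have no gaps.

```java
// SKETCH
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LagAlerts {

    /** Returns the names of projections whose lag exceeds their own limit. */
    static List<String> overThreshold(long latestPosition,
                                      Map<String, Long> lastPositionByProjection,
                                      Map<String, Long> maxLagByProjection,
                                      long defaultMaxLag) {
        List<String> alerts = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastPositionByProjection.entrySet()) {
            long lag = latestPosition - entry.getValue();
            long limit = maxLagByProjection.getOrDefault(entry.getKey(), defaultMaxLag);
            if (lag > limit) {
                alerts.add(entry.getKey());
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        Map<String, Long> positions = new LinkedHashMap<>();
        positions.put("order-summary", 9_000L);   // 1000 events behind
        positions.put("analytics", 9_000L);       // 1000 events behind

        Map<String, Long> limits = Map.of(
            "order-summary", 100L,    // customer-facing: tight limit
            "analytics", 5_000L);     // reporting: tolerant of staleness

        System.out.println(overThreshold(10_000L, positions, limits, 1_000L)); // [order-summary]
    }
}
```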

When a projection falls behind significantly (thousands of events), the catch-up process becomes a concern. The projection processes events as fast as it can, but each event requires a database write. A projection catching up on 100,000 events at 1ms per event takes 100 seconds, and longer if new events keep arriving in the meantime. Until it catches up, the read model continues to serve stale data.
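The catch-up arithmetic can be written down as a small function. This is a back-of-the-envelope sketch under a simplifying assumption of constant rates; the CatchUpEstimate class and its parameter names are hypothetical. It treats the backlog as draining at the processing rate minus the rate at which new events arrive.

```java
// SKETCH
final class CatchUpEstimate {

    /**
     * Estimated seconds until the backlog is drained, assuming constant rates.
     * Returns positive infinity when events arrive at least as fast as they are processed.
     */
    static double secondsToCatchUp(long backlogEvents,
                                   double processedPerSecond,
                                   double arrivingPerSecond) {
        double drainRate = processedPerSecond - arrivingPerSecond;
        if (drainRate <= 0) {
            return Double.POSITIVE_INFINITY;   // the projection never catches up
        }
        return backlogEvents / drainRate;
    }

    public static void main(String[] args) {
        // 1ms per event is 1000 events per second.
        System.out.println(secondsToCatchUp(100_000, 1_000, 0));    // 100.0
        System.out.println(secondsToCatchUp(100_000, 1_000, 500));  // 200.0
    }
}
```

The second call shows why catch-up during a traffic spike takes longer than the idle estimate suggests: half of the processing capacity goes to events that arrive while the backlog is being drained.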

Chapter 9 covers the more extreme case: rebuilding a projection from the beginning of the event stream.

This chapter established the projection engine, idempotent processing, and position tracking. The next chapter extends projections to multiple storage targets.