Event Sourcing and CQRS in Practice

Debugging Event-Sourced Systems: Replaying, Inspecting, and Reasoning About State

Chapter 14 of 17


Time-travel debugging

Time-travel debugging replays an event stream and captures the aggregate state after each event. Given a timestamp from a support request, the stateAtTime method reconstructs the exact state at that moment. The stream inspector, timeline builder, and drift detector are the three tools that make event sourcing’s debugging advantage concrete.

A customer calls support. Their order shows as “shipped” on the website, but the warehouse never dispatched it. In a CRUD system, you look at the orders table. The status column says “shipped.” You do not know how it got there. Someone updated it. Or a background job. Or a race condition. The audit trail, if it exists, is a separate log table that may or may not be consistent with the actual data.

In an event-sourced system, the answer is in the event stream. Every state transition is recorded. The exact sequence of events that produced the current state is available. You can replay the stream, stop at any point, and inspect the aggregate state. You can find the exact event that set the status to “shipped” and determine who or what produced that event.

This is the debugging advantage that event sourcing proponents talk about. But it only works if you build the tools to exploit it.

Time-Travel Debugging

The Problem

An aggregate is in an unexpected state. You need to know how it got there. Specifically, you need to see the aggregate’s state after each event was applied, not just the final state.

The Mechanism

Replay the event stream and capture the aggregate state after each event. This produces a timeline of state transitions.

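// FROM SCRATCH
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Holds the aggregate state after every event, in stream order (initial state first).
public class AggregateTimeline<S> {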

    private final List<StateSnapshot<S>> snapshots;

    public AggregateTimeline(List<StateSnapshot<S>> snapshots) {
        this.snapshots = List.copyOf(snapshots);
    }

    public S stateAtVersion(long version) {
        return snapshots.stream()
            .filter(s -> s.version() == version)
            .findFirst()
            .map(StateSnapshot::state)
            .orElseThrow(() -> new IllegalArgumentException(
                "No state at version " + version));
    }

    public S stateAtTime(Instant time) {
        return snapshots.stream()
            .filter(s -> !s.occurredAt().isAfter(time))
            .reduce((a, b) -> b) // Last one before or at the time
            .map(StateSnapshot::state)
            .orElseThrow(() -> new IllegalArgumentException(
                "No state at time " + time));
    }

    public List<StateTransition<S>> transitions() {
        List<StateTransition<S>> result = new ArrayList<>();
        for (int i = 1; i < snapshots.size(); i++) {
            result.add(new StateTransition<>(
                snapshots.get(i - 1).state(),
                snapshots.get(i).state(),
                snapshots.get(i).event(),
                snapshots.get(i).version()
            ));
        }
        return result;
    }

    public record StateSnapshot<S>(S state, Object event, long version, Instant occurredAt) {}
    public record StateTransition<S>(S before, S after, Object event, long version) {}
}

public class TimelineBuilder {

    private final EventStore eventStore;
    private final UpcastingEventTypeRegistry registry;

    public TimelineBuilder(EventStore eventStore, UpcastingEventTypeRegistry registry) {
        this.eventStore = eventStore;
        this.registry = registry;
    }

    public AggregateTimeline<OrderState> buildOrderTimeline(String orderId) {
        String streamId = "order-" + orderId;
        List<StoredEvent> events = eventStore.readStream(streamId, 0);

        OrderState state = OrderState.initial();
        List<AggregateTimeline.StateSnapshot<OrderState>> snapshots = new ArrayList<>();

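        // Initial state (version 0, before any event). Instant.EPOCH keeps it the
        // answer for stateAtTime queries that fall before the first event.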
        snapshots.add(new AggregateTimeline.StateSnapshot<>(
            state, null, 0, Instant.EPOCH));

        for (StoredEvent stored : events) {
            OrderEvent event = (OrderEvent) registry.deserialize(
                stored.eventType(), stored.payload());

            state = state.apply(event);

            snapshots.add(new AggregateTimeline.StateSnapshot<>(
                state, event, stored.sequenceNumber(), stored.occurredAt()));
        }

        return new AggregateTimeline<>(snapshots);
    }
}

What the Implementation Reveals

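The stateAtTime method is the key debugging tool. Given a timestamp from a customer complaint, you can see exactly what the aggregate state was at that moment, which replaces speculation about race conditions or timing issues with evidence. If the customer says “I checked my order at 3:15 PM and it showed shipped,” convert 3:15 PM in the customer’s time zone to an Instant, reconstruct the state at that moment, and check their claim. If the aggregate was not yet shipped at 3:15 PM, the website was most likely showing a projection that disagreed with the event stream, which is the projection drift problem covered later in this chapter.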
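stateAtTime expects an Instant, but a customer reports wall-clock time in their own zone. The sketch below handles both concerns. It uses a hypothetical TimeIndexedStates helper, which is not part of the chapter's code. The helper converts the reported time to an Instant. It also replaces the linear scan with TreeMap.floorEntry, which returns the latest state recorded at or before the requested instant.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper: an Instant-indexed alternative to AggregateTimeline.stateAtTime.
final class TimeIndexedStates<S> {
    // Maps each event's occurredAt to the aggregate state after that event was applied.
    private final NavigableMap<Instant, S> byTime = new TreeMap<>();

    // Call in stream order: a later put() for the same timestamp overwrites the earlier one,
    // so the last event at that instant wins, as with reduce((a, b) -> b) in stateAtTime.
    void record(Instant occurredAt, S stateAfter) {
        byTime.put(occurredAt, stateAfter);
    }

    S stateAt(Instant time) {
        Map.Entry<Instant, S> entry = byTime.floorEntry(time); // greatest key <= time
        if (entry == null) {
            throw new IllegalArgumentException("No state at time " + time);
        }
        return entry.getValue();
    }

    // Converts the wall-clock time a customer reports into the Instant the event store uses.
    static Instant fromCustomerClock(LocalDate date, LocalTime time, ZoneId zone) {
        return date.atTime(time).atZone(zone).toInstant();
    }
}
```

For example, 10:15 in America/New_York on 1 January 2026 is 15:15 UTC. A wrong zone shifts the query by hours, which can land it on the wrong side of a state transition.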

The transitions method reveals every state change. If the order jumped from “pending” to “shipped” without passing through “confirmed” and “payment authorized,” the transition list shows the exact event that caused the invalid transition. This pinpoints bugs in command handlers or event handlers that allow invalid state transitions.
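Spotting an invalid jump by eye works for one order. Across many timelines it helps to check transitions against an explicit list of allowed moves. The following is a minimal sketch. The Status values, the Transition record, and the allowed-move table are all hypothetical stand-ins for the chapter's OrderStatus and StateTransition, and a real system would derive the table from its own order lifecycle.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical status set; the real OrderStatus may have different values.
enum Status { PENDING, CONFIRMED, PAYMENT_AUTHORIZED, SHIPPED, CANCELLED }

// Hypothetical minimal transition: status before, status after, event name, stream version.
record Transition(Status before, Status after, String event, long version) {}

final class TransitionAuditor {
    private static final Map<Status, Set<Status>> ALLOWED = new EnumMap<>(Status.class);

    static {
        // Each status may stay put (events that do not change status) or move forward.
        ALLOWED.put(Status.PENDING, EnumSet.of(Status.PENDING, Status.CONFIRMED, Status.CANCELLED));
        ALLOWED.put(Status.CONFIRMED,
            EnumSet.of(Status.CONFIRMED, Status.PAYMENT_AUTHORIZED, Status.CANCELLED));
        ALLOWED.put(Status.PAYMENT_AUTHORIZED,
            EnumSet.of(Status.PAYMENT_AUTHORIZED, Status.SHIPPED, Status.CANCELLED));
        ALLOWED.put(Status.SHIPPED, EnumSet.of(Status.SHIPPED));
        ALLOWED.put(Status.CANCELLED, EnumSet.of(Status.CANCELLED));
    }

    // Describes every transition whose target status is not allowed from its source status.
    static List<String> invalidTransitions(List<Transition> transitions) {
        List<String> problems = new ArrayList<>();
        for (Transition t : transitions) {
            Set<Status> allowedNext = ALLOWED.getOrDefault(t.before(), EnumSet.noneOf(Status.class));
            if (!allowedNext.contains(t.after())) {
                problems.add(String.format("v%d %s: %s -> %s",
                    t.version(), t.event(), t.before(), t.after()));
            }
        }
        return problems;
    }
}
```

Fed the transitions of the order from the support call, this would report a line such as "v2 FulfilmentDispatched: PENDING -> SHIPPED". That line names the event to investigate.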

Event Stream Inspection

The Mechanism

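SQL queries against the event store are the primary inspection tool. Each query below answers a specific debugging question. They are written for PostgreSQL: the @> JSONB containment operator, ::jsonb casts, and INTERVAL literals are PostgreSQL syntax.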

-- FROM SCRATCH
-- What happened to order X?
-- Stream IDs are 'order-' + orderId, so order 'order-123' lives in stream 'order-order-123'.
SELECT
    global_position,
    event_type,
    sequence_number,
    occurred_at,
    payload
FROM event_store
WHERE stream_id = 'order-order-123'
ORDER BY sequence_number;

-- Who else was affected by the same batch?
-- (Find all orders dispatched within 5 minutes of a problematic order's dispatch.
-- The CTE pins a single reference time, so the query still works if the stream
-- contains more than one FulfilmentDispatched event.)
WITH reference AS (
    SELECT occurred_at
    FROM event_store
    WHERE stream_id = 'order-order-123'
      AND event_type = 'FulfilmentDispatched'
    ORDER BY sequence_number
    LIMIT 1
)
SELECT DISTINCT e.stream_id
FROM event_store e
CROSS JOIN reference r
WHERE e.event_type = 'FulfilmentDispatched'
  AND e.occurred_at BETWEEN r.occurred_at - INTERVAL '5 minutes'
                        AND r.occurred_at + INTERVAL '5 minutes';

-- Find events with specific payload content
SELECT stream_id, event_type, occurred_at, payload
FROM event_store
WHERE payload @> '{"customerId": "customer-456"}'::jsonb
ORDER BY occurred_at DESC
LIMIT 50;

-- Event frequency analysis (detect unusual patterns)
SELECT
    event_type,
    DATE_TRUNC('hour', occurred_at) AS hour,
    COUNT(*) AS count
FROM event_store
WHERE occurred_at > NOW() - INTERVAL '24 hours'
GROUP BY event_type, DATE_TRUNC('hour', occurred_at)
ORDER BY hour DESC, count DESC;
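The frequency query returns raw counts and leaves it to the reader to decide what is unusual. One simple rule is to flag any hour whose count exceeds a multiple of that event type's median hourly count. The sketch below implements that rule. Both the rule and the FrequencySpikes name are assumptions, not a recommended threshold.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical spike detector over the frequency query's rows: event_type -> (hour -> count).
final class FrequencySpikes {
    // Flags every hour whose count exceeds `factor` times the median hourly count for that type.
    static List<String> spikes(Map<String, Map<String, Long>> countsByTypeAndHour, double factor) {
        List<String> flagged = new ArrayList<>();
        for (Map.Entry<String, Map<String, Long>> type : countsByTypeAndHour.entrySet()) {
            double median = median(new ArrayList<>(type.getValue().values()));
            for (Map.Entry<String, Long> hour : type.getValue().entrySet()) {
                if (hour.getValue() > factor * median) {
                    flagged.add(type.getKey() + " @ " + hour.getKey() + ": " + hour.getValue());
                }
            }
        }
        return flagged;
    }

    private static double median(List<Long> values) {
        Collections.sort(values);
        int n = values.size();
        if (n == 0) {
            return 0.0;
        }
        return n % 2 == 1
            ? values.get(n / 2)
            : (values.get(n / 2 - 1) + values.get(n / 2)) / 2.0;
    }
}
```

Hours with no events do not appear in the GROUP BY output, so this rule never sees them. An event type that normally fires every hour and then goes silent needs a separate check for drops.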

The Stream Inspector Tool

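// FROM SCRATCH
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

public class StreamInspector {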

    private final DataSource dataSource;
    private final ObjectMapper mapper;

    public StreamInspector(DataSource dataSource) {
        this.dataSource = dataSource;
        this.mapper = new ObjectMapper();
        mapper.enable(SerializationFeature.INDENT_OUTPUT);
    }

    public String inspectStream(String streamId) {
        StringBuilder report = new StringBuilder();
        report.append("Stream: ").append(streamId).append("\n");
        report.append("=".repeat(60)).append("\n\n");

        String sql = """
            SELECT global_position, event_type, sequence_number,
                   occurred_at, payload, metadata
            FROM event_store
            WHERE stream_id = ?
            ORDER BY sequence_number
            """;

        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, streamId);
            try (ResultSet rs = stmt.executeQuery()) {
                int count = 0;
                while (rs.next()) {
                    count++;
                    report.append(String.format("[%d] %s (global: %d)\n",
                        rs.getLong("sequence_number"),
                        rs.getString("event_type"),
                        rs.getLong("global_position")));
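                    report.append("    Time: ").append(rs.getTimestamp("occurred_at")).append("\n");
                    // Metadata carries correlationId, causationId and userId for tracing
                    report.append("    Metadata: ").append(rs.getString("metadata")).append("\n");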

                    String payload = rs.getString("payload");
                    try {
                        String pretty = mapper.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(mapper.readTree(payload));
                        report.append("    Payload: ").append(pretty).append("\n");
                    } catch (JsonProcessingException e) {
                        report.append("    Payload: ").append(payload).append("\n");
                    }
                    report.append("\n");
                }
                report.insert(report.indexOf("\n\n") + 2,
                    "Total events: " + count + "\n\n");
            }
        } catch (SQLException e) {
            report.append("ERROR: ").append(e.getMessage());
        }

        return report.toString();
    }
}

Projection Drift Detection

The Problem

A projection shows data that does not match the event stream. The order summary says the total is $100, but replaying the events produces a total of $120. The projection has drifted from the source of truth.

Drift has several common causes: a bug in the projection handler was fixed but the projection was never rebuilt; an event was processed out of order; the projection missed events because of a crash; or the projection logic changed without a rebuild.

The Mechanism

Compare the projection state to a freshly computed state from the event stream.

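// FROM SCRATCH
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import javax.sql.DataSource;

public class ProjectionDriftDetector {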

    private final EventStore eventStore;
    private final UpcastingEventTypeRegistry registry;
    private final DataSource dataSource;

    public ProjectionDriftDetector(EventStore eventStore,
                                    UpcastingEventTypeRegistry registry,
                                    DataSource dataSource) {
        this.eventStore = eventStore;
        this.registry = registry;
        this.dataSource = dataSource;
    }

    public DriftReport detectDrift(String orderId) {
        // 1. Get the projection state
        OrderSummary projected = loadProjectedSummary(orderId);
        if (projected == null) {
            return new DriftReport(orderId, true, "Order not found in projection");
        }

        // 2. Rebuild state from events
        String streamId = "order-" + orderId;
        List<StoredEvent> events = eventStore.readStream(streamId, 0);
        OrderState rebuilt = OrderState.initial();
        for (StoredEvent stored : events) {
            OrderEvent event = (OrderEvent) registry.deserialize(
                stored.eventType(), stored.payload());
            rebuilt = rebuilt.apply(event);
        }

        // 3. Compare
        List<String> differences = new ArrayList<>();
        if (!Objects.equals(projected.status(), rebuilt.status().name())) {
            differences.add(String.format("status: projected=%s, events=%s",
                projected.status(), rebuilt.status()));
        }
        if (projected.total().compareTo(rebuilt.total()) != 0) {
            differences.add(String.format("total: projected=%s, events=%s",
                projected.total(), rebuilt.total()));
        }
        if (projected.itemCount() != rebuilt.lineItems().size()) {
            differences.add(String.format("itemCount: projected=%d, events=%d",
                projected.itemCount(), rebuilt.lineItems().size()));
        }

        if (differences.isEmpty()) {
            return new DriftReport(orderId, false, "No drift detected");
        }
        return new DriftReport(orderId, true,
            "Drift detected: " + String.join("; ", differences));
    }

    private OrderSummary loadProjectedSummary(String orderId) {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT order_id, status, total, item_count FROM order_summary WHERE order_id = ?")) {
            stmt.setString(1, orderId);
            try (ResultSet rs = stmt.executeQuery()) {
                if (rs.next()) {
                    return new OrderSummary(
                        rs.getString("order_id"),
                        rs.getString("status"),
                        rs.getBigDecimal("total"),
                        rs.getInt("item_count")
                    );
                }
            }
        } catch (SQLException e) {
            throw new DriftDetectionException("Failed to load projection for " + orderId, e);
        }
        return null;
    }

    public record DriftReport(String orderId, boolean hasDrift, String details) {}
    private record OrderSummary(String orderId, String status, BigDecimal total, int itemCount) {}
}

Batch Drift Detection

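For systematic drift checking, scan a random sample of orders. The method below returns only the orders that drifted, so the drift rate is the number of reports divided by the sample size. An order updated moments before the check can look drifted simply because the projection has not caught up yet. Re-check flagged orders before treating them as real drift.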

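// FROM SCRATCH
// A method on ProjectionDriftDetector. ORDER BY RANDOM() is PostgreSQL syntax and reads
// every row of order_summary, so run it off-peak against large projections.
public List<DriftReport> batchDriftCheck(int sampleSize) {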
    List<String> orderIds = new ArrayList<>();
    try (Connection conn = dataSource.getConnection();
         PreparedStatement stmt = conn.prepareStatement(
             "SELECT order_id FROM order_summary ORDER BY RANDOM() LIMIT ?")) {
        stmt.setInt(1, sampleSize);
        try (ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                orderIds.add(rs.getString("order_id"));
            }
        }
    } catch (SQLException e) {
        throw new DriftDetectionException("Failed to sample orders", e);
    }

    return orderIds.stream()
        .map(this::detectDrift)
        .filter(DriftReport::hasDrift)
        .toList();
}
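A drift rate computed from a small sample is noisy. Zero drifted orders out of 100 is still consistent with a true rate of a few percent. The following is a minimal sketch built around a hypothetical DriftRate record. It attaches a 95% Wilson score interval, a standard interval for a binomial proportion, to the observed rate. The interval is a technique swapped in here; the chapter itself does not prescribe one.

```java
// Hypothetical summary of a batch drift check: observed rate plus a 95% Wilson score interval.
record DriftRate(int drifted, int sampled, double rate, double lower, double upper) {

    static DriftRate of(int drifted, int sampled) {
        if (sampled <= 0 || drifted < 0 || drifted > sampled) {
            throw new IllegalArgumentException("need sampled > 0 and 0 <= drifted <= sampled");
        }
        double z = 1.96; // two-sided 95% standard normal quantile
        double n = sampled;
        double p = drifted / n;
        double denom = 1 + z * z / n;
        double center = (p + z * z / (2 * n)) / denom;
        double half = z * Math.sqrt(p * (1 - p) / n + z * z / (4 * n * n)) / denom;
        // Clamp to [0, 1] to absorb floating-point error at the edges
        return new DriftRate(drifted, sampled, p,
            Math.max(0.0, center - half), Math.min(1.0, center + half));
    }
}
```

With batchDriftCheck(100) returning an empty list, DriftRate.of(0, 100) gives an upper bound of roughly 3.7%. A clean sample of that size does not rule out drift at that level.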

Structured Logging for Event Processing

The Mechanism

Every event carries metadata that enables tracing through the system. The correlationId groups all events that originated from the same user action. The causationId links each event to the event or command that caused it.

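// FROM SCRATCH
import java.time.Instant;

public record EventMetadata(
    String correlationId,  // shared by every event that stems from the same user action
    String causationId,    // ID of the event or command that directly caused this event
    String userId,
    String source,
    Instant processedAt
) {
    // The originating command is both the root of the correlation chain and the direct cause
    public static EventMetadata fromCommand(String commandId, String userId) {
        return new EventMetadata(commandId, commandId, userId, "command", null);
    }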

    public EventMetadata withCausation(String newCausationId) {
        return new EventMetadata(correlationId, newCausationId, userId, source, processedAt);
    }

    public EventMetadata withProcessedAt(Instant time) {
        return new EventMetadata(correlationId, causationId, userId, source, time);
    }
}
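// FROM SCRATCH
// In the event store, metadata is stored as JSONB alongside the event payload.
// Assumes `mapper` has Jackson's JavaTimeModule registered so the Instant field serializes,
// and a unique constraint on (stream_id, sequence_number) enforces optimistic concurrency.
public void appendWithMetadata(String streamId, long expectedVersion,
                                String eventType, String payload,
                                EventMetadata metadata) {
    String sql = """
        INSERT INTO event_store (stream_id, sequence_number, event_type, payload, metadata, occurred_at)
        VALUES (?, ?, ?, ?::jsonb, ?::jsonb, NOW())
        """;

    try (Connection conn = dataSource.getConnection();
         PreparedStatement stmt = conn.prepareStatement(sql)) {
        stmt.setString(1, streamId);
        // expectedVersion is the stream's current version (0 for a new stream),
        // so the new event takes the next sequence number
        stmt.setLong(2, expectedVersion + 1);
        stmt.setString(3, eventType);
        stmt.setString(4, payload);
        stmt.setString(5, mapper.writeValueAsString(metadata));
        stmt.executeUpdate();
    } catch (JsonProcessingException e) {
        throw new EventStoreException("Failed to serialize event metadata", e);
    } catch (SQLException e) {
        // A unique-constraint violation here means another writer appended first
        throw new EventStoreException("Failed to append event", e);
    }
}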
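The sketch below shows how the two IDs behave along a chain. It repeats the EventMetadata record, trimmed to a factory method, so that it compiles on its own. It also adds a hypothetical MetadataPropagation helper that a saga or process manager would call when reacting to an event. It assumes each stored event has its own ID, such as evt-1, that later messages cite as their causationId. The event_store columns shown in this chapter do not include such an ID, so treat that as an assumption.

```java
import java.time.Instant;

// Copy of the chapter's EventMetadata record so this sketch compiles on its own.
record EventMetadata(String correlationId, String causationId, String userId,
                     String source, Instant processedAt) {
    static EventMetadata fromCommand(String commandId, String userId) {
        return new EventMetadata(commandId, commandId, userId, "command", null);
    }
}

// Hypothetical helper for messages produced in reaction to an earlier event.
final class MetadataPropagation {
    // The correlation ID and user are inherited unchanged; the triggering event becomes the cause.
    static EventMetadata reactingTo(String triggeringEventId, EventMetadata triggering, String source) {
        return new EventMetadata(triggering.correlationId(), triggeringEventId,
                                 triggering.userId(), source, null);
    }
}
```

However long the chain grows, every message carries the correlation ID of the original command. Each causationId points one step back, which is what lets the chain be reassembled later.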

Tracing a Correlation Chain

-- FROM SCRATCH
-- Find all events triggered by a single user action
SELECT
    stream_id,
    event_type,
    sequence_number,
    occurred_at,
    metadata->>'causationId' AS caused_by,
    metadata->>'userId' AS user_id
FROM event_store
WHERE metadata->>'correlationId' = 'cmd-abc-123'
ORDER BY occurred_at;

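This query returns the full chain as a flat list: the original command produced an OrderPlaced event, which triggered the saga, which dispatched AuthorizePayment, which produced PaymentAuthorized, and so on. The correlation ID ties the entire chain together. The caused_by column shows which step led to which; commands appear only as causation IDs, because the event store holds events, not commands.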
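Ordering by occurred_at flattens the chain. The sketch below turns the rows back into a tree using the causation links. It assumes a hypothetical TraceRow shape in which each event has its own eventId that later events cite as causationId, and it assumes the saga records the triggering event's ID as the cause. If causationId holds a command ID instead, that command is not in the result set, so its events become separate roots.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical row shape; assumes each event has an ID that other events cite as causationId.
record TraceRow(String eventId, String causationId, String eventType) {}

final class CausationTree {
    // Renders rows (already ordered by occurred_at) as an indented tree of causes and effects.
    static String render(List<TraceRow> rows) {
        Set<String> knownIds = new HashSet<>();
        for (TraceRow row : rows) {
            knownIds.add(row.eventId());
        }
        Map<String, List<TraceRow>> children = new LinkedHashMap<>();
        List<TraceRow> roots = new ArrayList<>();
        for (TraceRow row : rows) {
            // A cause that is not an event in this result (e.g. the original command) makes a root
            if (row.causationId() == null || !knownIds.contains(row.causationId())) {
                roots.add(row);
            } else {
                children.computeIfAbsent(row.causationId(), id -> new ArrayList<>()).add(row);
            }
        }
        StringBuilder out = new StringBuilder();
        for (TraceRow root : roots) {
            append(root, 0, children, out);
        }
        return out.toString();
    }

    private static void append(TraceRow row, int depth,
                               Map<String, List<TraceRow>> children, StringBuilder out) {
        out.append("  ".repeat(depth)).append(row.eventType()).append('\n');
        for (TraceRow child : children.getOrDefault(row.eventId(), List.of())) {
            append(child, depth + 1, children, out);
        }
    }
}
```

Branches show where one event fanned out into several reactions. That fan-out is hard to see in the flat, time-ordered query result.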

Common Failure Patterns

Duplicate Events

Symptom: An aggregate has the same event twice, causing double state transitions. The order total is doubled.

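Diagnosis: First decide where the duplicate lives. If the event store itself contains the event twice, query for duplicate sequence numbers in a stream; this is possible only if the table lacks a unique constraint on stream_id and sequence_number. Also look for the same payload appended twice at different sequence numbers, which points to a retried command. If the store is clean but a projection double-counted, the duplicate happened in delivery. In that case, check the outbox relay for re-sends and the Kafka consumer's offset commits, and make projection handlers idempotent.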

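-- Find streams with duplicate sequence numbers
-- (always empty if a unique constraint on (stream_id, sequence_number) exists)
SELECT stream_id, sequence_number, COUNT(*) AS count
FROM event_store
GROUP BY stream_id, sequence_number
HAVING COUNT(*) > 1;

-- Find the same event appended twice at different sequence numbers (e.g. a retried command).
-- Identical payloads can be legitimate, so treat the results as candidates to inspect.
SELECT stream_id, event_type, payload, COUNT(*) AS count
FROM event_store
GROUP BY stream_id, event_type, payload
HAVING COUNT(*) > 1;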
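When the store is clean but a projection double-counts, the projection handler is not idempotent. The following is a minimal sketch built around a hypothetical in-memory projection. It remembers the last sequence number applied per stream and ignores anything at or below it. This assumes events within one stream arrive in sequence order, for example with Kafka partitioned by stream ID. In a database-backed projection, the checkpoint and the row update must commit in the same transaction, or a crash between them reintroduces the problem.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory projection of order totals that tolerates redelivered events.
final class IdempotentOrderTotals {
    private final Map<String, Long> lastAppliedSequence = new HashMap<>();
    private final Map<String, BigDecimal> totals = new HashMap<>();

    // Returns true if the event was applied, false if it was a redelivery and was skipped.
    boolean onItemAdded(String streamId, long sequenceNumber, BigDecimal amount) {
        long last = lastAppliedSequence.getOrDefault(streamId, 0L); // sequence numbers start at 1
        if (sequenceNumber <= last) {
            return false; // already applied: a re-send from the relay or a consumer restart
        }
        totals.merge(streamId, amount, BigDecimal::add);
        lastAppliedSequence.put(streamId, sequenceNumber);
        return true;
    }

    BigDecimal total(String streamId) {
        return totals.getOrDefault(streamId, BigDecimal.ZERO);
    }
}
```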

Missing Events

Symptom: A projection is missing data for specific orders. The aggregate loads correctly (events are in the store) but the projection never processed them.

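Diagnosis: Compare the projection’s last processed position with the event store’s latest global position, then check for gaps. A growing difference means the projection is stalled or falling behind. A projection that is caught up yet still missing events has usually skipped them. When global positions come from a database sequence, one transaction can take a lower position but commit after a transaction with a higher one. A reader that only asks for positions above its last checkpoint never sees the late event.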

-- Compare projection position with event store
SELECT
    p.last_processed_position AS projection_position,
    (SELECT MAX(global_position) FROM event_store) AS store_position,
    -- Count rows rather than subtracting positions: sequence values can be skipped
    (SELECT COUNT(*) FROM event_store e
     WHERE e.global_position > p.last_processed_position) AS unprocessed_events
FROM projection_positions p
WHERE p.projection_name = 'OrderSummary';
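To check for gaps, run a gap finder over the positions in a suspect range. One way to get them is SELECT global_position FROM event_store WHERE global_position BETWEEN ? AND ? ORDER BY global_position. The following is a minimal sketch; GapFinder and PositionGap are hypothetical names. A gap is not proof of a lost event, because a rolled-back insert also consumes a sequence value. The useful test is to look again later. A position that was missing and now exists is an event that committed late, and if it sits below the projection's checkpoint, the projection skipped it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical description of a run of global positions that were not present.
record PositionGap(long fromInclusive, long toInclusive) {}

final class GapFinder {
    // Given global positions in ascending order, returns every missing range between them.
    static List<PositionGap> gaps(long[] sortedPositions) {
        List<PositionGap> result = new ArrayList<>();
        for (int i = 1; i < sortedPositions.length; i++) {
            long previous = sortedPositions[i - 1];
            long current = sortedPositions[i];
            if (current > previous + 1) {
                result.add(new PositionGap(previous + 1, current - 1));
            }
        }
        return result;
    }
}
```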

Out-of-Order Events

Symptom: An event that logically depends on a prior event was processed first. The projection applied PaymentAuthorized before OrderPlaced, resulting in an incomplete order summary.

Diagnosis: This should not happen within a single stream (streams are ordered). It happens across streams when projections process events from multiple streams. The fix is to make projection handlers tolerate missing predecessors: upsert the row instead of assuming it exists, and treat fields that an earlier event has not set yet as null.
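The following is a minimal in-memory sketch of that approach. It uses hypothetical OrderSummaryRow and OrderSummaryProjection types with fields invented for illustration. Each handler creates the row if it is missing and sets only the fields its event owns. Status is derived from those fields rather than stored, so the row ends up the same whichever event arrives first.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-model row: each field is owned by exactly one event type.
final class OrderSummaryRow {
    String customerId;          // set by OrderPlaced
    BigDecimal total;           // set by OrderPlaced
    boolean paymentAuthorized;  // set by PaymentAuthorized

    // Derived rather than stored, so arrival order does not change the final answer.
    String status() {
        if (customerId == null) {
            return "INCOMPLETE"; // OrderPlaced has not been processed yet
        }
        return paymentAuthorized ? "PAYMENT_AUTHORIZED" : "PENDING";
    }
}

final class OrderSummaryProjection {
    private final Map<String, OrderSummaryRow> rows = new HashMap<>();

    // Upsert: create the row if missing, then touch only the fields this event owns.
    void onOrderPlaced(String orderId, String customerId, BigDecimal total) {
        OrderSummaryRow row = rows.computeIfAbsent(orderId, id -> new OrderSummaryRow());
        row.customerId = customerId;
        row.total = total;
    }

    void onPaymentAuthorized(String orderId) {
        rows.computeIfAbsent(orderId, id -> new OrderSummaryRow()).paymentAuthorized = true;
    }

    OrderSummaryRow get(String orderId) {
        return rows.get(orderId);
    }
}
```

In PostgreSQL the same idea is INSERT ... ON CONFLICT (order_id) DO UPDATE, with each handler's SET clause listing only its own columns.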

The Production Path

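// PRODUCTION
import java.util.List;
import java.util.Map;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/debug")
public class DebugController {

    private final StreamInspector inspector;
    private final TimelineBuilder timelineBuilder;
    private final ProjectionDriftDetector driftDetector;

    public DebugController(StreamInspector inspector,
                           TimelineBuilder timelineBuilder,
                           ProjectionDriftDetector driftDetector) {
        this.inspector = inspector;
        this.timelineBuilder = timelineBuilder;
        this.driftDetector = driftDetector;
    }
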
    @GetMapping("/stream/{streamId}")
    public ResponseEntity<String> inspectStream(@PathVariable String streamId) {
        return ResponseEntity.ok(inspector.inspectStream(streamId));
    }

    @GetMapping("/timeline/{orderId}")
    public ResponseEntity<List<Map<String, Object>>> getTimeline(@PathVariable String orderId) {
        var timeline = timelineBuilder.buildOrderTimeline(orderId);
        var result = timeline.transitions().stream()
            .map(t -> Map.<String, Object>of(
                "version", t.version(),
                "event", t.event().getClass().getSimpleName(),
                "stateBefore", t.before().status().name(),
                "stateAfter", t.after().status().name()
            ))
            .toList();
        return ResponseEntity.ok(result);
    }

    @GetMapping("/drift/{orderId}")
    public ResponseEntity<ProjectionDriftDetector.DriftReport> checkDrift(
            @PathVariable String orderId) {
        return ResponseEntity.ok(driftDetector.detectDrift(orderId));
    }
}

These endpoints should be secured and available only to operations staff. They expose internal state for debugging purposes, including raw event payloads that can contain customer data such as customer IDs.

The Test

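// FROM SCRATCH
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
import org.junit.jupiter.api.Test;

class TimelineBuilderTest {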

    @Test
    void reconstructsStateAtAnyVersion() {
        var timeline = new AggregateTimeline<>(List.of(
            new AggregateTimeline.StateSnapshot<>(
                new OrderState("o1", OrderStatus.PENDING, List.of(), BigDecimal.ZERO),
                null, 0, Instant.parse("2026-01-01T00:00:00Z")),
            new AggregateTimeline.StateSnapshot<>(
                new OrderState("o1", OrderStatus.CONFIRMED, List.of(), BigDecimal.ZERO),
                "OrderConfirmed", 1, Instant.parse("2026-01-01T01:00:00Z")),
            new AggregateTimeline.StateSnapshot<>(
                new OrderState("o1", OrderStatus.SHIPPED, List.of(), BigDecimal.ZERO),
                "FulfilmentDispatched", 2, Instant.parse("2026-01-01T02:00:00Z"))
        ));

        assertEquals(OrderStatus.PENDING, timeline.stateAtVersion(0).status());
        assertEquals(OrderStatus.CONFIRMED, timeline.stateAtVersion(1).status());
        assertEquals(OrderStatus.SHIPPED, timeline.stateAtVersion(2).status());
    }

    @Test
    void reconstructsStateAtAnyTime() {
        var timeline = new AggregateTimeline<>(List.of(
            new AggregateTimeline.StateSnapshot<>(
                new OrderState("o1", OrderStatus.PENDING, List.of(), BigDecimal.ZERO),
                null, 0, Instant.parse("2026-01-01T00:00:00Z")),
            new AggregateTimeline.StateSnapshot<>(
                new OrderState("o1", OrderStatus.CONFIRMED, List.of(), BigDecimal.ZERO),
                "OrderConfirmed", 1, Instant.parse("2026-01-01T01:00:00Z")),
            new AggregateTimeline.StateSnapshot<>(
                new OrderState("o1", OrderStatus.SHIPPED, List.of(), BigDecimal.ZERO),
                "FulfilmentDispatched", 2, Instant.parse("2026-01-01T02:00:00Z"))
        ));

        // At 00:30 UTC, the order was still PENDING
        assertEquals(OrderStatus.PENDING,
            timeline.stateAtTime(Instant.parse("2026-01-01T00:30:00Z")).status());

        // At 01:30, the order was CONFIRMED
        assertEquals(OrderStatus.CONFIRMED,
            timeline.stateAtTime(Instant.parse("2026-01-01T01:30:00Z")).status());
    }

    @Test
    void producesTransitionList() {
        var timeline = new AggregateTimeline<>(List.of(
            new AggregateTimeline.StateSnapshot<>(
                new OrderState("o1", OrderStatus.PENDING, List.of(), BigDecimal.ZERO),
                null, 0, Instant.parse("2026-01-01T00:00:00Z")),
            new AggregateTimeline.StateSnapshot<>(
                new OrderState("o1", OrderStatus.CONFIRMED, List.of(), BigDecimal.ZERO),
                "OrderConfirmed", 1, Instant.parse("2026-01-01T01:00:00Z"))
        ));

        var transitions = timeline.transitions();
        assertEquals(1, transitions.size());
        assertEquals(OrderStatus.PENDING, transitions.get(0).before().status());
        assertEquals(OrderStatus.CONFIRMED, transitions.get(0).after().status());
    }
}

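// These tests only exercise the DriftReport record. The comparison inside detectDrift
// needs an event store and the order_summary table, so cover it with an integration test.
class DriftDetectorTest {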

    @Test
    void detectsNoDriftWhenStatesMatch() {
        var report = new ProjectionDriftDetector.DriftReport("order-1", false, "No drift detected");
        assertFalse(report.hasDrift());
    }

    @Test
    void detectsDriftWhenStatusDiffers() {
        var report = new ProjectionDriftDetector.DriftReport(
            "order-1", true, "Drift detected: status: projected=SHIPPED, events=CONFIRMED");
        assertTrue(report.hasDrift());
        assertTrue(report.details().contains("status"));
    }
}
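The DriftDetectorTest cases construct a DriftReport and then check its own fields, so they pass whether or not the comparison logic works. One way to test the comparison without a database is to pull step 3 of detectDrift into a pure function over the compared fields. The sketch below shows that refactoring. It is hypothetical, not the chapter's code, and SummaryFields and DriftComparison are invented names.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical value type holding the fields the drift detector compares.
record SummaryFields(String status, BigDecimal total, int itemCount) {}

final class DriftComparison {
    // Pure version of detectDrift's comparison step: no database, no event store.
    static List<String> differences(SummaryFields projected, SummaryFields rebuilt) {
        List<String> differences = new ArrayList<>();
        if (!Objects.equals(projected.status(), rebuilt.status())) {
            differences.add(String.format("status: projected=%s, events=%s",
                projected.status(), rebuilt.status()));
        }
        // compareTo, not equals: 100.0 and 100.00 are the same amount but unequal BigDecimals
        if (projected.total().compareTo(rebuilt.total()) != 0) {
            differences.add(String.format("total: projected=%s, events=%s",
                projected.total(), rebuilt.total()));
        }
        if (projected.itemCount() != rebuilt.itemCount()) {
            differences.add(String.format("itemCount: projected=%d, events=%d",
                projected.itemCount(), rebuilt.itemCount()));
        }
        return differences;
    }
}
```

detectDrift would then map the projected row and the rebuilt OrderState into two SummaryFields values, using rebuilt.status().name() and rebuilt.lineItems().size(). It would wrap the returned list in a DriftReport, leaving only the loading code to cover with an integration test.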

The debugging tools in this chapter are not optional. Build them before you need them. When a production incident happens at 2 AM, you want the stream inspector and drift detector ready. You do not want to write SQL queries from memory while the support team waits.

The next chapter covers observability: the metrics and dashboards that prevent incidents from reaching the debugging stage.