Projection Rebuilding: Replaying History Without Taking the System Down
Blue-green projection rebuilding uses two physical tables behind a database view. While the active (blue) table serves reads, the inactive (green) table is cleared, replayed from position zero, caught up to the latest events, and then swapped in with a CREATE OR REPLACE VIEW statement. The swap is atomic in PostgreSQL, so readers never see an empty or half-built table; at most, queries wait briefly while the swap holds its lock on the view.
A projection bug is discovered. The order_summary projection has been computing item_count as the number of distinct products instead of the total quantity across all line items. An order with 3 units of product A and 2 units of product B shows item_count = 2 instead of item_count = 5. The projection logic is fixed, but the rows already written are wrong, so the projection must be rebuilt from the beginning of the event stream.
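The two calculations differ only in how line items are aggregated. A minimal sketch, assuming a hypothetical, simplified OrderLine record (the chapter's LineItem carries more fields) and a hypothetical ItemCount helper; it shows the two formulas, not the projection code itself.

```java
import java.util.List;

// Hypothetical, simplified line item: only the fields the calculation needs.
record OrderLine(String productId, int quantity) {}

final class ItemCount {
    private ItemCount() {}

    // The buggy projection logic: counts distinct products.
    static int distinctProducts(List<OrderLine> lines) {
        return (int) lines.stream().map(OrderLine::productId).distinct().count();
    }

    // The fixed projection logic: sums the quantity of every line item.
    static int totalQuantity(List<OrderLine> lines) {
        return lines.stream().mapToInt(OrderLine::quantity).sum();
    }
}
```

For the order with 3 units of A and 2 units of B, distinctProducts returns 2 (the value currently stored) and totalQuantity returns 5 (the value a rebuild should produce).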
This scenario is inevitable. Projection logic changes as business requirements evolve. Bugs are discovered after months of production operation. New projections are added that need historical data. The ability to rebuild a projection from scratch is one of the core benefits of event sourcing. The events are immutable. The projection is disposable.
The challenge is rebuilding without downtime. The production system continues to receive orders while the projection replays millions of historical events. Queries must continue to return results (even if slightly stale) during the rebuild. The swap from the old projection to the new one must be atomic.
The Naive Approach
The simplest rebuild strategy: truncate the projection table, reset the position, and let the projection engine replay all events.
-- NAIVE
TRUNCATE TABLE order_summary;
DELETE FROM projection_positions WHERE projection_name = 'order-summary';
This works. With no stored position, the projection engine starts from position 0, reads from the beginning of the event store, and rebuilds the table. The problem is that the order_summary table is empty at first and only partially filled until the replay finishes. Queries return missing or no results, the customer-facing API shows zero orders, and the admin panel is blank.
For a small event store (under 100,000 events), the rebuild completes in seconds and the downtime is negligible. For a production event store with millions of events, the rebuild takes minutes to hours. The downtime is unacceptable.
Blue-Green Projection Tables
The Problem
The production system must continue serving queries from the existing projection while the new projection is being rebuilt. The swap from old to new must be instantaneous and atomic. If the swap takes seconds, some queries hit the old table and some hit the new table, producing inconsistent results.
The Mechanism
The blue-green strategy uses two physical tables and a view. Queries read from the view, which points to whichever table is currently active. During a rebuild, the new projection is written to the inactive table. When the rebuild catches up to the current event store position, the view is switched to the new table. The switch is a single DDL statement that completes in milliseconds once it has acquired its lock on the view.
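As an in-memory analogy only, the mechanism can be modelled with two maps standing in for the physical tables and an AtomicReference standing in for the view. The class and method names are hypothetical, and a reference assignment replaces PostgreSQL's view swap; the point is that readers always dereference the "view", the rebuild fills the other map, and one atomic step makes the new data visible.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// In-memory analogy of blue-green projection tables (hypothetical names).
// Each map stands in for a physical table; the AtomicReference stands in for
// the order_summary view that every reader goes through.
final class BlueGreenProjection {
    private final Map<String, Integer> blue = new ConcurrentHashMap<>();
    private final Map<String, Integer> green = new ConcurrentHashMap<>();
    private final AtomicReference<Map<String, Integer>> view = new AtomicReference<>(blue);

    // Queries read through the view, never from a physical table directly.
    Integer itemCount(String orderId) {
        return view.get().get(orderId);
    }

    // Normal projection updates go to whichever table is active.
    void applyLive(String orderId, int itemCount) {
        view.get().put(orderId, itemCount);
    }

    // Rebuild: clear the inactive table, refill it from a replay, then swap.
    // Readers keep seeing the old table until the single set() call.
    void rebuild(Map<String, Integer> replayed) {
        Map<String, Integer> inactive = (view.get() == blue) ? green : blue;
        inactive.clear();
        inactive.putAll(replayed);
        view.set(inactive);
    }

    String activeName() {
        return view.get() == blue ? "blue" : "green";
    }
}
```

Each rebuild targets whichever map is not active, so successive rebuilds alternate between blue and green, exactly as the two tables do.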
-- FROM SCRATCH
-- Two physical tables
CREATE TABLE order_summary_blue (
order_id VARCHAR(255) PRIMARY KEY,
customer_id VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL,
total DECIMAL(10, 2) NOT NULL,
item_count INT NOT NULL,
shipping_city VARCHAR(255),
placed_at TIMESTAMPTZ,
last_updated_at TIMESTAMPTZ NOT NULL
);
CREATE TABLE order_summary_green (
order_id VARCHAR(255) PRIMARY KEY,
customer_id VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL,
total DECIMAL(10, 2) NOT NULL,
item_count INT NOT NULL,
shipping_city VARCHAR(255),
placed_at TIMESTAMPTZ,
last_updated_at TIMESTAMPTZ NOT NULL
);
-- The view that queries read from
CREATE VIEW order_summary AS SELECT * FROM order_summary_blue;
-- Track which table is active
CREATE TABLE projection_config (
projection_name VARCHAR(255) PRIMARY KEY,
active_table VARCHAR(255) NOT NULL DEFAULT 'order_summary_blue',
rebuild_status VARCHAR(50) DEFAULT 'idle'
);
INSERT INTO projection_config (projection_name, active_table) VALUES ('order-summary', 'order_summary_blue');
The From-Scratch Implementation
// FROM SCRATCH
public class BlueGreenProjectionRebuilder {
private final EventStore eventStore;
private final DataSource dataSource;
private final EventTypeRegistry registry;
private final int batchSize;
public BlueGreenProjectionRebuilder(EventStore eventStore, DataSource dataSource,
EventTypeRegistry registry, int batchSize) {
this.eventStore = eventStore;
this.dataSource = dataSource;
this.registry = registry;
this.batchSize = batchSize;
}
public void rebuild(String projectionName) {
String activeTable = getActiveTable(projectionName);
String inactiveTable = activeTable.equals("order_summary_blue")
? "order_summary_green"
: "order_summary_blue";
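// Phase 1: Mark the rebuild as running, then clear the inactive table
updateRebuildStatus(projectionName, "rebuilding");
truncateTable(inactiveTable);
// Phase 2: Replay all events into inactive table
// (assumes readAllFromPosition returns events with global_position > position)
long position = 0;
while (true) {
List<StoredEvent> events = eventStore.readAllFromPosition(position, batchSize);
if (events.isEmpty()) break;
for (StoredEvent event : events) {
applyToTable(event, inactiveTable);
}
position = events.get(events.size() - 1).globalPosition();
}
// Phase 3: Catch up on events that arrived during replay
long latestPosition = getLatestEventPosition();
while (position < latestPosition) {
List<StoredEvent> events = eventStore.readAllFromPosition(position, batchSize);
if (events.isEmpty()) break;
for (StoredEvent event : events) {
applyToTable(event, inactiveTable);
}
position = events.get(events.size() - 1).globalPosition();
latestPosition = getLatestEventPosition();
}
// Phase 4: Swap the view and record the new state in a single transaction
swapView(projectionName, inactiveTable, position);
}
// PostgreSQL DDL is transactional: the view, active_table, rebuild_status and the
// projection position either all change or none do, so a crash cannot leave them disagreeing
private void swapView(String projectionName, String newTable, long position) {
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
try (Statement ddl = conn.createStatement();
PreparedStatement config = conn.prepareStatement(
"UPDATE projection_config SET active_table = ?, rebuild_status = 'idle' WHERE projection_name = ?");
PreparedStatement pos = conn.prepareStatement("""
INSERT INTO projection_positions (projection_name, last_position)
VALUES (?, ?)
ON CONFLICT (projection_name) DO UPDATE SET last_position = EXCLUDED.last_position, updated_at = NOW()
""")) {
ddl.execute("CREATE OR REPLACE VIEW order_summary AS SELECT * FROM " + newTable);
config.setString(1, newTable);
config.setString(2, projectionName);
config.executeUpdate();
pos.setString(1, projectionName);
pos.setLong(2, position);
pos.executeUpdate();
conn.commit();
} catch (SQLException e) {
conn.rollback();
throw e;
}
} catch (SQLException e) {
throw new ProjectionException("Failed to swap view", e);
}
}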
private void truncateTable(String table) {
try (Connection conn = dataSource.getConnection();
Statement stmt = conn.createStatement()) {
stmt.execute("TRUNCATE TABLE " + table);
} catch (SQLException e) {
throw new ProjectionException("Failed to truncate " + table, e);
}
}
private void applyToTable(StoredEvent stored, String tableName) {
OrderEvent event = registry.deserialize(stored.eventType(), stored.payload());
try (Connection conn = dataSource.getConnection()) {
switch (event) {
case OrderPlaced e -> {
String sql = String.format("""
INSERT INTO %s (order_id, customer_id, status, total, item_count, shipping_city, placed_at, last_updated_at)
VALUES (?, ?, 'PLACED', ?, ?, ?, ?, ?)
ON CONFLICT (order_id) DO UPDATE SET
status = 'PLACED', total = EXCLUDED.total, item_count = EXCLUDED.item_count,
shipping_city = EXCLUDED.shipping_city, last_updated_at = EXCLUDED.last_updated_at
""", tableName);
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, e.orderId());
stmt.setString(2, e.customerId());
stmt.setBigDecimal(3, e.total());
int totalQuantity = e.lineItems().stream()
.mapToInt(LineItem::quantity).sum();
stmt.setInt(4, totalQuantity);
stmt.setString(5, e.shippingAddress().city());
stmt.setTimestamp(6, Timestamp.from(e.occurredAt()));
stmt.setTimestamp(7, Timestamp.from(e.occurredAt()));
stmt.executeUpdate();
}
}
case OrderConfirmed e -> updateStatus(conn, tableName, e.orderId(), "CONFIRMED", e.occurredAt());
case PaymentAuthorized e -> updateStatus(conn, tableName, e.orderId(), "PAYMENT_AUTHORIZED", e.occurredAt());
case OrderFulfilled e -> updateStatus(conn, tableName, e.orderId(), "FULFILLED", e.occurredAt());
case OrderCancelled e -> updateStatus(conn, tableName, e.orderId(), "CANCELLED", e.occurredAt());
case ShippingAddressChanged e -> {
String sql = String.format(
"UPDATE %s SET shipping_city = ?, last_updated_at = ? WHERE order_id = ?",
tableName
);
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, e.newAddress().city());
stmt.setTimestamp(2, Timestamp.from(e.occurredAt()));
stmt.setString(3, e.orderId());
stmt.executeUpdate();
}
}
case RefundRequested e -> {} // Handle as needed
}
} catch (SQLException e) {
throw new ProjectionException("Failed to apply event to " + tableName, e);
}
}
// Reuses the caller's connection instead of opening a second one while the first is still held
// (with a small connection pool, the nested acquisition can exhaust the pool)
private void updateStatus(Connection conn, String table, String orderId, String status, Instant occurredAt)
throws SQLException {
String sql = String.format(
"UPDATE %s SET status = ?, last_updated_at = ? WHERE order_id = ?", table
);
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, status);
stmt.setTimestamp(2, Timestamp.from(occurredAt));
stmt.setString(3, orderId);
stmt.executeUpdate();
}
}
private String getActiveTable(String projectionName) {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT active_table FROM projection_config WHERE projection_name = ?")) {
stmt.setString(1, projectionName);
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) return rs.getString("active_table");
return "order_summary_blue";
}
} catch (SQLException e) {
throw new ProjectionException("Failed to read active table", e);
}
}
private void updateActiveTable(String projectionName, String table) {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"UPDATE projection_config SET active_table = ? WHERE projection_name = ?")) {
stmt.setString(1, table);
stmt.setString(2, projectionName);
stmt.executeUpdate();
} catch (SQLException e) {
throw new ProjectionException("Failed to update active table", e);
}
}
private void updateRebuildStatus(String projectionName, String status) {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"UPDATE projection_config SET rebuild_status = ? WHERE projection_name = ?")) {
stmt.setString(1, status);
stmt.setString(2, projectionName);
stmt.executeUpdate();
} catch (SQLException e) {
throw new ProjectionException("Failed to update rebuild status", e);
}
}
private void updateProjectionPosition(String projectionName, long position) {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"""
INSERT INTO projection_positions (projection_name, last_position)
VALUES (?, ?)
ON CONFLICT (projection_name) DO UPDATE SET last_position = ?, updated_at = NOW()
""")) {
stmt.setString(1, projectionName);
stmt.setLong(2, position);
stmt.setLong(3, position);
stmt.executeUpdate();
} catch (SQLException e) {
throw new ProjectionException("Failed to update position", e);
}
}
private long getLatestEventPosition() {
try (Connection conn = dataSource.getConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT COALESCE(MAX(global_position), 0) FROM event_store")) {
rs.next();
return rs.getLong(1);
} catch (SQLException e) {
throw new ProjectionException("Failed to read latest position", e);
}
}
}
What the Implementation Reveals
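Phase 3 (catch-up) is critical. New events arrive while the historical replay runs, and the rebuild must process them before swapping; otherwise the new table is missing recent events and the swap introduces stale data. Even after the catch-up, events appended between the final read and the swap are not yet in the new table. Recording the rebuild's position as part of Phase 4 is what lets the regular projection process resume from that point and apply them.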
The catch-up loop runs until the rebuild's position matches the latest event position. In a high-throughput system, this is a moving target: new events arrive while the catch-up processes old ones. The loop converges because each pass only has to handle the events that arrived during the previous pass, so the backlog shrinks with every pass as long as the replay rate exceeds the production write rate. If the write rate exceeds the replay rate, the rebuild never catches up. This is rare but possible during extreme traffic. The remedy is to run the rebuild during a low-traffic period or to raise replay throughput, for example with larger read batches and by writing each batch in a single transaction instead of opening a connection per event.
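The convergence argument can be made concrete with a deliberately simplified model that assumes constant rates: a replay rate R and a production write rate W, both in events per second. Draining a backlog of B events takes B / R seconds, during which W * B / R new events arrive, so every pass multiplies the backlog by W / R. The CatchUpModel class and its parameters are hypothetical; real rates fluctuate.

```java
// Simplified model (hypothetical): constant replay rate R and production write
// rate W, in events per second. Draining a backlog of B events takes B / R
// seconds, during which W * B / R new events arrive, so each catch-up pass
// multiplies the backlog by W / R.
final class CatchUpModel {
    private CatchUpModel() {}

    // Number of passes until the backlog is empty, or -1 if it is still not
    // empty after maxPasses (always the case when W >= R and backlog > 0).
    static int passesToDrain(long backlog, long replayRate, long writeRate, int maxPasses) {
        int passes = 0;
        while (backlog > 0) {
            if (passes == maxPasses) {
                return -1;
            }
            // Integer division rounds the arrivals down to whole events.
            backlog = backlog * writeRate / replayRate;
            passes++;
        }
        return passes;
    }
}
```

With a backlog of 1,000,000 events, R = 5,000 and W = 500, the backlog shrinks tenfold per pass and drains in seven passes. With W equal to R it never shrinks, which is the "never catches up" case described above.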
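The CREATE OR REPLACE VIEW statement is atomic in PostgreSQL. It takes an exclusive lock on the view, so it waits for queries already reading through the view to finish (they complete against the old table), and queries that arrive during the swap wait for it to commit and then read the new table. No query ever sees a mix of the two tables. The cost is a brief pause: a long-running query on the view delays the swap and every query queued behind it, so setting a lock_timeout on the swap connection is a sensible guard.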
The string interpolation for table names is safe because the table names are internal constants (order_summary_blue, order_summary_green), not user input. In a more general implementation, the table names would be validated against an allowed list.
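One way to make that validation explicit, sketched with a hypothetical ProjectionTables helper: JDBC placeholders bind values, not identifiers, so table names cannot be passed as PreparedStatement parameters, and an allowlist check before string concatenation is the usual defence.

```java
import java.util.Set;

// Hypothetical helper: table names cannot be bound as PreparedStatement parameters
// (placeholders bind values, not identifiers), so they are checked against a fixed
// allowlist before being concatenated into SQL.
final class ProjectionTables {
    private static final Set<String> ALLOWED = Set.of("order_summary_blue", "order_summary_green");

    private ProjectionTables() {}

    static String requireAllowed(String table) {
        // Set.of(...).contains(null) throws, so null is rejected explicitly first.
        if (table == null || !ALLOWED.contains(table)) {
            throw new IllegalArgumentException("Not a projection table: " + table);
        }
        return table;
    }

    // The rebuild target is whichever allowed table is not currently active.
    static String inactiveFor(String activeTable) {
        return requireAllowed(activeTable).equals("order_summary_blue")
                ? "order_summary_green"
                : "order_summary_blue";
    }
}
```

A call such as "TRUNCATE TABLE " + ProjectionTables.requireAllowed(table) then fails fast on any unexpected name, including one read back from a corrupted projection_config row.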
The Production Path
// PRODUCTION
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
@Service
public class ProjectionRebuildService {
private final SpringJdbcEventStore eventStore;
private final JdbcTemplate jdbc;
private final TransactionTemplate tx;
private final EventTypeRegistry registry;
@Value("${projection.rebuild.batch-size:500}")
private int batchSize;
public ProjectionRebuildService(SpringJdbcEventStore eventStore,
JdbcTemplate jdbc,
PlatformTransactionManager txManager,
EventTypeRegistry registry) {
this.eventStore = eventStore;
this.jdbc = jdbc;
this.tx = new TransactionTemplate(txManager);
this.registry = registry;
}
// Runs on Spring's async executor; requires @EnableAsync and a call through the Spring proxy
@Async
public CompletableFuture<Void> triggerRebuild(String projectionName) {
// Claim the rebuild with a conditional UPDATE so two triggers cannot truncate the same table
int claimed = jdbc.update(
"UPDATE projection_config SET rebuild_status = 'rebuilding' " +
"WHERE projection_name = ? AND rebuild_status <> 'rebuilding'",
projectionName);
if (claimed == 0) {
return CompletableFuture.failedFuture(
new IllegalStateException("Rebuild already running or unknown projection: " + projectionName));
}
try {
String activeTable = jdbc.queryForObject(
"SELECT active_table FROM projection_config WHERE projection_name = ?",
String.class, projectionName);
String targetTable = "order_summary_blue".equals(activeTable)
? "order_summary_green" : "order_summary_blue";
jdbc.execute("TRUNCATE TABLE " + targetTable);
long replayed = replayFrom(0L, targetTable);        // bulk replay
long position = replayFrom(replayed, targetTable);  // catch-up pass
// View, active table, status and position commit together (PostgreSQL DDL is transactional)
tx.executeWithoutResult(status -> {
jdbc.execute("CREATE OR REPLACE VIEW order_summary AS SELECT * FROM " + targetTable);
jdbc.update("UPDATE projection_config SET active_table = ?, rebuild_status = 'idle' WHERE projection_name = ?",
targetTable, projectionName);
jdbc.update(
"INSERT INTO projection_positions (projection_name, last_position) VALUES (?, ?) " +
"ON CONFLICT (projection_name) DO UPDATE SET last_position = EXCLUDED.last_position, updated_at = NOW()",
projectionName, position);
});
return CompletableFuture.completedFuture(null);
} catch (RuntimeException e) {
// Leave a visible marker for the status endpoint instead of a stuck 'rebuilding'
jdbc.update("UPDATE projection_config SET rebuild_status = 'failed' WHERE projection_name = ?",
projectionName);
throw e;
}
}
// Reads until a batch comes back empty.
// Assumes readAllFromPosition returns events with global_position > position.
private long replayFrom(long position, String targetTable) {
while (true) {
List<StoredEvent> events = eventStore.readAllFromPosition(position, batchSize);
if (events.isEmpty()) return position;
for (StoredEvent event : events) {
applyEvent(event, targetTable);
}
position = events.get(events.size() - 1).globalPosition();
}
}
private void applyEvent(StoredEvent stored, String table) {
// Projection logic here, writing to the specified table
}
}
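The @Async annotation runs the rebuild on a separate thread, provided async support is enabled with @EnableAsync and the method is invoked through the Spring proxy (a call from inside the same class bypasses it). The API endpoint that triggers the rebuild returns immediately. The rebuild status can be polled via a status endpoint that reads from the projection_config table.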
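The trigger-and-poll pattern can be sketched without Spring. In this minimal in-memory version, CompletableFuture.runAsync on the common pool stands in for Spring's @Async executor and an AtomicReference stands in for the rebuild_status column. The class name, the FAILED state, and the rejection of a second concurrent trigger are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// In-memory sketch (hypothetical names): runAsync on the common ForkJoinPool stands in
// for Spring's @Async executor, and the AtomicReference stands in for the
// rebuild_status column. FAILED is an assumed extra state.
final class AsyncRebuildTrigger {
    enum Status { IDLE, REBUILDING, FAILED }

    private final AtomicReference<Status> state = new AtomicReference<>(Status.IDLE);

    // Returns immediately. A second trigger while a rebuild is running is rejected.
    CompletableFuture<Void> trigger(Runnable rebuild) {
        boolean claimed = state.compareAndSet(Status.IDLE, Status.REBUILDING)
                || state.compareAndSet(Status.FAILED, Status.REBUILDING);
        if (!claimed) {
            return CompletableFuture.failedFuture(new IllegalStateException("rebuild already running"));
        }
        return CompletableFuture.runAsync(() -> {
            try {
                rebuild.run();
                state.set(Status.IDLE);
            } catch (RuntimeException e) {
                state.set(Status.FAILED);
                throw e;
            }
        });
    }

    // What a polled status endpoint would report.
    Status status() {
        return state.get();
    }
}
```

The compare-and-set plays the same role as a conditional UPDATE on rebuild_status: only the caller that observes a non-running state gets to start a rebuild.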
The Test
// FROM SCRATCH
@Testcontainers
class BlueGreenRebuildTest {
@Container
static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16")
.withDatabaseName("rebuild_test")
.withInitScript("blue_green_schema.sql");
private EventStore eventStore;
private BlueGreenProjectionRebuilder rebuilder;
private DataSource dataSource;
@BeforeEach
void setUp() {
var ds = new org.postgresql.ds.PGSimpleDataSource();
ds.setUrl(postgres.getJdbcUrl());
ds.setUser(postgres.getUsername());
ds.setPassword(postgres.getPassword());
this.dataSource = ds;
var mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
eventStore = new EventStore(ds, mapper);
var registry = new EventTypeRegistry(mapper);
rebuilder = new BlueGreenProjectionRebuilder(eventStore, ds, registry, 50);
}
@Test
void rebuildProducesCorrectDataInNewTable() throws Exception {
// Write events to event store
for (int i = 0; i < 20; i++) {
eventStore.append("order-rb-" + i, -1, List.of(
new OrderPlaced(
"rb-" + i, "customer-1",
List.of(new LineItem("p1", "W", 3, BigDecimal.TEN)),
new BigDecimal("30.00"),
new Address("1 St", "City", "ST", "00000", "US"),
Instant.now()
)
));
}
// Rebuild
rebuilder.rebuild("order-summary");
// Verify: query through view returns all rebuilt orders.
// Filter by prefix: the static container's event store is shared across test methods,
// so events written by other tests would otherwise change the count.
try (Connection conn = dataSource.getConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM order_summary WHERE order_id LIKE 'rb-%'")) {
assertTrue(rs.next());
assertEquals(20, rs.getInt(1));
}
// Verify: item_count uses total quantity (3), not unique product count (1)
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT item_count FROM order_summary WHERE order_id = ?")) {
stmt.setString(1, "rb-0");
try (ResultSet rs = stmt.executeQuery()) {
assertTrue(rs.next());
assertEquals(3, rs.getInt("item_count"));
}
}
}
@Test
void rebuildSwapsActiveTable() throws Exception {
eventStore.append("order-swap-1", -1, List.of(
new OrderPlaced("swap-1", "c1", List.of(), BigDecimal.ZERO,
new Address("1", "C", "S", "0", "US"), Instant.now())
));
String activeBefore = getActiveTable("order-summary");
rebuilder.rebuild("order-summary");
String activeAfter = getActiveTable("order-summary");
assertNotEquals(activeBefore, activeAfter);
}
private String getActiveTable(String projectionName) throws SQLException {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT active_table FROM projection_config WHERE projection_name = ?")) {
stmt.setString(1, projectionName);
try (ResultSet rs = stmt.executeQuery()) {
rs.next();
return rs.getString("active_table");
}
}
}
}
When to Rebuild
Projection rebuilds are necessary in three scenarios:
Projection bug fix. The projection logic produced incorrect read model data. Fix the logic, rebuild the projection. The events have not changed. The new logic produces correct results from the same events.
New projection. A new read model is needed for a new feature. The projection consumes the existing event stream from the beginning and builds the read model from historical events. No backfill migration needed.
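Schema change in the read model. A new column is added to the projection table, or an existing column's computation changes. The projection is rebuilt to populate the new column from historical events. With blue-green tables, add the column to both physical tables and append it at the end: CREATE OR REPLACE VIEW may add columns to the end of a view's column list but cannot drop, reorder, or retype existing ones, so a column present in only one table makes the next swap back fail.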
Projection rebuilds are one of the strongest arguments for event sourcing. In a CRUD system, adding a new column that depends on historical data requires a backfill migration that queries the application’s business logic, not just the database. In an event-sourced system, the projection replays the events and the business logic is embedded in the projection code. The rebuild is self-contained.
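The same property covers the "New projection" case: a read model is a fold over the stored events, so a projection that did not exist yesterday can be built from the very first event. A minimal sketch, assuming simplified hypothetical event types and a hypothetical units-per-customer read model (not one defined in this chapter):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified events; the chapter's OrderEvent types carry more data.
interface HistoryEvent {}
record Placed(String orderId, String customerId, int units) implements HistoryEvent {}
record Cancelled(String orderId) implements HistoryEvent {}

// A brand-new read model (units ordered per customer, excluding cancelled orders)
// built only by replaying the existing history from the first event.
final class UnitsPerCustomerProjection {
    private UnitsPerCustomerProjection() {}

    static Map<String, Integer> replay(List<HistoryEvent> history) {
        Map<String, Placed> openOrders = new HashMap<>();        // projection-private state
        Map<String, Integer> unitsByCustomer = new TreeMap<>();  // the read model
        for (HistoryEvent event : history) {
            if (event instanceof Placed placed) {
                openOrders.put(placed.orderId(), placed);
                unitsByCustomer.merge(placed.customerId(), placed.units(), Integer::sum);
            } else if (event instanceof Cancelled cancelled) {
                Placed original = openOrders.remove(cancelled.orderId());
                if (original != null) {
                    unitsByCustomer.merge(original.customerId(), -original.units(), Integer::sum);
                }
            }
        }
        return unitsByCustomer;
    }
}
```

No migration touches existing tables: the new read model is derived entirely from events that were already stored, and fixing its logic later means running the same fold again.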
This chapter completes Part III. The read side is now fully established: projections, multiple targets, and rebuilding. Part IV connects the system to the outside world: Kafka, sagas, and schema evolution.