The Event Store: Append-Only Tables, Optimistic Concurrency, and Building One in PostgreSQL
The Event Store
The event store table uses a global_position for total ordering across all streams and a (stream_id, sequence_number) unique constraint for optimistic concurrency. The table is append-only: no UPDATE or DELETE operations are permitted, enforced by a database trigger. Concurrency conflicts produce SQL state 23505, which triggers a retry cycle.
The event store is the central data structure of an event-sourced system. Every other component (aggregates, projections, sagas) depends on the guarantees the event store provides. If the event store allows out-of-order events, aggregates compute incorrect state. If it allows duplicate sequence numbers, projections process events twice. If it does not support global ordering, cross-stream consumers cannot process events deterministically.
A production event store must guarantee four properties:
- Append-only within a stream. Events are added, never updated or deleted.
- Ordered within a stream. Events in a stream have a deterministic sequence.
- Globally ordered across streams. A monotonically increasing position across all events enables cross-stream consumers to process events in insertion order.
- Optimistic concurrency. Concurrent appends to the same stream are detected and rejected, forcing the caller to reload and retry.
PostgreSQL provides the primitives to build all four. This chapter implements a production-grade event store.
The Append-Only Constraint
The Problem
A relational table allows INSERT, UPDATE, and DELETE by default. An event store must allow only INSERT. If an UPDATE modifies an event’s payload, any aggregate loaded from that stream produces a different state than it did before the modification. If a DELETE removes an event, the aggregate’s state has a gap.
In a CRUD system, these operations are expected. In an event store, they are data corruption.
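To make the corruption concrete, here is a minimal in-memory sketch of replay as a fold over a stream. The `AmountAdded` event and the `Replay` class are hypothetical names invented for this illustration; they are not part of the order domain used elsewhere in the book.

```java
import java.util.List;

// Hypothetical event type, used only for this illustration.
record AmountAdded(long cents) {}

final class Replay {
    // Rebuilds state by folding over the stream in order.
    // The result depends on every event, so changing or removing
    // a stored event changes the state that replay produces.
    static long totalCents(List<AmountAdded> stream) {
        long total = 0;
        for (AmountAdded event : stream) {
            total += event.cents();
        }
        return total;
    }
}
```

Replaying the same two events always yields the same total. Replaying a stream in which one payload was updated, or one event was deleted, yields a different total, even though no new business fact was recorded.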
The Mechanism
PostgreSQL does not have a built-in “append-only” table type. The constraint must be enforced in the database, with either rules or triggers, and backed by application discipline.
-- FROM SCRATCH
-- Prevent UPDATE and DELETE on the event store
CREATE RULE no_update_event_store AS
    ON UPDATE TO event_store DO INSTEAD NOTHING;
CREATE RULE no_delete_event_store AS
    ON DELETE TO event_store DO INSTEAD NOTHING;
These rules silently discard UPDATE and DELETE statements. An alternative is to use triggers that raise an exception:
-- PRODUCTION
CREATE OR REPLACE FUNCTION prevent_event_mutation()
RETURNS TRIGGER AS $$
BEGIN
    -- TG_OP is 'UPDATE' or 'DELETE', depending on the statement that fired the trigger
    RAISE EXCEPTION 'Event store is append-only. % operations are not permitted.',
        TG_OP;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_no_update_event_store
    BEFORE UPDATE ON event_store
    FOR EACH ROW EXECUTE FUNCTION prevent_event_mutation();

CREATE TRIGGER trg_no_delete_event_store
    BEFORE DELETE ON event_store
    FOR EACH ROW EXECUTE FUNCTION prevent_event_mutation();
The trigger approach is preferred in production because it fails loudly. If application code accidentally issues an UPDATE against the event store, the trigger raises an error and the mistake is visible immediately. The rule approach discards the statement and reports success, which masks the bug. Note that row-level triggers do not fire for TRUNCATE, so the privilege to truncate the table has to be withheld separately.
The From-Scratch Implementation
The complete DDL for a production event store:
-- FROM SCRATCH
CREATE TABLE event_store (
    global_position BIGSERIAL PRIMARY KEY,
    stream_id       VARCHAR(255) NOT NULL,
    sequence_number BIGINT NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    payload         JSONB NOT NULL,
    metadata        JSONB NOT NULL DEFAULT '{}',
    occurred_at     TIMESTAMPTZ NOT NULL,
    stored_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT uq_stream_sequence UNIQUE (stream_id, sequence_number)
);

-- Stream reads (load all events for an aggregate) use the unique index that
-- PostgreSQL creates automatically for uq_stream_sequence.
-- Global position reads (cross-stream consumption) use the primary key index
-- on global_position. Neither needs a separate CREATE INDEX.

-- Event type filtering: ad-hoc queries for specific event types
CREATE INDEX idx_event_store_event_type
    ON event_store (event_type);

-- Time-range queries: operational dashboards and debugging
CREATE INDEX idx_event_store_stored_at
    ON event_store (stored_at);

-- Append-only enforcement
CREATE OR REPLACE FUNCTION prevent_event_mutation()
RETURNS TRIGGER AS $$
BEGIN
    RAISE EXCEPTION 'Event store is append-only. % operations are not permitted.', TG_OP;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_no_update
    BEFORE UPDATE ON event_store
    FOR EACH ROW EXECUTE FUNCTION prevent_event_mutation();

CREATE TRIGGER trg_no_delete
    BEFORE DELETE ON event_store
    FOR EACH ROW EXECUTE FUNCTION prevent_event_mutation();
What the Implementation Reveals
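The global_position column uses BIGSERIAL, which is backed by a PostgreSQL sequence. Sequences are not transactional in the usual sense: once a value is allocated from a sequence, it is consumed even if the transaction rolls back. This means global positions can have gaps, for example 1, 2, 4, 5 when position 3 was allocated by a transaction that rolled back. Projection consumers must tolerate gaps in the global position. They cannot assume position N+1 follows position N. A position is also assigned when the row is inserted, not when the transaction commits, so under concurrent writers a row with a lower position can become visible after a row with a higher one. A consumer that polls by position has to account for this.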
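A gap-tolerant consumer can be sketched as a checkpoint that only requires positions to increase, not to be consecutive. The `ProjectionCheckpoint` class below is a hypothetical helper written for this illustration, and it works on bare position values rather than on stored events.

```java
import java.util.List;

final class ProjectionCheckpoint {
    // Last global_position processed; 0 means nothing has been processed yet.
    private long lastPosition;

    // Accepts a batch ordered by global_position and moves the checkpoint to the
    // highest position seen. Gaps are accepted; a position at or below the
    // checkpoint is rejected because it would mean reprocessing.
    void advance(List<Long> batchPositions) {
        for (long position : batchPositions) {
            if (position <= lastPosition) {
                throw new IllegalStateException(
                    "Position " + position + " is not after checkpoint " + lastPosition);
            }
            lastPosition = position;
        }
    }

    long lastPosition() {
        return lastPosition;
    }
}
```

Feeding the batch 1, 2, 4, 5 leaves the checkpoint at 5 without complaint about the missing 3. The sketch does not address rows that become visible out of order; that needs a separate strategy.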
The (stream_id, sequence_number) UNIQUE constraint serves two purposes. It prevents duplicate events within a stream (data integrity), and it provides the mechanism for optimistic concurrency control. When two concurrent transactions attempt to insert at the same sequence number, one succeeds and the other waits for it to finish, then receives a unique constraint violation if the first transaction committed. The constraint does the work that an advisory lock or an explicit lock would otherwise have to do in application code.
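In JDBC, a unique constraint violation surfaces as an SQLException carrying SQLSTATE 23505. With batched inserts the state may sit on the top-level exception or on a chained one, depending on the driver, so a defensive check can walk the whole chain. The `SqlStates` class below is a hypothetical helper sketched under that assumption, using only `java.sql` types.

```java
import java.sql.SQLException;

final class SqlStates {
    static final String UNIQUE_VIOLATION = "23505";

    // Returns true if the exception, any exception chained via getNextException(),
    // or any of their causes reports SQLSTATE 23505.
    // SQLException is Iterable<Throwable> and iterates over exactly that chain.
    static boolean isUniqueViolation(SQLException e) {
        for (Throwable t : e) {
            if (t instanceof SQLException sql
                    && UNIQUE_VIOLATION.equals(sql.getSQLState())) {
                return true;
            }
        }
        return false;
    }
}
```

The check costs little when the state is on the top-level exception and avoids misclassifying a conflict as a generic failure when it is not.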
Optimistic Concurrency Control
The Problem
Two support agents modify the same order simultaneously. Agent A loads the order (3 events, version 2). Agent B loads the order (3 events, version 2). Agent A cancels the order, appending an OrderCancelled event at sequence 3. Agent B changes the shipping address, attempting to append a ShippingAddressChanged event at sequence 3. Without concurrency control, both events are stored at sequence 3 and the event stream is corrupted.
The Mechanism
Optimistic concurrency works by including the expected sequence number in the append operation. The writer states: “I last saw this stream at sequence N, so my new events start at sequence N+1.” If the stream has been modified since the writer’s read, the UNIQUE constraint on (stream_id, sequence_number) causes the insert to fail.
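Before the JDBC version, the rule can be seen in isolation with an in-memory stand-in. `InMemoryStreams` is a hypothetical class written for this illustration, with events reduced to strings. It compares versions for equality, which is stricter than the unique constraint: the constraint only rejects a sequence number that is already taken.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the expected-version check (illustration only).
final class InMemoryStreams {
    private final Map<String, List<String>> streams = new HashMap<>();

    // Appends events starting at expectedVersion + 1, or rejects the append
    // if the stream is not at the version the caller last saw.
    synchronized void append(String streamId, long expectedVersion, List<String> events) {
        List<String> stream = streams.computeIfAbsent(streamId, id -> new ArrayList<>());
        long currentVersion = stream.size() - 1; // -1 for an empty stream
        if (currentVersion != expectedVersion) {
            throw new IllegalStateException("Conflict on " + streamId
                + ": expected version " + expectedVersion
                + " but stream is at " + currentVersion);
        }
        stream.addAll(events);
    }

    synchronized long currentVersion(String streamId) {
        List<String> stream = streams.get(streamId);
        return stream == null ? -1 : stream.size() - 1;
    }
}
```

In the two-agent scenario, both agents read version 2. The first `append("order-1", 2, ...)` succeeds and moves the stream to version 3; the second call with the same expected version is rejected and must reload.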
// FROM SCRATCH
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

public class EventStore {
    private final DataSource dataSource;
    private final ObjectMapper mapper;

    public EventStore(DataSource dataSource, ObjectMapper mapper) {
        this.dataSource = dataSource;
        this.mapper = mapper;
    }

    // Appends events at expectedVersion + 1, + 2, ... in a single transaction.
    public void append(String streamId, long expectedVersion, List<?> events) {
        String sql = """
            INSERT INTO event_store (stream_id, sequence_number, event_type, payload, metadata, occurred_at)
            VALUES (?, ?, ?, ?::jsonb, ?::jsonb, ?)
            """;
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            try {
                long sequence = expectedVersion + 1;
                try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                    for (Object event : events) {
                        stmt.setString(1, streamId);
                        stmt.setLong(2, sequence);
                        stmt.setString(3, event.getClass().getSimpleName());
                        stmt.setString(4, mapper.writeValueAsString(event));
                        stmt.setString(5, "{}");
                        stmt.setTimestamp(6, Timestamp.from(extractOccurredAt(event)));
                        stmt.addBatch();
                        sequence++;
                    }
                    stmt.executeBatch();
                }
                conn.commit();
            } catch (SQLException e) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackFailure) {
                    // Keep the original failure visible instead of replacing it
                    e.addSuppressed(rollbackFailure);
                }
                if (isUniqueViolation(e)) {
                    // Another writer already took this sequence number
                    throw new OptimisticConcurrencyException(
                        streamId, expectedVersion
                    );
                }
                throw new EventStoreException("Append failed for stream " + streamId, e);
            } catch (JsonProcessingException e) {
                conn.rollback();
                throw new EventStoreException("Serialization failed", e);
            }
        } catch (SQLException e) {
            throw new EventStoreException("Connection failed", e);
        }
    }
    public long currentVersion(String streamId) {
        String sql = "SELECT COALESCE(MAX(sequence_number), -1) FROM event_store WHERE stream_id = ?";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, streamId);
            try (ResultSet rs = stmt.executeQuery()) {
                rs.next();
                return rs.getLong(1);
            }
        } catch (SQLException e) {
            throw new EventStoreException("Failed to read version for " + streamId, e);
        }
    }

    public List<StoredEvent> readStream(String streamId, long fromSequence) {
        String sql = """
            SELECT global_position, stream_id, sequence_number, event_type,
                   payload, metadata, occurred_at, stored_at
            FROM event_store
            WHERE stream_id = ? AND sequence_number >= ?
            ORDER BY sequence_number ASC
            """;
        List<StoredEvent> events = new ArrayList<>();
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, streamId);
            stmt.setLong(2, fromSequence);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    events.add(mapRow(rs));
                }
            }
        } catch (SQLException e) {
            throw new EventStoreException("Failed to read stream " + streamId, e);
        }
        return events;
    }

    public List<StoredEvent> readAllFromPosition(long afterPosition, int batchSize) {
        String sql = """
            SELECT global_position, stream_id, sequence_number, event_type,
                   payload, metadata, occurred_at, stored_at
            FROM event_store
            WHERE global_position > ?
            ORDER BY global_position ASC
            LIMIT ?
            """;
        List<StoredEvent> events = new ArrayList<>();
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setLong(1, afterPosition);
            stmt.setInt(2, batchSize);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    events.add(mapRow(rs));
                }
            }
        } catch (SQLException e) {
            throw new EventStoreException("Failed to read from global position", e);
        }
        return events;
    }
    private StoredEvent mapRow(ResultSet rs) throws SQLException {
        return new StoredEvent(
            rs.getLong("global_position"),
            rs.getString("stream_id"),
            rs.getLong("sequence_number"),
            rs.getString("event_type"),
            rs.getString("payload"),
            rs.getString("metadata"),
            rs.getTimestamp("occurred_at").toInstant(),
            rs.getTimestamp("stored_at").toInstant()
        );
    }

    private boolean isUniqueViolation(SQLException e) {
        // 23505 is the SQLSTATE for unique_violation
        return "23505".equals(e.getSQLState());
    }

    private Instant extractOccurredAt(Object event) {
        try {
            var method = event.getClass().getMethod("occurredAt");
            return (Instant) method.invoke(event);
        } catch (Exception e) {
            // No usable occurredAt() accessor: fall back to the current time
            return Instant.now();
        }
    }
}
public class OptimisticConcurrencyException extends RuntimeException {
    private final String streamId;
    private final long expectedVersion;

    public OptimisticConcurrencyException(String streamId, long expectedVersion) {
        super("Concurrency conflict on stream " + streamId +
            " at expected version " + expectedVersion);
        this.streamId = streamId;
        this.expectedVersion = expectedVersion;
    }

    public String streamId() { return streamId; }
    public long expectedVersion() { return expectedVersion; }
}
What the Implementation Reveals
The readAllFromPosition method is the API for cross-stream consumers like projections. A projection tracks the last processed global_position and polls for new events after that position. The batchSize parameter prevents loading the entire event store into memory.
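The polling pattern described above can be sketched as a loop that keeps requesting the next batch after its checkpoint until a batch comes back empty. `CatchUp` and its `readBatch` parameter are hypothetical names for this illustration; `readBatch` stands in for readAllFromPosition and is assumed to return positions in ascending order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class CatchUp {
    // Drains everything after startPosition in batches of at most batchSize
    // and returns the positions in the order they were processed.
    static List<Long> drain(BiFunction<Long, Integer, List<Long>> readBatch,
                            long startPosition, int batchSize) {
        List<Long> processed = new ArrayList<>();
        long checkpoint = startPosition;
        while (true) {
            List<Long> batch = readBatch.apply(checkpoint, batchSize);
            if (batch.isEmpty()) {
                // Caught up. A long-running poller would wait and poll again here.
                return processed;
            }
            processed.addAll(batch);
            // The next read starts after the last position in this batch
            checkpoint = batch.get(batch.size() - 1);
        }
    }
}
```

Memory use is bounded by the batch size rather than by the size of the store, because only one batch is held per read. A real projection would also persist the checkpoint, ideally in the same transaction as the projection update.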
The expectedVersion convention differs from the expectedSequence convention in earlier chapters. Here, version -1 means “no events exist in this stream” and the first event gets sequence number 0. This aligns with the aggregate’s version field, which starts at -1 for a new aggregate and increments with each event.
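The arithmetic behind this convention is small enough to write down. The `Versions` class is a hypothetical helper for illustration; the store itself computes the same values inline.

```java
final class Versions {
    // Version of a stream that has no events yet.
    static final long NO_STREAM = -1;

    // Sequence number given to the first event of an append.
    static long firstSequence(long expectedVersion) {
        return expectedVersion + 1;
    }

    // Stream version after appending eventCount events.
    static long versionAfter(long expectedVersion, int eventCount) {
        return expectedVersion + eventCount;
    }
}
```

For a new stream, `firstSequence(NO_STREAM)` is 0, and after one event the version is 0. For a stream at version 2, the next event lands at sequence 3.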
The reflection-based extractOccurredAt is a compromise for the generic append method. In production, the method signature would use a typed event interface instead.
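One possible shape for such a typed interface is sketched below. `TimestampedEvent`, `SampleEvent`, and `OccurredAt` are hypothetical names chosen for this illustration; the point is only that the timestamp accessor becomes a compile-time requirement, so there is nothing to look up reflectively and no silent fallback to the current time.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical interface: every event must expose when it occurred.
interface TimestampedEvent {
    Instant occurredAt();
}

// A record satisfies the interface through its generated accessor.
record SampleEvent(String id, Instant occurredAt) implements TimestampedEvent {}

final class OccurredAt {
    // Reads the timestamps with ordinary method calls; an event type
    // without occurredAt() would not compile against this signature.
    static List<Instant> of(List<? extends TimestampedEvent> events) {
        return events.stream().map(TimestampedEvent::occurredAt).toList();
    }
}
```

With a signature like this, an append method can call `event.occurredAt()` directly.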
Retry Strategy
When an optimistic concurrency exception occurs, the correct response is to reload the aggregate, re-validate the command, and retry the append.
// FROM SCRATCH
import java.util.List;

public class RetryableCommandHandler {
    private final EventStore eventStore;
    private final EventTypeRegistry registry;
    // Total number of attempts, including the first one
    private static final int MAX_RETRIES = 3;

    public RetryableCommandHandler(EventStore eventStore, EventTypeRegistry registry) {
        this.eventStore = eventStore;
        this.registry = registry;
    }

    public List<OrderEvent> handleWithRetry(CancelOrder command) {
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            try {
                OrderAggregate aggregate = loadAggregate(command.orderId());
                long versionBeforeCommand = aggregate.version();
                List<OrderEvent> events = aggregate.cancel(command);
                eventStore.append(
                    "order-" + command.orderId(),
                    versionBeforeCommand,
                    events
                );
                return events;
            } catch (OptimisticConcurrencyException e) {
                if (attempt == MAX_RETRIES - 1) {
                    throw e;
                }
                // Conflict: the next iteration reloads the aggregate and re-validates
            }
        }
        throw new IllegalStateException("Retry loop exited without result");
    }

    private OrderAggregate loadAggregate(String orderId) {
        List<StoredEvent> events = eventStore.readStream("order-" + orderId, 0);
        OrderAggregate aggregate = new OrderAggregate();
        for (StoredEvent stored : events) {
            OrderEvent event = registry.deserialize(stored.eventType(), stored.payload());
            aggregate.apply(event);
        }
        return aggregate;
    }
}
The retry is not blind. Each retry reloads the aggregate from the event store, so it sees the latest state, including the events that caused the conflict. The command is re-validated against this new state. If the concurrent change does not invalidate the command (e.g., a shipping address change while the cancel was in flight), the retry succeeds. If it does (e.g., the order was already cancelled), re-validation throws a business exception, which is the correct outcome.
The limit of 3 attempts is a pragmatic choice. If an aggregate is modified concurrently on every one of three consecutive attempts to handle a single command, the system has a contention problem that retries cannot solve. The aggregate boundary needs to be reconsidered.
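The bounded loop can also be written as a reusable helper, which makes the attempt limit easy to test on its own. `ConflictRetry` and its nested `ConflictException` are hypothetical stand-ins for this sketch; the exception plays the role of OptimisticConcurrencyException.

```java
import java.util.function.Supplier;

final class ConflictRetry {
    // Stand-in for OptimisticConcurrencyException (illustration only).
    static final class ConflictException extends RuntimeException {}

    // Runs the attempt up to maxAttempts times. A conflict triggers another
    // attempt; any other exception propagates immediately. If every attempt
    // conflicts, the last conflict is rethrown.
    static <T> T run(int maxAttempts, Supplier<T> attempt) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ConflictException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (ConflictException e) {
                last = e;
            }
        }
        throw last;
    }
}
```

The supplier is expected to do the full load, validate, append cycle on each call, so that every attempt works from freshly loaded state.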
The Production Path
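// PRODUCTION
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.List;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

@Repository
public class SpringJdbcEventStore {
    private final JdbcTemplate jdbc;
    private final ObjectMapper eventMapper;

    public SpringJdbcEventStore(JdbcTemplate jdbc,
                                @Qualifier("eventObjectMapper") ObjectMapper eventMapper) {
        this.jdbc = jdbc;
        this.eventMapper = eventMapper;
    }

    @Transactional
    public void append(String streamId, long expectedVersion, List<? extends OrderEvent> events) {
        long sequence = expectedVersion + 1;
        for (OrderEvent event : events) {
            try {
                jdbc.update(
                    """
                    INSERT INTO event_store (stream_id, sequence_number, event_type, payload, metadata, occurred_at)
                    VALUES (?, ?, ?, ?::jsonb, '{}'::jsonb, ?)
                    """,
                    streamId, sequence,
                    event.getClass().getSimpleName(),
                    eventMapper.writeValueAsString(event),
                    Timestamp.from(event.occurredAt())
                );
            } catch (DuplicateKeyException e) {
                // Spring translates the unique violation; the runtime exception rolls back the transaction
                throw new OptimisticConcurrencyException(streamId, expectedVersion);
            } catch (JsonProcessingException e) {
                throw new EventStoreException("Serialization failed", e);
            }
            sequence++;
        }
    }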
    @Transactional(readOnly = true)
    public List<StoredEvent> readStream(String streamId) {
        return jdbc.query(
            """
            SELECT global_position, stream_id, sequence_number, event_type,
                   payload, metadata, occurred_at, stored_at
            FROM event_store
            WHERE stream_id = ?
            ORDER BY sequence_number ASC
            """,
            this::mapRow, streamId
        );
    }

    @Transactional(readOnly = true)
    public List<StoredEvent> readAllFromPosition(long afterPosition, int batchSize) {
        return jdbc.query(
            """
            SELECT global_position, stream_id, sequence_number, event_type,
                   payload, metadata, occurred_at, stored_at
            FROM event_store
            WHERE global_position > ?
            ORDER BY global_position ASC
            LIMIT ?
            """,
            this::mapRow, afterPosition, batchSize
        );
    }

    private StoredEvent mapRow(ResultSet rs, int rowNum) throws SQLException {
        return new StoredEvent(
            rs.getLong("global_position"),
            rs.getString("stream_id"),
            rs.getLong("sequence_number"),
            rs.getString("event_type"),
            rs.getString("payload"),
            rs.getString("metadata"),
            rs.getTimestamp("occurred_at").toInstant(),
            rs.getTimestamp("stored_at").toInstant()
        );
    }
}
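The @Transactional(readOnly = true) annotation on read methods marks the transaction as read-only, which allows read-only transaction optimizations. If the application routes connections by transaction type, the flag can also be used to send these reads to a read replica. A replica can lag behind the primary, so an aggregate loaded from one may be stale; the optimistic concurrency check on append still catches the resulting conflict.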
The Test
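// FROM SCRATCH
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
class EventStoreConcurrencyTest {
    // Static container: one database shared by every test in this class
    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16")
        .withDatabaseName("concurrency_test")
        .withInitScript("event_store_schema.sql");

    private EventStore eventStore;

    @BeforeEach
    void setUp() {
        var dataSource = new org.postgresql.ds.PGSimpleDataSource();
        dataSource.setUrl(postgres.getJdbcUrl());
        dataSource.setUser(postgres.getUsername());
        dataSource.setPassword(postgres.getPassword());
        var mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        eventStore = new EventStore(dataSource, mapper);
    }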
    @Test
    void concurrentAppendsToSameStreamDetected() throws Exception {
        // Initial event
        eventStore.append("order-1", -1, List.of(
            new OrderPlaced("1", "c1", List.of(), BigDecimal.ZERO,
                new Address("1", "C", "S", "0", "US"), Instant.now())
        ));
        // Two concurrent writers, both reading version 0
        ExecutorService executor = Executors.newFixedThreadPool(2);
        var future1 = executor.submit(() -> {
            eventStore.append("order-1", 0, List.of(
                new OrderConfirmed("1", Instant.now())
            ));
            return "success";
        });
        var future2 = executor.submit(() -> {
            eventStore.append("order-1", 0, List.of(
                new OrderCancelled("1", "reason", "user", Instant.now())
            ));
            return "success";
        });
        int successes = 0;
        int conflicts = 0;
        try {
            future1.get();
            successes++;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof OptimisticConcurrencyException) conflicts++;
            else throw e;
        }
        try {
            future2.get();
            successes++;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof OptimisticConcurrencyException) conflicts++;
            else throw e;
        }
        assertEquals(1, successes, "Exactly one writer should succeed");
        assertEquals(1, conflicts, "Exactly one writer should conflict");
        executor.shutdown();
    }
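    @Test
    void globalPositionIsMonotonicallyIncreasing() {
        // The container and its data are shared by all tests in this class,
        // so start reading after whatever earlier tests have already written.
        List<StoredEvent> existing = eventStore.readAllFromPosition(0, 1000);
        long startPosition = existing.isEmpty()
            ? 0
            : existing.get(existing.size() - 1).globalPosition();
        for (int i = 0; i < 5; i++) {
            // Stream ids are unique to this test; "order-1" is already used above
            eventStore.append("position-test-" + i, -1, List.of(
                new OrderPlaced(String.valueOf(i), "c1", List.of(), BigDecimal.ZERO,
                    new Address("1", "C", "S", "0", "US"), Instant.now())
            ));
        }
        List<StoredEvent> allEvents = eventStore.readAllFromPosition(startPosition, 100);
        assertEquals(5, allEvents.size());
        for (int i = 1; i < allEvents.size(); i++) {
            assertTrue(
                allEvents.get(i).globalPosition() > allEvents.get(i - 1).globalPosition(),
                "Global positions must be strictly increasing"
            );
        }
    }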
    @Test
    void retryableCommandHandlerCancelsAfterEarlierModification() {
        eventStore.append("order-retry", -1, List.of(
            new OrderPlaced("retry", "c1", List.of(), BigDecimal.ZERO,
                new Address("1", "C", "S", "0", "US"), Instant.now())
        ));
        var mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        var registry = new EventTypeRegistry(mapper);
        var handler = new RetryableCommandHandler(eventStore, registry);
        // A second writer changes the address before the cancel is handled
        eventStore.append("order-retry", 0, List.of(
            new ShippingAddressChanged("retry",
                new Address("1", "C", "S", "0", "US"),
                new Address("2", "D", "T", "1", "US"),
                "Moved", Instant.now())
        ));
        // The change is already committed when the handler loads the aggregate,
        // so the cancel succeeds on the first attempt and no retry is forced.
        var result = handler.handleWithRetry(
            new CancelOrder("retry", "No longer needed", "customer")
        );
        assertEquals(1, result.size());
        assertInstanceOf(OrderCancelled.class, result.get(0));
    }
}
PostgreSQL vs. EventStoreDB
PostgreSQL is a general-purpose relational database. EventStoreDB is a database designed specifically for event sourcing. The comparison is relevant because the choice affects operational complexity, performance characteristics, and feature availability.
PostgreSQL advantages for most Java applications:
- The team already operates PostgreSQL. Backup, monitoring, replication, and connection pooling are established.
- The event store shares the same database instance as the outbox table, enabling transactional outbox writes without distributed transactions.
- JSONB queries enable ad-hoc debugging without a separate query tool.
- Partitioning and archiving use standard PostgreSQL mechanisms.
EventStoreDB advantages:
- Built-in subscriptions for event consumers: catch-up subscriptions that resume from a client-supplied position, and persistent subscriptions that track the consumer's position on the server.
- Built-in projections (JavaScript-based) that run inside the database.
- Purpose-built storage engine optimized for append-only workloads with higher throughput than PostgreSQL for sustained writes above 50,000 events per second.
- System-level stream categories that simplify cross-aggregate event consumption.
The threshold for switching: If your event store handles fewer than 50,000 appends per second sustained, if your event consumers are external processes (not in-database projections), and if your event publishing uses the outbox pattern to Kafka, PostgreSQL provides everything you need. If you need catch-up subscriptions with guaranteed ordering across consumer restarts, or if your append throughput exceeds what a single PostgreSQL instance provides, EventStoreDB is the specialized tool for the job.
Most Java applications writing orders, payments, and inventory events are well below this threshold. The order management platform in this book processes thousands of orders per day, producing tens of thousands of events per day. PostgreSQL handles this without measurable impact on its other workloads.
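The distance from the threshold is easy to check with a back-of-the-envelope conversion from events per day to an average rate per second. The `Throughput` class is a hypothetical helper, and the 50,000 events per day used below is an assumed figure standing in for "tens of thousands", not a measurement from the platform.

```java
final class Throughput {
    static final long SECONDS_PER_DAY = 86_400;

    // Average rate over a full day. Real traffic is bursty, so peak rates
    // will be higher than this average.
    static double eventsPerSecond(long eventsPerDay) {
        return (double) eventsPerDay / SECONDS_PER_DAY;
    }
}
```

Under that assumption, `Throughput.eventsPerSecond(50_000)` comes to roughly 0.58 events per second on average, several orders of magnitude below 50,000 appends per second even after allowing generously for peaks.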
This chapter built the event store with production-grade concurrency control. The next chapter addresses what happens when replaying thousands of events to load an aggregate becomes a performance bottleneck.