Aggregates and Command Handling: Enforcing Invariants Against an Event Stream
An aggregate processes commands through a four-step cycle: load state from events, validate business invariants, produce new events, and apply those events to update state. The aggregate is the transactional consistency boundary. Optimistic concurrency via version checking prevents conflicting writes to the same stream.
An aggregate is a consistency boundary. It is the unit of state that must be internally consistent at all times. An order must not be fulfilled without payment. Inventory must not go negative. A refund must not exceed the original payment amount. The aggregate enforces these invariants by validating every command against its current event-derived state, so that no state transition bypasses it.
In a CRUD system, invariants are enforced by application-layer checks and database constraints. The service layer reads the current row, validates the business rule, and updates the row. If two requests arrive simultaneously, both might read the same row state, both might pass validation, and both might update the row. The database constraint (if one exists) catches the conflict. If no constraint exists, the data is corrupted silently.
In an event-sourced system, invariants are enforced by the aggregate, and concurrency is controlled by the event store. The aggregate loads its state from the event stream, validates the command, produces events, and appends them to the stream. If two commands arrive simultaneously for the same aggregate, the optimistic concurrency check on the event store ensures that only one append succeeds. The other fails with a concurrency conflict and can be retried against the updated state.
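The concurrency check can be sketched in memory. This is a minimal illustration, assuming a store whose append takes the sequence number the caller expects its first new event to occupy (0 for an empty stream). `InMemoryStreamStore` and `ConcurrencyConflictException` are hypothetical names used only for this sketch, not the chapter's event store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical exception signalling that another writer appended first.
class ConcurrencyConflictException extends RuntimeException {
    ConcurrencyConflictException(String message) { super(message); }
}

// Hypothetical in-memory store; events are plain strings to keep the sketch small.
class InMemoryStreamStore {
    private final Map<String, List<String>> streams = new HashMap<>();

    // expectedSequence is the position the first new event should occupy.
    synchronized void append(String streamId, long expectedSequence, List<String> events) {
        List<String> stream = streams.computeIfAbsent(streamId, id -> new ArrayList<>());
        if (stream.size() != expectedSequence) {
            throw new ConcurrencyConflictException(
                "Expected next sequence " + expectedSequence
                    + " but stream is at " + stream.size());
        }
        stream.addAll(events);
    }

    synchronized List<String> readStream(String streamId) {
        return List.copyOf(streams.getOrDefault(streamId, List.of()));
    }
}

class OptimisticAppendDemo {
    // Two writers load the same one-event stream, then both try to append at sequence 1.
    static boolean secondWriterIsRejected() {
        InMemoryStreamStore store = new InMemoryStreamStore();
        store.append("order-1", 0, List.of("OrderPlaced"));

        long nextSequenceSeenByBoth = store.readStream("order-1").size();

        // The first writer wins because the stream is still where it expected.
        store.append("order-1", nextSequenceSeenByBoth, List.of("OrderConfirmed"));
        try {
            // The second writer's expectation is now stale.
            store.append("order-1", nextSequenceSeenByBoth, List.of("OrderCancelled"));
            return false;
        } catch (ConcurrencyConflictException conflict) {
            return true;
        }
    }
}
```

The losing writer has not corrupted anything: its events were never stored, so it can reload the stream and decide again with the winner's event in view.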
The aggregate boundary is the most consequential design decision in an event-sourced system. A boundary that is too large creates contention: many commands compete for the same event stream. A boundary that is too small scatters invariants across multiple aggregates, requiring distributed coordination to enforce rules that should be local.
The Order Aggregate
The Problem
An e-commerce order has a complex lifecycle. It transitions through multiple states: placed, confirmed, payment authorized, fulfilled, cancelled, refunded. Some transitions are valid only from certain states. An order cannot be fulfilled without payment authorization. An order cannot be cancelled after fulfilment. A refund cannot exceed the paid amount. A partially fulfilled order can be partially refunded.
In a CRUD system, these rules are scattered across service methods. The cancelOrder method checks the status column. The refundOrder method checks the payment amount. The fulfillOrder method checks inventory status. Each method reads the row, validates, and updates. The invariants are implicit in the procedural logic, not explicit in the domain model.
The Mechanism
An aggregate in an event-sourced system has four responsibilities:
- Load state from events. The aggregate replays its event stream, applying each event to build the current state. After replay, the aggregate knows its current status, totals, line items, and everything it needs for command validation.
- Validate commands. The aggregate checks business rules against its current state. If the command violates an invariant, it throws an exception. If the command is valid, it proceeds.
- Produce events. Valid commands produce one or more events that describe the state change. These events are the facts that will be appended to the stream.
- Enforce the consistency boundary. All invariants that must hold are checked within the aggregate. No external system is consulted during command handling. If the aggregate needs information from another aggregate to validate a command, the aggregate boundary is wrong.
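The responsibilities above can be seen on a deliberately tiny aggregate before turning to the order. This is a sketch under assumptions: it uses the "inventory must not go negative" invariant, and `StockItem`, `StockAdded`, and `StockRemoved` are hypothetical names that do not appear elsewhere in the chapter.

```java
import java.util.List;

// Hypothetical events for a tiny inventory aggregate.
interface StockEvent {}
record StockAdded(int quantity) implements StockEvent {}
record StockRemoved(int quantity) implements StockEvent {}

class StockItem {
    private int onHand = 0;

    // Load state by replaying the stream.
    static StockItem fromHistory(List<StockEvent> history) {
        StockItem item = new StockItem();
        history.forEach(item::apply);
        return item;
    }

    // Validate against current state, then produce events.
    List<StockEvent> remove(int quantity) {
        if (quantity <= 0) {
            throw new IllegalArgumentException("Quantity must be positive");
        }
        if (quantity > onHand) {
            throw new IllegalStateException(
                "Cannot remove " + quantity + " with only " + onHand + " on hand");
        }
        StockEvent event = new StockRemoved(quantity);
        apply(event); // keep in-memory state in step with the produced event
        return List.of(event);
    }

    // State changes happen only here, for both replayed and new events.
    void apply(StockEvent event) {
        if (event instanceof StockAdded added) {
            onHand += added.quantity();
        } else if (event instanceof StockRemoved removed) {
            onHand -= removed.quantity();
        }
    }

    int onHand() { return onHand; }
}
```

Everything `remove` needs to decide is already in `onHand`, which is the point of the fourth responsibility: the handler consults no other system.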
The From-Scratch Implementation
// FROM SCRATCH
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;

public class OrderAggregate {

    private String orderId;
    private String customerId;
    private OrderStatus status;
    private BigDecimal total;
    private BigDecimal paidAmount;
    private BigDecimal refundedAmount;
    private List<LineItem> lineItems;
    private Address shippingAddress;
    private long version; // zero-based sequence number of the last applied event

    public OrderAggregate() {
        this.status = OrderStatus.NEW;
        this.total = BigDecimal.ZERO;
        this.paidAmount = BigDecimal.ZERO;
        this.refundedAmount = BigDecimal.ZERO;
        this.lineItems = List.of();
        this.version = -1; // no events yet
    }

    // --- Command handlers ---

    public List<OrderEvent> place(PlaceOrder command) {
        if (status != OrderStatus.NEW) {
            throw new IllegalStateException("Order already exists: " + orderId);
        }
        BigDecimal calculatedTotal = command.lineItems().stream()
            .map(li -> li.unitPrice().multiply(BigDecimal.valueOf(li.quantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);
        if (calculatedTotal.compareTo(BigDecimal.ZERO) <= 0) {
            throw new InvalidOrderException("Order total must be positive");
        }
        var event = new OrderPlaced(
            command.orderId(), command.customerId(),
            command.lineItems(), calculatedTotal,
            command.shippingAddress(), Instant.now()
        );
        apply(event);
        return List.of(event);
    }

    public List<OrderEvent> confirm(ConfirmOrder command) {
        if (status != OrderStatus.PLACED) {
            throw new IllegalStateException(
                "Cannot confirm order in status " + status
            );
        }
        var event = new OrderConfirmed(command.orderId(), Instant.now());
        apply(event);
        return List.of(event);
    }

    public List<OrderEvent> authorizePayment(AuthorizePayment command) {
        if (status != OrderStatus.CONFIRMED) {
            throw new IllegalStateException(
                "Cannot authorize payment for order in status " + status
            );
        }
        // compareTo ignores scale, so 39.98 and 39.980 count as equal
        if (command.amount().compareTo(total) != 0) {
            throw new PaymentAmountMismatchException(
                "Payment amount " + command.amount() + " does not match order total " + total
            );
        }
        var event = new PaymentAuthorized(
            command.orderId(), command.paymentId(),
            command.amount(), command.authorizationCode(),
            Instant.now()
        );
        apply(event);
        return List.of(event);
    }

    public List<OrderEvent> fulfill(FulfillOrder command) {
        if (status != OrderStatus.PAYMENT_AUTHORIZED) {
            throw new IllegalStateException(
                "Cannot fulfill order in status " + status
            );
        }
        var event = new OrderFulfilled(
            command.orderId(), command.trackingNumber(),
            command.carrier(), Instant.now()
        );
        apply(event);
        return List.of(event);
    }

    public List<OrderEvent> cancel(CancelOrder command) {
        // An aggregate with no events has nothing to cancel
        if (status == OrderStatus.NEW) {
            throw new IllegalStateException(
                "Cannot cancel an order that has not been placed"
            );
        }
        if (status == OrderStatus.CANCELLED) {
            throw new OrderAlreadyCancelledException(orderId);
        }
        if (status == OrderStatus.FULFILLED || status == OrderStatus.REFUNDED) {
            throw new OrderCannotBeCancelledException(
                orderId, "Order in status " + status + " cannot be cancelled"
            );
        }
        var event = new OrderCancelled(
            command.orderId(), command.reason(),
            command.cancelledBy(), Instant.now()
        );
        apply(event);
        return List.of(event);
    }

    public List<OrderEvent> requestRefund(RequestRefund command) {
        if (status != OrderStatus.FULFILLED) {
            throw new IllegalStateException(
                "Cannot refund order in status " + status
            );
        }
        // Both amounts come from this aggregate's own events
        BigDecimal maxRefundable = paidAmount.subtract(refundedAmount);
        if (command.amount().compareTo(maxRefundable) > 0) {
            throw new RefundExceedsPaymentException(
                "Requested refund " + command.amount() +
                " exceeds maximum refundable " + maxRefundable
            );
        }
        var event = new RefundRequested(
            command.orderId(), command.refundId(),
            command.amount(), command.reason(),
            Instant.now()
        );
        apply(event);
        return List.of(event);
    }

    // --- Event application ---

    public void apply(OrderEvent event) {
        switch (event) {
            case OrderPlaced e -> {
                this.orderId = e.orderId();
                this.customerId = e.customerId();
                this.status = OrderStatus.PLACED;
                this.total = e.total();
                this.lineItems = e.lineItems();
                this.shippingAddress = e.shippingAddress();
            }
            case OrderConfirmed e -> {
                this.status = OrderStatus.CONFIRMED;
            }
            case PaymentAuthorized e -> {
                this.status = OrderStatus.PAYMENT_AUTHORIZED;
                this.paidAmount = e.amount();
            }
            case OrderFulfilled e -> {
                this.status = OrderStatus.FULFILLED;
            }
            case OrderCancelled e -> {
                this.status = OrderStatus.CANCELLED;
            }
            case ShippingAddressChanged e -> {
                this.shippingAddress = e.newAddress();
            }
            case RefundRequested e -> {
                this.refundedAmount = this.refundedAmount.add(e.amount());
                if (this.refundedAmount.compareTo(this.paidAmount) >= 0) {
                    this.status = OrderStatus.REFUNDED;
                }
            }
        }
        this.version++;
    }

    public long version() { return version; }
    public String orderId() { return orderId; }
    public OrderStatus status() { return status; }
}
Additional events and commands for the expanded lifecycle:
// FROM SCRATCH
import java.math.BigDecimal;
import java.time.Instant;

// Commands
public record AuthorizePayment(
    String orderId,
    String paymentId,
    BigDecimal amount,
    String authorizationCode
) implements OrderCommand {}

public record FulfillOrder(
    String orderId,
    String trackingNumber,
    String carrier
) implements OrderCommand {}

public record RequestRefund(
    String orderId,
    String refundId,
    BigDecimal amount,
    String reason
) implements OrderCommand {}

// Events
public record OrderFulfilled(
    String orderId,
    String trackingNumber,
    String carrier,
    Instant occurredAt
) implements OrderEvent {}

public record RefundRequested(
    String orderId,
    String refundId,
    BigDecimal amount,
    String reason,
    Instant occurredAt
) implements OrderEvent {}
What the Implementation Reveals
The version field increments with each applied event. It starts at -1, so when the aggregate is loaded from the event store the version is the zero-based sequence number of the last event replayed, which is one less than the number of events. When new events are produced by a command handler, the version increments for each. The version after command handling, plus one, is the expected sequence number for the next append. This is the connection between the aggregate’s in-memory state and the event store’s concurrency control.
The apply method is called both during replay (loading existing state) and during command handling (applying new events). This dual-use is deliberate. The same code path that loads historical state also processes new state changes. If the apply method has a bug, it manifests during loading (which is tested frequently) rather than hiding in production until a specific sequence of events triggers it.
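One consequence of the shared code path can be checked directly: state reached by handling commands should equal state rebuilt by replaying the events those commands produced. A minimal sketch of that check, assuming a hypothetical `Cart` aggregate that follows the same handler-calls-apply shape as the order (none of these names come from the chapter):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

interface CartEvent {}
record ItemAdded(String sku, int quantity) implements CartEvent {}
record ItemRemoved(String sku) implements CartEvent {}

class Cart {
    private final Map<String, Integer> items = new TreeMap<>();
    private long version = -1; // same convention as the chapter: no events yet

    List<CartEvent> add(String sku, int quantity) {
        if (quantity <= 0) throw new IllegalArgumentException("Quantity must be positive");
        CartEvent event = new ItemAdded(sku, quantity);
        apply(event);
        return List.of(event);
    }

    List<CartEvent> remove(String sku) {
        if (!items.containsKey(sku)) throw new IllegalStateException("Not in cart: " + sku);
        CartEvent event = new ItemRemoved(sku);
        apply(event);
        return List.of(event);
    }

    // The only place state changes, for replayed and new events alike.
    void apply(CartEvent event) {
        if (event instanceof ItemAdded added) {
            items.merge(added.sku(), added.quantity(), Integer::sum);
        } else if (event instanceof ItemRemoved removed) {
            items.remove(removed.sku());
        }
        version++;
    }

    // Comparable view of the state, used only to check replay against live handling.
    String describe() { return "v" + version + " " + items; }
}

class ReplayEquivalenceCheck {
    static boolean replayMatchesLiveState() {
        // Live path: handle commands and collect what they produce.
        Cart live = new Cart();
        List<CartEvent> produced = new ArrayList<>();
        produced.addAll(live.add("widget", 2));
        produced.addAll(live.add("gadget", 1));
        produced.addAll(live.remove("widget"));

        // Replay path: a fresh aggregate sees only the events.
        Cart replayed = new Cart();
        produced.forEach(replayed::apply);

        return live.describe().equals(replayed.describe());
    }
}
```

If a handler mutated state directly instead of going through `apply`, this check would fail, which makes it a cheap guard to keep in an aggregate's test suite.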
The refund validation demonstrates why aggregates must be self-contained. The requestRefund handler checks whether the refund amount exceeds the maximum refundable amount, which depends on the paid amount and previously refunded amounts. This information is tracked by the aggregate’s internal state, derived from PaymentAuthorized and prior RefundRequested events. If refund validation required querying the payment service or a separate refund table, the invariant would not be enforceable under concurrency: two simultaneous refund requests might both pass validation and produce a total refund exceeding the paid amount.
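The race described above can be walked through step by step. This is an illustrative sketch, not the chapter's implementation: `RefundLedger` keeps only the two amounts that refund validation needs, and `RefundRaceDemo` stands in for the event store with a single in-memory stream and an expected-sequence check. All names are hypothetical.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down events: only what refund validation needs.
interface LedgerEvent {}
record Paid(BigDecimal amount) implements LedgerEvent {}
record Refunded(BigDecimal amount) implements LedgerEvent {}

class RefundLedger {
    private BigDecimal paid = BigDecimal.ZERO;
    private BigDecimal refunded = BigDecimal.ZERO;

    static RefundLedger replay(List<LedgerEvent> history) {
        RefundLedger ledger = new RefundLedger();
        history.forEach(ledger::apply);
        return ledger;
    }

    Refunded requestRefund(BigDecimal amount) {
        BigDecimal maxRefundable = paid.subtract(refunded);
        if (amount.compareTo(maxRefundable) > 0) {
            throw new IllegalStateException("Refund " + amount + " exceeds " + maxRefundable);
        }
        Refunded event = new Refunded(amount);
        apply(event);
        return event;
    }

    void apply(LedgerEvent event) {
        if (event instanceof Paid p) {
            paid = paid.add(p.amount());
        } else if (event instanceof Refunded r) {
            refunded = refunded.add(r.amount());
        }
    }
}

class RefundRaceDemo {
    private final List<LedgerEvent> stream = new ArrayList<>();

    // Rejects the append when another writer has already moved the stream on.
    synchronized boolean tryAppend(int expectedSequence, LedgerEvent event) {
        if (stream.size() != expectedSequence) return false;
        stream.add(event);
        return true;
    }

    synchronized List<LedgerEvent> read() { return List.copyOf(stream); }

    static String run() {
        RefundRaceDemo store = new RefundRaceDemo();
        store.tryAppend(0, new Paid(new BigDecimal("50.00")));

        // Both handlers load the same one-event history and both pass validation.
        List<LedgerEvent> seen = store.read();
        Refunded first = RefundLedger.replay(seen).requestRefund(new BigDecimal("30.00"));
        Refunded second = RefundLedger.replay(seen).requestRefund(new BigDecimal("30.00"));

        boolean firstStored = store.tryAppend(seen.size(), first);
        boolean secondStored = store.tryAppend(seen.size(), second); // stale sequence

        // The loser retries against fresh state, where only 20.00 remains refundable.
        boolean retryRejected = false;
        try {
            RefundLedger.replay(store.read()).requestRefund(new BigDecimal("30.00"));
        } catch (IllegalStateException e) {
            retryRejected = true;
        }
        return firstStored + "," + secondStored + "," + retryRejected;
    }
}
```

`RefundRaceDemo.run()` returns `"true,false,true"`: in-memory validation alone lets both refunds through, and it is the combination of self-contained state and the append check that keeps the total at or below the paid amount.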
The Aggregate Repository
The aggregate repository handles loading and saving aggregates. It bridges the aggregate’s in-memory state and the event store’s persistent state.
// FROM SCRATCH
import java.util.List;

public class OrderRepository {

    private final JdbcEventStore eventStore;
    private final EventTypeRegistry registry;

    public OrderRepository(JdbcEventStore eventStore, EventTypeRegistry registry) {
        this.eventStore = eventStore;
        this.registry = registry;
    }

    public OrderAggregate load(String orderId) {
        String streamId = "order-" + orderId;
        List<StoredEvent> storedEvents = eventStore.readStream(streamId);
        OrderAggregate aggregate = new OrderAggregate();
        // Replay every stored event in order to rebuild current state
        for (StoredEvent stored : storedEvents) {
            OrderEvent event = registry.deserialize(stored.eventType(), stored.payload());
            aggregate.apply(event);
        }
        return aggregate;
    }

    public void save(OrderAggregate aggregate, List<OrderEvent> newEvents) {
        String streamId = "order-" + aggregate.orderId();
        // Sequence number the first new event should occupy:
        // version before the command (version - newEvents.size()) plus one
        long expectedSequence = aggregate.version() - newEvents.size() + 1;
        eventStore.append(streamId, expectedSequence, newEvents);
    }
}
The save method calculates the expected sequence number. After loading, the aggregate’s version is the zero-based sequence number of the last existing event. After handling a command, the version has been incremented by the number of new events. Subtracting the number of new events recovers the version before the command, and adding one gives the sequence number that the first new event is expected to occupy.
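The arithmetic in `save` is easy to get off by one, so it helps to work it through for a few cases. The helper below is a hypothetical extraction of that single line, assuming the chapter's convention that an aggregate with no events has version -1.

```java
class ExpectedSequence {
    // versionAfterCommand: zero-based sequence of the last applied event (-1 when empty).
    // newEventCount: number of events the command produced and applied.
    // Returns the sequence number the first new event should occupy in the stream.
    static long forAppend(long versionAfterCommand, int newEventCount) {
        return versionAfterCommand - newEventCount + 1;
    }
}
```

For a brand-new aggregate whose first command produces one event, the version moves from -1 to 0 and `forAppend(0, 1)` is 0. For an aggregate loaded from three events (version 2), a command producing one event gives `forAppend(3, 1)`, which is 3, and a command producing two events gives `forAppend(4, 2)`, which is also 3. In each case the result equals the number of events that were in the stream when the aggregate was loaded.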
The Production Path
// PRODUCTION
import java.util.List;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderApplicationService {

    private final SpringEventStore eventStore;
    private final EventTypeRegistry registry;

    public OrderApplicationService(SpringEventStore eventStore, EventTypeRegistry registry) {
        this.eventStore = eventStore;
        this.registry = registry;
    }

    @Transactional
    public OrderEvent placeOrder(PlaceOrder command) {
        OrderAggregate aggregate = new OrderAggregate();
        List<OrderEvent> events = aggregate.place(command);
        // A new stream expects its first event at sequence 0
        eventStore.append("order-" + command.orderId(), 0, events);
        return events.get(0);
    }

    @Transactional
    public OrderEvent cancelOrder(CancelOrder command) {
        OrderAggregate aggregate = loadAggregate(command.orderId());
        long versionBeforeCommand = aggregate.version();
        List<OrderEvent> events = aggregate.cancel(command);
        // The first new event should land directly after the last loaded one
        eventStore.append("order-" + command.orderId(), versionBeforeCommand + 1, events);
        return events.get(0);
    }

    @Transactional
    public OrderEvent requestRefund(RequestRefund command) {
        OrderAggregate aggregate = loadAggregate(command.orderId());
        long versionBeforeCommand = aggregate.version();
        List<OrderEvent> events = aggregate.requestRefund(command);
        eventStore.append("order-" + command.orderId(), versionBeforeCommand + 1, events);
        return events.get(0);
    }

    private OrderAggregate loadAggregate(String orderId) {
        List<StoredEvent> storedEvents = eventStore.readStream("order-" + orderId);
        if (storedEvents.isEmpty()) {
            throw new OrderNotFoundException(orderId);
        }
        OrderAggregate aggregate = new OrderAggregate();
        for (StoredEvent stored : storedEvents) {
            OrderEvent event = registry.deserialize(stored.eventType(), stored.payload());
            aggregate.apply(event);
        }
        return aggregate;
    }
}
In Axon Framework, the aggregate is annotated with @Aggregate, command handlers with @CommandHandler, and event application methods with @EventSourcingHandler. Command handlers publish events through AggregateLifecycle.apply, and Axon handles loading, event application, and persistence automatically, including sequence-number-based concurrency control in its repository and event store. The developer writes only the business logic. This is a significant reduction in boilerplate, earned by understanding what the boilerplate does.
The Test
// FROM SCRATCH
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.math.BigDecimal;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
class OrderAggregateTest {

    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16")
        .withDatabaseName("aggregate_test")
        .withInitScript("event_store_schema.sql");

    private OrderRepository repository;
    private JdbcEventStore eventStore;

    @BeforeEach
    void setUp() {
        var dataSource = new org.postgresql.ds.PGSimpleDataSource();
        dataSource.setUrl(postgres.getJdbcUrl());
        dataSource.setUser(postgres.getUsername());
        dataSource.setPassword(postgres.getPassword());
        var mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        eventStore = new JdbcEventStore(dataSource, mapper);
        var registry = new EventTypeRegistry(mapper);
        repository = new OrderRepository(eventStore, registry);
    }

    @Test
    void fullOrderLifecycle() {
        // Place
        OrderAggregate order = new OrderAggregate();
        List<OrderEvent> placeEvents = order.place(new PlaceOrder(
            "order-1", "customer-1",
            List.of(new LineItem("prod-1", "Widget", 2, new BigDecimal("19.99"))),
            new Address("123 Main St", "Springfield", "IL", "62701", "US")
        ));
        eventStore.append("order-order-1", 0, placeEvents);

        // Confirm
        order = repository.load("order-1");
        assertEquals(OrderStatus.PLACED, order.status());
        List<OrderEvent> confirmEvents = order.confirm(new ConfirmOrder("order-1"));
        repository.save(order, confirmEvents);

        // Authorize payment
        order = repository.load("order-1");
        assertEquals(OrderStatus.CONFIRMED, order.status());
        List<OrderEvent> paymentEvents = order.authorizePayment(new AuthorizePayment(
            "order-1", "pay-1", new BigDecimal("39.98"), "AUTH-12345"
        ));
        repository.save(order, paymentEvents);

        // Fulfill
        order = repository.load("order-1");
        assertEquals(OrderStatus.PAYMENT_AUTHORIZED, order.status());
        List<OrderEvent> fulfillEvents = order.fulfill(new FulfillOrder(
            "order-1", "TRACK-001", "FedEx"
        ));
        repository.save(order, fulfillEvents);

        // Verify final state
        order = repository.load("order-1");
        assertEquals(OrderStatus.FULFILLED, order.status());
        assertEquals(3, order.version()); // 4 events, zero-indexed, so version 3
    }

    @Test
    void cannotFulfillWithoutPayment() {
        OrderAggregate order = new OrderAggregate();
        order.place(new PlaceOrder(
            "order-2", "customer-1", List.of(new LineItem("p", "W", 1, BigDecimal.TEN)),
            new Address("1 St", "City", "ST", "00000", "US")
        ));
        assertThrows(IllegalStateException.class, () ->
            order.fulfill(new FulfillOrder("order-2", "TRACK", "UPS"))
        );
    }

    @Test
    void cannotRefundMoreThanPaid() {
        OrderAggregate order = new OrderAggregate();
        order.place(new PlaceOrder(
            "order-3", "customer-1",
            List.of(new LineItem("p", "W", 1, new BigDecimal("50.00"))),
            new Address("1 St", "City", "ST", "00000", "US")
        ));
        order.confirm(new ConfirmOrder("order-3"));
        order.authorizePayment(new AuthorizePayment(
            "order-3", "pay-1", new BigDecimal("50.00"), "AUTH-1"
        ));
        order.fulfill(new FulfillOrder("order-3", "T", "UPS"));

        // First refund of 30 succeeds
        order.requestRefund(new RequestRefund(
            "order-3", "refund-1", new BigDecimal("30.00"), "Damaged item"
        ));

        // Second refund of 30 exceeds remaining 20
        assertThrows(RefundExceedsPaymentException.class, () ->
            order.requestRefund(new RequestRefund(
                "order-3", "refund-2", new BigDecimal("30.00"), "Wrong item"
            ))
        );
    }

    @Test
    void cannotCancelFulfilledOrder() {
        OrderAggregate order = new OrderAggregate();
        order.place(new PlaceOrder(
            "order-4", "customer-1",
            List.of(new LineItem("p", "W", 1, BigDecimal.TEN)),
            new Address("1 St", "City", "ST", "00000", "US")
        ));
        order.confirm(new ConfirmOrder("order-4"));
        order.authorizePayment(new AuthorizePayment(
            "order-4", "pay-1", BigDecimal.TEN, "AUTH-1"
        ));
        order.fulfill(new FulfillOrder("order-4", "T", "UPS"));
        assertThrows(OrderCannotBeCancelledException.class, () ->
            order.cancel(new CancelOrder("order-4", "Changed mind", "customer-1"))
        );
    }
}
Aggregate Boundaries and Contention
The aggregate boundary determines the contention profile of the system. Every command targeting the same aggregate instance competes for the same event stream. When two commands for the same order arrive simultaneously, one append succeeds and the other is rejected with a concurrency conflict and must be retried against the updated stream.
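What retrying looks like in practice is a bounded loop around the whole load, handle, append attempt. The following is a minimal sketch under assumptions: `ConflictException` is a hypothetical stand-in for whatever the event store throws on a sequence mismatch, and the attempt is passed in as a `Supplier` so that each retry reloads state from scratch.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the event store's sequence-mismatch error.
class ConflictException extends RuntimeException {
    ConflictException(String message) { super(message); }
}

class RetryOnConflict {
    // Runs the whole load-handle-append attempt again on conflict, up to maxAttempts.
    // Any other exception, such as a violated invariant, propagates immediately.
    static <T> T execute(int maxAttempts, Supplier<T> attempt) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ConflictException last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (ConflictException conflict) {
                last = conflict; // state was stale; the next attempt must reload
            }
        }
        throw last;
    }
}
```

Only conflicts are retried. A business rule failure on the second attempt is a legitimate outcome: the command was valid against the old state and is no longer valid against the new one. The more often commands collide on one stream, the more attempts this loop burns, which is why the size of the boundary matters.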
For the order management platform, the Order aggregate boundary includes the order status, line items, shipping address, payment state, and refund state. This is a reasonable boundary because all these elements must be consistent with each other (a refund cannot exceed payment, fulfilment requires payment authorization).
Consider the alternative: a broader aggregate that includes the customer’s entire order history. Now every order placement, cancellation, and refund for the same customer competes for the same event stream. A customer with high order frequency experiences contention on every operation. The invariant “total refunds across all orders must not exceed total payments” might seem to justify this broader boundary, but that invariant is better enforced by a separate process (a saga or a policy) than by expanding the aggregate boundary.
The rule: an aggregate should be as small as possible while still enforcing all invariants that must hold under concurrent operations. If an invariant can tolerate eventual consistency (checked and enforced asynchronously), it does not belong in the aggregate.
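An invariant that tolerates eventual consistency can be checked by a policy that reacts to events after they are stored. The sketch below illustrates the cross-order refund ceiling in that style. It assumes a subscriber receives payment and refund facts tagged with a customer id; `RefundCeilingPolicy`, `CustomerPaid`, and `CustomerRefunded` are hypothetical names, and the chapter's own events do not carry this exact shape.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical cross-order facts, as a downstream subscriber might see them.
interface CustomerMoneyEvent {
    String customerId();
    BigDecimal amount();
}
record CustomerPaid(String customerId, BigDecimal amount) implements CustomerMoneyEvent {}
record CustomerRefunded(String customerId, BigDecimal amount) implements CustomerMoneyEvent {}

class RefundCeilingPolicy {
    // Payments minus refunds per customer, across all of that customer's orders.
    private final Map<String, BigDecimal> netPaid = new HashMap<>();

    // Called after the event is already stored; returns true when the customer
    // has been refunded more than they paid and needs follow-up.
    boolean on(CustomerMoneyEvent event) {
        BigDecimal delta = (event instanceof CustomerRefunded)
            ? event.amount().negate()
            : event.amount();
        BigDecimal balance = netPaid.merge(event.customerId(), delta, BigDecimal::add);
        return balance.signum() < 0;
    }
}
```

The policy cannot prevent the offending refund, because the event already exists by the time it runs. It can only detect the breach and trigger a compensating step such as a manual review. That trade is what keeps each order's stream free of contention from the customer's other orders.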
This chapter established the aggregate pattern, the command handling lifecycle, and the relationship between aggregate boundaries and concurrency. The next chapter builds the event store itself with full optimistic concurrency control.