Aggregation Pipelines at Scale
MongoDB’s aggregation framework processes documents through a sequence of stages. Each stage transforms the document stream: filtering, grouping, reshaping, joining. The order of stages determines how much data flows through the pipeline and how much memory is consumed at each step.
The optimizer reorders some stages automatically. For example, it moves a $match ahead of a $project or $addFields when the filter does not depend on fields those stages compute. It generally does not move filters across $unwind, $lookup, or $group boundaries, so the developer is responsible for placing expensive stages after the stages that reduce the document count.
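The effect of stage order can be illustrated with a toy model in plain Java. This is a sketch, not driver code and not how the server executes a pipeline: `Order`, `sampleOrders`, and the two pipeline methods are hypothetical names, and the inner loop merely stands in for an expanding stage such as $unwind. It counts how many rows the expanding stage emits when the filter runs after it versus before it.

```java
import java.util.ArrayList;
import java.util.List;

final class StageOrderSketch {
    /** Hypothetical document: an order with a region and an array of items. */
    record Order(String region, List<Integer> items) {}

    /** Rows emitted by the expanding stage, and rows in the final result. */
    record Run(long unwoundRows, long resultRows) {}

    /** Builds `count` orders spread evenly over `regions` regions ("r0", "r1", ...). */
    static List<Order> sampleOrders(int count, int regions, int itemsPerOrder) {
        List<Order> orders = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            List<Integer> items = new ArrayList<>();
            for (int j = 0; j < itemsPerOrder; j++) items.add(j);
            orders.add(new Order("r" + (i % regions), items));
        }
        return orders;
    }

    /** Expand first, filter afterwards: every item of every order is emitted. */
    static Run unwindThenMatch(List<Order> orders, String region) {
        long unwound = 0, results = 0;
        for (Order o : orders) {
            for (int item : o.items()) {
                unwound++;                                  // row produced by the expansion
                if (o.region().equals(region)) results++;   // filter applied per expanded row
            }
        }
        return new Run(unwound, results);
    }

    /** Filter first, expand afterwards: only matching orders are expanded. */
    static Run matchThenUnwind(List<Order> orders, String region) {
        long unwound = 0, results = 0;
        for (Order o : orders) {
            if (!o.region().equals(region)) continue;       // dropped before the expansion
            for (int item : o.items()) {
                unwound++;
                results++;
            }
        }
        return new Run(unwound, results);
    }

    public static void main(String[] args) {
        List<Order> orders = sampleOrders(1000, 10, 5);
        System.out.println(unwindThenMatch(orders, "r0").unwoundRows()); // 5000
        System.out.println(matchThenUnwind(orders, "r0").unwoundRows()); // 500
    }
}
```

Both orderings return the same 500 result rows; only the volume flowing through the expanding stage differs, which is the cost that stage placement controls.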
The Execution Model
An aggregation pipeline is not a set of queries. It is a single execution plan with streaming and blocking stages:
Streaming stages process documents one at a time and pass them to the next stage immediately: $match, $project, $addFields, $unset, $replaceRoot, $limit.
Blocking stages must consume all input before producing output: $group, $sort, $bucket, $facet. These stages accumulate results in memory (or on disk if allowDiskUse is enabled).
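The difference between the two kinds of stage can be sketched with plain Java iterators. This is a simplified model under stated assumptions, not MongoDB's implementation: documents are reduced to integers, and `CountingSource`, `match`, and `sort` are hypothetical helpers. The sketch measures how many input documents each stage must pull before it can emit its first output.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

final class StageModelSketch {
    /** Source that counts how many documents downstream stages have pulled. */
    static final class CountingSource implements Iterator<Integer> {
        private final Iterator<Integer> inner;
        int pulled = 0;
        CountingSource(List<Integer> docs) { this.inner = docs.iterator(); }
        @Override public boolean hasNext() { return inner.hasNext(); }
        @Override public Integer next() { pulled++; return inner.next(); }
    }

    /** Streaming stage: pulls only as much input as the next output requires. */
    static Iterator<Integer> match(Iterator<Integer> in, Predicate<Integer> p) {
        return new Iterator<>() {
            private Integer buffered;
            @Override public boolean hasNext() {
                while (buffered == null && in.hasNext()) {
                    Integer d = in.next();
                    if (p.test(d)) buffered = d;
                }
                return buffered != null;
            }
            @Override public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                Integer out = buffered;
                buffered = null;
                return out;
            }
        };
    }

    /** Blocking stage: drains all input into memory before the first output exists. */
    static Iterator<Integer> sort(Iterator<Integer> in) {
        return new Iterator<>() {
            private Iterator<Integer> sorted;
            private Iterator<Integer> materialize() {
                if (sorted == null) {
                    List<Integer> all = new ArrayList<>();
                    in.forEachRemaining(all::add);   // consumes the entire input
                    all.sort(Comparator.naturalOrder());
                    sorted = all.iterator();
                }
                return sorted;
            }
            @Override public boolean hasNext() { return materialize().hasNext(); }
            @Override public Integer next() { return materialize().next(); }
        };
    }

    /** Inputs pulled before a streaming filter yields its first document. */
    static int firstOutputCostStreaming(List<Integer> docs) {
        CountingSource src = new CountingSource(docs);
        match(src, d -> d > 4).next();
        return src.pulled;
    }

    /** Inputs pulled before a blocking sort yields its first document. */
    static int firstOutputCostBlocking(List<Integer> docs) {
        CountingSource src = new CountingSource(docs);
        sort(src).next();
        return src.pulled;
    }

    public static void main(String[] args) {
        List<Integer> docs = List.of(5, 3, 9, 1);
        System.out.println(firstOutputCostStreaming(docs)); // 1
        System.out.println(firstOutputCostBlocking(docs));  // 4
    }
}
```

The buffered list inside `sort` is the part that grows with input size, which is why blocking stages are the ones subject to a memory limit.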
The memory limit is 100 MB per blocking stage. If a $sort or $group exceeds that limit without allowDiskUse, the aggregation fails (with error code 16819 in the case of $sort). With allowDiskUse, the stage spills to disk instead, which avoids the failure but is considerably slower than working in memory.
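A rough back-of-the-envelope check against the 100 MB limit can be sketched as follows. The per-group size is an assumption made up for illustration: the real footprint depends on the key, the accumulators, and server internals, so `bytesPerGroup` and the class `GroupMemoryEstimate` are hypothetical, and the result is only an order-of-magnitude guide.

```java
final class GroupMemoryEstimate {
    /** Per-stage memory cap cited in the text: 100 MB, expressed in bytes. */
    static final long STAGE_LIMIT_BYTES = 100L * 1024 * 1024;

    /** Estimated state held by a $group: one accumulator record per distinct key. */
    static long estimateBytes(long distinctKeys, long bytesPerGroup) {
        return Math.multiplyExact(distinctKeys, bytesPerGroup);
    }

    /** True when the estimate is above the cap, i.e. the stage would likely need to spill. */
    static boolean likelyExceedsLimit(long distinctKeys, long bytesPerGroup) {
        return estimateBytes(distinctKeys, bytesPerGroup) > STAGE_LIMIT_BYTES;
    }

    public static void main(String[] args) {
        // Assumed 200 bytes of state per group (hypothetical figure).
        System.out.println(likelyExceedsLimit(50_000, 200));    // false: about 10 MB
        System.out.println(likelyExceedsLimit(2_000_000, 200)); // true: about 400 MB
    }
}
```

When the estimate lands above the limit, the choices are the ones already described: reduce the input with an earlier $match, or accept the slower disk spill by enabling allowDiskUse.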
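import com.mongodb.ExplainVerbosity;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import java.util.Date;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;

// Aggregation with explain: build the stage list once so it can be run or explained
List<Bson> pipeline = List.of(
    // Filter first so $group only sees readings from dayStart (a java.time.Instant) onward
    Aggregates.match(Filters.gte("ts", Date.from(dayStart))),
    Aggregates.group("$sensorId",
        Accumulators.avg("avgTemp", "$temperature"),
        Accumulators.min("minTemp", "$temperature"),
        Accumulators.max("maxTemp", "$temperature"),
        Accumulators.sum("count", 1)
    ),
    Aggregates.sort(Sorts.descending("avgTemp")),
    Aggregates.limit(100)
);
// Get explain output with execution statistics (collection is a MongoCollection<Document>)
Document explainResult = collection.aggregate(pipeline)
    .explain(ExplainVerbosity.EXECUTION_STATS);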
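At this verbosity, the explain output reports per-stage statistics such as estimated execution time and the number of documents each stage returned. Depending on the server version, it may also report memory usage and whether a stage spilled to disk.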