Stage Ordering and the $match-First Principle
The Symptom
The daily analytics report computes average readings per sensor for the last 24 hours, enriched with sensor metadata via $lookup. The pipeline takes 12 seconds and uses 2.8 GB of temporary disk space (allowDiskUse is enabled). The collection has 200 million documents, but only 2.5 million are from the last 24 hours.
The Cause
The pipeline processes stages in the order they were written:
```javascript
// SLOW: $lookup before $match on time range
const yesterday = new Date(Date.now() - 24 * 60 * 60 * 1000);
db.readings.aggregate([
  { $lookup: {
      from: "sensors",
      localField: "sensorId",
      foreignField: "_id",
      as: "sensor"
  }},
  { $unwind: "$sensor" },
  { $match: { ts: { $gte: yesterday } } }, // Filters AFTER $lookup
  { $group: {
      _id: { sensorId: "$sensorId", building: "$sensor.building" },
      avgTemp: { $avg: "$temperature" },
      count: { $sum: 1 }
  }},
  { $sort: { avgTemp: -1 } }
], { allowDiskUse: true })
```
The $lookup stage runs against all 200 million documents before the $match filters them down to the last 24 hours, so each of those 200 million documents triggers a lookup against the sensors collection. The optimizer does not move the $match ahead of $lookup here, because the $unwind stage between them changes the document structure. Confirm the executed stage order with explain() rather than assuming the optimizer will reorder stages for you.
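To make the cost of stage order concrete, here is a minimal in-memory sketch, assuming readings fit in a List and the sensors collection can be modeled as a HashMap from sensorId to building. Reading, Result, and StageOrderingSketch are hypothetical names for illustration; no MongoDB API is involved. Both orderings keep the same documents, but they differ in how many joins they perform.

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory stand-ins: a reading document and the outcome of one ordering.
record Reading(String sensorId, Instant ts, double temperature) {}

record Result(long lookups, long survivors) {}

final class StageOrderingSketch {
    private StageOrderingSketch() {}

    // $lookup -> $unwind -> $match: every reading is joined before the filter runs.
    static Result lookupThenMatch(List<Reading> readings,
                                  Map<String, String> buildingBySensor,
                                  Predicate<Reading> filter) {
        long lookups = 0;
        long survivors = 0;
        for (Reading r : readings) {
            lookups++;                                   // one join per input document
            String building = buildingBySensor.get(r.sensorId());
            if (building == null) continue;              // $unwind drops readings with no sensor
            if (filter.test(r)) survivors++;             // $match runs last
        }
        return new Result(lookups, survivors);
    }

    // $match -> $lookup -> $unwind: filtered-out readings never reach the join.
    static Result matchThenLookup(List<Reading> readings,
                                  Map<String, String> buildingBySensor,
                                  Predicate<Reading> filter) {
        long lookups = 0;
        long survivors = 0;
        for (Reading r : readings) {
            if (!filter.test(r)) continue;               // $match runs first
            lookups++;                                   // only survivors are joined
            if (buildingBySensor.get(r.sensorId()) != null) survivors++;
        }
        return new Result(lookups, survivors);
    }
}
```

With the section's figures, the first ordering performs one join for each of the 200 million readings, while the second performs one for each of the 2.5 million that pass the time filter.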
The Benchmark
```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import org.bson.Document;
import org.openjdk.jmh.annotations.*;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(1)
@State(Scope.Benchmark)
public class PipelineOrderingBenchmark {
    private MongoClient client;
    private MongoCollection<Document> collection;
    private Date yesterday;

    @Setup
    public void setup() {
        client = MongoClients.create("mongodb://localhost:27017");
        collection = client.getDatabase("telemetry").getCollection("readings");
        yesterday = Date.from(Instant.now().minus(1, ChronoUnit.DAYS));
    }

    @TearDown
    public void tearDown() {
        client.close(); // release the connection pool after the trial
    }

    // Original order: $lookup and $unwind run on every document before the time filter.
    @Benchmark
    public List<Document> lookupFirst() {
        return collection.aggregate(List.of(
            Aggregates.lookup("sensors", "sensorId", "_id", "sensor"),
            Aggregates.unwind("$sensor"),
            Aggregates.match(Filters.gte("ts", yesterday)),
            Aggregates.group(
                new Document("sensorId", "$sensorId").append("building", "$sensor.building"),
                Accumulators.avg("avgTemp", "$temperature"),
                Accumulators.sum("count", 1)
            ),
            Aggregates.sort(Sorts.descending("avgTemp"))
        )).allowDiskUse(true).into(new ArrayList<>());
    }

    // $match first: only the last 24 hours of readings reach $lookup.
    @Benchmark
    public List<Document> matchFirst() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.gte("ts", yesterday)),
            Aggregates.lookup("sensors", "sensorId", "_id", "sensor"),
            Aggregates.unwind("$sensor"),
            Aggregates.group(
                new Document("sensorId", "$sensorId").append("building", "$sensor.building"),
                Accumulators.avg("avgTemp", "$temperature"),
                Accumulators.sum("count", 1)
            ),
            Aggregates.sort(Sorts.descending("avgTemp"))
        )).into(new ArrayList<>());
    }

    // $match, then $project to strip unused fields before $lookup.
    @Benchmark
    public List<Document> matchFirstWithProjection() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.gte("ts", yesterday)),
            Aggregates.project(Projections.include("sensorId", "temperature", "ts")),
            Aggregates.lookup("sensors", "sensorId", "_id", "sensor"),
            Aggregates.unwind("$sensor"),
            Aggregates.group(
                new Document("sensorId", "$sensorId").append("building", "$sensor.building"),
                Accumulators.avg("avgTemp", "$temperature"),
                Accumulators.sum("count", 1)
            ),
            Aggregates.sort(Sorts.descending("avgTemp"))
        )).into(new ArrayList<>());
    }
}
```
Results:
```text
Benchmark                                           Mode  Cnt   Score   Error  Units
PipelineOrderingBenchmark.lookupFirst                 ss    1  12.000           s/op
PipelineOrderingBenchmark.matchFirst                  ss    1   1.800           s/op
PipelineOrderingBenchmark.matchFirstWithProjection    ss    1   1.200           s/op
```
Moving $match before $lookup reduces execution from 12 seconds to 1.8 seconds (6.7x improvement). Adding $project before $lookup to strip unnecessary fields reduces it further to 1.2 seconds. The $project reduces the document size flowing through the pipeline, which reduces memory usage in subsequent stages.
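What an inclusion $project does to each document can be modeled in a few lines. This is a minimal sketch on a flat Map, assuming no nested fields; InclusionProjection is a hypothetical helper, and the humidity and firmware fields in the test data are invented for illustration. It also shows why _id keeps flowing through matchFirstWithProjection (which uses only Projections.include) but not through the final pipeline in the fix, which adds Projections.excludeId().

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class InclusionProjection {
    private InclusionProjection() {}

    // Models an inclusion $project on a flat document: listed fields are kept,
    // _id is kept unless excludeId is true, and every other field is dropped.
    static Map<String, Object> project(Map<String, Object> doc, Set<String> include, boolean excludeId) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : doc.entrySet()) {
            String name = field.getKey();
            boolean keep = name.equals("_id") ? !excludeId : include.contains(name);
            if (keep) {
                out.put(name, field.getValue());
            }
        }
        return out;
    }
}
```

Every field dropped here is a field that $lookup, $unwind, and $group no longer have to copy or hold in memory.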
The Fix
The pipeline ordering checklist:
- Place $match stages as early as possible. Every document filtered out skips all subsequent stages.
- Place $project/$unset before $lookup, $group, and $sort to reduce document size.
- Place $limit immediately after $sort to enable the top-k optimization (MongoDB keeps only the top k documents in memory during the sort).
- Combine adjacent $match stages into one (the optimizer does this automatically, but an explicit single stage is clearer).
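The top-k idea behind placing $limit right after $sort can be illustrated with a bounded min-heap: only k candidates are ever retained, no matter how many values stream past. This is a conceptual sketch of the technique in plain Java, not MongoDB's internal implementation; TopK is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class TopK {
    private TopK() {}

    // Returns the k largest values in descending order while holding at most k values in memory.
    static List<Double> topKDescending(Iterable<Double> values, int k) {
        if (k <= 0) return List.of();
        PriorityQueue<Double> heap = new PriorityQueue<>(k); // min-heap: smallest retained value at the head
        for (double v : values) {
            if (heap.size() < k) {
                heap.offer(v);
            } else if (v > heap.peek()) {
                heap.poll();   // evict the smallest of the current top k
                heap.offer(v);
            }
        }
        List<Double> result = new ArrayList<>(heap);
        result.sort(Comparator.reverseOrder());
        return result;
    }
}
```

A full sort without a limit would have to hold every input value; here memory is bounded by k, which is why $sort followed by $limit(100) stays small even when $sort receives many documents.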
```java
// FAST: Optimized pipeline ordering
// Assumes `collection` is the telemetry.readings collection from the benchmark setup.
Instant yesterday = Instant.now().minus(1, ChronoUnit.DAYS);
List<Document> results = collection.aggregate(List.of(
    // 1. Filter first: 200M -> 2.5M documents
    Aggregates.match(Filters.gte("ts", Date.from(yesterday))),
    // 2. Project only needed fields: reduces per-doc size from 340 to 80 bytes
    Aggregates.project(Projections.fields(
        Projections.include("sensorId", "temperature"),
        Projections.excludeId()
    )),
    // 3. Group by sensorId (the group key becomes _id): 2.5M -> 10K documents
    Aggregates.group("$sensorId",
        Accumulators.avg("avgTemp", "$temperature"),
        Accumulators.sum("count", 1)
    ),
    // 4. Lookup on grouped results: 10K lookups, not 2.5M
    Aggregates.lookup("sensors", "_id", "_id", "sensor"),
    Aggregates.unwind("$sensor"),
    // 5. Project final shape: rename _id to sensorId and pull building from the joined sensor
    Aggregates.project(Projections.fields(
        Projections.excludeId(),
        Projections.computed("sensorId", "$_id"),
        Projections.include("avgTemp", "count"),
        Projections.computed("building", "$sensor.building")
    )),
    // 6. Sort and limit: $limit directly after $sort enables the top-k optimization
    Aggregates.sort(Sorts.descending("avgTemp")),
    Aggregates.limit(100)
)).into(new ArrayList<>());
```
The $lookup now runs on 10,000 grouped results instead of 2.5 million individual readings.
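The effect of grouping before the join can be sketched in memory as well: accumulate a running sum and count per sensorId, then join once per group. This is a minimal sketch, assuming the sensors collection is modeled as a Map from sensorId to building; SensorReading, SensorStats, Outcome, and GroupThenLookup are hypothetical names, not MongoDB types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory types for the sketch.
record SensorReading(String sensorId, double temperature) {}
record SensorStats(String sensorId, double avgTemp, long count, String building) {}
record Outcome(List<SensorStats> rows, long lookups) {}

final class GroupThenLookup {
    private GroupThenLookup() {}

    private static final class Acc {
        double sum;
        long count;
    }

    static Outcome run(List<SensorReading> readings, Map<String, String> buildingBySensor) {
        // $group by sensorId: running sum and count per key.
        Map<String, Acc> groups = new LinkedHashMap<>();
        for (SensorReading r : readings) {
            Acc acc = groups.computeIfAbsent(r.sensorId(), id -> new Acc());
            acc.sum += r.temperature();
            acc.count++;
        }
        // $lookup + $unwind on the grouped rows: one join per distinct sensorId.
        List<SensorStats> rows = new ArrayList<>();
        long lookups = 0;
        for (Map.Entry<String, Acc> g : groups.entrySet()) {
            lookups++;
            String building = buildingBySensor.get(g.getKey());
            if (building == null) continue; // $unwind drops groups with no matching sensor
            Acc acc = g.getValue();
            rows.add(new SensorStats(g.getKey(), acc.sum / acc.count, acc.count, building));
        }
        return new Outcome(rows, lookups);
    }
}
```

The number of joins equals the number of distinct sensors rather than the number of readings; in the section's workload that is the difference between 10,000 and 2.5 million lookups.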
The Proof
| Metric | Unoptimized | Optimized |
|---|---|---|
| Execution time | 12s | 0.4s |
| Documents processed by $lookup | 200,000,000 | 10,000 |
| Peak memory usage | 2.8 GB (disk spill) | 45 MB |
| allowDiskUse required | Yes | No |
The Trade-off
Moving $group before $lookup changes the semantics when a field from the lookup is part of the group key. In the original pipeline, grouping is by {sensorId, building}; in the optimized pipeline, grouping is by sensorId only, and building is attached after grouping. This works because each sensorId maps to exactly one building: many sensors can share a building, but no sensor belongs to more than one. If a sensor could belong to multiple buildings, the optimized pipeline would produce different results. Verify that the semantic equivalence holds before reordering across $lookup boundaries.
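One way to verify the equivalence is to check that every join key matches at most one document in the joined collection. In this section's pipeline the join is on the sensors collection's _id, so each reading matches at most one sensor and the check passes by construction; it matters when the foreign field is not unique. The sketch below is a hypothetical helper (Placement and ReorderCheck are invented names) that models the join target as a list of (sensorId, building) documents and reports the keys that would fan out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical shape of the join target: one document per (sensorId, building) pair.
record Placement(String sensorId, String building) {}

final class ReorderCheck {
    private ReorderCheck() {}

    // Returns the sensorIds that match more than one join-target document.
    // For any such key, $lookup + $unwind turns one reading into several rows,
    // so grouping before the join no longer matches grouping after it.
    static Set<String> keysWithMultipleMatches(List<Placement> joinTarget) {
        Map<String, Integer> matches = new HashMap<>();
        for (Placement p : joinTarget) {
            matches.merge(p.sensorId(), 1, Integer::sum);
        }
        Set<String> unsafe = new TreeSet<>();
        matches.forEach((sensorId, n) -> {
            if (n > 1) unsafe.add(sensorId);
        });
        return unsafe;
    }
}
```

An empty result means the reordering is safe for that data; two sensors sharing one building is fine, while one sensor listed in two buildings is not.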