Aggregation Pipelines on Sharded Collections
The Symptom
The telemetry platform’s hourly aggregation pipeline takes 45 seconds on the 16-shard cluster. The same pipeline on a single-shard development environment takes 3 seconds. The expectation was that sharding would make the aggregation faster (16x the compute), not 15x slower.
The Cause
MongoDB splits aggregation pipelines on sharded collections into two parts:
- Shard part: Stages that can run independently on each shard. Includes $match, $project, $addFields, and the partial phase of $group.
- Merger part: Stages that require data from multiple shards. Includes the final phase of $group, $sort (merge sort), $limit, $lookup, and $out.
The merger runs on a randomly chosen shard (or on mongos for simple merges). All data that passes through the shard part must be sent over the network to the merger shard.
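The split between the partial and final phases of $group can be modeled in plain Java. The following is a minimal sketch, not MongoDB code: the class and method names are hypothetical, and it assumes the shards ship a running sum, count, and max per sensorId, which is enough state for the merger to finish $avg, $max, and $sum.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of a split $group: only the merge arithmetic, no MongoDB calls. */
final class SplitGroupSketch {

    /** Partial accumulator state a shard would send for one group key (assumed shape). */
    static final class Partial {
        final double sum;
        final long count;
        final double max;

        Partial(double sum, long count, double max) {
            this.sum = sum;
            this.count = count;
            this.max = max;
        }

        Partial combine(Partial other) {
            return new Partial(sum + other.sum, count + other.count, Math.max(max, other.max));
        }

        double avg() {
            return sum / count;
        }
    }

    /** Shard part: fold one shard's readings into a partial state per sensorId. */
    static Map<String, Partial> shardPart(Map<String, List<Double>> readingsBySensor) {
        Map<String, Partial> out = new HashMap<>();
        readingsBySensor.forEach((sensorId, temps) -> {
            for (double t : temps) {
                out.merge(sensorId, new Partial(t, 1, t), Partial::combine);
            }
        });
        return out;
    }

    /** Merger part: combine the partial states received from every shard. */
    static Map<String, Partial> mergerPart(List<Map<String, Partial>> perShard) {
        Map<String, Partial> merged = new HashMap<>();
        for (Map<String, Partial> shard : perShard) {
            shard.forEach((sensorId, p) -> merged.merge(sensorId, p, Partial::combine));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Partial> shardA = shardPart(Map.of("s1", List.of(20.0, 22.0)));
        Map<String, Partial> shardB = shardPart(Map.of("s1", List.of(30.0)));
        Partial s1 = mergerPart(List.of(shardA, shardB)).get("s1");
        System.out.println("avg=" + s1.avg() + " max=" + s1.max() + " count=" + s1.count());
    }
}
```

The point of the model is that the merger does work proportional to the number of partial states it receives, not to the number of raw readings, so the size of the shard part's output is what determines network cost.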
The telemetry pipeline:
// SLOW: Aggregation that moves all data to merger shard
db.readings.aggregate([
  { $match: { ts: { $gte: ISODate("2024-06-15T12:00:00Z"),
                    $lt: ISODate("2024-06-15T13:00:00Z") } } },
  { $group: {
      _id: "$sensorId",
      avgTemp: { $avg: "$temperature" },
      maxTemp: { $max: "$temperature" },
      count: { $sum: 1 }
  }},
  { $sort: { avgTemp: -1 } },
  { $limit: 100 }
])
The $match stage runs on all 16 shards (no shard key, so it scatters). Each shard computes a partial $group (accumulator state for the sensorIds it owns). The partial group results from all 16 shards are sent to the merger shard, which computes the final $group, sorts, and limits.
The problem: the $match on ts does not include the shard key (sensorId), so every shard scans its data. And the $group by sensorId means the merger must combine partial results from all shards for each sensorId. If sensorId values are distributed across multiple shards (hashed key), every shard contributes partial results for every sensorId, and the merger receives 16 partial group results per sensorId.
// Check pipeline splitting
db.readings.explain().aggregate([...])
// "mergeType": "anyShard"
// "shardsPart": ["$match", "$group (partial)"]
// "mergerPart": ["$group (final)", "$sort", "$limit"]
The Benchmark
| Pipeline design | Shard part output | Network to merger | Total time |
|---|---|---|---|
| $match(ts) then $group(sensorId) | 160,000 partial groups | 48 MB | 45s |
| $match(ts, sensorId) then $group | 1 partial group | 0.3 KB | 0.8s |
| Pre-aggregated per-shard summary | N/A | 0 | 0.02s |
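The first two rows are consistent with a simple size model, sketched below. Two inputs are assumptions inferred from the table rather than stated in the text: roughly 10,000 group keys per shard in the hour (160,000 partial groups divided by 16 shards) and roughly 300 bytes per partial group (the 0.3 KB reported for a single group). The class and method names are hypothetical.

```java
/** Back-of-the-envelope estimate of what the shard part sends to the merger. */
final class MergerTrafficEstimate {

    /** Each shard emits one partial group per group key it sees. */
    static long partialGroups(int shards, long groupKeysPerShard) {
        return shards * groupKeysPerShard;
    }

    /** Total payload the merger must receive over the network. */
    static long bytesToMerger(long partialGroups, long bytesPerPartialGroup) {
        return partialGroups * bytesPerPartialGroup;
    }

    public static void main(String[] args) {
        int shards = 16;                  // from the text
        long groupKeysPerShard = 10_000;  // assumption: 160,000 / 16
        long bytesPerGroup = 300;         // assumption: 0.3 KB per partial group

        long groups = partialGroups(shards, groupKeysPerShard);
        long bytes = bytesToMerger(groups, bytesPerGroup);
        System.out.println(groups + " partial groups, " + bytes / 1_000_000 + " MB to merger");
    }
}
```

Under these assumptions the model reproduces the 160,000 partial groups and 48 MB in the first row, which suggests the merger traffic scales with shards times group keys rather than with the final result size of 100 documents.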
The Fix
Strategy 1: Add shard key to $match for targeted aggregation.
If the aggregation is for a specific building’s sensors, include the shard key:
// FAST: Targeted aggregation with shard key in $match
public List<Document> aggregateBuildingSensors(String buildingId, Instant hour) {
    List<String> sensorIds = sensorRegistry.getSensorsByBuilding(buildingId);
    List<Bson> pipeline = Arrays.asList(
        Aggregates.match(Filters.and(
            Filters.in("sensorId", sensorIds), // shard key -> targeted
            Filters.gte("ts", Date.from(hour)),
            Filters.lt("ts", Date.from(hour.plus(Duration.ofHours(1))))
        )),
        Aggregates.group("$sensorId",
            Accumulators.avg("avgTemp", "$temperature"),
            Accumulators.max("maxTemp", "$temperature"),
            Accumulators.sum("count", 1)
        ),
        Aggregates.sort(Sorts.descending("avgTemp")),
        Aggregates.limit(100)
    );
    // The pipeline yields up to 100 documents, so collect them all rather than only the first
    return collection.aggregate(pipeline).into(new ArrayList<Document>());
}
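Why the sensorId filter narrows the set of shards can be illustrated with a toy routing model. This is a sketch under loud assumptions: ownerShard below uses String.hashCode modulo the shard count as a stand-in, which is not how MongoDB hashes shard keys or assigns chunks, and all names are hypothetical. It only shows that a list of shard key values maps to a subset of shards that can be much smaller than the cluster.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of shard targeting for an $in filter on the shard key. */
final class TargetedRoutingSketch {

    /** Stand-in placement function (assumption, not MongoDB's hashed sharding). */
    static int ownerShard(String sensorId, int shardCount) {
        return Math.floorMod(sensorId.hashCode(), shardCount);
    }

    /** The shards a router would need to contact for the given shard key values. */
    static Set<Integer> targetedShards(List<String> sensorIds, int shardCount) {
        Set<Integer> shards = new TreeSet<>();
        for (String id : sensorIds) {
            shards.add(ownerShard(id, shardCount));
        }
        return shards;
    }

    public static void main(String[] args) {
        List<String> buildingSensors = List.of("bldg7-s01", "bldg7-s02", "bldg7-s03");
        Set<Integer> shards = targetedShards(buildingSensors, 16);
        System.out.println("Contacting " + shards.size() + " of 16 shards: " + shards);
    }
}
```

A query without the shard key has no such list to map, so the router has to contact every shard. Note that a very large sensorIds list will tend to cover most shards and lose the benefit.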
Strategy 2: Pre-aggregate and store summaries.
For the global dashboard (all sensors, all shards), pre-aggregate hourly summaries during ingestion:
// FAST: Pre-aggregate during bucket upsert (from CH7)
public void upsertBucketWithSummary(TelemetryReading reading) {
    // Update 5-minute bucket (targeted by sensorId shard key)
    Bson bucketFilter = Filters.and(
        Filters.eq("sensorId", reading.getSensorId()),
        Filters.eq("bucketStart", bucketStart(reading.getTimestamp()))
    );
    Bson update = Updates.combine(
        Updates.push("readings", reading.toBsonValue()),
        Updates.inc("count", 1),
        Updates.min("minTemp", reading.getTemperature()),
        Updates.max("maxTemp", reading.getTemperature()),
        // Running average: update sum and count, compute avg on read
        Updates.inc("tempSum", reading.getTemperature())
    );
    bucketsCollection.updateOne(bucketFilter, update, new UpdateOptions().upsert(true));
}
Then the hourly aggregation queries pre-computed summaries instead of raw readings:
// FAST: Aggregate from pre-computed bucket summaries
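List<Bson> pipeline = Arrays.asList(
    Aggregates.match(Filters.and(
        Filters.gte("bucketStart", Date.from(hourStart)),
        Filters.lt("bucketStart", Date.from(hourEnd))
    )),
    // Sum the per-bucket totals first: averaging per-bucket averages would
    // weight a sparse bucket the same as a full one
    Aggregates.group("$sensorId",
        Accumulators.sum("tempSum", "$tempSum"),
        Accumulators.max("maxTemp", "$maxTemp"),
        Accumulators.sum("totalCount", "$count")
    ),
    // Hourly average = total temperature sum / total reading count
    Aggregates.addFields(new Field<>("avgTemp",
        new Document("$divide", Arrays.asList("$tempSum", "$totalCount")))),
    Aggregates.sort(Sorts.descending("avgTemp")),
    Aggregates.limit(100)
);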
// Bucket collection has 12 buckets/hour/sensor vs 720 readings/hour/sensor
// 60x fewer documents to aggregate
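When an hourly average is rebuilt from bucket summaries, it matters how the buckets are combined, because buckets can hold different numbers of readings. The sketch below contrasts two ways of doing it in plain Java. The Bucket class mirrors the tempSum and count fields from the upsert, and the sample values and method names are made up for illustration.

```java
import java.util.List;

/** Compares two ways of rebuilding an average from per-bucket summaries. */
final class BucketAverageSketch {

    /** Minimal stand-in for a bucket document's summary fields. */
    static final class Bucket {
        final double tempSum;
        final long count;

        Bucket(double tempSum, long count) {
            this.tempSum = tempSum;
            this.count = count;
        }
    }

    /** Averages each bucket first, then averages those results (unweighted). */
    static double averageOfBucketAverages(List<Bucket> buckets) {
        double total = 0;
        for (Bucket b : buckets) {
            total += b.tempSum / b.count;
        }
        return total / buckets.size();
    }

    /** Sums tempSum and count across buckets, then divides once (weighted by count). */
    static double weightedAverage(List<Bucket> buckets) {
        double sum = 0;
        long count = 0;
        for (Bucket b : buckets) {
            sum += b.tempSum;
            count += b.count;
        }
        return sum / count;
    }

    public static void main(String[] args) {
        // A full bucket (10 readings averaging 20) and a sparse one (2 readings averaging 30)
        List<Bucket> buckets = List.of(new Bucket(200.0, 10), new Bucket(60.0, 2));
        System.out.println("average of averages: " + averageOfBucketAverages(buckets));
        System.out.println("weighted average:    " + weightedAverage(buckets));
    }
}
```

With these sample buckets the unweighted result is 25.0 while the weighted result is about 21.67, which is the true mean of the twelve underlying readings. The two agree only when every bucket holds the same number of readings.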
Strategy 3: Use $merge to write aggregation results for reuse.
// FAST: Scheduled hourly aggregation that writes results
List<Bson> pipeline = Arrays.asList(
    Aggregates.match(Filters.and(
        Filters.gte("ts", Date.from(hourStart)),
        Filters.lt("ts", Date.from(hourEnd))
    )),
    // Key each summary by sensor and hour so a later run does not overwrite earlier hours
    Aggregates.group(
        new Document("sensorId", "$sensorId").append("hour", Date.from(hourStart)),
        Accumulators.avg("avgTemp", "$temperature"),
        Accumulators.max("maxTemp", "$temperature"),
        Accumulators.sum("count", 1)
    ),
    Aggregates.merge("hourly_summaries",
        new MergeOptions()
            .uniqueIdentifier(Arrays.asList("_id"))
            .whenMatched(MergeOptions.WhenMatched.REPLACE)
            .whenNotMatched(MergeOptions.WhenNotMatched.INSERT))
);
// toCollection() runs the pipeline without returning a cursor of results
collection.aggregate(pipeline).toCollection();
// Dashboard queries hourly_summaries (small, unsharded) instead of readings
The Proof
After implementing pre-aggregated bucket summaries and targeted aggregations:
| Query | Before | After | Improvement |
|---|---|---|---|
| Global hourly summary | 45s (scatter-gather) | 0.8s (bucket aggregation) | 56x |
| Per-building hourly | 45s (scatter-gather) | 0.12s (targeted, 3 shards) | 375x |
| Dashboard top anomalies | 2.3s (scatter-gather) | 0.012s (anomaly collection) | 192x |
The Trade-off
Pre-aggregation shifts computation from query time to write time. Each bucket upsert now includes $min, $max, and $inc for the summary fields, adding approximately 15% overhead to the upsert operation. The summary operators run inside the same atomic update, so no extra round trips are issued, but for the telemetry platform ingesting 50,000 readings per second the added cost is roughly equivalent to the write work of 7,500 more upserts per second (distributed across shards).
The $merge strategy runs the heavy aggregation once per hour and stores results. The dashboard reads from the summary collection. The trade-off: the summary is up to 1 hour stale. For a monitoring dashboard, 1-hour staleness is acceptable. For a real-time alerting system, it is not. The telemetry platform uses the pre-aggregated buckets for near-real-time queries (5-minute granularity) and the $merge summaries for historical dashboards (hourly granularity).
The fundamental principle: on sharded clusters, avoid aggregations that must touch all shards and merge large result sets. Pre-compute, denormalize, or restructure queries to target specific shards. The shard key is not just a data distribution mechanism; it is a query routing mechanism. Design the shard key around the queries, not around the data.