unbound mongodb at scale

Aggregation Pipelines on Sharded Collections

Chapter 54 of 72

The Symptom

The telemetry platform’s hourly aggregation pipeline takes 45 seconds on the 16-shard cluster. The same pipeline on a single-shard development environment takes 3 seconds. The expectation was that sharding would make the aggregation faster (16x the compute), not 15x slower.

The Cause

MongoDB splits aggregation pipelines on sharded collections into two parts:

  1. Shard part: Stages that can run independently on each shard. Includes $match, $project, $addFields, and the partial phase of $group.
  2. Merger part: Stages that require data from multiple shards. Includes the final phase of $group, $sort (merge sort), $limit, $lookup, and $out.

The merger runs on a randomly chosen shard (or on mongos for simple merges). All data that passes through the shard part must be sent over the network to the merger shard.

The telemetry pipeline:

// SLOW: Aggregation that moves all data to merger shard
db.readings.aggregate([
  { $match: { ts: { $gte: ISODate("2024-06-15T12:00:00Z"),
                     $lt: ISODate("2024-06-15T13:00:00Z") } } },
  { $group: {
    _id: "$sensorId",
    avgTemp: { $avg: "$temperature" },
    maxTemp: { $max: "$temperature" },
    count: { $sum: 1 }
  }},
  { $sort: { avgTemp: -1 } },
  { $limit: 100 }
])

The $match stage runs on all 16 shards: the filter does not include the shard key, so mongos scatters the query to every shard. Each shard computes a partial $group (accumulator state for the sensorIds found in its own data). The partial group results from all 16 shards are sent to the merger shard, which computes the final $group, sorts, and limits.

The problem is twofold. First, the $match on ts does not include the shard key (sensorId), so every shard scans its data. Second, the $group by sensorId means the merger must combine the partial results that arrive from all shards. If documents for a given sensorId are spread across multiple shards, every shard contributes a partial result for that sensorId, and in the worst case the merger receives 16 partial group results per sensorId.
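To make the shard part and merger part concrete, here is a minimal in-memory sketch of a two-phase $group. It is an illustration only, not MongoDB's internal implementation: the `TwoPhaseGroup`, `Reading`, and `Partial` names are hypothetical, and the partial state (sum, count, max) is an assumed minimal representation of what each shard would need to send.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of a two-phase $group; not MongoDB's internal implementation. */
final class TwoPhaseGroup {

    /** Hypothetical shape of a raw reading stored on a shard. */
    static final class Reading {
        final String sensorId;
        final double temperature;

        Reading(String sensorId, double temperature) {
            this.sensorId = sensorId;
            this.temperature = temperature;
        }
    }

    /** Partial accumulator state for one sensorId: enough to finish $avg, $max, and $sum. */
    static final class Partial {
        final double sum;
        final long count;
        final double max;

        Partial(double sum, long count, double max) {
            this.sum = sum;
            this.count = count;
            this.max = max;
        }

        Partial merge(Partial other) {
            return new Partial(sum + other.sum, count + other.count, Math.max(max, other.max));
        }

        double avg() {
            return sum / count;
        }
    }

    /** Shard part: a shard groups only the documents it holds. */
    static Map<String, Partial> shardPart(List<Reading> shardDocs) {
        Map<String, Partial> partials = new HashMap<>();
        for (Reading r : shardDocs) {
            partials.merge(r.sensorId, new Partial(r.temperature, 1, r.temperature), Partial::merge);
        }
        return partials;
    }

    /** Merger part: fold together the partial states received from every shard. */
    static Map<String, Partial> mergerPart(List<Map<String, Partial>> fromShards) {
        Map<String, Partial> merged = new HashMap<>();
        for (Map<String, Partial> shardResult : fromShards) {
            shardResult.forEach((id, partial) -> merged.merge(id, partial, Partial::merge));
        }
        return merged;
    }
}
```

Note that a shard cannot ship a finished average: the merger needs the sum and the count to combine partial results correctly, which is why every partial group carries accumulator state across the network.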

// Check pipeline splitting
db.readings.explain().aggregate([...])
// "mergeType": "anyShard"
// "shardsPart": ["$match", "$group (partial)"]
// "mergerPart": ["$group (final)", "$sort", "$limit"]

The Benchmark

| Pipeline design | Shard part output | Network to merger | Total time |
| --- | --- | --- | --- |
| $match(ts) then $group(sensorId) | 160,000 partial groups | 48 MB | 45s |
| $match(ts, sensorId) then $group | 1 partial group | 0.3 KB | 0.8s |
| Pre-aggregated per-shard summary | N/A | 0 | 0.02s |
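The network figures in the benchmark follow from simple multiplication. The sketch below assumes about 300 bytes per partial group and 10,000 groups per shard; both numbers are derived from the table (48 MB / 160,000 groups, with decimal megabytes, and 160,000 / 16 shards) rather than measured independently, and the `MergerTraffic` class is hypothetical.

```java
/** Back-of-the-envelope estimate of data shipped to the merger (illustrative only). */
final class MergerTraffic {

    /** Bytes sent to the merger = shards x groups per shard x bytes per partial group. */
    static long bytesToMerger(int shards, long groupsPerShard, long bytesPerPartialGroup) {
        return shards * groupsPerShard * bytesPerPartialGroup;
    }

    public static void main(String[] args) {
        // Assumed inputs, derived from the benchmark table above.
        long scatterGather = bytesToMerger(16, 10_000, 300);
        long targeted = bytesToMerger(1, 1, 300);
        System.out.println("scatter-gather bytes: " + scatterGather);
        System.out.println("targeted bytes: " + targeted);
    }
}
```

The estimate scales linearly with both the shard count and the number of distinct group keys, so a high-cardinality $group key on a wide cluster is the expensive combination.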

The Fix

Strategy 1: Add shard key to $match for targeted aggregation.

If the aggregation is for a specific building’s sensors, include the shard key:

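// FAST: Targeted aggregation with shard key in $match
// Requires: com.mongodb.client.model.{Aggregates, Accumulators, Filters, Sorts},
// org.bson.Document, org.bson.conversions.Bson, java.time.*, java.util.*
public List<Document> aggregateBuildingSensors(String buildingId, Instant hour) {
    // Resolve the building to its sensors so the filter can include the shard key
    List<String> sensorIds = sensorRegistry.getSensorsByBuilding(buildingId);

    List<Bson> pipeline = Arrays.asList(
        Aggregates.match(Filters.and(
            Filters.in("sensorId", sensorIds),  // shard key -> targeted
            Filters.gte("ts", Date.from(hour)),
            Filters.lt("ts", Date.from(hour.plus(Duration.ofHours(1))))
        )),
        Aggregates.group("$sensorId",
            Accumulators.avg("avgTemp", "$temperature"),
            Accumulators.max("maxTemp", "$temperature"),
            Accumulators.sum("count", 1)
        ),
        Aggregates.sort(Sorts.descending("avgTemp")),
        Aggregates.limit(100)
    );

    // The pipeline yields up to 100 documents, so collect them all;
    // first() would return only the top-ranked sensor
    return collection.aggregate(pipeline).into(new ArrayList<>());
}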

Strategy 2: Pre-aggregate and store summaries.

For the global dashboard (all sensors, all shards), pre-aggregate hourly summaries during ingestion:

// FAST: Pre-aggregate during bucket upsert (from CH7)
public void upsertBucketWithSummary(TelemetryReading reading) {
    // Update 5-minute bucket (targeted by sensorId shard key)
    Bson bucketFilter = Filters.and(
        Filters.eq("sensorId", reading.getSensorId()),
        Filters.eq("bucketStart", bucketStart(reading.getTimestamp()))
    );

    Bson update = Updates.combine(
        Updates.push("readings", reading.toBsonValue()),
        Updates.inc("count", 1),
        Updates.min("minTemp", reading.getTemperature()),
        Updates.max("maxTemp", reading.getTemperature()),
        // Running average: update sum and count, compute avg on read
        Updates.inc("tempSum", reading.getTemperature())
    );

    bucketsCollection.updateOne(bucketFilter, update, new UpdateOptions().upsert(true));
}
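The upsert above relies on a `bucketStart` helper that is not shown in this chapter. A minimal sketch of what it might look like follows, assuming timestamps are `java.time.Instant` values and buckets are aligned to the Unix epoch; the `Buckets` class name and the constant are hypothetical.

```java
import java.time.Instant;

/** Hypothetical helper: floors a timestamp to the start of its 5-minute bucket. */
final class Buckets {

    /** Bucket width in milliseconds (5 minutes, matching the bucket size used above). */
    static final long BUCKET_MILLIS = 5 * 60 * 1000L;

    /** Returns the epoch-aligned start of the bucket that contains ts. */
    static Instant bucketStart(Instant ts) {
        long millis = ts.toEpochMilli();
        // floorMod keeps the result correct even for timestamps before 1970.
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, BUCKET_MILLIS));
    }
}
```

Because every reading in the same 5-minute window maps to the same `bucketStart`, the upsert filter matches one bucket document per sensor per window, which is what keeps the write targeted to a single shard.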

Then the hourly aggregation queries pre-computed summaries instead of raw readings:

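// FAST: Aggregate from pre-computed bucket summaries
// (Field is com.mongodb.client.model.Field)
List<Bson> pipeline = Arrays.asList(
    Aggregates.match(Filters.and(
        Filters.gte("bucketStart", Date.from(hourStart)),
        Filters.lt("bucketStart", Date.from(hourEnd))
    )),
    // Sum the per-bucket totals first; averaging per-bucket averages would
    // weight a sparse bucket the same as a full one
    Aggregates.group("$sensorId",
        Accumulators.sum("tempSum", "$tempSum"),
        Accumulators.max("maxTemp", "$maxTemp"),
        Accumulators.sum("totalCount", "$count")
    ),
    // Compute the hourly average from the combined sum and count
    Aggregates.addFields(new Field<>("avgTemp",
        new Document("$divide", Arrays.asList("$tempSum", "$totalCount")))),
    Aggregates.sort(Sorts.descending("avgTemp")),
    Aggregates.limit(100)
);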

// Bucket collection has 12 buckets/hour/sensor vs 720 readings/hour/sensor
// 60x fewer documents to aggregate
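One subtlety when rolling buckets up into an hourly figure: buckets can hold different numbers of readings, so the hourly average should come from the total of tempSum divided by the total of count, not from the mean of per-bucket averages. The sketch below contrasts the two; the `BucketAverages` class and the sample values in the usage note are made up for illustration.

```java
/** Contrasts two ways of combining per-bucket summaries (illustrative only). */
final class BucketAverages {

    /** Mean of per-bucket means: ignores how many readings each bucket holds. */
    static double meanOfBucketMeans(double[] tempSums, long[] counts) {
        double total = 0;
        for (int i = 0; i < counts.length; i++) {
            total += tempSums[i] / counts[i];
        }
        return total / counts.length;
    }

    /** Weighted mean: combined tempSum divided by combined count. */
    static double weightedMean(double[] tempSums, long[] counts) {
        double sum = 0;
        long n = 0;
        for (int i = 0; i < counts.length; i++) {
            sum += tempSums[i];
            n += counts[i];
        }
        return sum / n;
    }
}
```

For example, with one bucket of 10 readings summing to 200 and another bucket with a single reading of 30, the mean of bucket means is 25, while the true average over all 11 readings is 230 / 11, about 20.9.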

Strategy 3: Use $merge to write aggregation results for reuse.

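// FAST: Scheduled hourly aggregation that writes results
List<Bson> pipeline = Arrays.asList(
    Aggregates.match(Filters.and(
        Filters.gte("ts", Date.from(hourStart)),
        Filters.lt("ts", Date.from(hourEnd))
    )),
    // Key each summary by sensor AND hour; keyed by sensorId alone, each
    // hourly run would replace the previous hour's summary for that sensor
    Aggregates.group(
        new Document("sensorId", "$sensorId").append("hour", Date.from(hourStart)),
        Accumulators.avg("avgTemp", "$temperature"),
        Accumulators.max("maxTemp", "$temperature"),
        Accumulators.sum("count", 1)
    ),
    // Re-running the job for the same hour replaces that hour's documents
    Aggregates.merge("hourly_summaries",
        new MergeOptions()
            .uniqueIdentifier(Arrays.asList("_id"))
            .whenMatched(MergeOptions.WhenMatched.REPLACE)
            .whenNotMatched(MergeOptions.WhenNotMatched.INSERT))
);

// toCollection() runs the pipeline without returning documents to the client
collection.aggregate(pipeline).toCollection();
// Dashboard queries hourly_summaries (small, unsharded) instead of readings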
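The scheduled job needs values for hourStart and hourEnd. One way to derive them, sketched here under the assumption that the job always summarizes the most recent fully elapsed hour in UTC, is to truncate the current time to the hour. The `HourWindow` class is hypothetical and not part of the telemetry platform's code.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper: the half-open window [start, end) of the last complete hour. */
final class HourWindow {
    final Instant start;
    final Instant end;

    HourWindow(Instant start, Instant end) {
        this.start = start;
        this.end = end;
    }

    /** Returns the most recent fully elapsed hour relative to now. */
    static HourWindow previousCompleteHour(Instant now) {
        Instant end = now.truncatedTo(ChronoUnit.HOURS);
        return new HourWindow(end.minus(1, ChronoUnit.HOURS), end);
    }
}
```

Using a half-open window matches the $gte / $lt pair in the $match stage, so consecutive runs neither overlap nor skip readings that land exactly on the hour boundary.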

The Proof

After implementing pre-aggregated bucket summaries and targeted aggregations:

| Query | Before | After | Improvement |
| --- | --- | --- | --- |
| Global hourly summary | 45s (scatter-gather) | 0.8s (bucket aggregation) | 56x |
| Per-building hourly | 45s (scatter-gather) | 0.12s (targeted, 3 shards) | 375x |
| Dashboard top anomalies | 2.3s (scatter-gather) | 0.012s (anomaly collection) | 192x |

The Trade-off

Pre-aggregation shifts computation from query time to write time. Each bucket upsert now includes $min, $max, and $inc for the summary fields, adding approximately 15% overhead to the upsert operation. For the telemetry platform ingesting 50,000 readings per second, that overhead is equivalent to the cost of roughly 7,500 additional upserts per second (15% of 50,000), distributed across shards. The summary operators ride along in the same updateOne call, so the number of round trips does not change.

The $merge strategy runs the heavy aggregation once per hour and stores results. The dashboard reads from the summary collection. The trade-off: the summary is up to 1 hour stale. For a monitoring dashboard, 1-hour staleness is acceptable. For a real-time alerting system, it is not. The telemetry platform uses the pre-aggregated buckets for near-real-time queries (5-minute granularity) and the $merge summaries for historical dashboards (hourly granularity).
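The split between the two sources can be expressed as a small routing rule. This is a sketch, not the platform's actual code: the `SummaryRouter` class and `Source` enum are hypothetical, and the one-hour threshold is taken from the staleness bound described above.

```java
import java.time.Duration;

/** Hypothetical router: picks a summary source from the caller's staleness tolerance. */
final class SummaryRouter {

    enum Source { BUCKETS, HOURLY_SUMMARIES }

    /** The $merge output can lag by up to one hour, per the trade-off above. */
    static final Duration HOURLY_STALENESS = Duration.ofHours(1);

    /** Queries that cannot tolerate an hour of lag read the 5-minute buckets. */
    static Source choose(Duration maxAcceptableStaleness) {
        return maxAcceptableStaleness.compareTo(HOURLY_STALENESS) < 0
            ? Source.BUCKETS
            : Source.HOURLY_SUMMARIES;
    }
}
```

A near-real-time panel that tolerates only a few minutes of lag would resolve to the buckets, while a historical dashboard that accepts an hour or more would resolve to the hourly summaries.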

The fundamental principle: on sharded clusters, avoid aggregations that must touch all shards and merge large result sets. Pre-compute, denormalize, or restructure queries to target specific shards. The shard key is not just a data distribution mechanism; it is a query routing mechanism. Design the shard key around the queries, not around the data.