Memory Limits, allowDiskUse, and Aggregation Sizing
The Symptom
The weekly report aggregation computes per-sensor, per-day statistics over the last 7 days, which covers 17.5 million documents. The aggregation fails with:
Command failed with error 16819: 'Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting.'
With allowDiskUse(true) the aggregation succeeds, but it takes 45 seconds instead of the expected 5 seconds.
The Cause
The $group stage keeps state for each distinct group key. With 10,000 sensors and 7 days, the stage maintains 70,000 groups. Each group holds the intermediate values for its $avg, $min, $max, and $sum accumulators, roughly 200 bytes per group. The total is 14 MB, well within the 100 MB limit.
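This per-key state can be pictured as a hash map from group key to a small accumulator object. The sketch below is a minimal plain-Java model of that idea and assumes nothing about MongoDB's internals. Each input document updates exactly one entry, so memory grows with the number of distinct keys (70,000 here) and not with the number of input documents. The `Reading` record, the `sensorId|day` string key, and the class names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a streaming hash group (illustrative, not server code).
final class StreamingGroupSketch {

    // Per-group state behind $avg, $min, $max and a $sum of 1 (count).
    static final class GroupState {
        double sum;
        long count;
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;

        void accept(double value) {
            sum += value;
            count++;
            min = Math.min(min, value);
            max = Math.max(max, value);
        }

        double avg() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    // Hypothetical raw reading: sensor, calendar day, temperature.
    record Reading(String sensorId, String day, double temperature) {}

    // Consumes readings one at a time; only one GroupState per distinct key is kept.
    static Map<String, GroupState> group(Iterable<Reading> readings) {
        Map<String, GroupState> groups = new HashMap<>();
        for (Reading r : readings) {
            String key = r.sensorId() + "|" + r.day();
            groups.computeIfAbsent(key, k -> new GroupState()).accept(r.temperature());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
            new Reading("s1", "2024-05-01", 20.0),
            new Reading("s1", "2024-05-01", 22.0),
            new Reading("s2", "2024-05-01", 18.0));
        Map<String, GroupState> groups = group(readings);
        GroupState s1 = groups.get("s1|2024-05-01");
        System.out.println(groups.size() + " groups, s1 avg=" + s1.avg()); // prints: 2 groups, s1 avg=21.0
    }
}
```

The loop never retains a reading after it has been folded into its group. That is why a streaming group over millions of inputs can stay within a few megabytes.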
The $sort stage that follows must sort all 70,000 grouped documents. Each one contains the group key plus the accumulated fields, approximately 400 bytes, for a total of 28 MB. That is also within 100 MB.
The problem is earlier in the pipeline. The day part of the group key is derived from the timestamp with $dateToString, so every raw reading must reach $group. That input is 17.5 million documents at approximately 120 bytes each, or 2.1 GB. $group consumes this input as a stream and holds only the 70,000 group states, so it never needs all 2.1 GB in memory. A blocking stage such as $sort placed in front of $group, however, would need all of it.
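The sizing above comes down to multiplying document counts by document sizes and comparing the result against the 104,857,600-byte (100 MiB) limit from the error message. The sketch below reproduces the three estimates. The per-document sizes are the rough figures from the text, not measured BSON sizes, and the class and method names are illustrative.

```java
// Back-of-the-envelope stage sizing using the text's estimated document sizes.
final class AggregationSizing {
    // Per-stage memory limit quoted in the error message: 100 MiB.
    static final long STAGE_LIMIT_BYTES = 100L * 1024 * 1024;

    // Total bytes for `documents` items of `bytesPerDocument` each.
    static long bytes(long documents, long bytesPerDocument) {
        return documents * bytesPerDocument;
    }

    static boolean fitsInMemory(long bytes) {
        return bytes <= STAGE_LIMIT_BYTES;
    }

    public static void main(String[] args) {
        long groups = 10_000L * 7;                       // sensors x days
        long groupState = bytes(groups, 200);            // $group accumulator state
        long sortAfterGroup = bytes(groups, 400);        // $sort over grouped documents
        long sortBeforeGroup = bytes(17_500_000L, 120);  // $sort over raw readings

        report("$group state", groupState);
        report("$sort after $group", sortAfterGroup);
        report("$sort before $group", sortBeforeGroup);
    }

    private static void report(String stage, long bytes) {
        System.out.println(stage + ": " + bytes + " bytes, fits=" + fitsInMemory(bytes));
    }
}
```

Running `main` reports 14,000,000, 28,000,000, and 2,100,000,000 bytes. Only the last one exceeds the per-stage limit.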
Examining the actual pipeline reveals an accidental $sort between $match and $group:
// SLOW: Unnecessary $sort before $group causes memory explosion
[
{ $match: { ts: { $gte: weekStart } } }, // 17.5M docs
{ $sort: { ts: 1 } }, // SORT 17.5M docs: exceeds 100MB
{ $group: { _id: { sensor: "$sensorId", day: ... }, ... } },
{ $sort: { avgTemp: -1 } }
]
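The $sort before $group is unnecessary. $group does not need sorted input to compute $avg, $min, $max, or $sum, and it does not preserve input order in its output. This $sort must buffer all 17.5 million documents (about 2.1 GB), far beyond the 100 MB limit. Without allowDiskUse it fails. With allowDiskUse(true) it spills to temporary files on disk, which accounts for the 45-second run time.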
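Because this mistake is easy to reintroduce, a review-time or test-time check can flag it. The sketch below works over a simplified, hypothetical stage model: an operator name plus the accumulators a $group uses. It does not use the driver's `Bson` types. It flags a $sort whose ordering is thrown away by a later $group. It deliberately does not flag two cases, because in both the sort can change the result:

- a $group that uses order-sensitive accumulators such as $first, $last, or $push;
- a $sort that feeds a $limit or $skip.

The order-sensitive list here is illustrative, not exhaustive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

// Hypothetical lint: find $sort stages whose order is discarded by a later $group.
final class PipelineLint {

    // Simplified stage model: operator name and, for $group, its accumulator operators.
    record Stage(String operator, Set<String> accumulators) {
        static Stage of(String operator, String... accumulators) {
            return new Stage(operator, Set.copyOf(Arrays.asList(accumulators)));
        }
    }

    // Accumulators whose result depends on input order (illustrative, not exhaustive).
    private static final Set<String> ORDER_SENSITIVE = Set.of("$first", "$last", "$push");

    // Stages that consume input order, making an earlier $sort meaningful.
    private static final Set<String> ORDER_CONSUMING = Set.of("$limit", "$skip");

    // Returns the indexes of $sort stages made redundant by a following $group.
    static List<Integer> redundantSorts(List<Stage> pipeline) {
        List<Integer> flagged = new ArrayList<>();
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < pipeline.size(); i++) {
            Stage stage = pipeline.get(i);
            String op = stage.operator();
            if (op.equals("$sort")) {
                pending.add(i);
            } else if (ORDER_CONSUMING.contains(op)) {
                pending.clear(); // the sort feeds $limit/$skip, so its order matters
            } else if (op.equals("$group")) {
                boolean orderMatters = stage.accumulators().stream().anyMatch(ORDER_SENSITIVE::contains);
                if (!orderMatters) {
                    flagged.addAll(pending);
                }
                pending.clear();
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        List<Stage> slow = List.of(
            Stage.of("$match"),
            Stage.of("$sort"),
            Stage.of("$group", "$avg", "$min", "$max", "$sum"),
            Stage.of("$sort"),
            Stage.of("$limit"));
        System.out.println("Redundant $sort at index: " + redundantSorts(slow)); // prints: Redundant $sort at index: [1]
    }
}
```

The final $sort in the example is not flagged, because it feeds a $limit rather than a $group.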
The Benchmark
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import org.bson.Document;
import org.openjdk.jmh.annotations.*;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(1)
@State(Scope.Benchmark)
public class AggregationMemoryBenchmark {
    private MongoClient client;
    private MongoCollection<Document> collection;       // raw readings
    private MongoCollection<Document> bucketCollection; // 5-minute bucket summaries
    private Date weekStart;

    @Setup
    public void setup() {
        client = MongoClients.create("mongodb://localhost:27017");
        MongoDatabase db = client.getDatabase("telemetry");
        collection = db.getCollection("readings");
        // MongoCollection has no getDatabase(), so resolve the bucket collection here
        bucketCollection = db.getCollection("buckets_5min");
        weekStart = Date.from(Instant.now().minus(7, ChronoUnit.DAYS));
    }

    @TearDown
    public void tearDown() {
        client.close();
    }

    // Original pipeline: a blocking $sort over all raw readings before $group
    @Benchmark
    public List<Document> withUnnecessarySort() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.gte("ts", weekStart)),
            Aggregates.sort(Sorts.ascending("ts")),
            Aggregates.group(
                new Document("sensorId", "$sensorId")
                    .append("day", new Document("$dateToString",
                        new Document("format", "%Y-%m-%d").append("date", "$ts"))),
                Accumulators.avg("avgTemp", "$temperature"),
                Accumulators.min("minTemp", "$temperature"),
                Accumulators.max("maxTemp", "$temperature"),
                Accumulators.sum("count", 1)
            ),
            Aggregates.sort(Sorts.descending("avgTemp")),
            Aggregates.limit(100)
        )).allowDiskUse(true).into(new ArrayList<>()); // fails without allowDiskUse
    }

    // Same pipeline with the pre-group $sort removed
    @Benchmark
    public List<Document> withoutUnnecessarySort() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.gte("ts", weekStart)),
            Aggregates.group(
                new Document("sensorId", "$sensorId")
                    .append("day", new Document("$dateToString",
                        new Document("format", "%Y-%m-%d").append("date", "$ts"))),
                Accumulators.avg("avgTemp", "$temperature"),
                Accumulators.min("minTemp", "$temperature"),
                Accumulators.max("maxTemp", "$temperature"),
                Accumulators.sum("count", 1)
            ),
            Aggregates.sort(Sorts.descending("avgTemp")),
            Aggregates.limit(100)
        )).into(new ArrayList<>());
    }

    // Grouping over 5-minute bucket summaries instead of raw readings
    @Benchmark
    public List<Document> preAggregatedFromBuckets() {
        return bucketCollection.aggregate(List.of(
            Aggregates.match(Filters.gte("bucketStart", weekStart)),
            Aggregates.group(
                new Document("sensorId", "$sensorId")
                    .append("day", new Document("$dateToString",
                        new Document("format", "%Y-%m-%d").append("date", "$bucketStart"))),
                Accumulators.avg("avgTemp", "$summary.temperature.avg"),
                Accumulators.min("minTemp", "$summary.temperature.min"),
                Accumulators.max("maxTemp", "$summary.temperature.max"),
                Accumulators.sum("count", "$count")
            ),
            Aggregates.sort(Sorts.descending("avgTemp")),
            Aggregates.limit(100)
        )).into(new ArrayList<>());
    }
}
Results:
Benchmark                                            Mode  Cnt   Score   Error  Units
AggregationMemoryBenchmark.withUnnecessarySort         ss    1  45.000          s/op
AggregationMemoryBenchmark.withoutUnnecessarySort      ss    1   4.500          s/op
AggregationMemoryBenchmark.preAggregatedFromBuckets    ss    1   0.350          s/op
Removing the unnecessary $sort reduces execution from 45 seconds to 4.5 seconds. Using pre-aggregated 5-minute buckets (from CH7) reduces it to 350ms because the $group processes 2 million bucket documents instead of 17.5 million raw readings, and each bucket already contains the summary statistics.
The Fix
// FAST: Aggregation on pre-computed bucket summaries.
// bucketCollection (buckets_5min) and DailySensorStats are defined elsewhere;
// decoding into DailySensorStats requires a codec for it (e.g. a POJO codec).
public List<DailySensorStats> getWeeklyReport(Instant weekStart) {
    return bucketCollection.aggregate(List.of(
        Aggregates.match(Filters.gte("bucketStart", Date.from(weekStart))),
        // Group by sensor and day using the bucket summaries
        Aggregates.group(
            new Document("sensorId", "$sensorId")
                .append("day", new Document("$dateToString",
                    new Document("format", "%Y-%m-%d").append("date", "$bucketStart"))),
            // Unweighted average of bucket averages; see The Trade-off
            Accumulators.avg("avgTemp", "$summary.temperature.avg"),
            Accumulators.min("minTemp", "$summary.temperature.min"),
            Accumulators.max("maxTemp", "$summary.temperature.max"),
            Accumulators.sum("totalReadings", "$count")
        ),
        // Order by sensor, then day, and return the first 1,000 rows
        Aggregates.sort(Sorts.orderBy(
            Sorts.ascending("_id.sensorId"),
            Sorts.ascending("_id.day")
        )),
        Aggregates.limit(1000)
    ), DailySensorStats.class).into(new ArrayList<>());
}
The Proof
| Metric | Unnecessary sort | Corrected pipeline | Bucket-based |
|---|---|---|---|
| Execution time | 45s | 4.5s | 0.35s |
| Memory usage | 2.1 GB (disk spill) | 14 MB | 2 MB |
| allowDiskUse required | Yes | No | No |
| Documents processed | 17.5M | 17.5M | 2M |
The Trade-off
Aggregating on bucket summaries produces slightly different results than aggregating on raw readings. Only the average is affected. The minimum of bucket minimums, the maximum of bucket maximums, and the sum of bucket counts are all exact. An average of averages equals the overall average only when every bucket has the same count. With 5-minute buckets, most buckets contain 60 readings (sensors report every 5 seconds), but during sensor downtime some buckets have fewer. The weighted average computation described in CH7-S2 corrects for this, at the cost of a more complex aggregation pipeline.
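The effect can be checked with a small in-memory model. This is a plain-Java sketch, not the CH7-S2 pipeline, and the `BucketSummary` and `DailySummary` records are hypothetical stand-ins for the `summary.temperature.*` and `count` fields. It merges per-bucket summaries two ways:

- an unweighted mean of the bucket averages, as in the $avg used above;
- a count-weighted mean, sum(avg × count) / sum(count), which recovers the true mean of the raw readings.

```java
import java.util.List;

// Sketch: merging per-bucket summaries into one daily summary.
final class BucketMergeSketch {

    // Hypothetical per-bucket summary: average, min, max, number of readings.
    record BucketSummary(double avg, double min, double max, long count) {}

    record DailySummary(double unweightedAvg, double weightedAvg, double min, double max, long count) {}

    static DailySummary merge(List<BucketSummary> buckets) {
        double sumOfAvgs = 0;
        double weightedSum = 0;
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        long count = 0;
        for (BucketSummary b : buckets) {
            sumOfAvgs += b.avg();
            weightedSum += b.avg() * b.count(); // recovers each bucket's raw sum
            min = Math.min(min, b.min());       // exact
            max = Math.max(max, b.max());       // exact
            count += b.count();                 // exact
        }
        return new DailySummary(sumOfAvgs / buckets.size(), weightedSum / count, min, max, count);
    }

    public static void main(String[] args) {
        // One full bucket (60 readings) and one cut short by downtime (15 readings).
        DailySummary d = merge(List.of(
            new BucketSummary(20.0, 18.0, 22.0, 60),
            new BucketSummary(30.0, 29.0, 31.0, 15)));
        System.out.println("unweighted avg = " + d.unweightedAvg() + ", weighted avg = " + d.weightedAvg());
        // prints: unweighted avg = 25.0, weighted avg = 22.0
    }
}
```

Here the short bucket pulls the unweighted figure up to 25.0, while the true mean of all 75 readings is 22.0. In an aggregation pipeline, the same idea would typically be expressed by summing `$multiply` of the bucket average and count in `$group`, then dividing by the summed count with `$divide` in a following stage.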
For reporting use cases where 0.1% accuracy variance is acceptable, aggregating on bucket summaries is the right choice. For financial or compliance reporting where exact values are required, aggregate on the raw readings, but order the pipeline correctly: $match first, $project before $group, and no unnecessary blocking stages.