Querying Bucketed Data: Unwinding and Aggregation
The Symptom
The dashboard queries sensor data bucketed in 5-minute intervals. A query for “average temperature for sensor-00042 between 10:00 and 14:00” takes 320ms. The same query against the per-event collection takes 180ms. The bucket pattern was supposed to improve query performance by reducing the number of documents read. Instead, the bucketed query is slower.
The Cause
The query uses $unwind to flatten the measurements array before filtering and averaging:
// SLOW: Unwinds all measurements, then filters
db.buckets.aggregate([
  { $match: { sensorId: "sensor-00042", bucketStart: { $gte: startTime, $lt: endTime } } },
  { $unwind: "$measurements" },
  { $match: { "measurements.ts": { $gte: startTime, $lt: endTime } } },
  { $group: { _id: null, avgTemp: { $avg: "$measurements.t" } } }
])
For a 4-hour range with 5-minute buckets, this matches 48 bucket documents. Each contains 60 measurements. The $unwind stage produces 2,880 intermediate documents in the aggregation pipeline. The second $match filters to the exact time range, but the damage is done: $unwind already allocated memory for all 2,880 expanded documents.
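The arithmetic behind those numbers can be sketched in a few lines of Java. This is only an illustration: the class and method names are hypothetical, and it assumes the range starts on a bucket boundary and that every bucket holds the same number of measurements.

```java
import java.time.Duration;

/** Hypothetical helper: estimates how many documents $unwind emits for a time range. */
final class UnwindCost {

    /** Number of buckets covering the range, assuming the range starts on a bucket boundary. */
    static long bucketsInRange(Duration range, Duration bucketWidth) {
        long rangeMs = range.toMillis();
        long widthMs = bucketWidth.toMillis();
        // Ceiling division so a partial trailing bucket still counts as one document.
        return (rangeMs + widthMs - 1) / widthMs;
    }

    /** $unwind emits one intermediate document per array element. */
    static long unwoundDocuments(long buckets, int measurementsPerBucket) {
        return buckets * measurementsPerBucket;
    }

    public static void main(String[] args) {
        long buckets = bucketsInRange(Duration.ofHours(4), Duration.ofMinutes(5));
        long expanded = unwoundDocuments(buckets, 60);
        // Prints: 48 buckets -> 2880 intermediate documents
        System.out.println(buckets + " buckets -> " + expanded + " intermediate documents");
    }
}
```

The same helper shows how quickly the expansion grows: a 24-hour range at the same bucket width covers 288 buckets and, at 60 measurements each, 17,280 unwound documents.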
The Benchmark
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import org.bson.Document;
import org.openjdk.jmh.annotations.*;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 2, time = 10)
@Measurement(iterations = 3, time = 15)
@Fork(1)
@State(Scope.Benchmark)
public class BucketQueryBenchmark {

    private MongoClient client;
    private MongoCollection<Document> collection;
    private Date startTime;
    private Date endTime;

    @Setup
    public void setup() {
        client = MongoClients.create("mongodb://localhost:27017");
        collection = client.getDatabase("telemetry").getCollection("buckets_5min");
        // Take a single timestamp so the range is exactly 4 hours wide.
        Instant now = Instant.now();
        startTime = Date.from(now.minus(4, ChronoUnit.HOURS));
        endTime = Date.from(now);
    }

    @TearDown
    public void tearDown() {
        // Release the connection pool once the benchmark finishes.
        client.close();
    }

    // Baseline: expand every measurement, then filter and average.
    @Benchmark
    public Document unwindThenFilter() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.and(
                Filters.eq("sensorId", "sensor-00042"),
                Filters.gte("bucketStart", startTime),
                Filters.lt("bucketStart", endTime)
            )),
            Aggregates.unwind("$measurements"),
            Aggregates.match(Filters.and(
                Filters.gte("measurements.ts", startTime),
                Filters.lt("measurements.ts", endTime)
            )),
            Aggregates.group(null,
                Accumulators.avg("avgTemp", "$measurements.t")
            )
        )).first();
    }

    // Filter inside the array with $filter, then unwind only the survivors.
    @Benchmark
    public Document filterThenReduce() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.and(
                Filters.eq("sensorId", "sensor-00042"),
                Filters.gte("bucketStart", startTime),
                Filters.lt("bucketStart", endTime)
            )),
            Aggregates.project(Projections.fields(
                Projections.computed("filtered",
                    new Document("$filter", new Document()
                        .append("input", "$measurements")
                        .append("as", "m")
                        .append("cond", new Document("$and", List.of(
                            new Document("$gte", List.of("$$m.ts", startTime)),
                            new Document("$lt", List.of("$$m.ts", endTime))
                        )))
                    )
                )
            )),
            Aggregates.unwind("$filtered"),
            Aggregates.group(null,
                Accumulators.avg("avgTemp", "$filtered.t")
            )
        )).first();
    }

    // Read only the pre-computed per-bucket summaries; no array expansion.
    @Benchmark
    public Document useSummaryStats() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.and(
                Filters.eq("sensorId", "sensor-00042"),
                Filters.gte("bucketStart", startTime),
                Filters.lt("bucketStart", endTime)
            )),
            Aggregates.group(null,
                Accumulators.avg("avgTemp", "$summary.temperature.avg"),
                Accumulators.min("minTemp", "$summary.temperature.min"),
                Accumulators.max("maxTemp", "$summary.temperature.max")
            )
        )).first();
    }
}
Results:
Benchmark                              Mode  Cnt    Score    Error  Units
BucketQueryBenchmark.unwindThenFilter  avgt    3  320.000 ± 25.000  ms/op
BucketQueryBenchmark.filterThenReduce  avgt    3  185.000 ± 15.000  ms/op
BucketQueryBenchmark.useSummaryStats   avgt    3    8.000 ±  1.200  ms/op
The summary-based query is 40x faster than the unwind approach. It reads 48 documents, extracts one field from each, and computes the average. No $unwind. No intermediate document expansion.
The Fix
Two approaches, depending on query requirements.
Approach 1: Use pre-computed summaries for aggregate queries.
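The bucket document already stores summary.temperature.min, .max, and .avg. The min and max are maintained by the $min and $max update operators during writes, and the avg is a running average updated as measurements are inserted. For queries that need aggregate statistics over a time range, use these directly: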
// FAST: Query pre-computed summaries
public TemperatureStats getTemperatureStats(String sensorId, Instant start, Instant end) {
    Document result = collection.aggregate(List.of(
        Aggregates.match(Filters.and(
            Filters.eq("sensorId", sensorId),
            Filters.gte("bucketStart", Date.from(start)),
            Filters.lt("bucketStart", Date.from(end))
        )),
        Aggregates.group(null,
            Accumulators.avg("avgTemp", "$summary.temperature.avg"),
            Accumulators.min("minTemp", "$summary.temperature.min"),
            Accumulators.max("maxTemp", "$summary.temperature.max"),
            Accumulators.sum("totalReadings", "$count")
        )
    )).first();

    // first() returns null when no bucket matches the sensor and time range.
    if (result == null) {
        throw new IllegalStateException("No buckets found for " + sensorId + " in the requested range");
    }

    return new TemperatureStats(
        result.getDouble("avgTemp"),
        result.getDouble("minTemp"),
        result.getDouble("maxTemp"),
        result.getInteger("totalReadings")
    );
}
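How such a summary might be kept up to date on the write path can be modeled in plain Java. This is a sketch under assumptions, not the actual update logic: the class name is hypothetical, and tracking a running sum alongside the count is one assumed way to derive the running average. The text only states that min and max are maintained with the $min and $max update operators.

```java
/** Hypothetical in-memory model of a bucket's summary.temperature sub-document. */
final class TemperatureSummary {
    private double min = Double.POSITIVE_INFINITY;
    private double max = Double.NEGATIVE_INFINITY;
    private double sum = 0.0; // assumption: a running sum is kept to derive the average
    private int count = 0;

    /** Models one write: keep the extremes (as $min/$max would) and extend the running totals. */
    void record(double temperature) {
        min = Math.min(min, temperature);
        max = Math.max(max, temperature);
        sum += temperature;
        count++;
    }

    double min() { return min; }

    double max() { return max; }

    int count() { return count; }

    /** Mean of all measurements recorded so far; valid for partially filled buckets too. */
    double avg() {
        if (count == 0) {
            throw new IllegalStateException("no measurements recorded");
        }
        return sum / count;
    }
}
```

Because every write touches only these few fields, reading the summary later costs the same regardless of how many measurements the bucket holds.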
Approach 2: Use $filter instead of $unwind when accessing individual measurements.
When you need specific readings (for a chart, for instance), filter inside the array before unwinding:
// FAST: Filter then unwind reduces intermediate documents
public List<TemperatureReading> getReadingsForChart(
    String sensorId, Instant start, Instant end
) {
    return collection.aggregate(List.of(
        Aggregates.match(Filters.and(
            Filters.eq("sensorId", sensorId),
            Filters.gte("bucketStart", Date.from(start)),
            Filters.lt("bucketStart", Date.from(end))
        )),
        Aggregates.project(Projections.computed("readings",
            new Document("$filter", new Document()
                .append("input", "$measurements")
                .append("as", "m")
                .append("cond", new Document("$and", List.of(
                    new Document("$gte", List.of("$$m.ts", Date.from(start))),
                    new Document("$lt", List.of("$$m.ts", Date.from(end)))
                )))
            )
        )),
        Aggregates.unwind("$readings"),
        Aggregates.replaceRoot("$readings")
    ), TemperatureReading.class).into(new ArrayList<>());
}
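To make the effect of filtering before unwinding concrete, the pipeline's behavior can be modeled on in-memory lists. This is a minimal sketch with hypothetical names (FilterBeforeUnwind, Measurement) and does not touch the MongoDB driver. It only mirrors the half-open range check used in the $filter condition above.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of filtering array elements before flattening. */
final class FilterBeforeUnwind {

    /** One element of a bucket's measurements array. */
    static final class Measurement {
        final Instant ts;
        final double t;

        Measurement(Instant ts, double t) {
            this.ts = ts;
            this.t = t;
        }
    }

    /** Models $filter: keeps the elements of one bucket with start <= ts < end. */
    static List<Measurement> filter(List<Measurement> measurements, Instant start, Instant end) {
        List<Measurement> kept = new ArrayList<>();
        for (Measurement m : measurements) {
            if (!m.ts.isBefore(start) && m.ts.isBefore(end)) {
                kept.add(m);
            }
        }
        return kept;
    }

    /** Models $unwind after $filter: only surviving elements become output documents. */
    static List<Measurement> filterThenUnwind(
            List<List<Measurement>> buckets, Instant start, Instant end) {
        List<Measurement> flat = new ArrayList<>();
        for (List<Measurement> bucket : buckets) {
            flat.addAll(filter(bucket, start, end));
        }
        return flat;
    }
}
```

In this model, the number of flattened elements equals the number of measurements inside the range, rather than the total number of measurements stored in the matched buckets.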
The Proof
| Query type | Unwind approach | Summary/Filter approach |
|---|---|---|
| Avg temp, 4-hour range | 320ms | 8ms (summary) |
| Individual readings, 1-hour range | 95ms | 45ms ($filter) |
| Min/max temp, 24-hour range | 1,800ms | 22ms (summary) |
The Trade-off
Pre-computed summaries are exact within a single bucket, even a partially filled one: the running average computed during writes is the mean of all measurements inserted so far, and the values maintained by the $min and $max update operators are the true extremes. The limitation is that summaries are per-bucket. The overall average across multiple buckets is an average of averages, which equals the true mean only if all buckets have the same count. For weighted accuracy, include the count and compute a weighted average at query time:
$$\text{weightedAvg} = \frac{\sum_{i=1}^{n} \text{avg}_i \times \text{count}_i}{\sum_{i=1}^{n} \text{count}_i}$$
Use $sum in the aggregation pipeline to collect both the weighted numerator (each bucket's average multiplied by its count) and the total count, then divide the first by the second.
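The difference between the two averages is easy to see on a small example. The following Java sketch uses hypothetical names and plain arrays in place of bucket documents. It compares the unweighted average of averages with the weighted form from the formula above.

```java
/** Hypothetical sketch: combining per-bucket averages into an overall mean. */
final class WeightedAverage {

    /** Unweighted mean of the per-bucket averages. */
    static double averageOfAverages(double[] avgs) {
        if (avgs.length == 0) {
            throw new IllegalArgumentException("no buckets");
        }
        double sum = 0.0;
        for (double avg : avgs) {
            sum += avg;
        }
        return sum / avgs.length;
    }

    /** Weighted mean: sum(avg_i * count_i) / sum(count_i). */
    static double weightedAverage(double[] avgs, int[] counts) {
        if (avgs.length != counts.length) {
            throw new IllegalArgumentException("avgs and counts must have the same length");
        }
        double numerator = 0.0;
        long totalCount = 0;
        for (int i = 0; i < avgs.length; i++) {
            numerator += avgs[i] * counts[i];
            totalCount += counts[i];
        }
        if (totalCount == 0) {
            throw new IllegalArgumentException("no measurements");
        }
        return numerator / totalCount;
    }

    public static void main(String[] args) {
        // A full bucket (60 readings, mean 20.0) and a partial one (20 readings, mean 30.0).
        double[] avgs = {20.0, 30.0};
        int[] counts = {60, 20};
        System.out.println(averageOfAverages(avgs));       // prints 25.0
        System.out.println(weightedAverage(avgs, counts)); // prints 22.5
    }
}
```

With a full bucket and a partially filled one, the unweighted result (25.0) overstates the influence of the smaller bucket, while the weighted result (22.5) matches the mean of all 80 underlying readings. In a pipeline, the same numerator could plausibly be accumulated with $sum over a $multiply expression and divided with $divide in a later stage.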