unbound mongodb at scale

Querying Bucketed Data: Unwinding and Aggregation

4 min read Chapter 21 of 72

The Symptom

The dashboard queries sensor data bucketed in 5-minute intervals. A query for “average temperature for sensor-00042 between 10:00 and 14:00” takes 320ms. The same query against the per-event collection takes 180ms. The bucket pattern was supposed to improve query performance by reading fewer documents. Instead, the bucketed query takes about 1.8x as long.

The Cause

The query uses $unwind to flatten the measurements array before filtering and averaging:

// SLOW: Unwinds all measurements, then filters
db.buckets.aggregate([
  { $match: { sensorId: "sensor-00042", bucketStart: { $gte: startTime, $lt: endTime } } },
  { $unwind: "$measurements" },
  { $match: { "measurements.ts": { $gte: startTime, $lt: endTime } } },
  { $group: { _id: null, avgTemp: { $avg: "$measurements.t" } } }
])

For a 4-hour range with 5-minute buckets, this matches 48 bucket documents. Each contains 60 measurements. The $unwind stage produces 2,880 intermediate documents in the aggregation pipeline. The second $match filters to the exact time range, but the damage is done: $unwind already allocated memory for all 2,880 expanded documents.
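The arithmetic behind those figures is worth making explicit, because it scales linearly with the query range. A minimal sketch, assuming the query range is aligned to bucket boundaries and every bucket is full (the class and method names are hypothetical, not part of the benchmark code):

```java
import java.time.Duration;

/** Back-of-the-envelope estimate of how many documents $unwind emits. */
final class UnwindCost {

    /** Number of buckets whose start falls inside a bucket-aligned query range. */
    static long bucketsInRange(Duration range, Duration bucketWidth) {
        return range.toMillis() / bucketWidth.toMillis();
    }

    /** $unwind emits one document per array element in every matched bucket. */
    static long unwoundDocuments(long buckets, int measurementsPerBucket) {
        return buckets * measurementsPerBucket;
    }

    public static void main(String[] args) {
        long buckets = bucketsInRange(Duration.ofHours(4), Duration.ofMinutes(5));
        long docs = unwoundDocuments(buckets, 60);
        // prints "48 buckets -> 2880 unwound documents"
        System.out.println(buckets + " buckets -> " + docs + " unwound documents");
    }
}
```

Under the same assumptions, a 24-hour range matches 288 buckets and unwinds into 17,280 intermediate documents.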

The Benchmark

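import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import org.bson.Document;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

@BenchmarkMode(Mode.AverageTime)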
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 2, time = 10)
@Measurement(iterations = 3, time = 15)
@Fork(1)
@State(Scope.Benchmark)
public class BucketQueryBenchmark {

    private MongoCollection<Document> collection;
    private Date startTime;
    private Date endTime;

    @Setup
    public void setup() {
        MongoClient client = MongoClients.create("mongodb://localhost:27017");
        collection = client.getDatabase("telemetry").getCollection("buckets_5min");
        startTime = Date.from(Instant.now().minus(4, ChronoUnit.HOURS));
        endTime = Date.from(Instant.now());
    }

    @Benchmark
    public Document unwindThenFilter() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.and(
                Filters.eq("sensorId", "sensor-00042"),
                Filters.gte("bucketStart", startTime),
                Filters.lt("bucketStart", endTime)
            )),
            Aggregates.unwind("$measurements"),
            Aggregates.match(Filters.and(
                Filters.gte("measurements.ts", startTime),
                Filters.lt("measurements.ts", endTime)
            )),
            Aggregates.group(null,
                Accumulators.avg("avgTemp", "$measurements.t")
            )
        )).first();
    }

    @Benchmark
    public Document filterThenReduce() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.and(
                Filters.eq("sensorId", "sensor-00042"),
                Filters.gte("bucketStart", startTime),
                Filters.lt("bucketStart", endTime)
            )),
            Aggregates.project(Projections.fields(
                Projections.computed("filtered",
                    new Document("$filter", new Document()
                        .append("input", "$measurements")
                        .append("as", "m")
                        .append("cond", new Document("$and", List.of(
                            new Document("$gte", List.of("$$m.ts", startTime)),
                            new Document("$lt", List.of("$$m.ts", endTime))
                        )))
                    )
                )
            )),
            Aggregates.unwind("$filtered"),
            Aggregates.group(null,
                Accumulators.avg("avgTemp", "$filtered.t")
            )
        )).first();
    }

    @Benchmark
    public Document useSummaryStats() {
        return collection.aggregate(List.of(
            Aggregates.match(Filters.and(
                Filters.eq("sensorId", "sensor-00042"),
                Filters.gte("bucketStart", startTime),
                Filters.lt("bucketStart", endTime)
            )),
            Aggregates.group(null,
                Accumulators.avg("avgTemp", "$summary.temperature.avg"),
                Accumulators.min("minTemp", "$summary.temperature.min"),
                Accumulators.max("maxTemp", "$summary.temperature.max")
            )
        )).first();
    }
}

Results:

Benchmark                                    Mode  Cnt    Score    Error  Units
BucketQueryBenchmark.unwindThenFilter        avgt    3  320.000 ± 25.000  ms/op
BucketQueryBenchmark.filterThenReduce        avgt    3  185.000 ± 15.000  ms/op
BucketQueryBenchmark.useSummaryStats         avgt    3    8.000 ±  1.200  ms/op

The summary-based query is 40x faster than the unwind approach. It reads 48 documents, takes the pre-computed summary fields from each, and combines them in a single $group stage. No $unwind. No intermediate document expansion.

The Fix

Two approaches, depending on query requirements.

Approach 1: Use pre-computed summaries for aggregate queries.

The bucket document already stores summary.temperature.min, .max, and .avg (min and max are maintained by the $min and $max update operators during writes, alongside the running average). For queries that need aggregate statistics over a time range, use these directly:

// FAST: Query pre-computed summaries
public TemperatureStats getTemperatureStats(String sensorId, Instant start, Instant end) {
    Document result = collection.aggregate(List.of(
        Aggregates.match(Filters.and(
            Filters.eq("sensorId", sensorId),
            Filters.gte("bucketStart", Date.from(start)),
            Filters.lt("bucketStart", Date.from(end))
        )),
        Aggregates.group(null,
            Accumulators.avg("avgTemp", "$summary.temperature.avg"),
            Accumulators.min("minTemp", "$summary.temperature.min"),
            Accumulators.max("maxTemp", "$summary.temperature.max"),
            Accumulators.sum("totalReadings", "$count")
        )
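    )).first();

    // first() returns null when no bucket matches the sensor and range
    if (result == null) {
        throw new java.util.NoSuchElementException("No buckets for " + sensorId + " in the requested range");
    }

    return new TemperatureStats(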
        result.getDouble("avgTemp"),
        result.getDouble("minTemp"),
        result.getDouble("maxTemp"),
        result.getInteger("totalReadings")
    );
}

Approach 2: Use $filter before $unwind when accessing individual measurements.

When you need specific readings (for a chart, for instance), filter inside the array before unwinding:

// FAST: Filter then unwind reduces intermediate documents
public List<TemperatureReading> getReadingsForChart(
    String sensorId, Instant start, Instant end
) {
    return collection.aggregate(List.of(
        Aggregates.match(Filters.and(
            Filters.eq("sensorId", sensorId),
            Filters.gte("bucketStart", Date.from(start)),
            Filters.lt("bucketStart", Date.from(end))
        )),
        Aggregates.project(Projections.computed("readings",
            new Document("$filter", new Document()
                .append("input", "$measurements")
                .append("as", "m")
                .append("cond", new Document("$and", List.of(
                    new Document("$gte", List.of("$$m.ts", Date.from(start))),
                    new Document("$lt", List.of("$$m.ts", Date.from(end)))
                )))
            )
        )),
        Aggregates.unwind("$readings"),
        Aggregates.replaceRoot("$readings")
    ), TemperatureReading.class).into(new ArrayList<>());
}
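One detail in the query above deserves a check: the $match stage compares bucketStart against the raw start of the query range. If that start does not fall on a bucket boundary, the bucket that contains it begins earlier and is not matched, so its readings never reach $filter. A minimal sketch of one way to handle this, assuming buckets are aligned to 5-minute boundaries counted from the Unix epoch and that bucketStart stores that boundary (the class and method names are hypothetical):

```java
import java.time.Duration;
import java.time.Instant;

/** Aligns a query boundary to the bucket grid (assumed: 5-minute buckets from the epoch). */
final class BucketBounds {

    // Assumption taken from the text: buckets are 5 minutes wide.
    static final Duration BUCKET_WIDTH = Duration.ofMinutes(5);

    /** Floors an instant to the start of the bucket that contains it. */
    static Instant bucketStartFor(Instant t) {
        long width = BUCKET_WIDTH.toMillis();
        // floorDiv rounds toward negative infinity, so pre-epoch instants also floor correctly
        long floored = Math.floorDiv(t.toEpochMilli(), width) * width;
        return Instant.ofEpochMilli(floored);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T10:02:30Z");
        // prints 2024-01-01T10:00:00Z
        System.out.println(bucketStartFor(start));
    }
}
```

Passing Date.from(BucketBounds.bucketStartFor(start)) to the bucketStart filter, while keeping the raw start in the $filter condition, would include the partially overlapping first bucket without returning readings from before the requested range.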

The Proof

Query type                          Unwind approach   Summary/Filter approach
Avg temp, 4-hour range              320ms             8ms (summary)
Individual readings, 1-hour range   95ms              45ms ($filter)
Min/max temp, 24-hour range         1,800ms           22ms (summary)

The Trade-off

Pre-computed summaries are exact within a bucket, even a partially filled one: the running average computed during writes is the mean of all measurements inserted so far, and the values maintained by the $min/$max update operators are the true extremes. The limitation is that summaries are per-bucket. Taking the minimum of the bucket minimums and the maximum of the bucket maximums still gives the right answer, but the overall average across multiple buckets becomes an average of averages, which is only correct if all buckets have the same count. A partially filled bucket then carries as much weight as a full one. For weighted accuracy, include the count and compute a weighted average at query time:

$$\text{weightedAvg} = \frac{\sum_{i=1}^{n} \text{avg}_i \times \text{count}_i}{\sum_{i=1}^{n} \text{count}_i}$$

Use $sum to collect both the weighted numerator and the total count in the aggregation pipeline.
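In pipeline terms, that would be a $group stage with one $sum over the $multiply of summary.temperature.avg and count, a second $sum over count, and a final $divide. The sketch below shows the same arithmetic in plain Java, with made-up values, to illustrate how far the two averages drift apart when one bucket is partially filled. The class, its nested type, and the sample numbers are hypothetical:

```java
import java.util.List;

/** Compares an average of averages with a count-weighted average. */
final class WeightedAverage {

    /** Per-bucket summary: the stored average and the number of readings behind it. */
    static final class BucketSummary {
        final double avg;
        final int count;

        BucketSummary(double avg, int count) {
            this.avg = avg;
            this.count = count;
        }
    }

    /** Mean of the per-bucket averages; ignores how many readings each bucket holds. */
    static double averageOfAverages(List<BucketSummary> buckets) {
        return buckets.stream().mapToDouble(b -> b.avg).average().orElse(Double.NaN);
    }

    /** Sum of avg_i * count_i divided by the sum of count_i. */
    static double weightedAverage(List<BucketSummary> buckets) {
        double numerator = 0.0;
        long totalCount = 0;
        for (BucketSummary b : buckets) {
            numerator += b.avg * b.count;
            totalCount += b.count;
        }
        return totalCount == 0 ? Double.NaN : numerator / totalCount;
    }

    public static void main(String[] args) {
        List<BucketSummary> buckets = List.of(
            new BucketSummary(20.0, 60),   // full bucket
            new BucketSummary(30.0, 20));  // partially filled bucket
        System.out.println(averageOfAverages(buckets)); // prints 25.0
        System.out.println(weightedAverage(buckets));   // prints 22.5
    }
}
```

With these sample values the unweighted result is 2.5 degrees too high, because the 20 readings in the partial bucket count as much as the 60 in the full one.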