Skip to main content
unbound mongodb at scale

Eliminating Scatter-Gather from the Telemetry Platform

4 min read Chapter 53 of 72

Eliminating Scatter-Gather from the Telemetry Platform

The Symptom

The telemetry dashboard’s “temperature anomaly” page takes 2.3 seconds to load on the production cluster (16 shards). The page queries all readings above a threshold across all sensors in the last hour. The same query takes 180ms on a single-shard development environment.

The Cause

The dashboard query does not include the shard key:

// SLOW: Scatter-gather across all 16 shards
public List<TelemetryReading> findAnomalies(double threshold, Instant since) {
    return collection.find(
        Filters.and(
            Filters.gt("temperature", threshold),
            Filters.gte("ts", Date.from(since))
        ))
        .sort(Sorts.descending("temperature"))
        .limit(50)
        .into(new ArrayList<>());
}

The filter contains temperature and ts but not sensorId (the shard key prefix). Mongos sends this query to all 16 shards. Each shard executes a full collection scan (no index on {temperature: 1} on every shard), returns up to 50 documents, and mongos merges and re-sorts 800 candidate documents (50 per shard * 16 shards).

// Confirm scatter-gather
db.readings.explain().find({
  temperature: { $gt: 100 },
  ts: { $gte: ISODate("2024-06-15T12:00:00Z") }
}).sort({ temperature: -1 }).limit(50)

// "winningPlan": { "stage": "SHARD_MERGE_SORT" }
// "shards": [ ... all 16 shards ... ]

The Benchmark

Query approachShards queriedLatency (16 shards)Docs examined (total)
Scatter-gather (no shard key)162,300ms1,280,000
Targeted per-sensor loop1 each, 200 calls4,500ms40,000
Secondary anomaly collection112ms50

Looping through known sensors and making targeted queries per sensor is worse: the round-trip overhead per query (200 sensors * 22ms per query = 4,500ms) dominates.

The Fix

Create a denormalized anomaly collection that stores only anomalous readings, sharded by ts (hashed) for even distribution:

// FAST: Write anomalies to a dedicated collection during ingestion
public void ingestReading(TelemetryReading reading) {
    // Write to main collection (targeted by sensorId shard key)
    readingsCollection.insertOne(reading.toDocument());

    // If anomalous, also write to anomaly collection
    if (reading.getTemperature() > ANOMALY_THRESHOLD) {
        Document anomaly = new Document()
            .append("sensorId", reading.getSensorId())
            .append("ts", Date.from(reading.getTimestamp()))
            .append("temperature", reading.getTemperature())
            .append("location", reading.getLocation());

        anomalyCollection.insertOne(anomaly);
    }
}

The anomaly collection is small (less than 0.1% of total readings are anomalies) and can be sharded differently or kept unsharded if it fits on a single shard.

For the dashboard query:

// FAST: Query the anomaly collection (small, indexed, single shard or targeted)
public List<TelemetryReading> findAnomalies(double threshold, Instant since) {
    return anomalyCollection.find(
        Filters.and(
            Filters.gt("temperature", threshold),
            Filters.gte("ts", Date.from(since))
        ))
        .sort(Sorts.descending("temperature"))
        .limit(50)
        .into(new ArrayList<>());
}

For queries that must access the main readings collection across sensors, add the shard key to the filter by querying per-building (each building has a known set of sensorIds):

// FAST: Targeted queries by building (known sensorId set)
public List<TelemetryReading> findAnomaliesByBuilding(
        String buildingId, double threshold, Instant since) {
    List<String> sensorIds = sensorRegistry.getSensorsByBuilding(buildingId);

    // $in with shard key prefix -> targeted to specific shards
    return readingsCollection.find(
        Filters.and(
            Filters.in("sensorId", sensorIds),
            Filters.gt("temperature", threshold),
            Filters.gte("ts", Date.from(since))
        ))
        .sort(Sorts.descending("temperature"))
        .limit(50)
        .into(new ArrayList<>());
}

When the $in list contains shard key values, mongos targets only the shards that own those values. If the 20 sensors in Building HQ hash to 3 shards, this query targets 3 shards instead of 16.

The Proof

Query approachShards queriedLatencyDocs examined
Scatter-gather (original)162,300ms1,280,000
Anomaly collection112ms50
Per-building targeted345ms2,400

The Trade-off

The anomaly collection adds write amplification: every anomalous reading is written twice (once to the main collection, once to the anomaly collection). For the telemetry platform, where less than 0.1% of readings are anomalies, the extra write volume is negligible. If 10% of readings were anomalous, the duplication cost would be significant.

The per-building approach requires maintaining a sensor registry that maps sensors to buildings. If this mapping changes (a sensor is moved to another building), queries using the old mapping may miss data or target the wrong shards. The sensor registry must be kept current. For the telemetry platform with stable sensor deployments, this is manageable.

Both approaches trade write complexity for read performance. This is the fundamental pattern in sharded MongoDB: design queries around the shard key, and if a query cannot include the shard key, materialize the answer in a collection that can be queried efficiently.