Balancer Tuning and Pre-Splitting Strategies
The Symptom
The telemetry platform sharded the bucket collection without pre-splitting. All initial data landed on the primary shard. The balancer spent 14 hours migrating chunks to achieve even distribution. During those 14 hours, the primary shard ran at 95% I/O utilization.
The Cause
When a collection is first sharded, all existing data resides on the primary shard in a single chunk. As data grows and the chunk exceeds 128 MB, MongoDB splits it. The balancer then migrates chunks to other shards. For a collection with 500 GB of existing data, this means:
- The auto-splitter creates approximately 4,000 chunks (500 GB / 128 MB).
- The balancer migrates approximately 3,000 chunks to other shards (keeping ~1,000 on the primary shard in a 4-shard cluster).
- Each migration involves reading from the primary shard, transferring over the network, writing to the target shard.
- At ~30 seconds per migration with two concurrent migrations, rebalancing takes roughly 3,000 / 2 × 30 s = 45,000 s, or 12.5 hours, which is close to the 14 hours observed.
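The estimate above can be reproduced with a few lines of arithmetic. The following is a minimal sketch: the helper name `estimateRebalance` and its parameter names are illustrative rather than part of any MongoDB API, and the inputs are simply the figures quoted in the list above.

```javascript
// Back-of-the-envelope rebalancing estimate (illustrative helper, not a MongoDB API).
function estimateRebalance({ dataGB, chunkMB, shards, secondsPerMigration, concurrentMigrations }) {
  // Number of chunks once the data has been split at the chunk size
  const totalChunks = Math.ceil((dataGB * 1024) / chunkMB);
  // An even distribution leaves 1/shards of the chunks on the primary shard
  const chunksToMove = totalChunks - Math.ceil(totalChunks / shards);
  // Migrations run in parallel groups of `concurrentMigrations`
  const seconds = (chunksToMove / concurrentMigrations) * secondsPerMigration;
  return { totalChunks, chunksToMove, hours: seconds / 3600 };
}

const estimate = estimateRebalance({
  dataGB: 500,
  chunkMB: 128,
  shards: 4,
  secondsPerMigration: 30,
  concurrentMigrations: 2,
});
console.log(estimate); // { totalChunks: 4000, chunksToMove: 3000, hours: 12.5 }
```

Changing `secondsPerMigration` or `concurrentMigrations` shows how sensitive the total is to migration speed, which is the main reason to avoid the bulk rebalance altogether.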
The Benchmark
Compare three approaches for sharding a 500 GB collection:
| Approach | Time to balanced | Primary shard peak I/O | Read latency during rebalance |
|---|---|---|---|
| Shard then let balancer work | 14 hours | 95% | 4x baseline |
| Pre-split before inserting data | 0 (balanced from start) | 35% | 1x baseline |
| Pre-split and manual distribution | 0 (balanced from start) | 30% | 1x baseline |
The Fix
Pre-split before inserting data.
For the telemetry platform, the sensorId range is known (sensor-00000 through sensor-09999). Pre-split the collection into equal ranges before any data is inserted:
// Shard the empty collection
sh.shardCollection("telemetry.buckets_5min", { sensorId: 1, bucketStart: 1 })
// Pre-split into 16 chunks (4 per shard on a 4-shard cluster).
// 16 chunks need 15 split points, so start at i = 1; a split at sensor-00000
// would only add an extra, empty leading chunk.
var boundaries = [];
for (var i = 1; i < 16; i++) {
  var sensorNum = Math.floor((i / 16) * 10000);
  var sensorId = "sensor-" + String(sensorNum).padStart(5, "0");
  boundaries.push({ sensorId: sensorId, bucketStart: MinKey() });
}
// Apply each split point to the still-empty collection
for (var i = 0; i < boundaries.length; i++) {
  sh.splitAt("telemetry.buckets_5min", boundaries[i]);
}
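// Verify chunk distribution. Splitting alone leaves every chunk on the primary
// shard; the counts even out once the balancer has moved the empty chunks.
// On MongoDB 5.0+, config.chunks is keyed by collection uuid rather than ns,
// so look up the uuid in config.collections and match on that instead.
db.getSiblingDB("config").chunks.aggregate([
  { $match: { ns: "telemetry.buckets_5min" } },
  { $group: { _id: "$shard", count: { $sum: 1 } } }
])
// Expected once distributed: each shard has 4 chunks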
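Splitting only creates chunk boundaries; on an empty collection the chunks still sit on the primary shard until the balancer or an operator moves them. For the "pre-split and manual distribution" approach from the benchmark, one possible sketch is shown below. The shard names (`shard0` through `shard3`), the helper `planChunkMoves`, and the assumption that `bucketStart` holds dates are all hypothetical. The `sh.moveChunk` calls only run inside the MongoDB shell, where `sh` is defined.

```javascript
// Plan which shard should own each pre-split chunk (contiguous ranges per shard).
// Helper name and shard names are assumptions for illustration.
function planChunkMoves(numChunks, totalSensors, shardNames) {
  const plan = [];
  for (let k = 0; k < numChunks; k++) {
    // Lowest sensor number that falls inside chunk k
    const sensorNum = Math.floor((k / numChunks) * totalSensors);
    const sensorId = "sensor-" + String(sensorNum).padStart(5, "0");
    // Contiguous assignment: chunks 0-3 go to the first shard, 4-7 to the second, ...
    const shard = shardNames[Math.floor((k * shardNames.length) / numChunks)];
    plan.push({ sensorId, shard });
  }
  return plan;
}

const plan = planChunkMoves(16, 10000, ["shard0", "shard1", "shard2", "shard3"]);

// Only attempt the moves inside the MongoDB shell.
if (typeof sh !== "undefined") {
  plan.forEach(function (step) {
    try {
      // Any shard key value inside the chunk identifies it. The epoch date is
      // assumed to sort above the chunk's MinKey lower bound.
      sh.moveChunk(
        "telemetry.buckets_5min",
        { sensorId: step.sensorId, bucketStart: new Date(0) },
        step.shard
      );
    } catch (e) {
      // For example, the chunk may already live on the target shard
      print("skipped " + step.sensorId + ": " + e.message);
    }
  });
}
```

Assigning contiguous ranges keeps neighbouring sensors on the same shard; a round-robin assignment would work equally well for chunk counts and may spread hot ranges more evenly.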
Tune chunk size for the workload.
The default 128 MB chunk size works well for most workloads. For the telemetry platform, where bucket documents are small (5-10 KB) and queries target specific sensors, smaller chunks (64 MB) provide better granularity for balancing at the cost of more metadata in the config server:
// Reduce chunk size to 64 MB
use config
db.settings.updateOne(
  { _id: "chunksize" },
  { $set: { value: 64 } },
  { upsert: true }
)
Disable balancer for stable collections.
If the telemetry archive collection is no longer receiving writes and is already balanced, disable the balancer for that collection to prevent unnecessary migrations during config changes:
// Disable balancer for a specific collection
sh.disableBalancing("telemetry.archive")
// Verify
db.getSiblingDB("config").collections.findOne({ _id: "telemetry.archive" })
// { "noBalance": true }
Configure secondary throttle for reduced migration impact.
// Make each document moved during a migration wait for replication before the next one
db.getSiblingDB("config").settings.updateOne(
  { _id: "balancer" },
  { $set: {
    "_secondaryThrottle": true,
    "_waitForDelete": false
  }},
  { upsert: true }
)
With _secondaryThrottle: true, each document moved during a chunk migration must replicate to at least one secondary before the next document is moved. This slows migrations but prevents replication lag from accumulating.
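`_secondaryThrottle` also accepts a write concern document in place of `true`, which makes the required level of replication explicit. The sketch below builds such an update; the helper name `throttleUpdate` is hypothetical, and `{ w: "majority" }` is only one possible choice, not a recommendation from the telemetry platform's configuration.

```javascript
// Build the config.settings update for a given write concern (illustrative helper).
function throttleUpdate(writeConcern) {
  return {
    filter: { _id: "balancer" },
    update: { $set: { _secondaryThrottle: writeConcern } },
    options: { upsert: true },
  };
}

const throttle = throttleUpdate({ w: "majority" });

// Apply the setting only when running inside the MongoDB shell, where `db` exists.
if (typeof db !== "undefined") {
  db.getSiblingDB("config").settings.updateOne(
    throttle.filter,
    throttle.update,
    throttle.options
  );
}
```

A stricter write concern trades migration speed for a tighter bound on replication lag, so it is worth measuring migration duration before and after the change.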
The Proof
Pre-splitting results for the telemetry platform:
| Metric | No pre-split | Pre-split (16 chunks) | Pre-split (64 chunks) |
|---|---|---|---|
| Time to balanced state | 14 hours | Immediate | Immediate |
| Migrations in first 24h | 3,000 | 12 | 5 |
| I/O impact during setup | 95% peak | 35% baseline | 35% baseline |
| Config server metadata | 4,000 chunks | 16 chunks (grows naturally) | 64 chunks |
| Balancer overhead (ongoing) | High | Low | Low |
With 64 pre-split chunks (16 per shard), the initial distribution is even. As data grows, the auto-splitter creates new chunks within each pre-split range, and the balancer only needs to handle minor imbalances.
The Trade-off
Pre-splitting requires knowledge of the shard key value distribution. For the telemetry platform with predictable sensorId ranges, this is straightforward. For workloads where the key space is unknown (user-generated IDs, UUIDs), pre-splitting is less effective. In those cases, a hashed shard key provides better initial distribution without manual pre-splitting because the hash function randomizes the key space.
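To see why hashing helps, the sketch below maps 10,000 sequential IDs into four buckets using an MD5 digest. This is an illustration only: it is not MongoDB's hashed-index function, and the bucket-per-shard mapping and the helper name `bucketFor` are assumptions made for the demonstration.

```javascript
const crypto = require("crypto");

// Map an ID to one of `buckets` slots using the first 4 bytes of its MD5 digest.
// Illustrative stand-in for a hashed shard key, not MongoDB's actual hash.
function bucketFor(id, buckets) {
  const digest = crypto.createHash("md5").update(id).digest();
  return digest.readUInt32BE(0) % buckets;
}

const counts = [0, 0, 0, 0];
for (let i = 0; i < 10000; i++) {
  const id = "sensor-" + String(i).padStart(5, "0");
  counts[bucketFor(id, counts.length)] += 1;
}
console.log(counts); // four counts that are roughly equal
```

Even though the inputs are strictly sequential, the hashed values scatter across all buckets. In MongoDB the equivalent is declaring the key as hashed when sharding, for example `sh.shardCollection("telemetry.events", { deviceUuid: "hashed" })`, where the namespace and field name are hypothetical.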
Smaller chunk sizes (64 MB vs 128 MB) mean more chunks in the config server’s metadata. For a 10 TB collection, 64 MB chunks produce approximately 160,000 chunks. Config server operations (reads during query routing, writes during migrations) scale with chunk count. Below 200,000 chunks, the overhead is negligible. Above 500,000, config server performance degrades.
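The chunk-count figure can be checked directly. The helper below is a small sketch (the name `chunkCount` is illustrative) that assumes binary units, so 1 TB is 1,024 × 1,024 MB, and that every chunk is completely full.

```javascript
// Approximate number of chunks for a collection of a given size (illustrative helper).
function chunkCount(dataTB, chunkMB) {
  const dataMB = dataTB * 1024 * 1024;
  return Math.ceil(dataMB / chunkMB);
}

const at64 = chunkCount(10, 64);
const at128 = chunkCount(10, 128);
console.log(at64, at128); // 163840 81920
```

Real collections usually hold more chunks than this lower bound because chunks are rarely filled to the configured size.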
Disabling the balancer for a collection is safe only if the collection’s data distribution will not change. If new shards are added to the cluster, manually re-enable balancing to redistribute data to the new shards.