Sharding Strategies and When the Database Is Not the Problem
The Symptom
The trip history endpoint takes 2.1 seconds at p99. The team has spent two weeks designing a sharding strategy. Three engineers built a prototype that splits trips across four PostgreSQL instances by city. The prototype works but introduced cross-shard query complexity for the analytics dashboard, doubled the deployment pipeline duration, and requires coordinated schema migrations across all shards. Then someone runs EXPLAIN ANALYZE on the slow query.
The Cause
The trip history query does a sequential scan on 50 million rows. There is no index on (rider_id, completed_at). PostgreSQL reads every row in the table, filters out 49,999,482 of them, sorts the remaining 518, and returns 20. The query planner has no faster option because no index exists that matches the WHERE + ORDER BY pattern.
The team diagnosed “database at scale” when the problem was “database without an index.” Both produce the same symptom: slow queries. Only one requires new infrastructure.
The Baseline
Before any changes, the trip history query on a single PostgreSQL instance with 50M rows:
-- BOTTLENECK: No index on (rider_id, completed_at)
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT trip_id, rider_id, driver_id, pickup_location, dropoff_location,
fare, distance, status, completed_at
FROM trips
WHERE rider_id = 'rider-4821'
ORDER BY completed_at DESC
LIMIT 20;
Limit  (cost=1284567.12..1284567.17 rows=20 width=248)
       (actual time=2143.821..2143.845 rows=20 loops=1)
  ->  Sort  (cost=1284567.12..1285867.12 rows=520000 width=248)
            (actual time=2143.819..2143.831 rows=20 loops=1)
        Sort Key: completed_at DESC
        Sort Method: top-N heapsort  Memory: 31kB
        ->  Seq Scan on trips  (cost=0.00..1271234.00 rows=520000 width=248)
                               (actual time=0.028..2089.412 rows=518 loops=1)
              Filter: (rider_id = 'rider-4821'::text)
              Rows Removed by Filter: 49999482
              Buffers: shared hit=48123 read=634521
Planning Time: 0.152 ms
Execution Time: 2143.892 ms
Key numbers: 634,521 disk reads. 49,999,482 rows removed by filter. 2,143ms execution. This is PostgreSQL doing exactly what it was told: read the entire table because no index provides a shortcut.
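The arithmetic behind those numbers can be checked directly. The sketch below assumes PostgreSQL's default 8 kB block size, which the plan itself does not state; the variable names are illustrative only.

```python
# Figures copied from the EXPLAIN (ANALYZE, BUFFERS) output above.
rows_kept_by_filter = 518
rows_removed_by_filter = 49_999_482
buffers_hit = 48_123      # pages already in shared buffers
buffers_read = 634_521    # pages that had to be read into shared buffers

# Assumption: default PostgreSQL block size of 8 kB (8192 bytes).
BLOCK_SIZE_BYTES = 8192

total_rows = rows_kept_by_filter + rows_removed_by_filter
selectivity = rows_kept_by_filter / total_rows
pages_touched = buffers_hit + buffers_read
bytes_read = buffers_read * BLOCK_SIZE_BYTES

print(f"rows scanned:  {total_rows:,}")
print(f"rows kept:     {selectivity:.4%}")
print(f"pages touched: {pages_touched:,}")
print(f"data read:     {bytes_read / 1024**3:.2f} GiB")
```

Under that block-size assumption, a single request pulls roughly 4.8 GiB through the buffer cache to keep about 0.001% of the rows it scans.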
# load-tests/index_comparison_locustfile.py
from locust import HttpUser, task, between, LoadTestShape


class TripHistoryUser(HttpUser):
    wait_time = between(0.5, 1.0)

    @task
    def get_trip_history(self):
        rider_id = f"rider-{self.environment.runner.user_count % 50000}"
        self.client.get(
            f"/api/trips/history?riderId={rider_id}&page=0&size=20",
            name="/api/trips/history"
        )


class IndexTestShape(LoadTestShape):
    stages = [
        {"duration": 60, "users": 100, "spawn_rate": 20},
        {"duration": 120, "users": 200, "spawn_rate": 20},
        {"duration": 180, "users": 400, "spawn_rate": 20},
        {"duration": 240, "users": 600, "spawn_rate": 20},
        {"duration": 300, "users": 800, "spawn_rate": 20},
    ]

    def tick(self):
        run_time = self.get_run_time()
        for stage in self.stages:
            if run_time < stage["duration"]:
                return (stage["users"], stage["spawn_rate"])
        return None
Without the index, 50M rows:
| Users | RPS | p50 (ms) | p99 (ms) | PG CPU | Disk Read (MB/s) |
|---|---|---|---|---|---|
| 100 | 45 | 1,800 | 2,400 | 72% | 890 |
| 200 | 78 | 2,100 | 3,800 | 88% | 1,240 |
| 400 | 82 | 4,200 | 8,500 | 95% | 1,310 |
| 600 | 80 | timeout | timeout | 97% | 1,320 |
| 800 | 74 | timeout | timeout | 98% | 1,280 |
Max throughput: 82 RPS. Each request forces a full table scan. PostgreSQL spends 1.3 GB/s reading rows it will throw away.
The Fix
Part 1: The Index
-- SCALED: Composite index matching the exact query pattern
-- CONCURRENTLY builds the index without blocking writes to the table
CREATE INDEX CONCURRENTLY idx_trips_rider_completed
ON trips (rider_id, completed_at DESC);
Why (rider_id, completed_at DESC) and not just (rider_id)?
A single-column index on rider_id finds the 518 matching rows without scanning the table. But PostgreSQL still needs to sort those 518 rows by completed_at DESC to satisfy the ORDER BY. With the composite index, the index entries for each rider are already stored in completed_at DESC order. PostgreSQL reads the first 20 entries for that rider from the index and stops. No sort. No reading beyond 20 rows.
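The "seek, then read 20 entries" behavior can be illustrated with a sorted list standing in for the B-tree. This is a toy model, not how PostgreSQL stores indexes; `build_index` and `latest_trips` are hypothetical names, and timestamps are plain integers for simplicity.

```python
import bisect

def build_index(trips):
    """Model an index on (rider_id, completed_at DESC) as a sorted list.

    Each entry is (rider_id, -completed_at, trip_id). Negating the timestamp
    makes ascending tuple order equal to "newest first" within a rider.
    """
    return sorted((rider_id, -completed_at, trip_id)
                  for trip_id, rider_id, completed_at in trips)

def latest_trips(index, rider_id, limit=20):
    """Seek to the rider's first entry, then read forward up to `limit`."""
    # (rider_id,) sorts before every (rider_id, x, y), so this lands on the
    # rider's newest trip without touching any other rider's entries.
    pos = bisect.bisect_left(index, (rider_id,))
    result = []
    while pos < len(index) and len(result) < limit:
        entry_rider, neg_completed_at, trip_id = index[pos]
        if entry_rider != rider_id:
            break  # ran past this rider's entries
        result.append((trip_id, -neg_completed_at))
        pos += 1
    return result

# Synthetic data: 10,000 trips spread over 100 riders.
trips = [(i, f"rider-{i % 100}", 1_700_000_000 + i) for i in range(10_000)]
index = build_index(trips)
page = latest_trips(index, "rider-42")
print(len(page), page[0])
```

The lookup never sorts and never reads more than `limit` entries, which is the property the composite index gives the trip history query. An index on rider_id alone would correspond to finding the rider's entries in arbitrary order and sorting them afterwards.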
After the index:
-- SCALED: Index scan replaces sequential scan
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT trip_id, rider_id, driver_id, pickup_location, dropoff_location,
fare, distance, status, completed_at
FROM trips
WHERE rider_id = 'rider-4821'
ORDER BY completed_at DESC
LIMIT 20;
Limit  (cost=0.56..42.18 rows=20 width=248)
       (actual time=0.031..3.847 rows=20 loops=1)
  ->  Index Scan using idx_trips_rider_completed on trips
        (cost=0.56..1082.34 rows=520 width=248)
        (actual time=0.029..3.831 rows=20 loops=1)
        Index Cond: (rider_id = 'rider-4821'::text)
        Buffers: shared hit=23
Planning Time: 0.124 ms
Execution Time: 3.871 ms
23 buffer hits. Zero disk reads. 3.87ms. The index creation on 50M rows took 47 seconds with CONCURRENTLY. No downtime. No application changes.
Part 2: Sharding Strategies (For When You Actually Need Them)
Sharding splits a single table across multiple database instances. Each shard holds a subset of the data. The application decides which shard to query based on a routing key.
Two strategies for the ride-hailing platform:
Range-Based Sharding by City
Shard 1: New York trips → pg-shard-nyc
Shard 2: San Francisco trips → pg-shard-sfo
Shard 3: Chicago trips → pg-shard-chi
Shard 4: Everything else → pg-shard-default
The routing is intuitive. Trips for New York go to the NYC shard. Queries scoped to a city hit a single shard.
The problem: hot shards. New York generates 40% of all trips. The NYC shard handles 40% of all writes while the Chicago shard handles 8%. The load distribution is determined by business geography, not by engineering.
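A minimal sketch of this routing, assuming a static lookup table keyed by city name. The function names are hypothetical, and in the sample mix only the New York (40%) and Chicago (8%) shares come from the text; the other two shares are made up for illustration.

```python
from collections import Counter

# Lookup table mirroring the four-shard layout listed above.
CITY_TO_SHARD = {
    "New York": "pg-shard-nyc",
    "San Francisco": "pg-shard-sfo",
    "Chicago": "pg-shard-chi",
}
DEFAULT_SHARD = "pg-shard-default"

def shard_for_city(city: str) -> str:
    """Return the shard host for a city, falling back to the catch-all shard."""
    return CITY_TO_SHARD.get(city, DEFAULT_SHARD)

def write_share_per_shard(trip_cities):
    """Fraction of trips that land on each shard for a sample of trip cities."""
    counts = Counter(shard_for_city(city) for city in trip_cities)
    total = sum(counts.values())
    return {shard: n / total for shard, n in counts.items()}

# Illustrative mix: 40% New York and 8% Chicago as stated in the text;
# the San Francisco and "Austin" shares are invented for the example.
sample = (["New York"] * 40 + ["Chicago"] * 8 +
          ["San Francisco"] * 22 + ["Austin"] * 30)
for shard, share in sorted(write_share_per_shard(sample).items()):
    print(f"{shard}: {share:.0%} of writes")
```

The router itself is trivial. The skew comes entirely from the data, so no change to the lookup table can even out the load without splitting a city across shards.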
Hash-Based Sharding by User ID
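// SCALED: Even distribution across shards via hash
// Registered as a bean by ShardConfig, so it carries no @Component annotation.
import java.util.Collections;
import java.util.List;
import javax.sql.DataSource;

public class HashShardResolver implements ShardResolver {
    private final List<DataSource> shards;

    public HashShardResolver(List<DataSource> shards) {
        this.shards = List.copyOf(shards);
    }

    @Override
    public DataSource resolve(String userId) {
        // The remainder is taken first, so Math.abs never sees Integer.MIN_VALUE
        int shardIndex = Math.abs(userId.hashCode() % shards.size());
        return shards.get(shardIndex);
    }

    @Override
    public List<DataSource> resolveAll() {
        return Collections.unmodifiableList(shards);
    }
}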
Hash-based sharding distributes evenly. Each shard gets ~25% of users regardless of which city they ride in. No hot shards.
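The even split is easy to check empirically. The sketch below is not a port of HashShardResolver: it uses SHA-256 instead of Java's String.hashCode, because Python's built-in hash() for strings is randomized per process and would not give stable routing. The 50,000 rider IDs mirror the ID space used by the load test; `shard_index` is a hypothetical name.

```python
import hashlib
from collections import Counter

SHARD_COUNT = 4  # matches the four-shard example in this section

def shard_index(user_id: str, shard_count: int = SHARD_COUNT) -> int:
    """Map a user ID to a shard number that is stable across processes."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    # The first 8 bytes are plenty of entropy for picking one of a few shards.
    return int.from_bytes(digest[:8], "big") % shard_count

counts = Counter(shard_index(f"rider-{n}") for n in range(50_000))
for shard in range(SHARD_COUNT):
    print(f"shard {shard}: {counts[shard] / 50_000:.1%} of riders")
```

Note that modulo routing ties every key to the current shard count. Changing `shard_count` remaps most users, which is one reason resharding is expensive.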
The problem: cross-shard queries. The analytics dashboard needs “total trips in New York last hour.” With range-based sharding, that query hits one shard. With hash-based sharding, that query hits all four shards, aggregates results, and is slower than querying a single unsharded database.
// BOTTLENECK: Cross-shard query fans out to all shards
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import javax.sql.DataSource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class AnalyticsService {
    private final ShardResolver shardResolver;

    public AnalyticsService(ShardResolver shardResolver) {
        this.shardResolver = shardResolver;
    }

    // No @Transactional here: a DataSource-bound transaction cannot span
    // several shards, and parallelStream() runs the queries on other threads.
    public TripAnalytics getCityAnalytics(String city, Duration window) {
        Instant since = Instant.now().minus(window);
        // Must query ALL shards because users in any shard may have
        // trips in the target city
        return shardResolver.resolveAll().parallelStream()
            .map(shard -> queryShardForCity(shard, city, since))
            .reduce(TripAnalytics.EMPTY, TripAnalytics::merge);
    }

    private TripAnalytics queryShardForCity(DataSource shard,
                                            String city,
                                            Instant since) {
        // Each shard query is fast, but N shards = N queries
        var jdbc = new JdbcTemplate(shard);
        return jdbc.queryForObject(
            """
            SELECT COUNT(*) as total_trips,
                   AVG(fare) as avg_fare,
                   SUM(fare) as total_revenue
            FROM trips
            WHERE city = ? AND completed_at > ?
            """,
            (rs, rowNum) -> new TripAnalytics(
                rs.getLong("total_trips"),
                rs.getBigDecimal("avg_fare"),
                rs.getBigDecimal("total_revenue")
            ),
            city, Timestamp.from(since)
        );
    }
}
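The merge step deserves care, because per-shard averages cannot simply be averaged again. TripAnalytics.merge is not shown in this section, so the following is only a sketch of one safe approach: carry counts and sums across shards and derive the average at the end. `ShardAnalytics` and `merge_all` are hypothetical names.

```python
from dataclasses import dataclass
from decimal import Decimal
from functools import reduce

@dataclass(frozen=True)
class ShardAnalytics:
    total_trips: int
    total_revenue: Decimal

    def merge(self, other: "ShardAnalytics") -> "ShardAnalytics":
        # Counts and sums are additive across shards; averages are not.
        return ShardAnalytics(
            self.total_trips + other.total_trips,
            self.total_revenue + other.total_revenue,
        )

    @property
    def avg_fare(self) -> Decimal:
        # Derived from the merged totals, so each trip is weighted equally.
        if self.total_trips == 0:
            return Decimal("0")
        return self.total_revenue / self.total_trips

EMPTY = ShardAnalytics(0, Decimal("0"))

def merge_all(per_shard):
    """Fold per-shard results into one cluster-wide result."""
    return reduce(ShardAnalytics.merge, per_shard, EMPTY)

# Two shards: one with two 15.00 fares, one with a single 30.00 fare.
per_shard = [ShardAnalytics(2, Decimal("30.00")), ShardAnalytics(1, Decimal("30.00"))]
combined = merge_all(per_shard)
naive = sum(s.avg_fare for s in per_shard) / len(per_shard)
print(combined.avg_fare, naive)
```

Here the correct cluster-wide average is 20, while averaging the two per-shard averages gives 22.5. The error grows as shard sizes diverge.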
The ShardResolver Interface
// SCALED: Clean abstraction for shard routing
import java.util.List;
import javax.sql.DataSource;

public interface ShardResolver {
    /**
     * Resolve the DataSource for a specific user's data.
     */
    DataSource resolve(String userId);

    /**
     * Return all shards for fan-out queries.
     */
    List<DataSource> resolveAll();
}
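// SCALED: Configuration for 4 shards
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ShardConfig {

    @Bean
    public ShardResolver shardResolver(
            @Value("${sharding.count:4}") int shardCount) {
        // One connection pool per shard: pg-shard-0 .. pg-shard-(N-1)
        List<DataSource> shards = IntStream.range(0, shardCount)
            .mapToObj(i -> createShardDataSource(i))
            .collect(Collectors.toList());
        return new HashShardResolver(shards);
    }

    private DataSource createShardDataSource(int index) {
        var config = new HikariConfig();
        config.setJdbcUrl(String.format(
            "jdbc:postgresql://pg-shard-%d:5432/ridehail", index));
        config.setUsername("app");
        config.setPassword(System.getenv("DB_PASSWORD"));
        config.setMaximumPoolSize(15);
        config.setConnectionTimeout(5000);
        config.setReadOnly(false);
        return new HikariDataSource(config);
    }
}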
Part 3: The Operational Cost of Sharding
Schema migrations across shards are the hidden tax. Adding a column to the trips table means running the migration on every shard. If shard 3 fails midway, you have an inconsistent schema across your cluster.
# BOTTLENECK: Schema migration across 4 shards
# Each shard must succeed. Failure on any shard = partial rollout.
for shard in pg-shard-0 pg-shard-1 pg-shard-2 pg-shard-3; do
  echo "Migrating $shard..."
  if ! flyway -url="jdbc:postgresql://$shard:5432/ridehail" \
              -user=app \
              -password="$DB_PASSWORD" \
              migrate; then
    echo "FAILED on $shard. Previous shards already migrated."
    echo "Manual intervention required."
    exit 1
  fi
done
With a single database, Flyway runs once. With 4 shards, it runs 4 times. With 16 shards (where this goes as you grow), it runs 16 times. Each migration must be backward-compatible because the application will talk to shards with both old and new schemas during the rollout window.
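One way to make a partial rollout less painful is to record exactly which shards succeeded so the run can be resumed rather than restarted. The sketch below is an assumption about how such a wrapper could look, not part of the platform's tooling; `migrate_shards` and `apply_migration` are hypothetical names, and `apply_migration` stands in for whatever actually invokes the migration tool.

```python
def migrate_shards(shards, apply_migration, already_done=()):
    """Apply a migration shard by shard, stopping at the first failure.

    Returns (done, failed, pending):
      done    - shards that are migrated, including those in already_done
      failed  - (shard, error message) for the first failure, or None
      pending - shards that were not attempted because of the failure
    """
    shards = list(shards)
    done = [s for s in shards if s in already_done]
    failed = None
    pending = []
    for shard in shards:
        if shard in done:
            continue  # migrated in an earlier run
        if failed is not None:
            pending.append(shard)  # do not widen a partial rollout
            continue
        try:
            apply_migration(shard)
        except Exception as exc:
            failed = (shard, str(exc))
        else:
            done.append(shard)
    return done, failed, pending

# Simulated run in which the third shard fails once.
SHARDS = ["pg-shard-0", "pg-shard-1", "pg-shard-2", "pg-shard-3"]
attempts = []

def flaky_apply(shard):
    attempts.append(shard)
    if shard == "pg-shard-2" and attempts.count(shard) == 1:
        raise RuntimeError("lock timeout")

first = migrate_shards(SHARDS, flaky_apply)
second = migrate_shards(SHARDS, flaky_apply, already_done=first[0])
print(first)
print(second)
```

The first run leaves two shards on the new schema and two on the old one, which is exactly the window during which the application must tolerate both versions.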
Part 4: The Decision Rule
Shard only after this checklist is exhausted:
| Step | Action | Cost | Typical Improvement |
|---|---|---|---|
| 1 | Add missing indexes (EXPLAIN ANALYZE) | Minutes | 100-500x for slow queries |
| 2 | Tune connection pool (CH4) | Hours | 2-10x throughput |
| 3 | Add caching layer (CH6) | Days | 5-50x for repeated reads |
| 4 | Add read replicas (CH8-S1) | Days | 2-4x read throughput |
| 5 | Shard | Weeks-Months | 2-Nx write throughput |
Each step costs at least as much as the one before it, and the jump to sharding is the largest: minutes for an index, weeks to months for shards. Sharding a database with missing indexes is like buying a second car because the first one has a flat tire.
The Proof
The composite index on the unsharded, single-primary database. Same Locust test, same 50M rows:
| Users | RPS | p50 (ms) | p99 (ms) | PG CPU | Disk Read (MB/s) |
|---|---|---|---|---|---|
| 100 | 190 | 5 | 18 | 8% | 12 |
| 200 | 380 | 6 | 22 | 14% | 18 |
| 400 | 740 | 7 | 28 | 26% | 24 |
| 600 | 1,080 | 9 | 35 | 37% | 31 |
| 800 | 1,420 | 11 | 42 | 48% | 38 |
| Metric | Before Index | After Index | Factor |
|---|---|---|---|
| Max RPS | 82 | 1,420 | 17.3x |
| p50 at 200 users | 2,100ms | 6ms | 350x |
| p99 at 400 users | 8,500ms | 28ms | 304x |
| PG CPU at 400 users | 95% | 26% | 3.7x reduction |
| Disk read at 400 users | 1,310 MB/s | 24 MB/s | 55x reduction |
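The factors in the comparison table are plain ratios of the two load-test tables. A short sketch of that arithmetic follows; the dictionary keys are illustrative names, and the values are copied from the tables above.

```python
# Values copied from the "without index" and "with index" load-test tables.
before = {"max_rps": 82, "p50_ms_200_users": 2100, "p99_ms_400_users": 8500,
          "pg_cpu_pct_400_users": 95, "disk_read_mbps_400_users": 1310}
after = {"max_rps": 1420, "p50_ms_200_users": 6, "p99_ms_400_users": 28,
         "pg_cpu_pct_400_users": 26, "disk_read_mbps_400_users": 24}

# Throughput improves when it goes up; every other metric improves when it goes down.
HIGHER_IS_BETTER = {"max_rps"}

factors = {}
for metric, old in before.items():
    new = after[metric]
    factors[metric] = new / old if metric in HIGHER_IS_BETTER else old / new

for metric, factor in factors.items():
    print(f"{metric}: {factor:.1f}x")
```

Max RPS in the indexed run is the highest value observed in the test, not a saturation point: PostgreSQL CPU was still at 48% at 800 users.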
The “scaling problem” was reading 634,521 pages from disk per query instead of touching 23 cached buffers. One CREATE INDEX CONCURRENTLY statement produced a 350x p50 improvement. The database was never the problem. The missing index was the problem.
At 1,420 RPS on a single instance with the index, the platform handles more traffic than the sharded prototype handled across four instances. Three fewer PostgreSQL instances to operate. Three fewer connection pools to tune. Three fewer sets of replicas to monitor. Zero cross-shard query complexity.
Sharding is not wrong. Sharding before indexing is wrong. Sharding before connection pool tuning is wrong. Sharding before caching is wrong. Sharding before read replicas is wrong. Shard when a single PostgreSQL primary cannot handle write throughput after every cheaper option is exhausted. For the ride-hailing platform at 50M trips, that day has not arrived.