surviving the spike

Backpressure, Consumer Lag, and Dead Letter Queues

11 min read Chapter 27 of 66

The Symptom

The driver location pipeline processes location updates from every active driver in real time. During Friday evening surge: 22,000 active drivers, each emitting GPS coordinates every 2 seconds. That is 11,000 events per second at baseline. During a city-wide concert event with surge pricing across 40 zones: 50,000 events per second. The three consumer instances that handled baseline traffic fell behind. Consumer lag hit 2.4 million messages. The driver map in the rider app showed positions that were 4 minutes stale. Riders requested pickups from drivers who had already moved 6 blocks.

The on-call engineer scaled consumers from 3 to 6. Lag kept growing. Scaled to 9. Lag stabilized but did not decrease. The partition count was 12, and 9 consumers with concurrency 1 meant 9 active threads. Three partitions had 2x the traffic (airport, downtown, stadium zones). Those three partitions drove all the lag.

The Cause

Three problems compounded:

Problem 1: Consumer lag. The consumers process each location update by writing to Redis (for the real-time driver map) and to TimescaleDB (for historical analytics). The Redis write takes 2ms. The TimescaleDB write takes 18ms. At 50,000 events/second across 12 partitions, each partition receives ~4,167 events/second. Each event takes 20ms to process. One consumer thread processes 50 events/second. A single consumer cannot keep up with a single partition.
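
The arithmetic behind Problem 1 can be checked in a few lines. This is a minimal sketch; the function and constant names are illustrative and not part of the pipeline, and it assumes the two writes happen one after the other in the same thread.

```python
import math


def events_per_thread(processing_ms: float) -> float:
    """Events one single-threaded consumer can process per second."""
    return 1000.0 / processing_ms


def threads_needed(events_per_second: float, processing_ms: float) -> int:
    """Minimum consumer threads required to keep up with the arrival rate."""
    return math.ceil(events_per_second / events_per_thread(processing_ms))


REDIS_MS = 2
TIMESCALE_MS = 18
per_event_ms = REDIS_MS + TIMESCALE_MS              # 20 ms per event

per_partition_rate = 50_000 / 12                    # arrival rate per partition
per_thread_rate = events_per_thread(per_event_ms)   # drain rate per thread

print(round(per_partition_rate), per_thread_rate, threads_needed(50_000, per_event_ms))
# prints: 4167 50.0 1000
```

Each partition receives roughly 83 times what one thread can drain, which is why adding a handful of consumers did not move the lag.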

Problem 2: Uneven partition distribution. The partition key is driverId. Drivers cluster geographically. Airport drivers, downtown drivers, and stadium drivers each dominate a few partitions. Three partitions handle 35% of total traffic. The fastest 9 partitions finish their batches while the slowest 3 partitions accumulate lag.
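
To put numbers on the skew, the sketch below splits the peak rate between hot and cold partitions. It assumes the 35% share is spread evenly across the three hot partitions and the remainder evenly across the other nine; the source does not give a per-partition breakdown, so treat the output as an approximation.

```python
TOTAL_RATE = 50_000        # events/second at peak
HOT_PARTITIONS = 3
COLD_PARTITIONS = 9
HOT_SHARE = 0.35           # fraction of traffic on the hot partitions

# Assumption: traffic is uniform within each group of partitions
hot_rate = TOTAL_RATE * HOT_SHARE / HOT_PARTITIONS
cold_rate = TOTAL_RATE * (1 - HOT_SHARE) / COLD_PARTITIONS

print(round(hot_rate), round(cold_rate))
# prints: 5833 3611
```

With identical consumers on every partition, the hot partitions fall behind first and keep falling behind fastest.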

Problem 3: No backpressure signal. The producer publishes at line rate. When consumers fall behind, the producer has no feedback mechanism. Events accumulate in Kafka until the retention period expires (7 days). The system “works” in the sense that nothing crashes. But the data is stale, which is worse than crashing because nobody gets paged.

The Baseline

# load-tests/driver_location_locustfile.py
from locust import HttpUser, task, between, LoadTestShape
import random
import time

class DriverLocationUser(HttpUser):
    wait_time = between(1.5, 2.5)  # Simulates GPS interval

    def on_start(self):
        self.driver_id = f"driver-{random.randint(1, 50000)}"
        self.lat = 40.7128 + random.uniform(-0.1, 0.1)
        self.lng = -74.0060 + random.uniform(-0.1, 0.1)

    @task
    def send_location(self):
        # Simulate GPS drift
        self.lat += random.uniform(-0.001, 0.001)
        self.lng += random.uniform(-0.001, 0.001)
        self.client.post(
            "/api/drivers/location",
            json={
                "driverId": self.driver_id,
                "latitude": round(self.lat, 6),
                "longitude": round(self.lng, 6),
                "heading": random.randint(0, 360),
                "speed": round(random.uniform(0, 65), 1),
                "timestamp": int(time.time() * 1000)
            },
            name="/api/drivers/location"
        )


class DriverLocationShape(LoadTestShape):
    stages = [
        {"duration": 60,  "users": 5000,   "spawn_rate": 500},
        {"duration": 120, "users": 15000,  "spawn_rate": 1000},
        {"duration": 180, "users": 30000,  "spawn_rate": 1000},
        {"duration": 240, "users": 50000,  "spawn_rate": 2000},
        {"duration": 360, "users": 50000,  "spawn_rate": 2000},  # hold at peak (user count unchanged)
    ]

    def tick(self):
        run_time = self.get_run_time()
        for stage in self.stages:
            if run_time < stage["duration"]:
                return (stage["users"], stage["spawn_rate"])
        return None

3 consumer instances, 12 partitions, each consumer with concurrency=1 (3 active consumer threads):

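| Time (s) | Events/s | Consumer Lag | Lag Growth Rate | p99 Staleness |
|---|---|---|---|---|
| 60 | 5,000 | 0 | 0 | <1s |
| 120 | 15,000 | 180,000 | 3,000/s | 12s |
| 180 | 30,000 | 1,080,000 | 15,000/s | 72s |
| 240 | 50,000 | 2,400,000 | 22,000/s | 240s |
| 360 | 50,000 | 5,040,000 | 22,000/s | 420s |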

At 50,000 events/second, the 3 consumers cannot keep up and lag grows at 22,000/second. Six minutes into the test, the driver map is 7 minutes behind reality.
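
The lag column in the baseline table is the running sum of the growth rate over time. The sketch below reproduces it, reading each row's growth rate as the average over the interval that ends at that row. That reading is an assumption about how the table was produced, and the names are illustrative.

```python
# (stage end time in seconds, lag growth rate in events/second during that stage)
STAGES = [(60, 0), (120, 3_000), (180, 15_000), (240, 22_000), (360, 22_000)]


def lag_at_stage_ends(stages):
    """Accumulate lag stage by stage: growth rate times stage length."""
    lag, previous_end, result = 0, 0, []
    for end, growth_per_second in stages:
        lag += growth_per_second * (end - previous_end)
        previous_end = end
        result.append((end, lag))
    return result


for end, lag in lag_at_stage_ends(STAGES):
    print(end, lag)
# prints:
# 60 0
# 120 180000
# 180 1080000
# 240 2400000
# 360 5040000
```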

The Fix

Step 1: Measure Consumer Lag with Prometheus

Before fixing anything, instrument the lag. If you cannot measure it, you cannot alert on it, and you will not know when the fix works.

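// SCALED: Consumer lag gauge exposed to Prometheus
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@Component
public class ConsumerLagMonitor {

    private static final String GROUP = "driver-location-tracker";

    private final AdminClient adminClient;
    private final MeterRegistry meterRegistry;
    // One holder per partition: each gauge is registered once and reads its holder
    private final Map<TopicPartition, AtomicLong> lagByPartition = new ConcurrentHashMap<>();

    public ConsumerLagMonitor(AdminClient adminClient, MeterRegistry meterRegistry) {
        this.adminClient = adminClient;
        this.meterRegistry = meterRegistry;
    }

    @Scheduled(fixedRate = 5000)
    public void recordConsumerLag() throws Exception {
        Map<TopicPartition, OffsetAndMetadata> committed =
            adminClient.listConsumerGroupOffsets(GROUP)
                .partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);

        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
            adminClient.listOffsets(
                committed.keySet().stream().collect(Collectors.toMap(
                    tp -> tp,
                    tp -> OffsetSpec.latest()
                ))
            ).all().get(10, TimeUnit.SECONDS);

        committed.forEach((tp, offsetMeta) -> {
            long lag = endOffsets.get(tp).offset() - offsetMeta.offset();
            // A Micrometer gauge samples an object; update the holder instead of re-registering
            lagByPartition.computeIfAbsent(tp, key ->
                meterRegistry.gauge("kafka.consumer.lag",
                    Tags.of("topic", key.topic(),
                            "partition", String.valueOf(key.partition()),
                            "group", GROUP),
                    new AtomicLong())
            ).set(lag);
        });
    }
}

# prometheus/alerts.yml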
- alert: KafkaConsumerLagCritical
  expr: sum(kafka_consumer_lag{group="driver-location-tracker"}) > 100000
  for: 60s
  labels:
    severity: critical
  annotations:
    summary: "Driver location consumer lag is {{ $value }} messages"
    description: "Driver map staleness will exceed 30s. Scale consumers."
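
The lag calculation itself is simple enough to check by hand. The sketch below mirrors it outside of Kafka: per-partition lag is the latest offset minus the committed offset, and the alert compares the sum against the 100,000 threshold. The offsets are made-up values for illustration, and the function names are hypothetical.

```python
def partition_lag(end_offsets, committed_offsets):
    """Lag per partition: latest offset in the log minus the group's committed offset."""
    return {
        partition: max(end_offsets[partition] - committed, 0)
        for partition, committed in committed_offsets.items()
    }


def should_alert(lags, threshold=100_000):
    """Mirror of the alert expression: fire when summed lag exceeds the threshold."""
    return sum(lags.values()) > threshold


# Hypothetical offsets for three partitions; partition 2 is a hot partition
end = {0: 1_500_000, 1: 1_480_000, 2: 2_100_000}
committed = {0: 1_499_200, 1: 1_479_900, 2: 1_950_000}

lags = partition_lag(end, committed)
print(lags, should_alert(lags))
# prints: {0: 800, 1: 100, 2: 150000} True
```

Keeping the per-partition values, rather than only the sum, is what makes a hot partition visible on a dashboard.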

Step 2: Increase Partition Count

12 partitions cap parallelism at 12 consumer threads. At 50,000 events/second with 20ms processing per event, you need at least 1,000 consumer threads. That is impractical. The fix is twofold: increase partitions and reduce per-event processing time.

First, separate the processing. Redis write (2ms) and TimescaleDB write (18ms) do not need to happen in the same consumer. Split them into two consumer groups.

// SCALED: Redis consumer - fast path for real-time driver map
@Component
public class DriverLocationRedisConsumer {

    private final RedisTemplate<String, DriverLocation> redisTemplate;

    @KafkaListener(
        topics = "driver-location-updates",
        groupId = "driver-location-redis",
        concurrency = "12"
    )
    public void onLocationUpdate(@Payload DriverLocationEvent event) {
        // SCALED: 2ms per event - Redis SETEX with 30s TTL
        redisTemplate.opsForValue().set(
            "driver:loc:" + event.driverId(),
            new DriverLocation(event.latitude(), event.longitude(),
                event.heading(), event.speed()),
            Duration.ofSeconds(30)
        );
    }
}

// SCALED: TimescaleDB consumer - slow path for analytics, can lag
@Component
public class DriverLocationAnalyticsConsumer {

    private final JdbcTemplate jdbcTemplate;

    @KafkaListener(
        topics = "driver-location-updates",
        groupId = "driver-location-analytics",
        concurrency = "6",
        batch = "true",  // required for the List<DriverLocationEvent> payload below
        properties = {
            "max.poll.records=500",
            "fetch.min.bytes=50000"
        }
    )
    public void onLocationUpdate(
            @Payload List<DriverLocationEvent> events) {
        // SCALED: Batch insert - 500 rows in one statement
        jdbcTemplate.batchUpdate(
            "INSERT INTO driver_locations (driver_id, lat, lng, heading, speed, ts) VALUES (?, ?, ?, ?, ?, ?)",
            events.stream().map(e -> new Object[]{
                e.driverId(), e.latitude(), e.longitude(),
                e.heading(), e.speed(), Timestamp.from(e.timestamp())
            }).toList()
        );
    }
}

The Redis consumer processes each event in 2ms. With 12 partitions and 12 consumer threads, that is 6,000 events/second. Still short of 50,000. Scale partitions.

# Scale from 12 to 48 partitions
kafka-topics.sh --bootstrap-server kafka-1:9092 \
  --alter --topic driver-location-updates \
  --partitions 48

48 partitions with 12 consumer threads: each thread handles 4 partitions. At 2ms per event, each thread processes 500 events/second. 12 threads process 6,000/second. Still short. Scale the consumer deployment.

Step 3: Scale Consumer Instances

# k8s/driver-location-redis-consumer.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: driver-location-redis-consumer
spec:
  replicas: 4
  selector:
    matchLabels:
      app: driver-location-redis-consumer
  template:
    metadata:
      labels:
        app: driver-location-redis-consumer
    spec:
      containers:
        - name: consumer
          image: ridehail/driver-location-consumer:latest
          env:
            - name: SPRING_KAFKA_LISTENER_CONCURRENCY
              value: "12"
          resources:
            requests:
              cpu: "1000m"
              memory: "768Mi"
            limits:
              cpu: "2000m"
              memory: "1536Mi"

4 replicas, each with concurrency 12: 48 consumer threads for 48 partitions, so each thread handles one partition. Each partition receives ~1,042 events/second, but at 2ms per event a thread processes only 500 events/second. 48 threads at 500/second is 24,000/second: about half of peak.

The remaining gap is closed by batch processing. Instead of one-at-a-time, poll 200 records per batch:

// SCALED: Batch processing with pipelined Redis writes
@KafkaListener(
    topics = "driver-location-updates",
    groupId = "driver-location-redis",
    concurrency = "12",
    batch = "true",
    properties = {
        "max.poll.records=200",
        "fetch.min.bytes=10000"
    }
)
public void onLocationBatch(@Payload List<DriverLocationEvent> events) {
    // SCALED: Pipeline 200 Redis writes in one round-trip
    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
        for (DriverLocationEvent event : events) {
            byte[] key = ("driver:loc:" + event.driverId()).getBytes();
            byte[] value = serialize(event);
            connection.stringCommands().setEx(key, 30, value);
        }
        return null;
    });
}

200 pipelined Redis writes complete in ~4ms (one network round-trip instead of 200). One consumer thread processes 200 events in 4ms. That is 50,000 events/second per thread. With 48 threads, capacity is 2.4 million events/second. The bottleneck moves from consumers to the Kafka broker network.
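
The capacity figures quoted across Steps 2 and 3 all come from one formula: threads times batch size divided by time per batch. A minimal sketch, with an illustrative function name, using the timings stated in the text:

```python
def capacity(threads: int, batch_size: int, batch_ms: int) -> int:
    """Events per second the consumer group can sustain."""
    return threads * batch_size * 1000 // batch_ms


print(capacity(12, 1, 2))     # 12 threads, one event per 2 ms
print(capacity(48, 1, 2))     # 48 threads, one event per 2 ms
print(capacity(1, 200, 4))    # one thread, 200 pipelined writes per 4 ms
print(capacity(48, 200, 4))   # 48 threads with pipelining
# prints:
# 6000
# 24000
# 50000
# 2400000
```

Adding threads scales capacity linearly, while pipelining multiplies per-thread throughput by 100, which is why batching closes the gap that scaling alone could not.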

Step 4: Dead Letter Topics for Poisoned Events

A malformed driver location event with latitude: null caused a NullPointerException. The consumer retried 3 times, failed 3 times, and blocked the partition. All subsequent events on that partition stalled. 4% of drivers disappeared from the map.

// SCALED: DLT routing for events that fail after retries
@Configuration
public class DriverLocationConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DriverLocationEvent>
            locationListenerFactory(
                ConsumerFactory<String, DriverLocationEvent> cf,
                KafkaTemplate<String, Object> kafkaTemplate) {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, DriverLocationEvent>();
        factory.setConsumerFactory(cf);
        factory.setBatchListener(true);

        // SCALED: After 3 retries, send to DLT and move on
        var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (record, ex) -> new TopicPartition(
                record.topic() + ".DLT",
                record.partition()
            )
        );

        var errorHandler = new DefaultErrorHandler(recoverer,
            new FixedBackOff(500L, 3));

        // SCALED: Skip deserialization errors immediately - no retry
        errorHandler.addNotRetryableExceptions(
            DeserializationException.class,
            ClassCastException.class
        );

        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}

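The DeadLetterPublishingRecoverer publishes failed records to driver-location-updates.DLT. The original event is republished as the record value, and the exception stack trace and the original topic/partition/offset are preserved in headers. Because the factory is configured for batch listeners, the listener has to throw a BatchListenerFailedException that identifies the failing record; otherwise the error handler cannot tell which record in the batch failed and retries the whole batch. An operations team reviews DLT events daily.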

For poison pills (events that fail on every attempt regardless of retry), the addNotRetryableExceptions call skips retries entirely. A DeserializationException means the bytes cannot become a DriverLocationEvent. Retrying will not fix corrupted bytes. Send it to the DLT immediately and continue processing the next record.
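
The retry-then-route behaviour described above can be modelled without Kafka. The sketch below is a simplified, language-neutral model and not the Spring Kafka implementation: DeserializationError, consume, and write_location are hypothetical names, and the backoff delay between retries is left out.

```python
class DeserializationError(Exception):
    """Stand-in for a payload that cannot be decoded (hypothetical name)."""


def consume(records, handler, max_retries=3, not_retryable=(DeserializationError,)):
    """Process records in order; park failures in a dead-letter list and keep going."""
    processed, dead_letters = [], []
    for record in records:
        failures = 0
        while True:
            try:
                handler(record)
                processed.append(record)
                break
            except Exception as exc:
                failures += 1
                retryable = not isinstance(exc, not_retryable)
                if retryable and failures <= max_retries:
                    continue  # retry the same record (backoff delay omitted)
                dead_letters.append((record, repr(exc)))
                break
    return processed, dead_letters


attempts = {}


def write_location(record):
    key = repr(record)
    attempts[key] = attempts.get(key, 0) + 1
    if isinstance(record, bytes):
        raise DeserializationError("payload is not a location event")
    return record["latitude"] + 0.0  # raises TypeError when latitude is None


records = [
    {"driverId": "d-1", "latitude": 40.71},
    {"driverId": "d-2", "latitude": None},   # fails on every attempt
    b"\xff\xfe",                             # cannot be decoded at all
    {"driverId": "d-3", "latitude": 40.73},
]
processed, dead_letters = consume(records, write_location)
print([r["driverId"] for r in processed], len(dead_letters))
# prints: ['d-1', 'd-3'] 2
```

The null-latitude record is attempted four times (one try plus three retries) before it is parked, the undecodable record only once, and the record behind them is still processed.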

// SCALED: DLT consumer for monitoring and alerting
@Component
public class DriverLocationDltConsumer {

    private final MeterRegistry meterRegistry;
    private static final Logger log = LoggerFactory.getLogger(DriverLocationDltConsumer.class);

    @KafkaListener(
        topics = "driver-location-updates.DLT",
        groupId = "driver-location-dlt-monitor"
    )
    public void onDeadLetter(
            ConsumerRecord<String, byte[]> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset) {

        meterRegistry.counter("kafka.dlt.events",
            "topic", originalTopic).increment();

        log.error("DLT event from {} offset {}: {}",
            originalTopic, originalOffset, errorMessage);
    }
}

# prometheus/alerts.yml
- alert: KafkaDLTEventsHigh
  expr: rate(kafka_dlt_events_total{topic="driver-location-updates"}[5m]) > 10
  for: 30s
  labels:
    severity: warning
  annotations:
    summary: "Driver location DLT receiving {{ $value }} events/s"
    description: "Possible poison pill or schema mismatch in producer."

Step 5: Backpressure via Consumer Pause/Resume

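When the Redis consumer detects that processing is falling behind (lag exceeds a threshold), it can pause consumption. Pausing stops the fetching of new records without the consumer leaving the group, so no rebalance is triggered. The example below pauses the whole listener container; Kafka consumers can also pause individual partitions, which lets the remaining partitions continue processing.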

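// SCALED: Pause the Redis listener container when lag crosses a threshold
@Component
public class BackpressureManager implements ConsumerAwareRebalanceListener {

    private static final Logger log = LoggerFactory.getLogger(BackpressureManager.class);
    private static final long LAG_PAUSE_THRESHOLD = 50_000;
    private static final long LAG_RESUME_THRESHOLD = 10_000;

    private final AtomicBoolean paused = new AtomicBoolean(false);
    private final KafkaListenerEndpointRegistry registry;

    // @Scheduled methods cannot take parameters, so the registry is constructor-injected
    public BackpressureManager(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @Scheduled(fixedRate = 5000)
    public void evaluateBackpressure() {
        // Lookup is by listener id, so the @KafkaListener must declare
        // id = "driver-location-redis" (the groupId alone is not enough)
        var container = registry.getListenerContainer("driver-location-redis");
        if (container == null) return;

        long totalLag = getTotalLag(); // helper not shown: sums per-partition lag for the group

        if (totalLag > LAG_PAUSE_THRESHOLD && !paused.get()) {
            container.pause();
            paused.set(true);
            log.warn("Paused driver-location-redis consumer. Lag: {}", totalLag);
        } else if (totalLag < LAG_RESUME_THRESHOLD && paused.get()) {
            container.resume();
            paused.set(false);
            log.info("Resumed driver-location-redis consumer. Lag: {}", totalLag);
        }
    }
}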

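Pausing is a last resort. The primary defense is scaling consumers and partitions. But during cascading failures (Redis latency spike, network partition), pausing prevents the consumer from accumulating a processing backlog that takes minutes to drain after the failure resolves. Note that consumer lag keeps growing while a container is paused, so a lag-only resume condition will not fire if every replica pauses at once; in practice the resume check should also consider downstream health, such as Redis latency returning to normal.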
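
The two thresholds form a hysteresis band: pause above the high mark, resume only below the low mark, and hold the current state in between so the consumer does not flap. A minimal sketch of that state machine follows. The class name is hypothetical, and the backlog signal is left abstract: it could be consumer lag as in the listing above, or a downstream measure.

```python
class PauseController:
    """Two-threshold (hysteresis) pause/resume decision."""

    def __init__(self, pause_above=50_000, resume_below=10_000):
        if resume_below >= pause_above:
            raise ValueError("resume_below must be lower than pause_above")
        self.pause_above = pause_above
        self.resume_below = resume_below
        self.paused = False

    def update(self, backlog):
        """Feed the latest backlog sample; returns True while paused."""
        if not self.paused and backlog > self.pause_above:
            self.paused = True
        elif self.paused and backlog < self.resume_below:
            self.paused = False
        return self.paused


controller = PauseController()
samples = [5_000, 60_000, 40_000, 9_000, 20_000]
states = [controller.update(sample) for sample in samples]
print(states)
# prints: [False, True, True, False, False]
```

The 40,000 sample keeps the consumer paused and the 20,000 sample keeps it running, even though both sit between the thresholds; only crossing a threshold changes state.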

The Proof

Same Locust test, now with 48 partitions, 4 consumer replicas (12 threads each = 48 threads), batch processing with pipelined Redis writes, and DLT for poison pills:

3 Consumers (Original)

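| Time (s) | Events/s | Consumer Lag | Lag Growth Rate | p99 Staleness |
|---|---|---|---|---|
| 60 | 5,000 | 0 | 0 | <1s |
| 120 | 15,000 | 180,000 | 3,000/s | 12s |
| 180 | 30,000 | 1,080,000 | 15,000/s | 72s |
| 240 | 50,000 | 2,400,000 | 22,000/s | 240s |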

4 Consumers, 12 Threads Each (48 Partitions, Batch + Pipeline)

| Time (s) | Events/s | Consumer Lag | Lag Growth Rate | p99 Staleness |
|---|---|---|---|---|
| 60 | 5,000 | 0 | 0 | <1s |
| 120 | 15,000 | 0 | 0 | <1s |
| 180 | 30,000 | 0 | 0 | <1s |
| 240 | 50,000 | 850 | ~0 | <2s |
| 360 | 50,000 | 1,200 | ~0 | <2s |

| Metric | 3 Consumers (12 partitions) | 4 Consumers (48 partitions) | Improvement |
|---|---|---|---|
| Max events/s at zero lag | ~5,000 | 50,000+ | 10x |
| Consumer lag at 50k events/s | 2,400,000 | 1,200 | 2,000x reduction |
| p99 staleness at 50k events/s | 240s | <2s | 120x |
| DLT events captured | 0 (blocked partition) | 47 (routed to DLT) | No stuck partitions |
| Recovery time after spike | 45 minutes | <10s | 270x |

At p99, the driver map now shows positions that are under 2 seconds old during the worst surge. The 47 DLT events in the test were intentionally malformed records. Without the DLT, each of those events would have stalled the partition it landed on, blocking up to 47 of the 48 partitions. With the DLT, they were routed, logged, and the consumer moved on in under 1ms per poison pill.

The cost: 4 consumer pods at 1 CPU / 768Mi each. Total: 4 vCPUs and 3Gi memory for the driver location Redis pipeline. The alternative was 240 seconds of driver map staleness, which translates to riders requesting pickups from phantom drivers, generating 3x support tickets during surge events.
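
The improvement ratios and the resource totals above follow directly from the quoted figures. A quick check, using the stated upper bounds (<2s, <10s) as if they were exact, so the staleness and recovery ratios are lower bounds:

```python
baseline = {"max_rate": 5_000, "lag": 2_400_000, "staleness_s": 240, "recovery_s": 45 * 60}
scaled = {"max_rate": 50_000, "lag": 1_200, "staleness_s": 2, "recovery_s": 10}

improvement = {
    "max_rate": scaled["max_rate"] / baseline["max_rate"],
    "lag": baseline["lag"] / scaled["lag"],
    "staleness": baseline["staleness_s"] / scaled["staleness_s"],
    "recovery": baseline["recovery_s"] / scaled["recovery_s"],
}

# Resource requests from the Deployment: 4 pods at 1 CPU and 768Mi each
pods, cpu_per_pod, memory_mi_per_pod = 4, 1, 768
total_cpu = pods * cpu_per_pod
total_memory_gi = pods * memory_mi_per_pod / 1024

print(improvement)
print(total_cpu, total_memory_gi)
# prints:
# {'max_rate': 10.0, 'lag': 2000.0, 'staleness': 120.0, 'recovery': 270.0}
# 4 3.0
```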