Dead Letter Queues and Poison Message Handling
The Symptom
A document with a 15 MB body field arrives in the CDC topic. OpenSearch rejects the bulk request because the HTTP request body exceeds the http.max_content_length setting. The consumer retries, and retries again. It is now stuck on this single message, unable to process the 10,000 events behind it. The search index falls further behind the database while the consumer keeps retrying a message that can never succeed.
The Internals
A poison message is a CDC event that can never be successfully processed, regardless of how many times it is retried. Common causes:
- Document exceeds OpenSearch’s maximum field length or HTTP content length
- Document contains invalid UTF-8 sequences that break the JSON serializer
- Document references a tenant whose index does not exist and cannot be auto-created
- Mapping conflict: the event contains a field value incompatible with the existing mapping (e.g., string value for an integer field)
A dead letter queue (DLQ) isolates poison messages from the main pipeline. Failed messages are moved to a separate topic after a configurable number of retries. The main pipeline continues processing the remaining events.
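The isolation idea can be illustrated without Kafka or OpenSearch. The following is a minimal in-memory sketch, not the pipeline's implementation: `DlqSketch`, `drain`, and the `handler` predicate are hypothetical names, and the handler stands in for "index this event".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** In-memory illustration of DLQ isolation (hypothetical; no Kafka or OpenSearch involved). */
final class DlqSketch {
    record Result(List<String> processed, List<String> deadLettered) {}

    /**
     * Processes messages in order. A message that fails maxAttempts times is
     * moved to the dead-letter list so the messages behind it are not blocked.
     */
    static Result drain(List<String> messages, Predicate<String> handler, int maxAttempts) {
        Deque<String> queue = new ArrayDeque<>(messages);
        List<String> processed = new ArrayList<>();
        List<String> deadLettered = new ArrayList<>();
        while (!queue.isEmpty()) {
            String msg = queue.poll();
            boolean ok = false;
            for (int attempt = 1; attempt <= maxAttempts && !ok; attempt++) {
                ok = handler.test(msg); // true means the message was handled successfully
            }
            (ok ? processed : deadLettered).add(msg);
        }
        return new Result(processed, deadLettered);
    }
}
```

Calling `DlqSketch.drain(List.of("a", "oversized", "b"), m -> !m.equals("oversized"), 3)` leaves "a" and "b" in the processed list and "oversized" in the dead-letter list: the poison message costs three attempts instead of stalling everything behind it.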
The Implementation
DLQ-Aware Consumer
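    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.time.Instant;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.opensearch.client.opensearch.OpenSearchClient;
    import org.opensearch.client.opensearch._types.Refresh;
    import org.opensearch.client.opensearch.core.BulkRequest;
    import org.opensearch.client.opensearch.core.BulkResponse;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    @Component
    public class ResilientDocumentConsumer {
        private static final int MAX_RETRIES = 3;
        private static final String DLQ_TOPIC = "cdc.public.documents.dlq";

        private final OpenSearchClient client;
        private final KafkaTemplate<String, String> kafkaTemplate;
        private final DocumentTransformer transformer;

        public ResilientDocumentConsumer(OpenSearchClient client,
                                         KafkaTemplate<String, String> kafkaTemplate,
                                         DocumentTransformer transformer) {
            this.client = client;
            this.kafkaTemplate = kafkaTemplate;
            this.transformer = transformer;
        }

        // parseEvent, addToBulk, and indexSingleDocument are helpers not shown here.

        @KafkaListener(
                topics = "cdc.public.documents",
                groupId = "opensearch-indexer",
                containerFactory = "batchListenerFactory"
        )
        public void consume(List<ConsumerRecord<String, String>> records,
                            Acknowledgment ack) {
            // Records actually added to the bulk request, in order, so that
            // response item i maps back to submitted.get(i). This assumes
            // addToBulk adds exactly one operation per event.
            List<ConsumerRecord<String, String>> submitted = new ArrayList<>();
            List<ConsumerRecord<String, String>> retryable = new ArrayList<>();

            // First pass: attempt to index all records
            BulkRequest.Builder bulk = new BulkRequest.Builder()
                    .refresh(Refresh.False);
            for (var record : records) {
                try {
                    CdcEvent event = parseEvent(record.value());
                    addToBulk(bulk, event);
                    submitted.add(record);
                } catch (Exception e) {
                    // Parse failure: immediate DLQ (no retry will fix bad JSON)
                    sendToDlq(record, e.getMessage());
                }
            }

            if (!submitted.isEmpty()) { // an empty bulk request is invalid
                try {
                    BulkResponse response = client.bulk(bulk.build());
                    if (response.errors()) {
                        for (int i = 0; i < response.items().size(); i++) {
                            var error = response.items().get(i).error();
                            if (error == null) {
                                continue;
                            }
                            var record = submitted.get(i);
                            if (isRetryable(error.type())) {
                                retryable.add(record);
                            } else {
                                sendToDlq(record, error.reason());
                            }
                        }
                    }
                } catch (IOException e) {
                    // Cluster-level failure: do not acknowledge, redeliver all
                    throw new RuntimeException("Bulk index failed", e);
                }
            }

            // Retry retryable failures with exponential backoff
            for (var record : retryable) {
                retryWithBackoff(record);
            }
            ack.acknowledge();
        }

        private boolean isRetryable(String errorType) {
            return errorType != null && (
                    errorType.contains("rejected_execution") ||
                    errorType.contains("unavailable_shards") ||
                    errorType.contains("timeout"));
        }

        private void sendToDlq(ConsumerRecord<String, String> record,
                               String reason) {
            var headers = new RecordHeaders(record.headers());
            // String.valueOf guards against a null reason (e.g. an exception without a message)
            headers.add("dlq.reason",
                    String.valueOf(reason).getBytes(StandardCharsets.UTF_8));
            headers.add("dlq.timestamp",
                    Instant.now().toString().getBytes(StandardCharsets.UTF_8));
            headers.add("dlq.original.topic",
                    record.topic().getBytes(StandardCharsets.UTF_8));
            // Send a ProducerRecord so the headers travel with the message;
            // send(topic, key, value) would silently drop them. The send is
            // asynchronous: wait on the returned future before acknowledging
            // if DLQ writes must not be lost.
            kafkaTemplate.send(new ProducerRecord<>(
                    DLQ_TOPIC, (Integer) null, record.key(), record.value(), headers));
        }

        private void retryWithBackoff(ConsumerRecord<String, String> record) {
            Exception last = null;
            for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
                try {
                    // Backoff of 1s, 2s, 4s. This blocks the listener thread, so
                    // the total must stay well below max.poll.interval.ms.
                    Thread.sleep((1L << attempt) * 1000);
                    indexSingleDocument(parseEvent(record.value()));
                    return;
                } catch (InterruptedException e) {
                    // Preserve the interrupt and leave the batch unacknowledged
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted during retry", e);
                } catch (Exception e) {
                    last = e;
                }
            }
            sendToDlq(record, "Exceeded max retries (" + MAX_RETRIES + "): " + last);
        }
    }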
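The retry delays are easier to reason about when computed separately from the consumer. The sketch below is an illustration under assumptions: `BackoffSchedule` and the `capMillis` upper bound are not part of the consumer above, which uses a 1-second base, doubling per attempt, and no cap.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that computes exponential backoff delays. */
final class BackoffSchedule {
    /** Delay before retry number attempt (0-based): baseMillis * 2^attempt, capped at capMillis. */
    static long delayMillis(int attempt, long baseMillis, long capMillis) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Clamp the shift so large attempt counts cannot overflow for realistic base values.
        long factor = 1L << Math.min(attempt, 30);
        return Math.min(capMillis, baseMillis * factor);
    }

    /** Delays for retries 0 .. maxRetries-1, in order. */
    static List<Long> schedule(int maxRetries, long baseMillis, long capMillis) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            delays.add(delayMillis(attempt, baseMillis, capMillis));
        }
        return delays;
    }

    /** Total time one record can block the consumer thread if every retry fails. */
    static long worstCaseMillis(int maxRetries, long baseMillis, long capMillis) {
        long total = 0;
        for (long delay : schedule(maxRetries, baseMillis, capMillis)) {
            total += delay;
        }
        return total;
    }
}
```

With three retries and a 1-second base, `schedule(3, 1000, 30_000)` yields 1000, 2000, and 4000 ms, so a record that never succeeds holds the listener thread for 7 seconds of sleep before it is dead-lettered. That figure is per record, which is worth comparing against the consumer's poll interval when a batch contains many retryable failures.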
DLQ Monitoring and Recovery
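    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.time.Instant;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.header.Header;

    public class DlqMonitor {
        private static final String DLQ_TOPIC = "cdc.public.documents.dlq";
        private final KafkaConsumer<String, String> dlqConsumer;

        public DlqMonitor(KafkaConsumer<String, String> dlqConsumer) {
            this.dlqConsumer = dlqConsumer;
        }

        public record DlqStats(
                long totalMessages,
                Map<String, Long> reasonCounts,
                Instant oldestMessage, // null when the DLQ is empty
                Instant newestMessage
        ) {}

        public DlqStats getDlqStats() {
            // Scan every partition of the DLQ topic from the beginning
            List<TopicPartition> tps = dlqConsumer.partitionsFor(DLQ_TOPIC).stream()
                    .map(p -> new TopicPartition(DLQ_TOPIC, p.partition()))
                    .toList();
            dlqConsumer.assign(tps);
            dlqConsumer.seekToBeginning(tps);
            Map<TopicPartition, Long> end = dlqConsumer.endOffsets(tps);

            long total = 0;
            Map<String, Long> reasons = new LinkedHashMap<>();
            Instant oldest = null;
            Instant newest = null;
            // One poll returns at most max.poll.records, so keep polling until
            // every partition has been read up to its end offset.
            while (tps.stream().anyMatch(tp -> dlqConsumer.position(tp) < end.get(tp))) {
                for (var record : dlqConsumer.poll(Duration.ofSeconds(5))) {
                    total++;
                    // Messages written without a dlq.reason header are counted as "unknown"
                    Header header = record.headers().lastHeader("dlq.reason");
                    String reason = (header == null || header.value() == null)
                            ? "unknown"
                            : new String(header.value(), StandardCharsets.UTF_8);
                    reasons.merge(reason, 1L, Long::sum);
                    // Record timestamp of the DLQ write
                    Instant ts = Instant.ofEpochMilli(record.timestamp());
                    if (oldest == null || ts.isBefore(oldest)) oldest = ts;
                    if (newest == null || ts.isAfter(newest)) newest = ts;
                }
            }
            return new DlqStats(total, reasons, oldest, newest);
        }
    }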
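For the recovery half, one possible approach is to republish dead-lettered messages to their original topic once the underlying problem is fixed. The sketch below only plans that replay and uses plain Java types: `DlqReplaySketch`, `Message`, `Replay`, and `planReplay` are hypothetical, and `Message` is a simplified stand-in for a Kafka record with string headers. It assumes the `dlq.reason` and `dlq.original.topic` headers that `sendToDlq` attaches.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical planner that selects DLQ messages to republish after a fix. */
final class DlqReplaySketch {
    /** Simplified stand-in for a Kafka record: key, value, and string headers. */
    record Message(String key, String value, Map<String, String> headers) {}

    /** A message paired with the topic it should be republished to. */
    record Replay(String topic, Message message) {}

    /**
     * Selects messages whose dlq.reason matches reasonFixed, targets them at
     * their original topic, and strips the dlq.* bookkeeping headers.
     */
    static List<Replay> planReplay(List<Message> dlq, Predicate<String> reasonFixed) {
        return dlq.stream()
                // Without the original topic there is nowhere safe to replay to
                .filter(m -> m.headers().containsKey("dlq.original.topic"))
                .filter(m -> reasonFixed.test(m.headers().getOrDefault("dlq.reason", "")))
                .map(m -> {
                    Map<String, String> clean = new LinkedHashMap<>(m.headers());
                    String topic = clean.remove("dlq.original.topic");
                    clean.keySet().removeIf(k -> k.startsWith("dlq."));
                    return new Replay(topic, new Message(m.key(), m.value(), clean));
                })
                .toList();
    }
}
```

Filtering by reason keeps the replay targeted: after a mapping fix, only messages dead-lettered for that mapping error are sent back, while oversized documents stay in the DLQ for separate handling.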
The Measurement
DLQ impact on pipeline throughput:
| Scenario | Without DLQ | With DLQ |
|---|---|---|
| Poison message (1 in 10,000) | Pipeline stuck indefinitely | 3s delay, continues |
| Transient failure (5% rate) | 5% events lost or stuck | Retried and indexed |
| Mapping conflict batch | Entire batch rejected | 1 event to DLQ, 999 indexed |
| DLQ backlog after 30 days | N/A | ~50 messages (manual review) |
The Decision Rule
Route messages that still fail after 3 retry attempts to a DLQ. A message that fails 3 times with the same error is unlikely to succeed on the 4th attempt. The DLQ preserves the message for manual investigation without blocking the pipeline.
Classify errors as retryable (cluster overload, timeout, unavailable shards) or permanent (mapping conflict, oversized document, malformed data). Retryable errors get exponential backoff. Permanent errors go directly to the DLQ.
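The classification rule can be expressed as a small pure function. This is a sketch under assumptions: `ErrorClassifier` and `ErrorClass` are illustrative names, the substrings mirror the ones checked in `isRetryable`, and unknown or missing error types are treated as permanent so that they cannot block the pipeline.

```java
import java.util.List;

/** Hypothetical classifier for bulk item error types. */
final class ErrorClassifier {
    enum ErrorClass { RETRYABLE, PERMANENT }

    // Substrings that indicate a transient, cluster-side condition
    private static final List<String> RETRYABLE_MARKERS =
            List.of("rejected_execution", "unavailable_shards", "timeout");

    /** Anything not recognized as transient is PERMANENT and goes straight to the DLQ. */
    static ErrorClass classify(String errorType) {
        if (errorType == null) {
            return ErrorClass.PERMANENT;
        }
        for (String marker : RETRYABLE_MARKERS) {
            if (errorType.contains(marker)) {
                return ErrorClass.RETRYABLE;
            }
        }
        return ErrorClass.PERMANENT;
    }
}
```

For example, `classify("unavailable_shards_exception")` returns `RETRYABLE`, while `classify("mapper_parsing_exception")` returns `PERMANENT`. Defaulting to permanent is a deliberate choice here: a misclassified transient error costs one DLQ entry to replay, whereas a misclassified permanent error costs retry time on every occurrence.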
Monitor DLQ depth daily. A growing DLQ indicates a systemic problem (mapping mismatch, schema evolution error) rather than isolated poison messages. DLQ depth should be stable or decreasing after manual remediation.
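A daily depth check can be reduced to comparing the latest sample with an earlier one. The sketch below is one way to do that, with assumed names and parameters: `DlqDepthTrend`, `windowDays`, and `tolerance` are hypothetical, and the depth samples would come from whatever collects the DLQ message count each day.

```java
import java.util.List;

/** Hypothetical check for whether DLQ depth is growing over a window of daily samples. */
final class DlqDepthTrend {
    enum Trend { GROWING, STABLE_OR_DECREASING }

    /**
     * Compares the most recent depth with the sample windowDays earlier
     * (or the oldest sample if fewer are available). Growth of more than
     * tolerance messages is reported as GROWING.
     */
    static Trend assess(List<Long> dailyDepths, int windowDays, long tolerance) {
        if (dailyDepths.isEmpty() || windowDays < 1) {
            throw new IllegalArgumentException("need at least one sample and windowDays >= 1");
        }
        int last = dailyDepths.size() - 1;
        int reference = Math.max(0, last - windowDays);
        long growth = dailyDepths.get(last) - dailyDepths.get(reference);
        return growth > tolerance ? Trend.GROWING : Trend.STABLE_OR_DECREASING;
    }
}
```

With samples of 50, 52, 51, 50 and a tolerance of 5, the result is `STABLE_OR_DECREASING`; with 50, 80, 120, 200 it is `GROWING`, which is the signal to look for a mapping mismatch or schema change rather than individual poison messages.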