Dead Letter Queues, Poison Pills, and Exactly-Once Semantics
In any event-driven system, messages will fail. The question isn’t whether — it’s whether failed messages disappear silently, block all processing, or get routed to a place where humans can investigate and reprocess them. Dead Letter Queues (DLQs) are the answer, but configuring them correctly is where most teams fail.
Dead Letter Queue Architecture
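A DLQ is just a regular SQS queue that another queue's redrive policy names as the destination for messages received more than maxReceiveCount times without being deleted. The critical design decisions are: create the DLQ first, give it a longer retention period than the source queue, and restrict which source queues may use it: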
import boto3
import json
sqs = boto3.client('sqs')

# Step 1: Create the DLQ first -- the main queue's redrive policy needs its ARN.
dlq_response = sqs.create_queue(
    QueueName='order-processing-dlq',
    Attributes={
        # 14 days (the SQS maximum). For standard queues a message's expiry is
        # based on its ORIGINAL enqueue time, so the DLQ must retain messages
        # longer than the source queue (4 days below) or they expire early.
        'MessageRetentionPeriod': '1209600',
        'VisibilityTimeout': '300'
    }
)
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_response['QueueUrl'],
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

# Step 2: Create the main queue with a redrive policy pointing at the DLQ.
main_queue = sqs.create_queue(
    QueueName='order-processing',
    Attributes={
        'MessageRetentionPeriod': '345600',  # 4 days
        'VisibilityTimeout': '300',  # 5 min processing window
        'ReceiveMessageWaitTimeSeconds': '20',  # Long polling
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': 3  # After 3 failed attempts -> DLQ
        })
    }
)
# Look up the ARN SQS actually assigned instead of hard-coding an
# account/region-specific string that silently breaks in other environments.
main_queue_arn = sqs.get_queue_attributes(
    QueueUrl=main_queue['QueueUrl'],
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

# Step 3: Enable a redrive allow policy on the DLQ (which source queues may use it).
sqs.set_queue_attributes(
    QueueUrl=dlq_response['QueueUrl'],
    Attributes={
        'RedriveAllowPolicy': json.dumps({
            'redrivePermission': 'byQueue',
            'sourceQueueArns': [main_queue_arn]
        })
    }
)
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;
import java.util.Map;
public class DlqSetup {
private final SqsClient sqs = SqsClient.create();
public void createQueueWithDlq() {
// Create DLQ
CreateQueueResponse dlqResponse = sqs.createQueue(CreateQueueRequest.builder()
.queueName("order-processing-dlq")
.attributes(Map.of(
QueueAttributeName.MESSAGE_RETENTION_PERIOD, "1209600",
QueueAttributeName.VISIBILITY_TIMEOUT, "300"
))
.build());
// Get DLQ ARN
String dlqArn = sqs.getQueueAttributes(GetQueueAttributesRequest.builder()
.queueUrl(dlqResponse.queueUrl())
.attributeNames(QueueAttributeName.QUEUE_ARN)
.build())
.attributes().get(QueueAttributeName.QUEUE_ARN);
// Create main queue with redrive to DLQ
sqs.createQueue(CreateQueueRequest.builder()
.queueName("order-processing")
.attributes(Map.of(
QueueAttributeName.MESSAGE_RETENTION_PERIOD, "345600",
QueueAttributeName.VISIBILITY_TIMEOUT, "300",
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, "20",
QueueAttributeName.REDRIVE_POLICY,
"""{"deadLetterTargetArn":"%s","maxReceiveCount":3}""".formatted(dlqArn)
))
.build());
}
}
Automated DLQ Reprocessing
Messages in the DLQ need a path back to the main queue — either automated (after a fix is deployed) or manual (after investigation):
def reprocess_dlq(dlq_url: str, main_queue_url: str, max_messages: int = 100):
    """
    Move messages from a DLQ back to the main queue for reprocessing.

    Use after deploying a fix for the root cause. Each message is re-sent with
    its original body and message attributes, plus two markers
    (``ReprocessedFrom`` and ``OriginalMessageId``). It is deleted from the DLQ
    only after the send succeeds, so a crash part-way through duplicates a
    message rather than losing it.

    Messages whose ApproximateReceiveCount exceeds 10 are passed to
    ``archive_permanently_failed`` and deleted instead of being re-sent.

    Args:
        dlq_url: URL of the dead-letter queue to drain.
        main_queue_url: URL of the queue that messages are sent back to.
        max_messages: Maximum number of messages to re-send to the main queue.

    Returns:
        The number of messages re-sent to the main queue. Archived messages are
        not counted.
    """
    processed = 0
    while processed < max_messages:
        response = sqs.receive_message(
            QueueUrl=dlq_url,
            # Never receive more than we may still re-send. Otherwise we overshoot
            # max_messages, or leave received-but-unhandled messages invisible
            # for the DLQ's visibility timeout.
            MaxNumberOfMessages=min(10, max_messages - processed),
            WaitTimeSeconds=5,
            MessageAttributeNames=['All'],
            AttributeNames=['All']
        )
        messages = response.get('Messages', [])
        if not messages:
            break

        for message in messages:
            # The body is forwarded verbatim and deliberately NOT parsed here:
            # poison pills are often malformed, and a parse error would strand
            # them in the DLQ. Inspect the body in a try/except if you need to.
            receive_count = int(
                message.get('Attributes', {}).get('ApproximateReceiveCount', 0)
            )
            if receive_count > 10:
                # This message has been reprocessed too many times.
                # Archive it for manual investigation.
                archive_permanently_failed(message)
                sqs.delete_message(QueueUrl=dlq_url, ReceiptHandle=message['ReceiptHandle'])
                continue

            # Copy the original attributes and add metadata about the
            # reprocessing. NOTE(review): SQS allows at most 10 message
            # attributes per message; a source message already carrying more
            # than 8 will be rejected by send_message.
            attributes = dict(message.get('MessageAttributes', {}))
            attributes['ReprocessedFrom'] = {
                'DataType': 'String',
                'StringValue': 'DLQ'
            }
            attributes['OriginalMessageId'] = {
                'DataType': 'String',
                'StringValue': message['MessageId']
            }

            # Send first, then delete: at-least-once, never zero-times.
            sqs.send_message(
                QueueUrl=main_queue_url,
                MessageBody=message['Body'],
                MessageAttributes=attributes
            )
            sqs.delete_message(QueueUrl=dlq_url, ReceiptHandle=message['ReceiptHandle'])
            processed += 1

    return processed
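# AWS also provides native DLQ redrive (in the console since 2021, via the
# StartMessageMoveTask API since 2023):
# sqs.start_message_move_task(SourceArn=dlq_arn, DestinationArn=main_queue_arn)
# This moves messages in bulk on the server side, with no custom polling loop or Lambda.
# DestinationArn is optional; if you omit it, messages return to their original source queue.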
Exactly-Once Semantics: The Application Pattern
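AWS messaging services such as SQS standard queues, SNS, and EventBridge provide at-least-once delivery. SQS FIFO queues deduplicate only within a five-minute window. Exactly-once processing must therefore be implemented at the application level, using one of these patterns: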
Pattern 1: Idempotency Key in DynamoDB
import hashlib
import time
def process_payment_idempotent(event: dict):
    """
    Charge a payment at most once per distinct event, using the DynamoDB table
    'idempotency-store' as an idempotency store.

    The idempotency key is the SHA-256 of ``event`` serialized as JSON with
    sorted keys, so two deliveries count as duplicates only when their content
    is identical. NOTE(review): if ``event`` contains per-delivery fields
    (receipt handle, receive count, timestamps), redeliveries hash differently
    and are NOT deduplicated. Confirm that callers pass only the business payload.

    Flow:
      1. Conditionally insert an IN_PROGRESS record for the key.
      2. If a record already exists, the outcome depends on its state:
         - COMPLETED: return its cached result.
         - FAILED, or IN_PROGRESS and older than 300 seconds (the processor
           presumably crashed): reclaim it atomically.
         - Anything else: raise.
      3. Call ``charge_payment(event)``, then record COMPLETED with the result.
         If the charge raises, record FAILED with the error so a later delivery
         can retry.

    Returns:
        The value returned by ``charge_payment``, or the cached result of a
        previously completed attempt.

    Raises:
        ConcurrentProcessingError: Raised in three cases: another processor
            holds a fresh claim; another processor won the race to reclaim a
            stale or failed record; or the record vanished between the insert
            attempt and the read.
        Exception: Whatever ``charge_payment`` raises, re-raised after the
            record is marked FAILED.
    """
    dynamodb = boto3.resource('dynamodb')
    idempotency_table = dynamodb.Table('idempotency-store')
    conditional_check_failed = dynamodb.meta.client.exceptions.ConditionalCheckFailedException
    stale_after_seconds = 300
    ttl_seconds = 86400  # Auto-cleanup after 24h (if TTL is enabled on 'ttl')

    # Generate idempotency key from message content
    idempotency_key = hashlib.sha256(
        json.dumps(event, sort_keys=True).encode()
    ).hexdigest()
    key = {'idempotency_key': idempotency_key}
    now = int(time.time())

    # Try to claim this key (conditional write)
    try:
        idempotency_table.put_item(
            Item={
                'idempotency_key': idempotency_key,
                'status': 'IN_PROGRESS',
                'started_at': now,
                'ttl': now + ttl_seconds,
                'input_hash': idempotency_key
            },
            ConditionExpression='attribute_not_exists(idempotency_key)'
        )
    except conditional_check_failed:
        # Already processed or in progress. Read strongly consistently so we see
        # the record that made our conditional put fail.
        existing = idempotency_table.get_item(Key=key, ConsistentRead=True).get('Item')
        if existing is None:
            # The record expired or was deleted between the put and the get.
            # Let the message be redelivered rather than charge without a claim.
            raise ConcurrentProcessingError("Another processor is handling this")

        status = existing.get('status')
        if status == 'COMPLETED':
            # Return cached result
            return existing.get('result')

        # boto3's resource API returns DynamoDB numbers as decimal.Decimal, which
        # cannot be mixed with float arithmetic (e.g. time.time()); use ints.
        started_at = existing.get('started_at')
        retryable = started_at is not None and (
            status == 'FAILED'
            or (status == 'IN_PROGRESS' and now - int(started_at) > stale_after_seconds)
        )
        if not retryable:
            raise ConcurrentProcessingError("Another processor is handling this")

        # Reclaim atomically. The condition holds only if nobody has rewritten
        # status/started_at since our read, so at most one processor wins.
        try:
            idempotency_table.update_item(
                Key=key,
                UpdateExpression='SET #s = :in_progress, started_at = :now, #t = :ttl',
                ConditionExpression='#s = :seen_status AND started_at = :seen_started_at',
                ExpressionAttributeNames={'#s': 'status', '#t': 'ttl'},
                ExpressionAttributeValues={
                    ':in_progress': 'IN_PROGRESS',
                    ':now': now,
                    ':ttl': now + ttl_seconds,
                    ':seen_status': status,
                    ':seen_started_at': started_at,
                }
            )
        except conditional_check_failed:
            raise ConcurrentProcessingError("Another processor is handling this") from None

    # Process the payment. Only the charge itself sits inside the try. If the
    # COMPLETED write below fails, the charge has already happened and must not
    # be marked FAILED, which would invite an immediate second charge.
    # NOTE(review): such a record stays IN_PROGRESS and becomes reclaimable once
    # stale; consider making charge_payment itself idempotent (e.g. a provider
    # idempotency key).
    try:
        result = charge_payment(event)
    except Exception as e:
        # Mark as failed so a later delivery can retry
        idempotency_table.update_item(
            Key=key,
            UpdateExpression='SET #s = :status, error_msg = :err',
            ExpressionAttributeNames={'#s': 'status'},
            ExpressionAttributeValues={
                ':status': 'FAILED',
                ':err': str(e)
            }
        )
        raise

    # Mark as completed with cached result
    idempotency_table.update_item(
        Key=key,
        UpdateExpression='SET #s = :status, #r = :result, completed_at = :now',
        ExpressionAttributeNames={'#s': 'status', '#r': 'result'},
        ExpressionAttributeValues={
            ':status': 'COMPLETED',
            ':result': result,
            ':now': int(time.time())
        }
    )
    return result
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import java.security.MessageDigest;
import java.time.Instant;
import java.util.*;
public class IdempotentProcessor {
    private final DynamoDbClient dynamo = DynamoDbClient.create();
    private static final String IDEMPOTENCY_TABLE = "idempotency-store";
    // An IN_PROGRESS claim older than this is treated as abandoned (crashed processor).
    private static final long STALE_AFTER_SECONDS = 300;
    // Lifetime written to the "ttl" attribute (24h auto-cleanup if TTL is enabled).
    private static final long TTL_SECONDS = 86400;

    /**
     * Runs {@code processor} at most once per distinct {@code eventJson},
     * using a DynamoDB table as the idempotency store.
     *
     * <p>The key is the SHA-256 of the UTF-8 bytes of {@code eventJson}. If a
     * record already exists, the outcome depends on its state:
     * <ul>
     *   <li>COMPLETED: returns the cached result.</li>
     *   <li>FAILED, or IN_PROGRESS and older than {@link #STALE_AFTER_SECONDS}:
     *       reclaims the record atomically, then reprocesses.</li>
     *   <li>Otherwise: throws.</li>
     * </ul>
     *
     * @return the processor's result, or the cached result of a completed attempt
     *     (empty if that record has no stored result)
     * @throws ConcurrentProcessingException if another processor holds or wins
     *     the claim, or the record vanished between the put and the read
     * @throws Exception whatever {@code processor} throws, after the record
     *     is marked FAILED
     */
    public Optional<String> processIdempotent(String eventJson,
            ProcessingFunction processor) throws Exception {
        String idempotencyKey = sha256(eventJson);
        long now = Instant.now().getEpochSecond();
        Map<String, AttributeValue> key = Map.of("idempotency_key", attr(idempotencyKey));

        // Attempt to claim processing slot
        try {
            dynamo.putItem(PutItemRequest.builder()
                    .tableName(IDEMPOTENCY_TABLE)
                    .item(Map.of(
                            "idempotency_key", attr(idempotencyKey),
                            "status", attr("IN_PROGRESS"),
                            "started_at", numAttr(String.valueOf(now)),
                            "ttl", numAttr(String.valueOf(now + TTL_SECONDS))
                    ))
                    .conditionExpression("attribute_not_exists(idempotency_key)")
                    .build());
        } catch (ConditionalCheckFailedException e) {
            // Already exists. Read strongly consistently so we see the record
            // that caused our conditional put to fail.
            Map<String, AttributeValue> existing = dynamo.getItem(GetItemRequest.builder()
                    .tableName(IDEMPOTENCY_TABLE)
                    .key(key)
                    .consistentRead(true)
                    .build()).item();
            if (existing == null || existing.isEmpty()) {
                // Expired or deleted in between: let the message be redelivered.
                throw new ConcurrentProcessingException("Already in progress");
            }
            String status = stringOrNull(existing.get("status"));
            if ("COMPLETED".equals(status)) {
                return Optional.ofNullable(stringOrNull(existing.get("result")));
            }
            AttributeValue startedAt = existing.get("started_at");
            boolean stale = startedAt != null && startedAt.n() != null
                    && now - Long.parseLong(startedAt.n()) > STALE_AFTER_SECONDS;
            boolean retryable = startedAt != null
                    && ("FAILED".equals(status) || ("IN_PROGRESS".equals(status) && stale));
            if (!retryable) {
                throw new ConcurrentProcessingException("Already in progress");
            }
            reclaim(key, status, startedAt, now);
        }

        // Process. Only the processor call is guarded: if the COMPLETED write
        // fails after a successful run, it must not be recorded as FAILED.
        String result;
        try {
            result = processor.process(eventJson);
        } catch (Exception e) {
            dynamo.updateItem(UpdateItemRequest.builder()
                    .tableName(IDEMPOTENCY_TABLE)
                    .key(key)
                    .updateExpression("SET #s = :status")
                    .expressionAttributeNames(Map.of("#s", "status"))
                    .expressionAttributeValues(Map.of(":status", attr("FAILED")))
                    .build());
            throw e;
        }

        dynamo.updateItem(UpdateItemRequest.builder()
                .tableName(IDEMPOTENCY_TABLE)
                .key(key)
                .updateExpression("SET #s = :status, #r = :result")
                .expressionAttributeNames(Map.of("#s", "status", "#r", "result"))
                .expressionAttributeValues(Map.of(
                        ":status", attr("COMPLETED"),
                        ":result", attr(result)
                ))
                .build());
        return Optional.of(result);
    }

    /**
     * Atomically takes over a FAILED or stale IN_PROGRESS record. The write
     * succeeds only if status and started_at are unchanged since we read them,
     * so at most one processor wins.
     */
    private void reclaim(Map<String, AttributeValue> key, String seenStatus,
            AttributeValue seenStartedAt, long now) throws Exception {
        try {
            dynamo.updateItem(UpdateItemRequest.builder()
                    .tableName(IDEMPOTENCY_TABLE)
                    .key(key)
                    .updateExpression("SET #s = :inProgress, started_at = :now, #t = :ttl")
                    .conditionExpression("#s = :seenStatus AND started_at = :seenStartedAt")
                    // "status" and "ttl" are DynamoDB reserved words.
                    .expressionAttributeNames(Map.of("#s", "status", "#t", "ttl"))
                    .expressionAttributeValues(Map.of(
                            ":inProgress", attr("IN_PROGRESS"),
                            ":now", numAttr(String.valueOf(now)),
                            ":ttl", numAttr(String.valueOf(now + TTL_SECONDS)),
                            ":seenStatus", attr(seenStatus),
                            ":seenStartedAt", seenStartedAt
                    ))
                    .build());
        } catch (ConditionalCheckFailedException e) {
            throw new ConcurrentProcessingException("Already in progress");
        }
    }

    /** Hex SHA-256 of the UTF-8 encoding (matches Python's str.encode() default). */
    private String sha256(String input) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] hash = digest.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return HexFormat.of().formatHex(hash);
    }

    private static String stringOrNull(AttributeValue value) {
        return value == null ? null : value.s();
    }

    private AttributeValue attr(String s) {
        return AttributeValue.builder().s(s).build();
    }

    private AttributeValue numAttr(String n) {
        return AttributeValue.builder().n(n).build();
    }
}
Monitoring Event-Driven Systems
The invisible workflow problem requires specific observability:
import boto3
cloudwatch = boto3.client('cloudwatch')

# Two health signals for the order pipeline, both on SQS metrics with a
# 60-second period and missing data treated as "not breaching":
#   * any visible message in the DLQ means something failed permanently;
#   * the oldest message in the main queue waiting > 5 minutes for 3
#     consecutive periods means consumers are falling behind.
alarms = [
    dict(
        AlarmName='DLQ-Messages-Present',
        MetricName='ApproximateNumberOfMessagesVisible',
        Namespace='AWS/SQS',
        Dimensions=[dict(Name='QueueName', Value='order-processing-dlq')],
        Threshold=0,  # a single message is enough to page
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=1,
        Period=60,
        Statistic='Sum',
        TreatMissingData='notBreaching',
    ),
    dict(
        AlarmName='Queue-Age-Too-High',
        MetricName='ApproximateAgeOfOldestMessage',
        Namespace='AWS/SQS',
        Dimensions=[dict(Name='QueueName', Value='order-processing')],
        Threshold=300,  # seconds; messages shouldn't wait this long
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=3,
        Period=60,
        Statistic='Maximum',
        TreatMissingData='notBreaching',
    ),
]

# Every alarm notifies the same ops SNS topic.
for alarm in alarms:
    cloudwatch.put_metric_alarm(
        ActionsEnabled=True,
        AlarmActions=['arn:aws:sns:us-east-1:123456789012:ops-alerts'],
        **alarm,
    )
The golden rule: If ApproximateAgeOfOldestMessage is growing, your consumers are falling behind. If ApproximateNumberOfMessagesNotVisible is growing, your consumers are working but taking too long (or crashing). If NumberOfMessagesReceived equals NumberOfMessagesSent but NumberOfMessagesDeleted is lower, you have processing failures.