aws in the trenches advanced cloud engineering for senior developers

Dead Letter Queues, Poison Pills, and Exactly-Once Semantics

Chapter 8 of 21

In any event-driven system, messages will fail. The question is not whether they fail but what happens next: failed messages can disappear silently, block all processing behind them, or get routed to a place where humans can investigate and reprocess them. Dead Letter Queues (DLQs) provide that third option, but only when they are configured correctly, and configuration is where most teams go wrong.

Dead Letter Queue Architecture

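A DLQ is just a regular SQS queue that a source queue names in its redrive policy; SQS moves a message there once its receive count exceeds maxReceiveCount. The critical design decisions are the retention periods, the visibility timeout, and maxReceiveCount itself: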

import boto3
import json

sqs = boto3.client('sqs')

# Step 1: Create the DLQ first
dlq_response = sqs.create_queue(
    QueueName='order-processing-dlq',
    Attributes={
        'MessageRetentionPeriod': '1209600',  # 14 days (maximum); keep it longer than the source queue's retention
        'VisibilityTimeout': '300'
    }
)
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_response['QueueUrl'],
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

# Step 2: Create the main queue with redrive policy
main_queue = sqs.create_queue(
    QueueName='order-processing',
    Attributes={
        'MessageRetentionPeriod': '345600',   # 4 days
        'VisibilityTimeout': '300',            # 5 min processing window
        'ReceiveMessageWaitTimeSeconds': '20', # Long polling
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': 3  # After 3 failed attempts → DLQ
        })
    }
)

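# Step 3: Restrict which source queues may use this queue as their DLQ
main_queue_arn = sqs.get_queue_attributes(
    QueueUrl=main_queue['QueueUrl'],
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

sqs.set_queue_attributes(
    QueueUrl=dlq_response['QueueUrl'],
    Attributes={
        'RedriveAllowPolicy': json.dumps({
            'redrivePermission': 'byQueue',
            # Look the ARN up instead of hardcoding account and region
            'sourceQueueArns': [main_queue_arn]
        })
    }
)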

// Java (AWS SDK for Java v2) equivalent
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;
import java.util.Map;

public class DlqSetup {

    private final SqsClient sqs = SqsClient.create();

    public void createQueueWithDlq() {
        // Create DLQ
        CreateQueueResponse dlqResponse = sqs.createQueue(CreateQueueRequest.builder()
            .queueName("order-processing-dlq")
            .attributes(Map.of(
                QueueAttributeName.MESSAGE_RETENTION_PERIOD, "1209600",
                QueueAttributeName.VISIBILITY_TIMEOUT, "300"
            ))
            .build());

        // Get DLQ ARN
        String dlqArn = sqs.getQueueAttributes(GetQueueAttributesRequest.builder()
            .queueUrl(dlqResponse.queueUrl())
            .attributeNames(QueueAttributeName.QUEUE_ARN)
            .build())
            .attributes().get(QueueAttributeName.QUEUE_ARN);

        // Create main queue with redrive to DLQ
        sqs.createQueue(CreateQueueRequest.builder()
            .queueName("order-processing")
            .attributes(Map.of(
                QueueAttributeName.MESSAGE_RETENTION_PERIOD, "345600",
                QueueAttributeName.VISIBILITY_TIMEOUT, "300",
                QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, "20",
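                QueueAttributeName.REDRIVE_POLICY,
                    // A Java text block must start on the line after its opening delimiter,
                    // so build the one-line JSON with String.format instead
                    String.format("{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":3}", dlqArn)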
            ))
            .build());
    }
}
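
The redrive policy only does its job if the consumer leaves failed messages in the queue. The following is a minimal sketch of such a consumer, not the chapter's own implementation: `drain_once` and the `handler` callback are hypothetical names, and the client is passed in so the function can be exercised without AWS. It deletes a message only after the handler succeeds, so a poison pill reappears after the visibility timeout and SQS moves it to the DLQ once its receive count exceeds maxReceiveCount.

```python
import logging

logger = logging.getLogger(__name__)


def drain_once(sqs_client, queue_url, handler, batch_size=10):
    """Receive one batch and delete only the messages the handler processed.

    Returns a (succeeded, failed) tuple. `handler` is a hypothetical
    application callback that raises on failure.
    """
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=batch_size,
        WaitTimeSeconds=20,  # Long polling
        AttributeNames=['ApproximateReceiveCount'],
    )

    succeeded = failed = 0
    for message in response.get('Messages', []):
        try:
            handler(message['Body'])
        except Exception:
            # Deliberately do NOT delete: the message becomes visible again
            # after the visibility timeout, and SQS routes it to the DLQ once
            # the receive count exceeds maxReceiveCount.
            receive_count = message.get('Attributes', {}).get('ApproximateReceiveCount', '?')
            logger.warning(
                "Processing failed for %s (receive count %s)",
                message.get('MessageId'), receive_count, exc_info=True,
            )
            failed += 1
            continue

        sqs_client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle'],
        )
        succeeded += 1

    return succeeded, failed
```

In a real worker this would run in a loop, for example `drain_once(boto3.client('sqs'), main_queue['QueueUrl'], process_order)`, where `process_order` stands in for whatever the application does with an order. Catching every exception per message keeps one bad message from blocking the rest of the batch.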

Automated DLQ Reprocessing

Messages in the DLQ need a path back to the main queue, either automated (once a fix is deployed) or manual (after investigation):

MAX_REPROCESS_ATTEMPTS = 3  # Illustrative cap on DLQ-to-main-queue round trips

def reprocess_dlq(dlq_url: str, main_queue_url: str, max_messages: int = 100):
    """
    Move messages from DLQ back to main queue for reprocessing.
    Use after deploying a fix for the root cause.
    Returns the number of messages sent back to the main queue.
    """
    processed = 0

    while processed < max_messages:
        response = sqs.receive_message(
            QueueUrl=dlq_url,
            # Never pull more messages than we are still allowed to move
            MaxNumberOfMessages=min(10, max_messages - processed),
            WaitTimeSeconds=5,
            MessageAttributeNames=['All']
        )

        messages = response.get('Messages', [])
        if not messages:
            break

        for message in messages:
            attributes = dict(message.get('MessageAttributes', {}))

            # send_message creates a brand-new message, so ApproximateReceiveCount
            # starts over on every redrive. Track round trips in our own attribute.
            attempts = int(attributes.get('ReprocessCount', {}).get('StringValue', '0'))

            if attempts >= MAX_REPROCESS_ATTEMPTS:
                # This message has been reprocessed too many times.
                # Archive it for manual investigation.
                # archive_permanently_failed is an application-defined helper.
                archive_permanently_failed(message)
                sqs.delete_message(QueueUrl=dlq_url, ReceiptHandle=message['ReceiptHandle'])
                continue

            # Add metadata about the reprocessing
            attributes['ReprocessedFrom'] = {'DataType': 'String', 'StringValue': 'DLQ'}
            attributes['ReprocessCount'] = {'DataType': 'Number', 'StringValue': str(attempts + 1)}
            # Keep the first MessageId so the message can be traced across redrives
            attributes.setdefault('OriginalMessageId', {
                'DataType': 'String',
                'StringValue': message['MessageId']
            })

            # Send back to main queue, then delete from the DLQ. A crash between
            # the two calls leaves a duplicate, so the consumer must be idempotent.
            sqs.send_message(
                QueueUrl=main_queue_url,
                MessageBody=message['Body'],
                MessageAttributes=attributes
            )
            sqs.delete_message(QueueUrl=dlq_url, ReceiptHandle=message['ReceiptHandle'])
            processed += 1

    return processed

# AWS also provides native DLQ redrive (in the console since 2021, through the API since 2023):
# sqs.start_message_move_task(SourceArn=dlq_arn, DestinationArn=main_queue_arn)
# This moves messages in bulk without custom consumer code or Lambda involvement.
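
The native redrive mentioned above can be wrapped in a small helper. This is a sketch under stated assumptions rather than a reference implementation: `redrive_dlq` is a hypothetical name, the polling interval and timeout are arbitrary, and it relies on `list_message_move_tasks` returning the most recent task for the source queue when called with its default `MaxResults`.

```python
import time


def redrive_dlq(sqs_client, dlq_arn, destination_arn=None,
                poll_seconds=5.0, timeout_seconds=600.0):
    """Start a native DLQ redrive and wait until the task stops running.

    If destination_arn is omitted, SQS sends each message back to the
    queue it originally came from.
    """
    params = {'SourceArn': dlq_arn}
    if destination_arn is not None:
        params['DestinationArn'] = destination_arn
    sqs_client.start_message_move_task(**params)

    deadline = time.monotonic() + timeout_seconds
    while True:
        # Assumption: the default MaxResults returns the most recent task
        results = sqs_client.list_message_move_tasks(SourceArn=dlq_arn).get('Results', [])
        task = results[0] if results else {}
        status = task.get('Status', 'UNKNOWN')

        if status != 'RUNNING':
            return {
                'status': status,
                'moved': task.get('ApproximateNumberOfMessagesMoved', 0),
            }
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Redrive from {dlq_arn} still running after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

A call such as `redrive_dlq(boto3.client('sqs'), dlq_arn)` returns a small summary dict; anything other than a `COMPLETED` status deserves a look before assuming the DLQ is empty.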

Exactly-Once Semantics: The Application Pattern

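Most AWS messaging services, including SQS standard queues, provide at-least-once delivery: the same message can reach a consumer more than once. Exactly-once processing must therefore be implemented at the application level using one of these patterns: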

Pattern 1: Idempotency Key in DynamoDB

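import hashlib
import json
import time

import boto3


class ConcurrentProcessingError(Exception):
    """Another processor currently holds the claim for this idempotency key."""


STALE_AFTER_SECONDS = 300  # An IN_PROGRESS claim older than this is treated as crashed


def process_payment_idempotent(event: dict):
    """
    Use a DynamoDB table as an idempotency store.
    If the same message is processed twice, the second attempt is a no-op.
    """
    dynamodb = boto3.resource('dynamodb')
    idempotency_table = dynamodb.Table('idempotency-store')
    condition_failed = dynamodb.meta.client.exceptions.ConditionalCheckFailedException

    # Generate idempotency key from message content. Pass only the business
    # payload: delivery metadata (receipt handle, receive count) differs
    # between redeliveries and would change the hash.
    idempotency_key = hashlib.sha256(
        json.dumps(event, sort_keys=True).encode()
    ).hexdigest()
    now = int(time.time())

    # Try to claim this key (conditional write)
    try:
        idempotency_table.put_item(
            Item={
                'idempotency_key': idempotency_key,
                'status': 'IN_PROGRESS',
                'started_at': now,
                'ttl': now + 86400  # Auto-cleanup after 24h
            },
            ConditionExpression='attribute_not_exists(idempotency_key)'
        )
    except condition_failed:
        # Already processed, in progress, or failed on an earlier attempt
        existing = idempotency_table.get_item(
            Key={'idempotency_key': idempotency_key},
            ConsistentRead=True  # Do not act on a stale copy of the claim
        ).get('Item', {})
        status = existing.get('status')

        if status == 'COMPLETED':
            # Return cached result
            return existing.get('result')

        # boto3 returns DynamoDB numbers as Decimal, so convert before subtracting
        started_at = int(existing.get('started_at', 0))
        if status == 'IN_PROGRESS' and now - started_at <= STALE_AFTER_SECONDS:
            raise ConcurrentProcessingError("Another processor is handling this")

        # FAILED, or stale IN_PROGRESS (processor crashed): reclaim and retry.
        # The condition ensures only one competing processor wins the reclaim.
        try:
            idempotency_table.update_item(
                Key={'idempotency_key': idempotency_key},
                UpdateExpression='SET #s = :claimed, started_at = :now',
                ConditionExpression='#s = :seen_status AND started_at = :seen_started',
                ExpressionAttributeNames={'#s': 'status'},
                ExpressionAttributeValues={
                    ':claimed': 'IN_PROGRESS',
                    ':now': now,
                    ':seen_status': status,
                    ':seen_started': started_at
                }
            )
        except condition_failed:
            raise ConcurrentProcessingError("Another processor reclaimed this key")

    # Process the payment (charge_payment is the application's own function)
    try:
        result = charge_payment(event)

        # Mark as completed with cached result.
        # result must be DynamoDB-serializable (Decimal rather than float).
        idempotency_table.update_item(
            Key={'idempotency_key': idempotency_key},
            UpdateExpression='SET #s = :status, #r = :result, completed_at = :now',
            ExpressionAttributeNames={'#s': 'status', '#r': 'result'},
            ExpressionAttributeValues={
                ':status': 'COMPLETED',
                ':result': result,
                ':now': int(time.time())
            }
        )
        return result

    except Exception as e:
        # Mark as failed so a later delivery can retry
        idempotency_table.update_item(
            Key={'idempotency_key': idempotency_key},
            UpdateExpression='SET #s = :status, error_msg = :err',
            ExpressionAttributeNames={'#s': 'status'},
            ExpressionAttributeValues={
                ':status': 'FAILED',
                ':err': str(e)
            }
        )
        raise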

// Java (AWS SDK for Java v2) version of the same pattern
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import java.security.MessageDigest;
import java.time.Instant;
import java.util.*;

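public class IdempotentProcessor {

    /** Assumed shape of the application callback: performs the side effect, returns the result to cache. */
    @FunctionalInterface
    public interface ProcessingFunction {
        String process(String eventJson) throws Exception;
    }

    /** Assumed exception type: thrown when another worker currently holds the idempotency key. */
    public static class ConcurrentProcessingException extends RuntimeException {
        public ConcurrentProcessingException(String message) { super(message); }
    }

    private final DynamoDbClient dynamo = DynamoDbClient.create();
    private static final String IDEMPOTENCY_TABLE = "idempotency-store";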

    public Optional<String> processIdempotent(String eventJson,
                                               ProcessingFunction processor) throws Exception {
        String idempotencyKey = sha256(eventJson);
        long now = Instant.now().getEpochSecond();

        // Attempt to claim processing slot
        try {
            dynamo.putItem(PutItemRequest.builder()
                .tableName(IDEMPOTENCY_TABLE)
                .item(Map.of(
                    "idempotency_key", attr(idempotencyKey),
                    "status", attr("IN_PROGRESS"),
                    "started_at", numAttr(String.valueOf(now)),
                    "ttl", numAttr(String.valueOf(now + 86400))
                ))
                .conditionExpression("attribute_not_exists(idempotency_key)")
                .build());
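        } catch (ConditionalCheckFailedException e) {
            // Already exists: check status with a strongly consistent read
            GetItemResponse existing = dynamo.getItem(GetItemRequest.builder()
                .tableName(IDEMPOTENCY_TABLE)
                .key(Map.of("idempotency_key", attr(idempotencyKey)))
                .consistentRead(true)
                .build());

            // The item can be missing if TTL removed it between the two calls
            AttributeValue statusAttr = existing.item().get("status");
            String status = statusAttr == null ? "" : statusAttr.s();
            if ("COMPLETED".equals(status)) {
                return Optional.of(existing.item().get("result").s());
            }
            if (!"FAILED".equals(status)) {
                throw new ConcurrentProcessingException("Already in progress");
            }
            // FAILED: fall through and retry, since a failed attempt must not block redelivery
        }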

        // Process
        try {
            String result = processor.process(eventJson);

            dynamo.updateItem(UpdateItemRequest.builder()
                .tableName(IDEMPOTENCY_TABLE)
                .key(Map.of("idempotency_key", attr(idempotencyKey)))
                .updateExpression("SET #s = :status, #r = :result")
                .expressionAttributeNames(Map.of("#s", "status", "#r", "result"))
                .expressionAttributeValues(Map.of(
                    ":status", attr("COMPLETED"),
                    ":result", attr(result)
                ))
                .build());

            return Optional.of(result);
        } catch (Exception e) {
            dynamo.updateItem(UpdateItemRequest.builder()
                .tableName(IDEMPOTENCY_TABLE)
                .key(Map.of("idempotency_key", attr(idempotencyKey)))
                .updateExpression("SET #s = :status")
                .expressionAttributeNames(Map.of("#s", "status"))
                .expressionAttributeValues(Map.of(":status", attr("FAILED")))
                .build());
            throw e;
        }
    }

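    private String sha256(String input) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        // Fix the charset so the key does not depend on the platform default
        byte[] hash = digest.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return HexFormat.of().formatHex(hash);  // HexFormat requires Java 17+
    }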

    private AttributeValue attr(String s) {
        return AttributeValue.builder().s(s).build();
    }
    private AttributeValue numAttr(String n) {
        return AttributeValue.builder().n(n).build();
    }
}
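
Hashing the entire event works only when every redelivery carries byte-identical content. One way to make the key robust, sketched here under the assumption that the payload contains stable business identifiers, is to hash only those fields. The field names `order_id` and `operation` and the function name `idempotency_key_for` are hypothetical and should be replaced with whatever uniquely identifies the operation in your domain.

```python
import hashlib
import json


def idempotency_key_for(body: dict, key_fields=('order_id', 'operation')) -> str:
    """Derive an idempotency key from stable business fields only.

    Fields outside key_fields (timestamps, trace IDs, retry counters)
    are ignored, so they cannot change the key between redeliveries.
    """
    missing = [field for field in key_fields if field not in body]
    if missing:
        raise KeyError(f"Missing idempotency fields: {missing}")

    stable = {field: body[field] for field in key_fields}
    # Sorted keys and fixed separators give one canonical serialization
    canonical = json.dumps(stable, sort_keys=True, separators=(',', ':'))
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
```

The trade-off is that two genuinely distinct operations must differ in at least one key field; otherwise the second is treated as a duplicate of the first.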

Monitoring Event-Driven Systems

Event-driven workflows have no single request to trace, so failures stay invisible unless you alarm on the queues themselves:

import boto3

cloudwatch = boto3.client('cloudwatch')

# Critical alarms for event-driven health
alarms = [
    {
        'AlarmName': 'DLQ-Messages-Present',
        'MetricName': 'ApproximateNumberOfMessagesVisible',
        'Namespace': 'AWS/SQS',
        'Dimensions': [{'Name': 'QueueName', 'Value': 'order-processing-dlq'}],
        'Threshold': 0,
        'ComparisonOperator': 'GreaterThanThreshold',
        'EvaluationPeriods': 1,
        'Period': 60,
        'Statistic': 'Sum',
        'TreatMissingData': 'notBreaching'
    },
    {
        'AlarmName': 'Queue-Age-Too-High',
        'MetricName': 'ApproximateAgeOfOldestMessage',
        'Namespace': 'AWS/SQS',
        'Dimensions': [{'Name': 'QueueName', 'Value': 'order-processing'}],
        'Threshold': 300,  # 5 minutes — messages shouldn't wait this long
        'ComparisonOperator': 'GreaterThanThreshold',
        'EvaluationPeriods': 3,
        'Period': 60,
        'Statistic': 'Maximum',
        'TreatMissingData': 'notBreaching'
    }
]

for alarm in alarms:
    cloudwatch.put_metric_alarm(**alarm, ActionsEnabled=True,
        AlarmActions=['arn:aws:sns:us-east-1:123456789012:ops-alerts'])

The golden rules: If ApproximateAgeOfOldestMessage is growing, your consumers are falling behind. If ApproximateNumberOfMessagesNotVisible is growing, your consumers are receiving messages but taking too long to finish them (or crashing before they delete them). If NumberOfMessagesReceived equals NumberOfMessagesSent but NumberOfMessagesDeleted is lower, messages are being picked up without being completed: you have processing failures.
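
These rules can be turned into a small triage helper. The sketch below is illustrative only: `diagnose_queue` is a hypothetical function that compares two snapshots of the SQS metrics named above, and it assumes the caller has already fetched the values (for example with CloudWatch `get_metric_statistics`) into plain dicts. It relaxes "equals" to "at least" for the received count, since retries push receives above sends.

```python
def diagnose_queue(earlier: dict, later: dict) -> list:
    """Apply the rules of thumb to two metric snapshots of one queue.

    Each snapshot maps SQS metric names to numbers taken over the same
    period length; `later` is the more recent one.
    """
    findings = []

    if later['ApproximateAgeOfOldestMessage'] > earlier['ApproximateAgeOfOldestMessage']:
        findings.append('consumers are falling behind')

    if (later['ApproximateNumberOfMessagesNotVisible']
            > earlier['ApproximateNumberOfMessagesNotVisible']):
        findings.append('consumers are slow or crashing mid-message')

    # Messages are being picked up but not deleted
    if (later['NumberOfMessagesReceived'] >= later['NumberOfMessagesSent']
            and later['NumberOfMessagesDeleted'] < later['NumberOfMessagesReceived']):
        findings.append('processing failures')

    return findings
```

An empty list means none of the three symptoms is present in the window compared; it does not prove the system is healthy, only that these particular signals are quiet.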