aws in the trenches advanced cloud engineering for senior developers

DynamoDB Streams, Change Data Capture, and Materialized Views

Chapter 6 of 21

DynamoDB Streams captures a time-ordered sequence of item-level modifications to a table. Every PutItem, UpdateItem, or DeleteItem that actually changes an item produces a stream record in near-real time (a write that leaves the item unchanged produces none), and stream records are retained for 24 hours. This is your hook for building event-driven architectures on DynamoDB: materialized views, cross-region replication, audit logs, and real-time analytics.

Stream Mechanics

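A DynamoDB stream is divided into shards (similar to Kinesis). Shards map roughly to table partitions, and within a shard, stream records are ordered by modification time. The guarantee you can build on is per item: each modification to a given item appears in the stream exactly once, in the same order the modifications were made. Across shards, and therefore across items in different partitions, there is no ordering guarantee.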
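
Because only records for the same item have a dependable relative order, a consumer that wants to process a batch in parallel can group it by item key and keep each group's records in arrival order. A minimal sketch, assuming plain dicts shaped like the Records entries a Lambda function receives; group_by_item and item_key are hypothetical helpers, and serializing the Keys map with json.dumps(sort_keys=True) is just one way to get a hashable key.

```python
import json


def item_key(record: dict) -> str:
    """Stable, hashable identity for the item a record refers to.

    Serializes the record's Keys map (DynamoDB JSON) with sorted keys, so the
    same primary key always yields the same string.
    """
    return json.dumps(record["dynamodb"]["Keys"], sort_keys=True)


def group_by_item(records: list) -> dict:
    """Group stream records by item, keeping arrival order within each group.

    Each group can go to its own worker without reordering that item's changes.
    Nothing is implied about order between groups: dict insertion order only
    reflects when each item first appeared in this batch.
    """
    groups: dict = {}
    for record in records:
        groups.setdefault(item_key(record), []).append(record)
    return groups
```

For example, a batch touching ORDER#1, ORDER#2, then ORDER#1 again yields two groups, and ORDER#1's two records stay in their original order.
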

Four stream view types control what data appears in records:

View Type          | Contents                       | Use Case
-------------------|--------------------------------|------------------------------
KEYS_ONLY          | Only the key attributes        | Triggering downstream lookups
NEW_IMAGE          | Full item after modification   | Building read replicas
OLD_IMAGE          | Full item before modification  | Audit trails, undo operations
NEW_AND_OLD_IMAGES | Both before and after          | Computing deltas, validation
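
To make the table concrete, here is a simplified model of which images a record's dynamodb section carries under each view type. It is an illustration under assumptions, not the service's code: project_record is a hypothetical function, images are plain dicts rather than DynamoDB JSON, real records carry more fields (such as SequenceNumber and SizeBytes), and an image that does not exist for the event (no old image on INSERT, no new image on REMOVE) is simply omitted.

```python
from typing import Optional

VIEW_TYPES = ("KEYS_ONLY", "NEW_IMAGE", "OLD_IMAGE", "NEW_AND_OLD_IMAGES")


def project_record(view_type: str, event_name: str, keys: dict,
                   old_image: Optional[dict], new_image: Optional[dict]) -> dict:
    """Simplified model of the 'dynamodb' section of a stream record.

    Keys are always present; images are included according to the view type,
    and only when they exist for the event.
    """
    if view_type not in VIEW_TYPES:
        raise ValueError(f"unknown stream view type: {view_type}")

    section = {"Keys": keys, "StreamViewType": view_type}
    wants_old = view_type in ("OLD_IMAGE", "NEW_AND_OLD_IMAGES")
    wants_new = view_type in ("NEW_IMAGE", "NEW_AND_OLD_IMAGES")

    # INSERT has no "before" state; REMOVE has no "after" state.
    if wants_old and event_name != "INSERT" and old_image is not None:
        section["OldImage"] = old_image
    if wants_new and event_name != "REMOVE" and new_image is not None:
        section["NewImage"] = new_image
    return section
```

A handler that compares OldImage and NewImage, like the ones that follow, only works when the stream uses NEW_AND_OLD_IMAGES; under NEW_IMAGE there is no OldImage to read.
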
import boto3
from decimal import Decimal
from boto3.dynamodb.types import TypeDeserializer

_deserializer = TypeDeserializer()

# Lambda handler for DynamoDB Stream events.
# handle_new_item, handle_deletion, and publish_status_change are application-defined.
def stream_handler(event, context):
    """
    Process DynamoDB Stream records.
    Invoked by Lambda's event source mapping. MODIFY comparisons need the
    NEW_AND_OLD_IMAGES view type; the return value only takes effect when the
    mapping enables ReportBatchItemFailures.
    """
    for record in event['Records']:
        event_name = record['eventName']  # INSERT, MODIFY, or REMOVE

        try:
            if event_name == 'INSERT':
                handle_new_item(deserialize(record['dynamodb']['NewImage']))

            elif event_name == 'MODIFY':
                old_image = record['dynamodb'].get('OldImage', {})
                new_image = record['dynamodb']['NewImage']
                handle_modification(deserialize(old_image), deserialize(new_image))

            elif event_name == 'REMOVE':
                handle_deletion(deserialize(record['dynamodb']['OldImage']))
        except Exception:
            # For DynamoDB Streams the item identifier is the record's SequenceNumber.
            # Lambda retries from this record onward, so stop here to preserve ordering.
            return {'batchItemFailures': [
                {'itemIdentifier': record['dynamodb']['SequenceNumber']}
            ]}

    return {'batchItemFailures': []}

def deserialize(dynamodb_item: dict) -> dict:
    """Convert DynamoDB JSON (e.g. {'S': 'abc'}) to a regular Python dict."""
    return {k: _deserializer.deserialize(v) for k, v in dynamodb_item.items()}

def handle_modification(old_item: dict, new_item: dict):
    """Example: detect status transitions for an order state machine."""
    if old_item.get('entity_type') != 'Order':
        return

    old_status = old_item.get('status')
    new_status = new_item.get('status')

    if old_status != new_status:
        print(f"Order {new_item['order_id']} transitioned: {old_status} → {new_status}")
        # Trigger downstream work: send a notification, update analytics, etc.
        publish_status_change(new_item['order_id'], old_status, new_status)
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;
import java.util.*;

public class StreamProcessor implements RequestHandler<DynamodbEvent, Map<String, Object>> {

    @Override
    public Map<String, Object> handleRequest(DynamodbEvent event, Context context) {
        List<Map<String, String>> batchItemFailures = new ArrayList<>();

        for (DynamodbStreamRecord record : event.getRecords()) {
            try {
                processRecord(record);
            } catch (Exception e) {
                // For DynamoDB Streams, the item identifier is the record's sequence number.
                // Lambda retries from the lowest reported one, so stop at the first failure.
                String sequenceNumber = record.getDynamodb().getSequenceNumber();
                batchItemFailures.add(Map.of("itemIdentifier", sequenceNumber));
                context.getLogger().log("Failed: " + sequenceNumber + " - " + e.getMessage());
                break;
            }
        }

        return Map.of("batchItemFailures", batchItemFailures);
    }

    private void processRecord(DynamodbStreamRecord record) {
        StreamRecord streamRecord = record.getDynamodb();

        switch (record.getEventName()) {
            case "INSERT" -> handleInsert(streamRecord.getNewImage());
            case "MODIFY" -> handleModify(streamRecord.getOldImage(), streamRecord.getNewImage());
            case "REMOVE" -> handleRemove(streamRecord.getOldImage());
            default -> throw new IllegalArgumentException("Unexpected event: " + record.getEventName());
        }
    }

    private void handleModify(Map<String, AttributeValue> oldImage, Map<String, AttributeValue> newImage) {
        // OldImage is null unless the stream uses the NEW_AND_OLD_IMAGES view type.
        if (oldImage == null || !"Order".equals(stringAttr(newImage, "entity_type"))) return;

        String oldStatus = stringAttr(oldImage, "status");
        String newStatus = stringAttr(newImage, "status");

        if (!Objects.equals(oldStatus, newStatus)) {
            String orderId = stringAttr(newImage, "order_id");
            System.out.printf("Order %s: %s → %s%n", orderId, oldStatus, newStatus);
            publishStatusChange(orderId, oldStatus, newStatus);
        }
    }

    // Returns the string value of an attribute, or null if it is absent.
    private static String stringAttr(Map<String, AttributeValue> image, String name) {
        AttributeValue value = image.get(name);
        return value == null ? null : value.getS();
    }

    // Application-specific hooks (placeholders for this example).
    private void handleInsert(Map<String, AttributeValue> newImage) { }
    private void handleRemove(Map<String, AttributeValue> oldImage) { }
    private void publishStatusChange(String orderId, String from, String to) { }
}
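
Both handlers above have to tell Lambda which record failed. For DynamoDB Streams the itemIdentifier in batchItemFailures is the record's dynamodb.SequenceNumber, and Lambda resumes from the lowest reported sequence number (this only takes effect when the event source mapping enables ReportBatchItemFailures). A minimal sketch of that contract, assuming process is your own per-record function; handle_batch is a hypothetical name. It stops at the first failure so a later change to the same item is never applied ahead of an earlier one that is still waiting to be retried.

```python
from typing import Callable


def handle_batch(event: dict, process: Callable[[dict], None]) -> dict:
    """Process records in order; report the first failure and stop.

    Lambda checkpoints at the reported sequence number and redelivers that
    record and everything after it, so continuing past a failure would only
    risk applying a later change before the failed one is retried.
    """
    for record in event["Records"]:
        try:
            process(record)
        except Exception:
            return {"batchItemFailures": [
                {"itemIdentifier": record["dynamodb"]["SequenceNumber"]}
            ]}
    # An empty list tells Lambda the whole batch succeeded.
    return {"batchItemFailures": []}
```
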

Building Materialized Views from Streams

In DynamoDB, a materialized view is a secondary representation of your data that you maintain yourself, asynchronously, by processing the stream. Common examples include leaderboards, counters, and denormalized aggregates.

# Materialized view: real-time order count and total spend per customer,
# maintained by a Lambda function processing the DynamoDB stream.

# Create the resource once per execution environment, not on every invocation.
aggregates_table = boto3.resource('dynamodb').Table('customer-aggregates')

def update_customer_aggregate(event, context):
    """
    Maintain a running aggregate: total orders and spend per customer.
    The view is eventually consistent: its lag is stream propagation plus the
    event source mapping's batching window.
    """
    for record in event['Records']:
        if record['eventName'] != 'INSERT':
            continue

        new_image = deserialize(record['dynamodb']['NewImage'])

        if new_image.get('entity_type') != 'Order':
            continue

        customer_id = new_image['pk'].replace('USER#', '')
        order_total = Decimal(str(new_image.get('total', 0)))

        # Order sort keys are expected to look like 'ORDER#<date>...'; fall back to ''.
        sk_parts = new_image.get('sk', '').split('#')
        order_date = sk_parts[1] if len(sk_parts) > 1 else ''

        # Atomic increment: concurrent invocations cannot lose each other's updates.
        # It is NOT idempotent, though: a redelivered record counts the order twice.
        aggregates_table.update_item(
            Key={
                'pk': f'CUSTOMER#{customer_id}',
                'sk': 'AGGREGATE'
            },
            UpdateExpression='''
                SET order_count = if_not_exists(order_count, :zero) + :one,
                    total_spend = if_not_exists(total_spend, :zero) + :amount,
                    last_order_date = :now
            ''',
            ExpressionAttributeValues={
                ':zero': Decimal('0'),
                ':one': Decimal('1'),
                ':amount': order_total,
                ':now': order_date
            }
        )
# The aggregate is now queryable in O(1) time:
# GET /customers/123/stats → Single GetItem on customer-aggregates table
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import java.util.Map;

public class MaterializedViewUpdater {

    private final DynamoDbClient dynamo = DynamoDbClient.create();
    private static final String AGGREGATES_TABLE = "customer-aggregates";

    public void updateAggregate(String customerId, String orderTotal, String orderDate) {
        dynamo.updateItem(UpdateItemRequest.builder()
            .tableName(AGGREGATES_TABLE)
            .key(Map.of(
                "pk", AttributeValue.builder().s("CUSTOMER#" + customerId).build(),
                "sk", AttributeValue.builder().s("AGGREGATE").build()
            ))
            .updateExpression("""
                SET order_count = if_not_exists(order_count, :zero) + :one,
                    total_spend = if_not_exists(total_spend, :zero) + :amount,
                    last_order_date = :now
            """)
            .expressionAttributeValues(Map.of(
                ":zero", AttributeValue.builder().n("0").build(),
                ":one", AttributeValue.builder().n("1").build(),
                ":amount", AttributeValue.builder().n(orderTotal).build(),
                ":now", AttributeValue.builder().s(orderDate).build()
            ))
            .build());
    }
}
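
An atomic increment is safe under concurrency but not under redelivery: if the same INSERT record is delivered twice, the order is counted twice. One common remedy, sketched here under assumptions, pairs the increment with a conditional marker write in a single TransactWriteItems call, so a duplicate fails the marker's condition and the whole transaction, including the increment, is canceled. The APPLIED#<order_id> marker sort key and build_idempotent_increment are hypothetical; the sketch uses ADD (which treats a missing number as 0) as shorthand for the if_not_exists arithmetic above, omits last_order_date for brevity, and only builds the low-level request so it can be checked without AWS credentials.

```python
from decimal import Decimal


def build_idempotent_increment(table: str, customer_id: str, order_id: str,
                               amount: Decimal) -> dict:
    """Build kwargs for boto3's client('dynamodb').transact_write_items().

    Item 1: a marker that can only be created once per order.
    Item 2: the customer aggregate, incremented with ADD.
    DynamoDB applies both or neither, so a duplicate delivery leaves the
    aggregate unchanged.
    """
    customer_pk = {"S": f"CUSTOMER#{customer_id}"}
    return {
        "TransactItems": [
            {"Put": {
                "TableName": table,
                "Item": {"pk": customer_pk, "sk": {"S": f"APPLIED#{order_id}"}},
                # Fails if this order's marker already exists.
                "ConditionExpression": "attribute_not_exists(pk)",
            }},
            {"Update": {
                "TableName": table,
                "Key": {"pk": customer_pk, "sk": {"S": "AGGREGATE"}},
                "UpdateExpression": "ADD order_count :one, total_spend :amount",
                "ExpressionAttributeValues": {
                    ":one": {"N": "1"},
                    ":amount": {"N": str(amount)},
                },
            }},
        ]
    }
```

In a handler you would pass the result to transact_write_items(**request) and treat a TransactionCanceledException whose cancellation reason is ConditionalCheckFailed as "already applied". Marker items accumulate, so giving them a TTL attribute is worth considering.
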

Handling Duplicates and Ordering

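The stream itself holds each modification exactly once, but Lambda's event source mapping delivers records at least once: when an invocation fails or times out, the batch (or, with partial batch responses, everything from the failed record onward) is delivered again, including records your function had already processed. Design for idempotency: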

import time

# Create the resource once per execution environment.
dedup_table = boto3.resource('dynamodb').Table('stream-deduplication')

def idempotent_stream_handler(event, context):
    """
    Skip records that were already processed, using a deduplication table.
    The marker is written only after processing succeeds, so a crash or timeout
    mid-record leads to reprocessing rather than silent loss. This is not
    exactly-once: a crash between process_record() and the marker write still
    replays the record, so process_record should tolerate a repeat.
    """
    for record in event['Records']:
        event_id = record['eventID']  # Globally unique per stream record

        # Strongly consistent read so a marker from a previous attempt is visible
        seen = dedup_table.get_item(Key={'event_id': event_id}, ConsistentRead=True)
        if 'Item' in seen:
            continue  # Already processed: skip

        try:
            process_record(record)  # Application-defined
        except Exception:
            # Report the failed record's SequenceNumber (the identifier Lambda expects
            # for DynamoDB Streams) and stop; Lambda retries from this record onward.
            return {'batchItemFailures': [
                {'itemIdentifier': record['dynamodb']['SequenceNumber']}
            ]}

        dedup_table.put_item(Item={
            'event_id': event_id,
            'processed_at': int(time.time()),
            # TTL must be enabled on this attribute; 24h matches stream retention
            'ttl': int(time.time()) + 86400
        })

    return {'batchItemFailures': []}
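
Duplicates are half the problem; ordering is the other. When a consumer copies items into another table, a retry can redeliver an older record after a newer change to the same item has already been written. A sketch of a stale-write guard, assuming the source items carry an application-maintained numeric version attribute that increases on every write (an assumption: nothing in the stream record provides it), is a conditional put that only succeeds when the incoming version is strictly newer than the stored one. build_replica_put is a hypothetical helper that returns kwargs for a low-level put_item call; deletes are not handled here.

```python
def build_replica_put(table: str, item: dict) -> dict:
    """Build kwargs for client('dynamodb').put_item() that never overwrite a newer copy.

    `item` is in DynamoDB JSON (as in a stream record's NewImage) and must
    contain a numeric `version` attribute (hypothetical, application-maintained).
    """
    if "version" not in item or "N" not in item["version"]:
        raise ValueError("item needs a numeric 'version' attribute")
    return {
        "TableName": table,
        "Item": item,
        # Write if the replica has no copy yet, or its copy is strictly older.
        "ConditionExpression": "attribute_not_exists(#v) OR #v < :incoming",
        "ExpressionAttributeNames": {"#v": "version"},
        "ExpressionAttributeValues": {":incoming": item["version"]},
    }
```

A ConditionalCheckFailedException from put_item(**build_replica_put(...)) then means the replica already holds the same or a newer version, so the record can be acknowledged and skipped. Because an equal version also fails the condition, the guard absorbs duplicates as well.
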

Event Source Mapping Configuration

The Lambda event source mapping controls batching, error handling, and parallelism:

import boto3

lambda_client = boto3.client('lambda')

# Production-grade event source mapping configuration
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:dynamodb:us-east-1:123456789012:table/app-table/stream/2024-01-15T00:00:00.000',
    FunctionName='stream-processor',
    StartingPosition='LATEST',  # or TRIM_HORIZON to start from the oldest record still in the stream (up to 24h back)
    BatchSize=100,  # Up to 10,000 records per batch
    MaximumBatchingWindowInSeconds=5,  # Wait up to 5s to fill batch
    ParallelizationFactor=10,  # Process up to 10 batches per shard concurrently
    MaximumRetryAttempts=3,  # Retry failed batches up to 3 times
    MaximumRecordAgeInSeconds=3600,  # Skip records older than 1 hour
    BisectBatchOnFunctionError=True,  # Split batch in half on error to isolate poison pill
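    # On failure, the destination receives batch metadata (shard ID, sequence number range), not the records themselves
    DestinationConfig={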
        'OnFailure': {
            'Destination': 'arn:aws:sqs:us-east-1:123456789012:stream-dlq'
        }
    },
    FunctionResponseTypes=['ReportBatchItemFailures']  # Enable partial batch failure reporting
)

Critical settings explained:

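  • BisectBatchOnFunctionError: When an invocation fails, Lambda (not DynamoDB) splits the batch in half and retries each half separately. Repeated splitting narrows the failure down to the “poison pill” record, so only that record, rather than the whole batch, exhausts its retries and lands in the on-failure destination. The shard is still paused while these retries run.
  • ReportBatchItemFailures: Your function returns the sequence numbers of failed records instead of failing the entire batch. Lambda checkpoints at the lowest reported sequence number and retries from there: records before it are not reprocessed, while the failed record and everything after it in the batch are.
  • ParallelizationFactor: Up to 10 concurrent Lambda invocations can process batches from a single shard, which increases throughput. Lambda still processes changes to the same item in order, but records for different items in the same shard may be handled concurrently and out of order, so your logic must not depend on ordering across items.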
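
To see why bisection isolates a poison pill quickly, here is a simplified local model of the search: when a batch fails, split it in half and retry each half until the failing record stands alone. It ignores retry limits, record age, and checkpointing, so treat it as an illustration of the idea rather than Lambda's exact retry schedule; find_poison_pills and the fails predicate are hypothetical.

```python
from typing import Callable, List


def find_poison_pills(batch: List[str], fails: Callable[[str], bool]) -> List[str]:
    """Return the records that still fail once isolated, by repeated halving.

    A (sub)batch "fails" if any record in it fails, mirroring an all-or-nothing
    invocation without ReportBatchItemFailures.
    """
    if not batch or not any(fails(r) for r in batch):
        return []  # this (sub)batch succeeds as a whole
    if len(batch) == 1:
        return batch  # isolated: this record is the poison pill
    mid = len(batch) // 2
    return find_poison_pills(batch[:mid], fails) + find_poison_pills(batch[mid:], fails)
```

In Lambda each split is a real invocation of your function, so the healthy records that share a half with the poison pill are processed again; that is one more reason the processing has to be idempotent.
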