DynamoDB Streams, Change Data Capture, and Materialized Views
DynamoDB Streams captures a time-ordered sequence of item-level modifications to a table. Every PutItem, UpdateItem, and DeleteItem that actually changes an item produces a stream record in near real time; a write that leaves the item unchanged produces no record. This is your hook for building event-driven architectures on DynamoDB — materialized views, cross-region replication, audit logs, and real-time analytics.
Stream Mechanics
A DynamoDB Stream is partitioned into shards (similar to Kinesis). Each shard corresponds roughly to a table partition, and stream records within a shard are strictly ordered by modification time. Across shards, there’s no ordering guarantee.
Four stream view types control what data appears in records:
| View Type | Contents | Use Case |
|---|---|---|
| KEYS_ONLY | Only the key attributes | Triggering downstream lookups |
| NEW_IMAGE | Full item after modification | Building read replicas |
| OLD_IMAGE | Full item before modification | Audit trails, undo operations |
| NEW_AND_OLD_IMAGES | Both before and after | Computing deltas, validation |
import boto3
import json
from decimal import Decimal
# Lambda handler for DynamoDB Stream events
def stream_handler(event, context):
    """
    Process DynamoDB Stream records.

    This function is invoked by Lambda's event source mapping. Each record is
    dispatched on its ``eventName`` (INSERT / MODIFY / REMOVE); any other
    event name is ignored.

    If processing a record raises, that record's ``SequenceNumber`` is
    reported in ``batchItemFailures`` and the rest of the batch is left
    unprocessed. For stream sources, Lambda resumes from the lowest reported
    sequence number, so later records would be redelivered anyway. Stopping
    early also keeps processing in order. The partial-failure response only
    takes effect when the event source mapping enables
    ``ReportBatchItemFailures``.

    Raises:
        The original exception, when the failing record carries no
        ``SequenceNumber``. In that case there is no valid identifier to
        report, so the whole batch is failed instead.
    """
    batch_item_failures = []
    for record in event['Records']:
        try:
            _dispatch_stream_record(record)
        except Exception as exc:
            sequence_number = record.get('dynamodb', {}).get('SequenceNumber')
            if sequence_number is None:
                raise
            print(f"Failed to process record {sequence_number}: {exc!r}")
            batch_item_failures.append({'itemIdentifier': sequence_number})
            break
    return {'statusCode': 200, 'batchItemFailures': batch_item_failures}


def _dispatch_stream_record(record):
    """Route one stream record to the INSERT / MODIFY / REMOVE handler."""
    event_name = record['eventName']  # INSERT, MODIFY, REMOVE
    if event_name == 'INSERT':
        new_image = record['dynamodb']['NewImage']
        handle_new_item(deserialize(new_image))
    elif event_name == 'MODIFY':
        old_image = record['dynamodb'].get('OldImage', {})
        new_image = record['dynamodb']['NewImage']
        handle_modification(deserialize(old_image), deserialize(new_image))
    elif event_name == 'REMOVE':
        old_image = record['dynamodb']['OldImage']
        handle_deletion(deserialize(old_image))
def deserialize(dynamodb_item: dict) -> dict:
    """Convert DynamoDB JSON format (an attribute-name -> typed-value map) to a regular Python dict.

    ``None`` or an empty map yields ``{}``. Stream records omit ``NewImage`` /
    ``OldImage`` depending on the stream view type, so callers that read
    images with ``.get()`` can receive ``None`` here.
    """
    if not dynamodb_item:
        return {}
    # Imported lazily so the empty-input path has no boto3 dependency.
    from boto3.dynamodb.types import TypeDeserializer
    deserializer = TypeDeserializer()
    return {name: deserializer.deserialize(value) for name, value in dynamodb_item.items()}
def handle_modification(old_item: dict, new_item: dict):
    """Example: detect status transitions for the order state machine.

    An item is treated as an order only when the *previous* image has
    ``entity_type == 'Order'``; anything else is ignored. When ``status``
    differs between the two images, the transition is printed and handed to
    ``publish_status_change``.
    """
    if old_item.get('entity_type') != 'Order':
        return
    before = old_item.get('status')
    after = new_item.get('status')
    if before == after:
        return
    order_id = new_item['order_id']
    print(f"Order {order_id} transitioned: {before} → {after}")
    # Trigger downstream: send notification, update analytics, etc.
    publish_status_change(order_id, before, after)
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.util.*;
/**
 * Lambda handler for DynamoDB stream events with partial-batch failure reporting.
 */
public class StreamProcessor implements RequestHandler<DynamodbEvent, Map<String, Object>> {

    /**
     * Processes the batch in order and stops at the first record that fails.
     *
     * <p>The failed record's stream sequence number is returned as the {@code itemIdentifier} in
     * {@code batchItemFailures}. For stream sources, Lambda resumes from the lowest reported
     * sequence number, so records after the failure would be redelivered anyway. Stopping early
     * keeps processing in order. Requires {@code ReportBatchItemFailures} on the event source mapping.
     *
     * @return a map whose {@code batchItemFailures} list is empty when every record succeeded
     * @throws RuntimeException if a failed record has no sequence number to report
     */
    @Override
    public Map<String, Object> handleRequest(DynamodbEvent event, Context context) {
        List<Map<String, Object>> batchItemFailures = new ArrayList<>();
        for (DynamodbStreamRecord record : event.getRecords()) {
            try {
                processRecord(record);
            } catch (Exception e) {
                String sequenceNumber = sequenceNumberOf(record);
                if (sequenceNumber == null) {
                    // No valid identifier to report: fail the whole batch instead.
                    throw new RuntimeException("Failed record has no sequence number: " + record.getEventID(), e);
                }
                context.getLogger().log("Failed: " + sequenceNumber + " - " + e.getMessage());
                batchItemFailures.add(Map.of("itemIdentifier", sequenceNumber));
                break;
            }
        }
        return Map.of("batchItemFailures", batchItemFailures);
    }

    /** Returns the record's stream sequence number, or {@code null} when it is absent. */
    private static String sequenceNumberOf(DynamodbStreamRecord record) {
        var streamRecord = record.getDynamodb();
        return streamRecord == null ? null : streamRecord.getSequenceNumber();
    }

    /** Dispatches one record to the handler for its event name; unknown names are ignored. */
    private void processRecord(DynamodbStreamRecord record) {
        var streamRecord = record.getDynamodb();
        switch (record.getEventName()) {
            case "INSERT" -> handleInsert(streamRecord.getNewImage());
            case "MODIFY" -> handleModify(streamRecord.getOldImage(), streamRecord.getNewImage());
            case "REMOVE" -> handleRemove(streamRecord.getOldImage());
            default -> { }
        }
    }

    /**
     * Publishes an order's status transition when {@code status} differs between the images.
     *
     * <p>Items whose new image is not an {@code Order} are ignored. Records lacking either image
     * are also ignored, because no transition can be detected without both.
     *
     * @throws IllegalStateException if an order's status changed but it has no {@code order_id}
     */
    private void handleModify(
            Map<String, com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue> oldImage,
            Map<String, com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue> newImage) {
        if (oldImage == null || newImage == null) return;
        if (!"Order".equals(stringAttribute(newImage, "entity_type"))) return;
        String oldStatus = stringAttribute(oldImage, "status");
        String newStatus = stringAttribute(newImage, "status");
        if (Objects.equals(oldStatus, newStatus)) return;
        String orderId = stringAttribute(newImage, "order_id");
        if (orderId == null) {
            throw new IllegalStateException("Order status changed but order_id is missing");
        }
        System.out.printf("Order %s: %s → %s%n", orderId, oldStatus, newStatus);
        publishStatusChange(orderId, oldStatus, newStatus);
    }

    /** Returns the string value of {@code name}, or {@code null} when missing or not a string value. */
    private static String stringAttribute(
            Map<String, com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue> image,
            String name) {
        var value = image.get(name);
        return value == null ? null : value.getS();
    }
}
Building Materialized Views from Streams
DynamoDB has no built-in materialized views. Here, a materialized view is a secondary representation of your data that you maintain yourself, asynchronously, by processing the stream. Common examples: leaderboards, counts, denormalized aggregates.
# Materialized view: Real-time order count and total per customer
# Maintained by a Lambda processing the DynamoDB stream
def update_customer_aggregate(event, context):
    """
    Maintain a running aggregate: total orders and spend per customer.

    For every INSERT record whose new image has ``entity_type == 'Order'``,
    this atomically updates the ``CUSTOMER#<id>`` / ``AGGREGATE`` item of the
    ``customer-aggregates`` table:
      - increments ``order_count`` by one,
      - adds the order's ``total`` to ``total_spend``,
      - sets ``last_order_date``.
    MODIFY and REMOVE records are ignored, so the aggregate only grows.

    The view is eventually consistent with the source table. The increments
    are NOT idempotent. If Lambda redelivers a record (for example, the batch
    is retried after an exception raised here), that order is counted again.
    """
    dynamodb = boto3.resource('dynamodb')
    aggregates_table = dynamodb.Table('customer-aggregates')
    for record in event['Records']:
        if record['eventName'] != 'INSERT':
            continue
        new_image = deserialize(record['dynamodb']['NewImage'])
        if new_image.get('entity_type') != 'Order':
            continue
        pk = new_image['pk']
        # Strip only a leading 'USER#'; str.replace would also delete the
        # marker wherever else it occurs inside the id.
        customer_id = pk[len('USER#'):] if pk.startswith('USER#') else pk
        # A missing or NULL 'total' counts as 0 instead of raising on Decimal('None').
        raw_total = new_image.get('total')
        order_total = Decimal('0') if raw_total is None else Decimal(str(raw_total))
        # Second '#'-separated segment of the sort key; presumably the order
        # date (e.g. 'ORDER#<date>#...') — confirm against the key design.
        sort_key = new_image.get('sk', '')
        order_date = sort_key.split('#')[1] if '#' in sort_key else ''
        # Atomic increment — concurrent Lambda invocations cannot lose updates
        aggregates_table.update_item(
            Key={
                'pk': f'CUSTOMER#{customer_id}',
                'sk': 'AGGREGATE'
            },
            UpdateExpression='''
SET order_count = if_not_exists(order_count, :zero) + :one,
total_spend = if_not_exists(total_spend, :zero) + :amount,
last_order_date = :now
''',
            ExpressionAttributeValues={
                ':zero': Decimal('0'),
                ':one': Decimal('1'),
                ':amount': order_total,
                ':now': order_date
            }
        )
# The aggregate is now queryable in O(1) time:
# GET /customers/123/stats → Single GetItem on customer-aggregates table
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import java.util.Map;
/**
 * Applies one order to the per-customer aggregate item in the customer-aggregates table.
 */
public class MaterializedViewUpdater {
    private static final String AGGREGATES_TABLE = "customer-aggregates";

    // Creates the counters at zero on first use, then adds to them; also records the latest order date.
    private static final String INCREMENT_AGGREGATE = """
            SET order_count = if_not_exists(order_count, :zero) + :one,
            total_spend = if_not_exists(total_spend, :zero) + :amount,
            last_order_date = :now
            """;

    private final DynamoDbClient dynamo = DynamoDbClient.create();

    /**
     * Adds one order to the {@code CUSTOMER#<customerId>} / {@code AGGREGATE} item.
     *
     * @param customerId customer identifier appended to the {@code CUSTOMER#} partition key
     * @param orderTotal order amount, passed to DynamoDB as a number string
     * @param orderDate  value stored in {@code last_order_date}
     */
    public void updateAggregate(String customerId, String orderTotal, String orderDate) {
        Map<String, AttributeValue> aggregateKey = Map.of(
                "pk", text("CUSTOMER#" + customerId),
                "sk", text("AGGREGATE"));
        Map<String, AttributeValue> placeholders = Map.of(
                ":zero", number("0"),
                ":one", number("1"),
                ":amount", number(orderTotal),
                ":now", text(orderDate));
        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(AGGREGATES_TABLE)
                .key(aggregateKey)
                .updateExpression(INCREMENT_AGGREGATE)
                .expressionAttributeValues(placeholders)
                .build();
        dynamo.updateItem(request);
    }

    /** Wraps a string as a DynamoDB S attribute. */
    private static AttributeValue text(String value) {
        return AttributeValue.builder().s(value).build();
    }

    /** Wraps a numeric string as a DynamoDB N attribute. */
    private static AttributeValue number(String value) {
        return AttributeValue.builder().n(value).build();
    }
}
Handling Duplicates and Ordering
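DynamoDB Streams writes each modification to the stream exactly once, but Lambda's processing of stream records is at-least-once. Your function may receive the same record more than once, most commonly when a batch (or part of one) is retried after an error or timeout. Design for idempotency: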
import hashlib
import time
def idempotent_stream_handler(event, context):
    """
    Process each stream record once per ``eventID`` using a deduplication table.

    Before processing, a record is claimed by conditionally writing its
    ``eventID`` to ``stream-deduplication``, with a ``ttl`` 24h out for
    cleanup. Records whose claim already exists are skipped.

    If ``process_record`` raises:
      - the claim is deleted so the retry reprocesses the record;
      - the record's ``SequenceNumber`` is reported in ``batchItemFailures``
        (the identifier Lambda uses for stream sources);
      - the rest of the batch is left for the retry, which resumes from that
        record.
    Requires ``ReportBatchItemFailures`` on the event source mapping.

    Caveat: this is not strict exactly-once. If the invocation crashes or
    times out after the claim is written but before processing finishes, the
    claim survives and the redelivered record is skipped.
    """
    dynamodb = boto3.resource('dynamodb')
    dedup_table = dynamodb.Table('stream-deduplication')
    already_claimed = dedup_table.meta.client.exceptions.ConditionalCheckFailedException
    batch_item_failures = []
    for record in event['Records']:
        event_id = record['eventID']  # Unique per stream record
        claimed_at = int(time.time())
        # Claim the record; the condition fails if it was already claimed
        try:
            dedup_table.put_item(
                Item={
                    'event_id': event_id,
                    'processed_at': claimed_at,
                    'ttl': claimed_at + 86400  # Expire after 24h
                },
                ConditionExpression='attribute_not_exists(event_id)'
            )
        except already_claimed:
            # Already processed (or claimed) — skip
            continue
        try:
            process_record(record)
        except Exception as exc:
            print(f"Failed to process {event_id}: {exc!r}")
            # Release the claim so the retry will reprocess this record
            dedup_table.delete_item(Key={'event_id': event_id})
            batch_item_failures.append(
                {'itemIdentifier': record['dynamodb']['SequenceNumber']}
            )
            # Lambda retries from this record onward; stopping preserves order.
            break
    return {'batchItemFailures': batch_item_failures}
Event Source Mapping Configuration
The Lambda event source mapping controls batching, error handling, and parallelism:
import boto3
lambda_client = boto3.client('lambda')

# Production-grade event source mapping configuration, assembled as keyword
# arguments and passed to create_event_source_mapping in a single call.
on_failure_destination = {
    'OnFailure': {
        'Destination': 'arn:aws:sqs:us-east-1:123456789012:stream-dlq'
    }
}
mapping_config = {
    'EventSourceArn': 'arn:aws:dynamodb:us-east-1:123456789012:table/app-table/stream/2024-01-15T00:00:00.000',
    'FunctionName': 'stream-processor',
    'StartingPosition': 'LATEST',  # or TRIM_HORIZON to process existing records
    'BatchSize': 100,  # Up to 10,000 records per batch
    'MaximumBatchingWindowInSeconds': 5,  # Wait up to 5s to fill batch
    'ParallelizationFactor': 10,  # Process up to 10 batches per shard concurrently
    'MaximumRetryAttempts': 3,  # Retry failed batches up to 3 times
    'MaximumRecordAgeInSeconds': 3600,  # Skip records older than 1 hour
    'BisectBatchOnFunctionError': True,  # Split batch in half on error to isolate poison pill
    'DestinationConfig': on_failure_destination,
    'FunctionResponseTypes': ['ReportBatchItemFailures'],  # Enable partial batch failure reporting
}
lambda_client.create_event_source_mapping(**mapping_config)
Critical settings explained:
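- BisectBatchOnFunctionError: When a batch fails, Lambda splits it in half and retries each half separately. This narrows the failure down to the "poison pill" record instead of retrying the whole batch as a unit.
- ReportBatchItemFailures: Your function returns the sequence number of the failed record (as `itemIdentifier` in `batchItemFailures`) instead of failing the entire batch. For stream sources, Lambda checkpoints just before the lowest reported sequence number and retries from that record onward. Records after it are redelivered too, so handlers should stop at the first failure.
- ParallelizationFactor: Each shard can have up to 10 concurrent Lambda invocations processing different batches. Lambda still processes changes to the same item in order. Records for different items in the same shard can complete out of order, so your processing must not depend on cross-item ordering within a shard.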