Lambda Performance Tuning and Cost Optimization
Lambda bills for execution time, metered per millisecond, multiplied by the amount of memory allocated. Because Lambda allocates CPU power in proportion to the configured memory, this creates a non-obvious optimization space: increasing memory raises the per-millisecond price but may shorten duration enough to lower the total cost, while decreasing memory lowers the per-millisecond price but extends duration. The optimal point varies per function and must be measured empirically.
Power Tuning: Finding the Optimal Configuration
AWS Lambda Power Tuning is an open-source Step Functions state machine that invokes your function at different memory levels and reports the cost/performance tradeoff. The script below is a simplified, manual version of the same idea:
import boto3
import json
import time
from concurrent.futures import ThreadPoolExecutor
from statistics import median, mean
def power_tune(function_name: str, payload: dict, memory_levels: list = None,
               invocations_per_level: int = 50):
    """
    Manual power tuning: invoke at each memory level and measure.
    For production use, deploy the official Power Tuning tool.

    Args:
        function_name: Name or ARN of the Lambda function to tune.
        payload: JSON-serializable event sent on every invocation.
        memory_levels: Memory sizes (MB) to test; defaults to a spread
            from 128 MB to 3008 MB.
        invocations_per_level: Number of measured invocations per level.

    Returns:
        Dict keyed by memory size. Each value holds 'avg_duration_ms',
        'p99_duration_ms', 'cost_per_invocation' and
        'invocations_per_dollar'. Levels for which no billed duration could
        be read from the logs are omitted.

    Side effects:
        Reconfigures the live function for every level. The memory size the
        function had before tuning is restored on exit, even on error.
    """
    import base64

    if memory_levels is None:
        memory_levels = [128, 256, 512, 1024, 1536, 2048, 3008]

    # Price: $0.0000166667 per GB-second (x86), converted to GB-millisecond.
    price_per_gb_ms = 0.0000166667 / 1000

    lambda_client = boto3.client('lambda')
    waiter = lambda_client.get_waiter('function_updated_v2')
    encoded_payload = json.dumps(payload)

    # Remember the current setting so the function is not left at the last
    # (typically largest, most expensive) test level.
    original_memory = lambda_client.get_function_configuration(
        FunctionName=function_name
    )['MemorySize']

    results = {}
    try:
        for memory in memory_levels:
            # Update function memory
            lambda_client.update_function_configuration(
                FunctionName=function_name,
                MemorySize=memory
            )
            # Wait for update to propagate
            waiter.wait(FunctionName=function_name)

            # Run cold start invocation (discard) to warm the new config
            lambda_client.invoke(
                FunctionName=function_name,
                Payload=encoded_payload
            )

            # Measure warm invocations
            durations = []
            for _ in range(invocations_per_level):
                response = lambda_client.invoke(
                    FunctionName=function_name,
                    Payload=encoded_payload,
                    LogType='Tail'
                )
                # Extract billed duration from the tail of the log
                log = base64.b64decode(response.get('LogResult', '')).decode()
                for line in log.split('\n'):
                    if 'Billed Duration' in line:
                        billed = int(line.split('Billed Duration: ')[1].split(' ms')[0])
                        durations.append(billed)

            if not durations:
                # No REPORT line was found (or no invocations were requested);
                # there is nothing to average for this level.
                print(f"{memory:>5} MB | no billed duration found in logs, skipping")
                continue

            # Calculate cost per invocation at this memory level
            cost_per_ms = (memory / 1024) * price_per_gb_ms
            avg_duration = mean(durations)
            p99_duration = sorted(durations)[int(len(durations) * 0.99)]
            cost = cost_per_ms * avg_duration

            results[memory] = {
                'avg_duration_ms': round(avg_duration, 1),
                'p99_duration_ms': p99_duration,
                'cost_per_invocation': round(cost, 8),
                'invocations_per_dollar': round(1 / cost) if cost > 0 else 0
            }
            print(f"{memory:>5} MB | avg: {avg_duration:>7.1f}ms | "
                  f"p99: {p99_duration:>5}ms | cost: ${cost:.8f}")
    finally:
        # Put the function back the way it was found.
        lambda_client.update_function_configuration(
            FunctionName=function_name,
            MemorySize=original_memory
        )
        waiter.wait(FunctionName=function_name)

    if not results:
        return results

    # Find optimal (lowest cost)
    optimal = min(results.items(), key=lambda x: x[1]['cost_per_invocation'])
    print(f"\nOptimal: {optimal[0]} MB (${optimal[1]['cost_per_invocation']:.8f}/invocation)")
    return results
Connection Pooling in Serverless
Traditional connection pooling doesn’t work in Lambda because each execution environment is isolated. You need connection management strategies:
import os
import psycopg2
from psycopg2 import pool
import boto3
# Problem: Each Lambda instance opens its own connection
# 1000 concurrent Lambda instances = 1000 database connections
# Most databases cap at 100-500 connections
# Solution 1: RDS Proxy (managed connection pooling)
# RDS Proxy sits between Lambda and your database, multiplexing connections
# Configure in Lambda: DATABASE_HOST = rds-proxy-endpoint (not direct RDS endpoint)
# Solution 2: Connection reuse within execution environment
# Cached at module level so it survives between invocations that land on the
# same execution environment.
_connection = None


def get_connection():
    """Return the cached database connection, opening a new one when needed.

    A new connection is opened when none has been created yet or when the
    cached one reports itself as closed. New connections run in autocommit
    mode.
    """
    global _connection

    still_usable = _connection is not None and not _connection.closed
    if still_usable:
        return _connection

    connect_kwargs = {
        'host': os.environ['DB_HOST'],
        'dbname': os.environ['DB_NAME'],
        'user': os.environ['DB_USER'],
        'password': get_db_password(),
        'connect_timeout': 5,
        # Critical for Lambda: a statement timeout (25 seconds) keeps a
        # long-running query from blocking the function past its own timeout.
        'options': '-c statement_timeout=25000',
    }
    _connection = psycopg2.connect(**connect_kwargs)
    _connection.autocommit = True
    return _connection
def handler(event, context):
    """Return the orders of one customer as a JSON body.

    Args:
        event: Invocation event; must contain 'customer_id'.
        context: Lambda context object, used to read the remaining time.

    Returns:
        A dict with 'statusCode' 200 and the JSON-encoded rows in 'body', or
        'statusCode' 503 when fewer than 5000 ms of execution time remain.
    """
    # Check the time budget BEFORE touching the database: obtaining the
    # connection may itself block (connect timeout), so there is no point in
    # paying for it when the query would not be started anyway.
    remaining_ms = context.get_remaining_time_in_millis()
    if remaining_ms < 5000:
        return {'statusCode': 503, 'body': 'Insufficient time remaining'}

    conn = get_connection()
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM orders WHERE customer_id = %s", (event['customer_id'],))
        results = cur.fetchall()

    # default=str stringifies values json cannot encode natively.
    return {'statusCode': 200, 'body': json.dumps(results, default=str)}
import software.amazon.awssdk.services.rdsdata.RdsDataClient;
import software.amazon.awssdk.services.rdsdata.model.*;
import java.util.*;
// Solution 3: RDS Data API (HTTP-based, no connection management needed)
// Tradeoff: Higher per-query latency (~30ms overhead) but zero connection issues
/**
 * Reads orders through the RDS Data API, which is HTTP-based and therefore
 * needs no database connection management inside the function.
 */
public class RdsDataApiHandler {
    // Data API client — no TCP connection to manage
    private static final RdsDataClient rdsData = RdsDataClient.create();
    private static final String CLUSTER_ARN = System.getenv("CLUSTER_ARN");
    private static final String SECRET_ARN = System.getenv("SECRET_ARN");
    private static final String DATABASE = System.getenv("DATABASE_NAME");

    /**
     * Fetches order_id, status and total for every order of one customer.
     *
     * @param customerId value bound to the :id parameter of the query
     * @return one map per row with the keys "order_id", "status" and "total";
     *         a value is null when the column is SQL NULL
     */
    public List<Map<String, String>> getOrders(String customerId) {
        ExecuteStatementResponse response = rdsData.executeStatement(
            ExecuteStatementRequest.builder()
                .resourceArn(CLUSTER_ARN)
                .secretArn(SECRET_ARN)
                .database(DATABASE)
                .sql("SELECT order_id, status, total FROM orders WHERE customer_id = :id")
                .parameters(SqlParameter.builder()
                    .name("id")
                    .value(Field.builder().stringValue(customerId).build())
                    .build())
                .build());

        // Each record is a list of fields in SELECT column order.
        List<Map<String, String>> results = new ArrayList<>();
        for (List<Field> row : response.records()) {
            // HashMap instead of Map.of: Map.of throws NullPointerException on
            // null values, which a NULL column would otherwise produce.
            Map<String, String> order = new HashMap<>();
            order.put("order_id", fieldToString(row.get(0)));
            order.put("status", fieldToString(row.get(1)));
            order.put("total", fieldToString(row.get(2)));
            results.add(order);
        }
        return results;
    }

    /**
     * Converts a Data API field to text whichever typed slot it arrived in.
     * Only one of the typed accessors is populated per field, so
     * stringValue() alone is null for numeric and boolean columns.
     */
    private static String fieldToString(Field field) {
        if (field == null || Boolean.TRUE.equals(field.isNull())) {
            return null;
        }
        if (field.stringValue() != null) {
            return field.stringValue();
        }
        if (field.longValue() != null) {
            return String.valueOf(field.longValue());
        }
        if (field.doubleValue() != null) {
            return String.valueOf(field.doubleValue());
        }
        if (field.booleanValue() != null) {
            return String.valueOf(field.booleanValue());
        }
        return null;
    }
}
ARM64 (Graviton2) Migration
Lambda supports ARM64 (Graviton2) at 20% lower cost with generally better performance for compute workloads. Migration checklist:
# Check if your dependencies have ARM64 wheels
# Most pure Python packages work without changes
# Compiled packages (numpy, pandas, psycopg2) need ARM64 binaries
import subprocess
import json
def check_arm64_compatibility(requirements_file: str):
    """Check which packages have ARM64 wheels available on PyPI.

    Reads a pip requirements file, looks up each package's current release
    on PyPI, and prints one status line per package. A package counts as
    ARM64-ready when the release ships an 'aarch64' wheel or a pure-Python
    'none-any' wheel.

    Args:
        requirements_file: Path to a requirements.txt-style file.
    """
    import re

    import requests

    packages = []
    with open(requirements_file) as f:
        for raw_line in f:
            # Drop full-line and trailing comments, then surrounding whitespace.
            line = re.sub(r'(^|\s)#.*$', '', raw_line).strip()
            # Skip blanks and pip options such as "-r other.txt" or "-e .".
            if not line or line.startswith('-'):
                continue
            # Keep only the project name: cut at the first version specifier,
            # extras bracket, environment marker, URL marker or whitespace.
            name = re.split(r'[\s\[<>=!~;@]', line, maxsplit=1)[0]
            if name:
                packages.append(name)

    for package in packages:
        # A timeout keeps one unresponsive request from hanging the whole check.
        response = requests.get(f"https://pypi.org/pypi/{package}/json", timeout=10)
        if response.status_code != 200:
            print(f"  ❓ {package}: not found on PyPI")
            continue
        data = response.json()
        urls = data.get('urls', [])
        has_arm = any('aarch64' in url.get('filename', '') or
                      'none-any' in url.get('filename', '')
                      for url in urls)
        status = "✅" if has_arm else "❌"
        print(f"  {status} {package}: {'ARM64 available' if has_arm else 'x86_64 only'}")
# Deploying for ARM64:
# The instruction set is set together with the deployment package, so it is
# changed through update_function_code; update_function_configuration has no
# Architectures parameter. 'function-arm64.zip' is a placeholder for a package
# built for arm64.
lambda_client = boto3.client('lambda')
with open('function-arm64.zip', 'rb') as package:
    lambda_client.update_function_code(
        FunctionName='my-function',
        ZipFile=package.read(),
        Architectures=['arm64']  # Change from default ['x86_64']
    )
# Layers MUST also be compatible with arm64!
# Rebuild layers with: docker run --platform linux/arm64 ...
Batch Processing: Maximizing Throughput
For event-source mappings (SQS, Kinesis, DynamoDB Streams), batch configuration directly impacts throughput and cost:
# Lambda processes messages in batches — process the ENTIRE batch efficiently
def batch_handler(event, context):
    """
    Processing 100 SQS messages per invocation instead of 1.
    Cost: 1 invocation instead of 100.

    Each record body is parsed and transformed, then written to the
    'processed-events' table in chunks of up to 25 items.

    Args:
        event: SQS event; each entry of event['Records'] provides 'body'
            (a JSON string) and 'messageId'.
        context: Lambda context object (unused).

    Returns:
        A partial-batch response: 'batchItemFailures' lists the messageId of
        every record that could not be transformed or whose write did not
        complete, so only those messages are retried.
    """
    table_name = 'processed-events'
    max_items_per_call = 25  # BatchWriteItem limit
    max_attempts = 5

    dynamodb_client = boto3.resource('dynamodb').meta.client
    failed_message_ids = []
    pending = []  # (message_id, write_request) pairs, in arrival order

    for record in event['Records']:
        try:
            body = json.loads(record['body'])
            processed = transform_message(body)
            pending.append((record['messageId'], {'PutRequest': {'Item': processed}}))
        except Exception as exc:
            # Best effort per record: report it for retry, keep the rest going.
            print(f"Failed to process message {record['messageId']}: {exc!r}")
            failed_message_ids.append(record['messageId'])

    # Batch write (max 25 items per call)
    for start in range(0, len(pending), max_items_per_call):
        chunk = pending[start:start + max_items_per_call]
        unprocessed = {table_name: [request for _, request in chunk]}

        for attempt in range(max_attempts):
            if attempt:
                # Exponential backoff before re-sending throttled items:
                # 0.1 s, 0.2 s, 0.4 s, ...
                time.sleep(0.1 * 2 ** (attempt - 1))
            response = dynamodb_client.batch_write_item(RequestItems=unprocessed)
            unprocessed = response.get('UnprocessedItems', {})
            if not unprocessed:
                break
        else:
            # Still throttled after the last attempt. Unprocessed items cannot
            # be cheaply mapped back to their messages, so the whole chunk is
            # reported; puts that already succeeded are simply rewritten on
            # retry.
            failed_message_ids.extend(message_id for message_id, _ in chunk)

    # Report partial failures — only failed messages get retried
    return {
        'batchItemFailures': [
            {'itemIdentifier': msg_id} for msg_id in failed_message_ids
        ]
    }
# Event source mapping config for maximum throughput:
# BatchSize: 100 (SQS max: 10,000 for standard, 10 for FIFO)
# MaximumBatchingWindowInSeconds: 5 (wait to fill batch)
# FunctionResponseTypes: ['ReportBatchItemFailures']
Cost optimization summary:
- Power-tune every function (5 minutes of effort, often 30-50% cost reduction)
- Use ARM64 for all new functions (20% cheaper, equal or better performance)
- Maximize batch sizes for event-source-driven functions
- Use Provisioned Concurrency only for latency-critical synchronous paths
- Start I/O-bound functions that just call other services at a low memory setting (e.g. 128 MB), then confirm the choice with power tuning