Idempotency and Exactly-Once Payment Processing
In payment systems, retries are inevitable. Networks fail, load balancers time out, clients crash and restart. Without idempotency, a retry can charge a customer twice. With incorrect idempotency, a retry can return stale results while a new charge processes in the background.
Closing the gap between “works most of the time” and “never double-charges” is one of the hardest engineering problems in payment gateway design.
Idempotency Key Design
An idempotency key is a client-provided identifier that the server uses to deduplicate requests. If the server sees the same key twice, it returns the cached result from the first request instead of processing again:
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Optional
import hashlib
import json
import uuid
import time
@dataclass
class IdempotencyRecord:
    """
    Stored for each idempotency key.

    Lifecycle:
    1. Key received → record created with status "processing"
    2. Request processed → record updated with response
    3. Same key received again → cached response returned
    4. Key expires after TTL (typically 24-48 hours)

    The record MUST be created BEFORE processing begins.
    Otherwise, a concurrent retry could start processing
    before the first request completes → double charge.
    """
    idempotency_key: str
    merchant_id: str
    request_fingerprint: str  # Hash of the request body
    status: str  # "processing", "completed", "failed"

    # Cached response (populated when processing completes)
    response_code: int = 0
    response_body: str = ""

    # Timestamps
    created_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: Optional[datetime] = None
    expires_at: datetime = field(
        default_factory=lambda: datetime.utcnow() + timedelta(hours=48)
    )

    # Lock management
    locked_by: str = ""  # Instance ID holding the processing lock
    lock_expires_at: Optional[datetime] = None
class IdempotencyStore:
    """
    Idempotency key storage with atomic check-and-create.

    The critical invariant: for a given idempotency key, at most
    ONE request is processing at any time. This requires atomic
    operations: the check ("does this key exist?") and create
    ("insert a new record") must be a single atomic operation.

    Implementation options:
    1. Database with unique constraint + INSERT ... ON CONFLICT
    2. Redis with SET NX (set if not exists) + TTL
    3. DynamoDB with conditional put (attribute_not_exists)

    All three provide the atomicity guarantee needed. This in-memory
    version is illustrative only: its dict lookup and insert are not
    atomic across threads or processes.
    """

    def __init__(self):
        self._records: dict[str, IdempotencyRecord] = {}
        self._instance_id = str(uuid.uuid4())[:8]

    def check_and_create(
        self, key: str, merchant_id: str, request_body: dict
    ) -> tuple[str, Optional[IdempotencyRecord]]:
        """
        Atomically check if a key exists and create it if not.

        Returns:
        - ("new", None): key didn't exist (or a failed/abandoned attempt
          is being retried), proceed with processing
        - ("duplicate", record): key exists, return cached response
        - ("conflict", None): key exists but request body differs → error
        - ("processing", record): key exists but still processing → wait or error

        In production, this is a single atomic database operation:

            INSERT INTO idempotency_keys (key, merchant_id, fingerprint, status, ...)
            VALUES ($1, $2, $3, 'processing', ...)
            ON CONFLICT (key, merchant_id) DO NOTHING
            RETURNING *;

        If the INSERT succeeds → "new"
        If it conflicts → SELECT the existing record and check status
        """
        fingerprint = self._compute_fingerprint(request_body)
        compound_key = f"{merchant_id}:{key}"
        existing = self._records.get(compound_key)

        if existing is None:
            # New key: create record and proceed
            record = IdempotencyRecord(
                idempotency_key=key,
                merchant_id=merchant_id,
                request_fingerprint=fingerprint,
                status="processing",
                locked_by=self._instance_id,
                lock_expires_at=datetime.utcnow() + timedelta(seconds=30),
            )
            self._records[compound_key] = record
            return ("new", None)

        # Key exists: check what to do
        if existing.request_fingerprint != fingerprint:
            # Same key but different request body → client error
            return ("conflict", None)

        if existing.status == "completed":
            # Previous request completed → return cached response
            return ("duplicate", existing)

        if existing.status == "processing":
            # Check if the processing lock has expired
            if (existing.lock_expires_at and
                    datetime.utcnow() > existing.lock_expires_at):
                # Lock expired: previous processor likely crashed, so take
                # over. The crashed processor may already have charged the
                # card, so re-processing is only safe if the downstream
                # call is itself idempotent.
                self._take_over(existing)
                return ("new", None)  # Re-process
            # Still processing: tell client to wait
            return ("processing", existing)

        # status == "failed": re-acquire the lock before re-processing.
        # Otherwise complete() would ignore the retry, because fail()
        # cleared locked_by.
        self._take_over(existing)
        return ("new", None)

    def _take_over(self, record: IdempotencyRecord) -> None:
        """Re-acquire the processing lock on an existing record."""
        record.status = "processing"
        record.locked_by = self._instance_id
        record.lock_expires_at = datetime.utcnow() + timedelta(seconds=30)

    def complete(
        self, key: str, merchant_id: str,
        response_code: int, response_body: str
    ):
        """Mark the idempotency record as completed with the response."""
        compound_key = f"{merchant_id}:{key}"
        record = self._records.get(compound_key)
        if record and record.locked_by == self._instance_id:
            record.status = "completed"
            record.response_code = response_code
            record.response_body = response_body
            record.completed_at = datetime.utcnow()
            record.locked_by = ""

    def fail(self, key: str, merchant_id: str):
        """
        Mark the idempotency record as failed.

        A failed record allows the client to retry with the same
        key: the next attempt will re-process instead of returning
        the cached failure.
        """
        compound_key = f"{merchant_id}:{key}"
        record = self._records.get(compound_key)
        if record and record.locked_by == self._instance_id:
            record.status = "failed"
            record.locked_by = ""

    @staticmethod
    def _compute_fingerprint(request_body: dict) -> str:
        """
        Compute a deterministic hash of the request body.

        The fingerprint detects when a client reuses an idempotency
        key with different request parameters, which is a client
        error, not a retry.

        Uses canonical JSON serialization (sorted keys, no whitespace)
        for deterministic hashing.
        """
        canonical = json.dumps(request_body, sort_keys=True, separators=(',', ':'))
        return hashlib.sha256(canonical.encode()).hexdigest()
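The `check_and_create` docstring describes the production path as a single `INSERT ... ON CONFLICT DO NOTHING` against a unique constraint. The following is a minimal sketch of that idea using Python's built-in `sqlite3` module, with SQLite's `INSERT OR IGNORE` standing in for the PostgreSQL statement. The table layout and the names `make_store` and `claim_key` are assumptions made for illustration; they are not part of the store above.

```python
import sqlite3


def make_store() -> sqlite3.Connection:
    """Create an in-memory table keyed by (merchant_id, idempotency_key)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE idempotency_keys (
            merchant_id     TEXT NOT NULL,
            idempotency_key TEXT NOT NULL,
            fingerprint     TEXT NOT NULL,
            status          TEXT NOT NULL,
            PRIMARY KEY (merchant_id, idempotency_key)
        )
        """
    )
    return conn


def claim_key(
    conn: sqlite3.Connection, merchant_id: str, key: str, fingerprint: str
) -> str:
    """Return "new", "conflict", or the status of the existing record."""
    with conn:  # commits on success, rolls back if an exception escapes
        cur = conn.execute(
            "INSERT OR IGNORE INTO idempotency_keys "
            "(merchant_id, idempotency_key, fingerprint, status) "
            "VALUES (?, ?, ?, 'processing')",
            (merchant_id, key, fingerprint),
        )
        if cur.rowcount == 1:
            # The primary key did not exist, so this caller owns the key.
            return "new"
        # The insert was ignored: another request created the row first.
        stored_fingerprint, status = conn.execute(
            "SELECT fingerprint, status FROM idempotency_keys "
            "WHERE merchant_id = ? AND idempotency_key = ?",
            (merchant_id, key),
        ).fetchone()
    if stored_fingerprint != fingerprint:
        return "conflict"
    return status
```

The point of the sketch is that the database, not application code, decides which of two concurrent requests wins: only one insert can succeed against the primary key, and every other caller falls through to the read path.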
Request Processing with Idempotency
class IdempotentPaymentHandler:
    """
    Wraps payment processing with idempotency guarantees.

    Payment and PaymentProcessingEngine are not defined in this section.
    """

    def __init__(
        self, idempotency_store: IdempotencyStore,
        payment_engine: 'PaymentProcessingEngine',
    ):
        self._idempotency = idempotency_store
        self._engine = payment_engine

    def handle_payment_request(
        self, idempotency_key: str, merchant_id: str,
        request: dict
    ) -> tuple[int, dict]:
        """
        Process a payment request with idempotency.
        Returns (status_code, response_body).
        """
        # Step 1: Check idempotency
        result, record = self._idempotency.check_and_create(
            idempotency_key, merchant_id, request
        )

        if result == "duplicate":
            # Return cached response
            return (
                record.response_code,
                json.loads(record.response_body)
            )

        if result == "conflict":
            return (422, {
                "error": "idempotency_key_conflict",
                "message": (
                    "This idempotency key was already used with "
                    "different request parameters"
                ),
            })

        if result == "processing":
            return (409, {
                "error": "request_in_progress",
                "message": (
                    "A request with this idempotency key is currently "
                    "being processed. Retry after 5 seconds."
                ),
                "retry_after": 5,
            })

        # Step 2: Process the payment. Only the payment call sits inside
        # the try: once the charge has gone through, the record must not
        # be marked "failed", or a retry would charge again.
        try:
            payment = self._create_and_process_payment(
                idempotency_key, merchant_id, request
            )
        except Exception:
            # Mark idempotency record as failed (allows retry)
            self._idempotency.fail(idempotency_key, merchant_id)
            raise

        response = {
            "payment_id": payment.payment_id,
            "status": payment.status.value,
            "amount": str(payment.amount),
            "currency": payment.currency,
        }
        status_code = 200

        # Step 3: Cache the response
        self._idempotency.complete(
            idempotency_key, merchant_id,
            status_code, json.dumps(response)
        )
        return (status_code, response)

    def _create_and_process_payment(
        self, idempotency_key: str, merchant_id: str, request: dict
    ) -> 'Payment':
        """Create a payment and process it."""
        payment = Payment(
            payment_id=str(uuid.uuid4()),
            merchant_id=merchant_id,
            # The key arrives separately from the body (for example in a
            # header), so use the handler argument rather than the request.
            idempotency_key=idempotency_key,
            amount=Decimal(request["amount"]),
            currency=request["currency"],
            payment_method_type=request.get("payment_method", {}).get("type", ""),
            payment_method_token=request.get("payment_method", {}).get("token", ""),
        )
        return self._engine.authorize(
            payment,
            card_brand=request.get("card_brand", "visa"),
            card_country=request.get("card_country", "US"),
        )
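The handler only works if the client holds up its side: one key per payment intent, reused unchanged on every retry. The sketch below shows what that might look like. `pay_with_retries` and its `send` callback are hypothetical names, and the transport is stubbed so that the retry logic is the only thing on display. The 409 and `retry_after` handling mirrors the responses returned by the handler above.

```python
import time
import uuid
from typing import Callable


def pay_with_retries(
    send: Callable[[str, dict], tuple[int, dict]],
    request: dict,
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> tuple[int, dict]:
    """Send a payment request, retrying with the SAME idempotency key."""
    # Generated once per payment intent, never per attempt.
    idempotency_key = str(uuid.uuid4())
    last_error = None
    for attempt in range(max_attempts):
        try:
            status, body = send(idempotency_key, request)
        except ConnectionError as exc:
            # Outcome unknown: the server may or may not have charged.
            # Retrying with the same key lets the server deduplicate.
            last_error = exc
            sleep(min(2 ** attempt, 30))
            continue
        if status == 409:
            # An earlier attempt is still in flight: wait, then ask again.
            sleep(body.get("retry_after", 5))
            continue
        return status, body
    raise RuntimeError("payment outcome unknown after retries") from last_error
```

Generating a fresh key inside the loop would defeat the mechanism entirely: each retry would look like a new payment to the server.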
The Outbox Pattern
The hardest consistency problem in payment systems: you need to update the payment state in your database AND publish an event to a message broker, atomically. If either fails independently, the system is inconsistent.
class OutboxPublisher:
    """
    The Outbox Pattern: atomic state updates + event publishing.

    Problem:
    1. Save payment to database
    2. Publish event to Kafka/webhook
    If step 1 succeeds but step 2 fails → lost event
    If step 2 succeeds but step 1 fails → phantom event

    Solution: write the event to an "outbox" table in the SAME
    database transaction as the payment update. A separate
    publisher process reads the outbox and publishes events.

    Transaction:
        BEGIN;
        UPDATE payments SET status = 'captured' WHERE id = $1;
        INSERT INTO outbox (event_type, payload) VALUES ('payment.captured', $2);
        COMMIT;

    The outbox reader:
    1. SELECT * FROM outbox WHERE published = false ORDER BY id LIMIT 100;
    2. Publish each event to Kafka/webhook
    3. UPDATE outbox SET published = true WHERE id IN ($ids);

    If the reader crashes after publishing but before marking as
    published → events are re-published on restart. Delivery is
    therefore at-least-once, so consumers must be idempotent
    (they should be anyway).
    """

    def __init__(self):
        self._outbox: list[dict] = []
        self._published: set[str] = set()

    def write_to_outbox(
        self, event_type: str, aggregate_id: str, payload: dict
    ) -> str:
        """
        Write an event to the outbox.

        This MUST be called within the same database transaction
        as the state change it represents. In SQLAlchemy:

            with session.begin():
                session.execute(update_payment_sql)
                outbox.write_to_outbox("payment.captured", payment_id, payload)
                # Both committed or both rolled back
        """
        event_id = str(uuid.uuid4())
        self._outbox.append({
            "event_id": event_id,
            "event_type": event_type,
            "aggregate_id": aggregate_id,
            "payload": payload,
            "created_at": datetime.utcnow().isoformat(),
            "published": False,
        })
        return event_id

    def poll_and_publish(
        self, publisher: 'EventPublisher', batch_size: int = 100
    ) -> int:
        """
        Poll the outbox for unpublished events and publish them.

        Called by a background worker on a schedule (e.g., every 100ms).
        Returns the number of events published.
        """
        # Get unpublished events, oldest first
        unpublished = [
            e for e in self._outbox
            if not e["published"] and e["event_id"] not in self._published
        ][:batch_size]

        published_count = 0
        for event in unpublished:
            try:
                publisher.publish(
                    topic=f"payments.{event['event_type']}",
                    key=event["aggregate_id"],
                    value=event["payload"],
                )
                event["published"] = True
                self._published.add(event["event_id"])
                published_count += 1
            except Exception:
                # Stop at the first failure so later events are not
                # published ahead of this one; retry on the next poll.
                break
        return published_count


class EventPublisher:
    """Abstract event publisher interface."""

    def publish(self, topic: str, key: str, value: dict):
        """Publish an event to the message broker."""
        pass
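The in-memory `OutboxPublisher` cannot show the property that matters most, which is that the state change and the event commit or roll back together. Here is a small sketch of that property using `sqlite3` from the standard library. The schema and the `make_db` and `capture_payment` helpers are assumptions for illustration, and the `fail_before_commit` flag exists only to simulate a crash inside the transaction.

```python
import json
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create in-memory payments and outbox tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE payments (id TEXT PRIMARY KEY, status TEXT NOT NULL);
        CREATE TABLE outbox (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            event_type TEXT NOT NULL,
            payload    TEXT NOT NULL,
            published  INTEGER NOT NULL DEFAULT 0
        );
        """
    )
    return conn


def capture_payment(
    conn: sqlite3.Connection, payment_id: str, fail_before_commit: bool = False
) -> None:
    """Update the payment and enqueue its event in one transaction."""
    with conn:  # COMMIT on normal exit, ROLLBACK if an exception escapes
        conn.execute(
            "UPDATE payments SET status = 'captured' WHERE id = ?",
            (payment_id,),
        )
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("payment.captured", json.dumps({"payment_id": payment_id})),
        )
        if fail_before_commit:
            # Simulated failure: neither the update nor the event survives.
            raise RuntimeError("simulated crash before commit")
```

If `capture_payment` raises, the payment keeps its previous status and the outbox stays empty, so there is no lost event and no phantom event. A background reader would then publish the rows with `published = 0` in `id` order.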
Distributed Locking for Payment Operations
Some payment operations (capture, refund) must be serialized per payment — two concurrent capture requests for the same authorization must not both succeed if the remaining amount only supports one:
class DistributedLock:
    """
    Distributed lock for serializing operations on a payment.

    Implementation options:
    1. Database advisory locks (PostgreSQL: pg_advisory_lock)
    2. Redis: SET key value NX EX 30 (atomic set-if-not-exists with TTL)
    3. ZooKeeper / etcd: ephemeral nodes with session tracking

    Critical requirement: the lock MUST have a TTL. Without a TTL,
    a crashed process holds the lock forever, blocking all operations
    on that payment.

    The TTL introduces a risk: if processing takes longer than the
    TTL, the lock expires and another process can acquire it. Use
    fencing tokens (monotonically increasing sequence numbers) to
    detect and reject stale operations.
    """

    def __init__(self):
        self._locks: dict[str, dict] = {}
        self._last_token = 0  # Source of fencing tokens

    def acquire(
        self, resource: str, holder: str, ttl_seconds: int = 30
    ) -> tuple[bool, int]:
        """
        Acquire a lock on a resource.
        Returns (acquired, fencing_token).

        The fencing token is a monotonically increasing number.
        Operations must include the fencing token, and the storage
        layer must reject writes with a token lower than the
        current maximum.
        """
        existing = self._locks.get(resource)
        if existing and time.time() < existing["expires_at"]:
            return (False, 0)  # Lock held by another process

        # No lock, or the previous lock expired: safe to acquire.
        # A counter (simplified, in-memory) keeps tokens strictly
        # increasing; a wall-clock timestamp can repeat or go backwards.
        self._last_token += 1
        fencing_token = self._last_token
        self._locks[resource] = {
            "holder": holder,
            "fencing_token": fencing_token,
            "expires_at": time.time() + ttl_seconds,
        }
        return (True, fencing_token)

    def release(self, resource: str, holder: str) -> bool:
        """Release a lock. Only the holder can release."""
        existing = self._locks.get(resource)
        if not existing or existing["holder"] != holder:
            return False
        del self._locks[resource]
        return True

    def extend(
        self, resource: str, holder: str, ttl_seconds: int = 30
    ) -> bool:
        """
        Extend the lock TTL.

        Used by long-running operations to prevent lock expiration.
        A background thread extends the lock every TTL/3 seconds.
        """
        existing = self._locks.get(resource)
        if not existing or existing["holder"] != holder:
            return False
        if time.time() >= existing["expires_at"]:
            # Already expired: the holder must re-acquire (and get a new
            # fencing token) rather than revive a lapsed lock.
            return False
        existing["expires_at"] = time.time() + ttl_seconds
        return True
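The lock only hands out fencing tokens; the protection comes from the storage layer checking them. A minimal sketch of that check follows, with `FencedPaymentStore` as a hypothetical stand-in for the real payment table. It tracks the highest token seen per payment and refuses any write carrying a lower one, as the `acquire` docstring requires.

```python
class FencedPaymentStore:
    """Hypothetical storage layer that enforces fencing tokens per payment."""

    def __init__(self) -> None:
        self._captured_cents: dict[str, int] = {}  # payment_id -> total captured
        self._max_token: dict[str, int] = {}       # payment_id -> highest token seen

    def capture(self, payment_id: str, amount_cents: int, fencing_token: int) -> bool:
        """Apply a capture unless its token is older than one already seen."""
        highest = self._max_token.get(payment_id, -1)
        if fencing_token < highest:
            # Stale writer: its lock expired and a newer holder has written.
            return False
        self._max_token[payment_id] = fencing_token
        self._captured_cents[payment_id] = (
            self._captured_cents.get(payment_id, 0) + amount_cents
        )
        return True

    def captured_total(self, payment_id: str) -> int:
        """Return the total captured so far, in cents."""
        return self._captured_cents.get(payment_id, 0)
```

Consider a process that acquired token 1, stalled past its TTL, and resumed after another process had written with token 2. Its write is rejected, so the capture is applied once. In a real database the comparison and the write would need to happen in one atomic statement, for example a conditional `UPDATE`.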
Summary: The Idempotency Contract
| Scenario | Client Behavior | Server Behavior | Outcome |
|---|---|---|---|
| First request | Send with idempotency key | Process, cache result | Success |
| Retry (previous succeeded) | Same key, same body | Return cached result | Same response, no duplicate |
| Retry (previous failed) | Same key, same body | Re-process | New attempt |
| Retry (previous in-progress) | Same key, same body | Return 409 | Client waits and retries |
| Same key, different body | Same key, different body | Return 422 | Client error |
| No idempotency key | Omit header | Process without caching | No dedup protection |
The idempotency key is a contract between client and server: “I promise this key uniquely identifies this payment intent. If you see it again, it’s a retry, not a new payment.” When both sides honor this contract and the server enforces it atomically, retries stop producing double charges, whether they are caused by network failures, crashes, or retry storms.