digital payment systems cryptography banking protocols and blockchain internals

Idempotency and Exactly-Once Payment Processing

Chapter 20 of 21

In payment systems, retries are inevitable. Networks fail, load balancers time out, clients crash and restart. Without idempotency, a retry can charge a customer twice. With incorrect idempotency, a retry can return stale results while a new charge processes in the background.

Closing the gap between “works most of the time” and “never double-charges” is the hardest engineering problem in payment gateway design.

Idempotency Key Design

An idempotency key is a client-provided identifier that the server uses to deduplicate requests. If the server sees the same key twice, it returns the cached result from the first request instead of processing again:

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Optional
import hashlib
import json
import uuid
import time

@dataclass
class IdempotencyRecord:
    """
    Stored for each idempotency key.
    
    Lifecycle:
    1. Key received → record created with status "processing"
    2. Request processed → record updated with response
    3. Same key received again → cached response returned
    4. Key expires after TTL (typically 24-48 hours)
    
    The record MUST be created BEFORE processing begins.
    Otherwise, a concurrent retry could start processing
    before the first request completes → double charge.
    """
    idempotency_key: str
    merchant_id: str
    request_fingerprint: str  # Hash of the request body
    status: str              # "processing", "completed", "failed"
    
    # Cached response (populated when processing completes)
    response_code: int = 0
    response_body: str = ""
    
    # Timestamps
    created_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: Optional[datetime] = None
    expires_at: datetime = field(
        default_factory=lambda: datetime.utcnow() + timedelta(hours=48)
    )
    
    # Lock management
    locked_by: str = ""       # Instance ID holding the processing lock
    lock_expires_at: Optional[datetime] = None

class IdempotencyStore:
    """
    Idempotency key storage with atomic check-and-create.
    
    The critical invariant: for a given idempotency key, at most
    ONE request is processing at any time. This requires atomic
    operations — the check ("does this key exist?") and create
    ("insert a new record") must be a single atomic operation.
    
    Implementation options:
    1. Database with unique constraint + INSERT ... ON CONFLICT
    2. Redis with SET NX (set if not exists) + TTL
    3. DynamoDB with conditional put (attribute_not_exists)
    
    All three provide the atomicity guarantee needed.
    """
    
    def __init__(self):
        self._records: dict[str, IdempotencyRecord] = {}
        self._instance_id = str(uuid.uuid4())[:8]
    
    def check_and_create(
        self, key: str, merchant_id: str, request_body: dict
    ) -> tuple[str, Optional[IdempotencyRecord]]:
        """
        Atomically check if a key exists and create it if not.
        
        Returns:
        - ("new", None): key didn't exist, record created, proceed with processing
        - ("duplicate", record): key exists, return cached response
        - ("conflict", None): key exists but request body differs → error
        - ("processing", record): key exists but still processing → wait or error
        
        In production, this is a single atomic database operation:
        
        INSERT INTO idempotency_keys (key, merchant_id, fingerprint, status, ...)
        VALUES ($1, $2, $3, 'processing', ...)
        ON CONFLICT (key, merchant_id) DO NOTHING
        RETURNING *;
        
        If the INSERT succeeds → "new"
        If it conflicts → SELECT the existing record and check status
        """
        fingerprint = self._compute_fingerprint(request_body)
        compound_key = f"{merchant_id}:{key}"
        
        existing = self._records.get(compound_key)
        
        if existing is None:
            # New key — create record and proceed
            record = IdempotencyRecord(
                idempotency_key=key,
                merchant_id=merchant_id,
                request_fingerprint=fingerprint,
                status="processing",
                locked_by=self._instance_id,
                lock_expires_at=datetime.utcnow() + timedelta(seconds=30),
            )
            self._records[compound_key] = record
            return ("new", None)
        
        # Key exists — check what to do
        if existing.request_fingerprint != fingerprint:
            # Same key but different request body → client error
            return ("conflict", None)
        
        if existing.status == "completed":
            # Previous request completed → return cached response
            return ("duplicate", existing)
        
        if existing.status == "processing":
            # Check if the processing lock has expired
            if (existing.lock_expires_at and 
                datetime.utcnow() > existing.lock_expires_at):
                # Lock expired — previous processor likely crashed
                # Take over processing
                existing.locked_by = self._instance_id
                existing.lock_expires_at = (
                    datetime.utcnow() + timedelta(seconds=30)
                )
                return ("new", None)  # Re-process
            
            # Still processing — tell client to wait
            return ("processing", existing)
        
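        # Status is "failed": a previous attempt gave up, so this retry may
        # re-process. Re-acquire the lock and reset the status first;
        # otherwise complete() would silently ignore the result, because
        # fail() cleared locked_by.
        existing.status = "processing"
        existing.locked_by = self._instance_id
        existing.lock_expires_at = datetime.utcnow() + timedelta(seconds=30)
        return ("new", None)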
    
    def complete(
        self, key: str, merchant_id: str,
        response_code: int, response_body: str
    ):
        """Mark the idempotency record as completed with the response."""
        compound_key = f"{merchant_id}:{key}"
        record = self._records.get(compound_key)
        
        if record and record.locked_by == self._instance_id:
            record.status = "completed"
            record.response_code = response_code
            record.response_body = response_body
            record.completed_at = datetime.utcnow()
            record.locked_by = ""
    
    def fail(self, key: str, merchant_id: str):
        """
        Mark the idempotency record as failed.
        
        A failed record allows the client to retry with the same
        key — the next attempt will re-process instead of returning
        the cached failure.
        """
        compound_key = f"{merchant_id}:{key}"
        record = self._records.get(compound_key)
        
        if record and record.locked_by == self._instance_id:
            record.status = "failed"
            record.locked_by = ""
    
    @staticmethod
    def _compute_fingerprint(request_body: dict) -> str:
        """
        Compute a deterministic hash of the request body.
        
        The fingerprint detects when a client reuses an idempotency
        key with different request parameters — which is a client
        error, not a retry.
        
        Uses canonical JSON serialization (sorted keys, no whitespace)
        for deterministic hashing.
        """
        canonical = json.dumps(request_body, sort_keys=True, separators=(',', ':'))
        return hashlib.sha256(canonical.encode()).hexdigest()
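
The in-memory store above checks and then inserts in two separate steps, which is only safe within a single thread. As a minimal sketch of the atomic variant the docstring describes, the following uses Python's built-in sqlite3 module with a composite primary key and INSERT OR IGNORE. The schema and the open_store and claim_key helpers are assumptions made for illustration, not part of the chapter's code. Lock expiry and the "failed" state are left out to keep the example short.

```python
import sqlite3


def open_store() -> sqlite3.Connection:
    """Create an in-memory table (hypothetical schema for illustration)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE idempotency_keys (
            merchant_id TEXT NOT NULL,
            key         TEXT NOT NULL,
            fingerprint TEXT NOT NULL,
            status      TEXT NOT NULL,
            PRIMARY KEY (merchant_id, key)
        )
        """
    )
    return conn


def claim_key(
    conn: sqlite3.Connection, merchant_id: str, key: str, fingerprint: str
) -> str:
    """Claim a key with one INSERT; the primary key makes the claim atomic."""
    with conn:  # commits on success, rolls back if an exception is raised
        cur = conn.execute(
            "INSERT OR IGNORE INTO idempotency_keys "
            "(merchant_id, key, fingerprint, status) "
            "VALUES (?, ?, ?, 'processing')",
            (merchant_id, key, fingerprint),
        )
        if cur.rowcount == 1:
            return "new"  # this caller won the race and owns the key
        # The insert was ignored, so a row already exists: inspect it.
        stored_fingerprint, status = conn.execute(
            "SELECT fingerprint, status FROM idempotency_keys "
            "WHERE merchant_id = ? AND key = ?",
            (merchant_id, key),
        ).fetchone()
    if stored_fingerprint != fingerprint:
        return "conflict"
    return "duplicate" if status == "completed" else "processing"
```

Because the uniqueness check happens inside the database, two workers that race on the same key cannot both see "new"; the loser's insert is ignored and it falls through to the status check.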

Request Processing with Idempotency

class IdempotentPaymentHandler:
    """
    Wraps payment processing with idempotency guarantees.
    """
    
    def __init__(
        self, idempotency_store: IdempotencyStore,
        payment_engine: 'PaymentProcessingEngine',
    ):
        self._idempotency = idempotency_store
        self._engine = payment_engine
    
    def handle_payment_request(
        self, idempotency_key: str, merchant_id: str,
        request: dict
    ) -> tuple[int, dict]:
        """
        Process a payment request with idempotency.
        
        Returns (status_code, response_body).
        """
        # Step 1: Check idempotency
        result, record = self._idempotency.check_and_create(
            idempotency_key, merchant_id, request
        )
        
        if result == "duplicate":
            # Return cached response
            return (
                record.response_code,
                json.loads(record.response_body)
            )
        
        if result == "conflict":
            return (422, {
                "error": "idempotency_key_conflict",
                "message": (
                    "This idempotency key was already used with "
                    "different request parameters"
                ),
            })
        
        if result == "processing":
            return (409, {
                "error": "request_in_progress",
                "message": (
                    "A request with this idempotency key is currently "
                    "being processed. Retry after 5 seconds."
                ),
                "retry_after": 5,
            })
        
        # Step 2: Process the payment
        try:
            payment = self._create_and_process_payment(
                merchant_id, request
            )
            
            response = {
                "payment_id": payment.payment_id,
                "status": payment.status.value,
                "amount": str(payment.amount),
                "currency": payment.currency,
            }
            status_code = 200
            
            # Step 3: Cache the response
            self._idempotency.complete(
                idempotency_key, merchant_id,
                status_code, json.dumps(response)
            )
            
            return (status_code, response)
            
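        except Exception:
            # Mark idempotency record as failed (allows retry). This is only
            # safe if the failure happened before any money moved. If the
            # outcome is ambiguous (for example, a processor timeout), the
            # payment must be reconciled before the key is released.
            self._idempotency.fail(idempotency_key, merchant_id)
            raise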
    
    def _create_and_process_payment(
        self, merchant_id: str, request: dict
    ) -> 'Payment':
        """Create a payment and process it."""
        payment = Payment(
            payment_id=str(uuid.uuid4()),
            merchant_id=merchant_id,
            idempotency_key=request.get("idempotency_key", ""),
            amount=Decimal(request["amount"]),
            currency=request["currency"],
            payment_method_type=request.get("payment_method", {}).get("type", ""),
            payment_method_token=request.get("payment_method", {}).get("token", ""),
        )
        
        return self._engine.authorize(
            payment,
            card_brand=request.get("card_brand", "visa"),
            card_country=request.get("card_country", "US"),
        )
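
On the client side, the contract only holds if the key is generated once per payment intent and reused on every retry. The sketch below illustrates that under stated assumptions: send is a hypothetical callable standing in for the HTTP call and returning the same (status_code, body) shape as the handler, and FlakyServer is a test double invented here to simulate a response lost in transit.

```python
import time
import uuid
from typing import Callable, Optional


def pay_with_retries(
    send: Callable[[str, dict], tuple[int, dict]],
    request: dict,
    max_attempts: int = 4,
    base_delay: float = 0.01,
) -> tuple[int, dict]:
    """Send one payment, retrying with the SAME idempotency key."""
    key = str(uuid.uuid4())  # generated once, outside the retry loop
    for attempt in range(max_attempts):
        status: Optional[int]
        try:
            status, body = send(key, request)
        except ConnectionError:
            status, body = None, {}  # outcome unknown: must retry
        # Retry only on an unknown outcome or a 409 "request in progress".
        if status is not None and status != 409:
            return status, body
        time.sleep(base_delay * 2 ** attempt)  # exponential backoff
    raise TimeoutError("payment outcome still unknown after retries")


class FlakyServer:
    """Test double: charges once per key but drops the first response."""

    def __init__(self) -> None:
        self.charges: dict[str, dict] = {}
        self.calls = 0

    def __call__(self, key: str, request: dict) -> tuple[int, dict]:
        self.calls += 1
        if key not in self.charges:
            self.charges[key] = {
                "payment_id": f"pay_{len(self.charges) + 1}",
                "amount": request["amount"],
            }
        if self.calls == 1:
            raise ConnectionError("response lost in transit")
        return 200, self.charges[key]


server = FlakyServer()
status, body = pay_with_retries(server, {"amount": "10.00", "currency": "USD"})
print(status, body["payment_id"], len(server.charges))
```

Generating the key inside the loop would defeat the whole mechanism: every retry would look like a new payment to the server.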

The Outbox Pattern

The hardest consistency problem in payment systems: you need to update the payment state in your database AND publish an event to a message broker, atomically. If either fails independently, the system is inconsistent.

class OutboxPublisher:
    """
    The Outbox Pattern: atomic state updates + event publishing.
    
    Problem:
    1. Save payment to database
    2. Publish event to Kafka/webhook
    
    If step 1 succeeds but step 2 fails → lost event
    If step 2 succeeds but step 1 fails → phantom event
    
    Solution: write the event to an "outbox" table in the SAME
    database transaction as the payment update. A separate
    publisher process reads the outbox and publishes events.
    
    Transaction:
      BEGIN;
        UPDATE payments SET status = 'captured' WHERE id = $1;
        INSERT INTO outbox (event_type, payload) VALUES ('payment.captured', $2);
      COMMIT;
    
    The outbox reader:
      1. SELECT * FROM outbox WHERE published = false ORDER BY id LIMIT 100;
      2. Publish each event to Kafka/webhook
      3. UPDATE outbox SET published = true WHERE id IN ($ids);
    
    If the reader crashes after publishing but before marking as
    published → events are re-published on restart. Consumers
    must be idempotent (they should be anyway).
    """
    
    def __init__(self):
        self._outbox: list[dict] = []
        self._published: set[str] = set()
    
    def write_to_outbox(
        self, event_type: str, aggregate_id: str, payload: dict
    ) -> str:
        """
        Write an event to the outbox.
        
        This MUST be called within the same database transaction
        as the state change it represents. In SQLAlchemy:
        
        with session.begin():
            session.execute(update_payment_sql)
            outbox.write_to_outbox("payment.captured", payment_id, payload)
            # Both committed or both rolled back
        """
        event_id = str(uuid.uuid4())
        
        self._outbox.append({
            "event_id": event_id,
            "event_type": event_type,
            "aggregate_id": aggregate_id,
            "payload": payload,
            "created_at": datetime.utcnow().isoformat(),
            "published": False,
        })
        
        return event_id
    
    def poll_and_publish(
        self, publisher: 'EventPublisher', batch_size: int = 100
    ) -> int:
        """
        Poll the outbox for unpublished events and publish them.
        
        Called by a background worker on a schedule (e.g., every 100ms).
        
        Returns the number of events published.
        """
        # Get unpublished events
        unpublished = [
            e for e in self._outbox
            if not e["published"] and e["event_id"] not in self._published
        ][:batch_size]
        
        published_count = 0
        for event in unpublished:
            try:
                publisher.publish(
                    topic=f"payments.{event['event_type']}",
                    key=event["aggregate_id"],
                    value=event["payload"],
                )
                event["published"] = True
                self._published.add(event["event_id"])
                published_count += 1
            except Exception:
                # Will retry on next poll
                break
        
        return published_count

class EventPublisher:
    """Abstract event publisher interface."""
    
    def publish(self, topic: str, key: str, value: dict):
        """Publish an event to the message broker."""
        pass
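
The in-memory class above cannot show the property that matters most, which is that the state change and the outbox row commit or roll back together. A minimal sketch of that behaviour, assuming a hypothetical two-table schema in SQLite (via Python's built-in sqlite3 module) and a fail_before_commit flag that exists only to simulate a crash:

```python
import json
import sqlite3
import uuid


def open_db() -> sqlite3.Connection:
    """Create in-memory payments and outbox tables (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE payments (id TEXT PRIMARY KEY, status TEXT NOT NULL);
        CREATE TABLE outbox (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            event_id   TEXT NOT NULL UNIQUE,
            event_type TEXT NOT NULL,
            payload    TEXT NOT NULL,
            published  INTEGER NOT NULL DEFAULT 0
        );
        """
    )
    return conn


def capture_payment(
    conn: sqlite3.Connection, payment_id: str, fail_before_commit: bool = False
) -> None:
    """Update the payment and enqueue its event in ONE transaction."""
    with conn:  # commits on success, rolls back if anything raises
        conn.execute(
            "UPDATE payments SET status = 'captured' WHERE id = ?",
            (payment_id,),
        )
        conn.execute(
            "INSERT INTO outbox (event_id, event_type, payload) VALUES (?, ?, ?)",
            (
                str(uuid.uuid4()),
                "payment.captured",
                json.dumps({"payment_id": payment_id}),
            ),
        )
        if fail_before_commit:
            raise RuntimeError("simulated crash before COMMIT")


def unpublished(conn: sqlite3.Connection) -> list[tuple]:
    """Return pending events in insertion order, as the outbox reader would."""
    return conn.execute(
        "SELECT event_type, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
```

If capture_payment raises before the commit, neither the status change nor the event survives, so the outbox reader never sees an event for a payment that was not captured.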

Distributed Locking for Payment Operations

Some payment operations (capture, refund) must be serialized per payment — two concurrent capture requests for the same authorization must not both succeed if the remaining amount only supports one:

class DistributedLock:
    """
    Distributed lock for serializing operations on a payment.
    
    Implementation options:
    1. Database advisory locks (PostgreSQL: pg_advisory_lock)
    2. Redis: SET key value NX EX 30 (atomic set-if-not-exists with TTL)
    3. ZooKeeper / etcd: ephemeral nodes with session tracking
    
    Critical requirement: the lock MUST have a TTL. Without a TTL,
    a crashed process holds the lock forever, blocking all operations
    on that payment.
    
    The TTL introduces a risk: if processing takes longer than the
    TTL, the lock expires and another process can acquire it. Use
    fencing tokens (monotonically increasing sequence numbers) to
    detect and reject stale operations.
    """
    
    def __init__(self):
        self._locks: dict[str, dict] = {}
        # Last fencing token issued. It lives outside the per-resource lock
        # records so it keeps increasing across release() and lock expiry.
        self._last_token = 0
    
    def acquire(
        self, resource: str, holder: str, ttl_seconds: int = 30
    ) -> tuple[bool, int]:
        """
        Acquire a lock on a resource.
        
        Returns (acquired, fencing_token).
        
        The fencing token is a monotonically increasing number.
        Operations must include the fencing token, and the storage
        layer must reject writes with a token lower than the
        current maximum.
        """
        existing = self._locks.get(resource)
        
        if existing:
            # Check if lock has expired
            if time.time() < existing["expires_at"]:
                return (False, 0)  # Lock held by another process
            # Lock expired, so it is safe to acquire
        
        # Simplified: an in-process counter. A wall-clock timestamp is not a
        # safe token, because clocks can step backwards or repeat a value.
        # In production the lock service issues the token (for example,
        # from a database sequence).
        self._last_token += 1
        fencing_token = self._last_token
        
        self._locks[resource] = {
            "holder": holder,
            "fencing_token": fencing_token,
            "expires_at": time.time() + ttl_seconds,
        }
        
        return (True, fencing_token)
    
    def release(self, resource: str, holder: str) -> bool:
        """Release a lock. Only the holder can release."""
        existing = self._locks.get(resource)
        
        if not existing or existing["holder"] != holder:
            return False
        
        del self._locks[resource]
        return True
    
    def extend(
        self, resource: str, holder: str, ttl_seconds: int = 30
    ) -> bool:
        """
        Extend the lock TTL.
        
        Used by long-running operations to prevent lock expiration.
        A background thread extends the lock every TTL/3 seconds.
        """
        existing = self._locks.get(resource)
        
        if not existing or existing["holder"] != holder:
            return False
        
        existing["expires_at"] = time.time() + ttl_seconds
        return True
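
A fencing token only helps if the storage layer checks it. The lock class above hands out tokens but nothing enforces them, so here is a minimal sketch of the enforcing side. FencedPaymentStore and StaleTokenError are hypothetical names introduced for this example, and amounts are tracked in integer cents to keep the arithmetic exact.

```python
class StaleTokenError(Exception):
    """Raised when a write carries an older fencing token than one already seen."""


class FencedPaymentStore:
    """Hypothetical storage layer that enforces fencing tokens per payment."""

    def __init__(self) -> None:
        self._captured: dict[str, int] = {}       # payment_id -> captured cents
        self._highest_token: dict[str, int] = {}  # payment_id -> highest token seen

    def capture(self, payment_id: str, amount_cents: int, fencing_token: int) -> int:
        """Apply a capture unless a newer lock holder has already written."""
        highest = self._highest_token.get(payment_id, 0)
        if fencing_token < highest:
            # The caller's lock expired and someone else acquired it since.
            raise StaleTokenError(
                f"token {fencing_token} is older than {highest} for {payment_id}"
            )
        self._highest_token[payment_id] = fencing_token
        self._captured[payment_id] = self._captured.get(payment_id, 0) + amount_cents
        return self._captured[payment_id]


store = FencedPaymentStore()
store.capture("pay_1", 500, fencing_token=1)  # holder A writes, then stalls
store.capture("pay_1", 200, fencing_token=2)  # A's lock expired; holder B writes
try:
    store.capture("pay_1", 500, fencing_token=1)  # A wakes up with a stale token
except StaleTokenError as exc:
    print("rejected:", exc)
```

In a real database the same check would be a conditional update (compare the stored token and write in one statement), so that the comparison and the write cannot be interleaved with another writer.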

Summary: The Idempotency Contract

| Scenario | Client Behavior | Server Behavior | Outcome |
| --- | --- | --- | --- |
| First request | Send with idempotency key | Process, cache result | Success |
| Retry (previous succeeded) | Same key, same body | Return cached result | Same response, no duplicate |
| Retry (previous failed) | Same key, same body | Re-process | New attempt |
| Retry (previous in-progress) | Same key, same body | Return 409 | Client waits and retries |
| Same key, different body | Same key, different body | Return 422 | Client error |
| No idempotency key | Omit header | Process without caching | No dedup protection |

The idempotency key is a contract between client and server: “I promise this key uniquely identifies this payment intent. If you see it again, it’s a retry, not a new payment.” When both sides honor this contract, and retries arrive before the key's TTL expires, a retry cannot produce a second charge, regardless of network conditions, crashes, or retry storms.