Exactly-Once Semantics and Kafka Transactions
At-least-once delivery means a fraud check might run twice for the same payment. For a read-only scoring operation, this is harmless: the same input produces the same score. But when the fraud check result triggers a downstream action (blocking a card, notifying compliance), duplicate processing has real consequences. Two card-block requests for the same transaction, or two compliance reports, create operational confusion.
Idempotent Producers
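// PRODUCTION - Idempotent producer configuration
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, PaymentEvent> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        // Serializers are required; JSON values are an assumption here.
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Idempotence ensures that retried sends (after network errors)
        // do not create duplicate messages. The broker assigns this
        // producer a producer ID, and the producer numbers its batches
        // per partition. A batch whose sequence number the broker has
        // already written for that producer ID and partition is discarded.
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        // Required for idempotence: all in-sync replicas must acknowledge.
        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        // Safe with idempotence: retries do not create duplicates.
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // At most 5 with idempotence (Kafka 1.0+). Ordering is preserved
        // because the broker rejects batches that arrive out of sequence.
        return new DefaultKafkaProducerFactory<>(config);
    }
}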
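Idempotent producers prevent duplicate messages caused by producer retries. The broker tracks each producer's sequence numbers per partition and discards duplicates. This handles the case where the producer sends a message, does not receive an acknowledgment (network timeout), and retries the send. Without idempotence, this retry creates a duplicate message in the topic. The guarantee covers a single producer session: a restarted producer receives a new producer ID, so a message it resends after a crash is not deduplicated.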
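The broker-side check can be modeled in a few lines of plain Java. This is a simplified sketch under stated assumptions: PartitionLog and IdempotenceDemo are hypothetical names, and the model tracks single records, whereas the real broker tracks sequence ranges of record batches per producer ID and partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one partition's duplicate check for idempotent producers.
class PartitionLog {
    private final List<String> records = new ArrayList<>();
    // Last sequence number written, per producer ID.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    // Returns true if the record was appended, false if dropped as a duplicate.
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // retry of something already written: drop it
        }
        if (sequence != last + 1) {
            // A gap means a lost or reordered send; reject instead of writing.
            throw new IllegalStateException("out-of-order sequence " + sequence);
        }
        records.add(value);
        lastSequence.put(producerId, sequence);
        return true;
    }

    List<String> records() {
        return List.copyOf(records);
    }
}

public class IdempotenceDemo {
    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        log.append(42L, 0, "payment-1");
        // The ack for sequence 0 was lost, so the producer retries it.
        log.append(42L, 0, "payment-1");
        log.append(42L, 1, "payment-2");
        System.out.println(log.records()); // [payment-1, payment-2]
    }
}
```

A retry that repeats an already-written sequence number is dropped, while a gap in the sequence is rejected; that rejection is also what keeps ordering intact with several requests in flight.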
Consume-Transform-Produce Transactions
The fraud check consumer reads from payment-events, calls the fraud service, and publishes the result to fraud-results. This is a consume-transform-produce pattern. Without transactions, a crash between publishing the result and committing the consumer offset causes the message to be reprocessed, and a second fraud result is published.
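The crash window can be reproduced with a small in-memory model. In this sketch, CrashWindowDemo is hypothetical: a List stands in for the fraud-results topic, an int stands in for the committed offset, and a "crash" is simply returning before the offset commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a non-transactional consume-transform-produce loop:
// the output send and the offset commit are two separate steps.
public class CrashWindowDemo {
    final List<String> fraudResults = new ArrayList<>(); // output topic
    int committedOffset = 0; // next input offset the group will read

    // Processes the record at committedOffset, optionally "crashing" after
    // the send but before the offset commit.
    void processNext(List<String> paymentEvents, boolean crashBeforeCommit) {
        String paymentId = paymentEvents.get(committedOffset);
        fraudResults.add("result-" + paymentId); // 1. publish result
        if (crashBeforeCommit) {
            return; // simulated crash: offset never committed
        }
        committedOffset++; // 2. commit offset
    }

    public static void main(String[] args) {
        CrashWindowDemo demo = new CrashWindowDemo();
        List<String> input = List.of("pay-1");
        demo.processNext(input, true);  // crash after publishing
        demo.processNext(input, false); // restart re-reads offset 0
        System.out.println(demo.fraudResults); // [result-pay-1, result-pay-1]
    }
}
```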
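// PRODUCTION - Transactional consume-transform-produce
@Configuration
public class KafkaTransactionalConfig {
    @Bean
    public ProducerFactory<String, FraudResult> resultProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        var pf = new DefaultKafkaProducerFactory<String, FraudResult>(config);
        // KafkaTransactionManager requires a transactional producer factory.
        pf.setTransactionIdPrefix("fraud-tx-");
        return pf;
    }
    @Bean
    public KafkaTransactionManager<String, FraudResult> kafkaTransactionManager(
            ProducerFactory<String, FraudResult> pf) {
        return new KafkaTransactionManager<>(pf);
    }
    @Bean // Must use the transaction manager's producer factory.
    public KafkaTemplate<String, FraudResult> resultTemplate(
            ProducerFactory<String, FraudResult> pf) {
        return new KafkaTemplate<>(pf);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PaymentEvent> kafkaListenerContainerFactory(
            ConsumerFactory<String, PaymentEvent> cf,
            KafkaTransactionManager<String, FraudResult> tm) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, PaymentEvent>();
        factory.setConsumerFactory(cf);
        // The container begins the transaction and adds consumed offsets to it.
        factory.getContainerProperties().setTransactionManager(tm);
        return factory;
    }
}
@Component
public class TransactionalFraudConsumer {
    private final KafkaTemplate<String, FraudResult> resultTemplate;
    private final FraudDetectionClient fraudClient;
    public TransactionalFraudConsumer(
            KafkaTemplate<String, FraudResult> resultTemplate,
            FraudDetectionClient fraudClient) {
        this.resultTemplate = resultTemplate;
        this.fraudClient = fraudClient;
    }
    @KafkaListener(topics = "payment-events", groupId = "fraud-transactional")
    @Transactional("kafkaTransactionManager")
    public void onPaymentEvent(ConsumerRecord<String, PaymentEvent> record) {
        PaymentEvent event = record.value();
        FraudScore score = fraudClient.score(event);
        FraudResult result = new FraudResult(event.paymentId(), score, Instant.now());
        // @Transactional joins the container's transaction, so this send and
        // the offset commit either both commit or neither does.
        resultTemplate.send("fraud-results", event.paymentId(), result);
    }
}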
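The transaction includes (1) the consumer offset commit for the input record and (2) the producer send for the output record. The offset is only part of it when the listener container is configured with the KafkaTransactionManager: the container begins the transaction before invoking the listener and, after the listener returns, adds the consumed offsets to the same transaction (through the Kafka producer's sendOffsetsToTransaction) and commits. A @Transactional annotation on the listener method by itself makes the send transactional, but the container then commits the offset separately, outside the transaction. If the consumer crashes after calling fraudClient.score() but before the transaction commits, neither the offset nor the result is committed. On restart, the consumer reprocesses the message and publishes the result again, and this time the transaction commits. The record from the first attempt may still be in the fraud-results log (aborted records are marked, not deleted), so downstream consumers must set isolation.level=read_committed; they then see exactly one result.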
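The effect of aborting and retrying can be modeled the same way. TransactionDemo is a hypothetical sketch (Java 16+), not Kafka's log format: each output record carries a transaction ID, a commit records the offset and the transaction together, and the two read methods mimic the two isolation.level settings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a transaction spanning an output record and the
// input offset. Aborted records stay in the log; read_committed hides them.
public class TransactionDemo {
    record Entry(String value, long txId) {}

    final List<Entry> outputLog = new ArrayList<>();
    final List<Long> committedTxIds = new ArrayList<>();
    int committedOffset = 0;
    private long nextTxId = 0;

    void processNext(List<String> input, boolean crashBeforeCommit) {
        long txId = nextTxId++; // begin transaction
        String paymentId = input.get(committedOffset);
        outputLog.add(new Entry("result-" + paymentId, txId)); // send
        if (crashBeforeCommit) {
            return; // crash: this transaction is never committed
        }
        // Commit: the offset and the output record become visible together.
        committedOffset++;
        committedTxIds.add(txId);
    }

    // Roughly what a read_uncommitted consumer sees: every record in the log.
    List<String> readUncommitted() {
        return outputLog.stream().map(Entry::value).toList();
    }

    // What a read_committed consumer sees: only committed transactions.
    List<String> readCommitted() {
        return outputLog.stream()
            .filter(e -> committedTxIds.contains(e.txId()))
            .map(Entry::value)
            .toList();
    }

    public static void main(String[] args) {
        TransactionDemo demo = new TransactionDemo();
        List<String> input = List.of("pay-1");
        demo.processNext(input, true);
        demo.processNext(input, false);
        System.out.println(demo.readUncommitted()); // [result-pay-1, result-pay-1]
        System.out.println(demo.readCommitted());   // [result-pay-1]
    }
}
```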
Consumer-Side Deduplication
Kafka transactions provide exactly-once between Kafka topics. But the fraud detection call is a side effect outside Kafka’s transactional boundary. If the consumer crashes after calling fraud detection but before the transaction commits, the fraud detection call has already been made. On reprocessing, fraud detection is called again.
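A variant of the same model makes the transactional boundary visible. In this hypothetical sketch, SideEffectDemo counts calls to an outside service (standing in for fraudClient.score() or a card-block request) separately from the output that the transaction commits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a transactional consumer whose external call sits
// outside the Kafka transaction.
public class SideEffectDemo {
    int externalCalls = 0; // calls the outside service received
    final List<String> committedResults = new ArrayList<>();
    int committedOffset = 0;

    void processNext(List<String> input, boolean crashBeforeCommit) {
        String paymentId = input.get(committedOffset);
        externalCalls++; // side effect: nothing can roll this back
        String pending = "result-" + paymentId; // held in the open transaction
        if (crashBeforeCommit) {
            return; // transaction aborts, but the call already happened
        }
        committedResults.add(pending); // output and offset commit together
        committedOffset++;
    }

    public static void main(String[] args) {
        SideEffectDemo demo = new SideEffectDemo();
        List<String> input = List.of("pay-1");
        demo.processNext(input, true);
        demo.processNext(input, false);
        System.out.println(demo.committedResults + " after "
            + demo.externalCalls + " external calls");
        // prints: [result-pay-1] after 2 external calls
    }
}
```

The committed output contains one result, while the outside service was called twice.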
For idempotent operations (fraud scoring), this is acceptable. For non-idempotent operations (card blocking), consumer-side deduplication is needed:
// PRODUCTION - Deduplication with processed event tracking
@Component
public class DeduplicatingFraudConsumer {
    private final ProcessedEventStore processedEvents;
    private final FraudDetectionClient fraudClient;
    private final KafkaTemplate<String, FraudResult> resultTemplate;
    public DeduplicatingFraudConsumer(ProcessedEventStore processedEvents,
            FraudDetectionClient fraudClient,
            KafkaTemplate<String, FraudResult> resultTemplate) {
        this.processedEvents = processedEvents;
        this.fraudClient = fraudClient;
        this.resultTemplate = resultTemplate;
    }
    @KafkaListener(topics = "payment-events",
            groupId = "fraud-dedup")
    @Transactional("kafkaTransactionManager")
    public void onPaymentEvent(
            ConsumerRecord<String, PaymentEvent> record) {
        String eventId = record.key(); // paymentId
        if (processedEvents.alreadyProcessed(eventId)) {
            // Skip: already processed in a previous attempt
            return;
        }
        PaymentEvent event = record.value();
        FraudScore score = fraudClient.score(event);
        FraudResult result = new FraudResult(
            event.paymentId(), score, Instant.now());
        resultTemplate.send("fraud-results",
            event.paymentId(), result);
        // This write is NOT part of the Kafka transaction. A Redis mark
        // survives an aborted transaction, so the redelivered event is
        // skipped and its result is never published. A database mark rolls
        // back with a failed listener only if it runs in a database
        // transaction synchronized with the Kafka one, and even then the
        // two commits are not atomic.
        processedEvents.markProcessed(eventId);
    }
}
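The ProcessedEventStore can be a Redis key per event with a TTL, a database table, or a Bloom filter (for approximate deduplication with lower storage cost). A Bloom filter never misses an event it has recorded, but it can return false positives, so a small fraction of new events would be skipped as duplicates. The TTL should exceed the maximum retry delay so that deduplication covers the entire retry window.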
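A minimal in-process version of the TTL approach might look like this. It assumes ProcessedEventStore has exactly the two methods the consumer calls, alreadyProcessed and markProcessed; TtlProcessedEventStore and TtlStoreDemo are hypothetical, the store is not thread-safe, and a shared store such as Redis would still be needed when several consumer instances run. The clock is injected so expiry can be exercised without waiting.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Assumed shape of the store used by DeduplicatingFraudConsumer.
interface ProcessedEventStore {
    boolean alreadyProcessed(String eventId);
    void markProcessed(String eventId);
}

// Hypothetical single-process store with per-entry expiry, standing in for
// Redis keys with a TTL.
class TtlProcessedEventStore implements ProcessedEventStore {
    private final Map<String, Long> expiresAtMillis = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // current time in milliseconds

    TtlProcessedEventStore(Duration ttl, LongSupplier clock) {
        this.ttlMillis = ttl.toMillis();
        this.clock = clock;
    }

    @Override
    public boolean alreadyProcessed(String eventId) {
        Long expiresAt = expiresAtMillis.get(eventId);
        if (expiresAt == null) {
            return false;
        }
        if (clock.getAsLong() >= expiresAt) {
            expiresAtMillis.remove(eventId); // lazily evict expired entries
            return false;
        }
        return true;
    }

    @Override
    public void markProcessed(String eventId) {
        expiresAtMillis.put(eventId, clock.getAsLong() + ttlMillis);
    }
}

public class TtlStoreDemo {
    public static void main(String[] args) {
        long[] now = {0};
        ProcessedEventStore store =
            new TtlProcessedEventStore(Duration.ofMinutes(30), () -> now[0]);
        store.markProcessed("pay-1");
        now[0] = Duration.ofMinutes(10).toMillis();
        System.out.println(store.alreadyProcessed("pay-1")); // true: inside TTL
        now[0] = Duration.ofMinutes(31).toMillis();
        System.out.println(store.alreadyProcessed("pay-1")); // false: expired
    }
}
```

The 30-minute TTL in the demo is arbitrary; in practice it must exceed the longest possible gap between the first delivery of an event and its last redelivery.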
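The tradeoff: deduplication adds a lookup per message (a Redis round-trip or database query). For the fraud check pipeline processing 200 events per second, a 1 ms Redis lookup adds 200 ms of waiting per second, roughly 20% of one consumer thread's time, which is affordable. For a high-throughput pipeline processing 100,000 events per second, the same lookups add 100 seconds of waiting per second, so at least 100 lookups would have to be in flight at all times. At that rate the deduplication store becomes a bottleneck, and a Bloom filter or local cache is more appropriate.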
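A Bloom filter trades exactness for fixed memory and no network round-trip. The class below is a hypothetical sketch, not a library API: it hashes each key with 64-bit FNV-1a, splits the hash into two 32-bit halves, and uses double hashing to derive numHashes bit positions. The sizes in main are illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Hypothetical approximate dedup filter: no false negatives, tunable false
// positives, fixed memory, no per-entry expiry.
public class BloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    public BloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // 64-bit FNV-1a over the UTF-8 bytes of the key.
    private static long fnv1a64(String key) {
        long hash = 0xcbf29ce484222325L;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= 0x100000001b3L;
        }
        return hash;
    }

    // Double hashing: position_i = h1 + i * h2 (mod numBits).
    private int index(long hash, int i) {
        int h1 = (int) hash;
        int h2 = (int) (hash >>> 32);
        return Math.floorMod(h1 + i * h2, numBits);
    }

    public void add(String key) {
        long hash = fnv1a64(key);
        for (int i = 0; i < numHashes; i++) {
            bits.set(index(hash, i));
        }
    }

    // false means "definitely never added"; true means "probably added".
    public boolean mightContain(String key) {
        long hash = fnv1a64(key);
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(hash, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BloomFilter seen = new BloomFilter(1 << 20, 7); // 128 KiB of bits
        for (int i = 0; i < 100_000; i++) {
            seen.add("pay-" + i);
        }
        int falsePositives = 0;
        for (int i = 100_000; i < 200_000; i++) {
            if (seen.mightContain("pay-" + i)) {
                falsePositives++;
            }
        }
        System.out.println("false positives: " + falsePositives + " / 100000");
    }
}
```

mightContain never returns false for a key that was added, so a recorded event is always recognized; a true for a new key is a false positive, and that event would be skipped. A plain Bloom filter also cannot expire entries, so covering a bounded retry window typically means rotating between filters.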