Skip to main content
surviving the spike

Decoupling with Kafka: What to Make Async and What to Keep Synchronous

9 min read Chapter 26 of 66

Decoupling with Kafka: What to Make Async and What to Keep Synchronous

The Symptom

After the initial Kafka migration from CH9, the team got excited. Every new feature started with “let’s put it on Kafka.” Ride acceptance went async. Fare quotes went async. A rider requests a fare quote, the request publishes to a topic, a consumer calculates the fare, publishes the result to another topic, and a WebSocket pushes it to the rider. Round-trip: 1,200ms. The synchronous version took 40ms.

The fare quote endpoint hit 2% timeout rate during evening surge. Riders saw a loading spinner, tapped again, generated duplicate requests. The team had over-corrected. Everything was async. The pendulum swung too far.

The Cause

The decision rule was missing. The team treated Kafka as a universal hammer. Async is not free. Every async boundary adds latency (producer serialization, broker persistence, consumer deserialization, consumer processing). For operations where the caller waits for the result, that added latency is pure waste.

The rule: if the rider is staring at a spinner waiting for this response, keep it synchronous. If the response is a side effect that no user waits for, make it async.

OperationUser Waiting?Sync or Async
Fare quoteYes, rider sees priceSync
Ride acceptanceYes, driver confirmsSync
Trip completion (fare)Yes, rider sees totalSync
Trip analytics aggregationNoAsync
Receipt emailNoAsync
Surge recalculationNo (affects next rider)Async
Driver rating updateNo (background)Async
Fraud detection scoringNo (post-trip)Async
Payment captureDepends (see below)Sync write, async retry

Payment capture is the edge case. The initial charge is synchronous because the rider needs confirmation. But payment retries (declined card, network timeout) go to Kafka for retry with exponential backoff.

The Baseline

The fare quote endpoint, after the team made it async via Kafka:

# load-tests/fare_quote_locustfile.py
from locust import HttpUser, task, between, LoadTestShape
import random

class FareQuoteUser(HttpUser):
    wait_time = between(0.1, 0.5)

    @task
    def request_fare_quote(self):
        self.client.post(
            "/api/fare/quote",
            json={
                "pickupLat": 40.7128 + random.uniform(-0.05, 0.05),
                "pickupLng": -74.0060 + random.uniform(-0.05, 0.05),
                "dropoffLat": 40.7580 + random.uniform(-0.05, 0.05),
                "dropoffLng": -73.9855 + random.uniform(-0.05, 0.05),
                "rideType": random.choice(["standard", "premium", "pool"])
            },
            name="/api/fare/quote"
        )


class FareQuoteShape(LoadTestShape):
    stages = [
        {"duration": 60,  "users": 200,  "spawn_rate": 20},
        {"duration": 120, "users": 500,  "spawn_rate": 30},
        {"duration": 180, "users": 1000, "spawn_rate": 50},
        {"duration": 240, "users": 2000, "spawn_rate": 50},
        {"duration": 300, "users": 3000, "spawn_rate": 50},
    ]

    def tick(self):
        run_time = self.get_run_time()
        for stage in self.stages:
            if run_time < stage["duration"]:
                return (stage["users"], stage["spawn_rate"])
        return None

Fare quote via Kafka round-trip (publish request, consume, compute, publish result, push via WebSocket):

UsersRPSp50 (ms)p99 (ms)Timeout Rate
2003804208900.1%
5008204801,0500.8%
10001,4005601,2002.1%
20002,1007801,8005.4%
30002,3001,1003,20011.2%

A fare quote should take 40ms. This takes 480ms at p50. The Kafka round-trip adds 440ms of overhead for zero benefit.

The Fix

Step 1: Revert Fare Quote to Synchronous

// SCALED: Fare quote is synchronous - the rider is waiting
@PostMapping("/api/fare/quote")
public ResponseEntity<FareQuote> getFareQuote(@RequestBody FareQuoteRequest request) {
    FareQuote quote = fareService.calculateFare(
        request.getPickup(),
        request.getDropoff(),
        request.getRideType()
    );
    return ResponseEntity.ok(quote);  // 40ms, no Kafka involved
}

Step 2: Build the RideCompletedEvent Producer

Trip completion is the core async use case. The fare is synchronous. Everything after is a Kafka event.

// SCALED: Event carries all data consumers need - no callbacks to the producer
public record RideCompletedEvent(
    String tripId,
    String riderId,
    String driverId,
    String pickupZoneId,
    String dropoffZoneId,
    BigDecimal fare,
    BigDecimal distance,
    Duration tripDuration,
    Instant completedAt,
    String rideType
) {}

The event is self-contained. Consumers do not call back to the trip service for additional data. Every field a consumer needs is in the event. This prevents the “event-driven spaghetti” pattern where consumers make synchronous calls to producers, creating circular dependencies.

// SCALED: Idempotent producer with exactly-once delivery guarantee
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = Map.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class,
            ProducerConfig.ACKS_CONFIG, "all",
            ProducerConfig.RETRIES_CONFIG, 3,
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
            ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5,
            ProducerConfig.LINGER_MS_CONFIG, 5,
            ProducerConfig.BATCH_SIZE_CONFIG, 16384,
            ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"
        );
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

enable.idempotence=true assigns a producer ID and sequence number to each message. If the broker receives a duplicate (from a retry), it deduplicates silently. Combined with acks=all, the message is persisted on all in-sync replicas before the producer gets an acknowledgment. The send() call itself takes under 1ms because it buffers to a local batch (linger.ms=5).

// SCALED: Trip completion service - sync fare, async everything else
@Service
public class TripCompletionService {

    private final TripRepository tripRepository;
    private final FareCalculator fareCalculator;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private static final Logger log = LoggerFactory.getLogger(TripCompletionService.class);

    @Transactional
    public TripReceipt complete(TripCompletionRequest request) {
        Trip trip = tripRepository.findById(request.getTripId())
            .orElseThrow(() -> new TripNotFoundException(request.getTripId()));

        BigDecimal finalFare = fareCalculator.finalize(trip, request);
        trip.complete(finalFare, request.getDistance(), request.getDuration());
        Trip saved = tripRepository.save(trip);

        // SCALED: Publish event after DB commit - async from here
        RideCompletedEvent event = new RideCompletedEvent(
            saved.getId(), saved.getRiderId(), saved.getDriverId(),
            saved.getPickupZoneId(), saved.getDropoffZoneId(),
            saved.getFare(), saved.getDistance(), saved.getDuration(),
            saved.getCompletedAt(), saved.getRideType()
        );

        kafkaTemplate.send("ride-completed", saved.getId(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish RideCompletedEvent for trip {}",
                        saved.getId(), ex);
                    // Event will be retried by the outbox poller (CH14)
                }
            });

        return saved.toReceipt();
    }
}

Step 3: Kafka Consumers with Spring @KafkaListener

// SCALED: Analytics consumer - separate consumer group, independent scaling
@Component
public class TripAnalyticsConsumer {

    private final AnalyticsAggregator aggregator;
    private final MeterRegistry meterRegistry;

    @KafkaListener(
        topics = "ride-completed",
        groupId = "trip-analytics",
        containerFactory = "analyticsListenerFactory"
    )
    public void onRideCompleted(
            @Payload RideCompletedEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset) {

        meterRegistry.counter("kafka.consumer.events",
            "topic", "ride-completed", "group", "trip-analytics").increment();

        aggregator.recordTrip(
            event.pickupZoneId(),
            event.fare(),
            event.distance(),
            event.tripDuration()
        );
    }
}
// SCALED: Notification consumer - lower concurrency, email is not latency-sensitive
@Component
public class ReceiptNotificationConsumer {

    private final NotificationService notificationService;

    @KafkaListener(
        topics = "ride-completed",
        groupId = "trip-notifications",
        containerFactory = "notificationListenerFactory"
    )
    public void onRideCompleted(@Payload RideCompletedEvent event) {
        notificationService.sendTripReceipt(
            event.riderId(),
            event.tripId(),
            event.fare(),
            event.completedAt()
        );
    }
}
// SCALED: Consumer factory with error handling and concurrency
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RideCompletedEvent>
            analyticsListenerFactory(ConsumerFactory<String, RideCompletedEvent> cf) {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, RideCompletedEvent>();
        factory.setConsumerFactory(cf);
        factory.setConcurrency(3);
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new FixedBackOff(1000L, 3)  // 1s between retries, max 3 attempts
        ));
        factory.getContainerProperties().setAckMode(
            ContainerProperties.AckMode.RECORD
        );
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RideCompletedEvent>
            notificationListenerFactory(ConsumerFactory<String, RideCompletedEvent> cf) {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, RideCompletedEvent>();
        factory.setConsumerFactory(cf);
        factory.setConcurrency(2);
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new FixedBackOff(2000L, 5)  // Notifications can retry more aggressively
        ));
        return factory;
    }
}

Step 4: Kubernetes Deployment for Consumers

Consumers run as a separate Deployment from the API pods. Scale them independently. The API needs low latency. Consumers need throughput.

# k8s/trip-analytics-consumer.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trip-analytics-consumer
  labels:
    app: trip-analytics-consumer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: trip-analytics-consumer
  template:
    metadata:
      labels:
        app: trip-analytics-consumer
    spec:
      containers:
        - name: consumer
          image: ridehail/trip-analytics-consumer:latest
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "1Gi"
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: "consumer"
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
          livenessProbe:
            httpGet:
              path: /actuator/health/liveness
              port: 8081
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /actuator/health/readiness
              port: 8081
            initialDelaySeconds: 15
            periodSeconds: 5

The ride-completed topic has 12 partitions. With 3 replicas at concurrency 3, each replica runs 3 consumer threads, totaling 9 active consumers. Each consumer gets 1-2 partitions. Adding a 4th replica distributes the load further: 12 consumers for 12 partitions, one per partition. This is the maximum parallelism for this topic.

The Proof

Same Locust test from the baseline, now with synchronous fare quote restored and trip completion publishing to Kafka:

Fare Quote (Reverted to Synchronous)

UsersRPSp50 (ms)p99 (ms)Timeout Rate
20048028420%
5001,18030480%
10002,30032550%
20004,40035620%
30006,20038780%

p99 dropped from 1,200ms to 55ms at 1,000 users. Timeout rate went from 2.1% to 0%. Removing the Kafka round-trip removed 95% of the latency.

Trip Completion (Kafka-Decoupled)

UsersRPSp50 (ms)p99 (ms)Error RateTomcat Threads Active
5001,40012280%22
10002,80014320%38
20005,40015450%62
30007,80016580%84
40009,60018850%108
MetricSync (All Inline)Kafka-DecoupledImprovement
Trip completion p99 at 2000450ms45ms10x
Trip completion p99 at 40001,200ms85ms14x
Fare quote p99 at 10001,200ms (async)55ms (sync)22x
Max RPS before errors3,2009,600+3x
Error rate at 4000 users12.4%0%Eliminated

The fare quote got faster by removing Kafka. The trip completion got faster by adding Kafka. The difference: one operation has a user waiting for the response. The other does not. That is the entire decision framework.