ship it and sleep

Locust Scenarios for E-Commerce Flows

Chapter 50 of 66

The Failure

The team’s Locust test hit the catalog API with 100 concurrent users. It passed the performance budget. In production, the service crashed under similar load because real users do not just browse the catalog. They browse → add to cart → checkout → pay. The checkout endpoint was 10x slower than the catalog endpoint, and the load test never exercised it proportionally.

Realistic load testing requires weighted task distribution that matches production traffic patterns.

The Mechanism

Traffic Distribution from Production

Analyze production access logs to determine real traffic ratios:

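Endpoint          Production Share    Locust Weight
Browse catalog    50%                 @task(50)
View product      25%                 @task(25)
Add to cart       15%                 @task(15)
Checkout          8%                  @task(8)
Payment           2%                  @task(2)

Locust task weights must be integers, so use the percentages directly (or any integer ratio that matches them) rather than fractional values such as 1.5 or 0.5.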
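One way to derive weights from access logs is to reduce the raw hit counts to their smallest integer ratio. The sketch below assumes the per-endpoint counts have already been extracted from the logs. The counts and the `locust_weights` helper are hypothetical, chosen to match the production shares above.

```python
from functools import reduce
from math import gcd


def locust_weights(hits):
    """Reduce raw per-endpoint hit counts to the smallest integer weights."""
    # Drop endpoints with no traffic; a zero weight would never be scheduled
    counts = {name: int(count) for name, count in hits.items() if count > 0}
    if not counts:
        raise ValueError("no traffic recorded")
    # Dividing by the greatest common divisor keeps the ratios exact
    divisor = reduce(gcd, counts.values())
    return {name: count // divisor for name, count in counts.items()}


# Hypothetical counts from one hour of production access logs
hits = {
    "browse_catalog": 5000,
    "view_product": 2500,
    "add_to_cart": 1500,
    "checkout": 800,
    "payment": 200,
}
weights = locust_weights(hits)
print(weights)
```

With these counts the result is 50, 25, 15, 8, and 2, which can be passed straight to `@task(...)`.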

Sequential vs Independent Tasks

Independent tasks: each task runs in isolation. The user browses, then browses again.

Sequential flows: the user browses → adds to cart → checks out. State carries between steps.

E-commerce requires sequential flows. A checkout without items in the cart returns 400.

The Implementation

Sequential Flow with SequentialTaskSet

# tests/performance/flows.py
# HARDENED: Sequential e-commerce flow
from locust import HttpUser, SequentialTaskSet, task, between
import random


class CheckoutFlow(SequentialTaskSet):
    """Models a complete purchase: browse → cart → checkout."""

    @task
    def browse_catalog(self):
        with self.client.get(
            "/api/catalog/products?page=1&limit=20",
            name="/api/catalog/products",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                products = response.json().get("items", [])
                if products:
                    self.product_id = random.choice(products)["id"]
                    response.success()
                else:
                    response.failure("Empty catalog")
            else:
                response.failure(f"Status {response.status_code}")

    @task
    def view_product(self):
        if not hasattr(self, "product_id"):
            return
        self.client.get(
            f"/api/catalog/products/{self.product_id}",
            name="/api/catalog/products/[id]"
        )

    @task
    def add_to_cart(self):
        if not hasattr(self, "product_id"):
            return
        self.client.post("/api/cart/items", json={
            "productId": self.product_id,
            "quantity": 1
        })

    @task
    def checkout(self):
        with self.client.post(
            "/api/checkout",
            json={"paymentMethod": "test-card"},
            name="/api/checkout",
            catch_response=True
        ) as response:
            if response.status_code in (200, 201):
                response.success()
            elif response.status_code == 400:
                response.failure("Cart empty - flow broken")
            else:
                response.failure(f"Checkout failed: {response.status_code}")

        # Flow complete, restart
        self.interrupt()


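class BrowseOnly(SequentialTaskSet):
    """Models a user who only browses."""

    @task
    def browse(self):
        self.client.get(
            "/api/catalog/products?page=1&limit=20",
            name="/api/catalog/products"
        )

    @task
    def view_random(self):
        # Group every product ID under one stats entry instead of 100
        self.client.get(
            f"/api/catalog/products/SKU-{random.randint(1, 100)}",
            name="/api/catalog/products/[id]"
        )
        # Browse session complete, hand control back to the user
        self.interrupt()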


class EcommerceUser(HttpUser):
    wait_time = between(1, 5)
    tasks = {
        BrowseOnly: 7,       # 70% just browse
        CheckoutFlow: 3,     # 30% complete a purchase
    }

Custom Metrics

# tests/performance/custom_metrics.py
# HARDENED: Track business metrics alongside technical ones
from locust import events


@events.request.add_listener
def on_request(request_type, name, response_time, response_length,
               response=None, exception=None, context=None, **kwargs):
    # Count an order only when checkout succeeded. The flow accepts both
    # 200 and 201, so the metric must accept both as well.
    if (name == "/api/checkout" and exception is None
            and response is not None
            and response.status_code in (200, 201)):
        # Track order completion as a custom metric. The name differs from
        # "/api/checkout", so this listener does not re-trigger itself.
        events.request.fire(
            request_type="BUSINESS",
            name="order_completed",
            response_time=response_time,
            response_length=0,
            response=response,
            exception=None,
            context=context,
        )

Test Data Seeding

# tests/performance/conftest.py
# HARDENED: Seed test data before load test
import random

import requests

ADMIN_HEADERS = {"Authorization": "Bearer admin-token"}


def seed_test_data(host):
    """Create test products and users before load test."""
    # Create test products
    for i in range(100):
        response = requests.post(f"{host}/api/admin/products", json={
            "id": f"SKU-{i+1}",
            "name": f"Test Product {i+1}",
            "price": round(random.uniform(10, 500), 2),
            "inventory": 10000,
        }, headers=ADMIN_HEADERS, timeout=10)
        # Fail fast: a partially seeded catalog skews the results
        response.raise_for_status()

    # Create test users
    for i in range(50):
        response = requests.post(f"{host}/api/admin/users", json={
            "email": f"loadtest-{i}@example.com",
            "password": "loadtest-password",
        }, headers=ADMIN_HEADERS, timeout=10)
        response.raise_for_status()

The Gate

Sequential flows validate the full user journey. If any step in the flow fails (cart empty at checkout, payment timeout), the flow is marked as failed. The error rate includes flow-level failures, not just HTTP errors.
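The gate decision itself can be kept as a small pure function so it is easy to test outside a load run. This is a minimal sketch: the `gate_exit_code` name and both thresholds (1% failures, 2000 ms at p95) are assumptions for illustration, not values from the performance budget.

```python
def gate_exit_code(fail_ratio, p95_ms, max_fail_ratio=0.01, max_p95_ms=2000):
    """Return 0 when the run is within budget, 1 otherwise.

    fail_ratio covers every failure Locust recorded, including steps
    marked with response.failure() such as an empty cart at checkout.
    """
    if fail_ratio > max_fail_ratio:
        return 1
    if p95_ms > max_p95_ms:
        return 1
    return 0


# Hypothetical end-of-run numbers
passing = gate_exit_code(fail_ratio=0.002, p95_ms=850)
failing = gate_exit_code(fail_ratio=0.04, p95_ms=850)
print(passing, failing)
```

In a locustfile, a listener on `events.quitting` could feed this function `environment.stats.total.fail_ratio` and `environment.stats.total.get_response_time_percentile(0.95)`, then assign the result to `environment.process_exit_code` so the CI job fails when the gate does.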

The Recovery

Sequential flows have high failure rates: Usually caused by state leaking between flows. Ensure each flow starts with a clean cart. Add a cart-clear step at the beginning of CheckoutFlow.

Test data is exhausted: If products run out of inventory during the test, add-to-cart starts failing. Seed with high inventory counts (10000+) or use an infinite-inventory test mode.

Locust master/worker desync: When running distributed Locust, workers may not have the same test data seeds. Use a shared seed script that runs once before the load test starts.
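Alongside a run-once seed script, it can help to make the generated data deterministic, so that any machine that rebuilds the product list gets identical payloads. A minimal sketch, assuming the product fields shown in the seeding code; the `build_products` helper and the seed value 42 are hypothetical.

```python
import random


def build_products(count=100, seed=42):
    """Build the same product payloads on every machine for a given seed."""
    # A private Random instance avoids touching the global RNG state
    rng = random.Random(seed)
    return [
        {
            "id": f"SKU-{i + 1}",
            "name": f"Test Product {i + 1}",
            "price": round(rng.uniform(10, 500), 2),
            "inventory": 10000,
        }
        for i in range(count)
    ]


products = build_products()
# A second call with the same seed yields identical data
same_again = build_products()
print(len(products), products == same_again)
```

The seed script can post these payloads once before the run, and workers can call the same function locally if they need to know which SKUs and prices exist.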