How to Build a High-Performance Distributed Task Routing System Using Kombu

These articles are AI-generated summaries. Please check the original sources for full details.

This tutorial builds a small event-driven workflow with Kombu, treating messaging as a core architectural capability. The example routes messages through a topic exchange to a worker running in a background thread, and it uses Kombu's in-memory transport so it runs without an external broker. The same pattern carries over to microservices backed by a real message broker.

Why This Matters

Synchronous request/response models often struggle with scalability and resilience, because each caller depends on every downstream service being available. Distributed task queues, like those built with Kombu, process work asynchronously, which decouples services and improves fault tolerance. Without reliable message routing, complex systems can experience significant delays or failures, and a single outage can affect many users.

Key Insights

  • Installation: Kombu is available from PyPI via pip install kombu.
  • Topic Exchanges: Allow for flexible message routing based on wildcard patterns, enabling selective consumption by multiple queues.
  • ConsumerMixin: Provides the connection handling and consume loop for background workers, so a subclass only has to supply get_consumers and its message callbacks. Companies such as Stripe and Coinbase reportedly use similar messaging technologies.
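
The wildcard matching behind topic exchanges can be sketched in plain Python. This is an illustrative re-implementation of AMQP-style topic semantics, not Kombu's own code: topic_matches and queues_for are hypothetical helper names, and the bindings mirror the two queues declared in the working example. Real brokers and Kombu's in-memory transport may treat edge cases differently.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP-style topic pattern matches a routing key.

    '*' stands for exactly one dot-separated word, '#' for zero or more.
    """
    def match(p, k):
        if not p:
            # Pattern exhausted: match only if the key is exhausted too.
            return not k
        head, rest = p[0], p[1:]
        if head == '#':
            # '#' may absorb any number of leading words, including none.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == '*' or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split('.'), routing_key.split('.'))


# Hypothetical binding table mirroring the queues in the working example.
BINDINGS = {'video_queue': 'video.#', 'audit_queue': '#'}


def queues_for(routing_key: str) -> list:
    """List the queues whose binding pattern matches the routing key."""
    return sorted(name for name, pattern in BINDINGS.items()
                  if topic_matches(pattern, routing_key))


if __name__ == "__main__":
    for key in ('video.upload', 'user.login'):
        print(key, '->', queues_for(key))
```

Under these assumptions, a video.upload message is copied to both queues, while user.login reaches only the audit queue.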

Working Example

import threading
import time
import logging
import uuid
import sys
from kombu import Connection, Exchange, Queue, Producer
from kombu.mixins import ConsumerMixin

logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
    force=True
)
logger = logging.getLogger(__name__)
BROKER_URL = "memory://localhost/"

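# Topic exchange: queues bind with wildcard patterns ('#' matches any run of words).
media_exchange = Exchange('media_exchange', type='topic', durable=True)
task_queues = [
    Queue('video_queue', media_exchange, routing_key='video.#'),  # video events only
    Queue('audit_queue', media_exchange, routing_key='#'),        # every message
]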

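class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues
        self.should_stop = False

    def get_consumers(self, Consumer, channel):
        # One consumer per queue, so the callback knows which queue delivered
        # the message. The routing key alone cannot tell: a 'video.upload'
        # message reaches both video_queue and audit_queue with the same key.
        return [
            Consumer(queues=[queue],
                     callbacks=[lambda body, message, name=queue.name:
                                self.on_message(name, body, message)],
                     accept=['json'],
                     prefetch_count=1)
            for queue in self.queues
        ]

    def on_message(self, queue_name, body, message):
        routing_key = message.delivery_info['routing_key']
        payload_id = body.get('id', 'unknown')
        logger.info(f"\n⚡ RECEIVED MSG via key: [{routing_key}] on {queue_name}")
        logger.info(f" Payload ID: {payload_id}")
        try:
            if queue_name == 'video_queue':
                self.process_video(body)
            else:
                logger.info(" 🔍 [Audit] Logging event...")
        except Exception as e:
            logger.error(f" ❌ ERROR: {e}")
            # Reject without requeueing so a failing message is not redelivered forever.
            message.reject()
        else:
            # Acknowledge every successfully handled message, not only audit events.
            message.ack()
            logger.info(" ✅ ACKNOWLEDGED")

    def process_video(self, body):
        logger.info(" ⚙️ [Processor] Transcoding video (Simulating work...)")
        time.sleep(0.5)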

def publish_messages(connection):
    producer = Producer(connection)
    tasks = [
        ('video.upload', {'file': 'movie.mp4'}),
        ('user.login', {'user': 'admin'}),
    ]
    logger.info("\n🚀 PRODUCER: Starting to publish messages...")
    for r_key, data in tasks:
        data['id'] = str(uuid.uuid4())[:8]
        logger.info(f"📤 SENDING: {r_key} -> {data}")
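        # Declare the queues before publishing so the message is not dropped
        # if the worker has not bound them to the exchange yet.
        producer.publish(
            data,
            exchange=media_exchange,
            routing_key=r_key,
            serializer='json',
            declare=task_queues
        )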
        time.sleep(1.5)
    logger.info("🏁 PRODUCER: Done.")

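def run_example():
    with Connection(BROKER_URL) as conn:
        worker = Worker(conn, task_queues)
        # Daemon thread so a stuck worker cannot keep the process alive.
        worker_thread = threading.Thread(target=worker.run, daemon=True)
        worker_thread.start()
        logger.info("✅ SYSTEM: Worker thread started.")
        time.sleep(1)  # give the worker time to declare and bind its queues
        try:
            publish_messages(conn)
            time.sleep(2)  # let the worker drain any remaining messages
        except KeyboardInterrupt:
            pass
        finally:
            # ConsumerMixin checks should_stop between drain cycles; wait for
            # the loop to exit before the connection is closed.
            worker.should_stop = True
            worker_thread.join(timeout=2)
            logger.info("\n👋 SYSTEM: Execution complete.")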

if __name__ == "__main__":
    run_example()

Practical Applications

  • Media Processing: A video platform uses Kombu to distribute video transcoding tasks to multiple worker nodes.
  • Pitfall: Failing to acknowledge messages after processing can lead to message redelivery and potential duplicate processing, causing data inconsistencies.
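
The acknowledgement pitfall above can be made concrete with a small callback wrapper. This is a minimal sketch rather than the article's code: handle_with_ack, failing, and StubMessage are hypothetical names, and StubMessage only imitates the ack() and reject() methods of a Kombu message so the snippet runs without a broker.

```python
class StubMessage:
    """Hypothetical stand-in exposing the ack()/reject() calls of a Kombu message."""

    def __init__(self):
        self.state = 'pending'

    def ack(self):
        self.state = 'acked'

    def reject(self, requeue=False):
        self.state = 'requeued' if requeue else 'rejected'


def handle_with_ack(body, message, process):
    """Run process(body); acknowledge on success, reject on failure."""
    try:
        process(body)
    except Exception:
        # Settle the message explicitly so it does not stay unacknowledged.
        message.reject(requeue=False)
        return False
    message.ack()
    return True


def failing(body):
    """Hypothetical processor that always fails."""
    raise ValueError("cannot transcode")


if __name__ == "__main__":
    ok, bad = StubMessage(), StubMessage()
    handle_with_ack({'id': 'a1'}, ok, lambda body: None)
    handle_with_ack({'id': 'b2'}, bad, failing)
    print(ok.state, bad.state)  # acked rejected
```

Rejecting without requeueing avoids an endless redelivery loop for a message that always fails. Whether to requeue, dead-letter, or drop such a message is a design choice that depends on the broker setup.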
