How to Build a High-Performance Distributed Task Routing System Using Kombu
These articles are AI-generated summaries. Please check the original sources for full details.
How to Build a High-Performance Distributed Task Routing System Using Kombu
This tutorial demonstrates building a functional event-driven workflow with Kombu, treating messaging as a core architectural capability, and showcases a real distributed system. The example utilizes topic exchanges and concurrent workers to achieve high performance, demonstrating the power of production microservices.
Why This Matters
Traditional synchronous request/response models often struggle with scalability and resilience. Distributed task queues, like those built with Kombu, enable asynchronous processing, decoupling services and improving fault tolerance. Without robust message routing, complex systems can experience significant delays or failures, costing organizations time and resources – a single outage can impact thousands of users and lead to lost revenue.
Key Insights
- Kombu installation:
pip install kombu(2025) - Topic Exchanges: Allow for flexible message routing based on wildcard patterns, enabling selective consumption by multiple queues.
- ConsumerMixin: Simplifies the creation of background workers in Kombu, facilitating concurrent message processing, as used by companies like Stripe and Coinbase with similar technologies.
Working Example
import threading
import time
import logging
import uuid
import datetime
import sys
from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin
logging.basicConfig(
level=logging.INFO,
format='%(message)s',
handlers=[logging.StreamHandler(sys.stdout)],
force=True
)
logger = logging.getLogger(__name__)
BROKER_URL = "memory://localhost/"
media_exchange = Exchange('media_exchange', type='topic', durable=True)
task_queues = [
Queue('video_queue', media_exchange, routing_key='video.#'),
Queue('audit_queue', media_exchange, routing_key='#'),
]
class Worker(ConsumerMixin):
def __init__(self, connection, queues):
self.connection = connection
self.queues = queues
self.should_stop = False
def get_consumers(self, Consumer, channel):
return [
Consumer(queues=self.queues,
callbacks=[self.on_message],
accept=['json'],
prefetch_count=1)
]
def on_message(self, body, message):
routing_key = message.delivery_info['routing_key']
payload_id = body.get('id', 'unknown')
logger.info(f"\n⚡ RECEIVED MSG via key: [{routing_key}]")
logger.info(f" Payload ID: {payload_id}")
try:
if 'video' in routing_key:
self.process_video(body)
elif 'audit' in routing_key:
logger.info(" 🔍 [Audit] Logging event...")
message.ack()
logger.info(f" ✅ ACKNOWLEDGED")
except Exception as e:
logger.error(f" ❌ ERROR: {e}")
def process_video(self, body):
logger.info(" ⚙️ [Processor] Transcoding video (Simulating work...)")
time.sleep(0.5)
def publish_messages(connection):
producer = Producer(connection)
tasks = [
('video.upload', {'file': 'movie.mp4'}),
('user.login', {'user': 'admin'}),
]
logger.info("\n🚀 PRODUCER: Starting to publish messages...")
for r_key, data in tasks:
data['id'] = str(uuid.uuid4())[:8]
logger.info(f"📤 SENDING: {r_key} -> {data}")
producer.publish(
data,
exchange=media_exchange,
routing_key=r_key,
serializer='json'
)
time.sleep(1.5)
logger.info("🏁 PRODUCER: Done.")
def run_example():
with Connection(BROKER_URL) as conn:
worker = Worker(conn, task_queues)
worker_thread = threading.Thread(target=worker.run)
worker_thread.daemon = True
worker_thread.start()
logger.info("✅ SYSTEM: Worker thread started.")
time.sleep(1)
try:
publish_messages(conn)
time.sleep(2)
except KeyboardInterrupt:
pass
finally:
worker.should_stop = True
logger.info("\n👋 SYSTEM: Execution complete.")
if __name__ == "__main__":
run_example()
Practical Applications
- Media Processing: A video platform uses Kombu to distribute video transcoding tasks to multiple worker nodes.
- Pitfall: Failing to acknowledge messages after processing can lead to message redelivery and potential duplicate processing, causing data inconsistencies.
References:
Continue reading
Next article
Higgsfield Cinema Studio: AI Filmmaking with Real Camera Controls
Related Content
How to Build a Fully Functional Custom GPT-style Conversational AI Locally Using Hugging Face Transformers
Build a local GPT-style AI with Hugging Face Transformers using Microsoft's Phi-3 model and Python code.
Building Interactive Web Apps with NiceGUI: A Technical Guide to Multi-Page Dashboards and Real-Time Systems
Learn to build a multi-page web application using NiceGUI featuring real-time dashboards, CRUD operations, and async chat functionality.
Building a Scalable AI Directory with Next.js and Tailwind CSS
Xiaomo Fan launched useaitools.me featuring 50+ AI tools across 6 categories using a modern Next.js 16 stack.