Skip to main content

On This Page

Building a Real-Time DDoS Detection Engine from Scratch with Python and Iptables

3 min read
Share

These articles are AI-generated summaries. Please check the original sources for full details.

How I Built a Real-Time DDoS Detection Engine from Scratch (No Fail2Ban, No Libraries)

Engineer Hezekiah Umoh developed a custom security daemon to protect web infrastructure from high-volume application-layer attacks. The system monitors live Nginx JSON logs and can identify and block malicious traffic patterns within a 10-second window using automated firewall updates.

Why This Matters

Ideal security models often rely on static rate limits that fail to account for the dynamic nature of web traffic, where ‘normal’ volume at noon can be ten times higher than at 3 AM. This implementation addresses technical reality by using a rolling statistical baseline and per-hour data slots, ensuring that detection thresholds adapt to historical traffic signatures rather than relying on arbitrary hardcoded values.

Key Insights

  • Sliding window request tracking is implemented using Python’s collections.deque to maintain a precise 60-second history without the inaccuracies of per-minute counter resets.
  • The engine uses Z-score statistical analysis to flag anomalies that exceed 3.0 standard deviations from the mean traffic baseline.
  • Automated mitigation is performed via ‘iptables -I INPUT 1’, inserting drop rules at the top of the firewall chain to stop malicious packets at the kernel level immediately.
  • A tiered backoff schedule (10m, 30m, 2h, permanent) distinguishes between misconfigured bots and persistent malicious actors.
  • The system tightens detection thresholds automatically when an IP generates a high volume of 4xx or 5xx errors, effectively identifying brute-force and scanning attempts.

Working Examples

A generator function that mimics ‘tail -f’ to process new Nginx log entries in real-time.

def tail_log(log_path): with open(log_path, 'r') as fh: fh.seek(0, 2); while True: line = fh.readline(); if line: parsed = parse_line(line); if parsed: yield parsed; else: time.sleep(0.05)

Sliding window implementation using deques to track global and per-IP request rates.

from collections import deque, defaultdict; WINDOW = 60; global_window = deque(); ip_windows = defaultdict(deque); def add_request(ip, status): now = time.time(); global_window.append(now); ip_windows[ip].append(now); cutoff = now - WINDOW; while global_window and global_window[0] < cutoff: global_window.popleft(); for dq in ip_windows.values(): while dq and dq[0] < cutoff: dq.popleft()

Anomaly detection logic combining Z-score analysis and raw rate multipliers.

def detect_ip(ip_rate, mean, std, ip_error_rate=0, baseline_error=0): z = (ip_rate - mean) / std; if z > 3.0: return True, f'z-score={z:.2f}>3.0'; if ip_rate > mean * 5.0: return True, f'{ip_rate:.1f}req/s > 5x baseline'; return False, None

Practical Applications

  • Cloud Storage Protection: Identifying IP addresses sending 500+ requests per second to prevent service degradation for legitimate users. Pitfall: Relying on cron jobs for log analysis can result in a minute-long detection lag, allowing the server to crash before action is taken.
  • Containerized Firewall Management: Deploying the detector in Docker with host-network privileges to manage the underlying Linux firewall directly. Pitfall: Failing to use a rolling baseline leads to false positives during legitimate traffic spikes, such as marketing campaigns or peak business hours.

References:

Continue reading

Next article

Mapping HTTP Status Codes to Options API Tiers: A FlashAlpha Developer Guide

Related Content