A Coding Implementation to Build a Unified Apache Beam Pipeline Demonstrating Batch and Stream Processing with Event-Time Windowing Using DirectRunner
These articles are AI-generated summaries. Please check the original sources for full details.
Apache Beam Unified Pipeline for Batch and Stream Processing
Apache Beam lets developers write a single pipeline that runs in both batch and streaming modes. This tutorial demonstrates that with event-time windowing and runs everything locally on the DirectRunner, which is intended for testing and development. The example applies the same windowing, triggering, and aggregation logic to a bounded batch source and to a replayed event stream, showing that on-time and late events are handled consistently in both modes.
This matters because real-world pipelines often need to process both historical (batch) and real-time (streaming) data with the same logic. Traditionally, that has meant separate codebases and infrastructure, which adds complexity and maintenance overhead. When batch and streaming logic drift apart, results become inconsistent, or teams end up paying for duplicated effort.
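To make the handling of on-time and late events more concrete, here is a plain-Python sketch (no Beam required) of what a per-key sum looks like across successive panes under Beam's two accumulation modes. The pane contents are made up for illustration, and the helper `pane_outputs` is hypothetical. In a real pipeline the trigger decides when panes fire; here they are just a list.

```python
from typing import List


def pane_outputs(panes: List[List[float]], accumulating: bool) -> List[float]:
    """Return the sum emitted for each pane of one window and key (illustrative model).

    panes: amounts that arrived between successive trigger firings,
    e.g. [early, on-time, late].
    """
    emitted: List[float] = []
    running = 0.0
    for pane in panes:
        if accumulating:
            # ACCUMULATING: each pane restates the full result seen so far.
            running += sum(pane)
            emitted.append(running)
        else:
            # DISCARDING: each pane carries only the data that arrived since the last firing.
            emitted.append(sum(pane))
    return emitted


panes = [[10.0], [15.0], [5.0]]  # hypothetical early, on-time, and late panes
print(pane_outputs(panes, accumulating=True))   # [10.0, 25.0, 30.0]
print(pane_outputs(panes, accumulating=False))  # [10.0, 15.0, 5.0]
```

With ACCUMULATING mode, which the working example uses, a downstream sink should overwrite the previous value for a window rather than add each pane to it.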
Key Insights
- DirectRunner limitations: The DirectRunner is meant for local testing and development. It executes the pipeline on a single machine and does not fully reproduce the execution behavior, scale, or performance of distributed runners like Dataflow or Flink.
- Event time vs. processing time: Beam's event-time model assigns data to windows using timestamps carried in the events themselves, so out-of-order data still lands in the correct window. Processing time, by contrast, uses the wall-clock time at which the pipeline handles each element.
- Temporal: Temporal, a workflow orchestration platform, is used by companies like Stripe and Coinbase for managing complex stateful applications.
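The event-time point above can be sketched in plain Python. Each element goes into the fixed window that contains its own timestamp, so arrival order does not affect the result. The helper `fixed_window` is hypothetical, not part of Beam's API. It mirrors the arithmetic Beam's `FixedWindows` uses to find a window's start: the timestamp minus its remainder within the window size. The sample events are illustrative.

```python
from collections import defaultdict

WINDOW_SIZE_SECS = 60


def fixed_window(event_time_s: int, size: int = WINDOW_SIZE_SECS, offset: int = 0):
    """Return the [start, end) fixed window that contains event_time_s."""
    start = event_time_s - (event_time_s - offset) % size
    return start, start + size


# Hypothetical events in arrival (processing) order; their event times are out of order.
arrivals = [("u1", 125, 4.0), ("u1", 10, 1.0), ("u1", 70, 2.0), ("u1", 59, 3.0)]

# Sum amounts per (user, event-time window), independent of arrival order.
totals = defaultdict(float)
for user, event_time, amount in arrivals:
    totals[(user, fixed_window(event_time))] += amount

for key in sorted(totals):
    print(key, totals[key])
# ('u1', (0, 60)) 4.0
# ('u1', (60, 120)) 2.0
# ('u1', (120, 180)) 4.0
```

Bucketing by arrival time instead would put the events stamped at 10 s and 59 s in whichever window happened to be open when they arrived.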
Working Example
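import json
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime, AfterWatermark
from apache_beam.transforms.window import FixedWindows, TimestampedValue

MODE = "stream"  # "stream" replays events via TestStream; any other value runs in batch mode
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120
# Arbitrary fixed base time so runs are deterministic.
BASE_TS = int(datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp())

def make_event(user_id, event_type, amount, event_time_epoch_s):
    return {"user_id": user_id, "event_type": event_type,
            "amount": float(amount), "event_time": int(event_time_epoch_s)}

# Hypothetical sample data: the original tutorial's event list is not reproduced in this summary.
BATCH_EVENTS = [
    make_event("u1", "purchase", 10, BASE_TS + 5),
    make_event("u1", "purchase", 15, BASE_TS + 20),
    make_event("u2", "purchase", 7, BASE_TS + 42),
    make_event("u1", "purchase", 5, BASE_TS + 30),  # replayed as a late event in stream mode
]

def build_test_stream():
    """Hypothetical helper: replays BATCH_EVENTS as an unbounded source with explicit watermark control."""
    on_time, late = BATCH_EVENTS[:3], BATCH_EVENTS[3:]
    return (
        TestStream()
        .add_elements([TimestampedValue(e, e["event_time"]) for e in on_time])
        # Watermark passes the end of the first window, so its on-time pane fires.
        .advance_watermark_to(BASE_TS + WINDOW_SIZE_SECS + 10)
        # Behind the watermark but within allowed lateness, so it still updates the window.
        .add_elements([TimestampedValue(e, e["event_time"]) for e in late])
        .advance_processing_time(15)  # gives the late AfterProcessingTime(10) trigger a chance to fire
        .advance_watermark_to_infinity()
    )

class CountAndSum(beam.CombineFn):
    """Computes (count, sum of amounts) per key in a single pass."""
    def create_accumulator(self):
        return 0, 0.0
    def add_input(self, acc, amount):
        return acc[0] + 1, acc[1] + amount
    def merge_accumulators(self, accumulators):
        count, total = 0, 0.0
        for c, s in accumulators:
            count, total = count + c, total + s
        return count, total
    def extract_output(self, acc):
        return acc

class WindowedUserAgg(beam.PTransform):
    """Applies the same event-time windowing and aggregation to batch and streaming input."""
    def expand(self, pcoll):
        return (
            pcoll
            # Attach each event's own timestamp so windowing uses event time, not arrival time.
            | "Stamp" >> beam.Map(lambda e: TimestampedValue(e, e["event_time"]))
            | "Window" >> beam.WindowInto(
                FixedWindows(WINDOW_SIZE_SECS),
                trigger=AfterWatermark(
                    early=AfterProcessingTime(10),  # speculative panes before the watermark
                    late=AfterProcessingTime(10),   # updated panes when late data arrives
                ),
                allowed_lateness=ALLOWED_LATENESS_SECS,
                accumulation_mode=AccumulationMode.ACCUMULATING,  # each pane restates the full result
            )
            | "KeyByUser" >> beam.Map(lambda e: (e["user_id"], e["amount"]))
            # One combine yields both aggregates, so count and sum always come from the same pane
            # (joining two separately triggered combines with CoGroupByKey can mismatch panes).
            | "CountAndSum" >> beam.CombinePerKey(CountAndSum())
            | "Format" >> beam.MapTuple(
                lambda user, cs: {"user_id": user, "count": cs[0], "sum_amount": cs[1]})
        )

def run_batch():
    with beam.Pipeline(options=PipelineOptions([])) as p:
        (p
         | beam.Create(BATCH_EVENTS)
         | WindowedUserAgg()
         | beam.Map(json.dumps)
         | beam.Map(print))

def run_stream():
    opts = PipelineOptions([])
    opts.view_as(StandardOptions).streaming = True  # run as a streaming pipeline
    with beam.Pipeline(options=opts) as p:
        (p
         | build_test_stream()
         | WindowedUserAgg()
         | beam.Map(json.dumps)
         | beam.Map(print))

if __name__ == "__main__":
    if MODE == "stream":
        run_stream()
    else:
        run_batch()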
Practical Applications
- E-commerce Analytics: Analyzing user purchase behavior in real-time while also reprocessing historical data for long-term trends.
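- Pitfall: Misconfigured windowing or triggers can silently lose data or produce inaccurate aggregates, especially with late-arriving data. Events that arrive after the allowed lateness are dropped, and with ACCUMULATING mode, a consumer that adds up successive panes will double-count.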
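The late-data pitfall can be reasoned about with a simplified model of Beam's rules. An element is on time if the watermark has not yet passed the end of its window. It is late but kept if the watermark is still within the allowed lateness past that end. Otherwise it is dropped. This sketch works in whole seconds and ignores Beam's exact window max-timestamp precision, so treat the boundaries as approximate. The function `classify` is hypothetical.

```python
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120


def classify(event_time_s: int, watermark_s: int,
             size: int = WINDOW_SIZE_SECS,
             allowed_lateness: int = ALLOWED_LATENESS_SECS) -> str:
    """Classify an arriving element relative to the current watermark (simplified model)."""
    window_end = event_time_s - event_time_s % size + size  # exclusive end of its fixed window
    if watermark_s < window_end:
        return "on_time"
    if watermark_s < window_end + allowed_lateness:
        return "late_accepted"  # still updates the window via a late pane
    return "dropped"            # window has expired, so the element is discarded


print(classify(30, watermark_s=50))   # on_time
print(classify(30, watermark_s=70))   # late_accepted
print(classify(30, watermark_s=200))  # dropped
```

With the example's settings (60-second windows, 120 seconds of allowed lateness), an event for the window [0, 60) is still counted as long as it arrives before the watermark reaches 180.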