Building an End-to-End Data Engineering and Machine Learning Pipeline with PySpark in Google Colab
These articles are AI-generated summaries. Please check the original sources for full details.
This tutorial demonstrates how to leverage Apache Spark and PySpark in a Google Colab environment to build a complete data engineering and machine learning pipeline. The workflow includes data preprocessing, SQL queries, window functions, model training, and Parquet file handling, all within a single-node setup.
1. Setup and Initialization
- Spark Session Configuration:
  - A local Spark session is initialized with `master("local[*]")` to utilize all available cores.
  - `spark.sql.shuffle.partitions` is set to 4, well below the default of 200, so a small single-node dataset is not split into many near-empty shuffle tasks.
  - Spark version: 3.5.1 (as specified in the `pip install` command).
- Sample Dataset:
  - A structured DataFrame is created with user data, including `id`, `name`, `country`, `signup_date`, `income`, and `plan` (subscription type).
  - The schema is explicitly defined using `StructType` for data validation.
2. Data Transformations and SQL Queries
- Column Engineering:
  - New columns like `signup_ts` (timestamp), `year`, `month`, and `is_india` (binary indicator) are added using PySpark SQL functions.
  - Example: `F.to_timestamp("signup_date")` converts the date string to a timestamp.
- SQL Analytics:
  - The DataFrame is registered as a SQL table (`users`), enabling SQL queries.
  - Example query: Aggregates user count and average income per country.
    `SELECT country, COUNT(*) AS cnt, AVG(income) AS avg_income FROM users GROUP BY country ORDER BY cnt DESC`
- Window Functions:
  - A window function ranks users by income within each country using `rank().over(Window.partitionBy("country").orderBy(F.col("income").desc()))`.
- User-Defined Functions (UDFs):
  - A UDF `plan_priority` assigns numerical priority to subscription plans (`premium` = 3, `standard` = 2, `basic` = 1).
  - Applied via `F.udf(plan_priority, IntegerType())`.
3. Data Joining and Aggregation
- Enrichment with Country Metadata:
  - A separate DataFrame `country_df` contains country-level data (region, population).
  - Joined with the user DataFrame using `df.alias("u").join(country_df.alias("c"), on="country", how="left")`.
- Regional Analysis:
  - Aggregates user counts and average income by `region` and `plan` type.
  - Example output: Highlights regional trends in subscription plans and income.
4. Machine Learning with Spark MLlib
- Feature Engineering:
  - Label Encoding: Converts `plan` to a binary label (`1` for “premium”, `0` for others).
  - Indexing: Uses `StringIndexer` to encode categorical `country` values.
  - Feature Assembly: Combines `income`, `country_idx`, and `plan_priority` into a feature vector using `VectorAssembler`.
- Model Training:
  - A logistic regression model is trained on 70% of the data (`train_df`), with at most 20 iterations (`maxIter=20`).
  - Evaluation: Accuracy is computed on the test set using `MulticlassClassificationEvaluator` (metric: “accuracy”).
- Results:
  - The model predicts subscription type (`premium` vs. others); the resulting accuracy value is not reported in the source.
5. Data Persistence and Query Optimization
- Parquet File Handling:
  - Processed data is saved to Parquet format (`/content/spark_users_parquet`) and reloaded for validation.
  - Parquet is used for efficient storage and schema preservation.
- Query Optimization:
  - A SQL query retrieves recent signups (`signup_ts >= '2025-10-01'`) and demonstrates the query execution plan using `recent.explain()`.
- Session Termination:
  - The Spark session is gracefully stopped with `spark.stop()`.
Working Example
```python
# Complete PySpark Code Example
# Colab/Jupyter shell command; run it in a notebook cell.
!pip install -q pyspark==3.5.1

from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize Spark Session
spark = (SparkSession.builder.appName("ColabSparkPipeline")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "4")
         .getOrCreate())

# Sample Data
data = [
    (1, "Alice", "IN", "2025-10-01", 56000.0, "premium"),
    # ... (additional data rows)
]
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("country", StringType(), True),
    StructField("signup_date", StringType(), True),
    StructField("income", FloatType(), True),
    StructField("plan", StringType(), True),
])
df = spark.createDataFrame(data, schema)

# Transformations: parse the date string, then derive calendar and flag columns
df2 = df.withColumn("signup_ts", F.to_timestamp("signup_date")) \
    .withColumn("year", F.year("signup_ts")) \
    .withColumn("month", F.month("signup_ts")) \
    .withColumn("is_india", (F.col("country") == "IN").cast("int"))
df2.createOrReplaceTempView("users")

# SQL Query
spark.sql("""
    SELECT country, COUNT(*) AS cnt, AVG(income) AS avg_income
    FROM users
    GROUP BY country
    ORDER BY cnt DESC
""").show()

# ML Pipeline
# These stages expect `label` and `plan_priority` columns, which are added in steps not shown here.
country_indexer = StringIndexer(inputCol="country", outputCol="country_idx", handleInvalid="keep")
assembler = VectorAssembler(inputCols=["income", "country_idx", "plan_priority"], outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20)
# ... (additional ML steps omitted)
```
Recommendations
- UDF Performance: Avoid heavy computations in UDFs to prevent performance bottlenecks. Prefer a built-in function where one exists, since a Python UDF moves each row between the JVM and a Python worker.
- Data Partitioning: Use `repartition()` or `coalesce()` for large datasets to optimize shuffle operations. `repartition()` performs a full shuffle, while `coalesce()` reduces the partition count without one.
- Model Evaluation: Always validate models using cross-validation and multiple metrics (e.g., precision, recall).
- Parquet Usage: Leverage Parquet for columnar storage and schema enforcement in production workflows.
- Query Optimization: Use `explain()` to analyze query plans and optimize joins or aggregations.