The Pipeline Analogy
In software, you build pipelines everywhere: CI/CD pipelines, data ETL pipelines, streaming pipelines. An ML pipeline is the same concept applied to model development and deployment.
The goal is identical: take an input (raw data, code, events), apply a series of deterministic transformations, and produce a reliable output (trained model, deployed artifact, processed stream).
Why ML Pipelines Are Hard
Ad-hoc ML development looks like:
Notebook 1: Data cleaning
Notebook 2: Exploration
Notebook 3: Feature engineering (slightly different cleaning)
Notebook 4: Model training (different features than Notebook 3?)
Script: "final_model_v3_REAL_THIS_TIME.py"
This creates reproducibility failures, version drift, and deployment nightmares. The model in production often can't be traced to a specific data version or code commit.
A pipeline forces you to make the implicit explicit.
The Five Stages of an ML Pipeline
Raw Data
    │
    ▼
[1. Data Ingestion] — Fetch, validate, version data
    │
    ▼
[2. Data Preprocessing] — Clean, filter, split
    │
    ▼
[3. Feature Engineering] — Transform, encode, compute features
    │
    ▼
[4. Training] — Train model, tune hyperparameters
    │
    ▼
[5. Evaluation] — Metrics, validation, comparison
    │
    ▼
Trained Model Artifact
Each stage should be: idempotent, testable in isolation, and parameterized (not hardcoded).
Implementation with Scikit-learn Pipelines
For tabular ML, scikit-learn's Pipeline class is the right abstraction:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier

# Define feature groups
numerical_features = ["age", "income", "tenure_days"]
categorical_features = ["country", "plan_type", "acquisition_channel"]

# Preprocessing pipelines per feature type
numerical_pipeline = Pipeline([
    ("impute", SimpleImputer(strategy="median")),
    ("scale", StandardScaler()),
])

categorical_pipeline = Pipeline([
    ("impute", SimpleImputer(strategy="constant", fill_value="unknown")),
    ("encode", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])

# Combine with ColumnTransformer
preprocessor = ColumnTransformer([
    ("numerical", numerical_pipeline, numerical_features),
    ("categorical", categorical_pipeline, categorical_features),
])

# Full pipeline: preprocessing + model
full_pipeline = Pipeline([
    ("preprocess", preprocessor),
    ("model", GradientBoostingClassifier(n_estimators=200, max_depth=4)),
])

# Train
full_pipeline.fit(X_train, y_train)

# Predict — preprocessing applied automatically
predictions = full_pipeline.predict(X_test)
The critical insight: fit only on training data, transform both train and test. The pipeline enforces this automatically. Fitting the pipeline on all data is a data leakage bug.
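A small demonstration of the leak, using a synthetic test set drawn from a shifted distribution (the numbers are illustrative):

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
X_train = rng.normal(loc=0.0, scale=1.0, size=(100, 1))
X_test = rng.normal(loc=5.0, scale=1.0, size=(20, 1))  # shifted distribution

# Correct: scaling statistics come from training data only
leak_free = StandardScaler().fit(X_train)

# Leaky: test rows contaminate the mean/std applied at inference time
leaky = StandardScaler().fit(np.vstack([X_train, X_test]))

print(leak_free.mean_[0], leaky.mean_[0])  # the leaky mean is pulled toward the test set
```

The leaky scaler's statistics depend on data the model is supposed to be evaluated on, which silently inflates test metrics.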
Saving the Pipeline
import joblib
# Save — this saves both preprocessing params AND model
joblib.dump(full_pipeline, "models/churn_v1.joblib")
# Load and predict
pipeline = joblib.load("models/churn_v1.joblib")
pipeline.predict(new_data) # Same preprocessing automatically applied
This is the key advantage: the saved artifact includes the preprocessing logic. No more "what scaler did we use in training?"
DVC for Pipeline Versioning
For ML projects, DVC (Data Version Control) extends Git to handle data and pipeline stages:
# dvc.yaml
stages:
  ingest:
    cmd: python src/ingest.py
    deps:
      - src/ingest.py
    params:
      - params.yaml:
          - data.source_url
    outs:
      - data/raw/dataset.parquet

  preprocess:
    cmd: python src/preprocess.py
    deps:
      - src/preprocess.py
      - data/raw/dataset.parquet
    outs:
      - data/processed/train.parquet
      - data/processed/val.parquet
      - data/processed/test.parquet

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed/train.parquet
    params:
      - params.yaml:
          - model.learning_rate
          - model.n_estimators
    outs:
      - models/model.joblib
    metrics:
      - metrics/train_metrics.json

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.joblib
      - data/processed/test.parquet
    metrics:
      - metrics/test_metrics.json: {cache: false}
# Run the pipeline (only reruns changed stages)
dvc repro
# Compare experiments
dvc metrics diff main feature/new-features
# Reproduce a specific commit's pipeline
git checkout abc123
dvc checkout # Restore data artifacts
dvc repro
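Conceptually, `dvc metrics diff` compares the metrics JSON files (here `metrics/test_metrics.json`) from two revisions. A rough stdlib-only sketch of that comparison; DVC itself also handles revision checkout and nested keys:

```python
def metrics_diff(old: dict, new: dict) -> dict:
    """Compare two flat metrics dicts, e.g. loaded from metrics/test_metrics.json."""
    return {
        key: {
            "old": old.get(key),
            "new": new.get(key),
            "diff": (new[key] - old[key]) if key in old and key in new else None,
        }
        for key in set(old) | set(new)
    }

old = {"auc": 0.81, "accuracy": 0.74}               # metrics at `main` (made-up values)
new = {"auc": 0.84, "accuracy": 0.76, "f1": 0.70}   # metrics on the feature branch
print(metrics_diff(old, new)["auc"])
```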
Parameterizing with Config Files
Hardcoded hyperparameters are a code smell in ML. Use a config system: a plain params.yaml covers most projects, and a tool like Hydra adds config composition and command-line overrides when you outgrow it.
# params.yaml
model:
  learning_rate: 0.05
  n_estimators: 200
  max_depth: 4
  subsample: 0.8

data:
  test_size: 0.2
  random_seed: 42

features:
  numerical: ["age", "income", "tenure_days"]
  categorical: ["country", "plan_type"]
import yaml
from sklearn.ensemble import GradientBoostingClassifier

def train(config_path: str = "params.yaml"):
    with open(config_path) as f:
        config = yaml.safe_load(f)

    model = GradientBoostingClassifier(
        learning_rate=config["model"]["learning_rate"],
        n_estimators=config["model"]["n_estimators"],
        max_depth=config["model"]["max_depth"],
    )
    # ...
Every parameter that affects the model output should be in config, not code. This enables reproducibility and systematic experimentation.
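That includes random seeds. A short sketch, assuming the params.yaml above, that threads `data.random_seed` through every source of randomness so two runs with the same config produce identical results:

```python
import random

import numpy as np

def set_seeds(seed: int) -> None:
    random.seed(seed)     # Python stdlib RNG
    np.random.seed(seed)  # NumPy global RNG (sklearn estimators draw from it by default)

config = {"data": {"test_size": 0.2, "random_seed": 42}}  # as parsed from params.yaml
set_seeds(config["data"]["random_seed"])
first = np.random.rand(3)

set_seeds(config["data"]["random_seed"])
second = np.random.rand(3)
assert (first == second).all()  # identical draws: the run is reproducible
```

Passing the seed explicitly as `random_state=` to each estimator and splitter is even more robust than seeding globals, but either way the value should live in config.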
Testing Your Pipeline
ML pipelines have distinct test categories:
# tests/test_preprocessing.py
import numpy as np
import pytest

class TestNumericalPipeline:
    def test_handles_missing_values(self, numerical_pipeline, sample_data):
        # Inject NaN — should not crash
        sample_data.loc[0, "income"] = np.nan
        result = numerical_pipeline.fit_transform(sample_data)
        assert not np.isnan(result).any()

    def test_output_range(self, numerical_pipeline, sample_data):
        result = numerical_pipeline.fit_transform(sample_data)
        # StandardScaler: ~mean 0, std 1
        assert abs(result.mean()) < 0.1
        assert abs(result.std() - 1.0) < 0.1

    def test_transform_is_deterministic(self, numerical_pipeline, train_data, test_data):
        """Transform must reuse fitted (training) statistics, so repeated calls match."""
        numerical_pipeline.fit(train_data)
        result1 = numerical_pipeline.transform(test_data)
        result2 = numerical_pipeline.transform(test_data)  # Idempotent
        np.testing.assert_array_equal(result1, result2)

class TestFullPipeline:
    def test_prediction_shape(self, full_pipeline, X_test):
        predictions = full_pipeline.predict(X_test)
        assert predictions.shape == (len(X_test),)

    def test_no_nan_in_predictions(self, full_pipeline, X_test):
        predictions = full_pipeline.predict(X_test)
        assert not np.isnan(predictions).any()
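These tests assume pytest fixtures (`numerical_pipeline`, `sample_data`, and friends) defined elsewhere. A sketch of a matching tests/conftest.py; `train_data`, `test_data`, `full_pipeline`, and `X_test` would follow the same pattern, and the column values here are made up:

```python
# tests/conftest.py
import numpy as np
import pandas as pd
import pytest
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

def make_numerical_pipeline() -> Pipeline:
    return Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale", StandardScaler()),
    ])

def make_sample_data() -> pd.DataFrame:
    rng = np.random.default_rng(42)  # seeded so tests stay deterministic
    return pd.DataFrame({
        "age": rng.integers(18, 80, size=50).astype(float),
        "income": rng.normal(50_000, 15_000, size=50),
        "tenure_days": rng.integers(0, 2_000, size=50).astype(float),
    })

# Expose the builders as pytest fixtures
numerical_pipeline = pytest.fixture(make_numerical_pipeline)
sample_data = pytest.fixture(make_sample_data)
```

Keeping the builders as plain functions makes the fixtures reusable outside pytest (e.g. in notebooks or scripts).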
Production Deployment Pattern
# src/serve.py
from fastapi import FastAPI
import joblib
import pandas as pd
from pydantic import BaseModel

app = FastAPI()
pipeline = joblib.load("models/churn_v1.joblib")

class PredictionRequest(BaseModel):
    age: float
    income: float
    tenure_days: float
    country: str
    plan_type: str
    acquisition_channel: str

@app.post("/predict")
def predict(request: PredictionRequest):
    df = pd.DataFrame([request.model_dump()])  # use .dict() on Pydantic v1
    probability = pipeline.predict_proba(df)[0, 1]
    return {"churn_probability": float(probability)}
The pipeline artifact handles all preprocessing, so the serving code is clean. The same preprocessing used in training is automatically applied at inference.
Next: learn how to track experiments and compare models with experiment tracking for ML engineers.