Design Pattern · 2025-03-09 · 12 min read

Building ML Pipelines Like Software Pipelines

Design reproducible ML pipelines using software engineering principles. Learn how to structure data ingestion, feature computation, training, and evaluation as composable, testable stages.

Tags: MLOps · pipelines · data engineering · reproducibility · software engineering

The Pipeline Analogy

In software, you build pipelines everywhere: CI/CD pipelines, data ETL pipelines, streaming pipelines. An ML pipeline is the same concept applied to model development and deployment.

The goal is identical: take an input (raw data, code, events), apply a series of deterministic transformations, and produce a reliable output (trained model, deployed artifact, processed stream).
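That series-of-transformations idea can be sketched in a few lines of Python. The stage names and logic here are purely illustrative, not a real framework's API: each stage is a pure function, and the pipeline is just their composition.

```python
# Each stage is a pure, deterministic function; the pipeline composes them.
from functools import reduce

def ingest(raw):
    # Validate the raw input: drop missing records
    return [r for r in raw if r is not None]

def transform(rows):
    # A deterministic transformation (placeholder for real feature logic)
    return [r * 2 for r in rows]

def compose(*stages):
    """Chain stages left to right: compose(f, g)(x) == g(f(x))."""
    return lambda x: reduce(lambda acc, stage: stage(acc), stages, x)

pipeline = compose(ingest, transform)
print(pipeline([1, None, 3]))  # -> [2, 6]
```

Because every stage is deterministic, running the pipeline twice on the same input yields the same output — the property that makes pipelines reproducible.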

Why ML Pipelines Are Hard

Ad-hoc ML development looks like:

Notebook 1: Data cleaning
Notebook 2: Exploration
Notebook 3: Feature engineering (slightly different cleaning)
Notebook 4: Model training (different features than Notebook 3?)
Script: "final_model_v3_REAL_THIS_TIME.py"

This creates reproducibility failures, version drift, and deployment nightmares. The model in production often can't be traced to a specific data version or code commit.

A pipeline forces you to make the implicit explicit.

The Five Stages of an ML Pipeline

Raw Data
    │
    ▼
[1. Data Ingestion]      — Fetch, validate, version data
    │
    ▼
[2. Data Preprocessing]  — Clean, filter, split
    │
    ▼
[3. Feature Engineering] — Transform, encode, compute features
    │
    ▼
[4. Training]            — Train model, tune hyperparameters
    │
    ▼
[5. Evaluation]          — Metrics, validation, comparison
    │
    ▼
Trained Model Artifact

Each stage should be: idempotent, testable in isolation, and parameterized (not hardcoded).
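One way to encode those three properties is a small stage interface: `run()` takes explicit inputs and parameters and returns outputs, so a stage can be tested in isolation and rerun deterministically. This is a sketch of the idea, not any specific orchestrator's API; `Stage` and `SplitStage` are hypothetical names.

```python
from typing import Protocol

class Stage(Protocol):
    # Parameterized: everything that affects the output is an explicit argument
    def run(self, inputs: dict, params: dict) -> dict: ...

class SplitStage:
    """Deterministic train/test split: same inputs + params -> same outputs."""
    def run(self, inputs: dict, params: dict) -> dict:
        rows = inputs["rows"]
        cut = int(len(rows) * (1 - params["test_size"]))
        return {"train": rows[:cut], "test": rows[cut:]}

out = SplitStage().run({"rows": list(range(10))}, {"test_size": 0.2})
print(len(out["train"]), len(out["test"]))  # 8 2
```

Idempotency falls out for free: calling `run()` twice with the same arguments produces identical splits, so a scheduler can safely retry the stage.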

Implementation with Scikit-learn Pipelines

For tabular ML, scikit-learn's Pipeline class is the right abstraction:

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier

# Define feature groups
numerical_features = ["age", "income", "tenure_days"]
categorical_features = ["country", "plan_type", "acquisition_channel"]

# Preprocessing pipelines per feature type
numerical_pipeline = Pipeline([
    ("impute", SimpleImputer(strategy="median")),
    ("scale", StandardScaler()),
])

categorical_pipeline = Pipeline([
    ("impute", SimpleImputer(strategy="constant", fill_value="unknown")),
    ("encode", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])

# Combine with ColumnTransformer
preprocessor = ColumnTransformer([
    ("numerical", numerical_pipeline, numerical_features),
    ("categorical", categorical_pipeline, categorical_features),
])

# Full pipeline: preprocessing + model
full_pipeline = Pipeline([
    ("preprocess", preprocessor),
    ("model", GradientBoostingClassifier(n_estimators=200, max_depth=4)),
])

# Train (assumes X_train, y_train, X_test are already defined, e.g. via train_test_split)
full_pipeline.fit(X_train, y_train)

# Predict — preprocessing applied automatically
predictions = full_pipeline.predict(X_test)

The critical insight: fit only on training data, then transform both train and test with the parameters learned during fit. Because preprocessing and model live in one object, calling fit(X_train, y_train) guarantees every transformer computes its statistics (imputation medians, scaling factors, category vocabularies) from the training split alone. Fitting the pipeline on the full dataset before splitting is a data leakage bug.
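The leakage is easy to see with a tiny hand-rolled "scaler". This numeric illustration (plain Python, no sklearn) shows how fitting on all data lets test-set statistics bleed into the transformation:

```python
train = [1.0, 2.0, 3.0]
test = [10.0]

# Correct: centering statistics come from the training split only
train_mean = sum(train) / len(train)                 # 2.0
centered_test_ok = [x - train_mean for x in test]    # [8.0]

# Leaky: statistics computed on train + test before splitting
all_mean = sum(train + test) / len(train + test)     # 4.0
centered_test_leaky = [x - all_mean for x in test]   # [6.0]

print(centered_test_ok, centered_test_leaky)  # [8.0] [6.0]
```

The leaky version produces different (optimistically shifted) test features, which inflates evaluation metrics without improving the model.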

Saving the Pipeline

import joblib

# Save — this saves both preprocessing params AND model
joblib.dump(full_pipeline, "models/churn_v1.joblib")

# Load and predict
pipeline = joblib.load("models/churn_v1.joblib")
pipeline.predict(new_data)  # Same preprocessing automatically applied

This is the key advantage: the saved artifact includes the preprocessing logic. No more "what scaler did we use in training?"
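To make the saved artifact traceable as well as self-contained, a lightweight convention (this is a suggested convention, not a joblib feature) is to write a metadata record next to it: a hash of the training data plus the Git commit. The `fingerprint` helper below is illustrative.

```python
import hashlib

def fingerprint(data_bytes: bytes, git_commit: str) -> dict:
    """Metadata to store alongside the saved pipeline artifact."""
    return {
        # Content hash ties the model to an exact data version
        "data_sha256": hashlib.sha256(data_bytes).hexdigest(),
        # Commit ties it to an exact code version
        "git_commit": git_commit,
    }

meta = fingerprint(b"example-dataset-bytes", "abc123")
print(meta["git_commit"], meta["data_sha256"][:8])
```

With this record saved as e.g. `models/churn_v1.meta.json`, "which data and code produced this model?" has a mechanical answer.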

DVC for Pipeline Versioning

For ML projects, DVC (Data Version Control) extends Git to handle data and pipeline stages:

# dvc.yaml
stages:
  ingest:
    cmd: python src/ingest.py
    deps:
      - src/ingest.py
    params:
      - params.yaml:
        - data.source_url
    outs:
      - data/raw/dataset.parquet

  preprocess:
    cmd: python src/preprocess.py
    deps:
      - src/preprocess.py
      - data/raw/dataset.parquet
    outs:
      - data/processed/train.parquet
      - data/processed/val.parquet
      - data/processed/test.parquet

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed/train.parquet
    params:
      - params.yaml:
        - model.learning_rate
        - model.n_estimators
    outs:
      - models/model.joblib
    metrics:
      - metrics/train_metrics.json

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.joblib
      - data/processed/test.parquet
    metrics:
      - metrics/test_metrics.json: {cache: false}

# Run the pipeline (only reruns changed stages)
dvc repro

# Compare experiments
dvc metrics diff main feature/new-features

# Reproduce a specific commit's pipeline
git checkout abc123
dvc checkout  # Restore data artifacts
dvc repro

Parameterizing with Hydra

Hardcoded hyperparameters are a code smell in ML. Use a config system:

# params.yaml
model:
  learning_rate: 0.05
  n_estimators: 200
  max_depth: 4
  subsample: 0.8

data:
  test_size: 0.2
  random_seed: 42

features:
  numerical: ["age", "income", "tenure_days"]
  categorical: ["country", "plan_type"]

# src/train.py
import yaml
from sklearn.ensemble import GradientBoostingClassifier

def train(config_path: str = "params.yaml"):
    with open(config_path) as f:
        config = yaml.safe_load(f)

    model = GradientBoostingClassifier(
        learning_rate=config["model"]["learning_rate"],
        n_estimators=config["model"]["n_estimators"],
        max_depth=config["model"]["max_depth"],
    )
    # ...

Every parameter that affects the model output should be in config, not code. This enables reproducibility and systematic experimentation.
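Loading raw dicts from YAML means a typo in params.yaml fails silently. One cheap guard (a sketch, not a Hydra feature) is to bind the loaded dict to a dataclass, which rejects unknown or missing keys at startup. Note this catches key typos, not wrong types; for full validation a library like pydantic is the usual choice.

```python
from dataclasses import dataclass

@dataclass
class ModelConfig:
    learning_rate: float
    n_estimators: int
    max_depth: int

# Stands in for yaml.safe_load(...)["model"]
raw = {"learning_rate": 0.05, "n_estimators": 200, "max_depth": 4}

cfg = ModelConfig(**raw)  # raises TypeError on unknown or missing keys
print(cfg.learning_rate, cfg.n_estimators)
```

A misspelled key like `n_estimator` now crashes immediately instead of training a model with a silently-ignored parameter.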

Testing Your Pipeline

ML pipelines have distinct test categories:

# tests/test_preprocessing.py
import pytest
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline

class TestNumericalPipeline:
    def test_handles_missing_values(self, numerical_pipeline, sample_data):
        # Inject NaN — should not crash
        sample_data.loc[0, "income"] = np.nan
        result = numerical_pipeline.fit_transform(sample_data)
        assert not np.isnan(result).any()

    def test_output_range(self, numerical_pipeline, sample_data):
        result = numerical_pipeline.fit_transform(sample_data)
        # StandardScaler: ~mean 0, std 1
        assert abs(result.mean()) < 0.1
        assert abs(result.std() - 1.0) < 0.1

    def test_fit_transform_vs_transform(self, numerical_pipeline, train_data, test_data):
        """Ensure transform uses training stats, not test stats."""
        numerical_pipeline.fit(train_data)

        result1 = numerical_pipeline.transform(test_data)
        result2 = numerical_pipeline.transform(test_data)  # Idempotent

        np.testing.assert_array_equal(result1, result2)

class TestFullPipeline:
    def test_prediction_shape(self, full_pipeline, X_test):
        predictions = full_pipeline.predict(X_test)
        assert predictions.shape == (len(X_test),)

    def test_no_nan_in_predictions(self, full_pipeline, X_test):
        predictions = full_pipeline.predict(X_test)
        assert not np.isnan(predictions).any()
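The tests above request fixtures like numerical_pipeline and sample_data; a conftest.py along these lines would provide them. This is one plausible setup (the fixture bodies are assumptions matching the earlier pipeline definitions, not code from a real project):

```python
# tests/conftest.py
import numpy as np
import pandas as pd
import pytest
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer

@pytest.fixture
def numerical_pipeline():
    # Mirrors the numerical preprocessing defined in the training code
    return Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale", StandardScaler()),
    ])

@pytest.fixture
def sample_data():
    # Synthetic but schema-correct data; seeded for deterministic tests
    rng = np.random.default_rng(42)
    return pd.DataFrame({
        "age": rng.uniform(18, 80, 100),
        "income": rng.uniform(20_000, 150_000, 100),
        "tenure_days": rng.uniform(0, 2000, 100),
    })
```

Keeping fixtures schema-correct but synthetic means the test suite runs without access to real (possibly sensitive) training data.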

Production Deployment Pattern

# src/serve.py
from fastapi import FastAPI
import joblib
import pandas as pd
from pydantic import BaseModel

app = FastAPI()
pipeline = joblib.load("models/churn_v1.joblib")

class PredictionRequest(BaseModel):
    age: float
    income: float
    tenure_days: float
    country: str
    plan_type: str
    acquisition_channel: str

@app.post("/predict")
def predict(request: PredictionRequest):
    df = pd.DataFrame([request.model_dump()])  # on Pydantic v1, use request.dict()
    probability = pipeline.predict_proba(df)[0, 1]
    return {"churn_probability": float(probability)}

The pipeline artifact handles all preprocessing, so the serving code is clean. The same preprocessing used in training is automatically applied at inference.


Next: learn how to track experiments and compare models with experiment tracking for ML engineers.

Want to Go Deeper?

This article is part of our comprehensive curriculum on building ML systems at scale. Explore our full courses for hands-on learning.