design pattern · 2025-04-03 · 12 min read

Data Preprocessing Patterns Every ML Engineer Should Know

A practical reference for handling missing data, outliers, categorical encoding, and data splits. Includes common mistakes and production-ready patterns for preprocessing tabular ML data.

Tags: data preprocessing, feature engineering, scikit-learn, tabular data, missing values

The 80% Problem

The claim that "data scientists spend 80% of their time on data preparation" gets repeated constantly. Whether or not the number is precise, the implication is real: data quality determines model quality, and most real-world data requires substantial preparation.

This guide covers the most important preprocessing patterns with production-ready implementations.

Handling Missing Values

Understanding Missingness Types

Before imputing, understand why data is missing:

  • MCAR (Missing Completely at Random): Missingness is random — no relationship to observed or unobserved data. Safe to impute.
  • MAR (Missing at Random): Missingness depends on observed data. Imputation based on observed features is valid.
  • MNAR (Missing Not at Random): Missingness relates to the missing value itself. Example: people with high income skip the income field. Imputation can introduce bias.

A quick audit of what is missing, and how much, is the first step:

import pandas as pd
import numpy as np

def analyze_missingness(df: pd.DataFrame) -> pd.DataFrame:
    missing_stats = pd.DataFrame({
        "count": df.isnull().sum(),
        "percent": (df.isnull().sum() / len(df) * 100).round(2),
    })
    return missing_stats[missing_stats["count"] > 0].sort_values("percent", ascending=False)

print(analyze_missingness(df))
#                     count  percent
# income               1250    12.50
# last_login_days        89     0.89
# acquisition_channel    12     0.12

Imputation Strategies

from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer  # noqa
from sklearn.impute import IterativeImputer

# Simple imputation (fast, works for most cases)
numerical_imputer = SimpleImputer(strategy="median")   # Robust to outliers
categorical_imputer = SimpleImputer(strategy="most_frequent")

# KNN imputation (better for features with complex relationships)
knn_imputer = KNNImputer(n_neighbors=5)

# Iterative imputation (best quality, slowest)
# Each feature imputed as a function of all others
iterative_imputer = IterativeImputer(max_iter=10, random_state=42)
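As a quick sanity check on toy data (the values are mine, not from a real dataset), median imputation shrugs off the outlier that distorts mean imputation:

```python
import numpy as np
from sklearn.impute import SimpleImputer

# One column with a NaN and an extreme outlier
X = np.array([[1.0], [2.0], [np.nan], [4.0], [100.0]])

mean_filled = SimpleImputer(strategy="mean").fit_transform(X)
median_filled = SimpleImputer(strategy="median").fit_transform(X)

print(mean_filled.ravel()[2])    # 26.75 — pulled toward the outlier
print(median_filled.ravel()[2])  # 3.0  — robust
```

This is why "median" is the usual default for numerical columns with skewed or outlier-prone distributions.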

The Missing Indicator Pattern

For MNAR data, add a binary indicator for missingness:

from sklearn.impute import MissingIndicator
from sklearn.pipeline import FeatureUnion

# Income is often MNAR — missing correlates with value
# Create both imputed value AND indicator
income_pipeline = FeatureUnion([
    ("imputed", SimpleImputer(strategy="median")),
    ("indicator", MissingIndicator()),
])

# Now the model sees both: imputed_income and income_was_missing
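A minimal run of this pattern on toy data (the income values are mine) shows the two output columns side by side:

```python
import numpy as np
from sklearn.impute import SimpleImputer, MissingIndicator
from sklearn.pipeline import FeatureUnion

income_pipeline = FeatureUnion([
    ("imputed", SimpleImputer(strategy="median")),
    ("indicator", MissingIndicator()),
])

X = np.array([[50_000.0], [np.nan], [80_000.0], [np.nan]])
out = income_pipeline.fit_transform(X)
print(out)
# Column 0: income (NaNs → median 65_000); column 1: was-missing flag
```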

Outlier Handling

Detection

def detect_outliers(series: pd.Series, method: str = "iqr") -> pd.Series:
    """Return a boolean mask marking outliers in `series`."""
    if method == "iqr":
        Q1, Q3 = series.quantile([0.25, 0.75])
        IQR = Q3 - Q1
        lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
        return (series < lower) | (series > upper)

    elif method == "zscore":
        z_scores = (series - series.mean()) / series.std()
        return z_scores.abs() > 3

    elif method == "isolation_forest":
        from sklearn.ensemble import IsolationForest
        iso = IsolationForest(contamination=0.05, random_state=42)
        preds = iso.fit_predict(series.values.reshape(-1, 1))
        return pd.Series(preds == -1, index=series.index)

    raise ValueError(f"Unknown method: {method}")
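For intuition, here is the IQR rule worked by hand on a toy series (data mine):

```python
import pandas as pd

s = pd.Series([1.0, 2.0, 3.0, 4.0, 100.0])
q1, q3 = s.quantile([0.25, 0.75])      # 2.0 and 4.0
iqr = q3 - q1                          # 2.0
# Fences at Q1 - 1.5*IQR = -1.0 and Q3 + 1.5*IQR = 7.0
mask = (s < q1 - 1.5 * iqr) | (s > q3 + 1.5 * iqr)
print(mask.tolist())  # [False, False, False, False, True] — only 100 is flagged
```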

Treatment

def winsorize(series: pd.Series, lower_q: float = 0.01, upper_q: float = 0.99) -> pd.Series:
    """Cap outliers at percentile bounds instead of removing them."""
    lower = series.quantile(lower_q)
    upper = series.quantile(upper_q)
    return series.clip(lower=lower, upper=upper)

# Apply to training data
train_bounds = {}
for col in numerical_cols:
    train_bounds[col] = {
        "lower": X_train[col].quantile(0.01),
        "upper": X_train[col].quantile(0.99),
    }
    X_train[col] = X_train[col].clip(**train_bounds[col])

# Apply SAME bounds to test data (computed from training data)
for col in numerical_cols:
    X_test[col] = X_test[col].clip(**train_bounds[col])

The key pattern: compute bounds from training data, apply to test. Never recompute on test data.
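One way to bake this discipline into a pipeline is a small custom transformer: bounds are learned in fit and only applied in transform. This is a sketch — `Winsorizer` is my name for it, not a scikit-learn class:

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class Winsorizer(BaseEstimator, TransformerMixin):
    """Learn percentile bounds on fit; clip to them on transform."""

    def __init__(self, lower_q: float = 0.01, upper_q: float = 0.99):
        self.lower_q = lower_q
        self.upper_q = upper_q

    def fit(self, X, y=None):
        X = np.asarray(X, dtype=float)
        # Bounds come from the data passed to fit (training data) only
        self.lower_ = np.quantile(X, self.lower_q, axis=0)
        self.upper_ = np.quantile(X, self.upper_q, axis=0)
        return self

    def transform(self, X):
        # Apply the training-set bounds, whatever data comes in
        return np.clip(np.asarray(X, dtype=float), self.lower_, self.upper_)
```

Dropped into a Pipeline, this guarantees test data is clipped to training-set bounds without any manual bookkeeping.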

Categorical Encoding at Scale

Low Cardinality: One-Hot Encoding

from sklearn.preprocessing import OneHotEncoder

encoder = OneHotEncoder(
    handle_unknown="ignore",   # Unknown categories at test time → all zeros
    sparse_output=False,
    drop="first",              # Avoid multicollinearity for linear models
)
# Caveat: with drop="first", an unknown category and the dropped category
# both encode as all zeros — usually acceptable, but worth knowing.

# Fit on training data only
encoder.fit(X_train[cat_cols])

High Cardinality: Target Encoding with Cross-fitting

from sklearn.model_selection import KFold
import numpy as np

def target_encode_cv(X_train, y_train, col, n_splits=5, smoothing=10):
    """
    Target encoding with cross-fitting to prevent leakage.
    Each sample's encoding uses only data from other folds.
    """
    global_mean = y_train.mean()
    encodings = np.zeros(len(X_train))
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)

    for train_idx, val_idx in kf.split(X_train):
        # Compute mean target per category on the train fold
        fold_stats = (
            pd.DataFrame({"cat": X_train.iloc[train_idx][col], "target": y_train.iloc[train_idx]})
            .groupby("cat")["target"]
            .agg(["mean", "count"])
        )

        # Smoothing: blend category mean with global mean based on sample count
        fold_stats["smoothed"] = (
            (fold_stats["mean"] * fold_stats["count"] + global_mean * smoothing)
            / (fold_stats["count"] + smoothing)
        )

        # Apply to validation fold
        encodings[val_idx] = X_train.iloc[val_idx][col].map(
            fold_stats["smoothed"]
        ).fillna(global_mean)

    return encodings

# For test data: use encoding trained on all training data
def fit_target_encoder(X_train, y_train, col, smoothing=10):
    global_mean = y_train.mean()
    stats = (
        pd.DataFrame({"cat": X_train[col], "target": y_train})
        .groupby("cat")["target"]
        .agg(["mean", "count"])
    )
    stats["smoothed"] = (
        (stats["mean"] * stats["count"] + global_mean * smoothing)
        / (stats["count"] + smoothing)
    )
    return stats["smoothed"].to_dict(), global_mean
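Applying the fitted mapping to test data comes down to a `.map` with a global-mean fallback for unseen categories. Here is the same smoothing inlined on toy data (the `city` column and values are mine):

```python
import pandas as pd

X_train = pd.DataFrame({"city": ["a", "a", "b", "b", "b", "c"]})
y_train = pd.Series([1, 0, 1, 1, 1, 0])
X_test = pd.DataFrame({"city": ["a", "c", "d"]})  # "d" never seen in training

smoothing = 10
global_mean = y_train.mean()
stats = (
    pd.DataFrame({"cat": X_train["city"], "target": y_train})
    .groupby("cat")["target"].agg(["mean", "count"])
)
# Same smoothing formula as the fitted encoder above
smoothed = (stats["mean"] * stats["count"] + global_mean * smoothing) / (
    stats["count"] + smoothing
)

# Unseen categories fall back to the global mean
X_test["city_te"] = X_test["city"].map(smoothed).fillna(global_mean)
print(X_test)
```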

Data Splitting Patterns

Standard Split (Tabular, IID Data)

from sklearn.model_selection import train_test_split

# 60/20/20 split — test set touched only once for final evaluation
X_temp, X_test, y_temp, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.25, random_state=42, stratify=y_temp
)
# 0.25 * 0.8 = 0.2 → 60/20/20

Time Series Split

# Temporal split: no shuffling, with a gap between adjacent splits
def temporal_split(df, date_col, val_months=2, test_months=2, gap_days=7):
    max_date = df[date_col].max()

    test_start = max_date - pd.DateOffset(months=test_months)
    val_end = test_start - pd.Timedelta(days=gap_days)
    val_start = val_end - pd.DateOffset(months=val_months)
    train_end = val_start - pd.Timedelta(days=gap_days)

    train = df[df[date_col] <= train_end]
    val = df[(df[date_col] > val_start) & (df[date_col] <= val_end)]
    test = df[df[date_col] >= test_start]

    return train, val, test

Group-Based Split

For data where rows aren't independent (multiple rows per user, per document):

from sklearn.model_selection import GroupShuffleSplit

# Ensure all rows for a given user_id are in the same split
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, test_idx = next(gss.split(X, y, groups=df["user_id"]))

X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

# Verify no user_id overlap
assert len(
    set(df.iloc[train_idx]["user_id"]) & set(df.iloc[test_idx]["user_id"])
) == 0

The Pipeline Pattern (Putting It Together)

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from xgboost import XGBClassifier

# Numerical: impute → scale (winsorizing can be added as a custom transformer)
numerical_pipeline = Pipeline([
    ("impute", SimpleImputer(strategy="median")),
    ("scale", StandardScaler()),
])

# Categorical: impute → encode
categorical_pipeline = Pipeline([
    ("impute", SimpleImputer(strategy="constant", fill_value="MISSING")),
    ("encode", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])

preprocessor = ColumnTransformer([
    ("num", numerical_pipeline, numerical_cols),
    ("cat", categorical_pipeline, categorical_cols),
])

# Full pipeline
pipeline = Pipeline([
    ("preprocess", preprocessor),
    ("model", XGBClassifier(n_estimators=200, learning_rate=0.05)),
])

pipeline.fit(X_train, y_train)

# Save — preprocessing params included
import joblib
joblib.dump(pipeline, "model_pipeline.joblib")

Next: learn how features flow from preprocessing through feature stores to model serving in our feature store guide.

Want to Go Deeper?

This article is part of our comprehensive curriculum on building ML systems at scale. Explore our full courses for hands-on learning.