The ML Model Rollout Problem
Shipping a new model version is not like shipping a bug fix. A bug fix is either correct or incorrect. A new model version might improve average-case performance while degrading performance for a specific user segment. It might improve click-through rate while decreasing session length. The interactions are complex.
Production ML systems use a layered approach to model rollouts:
- Shadow mode: Run the new model on live requests, but do not serve its predictions
- Canary release: Route a small percentage of traffic to the new model
- Interleaving (ranking systems): Mix results from both models in a single response
- Full rollout: Cut over all traffic
Pattern 1: Shadow Mode (Dark Launch)
The new model receives all requests and generates predictions, but the old model's predictions are served to users. Shadow predictions are logged for offline comparison.
Architecture
Request → Feature Extraction → Old Model → Response to user
                 ↓
             New Model → Predictions logged (NOT served)
When to use
- Before any live traffic experiment
- To validate that the new model can handle production traffic patterns
- To check for prediction distribution drift and anomalies
Implementation
import asyncio
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class ShadowPrediction:
    """One logged side-by-side record of both models' output for a request."""

    # Identifier of the request that both models scored.
    request_id: str
    # Output of the primary model.
    primary_prediction: float
    # Output of the shadow model.
    shadow_prediction: float
    # Feature payload the predictions were computed from.
    features: dict
class ShadowModeServer:
    """Serve the primary model while scoring every request with a shadow model.

    Callers only ever receive the primary model's prediction. The shadow
    model's prediction is handed to ``logger_client.log`` in the background
    so that the two can be compared offline.

    Args:
        primary_model: Object with a blocking ``predict(features)`` method;
            its result is returned to the caller.
        shadow_model: Object with a blocking ``predict(features)`` method;
            its result is only logged.
        logger_client: Object with a ``log(record)`` method that receives
            ``ShadowPrediction`` records.
    """

    def __init__(self, primary_model, shadow_model, logger_client):
        self.primary = primary_model
        self.shadow = shadow_model
        self.logger = logger_client
        # The event loop keeps only weak references to tasks, so a
        # fire-and-forget task can be garbage-collected before it finishes.
        # Pending background tasks are parked here until they complete.
        self._background_tasks = set()

    async def predict(self, request_id: str, features: dict) -> float:
        """Return the primary prediction; log the shadow prediction later.

        Both models are started concurrently in worker threads. The call
        returns as soon as the primary model is done and never waits for the
        shadow model.

        Args:
            request_id: Identifier stored with the logged comparison.
            features: Feature payload passed unchanged to both models.

        Returns:
            The primary model's prediction.

        Raises:
            Whatever the primary model's ``predict`` raises. Shadow-side
            failures are logged and never propagate to the caller.
        """
        # Run both models concurrently
        primary_task = asyncio.create_task(
            asyncio.to_thread(self.primary.predict, features)
        )
        shadow_task = asyncio.create_task(
            asyncio.to_thread(self.shadow.predict, features)
        )

        try:
            # Return primary prediction immediately when ready
            primary_pred = await primary_task
        except BaseException:
            # There is nothing to compare against, so no record is logged.
            # Still consume the shadow outcome so a shadow failure does not
            # surface later as "Task exception was never retrieved".
            shadow_task.add_done_callback(self._consume_outcome)
            self._keep_alive(shadow_task)
            raise

        # Log shadow prediction asynchronously (fire-and-forget)
        async def log_shadow():
            try:
                shadow_pred = await shadow_task
                self.logger.log(ShadowPrediction(
                    request_id=request_id,
                    primary_prediction=primary_pred,
                    shadow_prediction=shadow_pred,
                    features=features,
                ))
            except Exception as e:
                # Shadow failures MUST NOT affect primary path
                logger.error("Shadow model failed: %s", e)

        self._keep_alive(asyncio.create_task(log_shadow()))
        return primary_pred  # user gets this

    def _keep_alive(self, task):
        """Hold a strong reference to *task* until it is done."""
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    @staticmethod
    def _consume_outcome(task):
        """Mark the exception of a finished task as retrieved."""
        if not task.cancelled():
            task.exception()
Shadow mode analysis
After 24–48 hours, analyze shadow predictions:
import pandas as pd
from scipy import stats
# Load one partition of logged shadow-mode records.
shadow_logs = pd.read_parquet("s3://shadow-logs/date=2025-04-29/")
primary_scores = shadow_logs["primary_prediction"]
shadow_scores = shadow_logs["shadow_prediction"]

# Distribution comparison: two-sample Kolmogorov-Smirnov test
ks_stat, p_value = stats.ks_2samp(primary_scores, shadow_scores)
print(f"KS statistic: {ks_stat:.4f}, p-value: {p_value:.4f}")
# p < 0.05 → distributions differ significantly → investigate before proceeding

# Agreement rate (useful for classification): share of rows where both
# predictions fall on the same side of the 0.5 decision threshold
same_decision = (primary_scores > 0.5) == (shadow_scores > 0.5)
agreement = same_decision.mean()
print(f"Decision agreement rate: {agreement:.2%}")

# Segment analysis — find where models disagree
shadow_logs["disagreement"] = (primary_scores - shadow_scores).abs()
mean_gap_by_country = shadow_logs.groupby("country")["disagreement"].mean()
print(mean_gap_by_country.sort_values(ascending=False).head(10))
Pattern 2: Canary Release
Route a controlled percentage of traffic (typically 1% → 5% → 20% → 50% → 100%) to the new model while monitoring metrics.
Traffic splitting implementation
import hashlib
def get_model_variant(
    user_id: str,
    canary_fraction: float = 0.05,
    experiment: str = "model-rollout-v2",
) -> str:
    """Deterministically assign a user to the baseline or the canary model.

    The user id is hashed together with the experiment name into one of
    10,000 buckets. The lowest ``canary_fraction`` share of buckets maps to
    the canary. The same user always gets the same variant, across processes
    and restarts, and a user who is in the canary at a smaller fraction
    stays in it when the fraction is raised.

    Args:
        user_id: Stable identifier of the user.
        canary_fraction: Share of users routed to the canary, resolved to the
            nearest 0.01%. Values <= 0 send everyone to "v1", values >= 1
            send everyone to "v2".
        experiment: Salt mixed into the hash so that different experiments
            get independent assignments.

    Returns:
        "v2" for canary users, "v1" for everyone else.
    """
    num_buckets = 10000
    hash_input = f"{experiment}:{user_id}".encode()
    # MD5 is used only to spread users evenly, not for security; flagging
    # that keeps the call usable on FIPS-restricted builds.
    digest = hashlib.md5(hash_input, usedforsecurity=False).hexdigest()
    bucket = int(digest, 16) % num_buckets
    # round(), not int(): 0.57 * 10000 is 5699.999999999999 in floating
    # point, and truncating it would silently drop one bucket.
    threshold = round(canary_fraction * num_buckets)
    return "v2" if bucket < threshold else "v1"
class CanaryModelServer:
    """Route each request to model v1 or v2 according to the user's variant."""

    def __init__(self, model_v1, model_v2, canary_fraction: float = 0.05):
        self.canary_fraction = canary_fraction
        self.models = dict(v1=model_v1, v2=model_v2)

    def predict(self, user_id: str, features: dict) -> dict:
        """Score *features* with the model assigned to *user_id*.

        Returns a dict holding the prediction and the name of the variant
        that produced it.
        """
        chosen = get_model_variant(user_id, self.canary_fraction)
        model = self.models[chosen]
        response = {"prediction": model.predict(features)}
        response["model_variant"] = chosen  # log this for analysis
        return response
Canary monitoring dashboard
Track these metrics by variant in real-time:
from dataclasses import dataclass, field
from collections import defaultdict
import numpy as np
@dataclass
class VariantMetrics:
    """Raw samples and counters collected for a single model variant."""

    predictions: list = field(default_factory=list)
    latencies_ms: list = field(default_factory=list)
    errors: int = 0
    requests: int = 0

    @property
    def error_rate(self) -> float:
        """Errors per request; the divisor is floored at 1."""
        divisor = self.requests if self.requests > 1 else 1
        return self.errors / divisor

    @property
    def p99_latency(self) -> float:
        """99th percentile of the recorded latencies, or 0 with no samples."""
        if not self.latencies_ms:
            return 0
        return np.percentile(self.latencies_ms, 99)

    @property
    def mean_prediction(self) -> float:
        """Mean of the recorded predictions, or 0 with no samples."""
        if not self.predictions:
            return 0
        return np.mean(self.predictions)
# Per-variant accumulators; an entry is created the first time a variant
# name is looked up.
metrics = defaultdict(VariantMetrics)


def record_prediction(
    variant: str,
    prediction: float,
    latency_ms: float,
    error: bool = False,
):
    """Record the outcome of one request served by *variant*.

    Args:
        variant: Name of the model variant that handled the request.
        prediction: The model output; ignored when ``error`` is true.
        latency_ms: Serving latency in milliseconds; ignored when ``error``
            is true.
        error: Set to True for a failed request. The request is then counted
            in ``requests`` and ``errors`` but contributes no prediction or
            latency sample. Without this flag ``errors`` would never grow
            and the error rate would always be 0.
    """
    m = metrics[variant]
    m.requests += 1
    if error:
        m.errors += 1
        return
    m.predictions.append(prediction)
    m.latencies_ms.append(latency_ms)
def check_canary_health() -> dict:
    """Compare canary ("v2") metrics with baseline ("v1") and list alerts.

    Possible alerts:
        ERROR_RATE_REGRESSION: canary error rate is more than twice the
            baseline's and above 1%.
        LATENCY_REGRESSION: canary p99 latency is more than 1.5x the
            baseline's. Evaluated only when both variants have latency
            samples.
        PREDICTION_DISTRIBUTION_SHIFT: a two-sample KS test on the latest
            1,000 predictions of each variant gives p < 0.001.
        PREDICTION_MEAN_SHIFT: the canary's mean prediction deviates from
            the baseline's by more than 20%.

    The two prediction checks run only once both variants have at least
    1,000 recorded predictions.

    Returns:
        A dict with the list of ``alerts`` and the ``v1`` / ``v2`` metric
        objects.
    """
    min_samples = 1000
    v1, v2 = metrics["v1"], metrics["v2"]

    # Auto-rollback triggers
    alerts = []
    if v2.error_rate > v1.error_rate * 2 and v2.error_rate > 0.01:
        alerts.append("ERROR_RATE_REGRESSION")

    # p99_latency reports 0 when a variant has no samples. Comparing against
    # such a baseline would flag every canary latency as a regression.
    has_latency_data = bool(v1.latencies_ms) and bool(v2.latencies_ms)
    if has_latency_data and v2.p99_latency > v1.p99_latency * 1.5:
        alerts.append("LATENCY_REGRESSION")

    # Prediction distribution shift
    if len(v1.predictions) >= min_samples and len(v2.predictions) >= min_samples:
        from scipy import stats

        _, p = stats.ks_2samp(
            v1.predictions[-min_samples:],
            v2.predictions[-min_samples:],
        )
        if p < 0.001:
            alerts.append("PREDICTION_DISTRIBUTION_SHIFT")

        # A relative deviation is undefined for a zero baseline mean.
        baseline_mean = v1.mean_prediction
        if baseline_mean != 0:
            deviation = abs(v2.mean_prediction - baseline_mean)
            if deviation > 0.2 * abs(baseline_mean):
                alerts.append("PREDICTION_MEAN_SHIFT")

    return {"alerts": alerts, "v1": v1, "v2": v2}
Statistical significance check before ramp-up
from scipy import stats
def is_experiment_significant(
    v1_metric: list[float],
    v2_metric: list[float],
    min_samples: int = 1000,
    alpha: float = 0.05,
) -> dict:
    """Test whether the canary's metric differs from the baseline's.

    Args:
        v1_metric: Per-sample metric values for the baseline variant.
        v2_metric: Per-sample metric values for the canary variant.
        min_samples: Minimum number of samples required in each variant.
        alpha: Significance level for the two-sided test.

    Returns:
        ``{"significant": False, "reason": "insufficient_samples"}`` when
        either variant has fewer than ``min_samples`` values. Otherwise a
        dict of plain Python values with:

        * ``significant``: whether ``p_value < alpha``.
        * ``p_value``: two-sided p-value of Welch's t-test.
        * ``lift``: (mean(v2) - mean(v1)) / |mean(v1)|, or NaN when the
          baseline mean is 0 and a relative lift is undefined.
        * ``direction``: "positive" if mean(v2) > mean(v1), else "negative".
        * ``recommendation``: "rollout" for a significant increase,
          "rollback" for a significant decrease, otherwise
          "continue_experiment".
    """
    if len(v1_metric) < min_samples or len(v2_metric) < min_samples:
        return {"significant": False, "reason": "insufficient_samples"}

    # Canary and baseline groups are very different in size. Welch's test
    # (equal_var=False) does not assume equal variances, which Student's
    # test needs to stay reliable with unbalanced groups.
    _, raw_p = stats.ttest_ind(v1_metric, v2_metric, equal_var=False)
    p_value = float(raw_p)

    baseline_mean = float(np.mean(v1_metric))
    delta = float(np.mean(v2_metric)) - baseline_mean
    # Dividing by |baseline| keeps the sign of the lift equal to the sign of
    # the change even when the baseline mean is negative.
    lift = delta / abs(baseline_mean) if baseline_mean != 0 else float("nan")

    significant = bool(p_value < alpha)
    if significant and delta > 0:
        recommendation = "rollout"
    elif significant and delta < 0:
        recommendation = "rollback"
    else:
        recommendation = "continue_experiment"

    return {
        "significant": significant,
        "p_value": p_value,
        "lift": lift,
        "direction": "positive" if delta > 0 else "negative",
        "recommendation": recommendation,
    }
Pattern 3: Interleaving (for Ranking Systems)
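Interleaving is the gold standard for comparing ranking models: every user sees results from both models, which removes between-user variance, and both models get equal access to the top positions, which controls for position bias. As a result it typically requires 10–100× fewer samples than A/B testing to reach significance.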
How it works
Instead of showing user A results from model 1 and user B results from model 2, you show every user a blended list that mixes results from both models. Attribution is based on which model contributed the clicked/purchased item.
def _next_unseen(ranking: list, start: int, seen: set) -> int:
    """Return the first index >= start whose item is not in seen (or len)."""
    idx = start
    while idx < len(ranking) and ranking[idx] in seen:
        idx += 1
    return idx


def team_draft_interleave(
    list_a: list,
    list_b: list,
    size: int,
    rng=None,
) -> tuple[list, dict[int, str]]:
    """Blend two rankings with Team Draft Interleaving.

    The team with fewer picks so far chooses next; ties are broken by a coin
    flip. A team always contributes its highest-ranked item that is not in
    the result yet. When one team has no items left, the other team keeps
    picking until ``size`` is reached or both rankings are used up.

    Args:
        list_a: Ranking from model A, best first. Items must be hashable.
        list_b: Ranking from model B, best first. Items must be hashable.
        size: Maximum length of the interleaved list.
        rng: Optional object with a ``random()`` method (for example a
            ``random.Random`` instance) used for the coin flips. Defaults to
            the ``random`` module.

    Returns:
        ``(interleaved_list, attribution)`` where ``attribution`` maps each
        item of the interleaved list to the team that picked it, "A" or "B".
    """
    import random

    coin = rng if rng is not None else random
    result = []
    attribution = {}  # item_id → "A" or "B"
    seen = set()  # items already in result, for O(1) duplicate checks
    a_idx, b_idx = 0, 0
    team_a_picks, team_b_picks = 0, 0

    while len(result) < size:
        # Move both cursors past items the other team has already taken.
        a_idx = _next_unseen(list_a, a_idx, seen)
        b_idx = _next_unseen(list_b, b_idx, seen)
        a_has_item = a_idx < len(list_a)
        b_has_item = b_idx < len(list_b)
        if not (a_has_item or b_has_item):
            break

        if a_has_item and b_has_item:
            # Team with fewer picks gets to pick next (coin flip on tie)
            a_turn = team_a_picks < team_b_picks or (
                team_a_picks == team_b_picks and coin.random() < 0.5
            )
        else:
            # An exhausted team forfeits its turn. Waiting for it to catch
            # up would never make progress.
            a_turn = a_has_item

        if a_turn:
            item = list_a[a_idx]
            attribution[item] = "A"
            team_a_picks += 1
            a_idx += 1
        else:
            item = list_b[b_idx]
            attribution[item] = "B"
            team_b_picks += 1
            b_idx += 1
        result.append(item)
        seen.add(item)

    return result, attribution
# Score after collecting user interactions
def score_interleaving_experiment(interaction_log: list[dict]) -> dict:
    """Count per-session wins for each team of an interleaving experiment.

    A session is won by the team whose items received more clicks. Sessions
    with equal click counts (including no attributed clicks) are not counted.
    """
    session_wins = {"A": 0, "B": 0}
    for session in interaction_log:
        team_of = session["attribution"]
        clicked = session["clicked_items"]
        hits_a = len([item for item in clicked if team_of.get(item) == "A"])
        hits_b = len([item for item in clicked if team_of.get(item) == "B"])
        if hits_a == hits_b:
            continue
        winner = "A" if hits_a > hits_b else "B"
        session_wins[winner] += 1

    decisive = session_wins["A"] + session_wins["B"]
    if decisive > 0:
        share_b = session_wins["B"] / decisive
    else:
        share_b = 0
    return {
        "wins_A": session_wins["A"],
        "wins_B": session_wins["B"],
        "win_rate_B": share_b,
        "total_decisive_sessions": decisive,
    }
Rollout Decision Checklist
Before moving from one stage to the next:
Shadow → Canary (1%)
- Shadow ran for ≥ 24 hours of production traffic
- Prediction distribution matches expected range
- No crashes or timeout spikes in shadow logs
- Decision agreement rate > 85% (unless expecting major changes)
Canary 1% → 5%
- Error rate and p99 latency parity with v1
- No prediction distribution anomalies
- At least 1,000 samples per variant
Canary 5% → 20% → 50% → 100%
- Primary metric shows a statistically significant positive lift, or at least no statistically significant regression
- No segment regressions in top 3 user segments
- On-call engineer aware and monitoring
Auto-rollback triggers (should be automated)
- Error rate > 2× baseline
- p99 latency > 1.5× baseline
- Prediction mean deviation > 20% from baseline
Design the full experimentation and deployment pipeline with our Production ML Anti-Patterns guide.