MLOps: Deploying Machine Learning Models to Production
Master MLOps best practices for deploying, monitoring, and maintaining ML models in production. From model training to automated retraining pipelines and drift detection.
An oft-cited industry estimate holds that 87% of ML models never make it to production. MLOps bridges the gap between data science experiments and reliable production systems, bringing DevOps practices to machine learning workflows.
MLOps Lifecycle
Model Development
Reproducible training pipelines:
# training_pipeline.py
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import pandas as pd

def train_model(data_path, experiment_name):
    # Set experiment
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run():
        # Load and split data
        df = pd.read_csv(data_path)
        X = df.drop('target', axis=1)
        y = df['target']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Log parameters
        params = {
            'n_estimators': 100,
            'max_depth': 10,
            'min_samples_split': 5
        }
        mlflow.log_params(params)

        # Train model
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)

        # Evaluate
        train_score = model.score(X_train, y_train)
        test_score = model.score(X_test, y_test)

        # Log metrics
        mlflow.log_metrics({
            'train_accuracy': train_score,
            'test_accuracy': test_score
        })

        # Log model
        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name="customer_churn_predictor"
        )

    return model
Experiment Tracking
Centralized logging of experiments:
MLflow:
- Track parameters, metrics, and artifacts
- Compare experiment runs
- Model registry for version control
- Deployment to various targets
Weights & Biases (see the sketch after this list):
- Advanced visualization and dashboards
- Hyperparameter optimization
- Model versioning
- Team collaboration features
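As a counterpart to the MLflow pipeline above, a minimal Weights & Biases run might look like the following sketch (the project name is illustrative, and X_train/X_test/y_train/y_test come from the same split as before):

import wandb
from sklearn.ensemble import RandomForestClassifier

params = {'n_estimators': 100, 'max_depth': 10, 'min_samples_split': 5}

# Start a tracked run; parameters are stored in the run's config
run = wandb.init(project="customer-churn", config=params)

model = RandomForestClassifier(**params)
model.fit(X_train, y_train)

# Metrics appear in the W&B dashboard and can be compared across runs
wandb.log({
    'train_accuracy': model.score(X_train, y_train),
    'test_accuracy': model.score(X_test, y_test)
})

run.finish()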
Model Registry
Version control for ML models:
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Transition model to staging
client.transition_model_version_stage(
    name="customer_churn_predictor",
    version=3,
    stage="Staging"
)

# Add description
client.update_model_version(
    name="customer_churn_predictor",
    version=3,
    description="Random Forest with feature engineering v2, 85% test accuracy"
)

# Promote to production
client.transition_model_version_stage(
    name="customer_churn_predictor",
    version=3,
    stage="Production"
)
Deployment Patterns
Batch Prediction
Process large datasets offline:
from datetime import datetime

import mlflow.pyfunc

# Load production model
model = mlflow.pyfunc.load_model(
    model_uri="models:/customer_churn_predictor/Production"
)

# Batch prediction (df_batch holds the features to score)
predictions = model.predict(df_batch)

# Write results (engine is a SQLAlchemy engine for the results database)
df_batch['churn_prediction'] = predictions
df_batch['prediction_timestamp'] = datetime.now()
df_batch.to_sql('churn_predictions', con=engine, if_exists='append')
Use Cases:
- Daily customer scoring
- Fraud detection batch analysis
- Recommendation system updates
Real-Time Inference
Low-latency predictions via API:
from datetime import datetime

from fastapi import FastAPI
from pydantic import BaseModel
import mlflow.pyfunc

app = FastAPI()

# Load model at startup
model = mlflow.pyfunc.load_model(
    model_uri="models:/customer_churn_predictor/Production"
)

class PredictionInput(BaseModel):
    feature1: float
    feature2: int
    feature3: str

@app.post("/predict")
async def predict(input_data: PredictionInput):
    # Prepare features (prepare_features, log_prediction and
    # get_model_version are application-specific helpers)
    features = prepare_features(input_data)

    # Predict
    prediction = model.predict(features)

    # Log prediction for monitoring
    log_prediction(input_data, prediction)

    return {
        "prediction": float(prediction[0]),
        "model_version": get_model_version(),
        "timestamp": datetime.now().isoformat()
    }
Deployment Options:
- Kubernetes: Scalable, orchestrated deployments
- AWS SageMaker: Managed inference endpoints (invocation sketch after this list)
- Serverless: Lambda functions for sporadic requests
- Edge: TensorFlow Lite for mobile/IoT devices
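For the SageMaker option, client applications typically call the managed endpoint through boto3. A sketch against the endpoint defined in the Terraform section later in this article (the payload schema is an assumption):

import json
import boto3

runtime = boto3.client('sagemaker-runtime')

payload = {"feature1": 0.42, "feature2": 3, "feature3": "enterprise"}

response = runtime.invoke_endpoint(
    EndpointName='churn-prediction-endpoint',
    ContentType='application/json',
    Body=json.dumps(payload)
)

# The response body is a stream containing the model's JSON output
prediction = json.loads(response['Body'].read())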
Streaming Predictions
Process events in real-time:
from kafka import KafkaConsumer, KafkaProducer
import json

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for message in consumer:
    event = message.value

    # Feature engineering
    features = extract_features(event)

    # Predict
    prediction = model.predict([features])[0]

    # Enrich event
    result = {
        **event,
        'prediction': float(prediction),
        'model_version': MODEL_VERSION
    }

    # Publish result
    producer.send('predictions', value=result)
Model Monitoring
Performance Tracking
Detect model degradation:
Classification Metrics:
from prometheus_client import Gauge, Counter
from sklearn.metrics import accuracy_score, precision_score, recall_score

# Define metrics
accuracy_gauge = Gauge('model_accuracy', 'Current model accuracy')
precision_gauge = Gauge('model_precision', 'Current precision')
recall_gauge = Gauge('model_recall', 'Current recall')
prediction_counter = Counter('predictions_total', 'Total predictions made')

def update_metrics(y_true, y_pred):
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')

    accuracy_gauge.set(accuracy)
    precision_gauge.set(precision)
    recall_gauge.set(recall)
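These metrics only reach Prometheus once the service exposes them. A minimal sketch using prometheus_client's built-in HTTP server, assuming port 8000 is available:

from prometheus_client import start_http_server

# Serve a /metrics endpoint on port 8000 for Prometheus to scrape
start_http_server(8000)

# In the prediction path, count each request alongside the quality gauges above
prediction_counter.inc()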
Alerts:
# prometheus/alerts.yml
groups:
  - name: model_performance
    rules:
      - alert: ModelAccuracyDegradation
        expr: model_accuracy < 0.75
        for: 1h
        annotations:
          summary: "Model accuracy below threshold"
          description: "Model accuracy has dropped to {{ $value }}"
Data Drift Detection
Identify when input distributions change:
import json

from evidently import ColumnMapping
from evidently.dashboard import Dashboard
from evidently.dashboard.tabs import DataDriftTab

# Compare reference and current data
dashboard = Dashboard(tabs=[DataDriftTab()])
dashboard.calculate(reference_data, current_data, column_mapping=ColumnMapping())
dashboard.save("drift_report.html")

# Programmatic drift detection
from evidently.model_profile import Profile
from evidently.model_profile.sections import DataDriftProfileSection

profile = Profile(sections=[DataDriftProfileSection()])
profile.calculate(reference_data, current_data)

# profile.json() returns a JSON string; the exact key layout
# depends on the Evidently version in use
drift_report = json.loads(profile.json())
if drift_report['data_drift']['data']['metrics']['dataset_drift']:
    trigger_retraining_pipeline()
Drift Types:
- Covariate Drift: Input feature distributions change (a lightweight statistical check is sketched after this list)
- Concept Drift: Relationship between features and target changes
- Label Drift: Target variable distribution shifts
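For covariate drift on numeric features, a lightweight complement to a full Evidently report is a per-feature two-sample Kolmogorov-Smirnov test. A minimal sketch (function name and significance threshold are illustrative):

from scipy.stats import ks_2samp

def detect_covariate_drift(reference_df, current_df, alpha=0.05):
    # Compare each numeric feature's reference vs. current distribution
    drifted = {}
    for column in reference_df.columns:
        _, p_value = ks_2samp(reference_df[column], current_df[column])
        drifted[column] = p_value < alpha  # small p-value suggests drift
    return drifted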
Feature Distribution Monitoring
Track feature statistics over time:
import great_expectations as ge

def monitor_feature_distribution(df):
    ge_df = ge.from_pandas(df)

    # Declare expectations about feature distributions
    ge_df.expect_column_mean_to_be_between('age', min_value=30, max_value=50)
    ge_df.expect_column_stdev_to_be_between('age', min_value=10, max_value=20)
    ge_df.expect_column_min_to_be_between('income', min_value=0, max_value=10000)
    ge_df.expect_column_quantile_values_to_be_between(
        'income',
        quantile_ranges={
            'quantiles': [0.25, 0.5, 0.75],
            'value_ranges': [[20000, 30000], [40000, 50000], [60000, 80000]]
        }
    )

    # Validate the current batch against all declared expectations
    results = ge_df.validate()
    if not results['success']:
        alert_ops_team(results)
Continuous Training
Automated Retraining Pipeline
Trigger retraining based on schedule or metrics:
# airflow DAG
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator

def check_retraining_needed():
    # Check model performance
    current_accuracy = get_current_model_accuracy()

    # Check data drift
    drift_detected = check_data_drift()

    if current_accuracy < 0.80 or drift_detected:
        return 'retrain_model'
    return 'skip_retraining'

def retrain_and_evaluate():
    # Fetch latest data
    data = fetch_training_data()

    # Train new model
    new_model = train_model(data)

    # Evaluate on holdout set
    test_accuracy = evaluate_model(new_model)

    # Only promote the candidate if it beats the current production model
    if test_accuracy > get_current_model_accuracy():
        deploy_new_model(new_model)
    else:
        alert_data_science_team()

with DAG(
    'model_retraining',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:
    check = BranchPythonOperator(
        task_id='check_retraining_needed',
        python_callable=check_retraining_needed
    )

    retrain = PythonOperator(
        task_id='retrain_model',
        python_callable=retrain_and_evaluate
    )

    skip = PythonOperator(
        task_id='skip_retraining',
        python_callable=lambda: print("Retraining not needed")
    )

    check >> [retrain, skip]
Feature Store
Centralize feature engineering:
Feast Example:
from feast import FeatureService, FeatureStore

# Initialize feature store
store = FeatureStore(repo_path=".")

# Define feature service
feature_service = FeatureService(
    name="churn_prediction_features",
    features=[
        feature_view_1[['feature_1', 'feature_2']],
        feature_view_2[['feature_3']]
    ]
)

# Online inference
entity_rows = [{"customer_id": 123}]
features = store.get_online_features(
    features=feature_service,
    entity_rows=entity_rows
).to_dict()

# Batch training
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=feature_service
).to_df()
Benefits:
- Consistent features across training and serving
- Reusable feature definitions
- Low-latency feature retrieval
- Time-travel for historical features
Infrastructure as Code
Terraform for ML Infrastructure
# sagemaker_endpoint.tf
resource "aws_sagemaker_model" "model" {
  name               = "customer-churn-model-${var.version}"
  execution_role_arn = aws_iam_role.sagemaker.arn

  primary_container {
    image          = "${var.account_id}.dkr.ecr.${var.region}.amazonaws.com/ml-models:latest"
    model_data_url = "s3://${var.model_bucket}/models/churn/${var.version}/model.tar.gz"
  }
}

resource "aws_sagemaker_endpoint_configuration" "config" {
  name = "churn-model-config-${var.version}"

  production_variants {
    variant_name           = "AllTraffic"
    model_name             = aws_sagemaker_model.model.name
    initial_instance_count = var.instance_count
    instance_type          = "ml.t2.medium"
  }
}

resource "aws_sagemaker_endpoint" "endpoint" {
  name                 = "churn-prediction-endpoint"
  endpoint_config_name = aws_sagemaker_endpoint_configuration.config.name
}
Best Practices
Model Development:
- Version control all code (Git)
- Reproducible environments (Docker, conda)
- Automated testing for model code (see the test sketch after this list)
- Document assumptions and trade-offs
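For example, a smoke test on synthetic data can run in CI and catch broken training code before deployment. A minimal pytest sketch (dataset and assertions are illustrative):

# test_model.py
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

def test_model_outputs_valid_probabilities():
    rng = np.random.default_rng(42)
    X = pd.DataFrame(rng.normal(size=(100, 3)), columns=['f1', 'f2', 'f3'])
    y = (X['f1'] + rng.normal(scale=0.1, size=100) > 0).astype(int)

    model = RandomForestClassifier(n_estimators=10, random_state=42).fit(X, y)
    proba = model.predict_proba(X)

    # Probabilities must be well-formed regardless of model quality
    assert proba.shape == (100, 2)
    assert np.allclose(proba.sum(axis=1), 1.0)
    assert ((proba >= 0) & (proba <= 1)).all()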
Deployment:
- Gradual rollouts (canary deployments; routing sketch after this list)
- A/B testing for model comparison
- Automated rollback on degradation
- Blue-green deployment for zero downtime
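One simple way to implement a canary split at the application layer is deterministic hash-based routing. A sketch, where production_model and candidate_model are assumed to be two already-loaded model versions:

import hashlib

CANARY_FRACTION = 0.05  # send ~5% of traffic to the candidate model

def choose_model(user_id: str):
    # Hashing the user id keeps each user on the same variant across requests
    bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
    return candidate_model if bucket < CANARY_FRACTION * 100 else production_model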
Monitoring:
- Track prediction latency (p50, p95, p99), as sketched after this list
- Monitor resource utilization
- Alert on anomalies
- Dashboard for stakeholder visibility
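Latency percentiles can be tracked with a Prometheus histogram; a sketch, assuming the prometheus_client setup shown earlier:

from prometheus_client import Histogram

# Bucketed latency; p50/p95/p99 are derived with histogram_quantile() in PromQL
prediction_latency = Histogram(
    'prediction_latency_seconds',
    'Time spent producing a single prediction',
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0)
)

@prediction_latency.time()
def predict_with_timing(features):
    return model.predict(features)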
Governance:
- Model cards for documentation
- Bias and fairness testing
- Regulatory compliance (GDPR, etc.)
- Audit logs for predictions
Getting Started
Week 1: Experiment Tracking
- Set up MLflow or W&B
- Log first experiments
- Compare runs
Week 2: Model Registry
- Version models
- Tag production candidates
- Document models
Week 3: Deployment
- Deploy first model
- Create inference API
- Basic monitoring
Week 4: Automation
- Automate retraining
- Set up drift detection
- Create alerting
MLOps turns machine learning from research experiments into reliable business systems. Partner with experts to build production-grade ML infrastructure from day one.