MLOps: Deploying Machine Learning Models to Production

Cesar Adames

Master MLOps best practices for deploying, monitoring, and maintaining ML models in production. From model training to automated retraining pipelines and drift detection.

#mlops #machine-learning #devops #model-deployment #ai-operations

An oft-cited industry estimate holds that 87% of ML models never make it to production. MLOps bridges the gap between data science experiments and reliable production systems by bringing DevOps practices to machine learning workflows.

MLOps Lifecycle

Model Development

Reproducible training pipelines:

# training_pipeline.py
import pandas as pd

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

def train_model(data_path, experiment_name):
    # Set experiment
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run():
        # Load and split data
        df = pd.read_csv(data_path)
        X = df.drop('target', axis=1)
        y = df['target']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Log parameters
        params = {
            'n_estimators': 100,
            'max_depth': 10,
            'min_samples_split': 5
        }
        mlflow.log_params(params)

        # Train model
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)

        # Evaluate
        train_score = model.score(X_train, y_train)
        test_score = model.score(X_test, y_test)

        # Log metrics
        mlflow.log_metrics({
            'train_accuracy': train_score,
            'test_accuracy': test_score
        })

        # Log model
        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name="customer_churn_predictor"
        )

        return model

Experiment Tracking

Centralized logging of experiments:

MLflow:

  • Track parameters, metrics, and artifacts
  • Compare experiment runs
  • Model registry for version control
  • Deployment to various targets

Weights & Biases:

  • Advanced visualization and dashboards
  • Hyperparameter optimization
  • Model versioning
  • Team collaboration features
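
If you standardize on Weights & Biases instead, the training run above maps almost one-to-one onto the core wandb API. A minimal sketch with synthetic data (the project name and hyperparameters are illustrative):

import wandb
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Start a tracked run and log hyperparameters
params = {'n_estimators': 100, 'max_depth': 10}
run = wandb.init(project="customer-churn", config=params)

# Synthetic stand-in for the real training data
X, y = make_classification(n_samples=1000, n_features=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier(**params)
model.fit(X_train, y_train)

# Log metrics so runs can be compared in the W&B dashboard
wandb.log({
    'train_accuracy': model.score(X_train, y_train),
    'test_accuracy': model.score(X_test, y_test)
})
run.finish()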

Model Registry

Version control for ML models:

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Transition model to staging
client.transition_model_version_stage(
    name="customer_churn_predictor",
    version=3,
    stage="Staging"
)

# Add description
client.update_model_version(
    name="customer_churn_predictor",
    version=3,
    description="Random Forest with feature engineering v2, 85% test accuracy"
)

# Promote to production
client.transition_model_version_stage(
    name="customer_churn_predictor",
    version=3,
    stage="Production"
)

Deployment Patterns

Batch Prediction

Process large datasets offline:

from datetime import datetime

import mlflow.pyfunc

# Load production model
model = mlflow.pyfunc.load_model(
    model_uri="models:/customer_churn_predictor/Production"
)

# Batch prediction over a DataFrame of input features (df_batch)
predictions = model.predict(df_batch)

# Write results (engine is a SQLAlchemy engine created elsewhere)
df_batch['churn_prediction'] = predictions
df_batch['prediction_timestamp'] = datetime.now()
df_batch.to_sql('churn_predictions', con=engine, if_exists='append')

Use Cases:

  • Daily customer scoring
  • Fraud detection batch analysis
  • Recommendation system updates

Real-Time Inference

Low-latency predictions via API:

from datetime import datetime

from fastapi import FastAPI
from pydantic import BaseModel
import mlflow.pyfunc

app = FastAPI()

# Load model at startup
model = mlflow.pyfunc.load_model(
    model_uri="models:/customer_churn_predictor/Production"
)

class PredictionInput(BaseModel):
    feature1: float
    feature2: int
    feature3: str

@app.post("/predict")
async def predict(input_data: PredictionInput):
    # Prepare features
    features = prepare_features(input_data)

    # Predict
    prediction = model.predict(features)

    # Log prediction for monitoring
    log_prediction(input_data, prediction)

    return {
        "prediction": float(prediction[0]),
        "model_version": get_model_version(),
        "timestamp": datetime.now().isoformat()
    }

Deployment Options:

  • Kubernetes: Scalable, orchestrated deployments
  • AWS SageMaker: Managed inference endpoints
  • Serverless: Lambda functions for sporadic requests
  • Edge: TensorFlow Lite for mobile/IoT devices

Streaming Predictions

Process events in real-time:

from kafka import KafkaConsumer, KafkaProducer
import mlflow.pyfunc
import json

# Load the production model once; record the registry stage for traceability
model = mlflow.pyfunc.load_model(
    model_uri="models:/customer_churn_predictor/Production"
)
MODEL_VERSION = "Production"

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for message in consumer:
    event = message.value

    # Feature engineering
    features = extract_features(event)

    # Predict
    prediction = model.predict([features])[0]

    # Enrich event
    result = {
        **event,
        'prediction': float(prediction),
        'model_version': MODEL_VERSION
    }

    # Publish result
    producer.send('predictions', value=result)

Model Monitoring

Performance Tracking

Detect model degradation:

Classification Metrics:

from prometheus_client import Gauge, Counter
from sklearn.metrics import accuracy_score, precision_score, recall_score

# Define metrics
accuracy_gauge = Gauge('model_accuracy', 'Current model accuracy')
precision_gauge = Gauge('model_precision', 'Current precision')
recall_gauge = Gauge('model_recall', 'Current recall')
prediction_counter = Counter('predictions_total', 'Total predictions made')

def update_metrics(y_true, y_pred):
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')

    accuracy_gauge.set(accuracy)
    precision_gauge.set(precision)
    recall_gauge.set(recall)

Alerts:

# prometheus/alerts.yml
groups:
  - name: model_performance
    rules:
      - alert: ModelAccuracyDegradation
        expr: model_accuracy < 0.75
        for: 1h
        annotations:
          summary: "Model accuracy below threshold"
          description: "Model accuracy has dropped to {{ $value }}"

Data Drift Detection

Identify when input distributions change:

from evidently import ColumnMapping
from evidently.dashboard import Dashboard
from evidently.dashboard.tabs import DataDriftTab

# Compare reference and current data
dashboard = Dashboard(tabs=[DataDriftTab()])
dashboard.calculate(reference_data, current_data, column_mapping=ColumnMapping())
dashboard.save("drift_report.html")

# Programmatic drift detection
import json

from evidently.model_profile import Profile
from evidently.model_profile.sections import DataDriftProfileSection

profile = Profile(sections=[DataDriftProfileSection()])
profile.calculate(reference_data, current_data)
drift_report = json.loads(profile.json())  # profile.json() returns a JSON string

# Key layout below follows the legacy Evidently profile format; adjust for your version
if drift_report['data_drift']['data']['metrics']['dataset_drift']:
    trigger_retraining_pipeline()

Drift Types:

  • Covariate Drift: Input feature distributions change (sketch below)
  • Concept Drift: Relationship between features and target changes
  • Label Drift: Target variable distribution shifts
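
For covariate drift specifically, a lightweight complement to Evidently is a per-feature two-sample Kolmogorov-Smirnov test. A minimal sketch that reuses the reference_data and current_data DataFrames from the example above (the p-value threshold is illustrative):

from scipy.stats import ks_2samp

def detect_covariate_drift(reference_df, current_df, p_threshold=0.05):
    """Flag numeric features whose distribution shifted between datasets."""
    drifted = {}
    for column in reference_df.select_dtypes(include='number').columns:
        statistic, p_value = ks_2samp(reference_df[column], current_df[column])
        if p_value < p_threshold:
            drifted[column] = {'ks_statistic': statistic, 'p_value': p_value}
    return drifted

# Example: trigger retraining if any feature drifted
drifted_features = detect_covariate_drift(reference_data, current_data)
if drifted_features:
    trigger_retraining_pipeline()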

Feature Distribution Monitoring

Track feature statistics over time:

import great_expectations as ge

def monitor_feature_distribution(df):
    # Wrap the DataFrame so expectations can be declared directly on it
    ge_df = ge.from_pandas(df)

    # Expected ranges for feature statistics
    ge_df.expect_column_mean_to_be_between('age', min_value=30, max_value=50)
    ge_df.expect_column_stdev_to_be_between('age', min_value=10, max_value=20)
    ge_df.expect_column_min_to_be_between('income', min_value=0, max_value=10000)
    ge_df.expect_column_quantile_values_to_be_between(
        'income',
        quantile_ranges={
            'quantiles': [0.25, 0.5, 0.75],
            'value_ranges': [[20000, 30000], [40000, 50000], [60000, 80000]]
        }
    )

    # Validate all declared expectations against the current data
    results = ge_df.validate()

    if not results['success']:
        alert_ops_team(results)

Continuous Training

Automated Retraining Pipeline

Trigger retraining based on schedule or metrics:

# airflow DAG
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator

def check_retraining_needed():
    # Check model performance
    current_accuracy = get_current_model_accuracy()

    # Check data drift
    drift_detected = check_data_drift()

    if current_accuracy < 0.80 or drift_detected:
        return 'retrain_model'
    return 'skip_retraining'

def retrain_and_evaluate():
    # Fetch latest data
    data = fetch_training_data()

    # Train new model
    new_model = train_model(data)

    # Evaluate on holdout set
    test_accuracy = evaluate_model(new_model)

    # Only promote if the candidate beats the current production model
    if test_accuracy > get_current_model_accuracy():
        deploy_new_model(new_model)
    else:
        alert_data_science_team()

with DAG(
    'model_retraining',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    check = BranchPythonOperator(
        task_id='check_retraining_needed',
        python_callable=check_retraining_needed
    )

    retrain = PythonOperator(
        task_id='retrain_model',
        python_callable=retrain_and_evaluate
    )

    skip = PythonOperator(
        task_id='skip_retraining',
        python_callable=lambda: print("Retraining not needed")
    )

    check >> [retrain, skip]

Feature Store

Centralize feature engineering:

Feast Example:

from feast import FeatureStore

# Initialize feature store
store = FeatureStore(repo_path=".")

# Define a feature service (feature_view_1 / feature_view_2 are FeatureView
# objects declared in the feature repository)
from feast import FeatureService

feature_service = FeatureService(
    name="churn_prediction_features",
    features=[
        feature_view_1[['feature_1', 'feature_2']],
        feature_view_2[['feature_3']]
    ]
)

# Online inference
entity_rows = [{"customer_id": 123}]
features = store.get_online_features(
    features=feature_service,
    entity_rows=entity_rows
).to_dict()

# Batch training (entity_df holds entity keys and event timestamps)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=feature_service
).to_df()

Benefits:

  • Consistent features across training and serving
  • Reusable feature definitions
  • Low-latency feature retrieval
  • Time-travel for historical features

Infrastructure as Code

Terraform for ML Infrastructure

# sagemaker_endpoint.tf
resource "aws_sagemaker_model" "model" {
  name               = "customer-churn-model-${var.version}"
  execution_role_arn = aws_iam_role.sagemaker.arn

  primary_container {
    image          = "${var.account_id}.dkr.ecr.${var.region}.amazonaws.com/ml-models:latest"
    model_data_url = "s3://${var.model_bucket}/models/churn/${var.version}/model.tar.gz"
  }
}

resource "aws_sagemaker_endpoint_configuration" "config" {
  name = "churn-model-config-${var.version}"

  production_variants {
    variant_name           = "AllTraffic"
    model_name            = aws_sagemaker_model.model.name
    initial_instance_count = var.instance_count
    instance_type         = "ml.t2.medium"
  }
}

resource "aws_sagemaker_endpoint" "endpoint" {
  name                 = "churn-prediction-endpoint"
  endpoint_config_name = aws_sagemaker_endpoint_configuration.config.name
}

Best Practices

Model Development:

  • Version control all code (Git)
  • Reproducible environments (Docker, conda)
  • Automated testing for model code (example after this list)
  • Document assumptions and trade-offs
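
For the automated-testing item, a handful of behavioral checks already catch common regressions. A minimal pytest sketch with synthetic data (the baseline check and fixtures are illustrative, not a complete test suite):

# test_model.py
import numpy as np
import pytest
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

@pytest.fixture
def trained_model():
    X, y = make_classification(n_samples=500, n_features=10, random_state=42)
    model = RandomForestClassifier(n_estimators=50, random_state=42).fit(X, y)
    return model, X, y

def test_predictions_have_expected_shape(trained_model):
    model, X, _ = trained_model
    assert model.predict(X).shape == (X.shape[0],)

def test_predicted_probabilities_are_valid(trained_model):
    model, X, _ = trained_model
    proba = model.predict_proba(X)
    assert np.all((proba >= 0) & (proba <= 1))
    np.testing.assert_allclose(proba.sum(axis=1), 1.0)

def test_model_beats_majority_class_baseline(trained_model):
    model, X, y = trained_model
    baseline = max(np.mean(y), 1 - np.mean(y))
    assert model.score(X, y) > baseline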

Deployment:

  • Gradual rollouts (canary deployments); see the sketch after this list
  • A/B testing for model comparison
  • Automated rollback on degradation
  • Blue-green deployment for zero downtime
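
A canary rollout, for example, can be as simple as routing a small share of requests to the candidate model and comparing outcomes before promotion. A minimal sketch reusing the MLflow registry stages from earlier (the traffic split is illustrative):

import random
import mlflow.pyfunc

# Current production model and the canary candidate from the registry
production_model = mlflow.pyfunc.load_model("models:/customer_churn_predictor/Production")
candidate_model = mlflow.pyfunc.load_model("models:/customer_churn_predictor/Staging")

CANARY_FRACTION = 0.05  # send 5% of requests to the candidate

def predict_with_canary(features):
    """Route a fraction of traffic to the candidate and tag each response."""
    if random.random() < CANARY_FRACTION:
        return {"prediction": float(candidate_model.predict(features)[0]), "variant": "canary"}
    return {"prediction": float(production_model.predict(features)[0]), "variant": "production"}

Logging the variant alongside each prediction makes it straightforward to compare the two models on live traffic and to roll back by setting the canary fraction to zero.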

Monitoring:

  • Track prediction latency (p50, p95, p99); sketched after this list
  • Monitor resource utilization
  • Alert on anomalies
  • Dashboard for stakeholder visibility
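
Latency percentiles, in particular, are easiest to capture with a Prometheus histogram next to the gauges shown earlier. A minimal sketch (bucket boundaries are illustrative):

import time
from prometheus_client import Histogram

# Buckets in seconds; percentiles are derived from these at query time
prediction_latency = Histogram(
    'prediction_latency_seconds',
    'Time spent producing a prediction',
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0)
)

def timed_predict(model, features):
    """Record how long each prediction takes."""
    start = time.perf_counter()
    prediction = model.predict(features)
    prediction_latency.observe(time.perf_counter() - start)
    return prediction

p50, p95, and p99 can then be read in Prometheus with histogram_quantile over the bucket series, for example histogram_quantile(0.95, rate(prediction_latency_seconds_bucket[5m])).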

Governance:

  • Model cards for documentation
  • Bias and fairness testing
  • Regulatory compliance (GDPR, etc.)
  • Audit logs for predictions

Getting Started

Week 1: Experiment Tracking

  • Set up MLflow or W&B
  • Log first experiments
  • Compare runs

Week 2: Model Registry

  • Version models
  • Tag production candidates
  • Document models

Week 3: Deployment

  • Deploy first model
  • Create inference API
  • Basic monitoring

Week 4: Automation

  • Automate retraining
  • Set up drift detection
  • Create alerting

MLOps transforms ML from research experiments into reliable business systems. Partner with experts to build production-grade ML infrastructure from day one.
