
AWS Lambda for Data Processing: Serverless ETL Patterns

Cesar Adames

Build scalable, cost-effective data processing pipelines using AWS Lambda with event-driven architectures, automated workflows, and serverless patterns.

#aws-lambda #data-processing #etl #serverless #event-driven

Serverless Data Processing Advantages

AWS Lambda revolutionizes data processing by eliminating server management, automatically scaling from zero to thousands of concurrent executions, and charging only for compute time used. For data pipelines with variable workloads—ETL jobs, real-time transformations, event processing—Lambda offers compelling economics and operational simplicity.

Organizations processing millions of events monthly commonly report 60-80% cost savings compared with traditional server-based architectures, along with improved reliability and deployment velocity.

Core Lambda Data Processing Patterns

Pattern 1: S3-Triggered ETL

import json
import boto3
import pandas as pd  # available via a Lambda layer or container image
from io import StringIO
from urllib.parse import unquote_plus

s3 = boto3.client('s3')

def lambda_handler(event, context):
    """
    Process a CSV file uploaded to S3
    """
    # Get bucket and key from the S3 event (object keys arrive URL-encoded)
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = unquote_plus(event['Records'][0]['s3']['object']['key'])

    # Read file from S3
    response = s3.get_object(Bucket=bucket, Key=key)
    csv_content = response['Body'].read().decode('utf-8')

    # Process with pandas
    df = pd.read_csv(StringIO(csv_content))

    # Data transformations
    df = transform_data(df)

    # Write results to destination S3 bucket
    output_csv = df.to_csv(index=False)
    s3.put_object(
        Bucket='processed-data-bucket',
        Key=f"processed/{key}",
        Body=output_csv
    )

    return {
        'statusCode': 200,
        'body': json.dumps(f"Processed {len(df)} records")
    }

def transform_data(df):
    """Example transformations"""
    # Clean data
    df = df.dropna()

    # Type conversions
    df['amount'] = df['amount'].astype(float)
    df['date'] = pd.to_datetime(df['date'])

    # Calculated fields
    df['month'] = df['date'].dt.month
    df['year'] = df['date'].dt.year

    # Aggregations
    df = df.groupby(['customer_id', 'year', 'month']).agg({
        'amount': 'sum',
        'transaction_id': 'count'
    }).reset_index()

    return df
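
This handler assumes the source bucket is configured to invoke the function whenever a new object lands. That wiring usually lives in infrastructure-as-code, but as a rough sketch it can also be set with boto3. The bucket name, suffix filter, and function ARN below are placeholders, and the function must already allow S3 to invoke it (via lambda add-permission):

import boto3

s3 = boto3.client('s3')

# Trigger the ETL function for every new .csv object in the bucket
s3.put_bucket_notification_configuration(
    Bucket='raw-data-bucket',  # hypothetical source bucket
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:csv-etl',
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [{'Name': 'suffix', 'Value': '.csv'}]}}
        }]
    }
)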

Pattern 2: Streaming Data Processing (Kinesis)

import base64
import json

def lambda_handler(event, context):
    """
    Process streaming records from Kinesis
    """
    processed_records = []

    for record in event['Records']:
        # Decode Kinesis data
        payload = base64.b64decode(record['kinesis']['data'])
        data = json.loads(payload)

        # Process event
        processed = process_event(data)

        # Optionally: write to database, S3, or another stream
        if processed:
            write_to_dynamodb(processed)
            processed_records.append(processed)

    return {
        'statusCode': 200,
        'processedCount': len(processed_records)
    }

def process_event(event_data):
    """
    Transform streaming event
    """
    # Enrich with external data
    enriched = enrich_event(event_data)

    # Filter
    if enriched['amount'] < 10:
        return None  # Skip low-value events

    # Aggregate
    aggregated = {
        'customer_id': enriched['customer_id'],
        'total_amount': enriched['amount'],
        'event_count': 1,
        'timestamp': enriched['timestamp']
    }

    return aggregated
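
When a single malformed record slips through, failing the whole invocation forces Lambda to retry the entire Kinesis batch. If the event source mapping has ReportBatchItemFailures enabled, the handler can report only the failed records instead; a minimal sketch reusing the process_event helper above:

import base64
import json

def lambda_handler(event, context):
    """Report failed records individually so the rest of the batch is not retried"""
    batch_item_failures = []

    for record in event['Records']:
        try:
            payload = json.loads(base64.b64decode(record['kinesis']['data']))
            process_event(payload)
        except Exception:
            # Lambda retries from this sequence number onward
            batch_item_failures.append(
                {'itemIdentifier': record['kinesis']['sequenceNumber']}
            )

    return {'batchItemFailures': batch_item_failures}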

Pattern 3: Scheduled Batch Processing

import boto3
from datetime import datetime, timedelta

s3 = boto3.client('s3')
athena = boto3.client('athena')

def lambda_handler(event, context):
    """
    Daily aggregation job triggered by EventBridge
    """
    # Define date range
    end_date = datetime.now()
    start_date = end_date - timedelta(days=1)

    # Run Athena query
    query = f"""
        SELECT customer_id,
               DATE(timestamp) as date,
               SUM(amount) as total_amount,
               COUNT(*) as transaction_count
        FROM transactions
        WHERE DATE(timestamp) = DATE('{start_date.strftime('%Y-%m-%d')}')
        GROUP BY customer_id, DATE(timestamp)
    """

    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': 'analytics'},
        ResultConfiguration={'OutputLocation': 's3://query-results/'}
    )

    query_execution_id = response['QueryExecutionId']

    # Wait for query completion (or use Step Functions for long queries)
    wait_for_query(query_execution_id)

    # Process results
    results = get_query_results(query_execution_id)

    # Write to destination
    write_to_redshift(results)

    return {
        'statusCode': 200,
        'message': f'Processed data for {start_date.strftime("%Y-%m-%d")}'
    }
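
The wait_for_query, get_query_results, and write_to_redshift helpers are left out above. A simple polling implementation of the first two might look like the following (for queries that outlast the Lambda timeout, hand the wait off to Step Functions as noted):

import time
import boto3

athena = boto3.client('athena')

def wait_for_query(query_execution_id, poll_seconds=2):
    """Poll Athena until the query reaches a terminal state"""
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_execution_id)
        state = status['QueryExecution']['Status']['State']
        if state == 'SUCCEEDED':
            return
        if state in ('FAILED', 'CANCELLED'):
            reason = status['QueryExecution']['Status'].get('StateChangeReason', 'unknown')
            raise RuntimeError(f"Athena query {state}: {reason}")
        time.sleep(poll_seconds)

def get_query_results(query_execution_id):
    """Fetch all result rows, skipping the header row"""
    rows = []
    paginator = athena.get_paginator('get_query_results')
    for page in paginator.paginate(QueryExecutionId=query_execution_id):
        rows.extend(page['ResultSet']['Rows'])
    return rows[1:]  # the first row holds column names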

Advanced Lambda Patterns

Parallel Processing with Fan-Out

import json
import boto3

lambda_client = boto3.client('lambda')

def lambda_handler(event, context):
    """
    Coordinator Lambda that fans out to worker Lambdas
    """
    # Get list of files to process
    files = list_s3_files('data-bucket', prefix='incoming/')

    # Invoke worker Lambda for each file (parallel processing)
    for file_key in files:
        lambda_client.invoke(
            FunctionName='file-processor-worker',
            InvocationType='Event',  # Async invocation
            Payload=json.dumps({
                'bucket': 'data-bucket',
                'key': file_key
            })
        )

    return {
        'statusCode': 200,
        'filesQueued': len(files)
    }

# Worker Lambda
def worker_handler(event, context):
    """
    Process single file
    """
    bucket = event['bucket']
    key = event['key']

    # Process file
    result = process_file(bucket, key)

    # Mark as complete
    mark_complete(key)

    return result
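
The list_s3_files helper is assumed above; one straightforward version uses the list_objects_v2 paginator so prefixes with more than 1,000 objects are handled:

import boto3

s3 = boto3.client('s3')

def list_s3_files(bucket, prefix=''):
    """Return every object key under a prefix, following pagination"""
    keys = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            keys.append(obj['Key'])
    return keys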

Error Handling & Dead Letter Queues

import json
import boto3
from datetime import datetime

sqs = boto3.client('sqs')

# Placeholder exception types referenced below
class RetryableError(Exception):
    """Transient failure (e.g., throttling) - raise so Lambda retries"""

class FatalError(Exception):
    """Permanent failure (e.g., malformed input) - do not retry"""

def lambda_handler(event, context):
    """
    Robust error handling with retries
    """
    try:
        # Attempt processing
        result = process_data(event)

        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }

    except RetryableError as e:
        # Temporary failure - Lambda will retry automatically
        print(f"Retryable error: {e}")
        raise

    except FatalError as e:
        # Permanent failure - send to DLQ for manual review
        send_to_dlq(event, str(e))

        return {
            'statusCode': 500,
            'error': str(e)
        }

def send_to_dlq(event, error_message):
    """Send failed event to Dead Letter Queue"""
    sqs.send_message(
        QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789/processing-dlq',
        MessageBody=json.dumps({
            'originalEvent': event,
            'error': error_message,
            'timestamp': datetime.now().isoformat()
        })
    )
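
For asynchronous invocations, the same safety net can be configured on the function itself rather than hand-rolled in code: after the built-in retries are exhausted, Lambda delivers the failed event to an on-failure destination. A sketch of that configuration, with the function name and queue ARN as placeholders:

import boto3

lambda_client = boto3.client('lambda')

# Route events that still fail after two automatic retries to an SQS queue
lambda_client.put_function_event_invoke_config(
    FunctionName='file-processor-worker',
    MaximumRetryAttempts=2,
    DestinationConfig={
        'OnFailure': {
            'Destination': 'arn:aws:sqs:us-east-1:123456789012:processing-dlq'
        }
    }
)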

Optimization Strategies

Memory and Performance Tuning

# Lambda pricing: memory × duration
# More memory = more CPU, faster execution, and often similar or lower total cost

# Example: processing 1000 records (compute only, us-east-1)

# 128 MB memory, 30 s execution  = 3.75 GB-seconds ≈ $0.000063
# 1024 MB memory, 4 s execution  = 4.00 GB-seconds ≈ $0.000067

# Sweet spot: usually 512-1024 MB for data processing
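
A quick back-of-the-envelope check of those figures, using the published us-east-1 compute price:

PRICE_PER_GB_SECOND = 0.0000166667  # us-east-1 on-demand compute price

def compute_cost(memory_mb, duration_s):
    """Compute-only cost of a single invocation"""
    return (memory_mb / 1024) * duration_s * PRICE_PER_GB_SECOND

print(compute_cost(128, 30))   # 3.75 GB-seconds -> ~$0.000063
print(compute_cost(1024, 4))   # 4.00 GB-seconds -> ~$0.000067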

Optimization Techniques:

import concurrent.futures

def optimized_batch_processing(records):
    """
    Process records in parallel using ThreadPoolExecutor
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_record, record) for record in records]
        results = [f.result() for f in concurrent.futures.as_completed(futures)]

    return results

Connection Pooling for Databases

import os
import pymysql

# Create the connection outside the handler so it is reused across warm invocations
connection = None

def get_db_connection():
    global connection

    if connection is None or not connection.open:
        connection = pymysql.connect(
            host='database.endpoint.rds.amazonaws.com',
            user='admin',
            password=os.environ['DB_PASSWORD'],
            database='analytics'
        )

    return connection

def lambda_handler(event, context):
    """
    Reuse database connection across invocations
    """
    conn = get_db_connection()
    cursor = conn.cursor()

    # Execute query
    cursor.execute("SELECT * FROM customers WHERE id = %s", (event['customer_id'],))
    result = cursor.fetchall()

    return result

Efficient S3 Operations

import concurrent.futures

import boto3

s3 = boto3.client('s3')

def batch_s3_operations(keys):
    """
    Process multiple S3 files efficiently
    """
    # Use threading for I/O-bound operations
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(s3.get_object, Bucket='my-bucket', Key=key): key
            for key in keys
        }

        results = {}
        for future in concurrent.futures.as_completed(futures):
            key = futures[future]
            try:
                data = future.result()
                results[key] = process_s3_object(data)
            except Exception as e:
                print(f"Error processing {key}: {e}")

    return results

Cost Optimization

Request Pricing Analysis

def calculate_lambda_cost(invocations, avg_duration_ms, memory_mb):
    """
    Calculate monthly Lambda cost
    """
    # Pricing (us-east-1, as of 2025)
    price_per_request = 0.0000002  # $0.20 per 1M requests
    price_per_gb_second = 0.0000166667  # $0.0000166667 per GB-second

    # Total GB-seconds of compute consumed
    gb_seconds = (memory_mb / 1024) * (avg_duration_ms / 1000) * invocations

    # Free tier: 400,000 GB-seconds, 1M requests
    free_tier_compute = 400000
    free_tier_requests = 1000000

    billable_compute = max(0, gb_seconds - free_tier_compute)
    billable_requests = max(0, invocations - free_tier_requests)

    total_cost = (
        billable_requests * price_per_request +
        billable_compute * price_per_gb_second
    )

    return {
        'total_cost': total_cost,
        'request_cost': billable_requests * price_per_request,
        'compute_cost': billable_compute * price_per_gb_second,
        'invocations': invocations,
        'gb_seconds': gb_seconds
    }

# Example
monthly_cost = calculate_lambda_cost(
    invocations=10_000_000,  # 10M invocations
    avg_duration_ms=500,     # 500ms average
    memory_mb=512            # 512 MB memory
)
# Result: ~$37/month after the free tier (≈ $35 compute + ≈ $1.80 requests)

Cost Reduction Strategies

  1. Right-size memory allocation
  2. Minimize cold starts with provisioned concurrency (if needed)
  3. Use S3 Batch Operations for large-scale file processing
  4. Compress data before storing in S3 (see the sketch after this list)
  5. Implement intelligent batching
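
On point 4, compressing objects before they land in S3 reduces storage and transfer costs, and most downstream readers (Athena, Glue, pandas) handle gzip transparently. A minimal sketch, with the bucket and key purely illustrative:

import gzip
import boto3

s3 = boto3.client('s3')

def put_compressed_csv(df, bucket, key):
    """Write a pandas DataFrame to S3 as gzip-compressed CSV"""
    body = gzip.compress(df.to_csv(index=False).encode('utf-8'))
    s3.put_object(
        Bucket=bucket,
        Key=key,                      # e.g. 'processed/report.csv.gz'
        Body=body,
        ContentEncoding='gzip',
        ContentType='text/csv'
    )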

Monitoring and Debugging

CloudWatch Logging

import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """
    Structured logging for observability
    """
    logger.info('Processing started', extra={
        'event_id': event.get('id'),
        'record_count': len(event.get('records', []))
    })

    try:
        result = process_event(event)

        logger.info('Processing completed', extra={
            'event_id': event.get('id'),
            'processed_count': result['count'],
            'remaining_time_ms': context.get_remaining_time_in_millis()
        })

        return result

    except Exception as e:
        logger.error('Processing failed', extra={
            'event_id': event.get('id'),
            'error': str(e),
            'error_type': type(e).__name__
        }, exc_info=True)
        raise

X-Ray Tracing

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('process_data')
def lambda_handler(event, context):
    """
    Distributed tracing with X-Ray
    """
    # Subsegment for the database operation
    with xray_recorder.in_subsegment('database_query'):
        data = query_database(event['query'])

    # Subsegment for transformation
    with xray_recorder.in_subsegment('transform'):
        transformed = transform_data(data)

    # Subsegment for writing results
    with xray_recorder.in_subsegment('write_s3'):
        write_to_s3(transformed)

    return {'success': True}

Production Best Practices

  1. Use Lambda Layers for shared dependencies
  2. Implement idempotency for exactly-once processing (see the sketch after this list)
  3. Set appropriate timeouts (max 15 minutes)
  4. Use environment variables for configuration
  5. Implement circuit breakers for external dependencies
  6. Version functions and use aliases for deployments
  7. Monitor cold start frequency and optimize
  8. Implement proper IAM least-privilege policies
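
On point 2, a common way to get idempotency is a conditional write to DynamoDB keyed on a unique event ID, so retries and duplicate deliveries become no-ops. A minimal sketch; the table name and key schema are assumptions:

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('processed-events')  # hypothetical idempotency table

def process_once(event_id, handler):
    """Run handler only if this event_id has not been processed before"""
    try:
        table.put_item(
            Item={'event_id': event_id},
            ConditionExpression='attribute_not_exists(event_id)'
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return None  # duplicate delivery - already processed
        raise
    return handler()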

Conclusion

AWS Lambda transforms data processing from infrastructure management to pure business logic. For variable workloads, event-driven architectures, and cost-conscious organizations, serverless data processing with Lambda delivers scalability, reliability, and operational efficiency at a fraction of traditional costs.

The key is understanding Lambda’s constraints (15-minute timeout, memory limits), designing appropriate event-driven architectures, and optimizing for cost and performance through proper sizing and batching strategies.

Next Steps:

  1. Identify a data processing workload suitable for Lambda
  2. Build proof-of-concept with S3-triggered processing
  3. Implement error handling and monitoring
  4. Optimize memory allocation and execution time
  5. Scale to production with proper governance and cost tracking
