How to Build a Serverless Data Pipeline with AWS Lambda and Kinesis

Building a serverless data pipeline allows you to easily process real-time streaming data without managing any infrastructure. In this guide, we will create a simple serverless data pipeline using AWS Lambda and Kinesis Data Streams.

Serverless Data Pipeline Architecture

Core Components: Kinesis & Lambda

This pattern leverages two key AWS serverless services:

  • Amazon Kinesis Data Streams: A scalable and durable real-time data streaming service. It acts as the ingestion point and buffer for your streaming data. Data is written to shards, and record ordering is preserved within each shard.
  • AWS Lambda: A serverless compute service that lets you run code without provisioning or managing servers. Lambda functions can be automatically triggered by events from various AWS services, including Kinesis Data Streams.

Step 1: Setting Up an AWS Kinesis Data Stream

First, create a Kinesis Data Stream to receive your incoming data. You define the stream's capacity by specifying the number of shards. Each shard provides a fixed slice of throughput: 1 MB/sec (or 1,000 records/sec) for writes and 2 MB/sec for reads.
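As a rough sizing aid, you can estimate the shard count by dividing your expected throughput by these per-shard limits. A back-of-the-envelope sketch in Python (the workload numbers here are made-up assumptions):

import math

# Per-shard limits for Kinesis Data Streams (provisioned mode)
WRITE_MB_PER_SHARD = 1.0        # 1 MB/sec writes
WRITE_RECORDS_PER_SHARD = 1000  # 1,000 records/sec writes
READ_MB_PER_SHARD = 2.0         # 2 MB/sec reads

# Hypothetical workload: 3 MB/sec written, 2,500 records/sec, 3 MB/sec read
shards_needed = max(
    math.ceil(3.0 / WRITE_MB_PER_SHARD),
    math.ceil(2500 / WRITE_RECORDS_PER_SHARD),
    math.ceil(3.0 / READ_MB_PER_SHARD),
)
print(shards_needed)  # -> 3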

You can create a stream via the AWS Management Console or the AWS CLI:

# Create a Kinesis stream named 'MyStream' with 1 shard
aws kinesis create-stream --stream-name MyStream --shard-count 1

Start with one shard for low volume, and increase the shard count later if needed to scale throughput.
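If you prefer the SDK over the CLI, the equivalent call with boto3 looks roughly like this (same stream name as above); the waiter blocks until the stream becomes ACTIVE:

import boto3

kinesis = boto3.client("kinesis")

# Create the stream, then wait until it is ACTIVE before sending data
kinesis.create_stream(StreamName="MyStream", ShardCount=1)
kinesis.get_waiter("stream_exists").wait(StreamName="MyStream")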

Step 2: Create the Lambda Function for Processing

Next, create an AWS Lambda function that will contain your data processing logic. This function will be triggered automatically when new records arrive in the Kinesis stream.

Choose your preferred runtime (e.g., Node.js, Python, Java). The Lambda function handler will receive an event object containing a batch of records from the Kinesis stream. Your code needs to iterate through these records, decode the data (Kinesis records are Base64 encoded), and perform the desired processing (e.g., transformation, filtering, enrichment, writing to another service like S3, DynamoDB, or another Kinesis stream).

Example structure for a Python Lambda handler:

import base64
import json

def lambda_handler(event, context):
    print(f"Received {len(event['Records'])} records.")
    for record in event['Records']:
        # Kinesis data is base64 encoded
        payload_bytes = base64.b64decode(record['kinesis']['data'])
        payload_str = payload_bytes.decode('utf-8')

        print(f"Partition Key: {record['kinesis']['partitionKey']}")
        print(f"Sequence Number: {record['kinesis']['sequenceNumber']}")
        print(f"Decoded payload: {payload_str}")

        try:
            # Example: Assuming JSON data
            data = json.loads(payload_str)
            # --- Your Processing Logic Here ---
            # e.g., transform data, filter records, write to S3/DynamoDB
            print(f"Processing data: {data}")
            # ---------------------------------

        except json.JSONDecodeError:
            print(f"Payload is not valid JSON: {payload_str}")
        except Exception as e:
            print(f"Error processing record: {e}")
            # Consider error handling strategy (e.g., send to DLQ)
            # Re-raising the exception might cause Lambda to retry the batch

    print("Successfully processed records.")
    # Return value isn't typically used for Kinesis triggers
    return {'status': 'success'}
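One refinement worth knowing about: if you enable ReportBatchItemFailures on the event source mapping (see Step 4), the handler can return the sequence numbers of the records that failed, and Lambda retries only from the first failure instead of re-running the whole batch. A minimal sketch of that pattern (process() is a hypothetical helper standing in for your logic):

def lambda_handler(event, context):
    failed = []
    for record in event['Records']:
        try:
            process(record)  # hypothetical helper containing your processing logic
        except Exception:
            # Report the failing record's sequence number so Lambda retries
            # from this record onward rather than the entire batch
            failed.append({'itemIdentifier': record['kinesis']['sequenceNumber']})
    return {'batchItemFailures': failed}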

Step 3: Grant Permissions to Lambda

The Lambda function needs permission to read from the Kinesis stream and write logs to CloudWatch Logs. Create an IAM execution role for your Lambda function. Attach the AWS managed policy AWSLambdaKinesisExecutionRole to this role. This policy grants the necessary permissions:

  • kinesis:DescribeStream
  • kinesis:DescribeStreamSummary
  • kinesis:GetRecords
  • kinesis:GetShardIterator
  • kinesis:ListShards
  • kinesis:ListStreams
  • kinesis:SubscribeToShard
  • logs:CreateLogGroup
  • logs:CreateLogStream
  • logs:PutLogEvents

If your function needs to write to other services (like S3 or DynamoDB), add permissions for those actions to the IAM role as well.
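For reference, creating the execution role and attaching the managed policy can also be scripted with boto3; the role name below is a made-up example:

import json
import boto3

iam = boto3.client("iam")

# Trust policy letting the Lambda service assume the role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(
    RoleName="MyLambdaKinesisRole",  # hypothetical role name
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Attach the managed policy that grants the Kinesis and CloudWatch Logs permissions listed above
iam.attach_role_policy(
    RoleName="MyLambdaKinesisRole",
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole",
)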

Step 4: Configure the Kinesis Event Source Mapping

Connect the Kinesis stream to the Lambda function using an event source mapping. This tells AWS to invoke your Lambda function when new records are available in the stream.

Configure this in the Lambda console ("Add trigger" section) or via the AWS CLI:

# Replace placeholders with your actual function name, stream ARN, and desired batch size/position
aws lambda create-event-source-mapping \
    --function-name MyLambdaFunction \
    --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/MyStream \
    --batch-size 100 \
    --starting-position LATEST
    # Use --starting-position TRIM_HORIZON instead to process from the beginning of the stream

# Optional parameters:
# --maximum-batching-window-in-seconds 10 # Wait up to 10s to fill batch
# --parallelization-factor 1 # Number of concurrent Lambda invocations per shard (default 1)
# --destination-config OnFailure={Destination=arn:aws:sqs:us-east-1:123456789012:my-dlq} # Send failed-batch details to a DLQ

Key parameters:

  • --batch-size: Max number of records sent to Lambda per invocation.
  • --starting-position: Where to start reading (LATEST or TRIM_HORIZON).
  • --maximum-batching-window-in-seconds: Max time to wait to gather records before invoking Lambda.
  • --destination-config: Configure an on-failure destination (an SQS queue or SNS topic) that receives details about batches the function could not process, so you can analyze or replay them later.
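The same mapping can be created with boto3. The sketch below also shows the retry-related options and enables the partial batch responses used by the handler pattern in Step 2 (function name, ARNs, and values are placeholders):

import boto3

lambda_client = boto3.client("lambda")

lambda_client.create_event_source_mapping(
    FunctionName="MyLambdaFunction",
    EventSourceArn="arn:aws:kinesis:us-east-1:123456789012:stream/MyStream",
    StartingPosition="LATEST",
    BatchSize=100,
    MaximumBatchingWindowInSeconds=10,
    ParallelizationFactor=1,
    BisectBatchOnFunctionError=True,   # split a failing batch to isolate bad records
    MaximumRetryAttempts=3,            # stop retrying a batch after 3 attempts
    DestinationConfig={"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:my-dlq"}},
    FunctionResponseTypes=["ReportBatchItemFailures"],  # enable partial batch responses
)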

Step 5: Testing the Pipeline

Send test data to your Kinesis stream to verify the pipeline is working. Use the AWS SDK or the CLI:

# Send a simple text record
# (With AWS CLI v2, --cli-binary-format raw-in-base64-out lets --data be passed as raw text rather than Base64)
aws kinesis put-record \
    --stream-name MyStream \
    --cli-binary-format raw-in-base64-out \
    --data "Hello Serverless Pipeline!" \
    --partition-key "user123"

# Send JSON data (ensure it's properly escaped for the CLI, or use the SDK)
aws kinesis put-record \
    --stream-name MyStream \
    --cli-binary-format raw-in-base64-out \
    --data '{"sensorId": "A7", "temperature": 25.5, "timestamp": "2025-04-24T17:40:00Z"}' \
    --partition-key "sensorA7"
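Equivalently, a small producer script using boto3 can push a burst of test records (the payload fields here are made up):

import json
import boto3

kinesis = boto3.client("kinesis")

# Send a few JSON records; the partition key determines which shard each record lands on
for i in range(5):
    payload = {"sensorId": f"A{i}", "temperature": 20.0 + i, "eventNumber": i}
    kinesis.put_record(
        StreamName="MyStream",
        Data=json.dumps(payload).encode("utf-8"),
        PartitionKey=payload["sensorId"],
    )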

Monitor the Lambda function's execution logs in AWS CloudWatch Logs. You should see log entries indicating records were received and processed. Check any downstream services (like S3 or DynamoDB) if your function writes data there.

Conclusion

You have successfully built a basic serverless data pipeline using AWS Kinesis Data Streams and AWS Lambda. This architecture provides a highly scalable, cost-effective (pay-per-invocation/GB-second), and operationally simple way to process real-time streaming data. You can extend this pattern by adding more processing steps (chaining Lambda functions via SNS/SQS or Step Functions), writing to various data stores, or increasing Kinesis shard counts and Lambda concurrency to handle higher volumes.