Core Components: Kinesis & Lambda
This pattern leverages two key AWS serverless services:
- AWS Kinesis Data Streams: A scalable and durable real-time data streaming service. It acts as the ingestion point and buffer for your streaming data. Data is ingested into shards, and each shard preserves the order of the records it receives.
- AWS Lambda: A serverless compute service that lets you run code without provisioning or managing servers. Lambda functions can be automatically triggered by events from various AWS services, including Kinesis Data Streams.
Step 1: Setting Up an AWS Kinesis Data Stream
First, create a Kinesis Data Stream to receive your incoming data. You define the stream's capacity by specifying the number of shards. Each shard provides a fixed unit of throughput: 1 MB/sec (or 1,000 records/sec) for writes and 2 MB/sec for reads. For example, ingesting 5 MB/sec of data requires at least five shards.
You can create a stream via the AWS Management Console or the AWS CLI:
```bash
# Create a Kinesis stream named 'MyStream' with 1 shard
aws kinesis create-stream --stream-name MyStream --shard-count 1
```
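Stream creation takes a few seconds. You can block until the stream is active and then confirm its configuration:

```bash
# Wait until the stream reaches ACTIVE status
aws kinesis wait stream-exists --stream-name MyStream

# Confirm the shard count and retention settings
aws kinesis describe-stream-summary --stream-name MyStream
```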
Start with one shard for low-volume workloads, and increase the shard count later if you need more throughput (see the resharding example below).
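When the time comes to scale, one option is uniform resharding with update-shard-count, which splits the stream's shards evenly:

```bash
# Double capacity by resharding from 1 to 2 shards
aws kinesis update-shard-count \
    --stream-name MyStream \
    --target-shard-count 2 \
    --scaling-type UNIFORM_SCALING
```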
Step 2: Create the Lambda Function for Processing
Next, create an AWS Lambda function that will contain your data processing logic. This function will be triggered automatically when new records arrive in the Kinesis stream.
Choose your preferred runtime (e.g., Node.js, Python, Java). The Lambda function handler will receive an event object containing a batch of records from the Kinesis stream. Your code needs to iterate through these records, decode the data (Kinesis records are Base64 encoded), and perform the desired processing (e.g., transformation, filtering, enrichment, writing to another service like S3, DynamoDB, or another Kinesis stream).
Example structure for a Python Lambda handler:
```python
import base64
import json

def lambda_handler(event, context):
    print(f"Received {len(event['Records'])} records.")
    for record in event['Records']:
        # Kinesis data is base64 encoded
        payload_bytes = base64.b64decode(record['kinesis']['data'])
        payload_str = payload_bytes.decode('utf-8')
        print(f"Partition Key: {record['kinesis']['partitionKey']}")
        print(f"Sequence Number: {record['kinesis']['sequenceNumber']}")
        print(f"Decoded payload: {payload_str}")
        try:
            # Example: Assuming JSON data
            data = json.loads(payload_str)
            # --- Your Processing Logic Here ---
            # e.g., transform data, filter records, write to S3/DynamoDB
            print(f"Processing data: {data}")
            # ----------------------------------
        except json.JSONDecodeError:
            print(f"Payload is not valid JSON: {payload_str}")
        except Exception as e:
            print(f"Error processing record: {e}")
            # Consider an error handling strategy (e.g., send to a DLQ).
            # Re-raising the exception would cause Lambda to retry the batch.
    print("Successfully processed records.")
    # The return value isn't typically used for Kinesis triggers
    return {'status': 'success'}
```
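One refinement worth knowing about: rather than failing (and retrying) an entire batch when a single record is bad, you can enable partial batch responses by setting ReportBatchItemFailures on the event source mapping (see Step 4) and returning the sequence numbers of only the failed records. A minimal sketch of that handler shape:

```python
import base64
import json

def lambda_handler(event, context):
    # Collect identifiers of failed records so Lambda retries only those,
    # instead of re-sending the whole batch
    batch_item_failures = []
    for record in event['Records']:
        try:
            payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            data = json.loads(payload)
            # ... your processing logic here ...
        except Exception:
            batch_item_failures.append(
                {'itemIdentifier': record['kinesis']['sequenceNumber']}
            )
    # Requires --function-response-types ReportBatchItemFailures on the mapping
    return {'batchItemFailures': batch_item_failures}
```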
Step 3: Grant Permissions to Lambda
The Lambda function needs permission to read from the Kinesis stream and write logs to CloudWatch Logs. Create an IAM execution role for your Lambda function and attach the AWS managed policy `AWSLambdaKinesisExecutionRole` to it. This policy grants the necessary permissions:
- `kinesis:DescribeStream`
- `kinesis:DescribeStreamSummary`
- `kinesis:GetRecords`
- `kinesis:GetShardIterator`
- `kinesis:ListShards`
- `kinesis:ListStreams`
- `kinesis:SubscribeToShard`
- `logs:CreateLogGroup`
- `logs:CreateLogStream`
- `logs:PutLogEvents`
If your function needs to write to other services (like S3 or DynamoDB), add permissions for those actions to the IAM role as well.
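For reference, here is one way to create the execution role and attach the managed policy from the CLI. The role name MyLambdaKinesisRole is a placeholder; substitute your own:

```bash
# Create the execution role with a trust policy allowing Lambda to assume it
aws iam create-role \
    --role-name MyLambdaKinesisRole \
    --assume-role-policy-document '{
      "Version": "2012-10-17",
      "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole"
      }]
    }'

# Attach the managed policy granting the Kinesis and CloudWatch Logs permissions
aws iam attach-role-policy \
    --role-name MyLambdaKinesisRole \
    --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
```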
Step 4: Configure the Kinesis Event Source Mapping
Connect the Kinesis stream to the Lambda function using an event source mapping. This tells AWS to invoke your Lambda function when new records are available in the stream.
Configure this in the Lambda console ("Add trigger" section) or via the AWS CLI:
```bash
# Replace placeholders with your actual function name, stream ARN, and desired batch size/position
aws lambda create-event-source-mapping \
    --function-name MyLambdaFunction \
    --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/MyStream \
    --batch-size 100 \
    --starting-position LATEST
    # Use --starting-position TRIM_HORIZON instead to process from the beginning.
    # Optional parameters:
    #   --maximum-batching-window-in-seconds 10   # Wait up to 10s to fill a batch
    #   --parallelization-factor 1                # Concurrent invocations per shard (default 1)
    #   --destination-config 'OnFailure={Destination=arn:aws:sqs:us-east-1:123456789012:my-dlq}'
    #                                             # Send failed-batch details to a DLQ
```
Key parameters:
- `--batch-size`: Maximum number of records sent to Lambda per invocation.
- `--starting-position`: Where to start reading (`LATEST` or `TRIM_HORIZON`).
- `--maximum-batching-window-in-seconds`: Maximum time to wait to gather records before invoking Lambda.
- `--destination-config`: Configure an on-failure destination, typically an SQS queue or SNS topic acting as a Dead-Letter Queue (DLQ); Lambda sends details about failed batches there for later analysis or reprocessing.
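Once created, the mapping has a UUID and a state (it should move from Creating to Enabled). You can inspect it and adjust settings later without recreating it; the UUID below is a placeholder taken from the list output:

```bash
# List mappings for the function; note the UUID and State fields
aws lambda list-event-source-mappings --function-name MyLambdaFunction

# Update an existing mapping (substitute the UUID from the list output)
aws lambda update-event-source-mapping \
    --uuid 12345678-90ab-cdef-1234-567890abcdef \
    --batch-size 200
```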
Step 5: Testing the Pipeline
Send test data to your Kinesis stream to verify the pipeline is working. Use the AWS SDK or the CLI:
```bash
# Send a simple text record.
# With AWS CLI v2, pass --cli-binary-format raw-in-base64-out so the CLI
# base64-encodes the raw text for you.
aws kinesis put-record \
    --stream-name MyStream \
    --cli-binary-format raw-in-base64-out \
    --data "Hello Serverless Pipeline!" \
    --partition-key "user123"

# Send JSON data (ensure it's properly escaped for the CLI, or use an SDK)
aws kinesis put-record \
    --stream-name MyStream \
    --cli-binary-format raw-in-base64-out \
    --data '{"sensorId": "A7", "temperature": 25.5, "timestamp": "2025-04-24T17:40:00Z"}' \
    --partition-key "sensorA7"
```
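For anything beyond one-off tests, an SDK producer is more convenient than the CLI. A minimal sketch using boto3, reusing the stream name and region assumed in this walkthrough:

```python
import json

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Records sharing a partition key land on the same shard, preserving their relative order
response = kinesis.put_record(
    StreamName='MyStream',
    Data=json.dumps({'sensorId': 'A7', 'temperature': 25.5}).encode('utf-8'),
    PartitionKey='sensorA7',
)
print(f"Wrote to {response['ShardId']} at sequence {response['SequenceNumber']}")
```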
Monitor the Lambda function's execution logs in AWS CloudWatch Logs. You should see log entries indicating records were received and processed. Check any downstream services (like S3 or DynamoDB) if your function writes data there.
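If you have AWS CLI v2 installed, the quickest way to watch those logs is to tail the function's log group (Lambda names it /aws/lambda/<function-name>):

```bash
# Stream the function's CloudWatch logs live (requires AWS CLI v2)
aws logs tail /aws/lambda/MyLambdaFunction --follow
```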
Conclusion
You have successfully built a basic serverless data pipeline using AWS Kinesis Data Streams and AWS Lambda. This architecture provides a highly scalable, cost-effective (pay-per-invocation/GB-second), and operationally simple way to process real-time streaming data. You can extend this pattern by adding more processing steps (chaining Lambda functions via SNS/SQS or Step Functions), writing to various data stores, or increasing Kinesis shard counts and Lambda concurrency to handle higher volumes.