The Lambda Blog

Serverless Cloud Guide

Menu
  • Home
  • Categories
  • Projects
  • About Us
  • Privacy Policy
  • Disclaimer
  • Contact
Menu
setup and process dynamodb streams with lambda triggers in python

Getting DynamoDB Data Changes with Streams and processing them with Lambdas using Python

Posted on February 11, 2022August 14, 2022 by user
Navigation » Home » DynamoDB

Amazon Web Services’ DynamoDB is a high performance NoSQL database that is an excellent option for applications of practically any kind or use case to work with. It is also a completely managed serverless solution which makes it even better. Simply focus on your functional implementations and allow AWS to worry about the provisioning and management of the infrastructure.

Aside from serving as a great standard NoSQL database with all the associated operations that come with it – puts, retrieves, highly performant key based retrieval and the design paradigms that go with working with DynamoDB as a solution, DynamoDB also has an extremely powerful feature that you can utilize to enhance your own application offerings in a variety of use cases – DynamoDB Streams.

Seamless Data Change Notifications with DynamoDB Streams

DynamoDB Streams are are feature of DynamoDB Tables that once enabled provides a means to get a full and complete set of notifications of any data change that occurs in the DynamoDB table. A DynamoDB stream is essentially another endpoint that is created which can then be used to retrieve data change information as needed.

All the expected change types are captured in real time – new data as well the previous state – and you have the option to retrieve both the old and new states. It is possible to also retrieve a historical record of the sequence of changes though currently there is time limit of 24 hours of change data capture being maintained at the end point itself. Of course should your use case need for capturing every change going far longer than that, you can process the data yourself and save it in some other record – perhaps in some time stream database.

Real Time Stream Processing with Lambdas

And this is exactly where Lambda Triggers for DynamoDB streams complement the feature perfectly. Not just for use cases where you want to simply capture the change as it happened – but for all other more prosaic use cases related to acting on specific data change events. For example, sending out a notification when a state changes, or triggering other applications or services when a new record gets inserted. All of this can be setup with a DynamoDB stream Lambda Trigger to get real time updates and processed within the Lambda.

In this post we will demonstrate how a DynamoDB stream can be setup explaining some of the supported options like the batching size and such and also how to process the captured stream data with a Lambda using the Python language.

We will also cover how to complete the setup both from the console manually as well with the SDK and my preferred choice – a cloudformation template.

DynamoDB Stream Options

When enabling a stream for a particular DynamoDB table there are 4 options to choose from that affects the kind of change data that is captured in the stream.

1. Key attributes only

2. New image

3. Old image

4. New and old images

Going with both New and Old Images as you might have guessed will give you the entire record set before the change and the record as it looks like after. New and Old image options provide the record as described by their respective names while the key attributes provide just the table keys – this is useful for situations where you don’t need to capture the entire record set but need the keys to retrieve as needed later and this has cost saving implications depending on your application use case.

Incidentally, when DynamoDB does replication with its Global Tables feature, the stream is enabled with New and old – basically DynamoDB itself uses streams to manage replication.

You can enable your stream as per your functional use case but the most full featured option is to have both old and new enabled.

Enabling DynamoDB stream via the console

From the console, navigate to a table you want to enable streams for and switch to the streams and exports tab. From there simply select enable and choose the option you need.

dynamodb-streams-enable

dynamodb-streams-options

Enabling DynamoDB streams via the SDK

For an existing table, the same can be accomplished via the update-table API.

The options keys are NEW_AND_OLD_IMAGES, KEYS_ONLY, OLD_IMAGE, NEW_IMAGE

aws dynamodb update-table --table-name your-table-name \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES

Enabling DynamoDB Streams via Cloudformation

For cloudformation – update an existing DynamoDB Table definition with the Stream details as shown in this example.

SampleDynamoDBTable:
    Type: AWS::DynamoDB::GlobalTable
    DeletionPolicy: Retain
    UpdateReplacePolicy: Retain
    Properties:
      TableName: sample-table
      BillingMode: PAY_PER_REQUEST
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      Replicas:
        - Region: us-east-1
      AttributeDefinitions:
        - AttributeName: primary_key
          AttributeType: S
      KeySchema:
        - AttributeName: primary_key
          KeyType: HASH

Enabling a Lambda trigger via the console

For the console, once enabled choose the add trigger function to link it to a Lambda.

Now the DynamoDB stream change data capture for the table will invoke the Lambda in real time.

dynamodb-streams-trigger


Enabling a Lambda trigger via Cloudformation

I often state this is preferable to managing Lambdas and other AWS resources manually because you can then use Cloudformation stacks to replicate your application seamlessly in other regions or environments.

In Cloudformation, it is an simple as adding a new event to your Lambda Function. For more details on this, please see my articles on deploying templates and working with cloudformation and lambdas.

StreamProcessorLambda:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.9
      CodeUri: ./stream-processor/
      Handler: stream-processor.lambda_handler
      FunctionName: stream-processor
      Events:
        DynamoDBStreamEvent:
          Type: DynamoDB
          Properties:
            Stream:
              !GetAtt SampleDynamoDBTable.StreamArn
            StartingPosition: LATEST
            BatchSize: 10

The starting position is usually the latest so you get updates as they happen, but there is the option known as TRIM_HORIZON which windows the data being sent depending on how frequently you want this triggered.

Several options are available to manage the frequency and nature of the triggering. The full list of options can be seen in the reference.

Processing the DynamoDB stream with Python

And now for the exciting part – how to work with this in Lambda. While of course this example is in Python as indicated – the basic structure of processing a stream applies to the language of your choice.

The stream types that are returned are: INSERT, REMOVE & MODIFY which should let you take appropriate action depending on your functional use cases.

The other quirk with streams is the DynamoDB type is part of the object and this sometimes trips me up. When you work with regular DynamoDB queries and such, you don’t really specify the data type to access the record – but with streams the event dictionary has the type explicitly set for each record like in the example below where to retrieve the item the String “S” type is required.

record["dynamodb"]["Keys"]["primary_key"]["S"]

Also note – the sample table record is based on the cloudformation template table from above which has a single “primary_key” as its hash key.



def lambda_handler(event, context):

    for record in event["Records"]:

        primary_key = record["dynamodb"]["Keys"]["primary_key"]["S"]

        if record['eventName'] == 'REMOVE':
            old_record = record['dynamodb']['OldImage']
            print(f"{old_record} was deleted")
		
        if record['eventName'] == 'INSERT':
	        new_record = record['dynamodb']['NewImage']	
            print(f"{new_record} was added")

        if record['eventName'] == 'MODIFY':
	        new_record = record['dynamodb']['NewImage']
        	old_record = record['dynamodb']['OldImage']
		
        	print(f"Record Modified OLD:{old_record} and NEW:{new_record})


In case you require your Lambda to process streams from multiple tables – simply create additional triggers – events in cloudformation template. You can identify the specific DynamoDB stream that invoked your Lambda using the eventSource attribute in the event record of the stream.

Hopefully this was a good enough base for you to get started with utilizing the power of DynamoDB streams for your applications.

Recent Posts

  • Coding a JSON format logger in Python for use as a Lambda Layer package
  • Configuring an S3 Bucket to send events to a Lambda destination for processing
  • How to request a public SSL certificate for a domain name from the AWS Certificate Manager Console
  • Creating automated CloudFormation Stack Build and Deployments with AWS CodePipeline and CodeBuild
  • A concise guide to setting up the AWS command-line libraries on your local development environment
  • How to implement a Lambda Authorizer for an AWS AppSync API and invoke the API with the required Authorization Token
  • Filtering CloudWatch Logs by LogGroups and LogStreams and reading them using Python and the Boto3 SDK
  • Azure AD Multi Tenancy issue in AWS Cognito
  • Setting up Enterprise Federation from Azure Active Directory to Amazon Cognito using Open ID Connect
  • How to Setup IAM Multifactor Authentication (MFA) for the AWS CLI

Categories

  • Amplify
  • API Gateway
  • AppSync
  • AWS CLI
  • CloudFormation
  • CloudWatch
  • Cognito
  • DynamoDB
  • EventBridge
  • KMS
  • Lambda
  • Projects
  • Route 53
  • SES
  • SNS

Post Tags

ACM Amplify API Gateway AppSync AWS CLI Azure Boto3 CloudFormation CloudWatch CodeBuild CodePipeline Cognito DynamoDB EventBridge Firebase IAM KMS Lambda OIDC Project Python Rekognition Route53 S3 SAM SES SNS VPC

©2022 The Lambda Blog