The Lambda Blog

Serverless Cloud Guide


Serverless Email System – CloudFormation stack using AWS SES, Lambda, API Gateway and DynamoDB

Posted on December 11, 2021, updated August 14, 2022, by user

Amazon’s Simple Email Service (SES) can be used to build email systems for any need at massive scale, but that should not intimidate us from using it for simple and humble purposes – like the contact form backend of this blog. A serverless, pay-per-use design is often far more cost effective than paying a nominal subscription or renting virtual server space.

While it is quite straightforward to quickly set up SES to send out a starter email, it takes just a little more effort to build a robust, production-ready serverless email system complete with delivery status notifications and security. This project walks through the steps to get such a system up using CloudFormation. The complete stack will look like the feature image above.

The stack uses Lambdas to send the emails via SES and to process the email delivery notifications. All sends and responses are saved to DynamoDB for record keeping and status tracking. The Lambdas are fronted by an HTTP API Gateway with one route to send emails and another for SES to report delivery statuses. SES is set up with a ConfigSet that is referenced when actually sending out the emails and is also configured to publish status updates through a Firehose delivery stream. And lastly, but importantly, there is an authorizer Lambda to validate all incoming calls to the API Gateway endpoints.

Let’s build all of this now.

Creating the API Gateway with a Lambda Authorizer

Let’s start the CloudFormation stack with the API Gateway. It is the entry point for requests, and we will also configure SES to send us status notifications through it. Let’s also go ahead and build the authorizer Lambda for incoming requests.

Note: this guide uses Python for the Lambdas, but simply change the runtime in the CloudFormation YAML to use your language of choice.

Create a folder for the project and a new template.yaml file. As a good practice, create a Globals section for your Lambdas.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Serverless Email

Globals:
  Function:
    Architectures:
      - arm64
    Runtime: python3.9
    Timeout: 300

Notice the Architectures section? There is no reason not to use AWS’ Graviton2 processors, so always override the default in Globals. I also like to set a lower timeout threshold for my Lambdas than the 15 minute maximum, and what we are building shouldn’t run for more than a few seconds per invocation.

Next, let’s create the authorizer along with the IAM policy and role that allow the API Gateway to invoke it. I am also creating a parameter to hold the auth token value for the authorizer. We will go with the default value in the project, but this can be overridden at deploy time with the “real” token.

Parameters:
  AuthToken:
    Type: String
    Default: AuthToken

Resources:

  Authorizer:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./authorizer/
      Handler: authorizer.auth_handler
      Description: Lambda Authorizer for API Gateway
      FunctionName: authorizer
      Environment:
        Variables:
          AUTH_TOKEN:
            Ref: AuthToken

  APIGatewayRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: APIGatewayRole
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: ""
            Effect: Allow
            Principal:
              Service: apigateway.amazonaws.com
            Action: "sts:AssumeRole"

  APIGatewayPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: APIGatewayPolicy
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - "lambda:InvokeAsync"
              - "lambda:InvokeFunction"
            Resource:
              - !GetAtt Authorizer.Arn
      Roles:
        - !Ref APIGatewayRole


Let’s code the authorizer – make sure you create the Lambda using the same values as you set in the YAML for “CodeUri” and “Handler”.

import os

AUTH_TOKEN = os.environ["AUTH_TOKEN"]


def auth_handler(event, context):

    try:

        headers = event["headers"]

        if "x-amz-firehose-access-key" in headers:
            token = headers["x-amz-firehose-access-key"]
        elif "auth-token" in headers:
            token = headers["auth-token"]
        else:
            return {
                "isAuthorized": False
            }

        return {
            "isAuthorized": token == AUTH_TOKEN
        }
    except Exception as e:
        print(e)
        return {
            "isAuthorized": False
        }

Note the two headers in the Lambda code:

  • “x-amz-firehose-access-key” is the header the SES Firehose delivery stream will use to carry the access token we are going to provide it.
  • “auth-token” is the header we will expect client-side applications requesting an email send to use. You can change this to whatever you want.
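As a quick sanity check, the authorizer logic can be exercised locally. Below is a condensed, standalone copy of the handler above, run against a few stubbed events (the token here matches the parameter’s default value):

```python
import os

os.environ["AUTH_TOKEN"] = "AuthToken"  # the parameter default from template.yaml
AUTH_TOKEN = os.environ["AUTH_TOKEN"]


def auth_handler(event, context):
    # Condensed copy of the authorizer above, for local testing.
    headers = event.get("headers", {})
    if "x-amz-firehose-access-key" in headers:
        token = headers["x-amz-firehose-access-key"]
    elif "auth-token" in headers:
        token = headers["auth-token"]
    else:
        return {"isAuthorized": False}
    return {"isAuthorized": token == AUTH_TOKEN}


# A client request with the right token passes...
assert auth_handler({"headers": {"auth-token": "AuthToken"}}, None)["isAuthorized"]
# ...a Firehose call with a wrong key fails...
assert not auth_handler({"headers": {"x-amz-firehose-access-key": "nope"}}, None)["isAuthorized"]
# ...and a request with neither header fails.
assert not auth_handler({"headers": {}}, None)["isAuthorized"]
```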

Finally, let’s create the API Gateway in the CloudFormation YAML. I am using an HTTP API; there aren’t many good reasons to go with a REST API in this particular case.

  APIGateway:
    Type: AWS::Serverless::HttpApi
    Properties:
      Description: API Gateway for email requests and SES status notifications
      Auth:
        DefaultAuthorizer: Authorizer
        Authorizers:
          Authorizer:
            FunctionArn: !GetAtt Authorizer.Arn
            FunctionInvokeRole: !GetAtt APIGatewayRole.Arn
            AuthorizerPayloadFormatVersion: 2.0
            EnableSimpleResponses: true

Designing the DynamoDB Table to store the emails

DynamoDB schema design usually takes the most time in any serverless project, because the access patterns for the use case need careful consideration.

Depending on your system, yours may look quite different, but I am going to go with a few simple access patterns.

  • Retrieve a single email by its unique key
  • Retrieve all emails by descending value of the time sent
  • Retrieve all emails by status – Sent, Failed, Delivered – by time sent

Based on these, let’s create the following DynamoDB YAML.

  EmailsTable:
    Type: AWS::DynamoDB::Table
    DeletionPolicy: Retain
    UpdateReplacePolicy: Retain
    Properties:
      TableName: emails
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: email_key
          AttributeType: S
        - AttributeName: email_timestamp
          AttributeType: S
        - AttributeName: email_status
          AttributeType: S
      KeySchema:
        - AttributeName: email_key
          KeyType: HASH
      GlobalSecondaryIndexes:
        - IndexName: emails_by_time
          KeySchema:
            - AttributeName: email_key
              KeyType: HASH
            - AttributeName: email_timestamp
              KeyType: RANGE
          Projection:
            ProjectionType: ALL
        - IndexName: emails_by_status
          KeySchema:
            - AttributeName: email_status
              KeyType: HASH
            - AttributeName: email_timestamp
              KeyType: RANGE
          Projection:
            ProjectionType: ALL

The above YAML achieves the three access patterns we need. To retrieve a single email, we use the hash key directly. The Global Secondary Index (GSI) “emails_by_time” allows us to get emails in descending order of time by querying it with the option “ScanIndexForward” set to false; this works because the sort key is a timestamp and DynamoDB stores records in sort key order. The second GSI, “emails_by_status”, allows us to retrieve records filtered by a status.
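In code, those patterns translate into get_item and query calls along these lines (a sketch: the table and index names match the template above, and the call parameters are built as plain dicts so they are easy to inspect):

```python
def get_email_params(email_key):
    """Table.get_item parameters: fetch one email by its hash key."""
    return {"Key": {"email_key": email_key}}


def emails_by_status_params(status, newest_first=True):
    """Table.query parameters for the emails_by_status GSI.

    ScanIndexForward=False walks the sort key (email_timestamp)
    backwards, i.e. newest emails first.
    """
    return {
        "IndexName": "emails_by_status",
        "KeyConditionExpression": "email_status = :s",
        "ExpressionAttributeValues": {":s": status},
        "ScanIndexForward": not newest_first,
    }


params = emails_by_status_params("Delivered")
```

With boto3 these would be used as, e.g., `emails_table.query(**emails_by_status_params("Delivered"))` or `emails_table.get_item(**get_email_params(key))`.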

Creating the Lambda to process delivery updates from SES

This will actually be the last step in our serverless email system’s flow, but it helps to have the Lambda ready to process the delivery notifications ahead of configuring SES. Looking at this code will also help explain what the next step – creating the Firehose delivery stream – will do for us. So let’s go ahead and do that now.

  SESCallback:
    Type: AWS::Serverless::Function
    DependsOn:
      - EmailsTable
    Properties:
      CodeUri: ./ses-callback/
      Handler: ses-callback.ses_callback_handler
      Description: Handler for ses status notifications
      FunctionName: ses-callback
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref EmailsTable
      Environment:
        Variables:
          EMAILS_TABLE: !Ref EmailsTable
      Events:
        FirehoseEvent:
          Type: HttpApi
          Properties:
            ApiId: !Ref APIGateway
            Path: /ses-callback
            Method: post
            PayloadFormatVersion: "2.0"

CloudFormation will wire a route from our API Gateway endpoint to invoke the ses-callback Lambda. We have also given it permissions on our DynamoDB table.

Now let’s code the Lambda to process the delivery notifications from Firehose.

Note: AWS has some documentation on the SES Firehose event payload, but it can take some trial and error to handle the different response objects. Hopefully this can be a reference to save time.

import json
import os
import time
import boto3
import base64
from datetime import datetime

dynamodb = boto3.resource("dynamodb")
emails_table = dynamodb.Table(os.environ["EMAILS_TABLE"])


def ses_callback_handler(event, context):
    
    request_id = event["headers"]["x-amz-firehose-request-id"]
    stream = json.loads(event["body"])
    records = stream["records"]

    for record in records:

        payload = json.loads(base64.b64decode(record["data"]))

        event_type = payload["eventType"]
        mail = payload["mail"]
        tags = mail["tags"]

        email_key = tags["email_key"][0]

        timestamp = datetime.now()
        
        if event_type == "Send":
            print(f"{email_key} en route to recipient.")
            email_status = "Sent"

        elif event_type == "Delivery":
            print(f"{email_key} delivered to recipient.")
            email_status = "Delivered"

        elif event_type == "Bounce":
            print(f"{email_key} bounced.")
            email_status = "Failed"
        else:
            email_status = "Unsupported"

        emails_table.update_item(
            Key={
                "email_key": email_key
            },
            UpdateExpression="set email_status = :p1, "
                             "update_time = :p2",
            ExpressionAttributeValues={
                ":p1": email_status,
                ":p2": timestamp.isoformat(),
            }
        )

    return {
        "statusCode": 200,
        "headers": {"content-type": "application/json"},
        "body": json.dumps(
            {
                "requestId": request_id,
                "timestamp": int(time.time() * 1000)
            }
        )
    }

Note this statement in the code…

email_key = tags["email_key"][0]

This is the unique id of the email, which we will pass to SES as a correlation id in the tags option when we send the email. We simply use it to update the status in our DynamoDB table.

The return statement in the code above is what Firehose expects back. It will be unhappy if it doesn’t get that response and will keep retrying to deliver the notification until it eventually gives up.
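To make the payload shape concrete, here is a round trip with a stubbed “Delivery” event, trimmed to just the fields the handler reads (the email_key value is a made-up placeholder):

```python
import base64
import json

# A minimal stand-in for one SES event, trimmed to what the handler uses.
ses_event = {
    "eventType": "Delivery",
    "mail": {"tags": {"email_key": ["example-email-key"]}},
}

# Firehose posts a JSON body whose "records" each carry base64-encoded data.
body = json.dumps({
    "records": [
        {"data": base64.b64encode(json.dumps(ses_event).encode()).decode()}
    ]
})

# The same extraction the ses-callback handler performs per record:
record = json.loads(body)["records"][0]
payload = json.loads(base64.b64decode(record["data"]))
email_key = payload["mail"]["tags"]["email_key"][0]

assert payload["eventType"] == "Delivery"
assert email_key == "example-email-key"
```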

Creating the SES Firehose Delivery Stream

Having seen the Lambda above, we now know what SES is eventually going to send us. SES offers a few ways to deliver notifications – you could simply have it send them to CloudWatch – but the advantage of configuring a Firehose delivery stream is that the payloads can be sent to a Lambda like the one above for more fine-grained processing, like storing them in our DynamoDB table.

It would be nice if AWS allowed Lambda as an endpoint for SES notifications directly – that would save us the trouble of configuring a Firehose stream. But in the meantime, this is a good reference point for setting up Firehose streams with CloudFormation.

It is a lot of configuration code, but the end result should be seamless and easily replicable anywhere. First we are going to create a CloudWatch log group and an S3 bucket for Firehose to log data and dump failed payloads. This only covers failures delivering to our Lambda – which we hopefully should not need, but we are going to code for it in any case.

  SESFirehoseS3Bucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Delete
    UpdateReplacePolicy: Delete

  SESFirehoseLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/kinesisfirehose/ses-notifications"
      RetentionInDays: 1

  SESFirehoseLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref SESFirehoseLogGroup
      LogStreamName: "SESDeliveryStream"

  SESFirehoseS3LogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref SESFirehoseLogGroup
      LogStreamName: "SESS3Stream"

Next, let’s create the IAM role and policy for the Firehose stream to drop failed payloads into the S3 bucket.

  SESFirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: SESFirehoseRole
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: ""
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: "sts:AssumeRole"
            Condition:
              StringEquals:
                "sts:ExternalId": !Ref "AWS::AccountId"

  SESFirehosePolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: SESFirehosePolicy
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - "s3:AbortMultipartUpload"
              - "s3:GetBucketLocation"
              - "s3:GetObject"
              - "s3:ListBucket"
              - "s3:ListBucketMultipartUploads"
              - "s3:PutObject"
            Resource:
              - !Sub 'arn:aws:s3:::${SESFirehoseS3Bucket}'
              - !Sub 'arn:aws:s3:::${SESFirehoseS3Bucket}*'
      Roles:
        - !Ref SESFirehoseRole

And finally, let’s create the Firehose delivery stream, which we will point our SES ConfigSet at next. We pass in the auth token we defined earlier in the parameters and set the endpoint to the route the callback Lambda is expecting.

  SESFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: ses-delivery-stream
      DeliveryStreamType: DirectPut
      HttpEndpointDestinationConfiguration:
        RoleARN: !GetAtt SESFirehoseRole.Arn
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref SESFirehoseLogGroup
          LogStreamName: !Ref SESFirehoseLogStream
        S3BackupMode: FailedDataOnly
        S3Configuration:
          BucketARN: !Sub 'arn:aws:s3:::${SESFirehoseS3Bucket}'
          CloudWatchLoggingOptions:
            Enabled: true
            LogGroupName: !Ref SESFirehoseLogGroup
            LogStreamName: !Ref SESFirehoseS3LogStream
          CompressionFormat: GZIP
          RoleARN: !GetAtt SESFirehoseRole.Arn
        RetryOptions:
          DurationInSeconds: 30
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 1
        EndpointConfiguration:
          AccessKey:
            Ref: AuthToken
          Name: ses-callback-api
          Url:
            Fn::Sub: 'https://${APIGateway}.execute-api.${AWS::Region}.${AWS::URLSuffix}/ses-callback'

Creating the SES ConfigSet

We are finally ready to create the SES ConfigSet that we will use to send out emails. Creating the ConfigSet itself only involves giving it an appropriate name, but we do need to do a couple more things.

  • Create a CloudFormation ConfigSet destination resource that points to the Firehose delivery stream, so it sends us “Send”, “Delivery” and “Bounce” notifications.
  • Allow the SES ConfigSet destination to put records to our Firehose delivery stream via an IAM policy/role.

  SESRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: SESRole
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: ""
            Effect: Allow
            Principal:
              Service: ses.amazonaws.com
            Action: "sts:AssumeRole"

  SESPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: SESPolicy
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - "firehose:PutRecord"
              - "firehose:PutRecordBatch"
            Resource:
              - !GetAtt SESFirehoseDeliveryStream.Arn
      Roles:
        - !Ref SESRole

  SESConfigSet:
    Type: 'AWS::SES::ConfigurationSet'
    Properties:
      Name: ses-configset

  SESConfigSetDestination:
    Type: 'AWS::SES::ConfigurationSetEventDestination'
    DependsOn:
      - SESRole
      - SESPolicy
      - SESConfigSet
    Properties:
      ConfigurationSetName: !Ref SESConfigSet
      EventDestination:
        Name: ses-firehose-delivery-stream
        Enabled: true
        MatchingEventTypes:
          - "send"
          - "delivery"
          - "bounce"
        KinesisFirehoseDestination:
          DeliveryStreamARN: !GetAtt SESFirehoseDeliveryStream.Arn
          IAMRoleARN: !GetAtt SESRole.Arn

Creating the Mailer Lambda

This is the last step to complete the serverless email system: a Lambda that processes incoming requests with the email details and sends them onward via SES.

Note: you will need an email address that Amazon has verified to use as the “from” address. If you have verified a domain you own in AWS, you can use any from address on that domain.

Also note: to send emails to any destination, your AWS SES account needs to be moved out of sandbox mode into production mode. Otherwise you are restricted to verified addresses on both the to and from side.

  Mailer:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./mailer/
      Handler: mailer.mail_handler
      Description: Send mails via ses
      FunctionName: mailer
      Policies:
        - Statement:
            - Effect: Allow
              Action:
                - "ses:SendEmail"
                - "ses:SendTemplatedEmail"
                - "ses:SendRawEmail"
                - "ses:SendBulkTemplatedEmail"
              Resource: "*"
        - DynamoDBCrudPolicy:
            TableName: !Ref EmailsTable
      Environment:
        Variables:
          EMAILS_TABLE: !Ref EmailsTable
          SES_CONFIG_SET: ses-configset
      Events:
        SendEmail:
          Type: HttpApi
          Properties:
            ApiId: !Ref APIGateway
            Path: /send-email
            Method: post
            PayloadFormatVersion: "2.0"

Now let’s code the mailer Lambda itself:

import os
import uuid
import boto3
import json
from datetime import datetime

dynamodb = boto3.resource("dynamodb")
client = boto3.client('ses')
SES_CONFIG_SET = os.environ["SES_CONFIG_SET"]
emails_table = dynamodb.Table(os.environ["EMAILS_TABLE"])


def mail_handler(event, context):

    try:

        email_request = json.loads(event["body"])

        to_email = email_request["to_email"]
        to_subject = email_request["to_subject"]
        to_message = email_request["to_message"]

        body_html = f"""<html>
        <head></head>
        <body>
          <p>{to_message}</p> 
        </body>
        </html>
                    """

        email_message = {
            'Body': {
                'Html': {
                    'Charset': 'utf-8',
                    'Data': body_html,
                },
            },
            'Subject': {
                'Charset': 'utf-8',
                'Data': to_subject,
            },
        }

        email_key = str(uuid.uuid4())
        email_timestamp = datetime.now().isoformat()

        email_record = {
            "email_key": email_key,
            "email_timestamp": email_timestamp,
            "email_status": "Sending",
            "to_email": to_email,
            "to_subject": to_subject,
            "to_message": to_message,
            "update_time": email_timestamp
        }

        emails_table.put_item(Item=email_record)

        client.send_email(
            Destination={
                'ToAddresses': [to_email],
            },
            Message=email_message,
            Source="verifiedemail@example.com",
            Tags=[
                {"Name": "email_key", "Value": email_key},
            ],
            ConfigurationSetName=SES_CONFIG_SET,
        )

        return {
            "statusCode": 200,
            "headers": {
                'Access-Control-Allow-Headers': "*",
                'Access-Control-Allow-Origin': "*",
                'Access-Control-Allow-Methods': "*"
            },
            "body": json.dumps(
                {
                    "message": "Email sent successfully."
                }
            )
        }

    except Exception as e:
        print(e)
        return {
            "statusCode": 500,
            "headers": {
                'Access-Control-Allow-Headers': "*",
                'Access-Control-Allow-Origin': "*",
                'Access-Control-Allow-Methods': "*"
            },
            "body": json.dumps(
                {
                    "message": "Could not send email."
                }
            )
        }

Deploying and Testing

I am not covering the deployment setup in this post, but the easiest way to get this deployed is with AWS SAM.

If the sam build went well and the initial changeset went through, you should see the confirmation below.

Waiting for changeset to be created..

CloudFormation stack changeset
-------------------------------------------------------------------------------------------------
Operation                LogicalResourceId        ResourceType             Replacement
-------------------------------------------------------------------------------------------------
+ Add                    APIGatewayApiGatewayDe   AWS::ApiGatewayV2::Sta   N/A
                         faultStage               ge
+ Add                    APIGatewayPolicy         AWS::IAM::Policy         N/A
+ Add                    APIGatewayRole           AWS::IAM::Role           N/A
+ Add                    APIGateway               AWS::ApiGatewayV2::Api   N/A
+ Add                    AuthorizerRole           AWS::IAM::Role           N/A
+ Add                    Authorizer               AWS::Lambda::Function    N/A
+ Add                    EmailsTable              AWS::DynamoDB::Table     N/A
+ Add                    MailerRole               AWS::IAM::Role           N/A
+ Add                    MailerSendEmailPermiss   AWS::Lambda::Permissio   N/A
                         ion                      n
+ Add                    Mailer                   AWS::Lambda::Function    N/A
+ Add                    SESCallbackFirehoseEve   AWS::Lambda::Permissio   N/A
                         ntPermission             n
+ Add                    SESCallbackRole          AWS::IAM::Role           N/A
+ Add                    SESCallback              AWS::Lambda::Function    N/A
+ Add                    SESConfigSetDestinatio   AWS::SES::Configuratio   N/A
                         n                        nSetEventDestination
+ Add                    SESConfigSet             AWS::SES::Configuratio   N/A
                                                  nSet
+ Add                    SESFirehoseDeliveryStr   AWS::KinesisFirehose::   N/A
                         eam                      DeliveryStream
+ Add                    SESFirehoseLogGroup      AWS::Logs::LogGroup      N/A
+ Add                    SESFirehoseLogStream     AWS::Logs::LogStream     N/A
+ Add                    SESFirehosePolicy        AWS::IAM::Policy         N/A
+ Add                    SESFirehoseRole          AWS::IAM::Role           N/A
+ Add                    SESFirehoseS3Bucket      AWS::S3::Bucket          N/A
+ Add                    SESFirehoseS3LogStream   AWS::Logs::LogStream     N/A
+ Add                    SESPolicy                AWS::IAM::Policy         N/A
+ Add                    SESRole                  AWS::IAM::Role           N/A
-------------------------------------------------------------------------------------------------

Approve the changeset and wait for the stack to be created.

To test that everything works, first grab the API Gateway endpoint that was created.

https://{id}.execute-api.{region}.amazonaws.com/send-email

Then submit a Postman request with the auth-token header and a body that looks like this:

{
    "to_email": "email@example.com",
    "to_subject": "Serverless Test Email!",
    "to_message": "Serverless Test Email via SES!"
}
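The same request can also be scripted with the standard library; the URL and token below are placeholders for your deployed values:

```python
import json
import urllib.request

API_URL = "https://abc123.execute-api.us-east-1.amazonaws.com/send-email"  # placeholder
AUTH_TOKEN = "AuthToken"  # the value the stack was deployed with


def build_send_request(to_email, to_subject, to_message):
    """Build the POST request the send-email route expects."""
    body = json.dumps({
        "to_email": to_email,
        "to_subject": to_subject,
        "to_message": to_message,
    }).encode()
    return urllib.request.Request(
        API_URL,
        data=body,
        headers={"auth-token": AUTH_TOKEN, "content-type": "application/json"},
        method="POST",
    )


req = build_send_request(
    "email@example.com", "Serverless Test Email!", "Serverless Test Email via SES!"
)
# urllib.request.urlopen(req) would actually send it; omitted so this runs offline.
```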

If all went well, you should see the results in the DynamoDB table.

Concluding

And that’s a wrap. This should serve as a good base for a similar system you might need. One thing to consider when adapting it is the DynamoDB schema for your specific use case. There are also more SES event types you can configure, and even SMTP status codes in the notification payloads, that you may want to capture. And lastly, if you plan to replicate your data to another region, consider using DynamoDB Global Tables defined in a separate stack to make that process easier.

Hope you found this useful and feel free to reach out via the contact form for any questions.
