🚀 Vu Dao 🚀


Create AWS Lambda Function Triggered By S3 Notification Event

  • The Amazon S3 notification feature lets you receive notifications when certain events happen in a bucket.
  • To enable notifications, you must first add a notification configuration that identifies the events you want Amazon S3 to publish and the destination where Amazon S3 should send them; here the destination is a Lambda function (a minimal sketch of that configuration follows this list).
  • Overview of notifications
  • This post describes how to use AWS Chalice to set this up.
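For context, this is roughly what that notification configuration looks like if you wire it up by hand with boto3; Chalice generates and applies the equivalent for you on deploy. The bucket name, prefix, and Lambda ARN below are placeholders, not values taken from a real deployment:

import boto3

s3 = boto3.client('s3')

# Placeholder bucket/prefix/ARN, used only to show the shape of the call
s3.put_bucket_notification_configuration(
    Bucket='mybucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:cdn-invalidation-dev-handle_s3_event',
            'Events': ['s3:ObjectCreated:Put'],
            'Filter': {'Key': {'FilterRules': [
                {'Name': 'prefix', 'Value': 'static/src/'}
            ]}}
        }]
    }
)

S3 also needs permission (lambda:InvokeFunction) to call the target function; Chalice takes care of that as part of the deploy.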


1. Create a new AWS Chalice project cdn-invalidation

⚡ $ chalice new-project cdn-invalidation
⚡ $ ls cdn-invalidation
app.py  requirements.txt

2. Define the region in which to create the Lambda function, instead of the default from the local AWS configuration

⚡ $ export AWS_DEFAULT_REGION=us-east-1

3. Create the Lambda function handler

  • The handler listens for the s3:ObjectCreated:Put event, so any object uploaded under s3://mybucket/static/src/ triggers the Lambda function in app.py
from chalice import Chalice
import boto3
import time


app_name = 'cdn-invalidation'
app = Chalice(app_name=app_name)
app.debug = True


class InvalidateCDN:
    """ Invalidate CDN """
    def __init__(self):
        # Replace with your own CloudFront distribution ID
        self.distribution_id = 'A1AA1AA11A11AA'
        self.client = boto3.client('cloudfront')

    def create_invalidation(self, file_change):
        try:
            res = self.client.create_invalidation(
                DistributionId=self.distribution_id,
                InvalidationBatch={
                    'Paths': {
                        'Quantity': 1,
                        'Items': ["/{}".format(file_change)]
                    },
                    'CallerReference': str(time.time()).replace(".", "")
                }
            )
            invalidation_id = res['Invalidation']['Id']
            return invalidation_id
        except Exception as err:
            print(f"Failed to create invalidation, error {err}")
            exit(1)

    def get_invalidation_status(self, inval_id):
        try:
            res = self.client.get_invalidation(
                DistributionId=self.distribution_id,
                Id=inval_id
            )
            return res['Invalidation']['Status']
        except Exception as err:
            print(f"Failed to get invalidation status ID {inval_id}, error {err}")
            exit(1)

    def run(self, key):
        print(f"Deploying CDN file: {key}")
        the_id = self.create_invalidation(key)
        count = 0
        # Poll the invalidation status every 30s, up to 10 attempts (~5 minutes)
        while True:
            status = self.get_invalidation_status(the_id)
            if status == 'Completed':
                print(f"Completed, id: {the_id}")
                break
            elif count < 10:
                count += 1
                time.sleep(30)
            else:
                print("Timeout, please check CDN")
                break


@app.on_s3_event(bucket='mybucket',
                 prefix='static/src/',
                 events=['s3:ObjectCreated:Put'])
def handle_s3_event(event):
    cdn = InvalidateCDN()
    cdn.run(event.key)
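If you want to exercise the handler before deploying, Chalice's test client (available since Chalice 1.14) can fabricate the S3 event for you. A minimal sketch, assuming the project above; note it runs the handler in-process, so it will call the real CloudFront API unless you stub boto3:

# test_app.py - minimal sketch using Chalice's test client
from chalice.test import Client
from app import app

def test_handle_s3_event():
    with Client(app) as client:
        # Fabricate the notification the bucket would send for an upload
        event = client.events.generate_s3_event(
            bucket='mybucket', key='static/src/index.html')
        # Runs handle_s3_event locally; stub the cloudfront client in a real
        # test so no live invalidation is created
        client.lambda_.invoke('handle_s3_event', event)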

4. Update requirements.txt to include boto3 in the Lambda function

⚡ $ cat requirements.txt
boto3

5. Deploy

⚡ $ chalice deploy
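To verify the trigger end to end, upload a file under the watched prefix and tail the handler's logs; the commands below assume the bucket and handler names used above:

⚡ $ aws s3 cp index.html s3://mybucket/static/src/index.html
⚡ $ chalice logs -n handle_s3_event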


Ref: https://github.com/vumdao/cicd-invalidation-cdn/tree/master/cdn-invalidation

Top comments (7)

Ali Zonoozi

Is there a way to do this using Chalice with CDK integration? When I try deploying using the cdk deploy command I get the following error:

Unable to package chalice apps that use the @app.on_s3_event decorator. CloudFormation does not support modifying the event notifications of existing buckets. You can deploy this app using chalice deploy.

🚀 Vu Dao 🚀

Unfortunately I migrated all my chalice projects fully to CDK :)

baotran2207 • Edited

With the current version, we cannot use on_s3_event with cdk deploy. The only option I think is S3 event -> SQS/SNS -> chalice.on_sqs_message. One con is that we need to filter the prefix/suffix ourselves in the on_sqs_message handler.

I assume you followed the cdk deploy tutorial.

This is an example Chalice stack, infrastructure/stacks/chaliceapp.py:

import os

import aws_cdk as cdk
from aws_cdk import (
    aws_s3,
    aws_sns,  ## you can notify sns
    aws_sqs,  ## you can notify sqs
    aws_s3_notifications as aws_s3_noti,
)
from chalice.cdk import Chalice

RUNTIME_SOURCE_DIR = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), os.pardir, "runtime"
)


class ChaliceApp(cdk.Stack):
    def __init__(self, scope, id, **kwargs):
        super().__init__(scope, id, **kwargs)
        self.sqs_generic = aws_sqs.Queue(self, "mysqs")
        self.bucket = aws_s3.Bucket(self, "MyBucket")
        self.chalice = Chalice(
            self,
            "BaoTranBackend",
            source_dir=RUNTIME_SOURCE_DIR,
            stage_config={
                "lambda_memory_size": 256,
                "environment_variables": {
                    "S3_MAIN_BUCKET": self.bucket.bucket_name,
                    "SQS_GENERIC": self.sqs_generic.queue_name,
                },
            },
        )
        self.chalice_role = self.chalice.get_role("DefaultRole")
        self.bucket.grant_read_write(self.chalice_role)
        self.bucket.add_event_notification(
            aws_s3.EventType.OBJECT_CREATED_PUT,
            aws_s3_noti.SqsDestination(self.sqs_generic),
        )

Then in your runtime/app.py, you can receive the SQS message:

import logging
import os

from chalice import Chalice, Blueprint

app = Chalice(app_name='chalice-backend')
sqs_events = Blueprint(__name__)
logger = logging.getLogger(__name__)


@sqs_events.on_sqs_message(
    queue=os.environ.get("SQS_GENERIC", ""),
    batch_size=1)
def handle_sqs_message(event):
    print('Trigger generic')
    print("dict ", event.to_dict())

    for record in event:
        print(record, 'in event')
        logger.info(f"in event! Detail {record}")
    ## with the bucket and the key, you can open a boto3 session and implement logic here (see the sketch below)

app.register_blueprint(sqs_events)  ## remember to register events
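Since the prefix/suffix filtering that @app.on_s3_event provided now has to happen in the handler, here is a minimal sketch of pulling the bucket/key out of each SQS record body and filtering by prefix; the prefix value is only an example:

import json
from urllib.parse import unquote_plus

WATCHED_PREFIX = "static/src/"  ## example value, adjust to your layout

def extract_keys(record):
    """Yield (bucket, key) pairs from one SQS record carrying an S3 notification."""
    payload = json.loads(record.body)
    # 'Records' is absent for service messages such as s3:TestEvent
    for s3_rec in payload.get("Records", []):
        bucket = s3_rec["s3"]["bucket"]["name"]
        key = unquote_plus(s3_rec["s3"]["object"]["key"])  # keys arrive URL-encoded
        if key.startswith(WATCHED_PREFIX):
            yield bucket, key

Inside handle_sqs_message you would loop over extract_keys(record) and simply skip anything outside the prefix.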

And after deployment, upload a file to S3; then in CloudWatch you would see the handler's log output.


Basil Ihuoma

I need help... I have a scraper that gets data from a website. How do I use a Lambda function with EventBridge to trigger the scraper to get data on a daily basis?

🚀 Vu Dao 🚀

Use EventBridge with a cron schedule to trigger the Lambda function.
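In Chalice terms that is a scheduled event handler. A minimal sketch, with run_scraper standing in as a hypothetical placeholder for the scraping logic:

from chalice import Chalice, Cron

app = Chalice(app_name='daily-scraper')

def run_scraper():
    ## hypothetical placeholder for the actual scraping logic
    print("scraping...")

# cron(0 0 * * ? *): run every day at 00:00 UTC
@app.schedule(Cron(0, 0, '*', '*', '?', '*'))
def daily_scrape(event):
    run_scraper()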

Basil Ihuoma

thanks

Duy Nguy3n

thanks for sharing