DEV Community

Cover image for Migrating Serverless Aurora Schema Using a Custom Resource
Samuel James for AWS Community Builders

Posted on • Edited on

Migrating Serverless Aurora Schema Using a Custom Resource

Backend applications have a lot of moving parts. Besides designing your domain models, you also have to think about what to expose and how to persist your data.

For applications that use relational datastores like MySQL or PostgreSQL, migrating the schema is crucial for a successful deployment.

I like to think of migrations as code that updates a database schema. In this post, supported by a GitHub project, you will learn one way to handle Serverless Aurora schema migration using a custom resource.

Custom Resources

Custom resources enable you to write custom provisioning logic in templates that AWS CloudFormation runs anytime you create, update (if you change the custom resource), or delete the stack.

We could write a lambda function with a custom resource that runs migrations after provisioning a database.

The next question that comes to mind is how to make the schema scripts available to the Lambda function without actually including them in its deployment package. This is where AWS Lambda layers come in.

A Lambda layer allows you to archive dependencies and use them in your functions without including them in your deployment package.

Without further ado, let's see how to do this with CDK.

Creating the Migration Handler

The migration_handler.py file contains the function that is executed during the creation of the custom resource. It uses the Data API to connect to the database cluster, reads the migration files, and runs them on the database.

import boto3
import os
import logging as log

from botocore import exceptions
import cfnresponse
import glob

log.getLogger().setLevel(log.INFO)


def main(event, context):
    """Handle a CloudFormation custom-resource event by running the SQL migrations.

    Every ``*.sql`` file found directly under ``/opt`` is executed in sorted
    path order, and the outcome is reported back to CloudFormation through
    ``cfnresponse``.

    Args:
        event: The custom-resource request; ``event['RequestType']`` is read
            to tell a ``Delete`` request apart from the others.
        context: The Lambda context object, forwarded to ``cfnresponse.send``.

    Raises:
        RuntimeError: If running the migrations fails. A ``FAILED`` response
            is sent to CloudFormation before the exception is raised.
    """
    SQL_PATH = "/opt"  # Layers are extracted to the /opt directory in the function execution environment.

    # This needs to change if there are to be multiple resources
    # in the same stack
    physical_id = "SchemaMigrationResource"

    # If this is a Delete event, do nothing. The schema will be destroyed along with the cluster.
    if event['RequestType'] == 'Delete':
        cfnresponse.send(event, context, cfnresponse.SUCCESS, {"Response": "Deleted successfully"}, physical_id)
        # Stop here: falling through would run the migrations on delete and
        # send a second response for the same request.
        return

    try:
        log.info("Input event: %s", event)

        # glob.glob returns paths in arbitrary order; sort them so the
        # scripts are always applied in the same, name-based order.
        sqlfiles = sorted(glob.glob(os.path.join(SQL_PATH, "*.sql")))

        log.info(sqlfiles)
        for file_path in sqlfiles:
            log.info(f"Found an SQL script in path:{file_path}")
            execute_sql_file(file_path)

        log.info("Ran migration successfully")

        attributes = {"Response": f"Ran migration successfully for these files:{sqlfiles}"}

        cfnresponse.send(event, context, cfnresponse.SUCCESS, attributes, physical_id)
    except Exception as e:
        log.exception(e)
        # cfnresponse's error message is always "see CloudWatch"
        cfnresponse.send(event, context, cfnresponse.FAILED, {}, physical_id)
        raise RuntimeError("Create failure requested") from e


def execute_statement(sql, sql_parameters=None, transaction_id=None):
    """Run a single SQL statement through the ``rds-data`` client.

    The target cluster, secret and database are read from the
    ``DB_CLUSTER_ARN``, ``DB_SECRET_ARN`` and ``DB_NAME`` environment
    variables.

    Args:
        sql: The SQL text to execute.
        sql_parameters: Optional list of statement parameters. ``None``
            (the default) is sent as an empty list.
        transaction_id: Optional transaction identifier; when given it is
            passed along as ``transactionId``.

    Returns:
        The response returned by ``client.execute_statement``.

    Raises:
        RuntimeError: If the client raises ``BadRequestException``.
    """
    log.info(f"sql query:{sql}")
    client = boto3.client("rds-data")
    parameters = {
        "secretArn": os.getenv("DB_SECRET_ARN"),
        "database": os.getenv("DB_NAME"),
        "resourceArn": os.getenv("DB_CLUSTER_ARN"),
        "sql": sql,
        # A fresh list per call avoids sharing one mutable default object
        # between invocations.
        "parameters": [] if sql_parameters is None else sql_parameters,
    }
    if transaction_id is not None:
        parameters["transactionId"] = transaction_id
    try:
        return client.execute_statement(**parameters)
    except client.exceptions.BadRequestException as e:
        log.exception(e)
        # Chain the original error so the root cause stays in the traceback.
        raise RuntimeError("Create failure requested") from e


def execute_sql_file(file_path: str) -> None:
    """Execute every statement contained in a SQL script file.

    The file content is split on ``;`` and each non-empty piece is sent to
    ``execute_statement`` with surrounding whitespace removed.

    Args:
        file_path: Path of the SQL script to run.
    """
    log.info(f"executing file in : {file_path}")
    with open(file_path, "r", encoding="utf-8") as script:
        script_content = script.read()

    # NOTE: this is a plain split on ";", so a semicolon inside a string
    # literal or a stored-procedure body would be treated as a separator.
    for query in script_content.split(";"):
        sql = query.strip()
        if sql:
            # Send the stripped statement, not the raw fragment with its
            # leading/trailing newlines.
            execute_statement(sql)
    log.info(f"executed the file : {file_path} successfully")

Enter fullscreen mode Exit fullscreen mode

Creating the Custom Resource

The migration.py file contains the code for creating the Lambda function and the Lambda layer. We pass some environment variables (the database cluster reference, the secret ARN, and the database name) that the Lambda function needs to connect to the database.

from aws_cdk import (
    core,
    custom_resources as cr,
    aws_lambda as _lambda,
    aws_cloudformation as cfn,
    aws_iam as _iam,
)
from aws_cdk.aws_lambda import Runtime

SQL_SCRIPTS_PATH = "scripts/schema"


class SchemaMigrationResource(core.Construct):
    """Construct that wires up the schema-migration custom resource.

    It creates a singleton Lambda function whose code is the content of
    ``migration_handler.py``, attaches a layer built from
    ``SQL_SCRIPTS_PATH``, grants the function the IAM permissions it needs,
    and registers it as the provider of a CloudFormation custom resource.

    After construction, ``self.response`` holds the string form of the
    custom resource's ``Response`` attribute.
    """

    def __init__(
        self,
        scope: core.Construct,
        id: str,
        secret_arn: str,
        db_name: str,
        db_ref: str,
        **kwargs,
    ) -> None:
        """Build the Lambda function, its layer, its policies and the resource.

        Args:
            scope: Parent construct.
            id: Construct identifier.
            secret_arn: ARN of the database secret; exposed to the function
                as ``DB_SECRET_ARN`` and used as the resource of the
                ``secretsmanager:GetSecretValue`` statement.
            db_name: Database name; exposed to the function as ``DB_NAME``.
            db_ref: Cluster reference; exposed as ``DB_CLUSTER_ARN`` and used
                as the resource of the ``rds-data`` statement.
            **kwargs: Forwarded to the base construct and also passed as the
                custom resource's properties.
        """
        super().__init__(scope, id, **kwargs)

        # The handler source is shipped inline, so read it from disk first.
        with open("migration_handler.py", encoding="utf-8") as handler_file:
            handler_source = handler_file.read()

        inline_code = _lambda.InlineCode(handler_source)

        # Layer carrying the SQL scripts; note it is created in the parent
        # scope, not under this construct.
        scripts_layer = _lambda.LayerVersion(
            scope,
            id="migrationscripts",
            code=_lambda.Code.from_asset(SQL_SCRIPTS_PATH),
            description="Database migration scripts",
        )

        handler_environment = {
            "DB_NAME": db_name,
            "DB_SECRET_ARN": secret_arn,
            "DB_CLUSTER_ARN": db_ref,
        }

        lambda_function = _lambda.SingletonFunction(
            self,
            "Singleton",
            uuid="f7d4f730-4ee1-11e8-9c2d-fa7ae01bbebc",
            code=inline_code,
            handler="index.main",
            timeout=core.Duration.seconds(300),
            runtime=_lambda.Runtime.PYTHON_3_7,
            layers=[scripts_layer],
            environment=handler_environment,
        )

        # (resources, actions) pairs, added to the function role in order.
        grants = (
            # Allow lambda to read database secret
            ([secret_arn], ["secretsmanager:GetSecretValue"]),
            # allow lambda to execute query on the database
            (
                [db_ref],
                [
                    "rds-data:ExecuteStatement",
                    "rds-data:BatchExecuteStatement",
                ],
            ),
            # let the Lambda function output to CloudWatch Logs
            (
                ["*"],
                [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
            ),
        )
        for granted_resources, granted_actions in grants:
            lambda_function.add_to_role_policy(
                _iam.PolicyStatement(
                    resources=granted_resources,
                    actions=granted_actions,
                )
            )

        provider = cfn.CustomResourceProvider.lambda_(lambda_function)
        resource = cfn.CustomResource(
            self,
            "Resource",
            provider=provider,
            properties=kwargs,
        )

        self.response = resource.get_att("Response").to_string()
Enter fullscreen mode Exit fullscreen mode

Creating Serverless Aurora Instance

With the custom resource defined, we can create the serverless aurora stack with this code:

# filename: migration.py
import os
from aws_cdk import (
    aws_ec2 as ec2,
    aws_rds as rds,
    core,
    aws_secretsmanager as sm,
)
from .migration import SchemaMigrationResource


class RDSStack(core.Stack):
    """Stack that provisions the Aurora cluster and runs the schema migration.

    It creates a VPC, a database secret, a subnet group over the VPC's
    private subnets, an ``aurora-mysql`` cluster in ``serverless`` engine mode
    with the HTTP endpoint enabled, a secret target attachment, and a
    ``SchemaMigrationResource``. The migration response is published as the
    ``ResponseMessage`` stack output.

    Attributes set on the instance: ``secret`` (the database secret) and
    ``db`` (the cluster).
    """

    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        """Create all resources of the stack.

        The master user name comes from the ``DB_USERNAME`` environment
        variable (default ``admin_user``) and the database name from
        ``DB_NAME`` (default ``anonfed``).
        """
        super().__init__(scope, id, **kwargs)

        vpc = ec2.Vpc(self, "VPC")

        master_user = os.getenv("DB_USERNAME", "admin_user")
        self.secret = rds.DatabaseSecret(
            self, id="MasterUserSecret", username=master_user
        )

        private_subnet_ids = vpc.select_subnets(
            subnet_type=ec2.SubnetType.PRIVATE
        ).subnet_ids
        rds.CfnDBSubnetGroup(
            self,
            "rdsSubnetGroup",
            db_subnet_group_description="private subnets for rds",
            subnet_ids=private_subnet_ids,
        )

        db_name = os.getenv("DB_NAME", "anonfed")

        scaling = rds.CfnDBCluster.ScalingConfigurationProperty(
            auto_pause=True,
            min_capacity=1,
            max_capacity=4,
            seconds_until_auto_pause=300,
        )
        # The subnet group is referenced through its construct id.
        subnet_group_ref = core.Fn.ref("rdsSubnetGroup")
        # Credentials are taken from the generated secret.
        username_value = self.secret.secret_value_from_json("username").to_string()
        password_value = self.secret.secret_value_from_json("password").to_string()

        self.db = rds.CfnDBCluster(
            self,
            "auroraCluster",
            engine="aurora-mysql",
            engine_version="5.7.mysql_aurora.2.08.1",
            db_cluster_parameter_group_name="default.aurora-mysql5.7",
            # snapshot_identifier="<snapshot_arn>",  # your snapshot
            engine_mode="serverless",
            scaling_configuration=scaling,
            db_subnet_group_name=subnet_group_ref,
            database_name=db_name,
            master_username=username_value,
            master_user_password=password_value,
            enable_http_endpoint=True,
        )

        attachment = sm.CfnSecretTargetAttachment(
            self,
            id="secret_attachment",
            secret_id=self.secret.secret_arn,
            target_id=self.db.ref,
            target_type="AWS::RDS::DBCluster",
        )
        attachment.node.add_dependency(self.db)

        # Full cluster ARN assembled from the stack's region/account and the
        # cluster reference.
        cluster_arn = f"arn:aws:rds:{self.region}:{self.account}:cluster:{self.db.ref}"
        migration = SchemaMigrationResource(
            self, "schemamigration", self.secret.secret_arn, db_name, cluster_arn
        )

        # Publish the custom resource output
        core.CfnOutput(
            self,
            "ResponseMessage",
            description="Database Migration",
            value=migration.response,
        )

Enter fullscreen mode Exit fullscreen mode

Finally, we can hook everything together in app.py


# Deployment target: the CDK_DEPLOY_* variables take precedence and the
# CDK_DEFAULT_* variables are the fallback (they must be set).
env_EU = core.Environment(
    account=os.environ.get("CDK_DEPLOY_ACCOUNT", os.environ["CDK_DEFAULT_ACCOUNT"]),
    region=os.environ.get("CDK_DEPLOY_REGION", os.environ["CDK_DEFAULT_REGION"]),
)

# Both stacks share the same environment; the API stack receives the
# database stack through its `db` argument.
app = core.App()
db = RDSStack(scope=app, id="aurora", env=env_EU)
ap_stack = Api(scope=app, id="api", env=env_EU, db=db)

app.synth()

Enter fullscreen mode Exit fullscreen mode

Conclusion

In this post, we've seen how to migrate a database schema using a CloudFormation custom resource. To build on this knowledge, I encourage you to take a look at the full project in this repository.

References

Top comments (0)