DEV Community

Cover image for How to Automate the Encryption of an Amazon RDS Instance with Python
Dmitry Romanoff
Dmitry Romanoff

Posted on

How to Automate the Encryption of an Amazon RDS Instance with Python

Amazon RDS (Relational Database Service) is a powerful and scalable database service offered by AWS, but sometimes, for compliance or security reasons, you need to encrypt an existing unencrypted database instance. In this article, we'll walk through a Python script that automates the process of migrating an unencrypted Amazon RDS instance to an encrypted one.

Why Encrypt an RDS Instance?

Encryption of RDS instances ensures that data at rest is secure and meets various compliance requirements, such as PCI DSS, HIPAA, and more. Encryption ensures that backups, snapshots, and the underlying storage of your RDS database are encrypted automatically.

However, you cannot enable encryption on an existing unencrypted RDS instance directly. Instead, you must create a snapshot, copy that snapshot with encryption enabled, and then restore a new RDS instance from the encrypted snapshot.

This is what we'll automate in this tutorial.

Prerequisites

To follow along with this guide, you'll need:

  • AWS Account: Access to an AWS account with permissions to manage RDS and KMS (Key Management Service).
  • Python 3.x: Installed and configured on your local machine.
  • Boto3: The AWS SDK for Python, which you can install using pip:
  pip install boto3
Enter fullscreen mode Exit fullscreen mode

You'll also need the following AWS credentials:

  1. AWS_ACCESS_KEY_ID
  2. AWS_SECRET_ACCESS_KEY
  3. AWS_DEFAULT_REGION

The Encryption Migration Process

This Python script automates the following steps:

  1. Create a Snapshot: Take a snapshot of the existing unencrypted RDS instance.
  2. Copy the Snapshot with Encryption: Create an encrypted copy of the snapshot using AWS KMS (Key Management Service).
  3. Restore the Database: Create a new RDS instance from the encrypted snapshot.

Python Script to Automate the Migration

import boto3
import time
from botocore.exceptions import WaiterError

class RDSEncryptionMigrator:
    def __init__(self, source_db_identifier, target_db_identifier, kms_key_alias, region='us-east-1'):
        self.source_db_identifier = source_db_identifier
        self.target_db_identifier = target_db_identifier
        self.kms_key_alias = kms_key_alias if kms_key_alias.startswith('alias/') else f'alias/{kms_key_alias}'
        self.region = region

        self.rds_client = boto3.client('rds', region_name=region)
        self.kms_client = boto3.client('kms', region_name=region)

    def get_kms_key_id(self):
        """Get the KMS key ID from the alias"""
        try:
            response = self.kms_client.describe_key(
                KeyId=self.kms_key_alias
            )
            return response['KeyMetadata']['Arn']
        except Exception as e:
            print(f"Error getting KMS key ID from alias: {e}")
            raise

    def create_snapshot(self, snapshot_identifier):
        print(f"Creating snapshot of source database: {self.source_db_identifier}")
        response = self.rds_client.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_identifier,
            DBInstanceIdentifier=self.source_db_identifier
        )

        # Wait for snapshot to be available
        waiter = self.rds_client.get_waiter('db_snapshot_available')
        try:
            waiter.wait(
                DBSnapshotIdentifier=snapshot_identifier,
                WaiterConfig={'Delay': 30, 'MaxAttempts': 60}
            )
        except WaiterError as e:
            print(f"Error waiting for snapshot: {e}")
            raise

        return response['DBSnapshot']['DBSnapshotArn']

    def create_encrypted_snapshot_copy(self, source_snapshot_id, encrypted_snapshot_id):
        print("Creating encrypted copy of snapshot")
        kms_key_id = self.get_kms_key_id()
        response = self.rds_client.copy_db_snapshot(
            SourceDBSnapshotIdentifier=source_snapshot_id,
            TargetDBSnapshotIdentifier=encrypted_snapshot_id,
            KmsKeyId=kms_key_id,
            CopyTags=True,
            SourceRegion=self.region
        )

        # Wait for encrypted snapshot to be available
        waiter = self.rds_client.get_waiter('db_snapshot_available')
        try:
            waiter.wait(
                DBSnapshotIdentifier=encrypted_snapshot_id,
                WaiterConfig={'Delay': 30, 'MaxAttempts': 60}
            )
        except WaiterError as e:
            print(f"Error waiting for encrypted snapshot: {e}")
            raise

        return response['DBSnapshot']['DBSnapshotArn']

    def restore_from_snapshot(self, snapshot_identifier):
        print(f"Restoring new encrypted database from snapshot")

        # Get source DB instance details
        source_db = self.rds_client.describe_db_instances(
            DBInstanceIdentifier=self.source_db_identifier
        )['DBInstances'][0]

        # Restore the encrypted instance
        response = self.rds_client.restore_db_instance_from_db_snapshot(
            DBInstanceIdentifier=self.target_db_identifier,
            DBSnapshotIdentifier=snapshot_identifier,
            DBInstanceClass=source_db['DBInstanceClass'],
            VpcSecurityGroupIds=self._get_security_group_ids(source_db),
            DBSubnetGroupName=source_db['DBSubnetGroup']['DBSubnetGroupName'],
            PubliclyAccessible=source_db['PubliclyAccessible'],
            MultiAZ=source_db['MultiAZ']
        )

        # Wait for the new instance to be available
        waiter = self.rds_client.get_waiter('db_instance_available')
        try:
            waiter.wait(
                DBInstanceIdentifier=self.target_db_identifier,
                WaiterConfig={'Delay': 30, 'MaxAttempts': 60}
            )
        except WaiterError as e:
            print(f"Error waiting for database restoration: {e}")
            raise

        return response['DBInstance']['DBInstanceArn']

    def _get_security_group_ids(self, db_instance):
        return [sg['VpcSecurityGroupId'] for sg in db_instance['VpcSecurityGroups']]

    def perform_encryption(self):
        try:
            # Create timestamp for unique identifiers
            timestamp = int(time.time())

            # Step 1: Create initial snapshot
            snapshot_id = f"{self.source_db_identifier}-snapshot-{timestamp}"
            self.create_snapshot(snapshot_id)

            # Step 2: Create encrypted copy of the snapshot
            encrypted_snapshot_id = f"{self.source_db_identifier}-encrypted-snapshot-{timestamp}"
            self.create_encrypted_snapshot_copy(snapshot_id, encrypted_snapshot_id)

            # Step 3: Restore from encrypted snapshot
            self.restore_from_snapshot(encrypted_snapshot_id)

            print(f"""
            Encryption process completed successfully!
            New encrypted database instance: {self.target_db_identifier}

            Next steps:
            1. Verify the new encrypted instance
            2. Update your application connection strings
            3. Once verified, you can delete the old unencrypted instance
            """)

        except Exception as e:
            print(f"Error during encryption process: {e}")
            raise

def main():
    # These values should ideally come from environment variables or command line arguments
    source_db_identifier = 'database-2'
    target_db_identifier = 'database-2-enc'
    kms_key_alias = 'aws/rds'
    region = 'us-east-1'

    migrator = RDSEncryptionMigrator(
        source_db_identifier=source_db_identifier,
        target_db_identifier=target_db_identifier,
        kms_key_alias=kms_key_alias,
        region=region
    )

    migrator.perform_encryption()

if __name__ == '__main__':
    main()
Enter fullscreen mode Exit fullscreen mode

How the Script Works

The script defines a class RDSEncryptionMigrator which handles:

  1. Snapshot Creation: A snapshot of the source database is created.
  2. Encrypted Snapshot Copy: The snapshot is copied and encrypted using the provided KMS key alias.
  3. Database Restoration: The encrypted snapshot is used to restore a new RDS instance.

Conclusion

By using the script provided, you can automate the encryption process for your RDS databases and ensure that your data is secure. This approach eliminates the need for manual intervention and reduces the risk of human error in the migration process. Make sure to verify the new encrypted instance, update your application connection strings, and remove the old unencrypted instance once you’re ready.

If you're looking to scale this further, you can integrate this script with AWS Lambda or AWS Step Functions to automate the process further within your CI/CD pipeline.

How to Automate the Encryption of an Amazon RDS Instance with Python

Top comments (0)