对于具有只读副本的主实例,使用 Amazon Secrets Manager 自动轮换 Amazon RDS 凭证

作者: 拉瓦尼亚·萨洛基, 拉武·帕瓦尔, Sudhakar Darse |

Amazon Relational Database Service (Amazon RDS) 和 Amazon Secrets Manager 集成为数据库实例提供强大的凭证管理。使用 Secrets Manager 管理您的主用户密码时,您无法为数据库实例创建新的只读副本。这适用于除 Amazon RDS for SQL Server 之外的所有数据库引擎,这可能会影响您的组织在保持安全凭证做法的同时高效扩展其读取操作的能力。

在这篇文章中,我们提出了一种解决方案,该解决方案可以自动执行具有只读副本的主实例的密码轮换过程,同时保持安全的凭据管理实践。这种方法使您可以利用读取扩展和自动凭证轮换的优势。

解决方案概述

在本文中,我们使用了 Amazon Lambda 函数,该函数为具有只读副本的主实例编排轮换密码,并管理其源数据库 Secrets Manager 集成的证书。以下是该过程的高级概述:

  1. 密钥管理器根据配置的轮换计划自动触发 Lambda 函数。
  2. 这启动了安全凭证轮换工作流程
  3. Lambda 函数生成新的安全证书
  4. 这些凭据配置为同时适用于主数据库实例和关联的只读副本
  5. 新生成的证书安全地存储在 Amazon Secrets Manager 中
  6. 整个过程会根据您定义的轮换时间表自动重复,确保无需人工干预即可定期更新密码

这种方法可确保 RDS 主实例及其只读副本保持同步的证书,所有密码管理均通过 Amazon Secrets Manager 安全处理。

下图说明了解决方案架构。

设置 RDS 主实例并创建只读副本

在本节中,我们创建了 Lambda 函数,该函数将自动轮换带有只读副本的主数据库实例的密码。完成以下步骤以配置 RDS 实例和只读副本:

  1. 在 Amazon RDS 控制台上,在导航窗格中选择数据库
  2. 选择 "创建数据库"。
  3. 对于选择数据库创建方法,选择标准创建
  4. 在 "引擎选项" 下,对于引擎类型,选择 PostgreSQL
  5. "凭据设置" 下,在 "凭据管理" 中,选择 "自我管理"。(如果您选择 "在 Amazon Secrets Manager 中管理",则我们无法为此实例创建只读副本,因此选择自我管理很重要)
  6. 根据您的场景填写其他参数并创建实例。如果您为实例创建了只读副本,则可以继续阅读下一节。如果您还没有,请为您的主实例创建一个(或多个)只读副本。

创建 Lambda 函数

在本节中,我们创建了 Lambda 函数,该函数将在带有只读副本的主要 RDS 实例的密钥管理器中自动轮换密码。完成以下步骤以创建 Lambda 函数:

  1. 在 Lambda 控制台上,在导航窗格中选择函数
  2. 选择创建函数
  3. 函数名称中,输入函数的名称。
  4. 对于运行时,选择 Python 3.13
  5. 对于执行角色,选择使用基本 Lambda 权限创建新角色。(在本指南的稍后部分,我们将为 Lambda 函数角色的 RDS 和 Amazon Secrets Manager 配置额外的 IAM 权限)
  6. 选择创建函数

  7. 在 Lambda 函数的代码选项卡上,输入以下 Python 代码:
    import boto3
    import json
    import string
    import random
    import os
    from botocore.exceptions import ClientError
    
    def lambda_handler(event, context):
        """Secrets Manager RDS Password Rotation Lambda function"""
        print(f"Starting password rotation with event: {event}")
        arn = event['SecretId']
        token = event['ClientRequestToken']
        step = event['Step']
    
        # Setup the clients
        rds_client = boto3.client('rds')
        secrets_client = boto3.client('secretsmanager')
    
        # Make sure the version is staged correctly
        print("Checking secret metadata and rotation status...")
        metadata = secrets_client.describe_secret(SecretId=arn)
        if not metadata['RotationEnabled']:
            print(f"ERROR: Secret {arn} is not enabled for rotation")
            raise ValueError(f"Secret {arn} is not enabled for rotation")
        
        versions = metadata['VersionIdsToStages']
        print(f"Current version stages: {versions}")
    
        if token not in versions:
            print(f"ERROR: Version {token} not found in version stages")
            raise ValueError(
                f"Secret version {token} has no stage for rotation of secret {arn}")
    
        if "AWSCURRENT" in versions[token]:
            print("Secret version is already marked as AWSCURRENT. Exiting.")
            return
        elif "AWSPENDING" not in versions[token]:
            print(f"ERROR: Version {token} not in AWSPENDING stage")
            raise ValueError(
                f"Secret version {token} not set as AWSPENDING for secret {arn}")
       
        if step == "createSecret":
            create_secret(secrets_client, arn, token, rds_client)
        elif step == "setSecret":
            wait_for_db_availability(rds_client, secrets_client, arn, token)
        elif step == "testSecret":
            check_rds_status(rds_client, secrets_client, arn, token)
        elif step == "finishSecret":
            finish_secret(secrets_client, arn, token)
        else:
            raise ValueError("Invalid step parameter")
    
    
    def create_secret(secrets_client, arn, token, rds_client):
        """Generate a new secret value and put it to AWSPENDING"""
        print(f"Starting create_secret step for secret: {arn}")
    
        new_password = get_random_password(secrets_client)
        print("Successfully generated new password")
    
        try:
            db_instance_id = os.environ['DB_IDENTIFIER']
            print(f"Getting master username for DB instance: {db_instance_id}")
    
            response = rds_client.describe_db_instances(
                DBInstanceIdentifier=db_instance_id
            )
            master_username = response['DBInstances'][0]['MasterUsername']
            print(f"Retrieved master username: {master_username}")
    
            secret_dict = {
                'username': master_username,
                'password': new_password
            }
            print("Storing new secret in AWSPENDING stage...")
            secrets_client.put_secret_value(
                SecretId=arn,
                ClientRequestToken=token,
                SecretString=json.dumps(secret_dict),
                VersionStages=['AWSPENDING']
            )
            print("Successfully stored new secret value")
    
        except ClientError as e:
            print(f"ERROR in create_secret: {str(e)}")
            raise ValueError(f"Error creating secret: {str(e)}")
    
        try:
            print(f"Updating password in RDS instance: {db_instance_id}")
            rds_client.modify_db_instance(
                DBInstanceIdentifier=db_instance_id,
                MasterUserPassword=new_password,
                ApplyImmediately=True
            )
            print(
                f"Successfully updated password for RDS instance {db_instance_id}")
    
        except ClientError as e:
            print(f"ERROR updating database password: {str(e)}")
            raise ValueError(f"Error updating database: {str(e)}")
    
    
    def wait_for_db_availability(rds_client, secrets_client, arn, token):
        print(f"Starting set_secret step for secret: {arn}")
        try:
            db_instance_id = os.environ['DB_IDENTIFIER']
            print(
                f"Waiting for DB instance {db_instance_id} to become available...")
    
            waiter = rds_client.get_waiter('db_instance_available')
            waiter.wait(
                DBInstanceIdentifier=db_instance_id,
                WaiterConfig={
                    'Delay': 30,
                    'MaxAttempts': 10
                }
            )
            print(f"DB instance {db_instance_id} is now available")
    
        except ClientError as e:
            print(f"ERROR in checking for Database availability: {str(e)}")
            raise ValueError(f"Error for Database availability: {str(e)}")
    
    
    def check_rds_status(rds_client, secrets_client, arn, token):
        print(f"Starting check_rds_status step for secret: {arn}")
        try:
            db_instance_id = os.environ['DB_IDENTIFIER']
            print(f"Checking status of DB instance: {db_instance_id}")
    
            response = rds_client.describe_db_instances(
                DBInstanceIdentifier=db_instance_id
            )
            status = response['DBInstances'][0]['DBInstanceStatus']
            print(f"Current DB instance status: {status}")
    
            if status != 'available':
                print(
                    f"ERROR: Database is not available. Current status: {status}")
                raise ValueError("Database is not available")
    
            print("Successfully verified database availability")
    
        except ClientError as e:
            print(f"ERROR in checking rds status: {str(e)}")
            raise ValueError(f"Error checking rds status: {str(e)}")
    
    
    def finish_secret(secrets_client, arn, token):
        print(f"Starting finish_secret step for secret: {arn}")
        try:
            print("Retrieving secret metadata...")
            metadata = secrets_client.describe_secret(SecretId=arn)
            current_version = None
    
            for version in metadata["VersionIdsToStages"]:
                if "AWSCURRENT" in metadata["VersionIdsToStages"][version]:
                    current_version = version
                    break
    
            print(f"Current version: {current_version}")
            print(
                f"Updating secret version stage from {current_version} to {token}")
    
            secrets_client.update_secret_version_stage(
                SecretId=arn,
                VersionStage="AWSCURRENT",
                MoveToVersionId=token,
                RemoveFromVersionId=current_version
            )
            print("Successfully updated secret version to AWSCURRENT")
    
        except ClientError as e:
            print(f"ERROR in finish_secret: {str(e)}")
            raise ValueError(f"Error finishing secret: {str(e)}")
    
    def get_environment_bool(variable_name, default_value):
        """Loads the environment variable and converts it to the boolean.
    
        Args:
            variable_name (string): Name of environment variable
    
            default_value (bool): The result will fallback to the default_value when the environment variable with the given name doesn't exist.
    
        Returns:
            bool: True when the content of environment variable contains either 'true', '1', 'y' or 'yes'
        """
        variable = os.environ.get(variable_name, str(default_value))
        return variable.lower() in ['true', '1', 'y', 'yes']
    
    
    # Generate random password
    def get_random_password(rds_client):
        """ Generates a random new password. Generator loads parameters that affects the content of the resulting password from the environment
        variables. When environment variable is missing sensible defaults are chosen.
    
        Supported environment variables:
            - EXCLUDE_CHARACTERS
            - PASSWORD_LENGTH
            - EXCLUDE_NUMBERS
            - EXCLUDE_PUNCTUATION
            - EXCLUDE_UPPERCASE
            - EXCLUDE_LOWERCASE
            - REQUIRE_EACH_INCLUDED_TYPE
    
        Args:
            rds_client (client): The secrets manager service client
    
        Returns:
            string: The randomly generated password.
        """
        passwd = rds_client.get_random_password(
            ExcludeCharacters=os.environ.get('EXCLUDE_CHARACTERS', ':/@"\'\\'),
            PasswordLength=int(os.environ.get('PASSWORD_LENGTH', 32)),
            ExcludeNumbers=get_environment_bool('EXCLUDE_NUMBERS', False),
            ExcludePunctuation=get_environment_bool('EXCLUDE_PUNCTUATION', False),
            ExcludeUppercase=get_environment_bool('EXCLUDE_UPPERCASE', False),
            ExcludeLowercase=get_environment_bool('EXCLUDE_LOWERCASE', False),
            RequireEachIncludedType=get_environment_bool(
                'REQUIRE_EACH_INCLUDED_TYPE', True)
        )
        return passwd['RandomPassword']

创建密钥管理器密钥

完成以下步骤,在密钥管理器中存储您的 RDS 实例的新密钥:

  1. 在凭据管理器控制台上,在导航窗格中选择密钥
  2. 选择 "存储新密钥"。
  3. 选择 "其他类型的密钥"。
  4. 密钥/值对下输入用户或管理员凭据。在密钥中添加用户名,在值中添加密码)
  5. 选择 "下一步"。
  6. 输入密钥的名称。
  7. 选择 "下一步"。
  8. 选择 "自动旋转"。
  9. 填写轮换时间表。
  10. 旋转函数下,选择您之前部署的 Lambda 函数。
  11. 选择 "下一步"。

  12. 查看设置并选择存储

对于这篇文章,我们将轮换时间表设置为 4 小时。这意味着每隔 4 小时,密码将在 Secrets Manager 中轮换一次,我们可以使用新密码连接到 RDS 实例及其只读副本。

为 Lambda 函数设置 IAM 权限

在下一步中,您将创建 Amazon Identity and Access Management (IAM) 策略作为 Lambda 函数的角色。该策略为 Lambda 函数提供了为 RDS 数据库实例中的用户执行密钥轮换所需的权限。

完成以下步骤,为 Lambda 函数配置 IAM 角色:

  1. 在 Lambda 控制台上,导航到您的函数。
  2. 在功能概述页面的配置部分中,选择权限
  3. 执行角色下,选择指向 Lambda 角色名称的链接以在 IAM 控制台上查看该角色。
  4. 在 "权限策略" 下,选择 "添加权限" 和 "创建内联策略"。
  5. 对于策略编辑器,选择 JSON
  6. 使用密钥轮换策略替换文本编辑器中的 JSON。将 IAM 政策中正在轮换的密钥的亚马逊资源名称 (ARN) 和 RDS 实例的 ARN 替换为您的亚马逊云科技账户中的值:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "rds:DescribeDBInstances",
                    "rds:ModifyDBInstance"
                ],
                "Resource": "arn:aws:rds:us-east-1:xxxxxxxxx:db:test-primary"
            },
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:DescribeSecret",
                    "secretsmanager:PutSecretValue",
                    "secretsmanager:UpdateSecretVersionStage"
                ],
                "Resource": "arn:aws:secretsmanager:us-east-1:XXXXXXXXX:secret:password-roatation-for-test-primary-replica-b1Rdt0"
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": "secretsmanager:GetRandomPassword",
                "Resource": "*"
            }
        ]
    }
  7. 选择 "下一步"。
  8. 输入策略名称并选择创建策略

为密钥管理器设置 Lambda 调用权限

完成以下步骤以更新 Lambda 函数权限:

  1. 在 Lambda 控制台上,导航到您的函数。
  2. 基于资源的政策声明下,选择添加权限
  3. 选择 亚马逊云科技服务
  4. 对于服务,选择密钥管理器
  5. 在 "报表 ID" 中,输入您选择的对账单 ID。
  6. 对于 "操作",选择lambda:InvokeFunction
  7. 选择 "保存"。

为 Lambda 函数配置环境变量

完成以下步骤以配置函数的环境变量:

  1. 在 Lambda 函数的函数概述页面上,选择配置
  2. 选择环境变量,然后选择编辑
  3. 选择添加环境变量
  4. 输入DB_IDENTIFIER密钥。
  5. 为该值输入 RDS 实例名称。在本演示中,我们的实例名称是test-primary
  6. 选择 "保存"。

配置 Lambda 函数的超时设置

  1. 在 Lambda 函数的函数概述页面上,选择常规配置
  2. 选择编辑并将超时时间修改为大约 1 分钟。
  3. 在这个演示中,我增加了 1 分 3 秒。
  4. 选择 "保存"。

验证密钥密码轮换

我们建议先在非生产环境中对其进行测试。使用只读副本创建测试 RDS 实例,然后使用 Lambda 函数轮换密码。验证密码是否已成功创建和轮换,以及其凭据是否在 Secrets Manager 中独立管理。

要验证密码轮换,请完成以下步骤:

  1. 在密钥管理器控制台上,导航到您为 RDS 凭证创建的密钥。
  2. 在密钥详细信息中,查看密钥值您应该看到密码已更新为新值。上次轮换日期应反映最近的轮换事件。
  3. 检查密钥的轮换历史记录。这可以在机密详细信息的轮换配置部分中找到。验证轮换是否按您设置的计划间隔进行。
  4. 要进一步确认轮换成功,请尝试使用存储在密钥管理器中的新凭证连接到您的 RDS 实例。如果可以成功连接,则表明轮换过程已正确更新了数据库凭据。
  5. 查看您的 Lambda 函数的 Amazon CloudWatch 日志。在轮换过程中查找任何错误消息或意外行为。应记录成功的轮换,且不会出现任何错误。

局限性

该解决方案旨在管理亚马逊云科技账户内具有只读副本的单个 RDS 实例的密码轮换。虽然该解决方案可用于多个数据库,但每个数据库都需要:

  • 单独部署 Lambda 函数
  • 它自己的密钥管理器机密
  • Lambda 函数的相应 IAM 权限

清理干净

为避免产生不必要的费用并清理此解决方案中使用的资源,请执行以下步骤:

  1. 删除 Lambda 函数:
    1. 在 Lambda 控制台上,在导航窗格中选择函数
    2. 选择您为此解决方案创建的函数,然后在 "操作" 菜单上选择 "删除"。
    3. 出现提示时确认删除。
  2. 删除 IAM 角色:
    1. 在 IAM 控制台上,在导航窗格中选择角色
    2. 选择为 Lambda 函数创建的角色并选择删除
    3. 出现提示时确认删除。
  3. 删除密钥管理器密钥:
    1. 在凭据管理器控制台上,在导航窗格中选择密钥
    2. 选择您为存储 Amazon RDS 凭证而创建的密钥,然后在 "操作" 菜单上选择 "删除密钥"。
  4. 如果您创建了专门用于测试此解决方案的 RDS 实例,则可能需要删除
  5. 查看并清理 CloudWatch 日志:
    1. 在 CloudWatch 控制台上,在导航窗格中选择日志组
    2. 找到并删除与您的 Lambda 函数关联的所有日志组。

结论

在这篇文章中,展示了如何使用 Amazon Lambda 创建解决方案,在带有只读副本的 Amazon RDS 数据库上的 Secrets Manager 中自动轮换密码。这种方法使您可以保持安全的凭证管理实践,同时仍能受益于 Amazon RDS 的读取扩展功能。

如果您对这篇文章有任何疑问或建议,请发表评论。


作者简介

Lavanya Salokye 是亚马逊云科技的云支持 DBE,专门从事基于云的解决方案的故障排除和优化。她的职责包括协助客户使用亚马逊云科技服务、诊断技术问题、数据迁移以及就云架构、安全性和性能的优秀实践提供专家指导。

Lavu Pawar 是亚马逊云科技的交付顾问,支持客户在亚马逊云科技云中设计、构建和部署高度可用和可扩展的解决方案。在空闲时间,他喜欢阅读和与家人和朋友共度美好时光。

Sudhakar Darse 是亚马逊云科技的数据库专家解决方案架构师。他与亚马逊云科技客户合作,提供有关数据库服务的指导和技术援助,帮助他们将数据库迁移到亚马逊云科技云并提高其解决方案在使用亚马逊云科技时的价值。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您发展海外业务和/或了解行业前沿技术选择推荐该服务。