发布于: Oct 30, 2022

如何自动监控数据库迁移步骤,并在出现错误时发出告警?当面对大量同时进行的复制任务时,靠人工去监控每项任务的进度无疑是一项既枯燥、又容易出错的工作。现在我们就教你如何解决这一困扰。

以下 CloudFormation 堆栈将为您的 Amazon DMS 任务创建 CloudWatch 警报:

CloudFormation 堆栈提供以下信息:

  • 栈名称
  • Amazon DMS 任务标识符
  • Amazon DMS 复制实例名称
  • SNS 主题 ARN

其余所有设置皆可保留为默认值。

当复制任务或复制实例发生特定事件时(例如创建或删除实例时),您可以通过创建的 Amazon DMS 事件订阅接收到相应通知。

对于复制任务,请为以下事件创建订阅:

  • 配置变更
  • 创建
  • 删除
  • 失败
  • 状态变更

对于复制实例,请为以下事件创建订阅:

  • 配置变更
  • 创建
  • 删除
  • 故障转移
  • 失败
  • 低存储容量
  • 维护

创建事件订阅后,Amazon DMS 会将事件通知发送至您所提供的目标地址处。您可能希望创建多个不同的订阅,例如由一个订阅接收所有事件通知,再由另一订阅处理仅涉及生产环境 Amazon DMS 资源的关键事件。

您可以通过在 Amazon DMS 控制台将 Enabled 选项设置为 No,或者使用 Amazon DMS APIEnabled 参数设置为 false 来轻松关闭通知但又继续保留订阅。关于更多详细信息,请参阅Amazon数据库迁移服务中使用事件与通知

以下 CloudFormation 堆栈,将为您的 Amazon DMS 任务创建事件订阅:

需要为堆栈提供以下信息:

  • 堆栈名称
  • Amazon DMS 任务名称
  • SNS 主题 ARN

其余所有设置皆可保留为默认值。

Amazon DMS 可以将详细任务信息发布至 CloudWatch 日志。您可以利用这一点来在任务运行的时候监控其运行状况并且诊断发生的任何问题

在默认情况下,日志将被存储在日志组 dms-tasks-<replication-instance-name>dms-task-<Task Identifier>日志流当中。关于更多详细信息,请参阅对任务设置进行日志记录

要获取关于 CloudWatch 日志中出现错误消息的通知,请在日志组上创建订阅过滤器,具体参阅以下 Python 脚本:

from __future__ import print_functionimport jsonimport base64, zlibimport boto3import os
def logstream_handler(event, context):
    bstream_data = event.get("awslogs").get("data")
    decoded_data = json.loads(zlib.decompress(base64.b64decode(bstream_data),16 + zlib.MAX_WBITS))
    client = boto3.client('sns')
    subscriptionFilters = decoded_data.get("subscriptionFilters")
    subject = ""
    if subscriptionFilters:
        subject = "Log Filter Alert : {0}".format(subscriptionFilters[0])
    decoded_msg = decoded_data.get("logEvents")
    msg = "logGroup : {0}\nlogStream : {1}".format(
        decoded_data.get("logGroup"),
        decoded_data.get("logStream"))
    msg = "{0}\n\nMessages: \n".format(msg)
    for m in decoded_msg:
        msg = "{0}\n{1}".format(msg,m.get("message"))
    topicARN=os.environ.get("topicARN")
    args = {}
    args["TargetArn"]=topicARN
    args["Message"]=msg
    if subject:
        args["Subject"]=subject
    response = client.publish(**args)
    return {
        "statusCode": 200,
        "body": json.dumps('Sent Message.')
    }

以下 CloudFormation 堆栈可创建用于发送 SNS 错误日志通知的环境:

为堆栈提供以下信息:

  • 堆栈名称
  • 日志组名称
  • SNS 主题 ARN
  • 过滤模式

其余所有设置皆可保留为默认值。

您可以创建多个 CloudWatch 告警,借此了解告警状态何时发生变更。在某些情况下,告警可能长时间处于活跃状态,导致您可能错过此前已发送的告警。要获取重复告警,我们为您提供一个 Lambda 函数,用于检查告警的状态以及当前状态的持续时间,并据此发送通知。

以下可以发送 SNS 通知的 Python 脚本将由 CloudWatch Events 规则调用:

import jsonimport boto3import os

cloudwatch = boto3.client('cloudwatch')
sns = boto3.client('sns')

subject_str = '{}: "{}" in {}'
message_str = """You are receiving this email because your Amazon CloudWatch Alarm "{}" in the {} region has entered the {} state, because "{}".

Alarm Details :
    - Name: {}
    - Description: {}
    - Reason for State Change: {}


Monitored Metric:
    - MetricNamespace: {}
    - MetricName: {}
    - Dimensions: {}
    - Period: {}
    - Statistic: {}
    - Unit: {}
    - TreatMissingData: {}
"""
def send_alarm(topic, subject, message):
    """ Sends SNS Notification to given topic """
    response = sns.publish(
                    TopicArn=topic,
                    Message=message,
                    Subject=subject
                )
    print("Alarm Sent Subject : {}".format(subject))
    return
def main_handler(event, context):
    """
        Describes existing alarms in current region and check it's state
        If state matches to alarmState that sent as input send alarm.

        Parameters
        ----------
        alarmNames - ['string']
        alarmState - string (Alarm/OK/INSUFFICIENT_DATA)
    """
    alarm_names = event['alarmNames']
    alarm_state = event.get('alarmState', 'Alarm').lower()
    region = os.environ["AWS_REGION"]
    response = cloudwatch.describe_alarms(
        AlarmNames=alarm_names
        )
    metric_alarms = response["MetricAlarms"]
    if len(metric_alarms) == 0:
        return {
            'statusCode': 200,
            'body': json.dumps('No Alarms Configured')
        }
    for alarm in metric_alarms:
        if alarm["StateValue"].lower() != alarm_state:
            continue
        topics = alarm["AlarmActions"] if alarm_state == 'alarm' else alarm["OKActions"] if alarm_state == 'ok' else alarm['InsufficientDataActions'] if alarm_state == 'insufficient_data' else []
        if len(topics) == 0:
            print('No Topics Configured for state %s to %s' %(alarm_state, alarm['AlarmName']))
            continue
        subject = subject_str.format(alarm["StateValue"], alarm['AlarmName'], region)
        message = message_str.format(alarm['AlarmName'], region,
                                    alarm['StateValue'], alarm['StateReason'],
                                    alarm['AlarmName'], alarm['AlarmDescription'],
                                    alarm['StateReason'], alarm['Namespace'],
                                    alarm['MetricName'], str(["{}={}".format(d['Name'], d['Value']) for d in alarm["Dimensions"]]),
                                    alarm['Period'], alarm['Statistic'],
                                    alarm.get('Unit', 'not specified'), alarm['TreatMissingData'])
        for topic in topics:
            send_alarm(topic, subject, message)
    return {
        'statusCode': 200,
        'body': json.dumps('Success')
    }

以下 CloudFormation 堆栈可创建用于发送 SNS 错误日志消息通知的环境:

向堆栈提供以下信息:

  • 堆栈名称
  • Amazon DMS 任务名称
  • SNS 主题 ARN

其余所有设置皆可保留为默认值。

在本文中,我们向您介绍了如何使用 CloudWatchAmazon DMS 事件订阅、Amazon SNS 以及 Amazon Lambda 以自动化方式对 Amazon DMS 复制任务进行监控与告警。

使用这套解决方案,您无需使用控制台即可轻松跟踪复制任务的状态。系统会向您通报每个变更事件;如果发生错误,您还会收到相应的告警。

希望本文能够帮助大家了解如何监控 Amazon DMS 数据库迁移过程。

相关文章