使用 Amazon EventBridge Pipes 丰富和自定义通知

作者: Chris McPeek |

这篇博客文章由助理规模解决方案架构师 Elie Elmalem 撰写

在实施事件驱动架构时,客户经常需要用额外的信息来丰富其传入的事件,以使其对下游消费者更有价值。传统上,使用亚马逊云科技事件桥的客户可以通过编写 Amazon Lambda 函数来使用补充数据增强他们的事件来实现这一点。但是,这种方法需要编写和维护自定义代码,从而增加了其事件处理管道的复杂性。

Amazon EventBridge Pipes 通过提供简化的托管服务来丰富事件,简化了这一流程,无需编写和管理自定义 Lambda 函数。这篇博客文章演示了如何使用 EventBridge Pipes 的内置数据丰富功能,通过额外的背景和客户特定的详细信息动态增强您的事件,从而提高事件处理的效率和更易于维护。

Amazon EventBridge 管道

Amazon EventBridge 在来源和目标之间建立了直接连接。使用 EventBridge 总线可以帮助您以发布/订阅模式将事件路由和分散到服务中。另一方面,EventBridge 管道可以帮助您采用点对点的服务集成模式。它与传统的事件总线/规则模式的不同之处在于其数据转换和丰富支持。

在定义 EventBridge 管道时,您可以指定管道的源和目标。管道支持各种来源和目标。在源和目标之间,EventBridge 管道支持过滤和充实。筛选使您能够选择和处理目标事件子集。利用扩展,您可以通过在将缺失信息发送到目标之前添加缺失信息来增强数据。例如,如果一个事件缺少必要的信息,它可以确保目标能够正确使用该事件。丰富数据可能非常强大,因为它可以增强通用事件并对其进行转换。EventBridge Pipes 支持使用 Lambda 函数、Amazon Step Functions、Amazon API Gateway 和 EventBridge API 目标进行扩展。有关这些概念的更多细节可以在 Amazon EventBridge 管道概念文档中找到。

显示过滤和充实步骤的 EventBridge 管道示意图。

图 1:显示过滤和充实步骤的 EventBridge 管道示意图

这篇博客文章将使用管道的丰富步骤来创建自定义通知。

概述

为了说明其功能,这篇文章使用了服装零售商的用例。像这家零售商这样的企业希望保持忠实客户的参与度。通常,他们依赖缺乏个性化的批量促销电子邮件。在此用例中,零售商希望发送有针对性的促销代码。一旦下了第 10 个订单,该代码就会通过电子邮件或短信发送给他们的客户。

如果没有 EventBridge 管道,这将使用 EventBridge 来响应订单事件来实现。所有事件都发送到自定义 Lambda 函数进行处理。如果订单符合正确的条件,Lambda 函数使用 Amazon Simple Notification Service(Amazon SNS)向客户发送包含折扣码的通知。

使用 EventBridge 的传统方法。

图 2:使用 EventBridge 的传统方法

尽管该架构可以运行,但它需要您维护 Lambda 函数中的集成代码和数据丰富逻辑,因为该函数需要从事件中提取必要的信息并管理到 SNS 的路由。随着越来越多的微服务遵循相同的模式,代码变得越来越复杂。这可能导致更长的执行时间、更高的成本和更大的维护工作量。

使用 Amazon EventBridge 管道进行简化

Amazon EventBridge 管道可以通过处理服务之间的丰富和集成来简化之前的实现。Amazon EventBridge 管道负责将事件发送到您配置的丰富步骤,然后将丰富事件路由到目标。如果选择的方法是 Lambda 函数,则将函数代码留给仅关注丰富逻辑。它消除了代码从事件中提取必要字段和发送通知的需要。

使用 EventBridge 管道的解决方案架构

图 3:使用 EventBridge 管道的解决方案架构

当事件进入管道时,扩充步骤会触发 Lambda 函数,该函数将检查资格并将消息返回路由到 SNS。如果客户没有资格获得折扣码,它将返回一条订单确认消息,其中包含从原始订单事件中检索的数据。如果客户有资格享受折扣,则消息中还包含折扣码。

这是更新后的流程的架构:

  1. 一位买家订购了一件新商品。订单将发送到 Simple Queue Service (SQS) 订单队列。
  2. 订单队列中的新消息会触发事件总线管道。
  3. 该管道会触发 Amazon Lambda 函数来丰富数据。
  4. 这些函数检查客户是否有资格获得针对 Amazon DynamoDB 表的折扣码。该表包含每位客户的订购次数。
  5. Lambda 函数返回将发送给客户的自定义消息,无论有无折扣码。
  6. 消息由 EventBridge 管道路由到 SNS 主题
  7. 客户通过其首选订阅方式收到通知。

构建更新的流程

为了构建更新的流程,我选择在 Python 中使用亚马逊云科技云开发套件 (CDK)。您可以使用此处提供的代码将其部署到您的账户中。该代码也可以在 GitHub 上找到。

注意:此示例代码仅用于测试目的,不适用于生产账户。

对于此解决方案,您需要满足以下先决条件:

  1. 已安装并配置了亚马逊云科技命令行接口 (CLI) 以供使用。
  2. 身份和访问管理 (IAM) 角色或用户,其权限足以创建 IAM 策略、DynamoDB 表、SQS 队列、SNS 主题、Lambda 函数和 EventBridge 管道。
  3. 亚马逊云科技 CDK
  4. Python 版本 3.9 或更高版本,带有 pip 和虚拟 virtualenv。

满足先决条件后,在空目录中设置一个新的 Python CDK 项目:

mkdir blog_code
cd blog_code
cdk init app –-language python

然后,激活虚拟环境并安装 CDK 的依赖项:

source .venv/bin/activate
python -m pip install -r requirements.txt

cdk init 命令创建一个 blog_code 文件夹。GitHub 存储库包含 blog_code 文件夹内的 blog_code_stack.py 文件的代码。

然后,在 blog_code 文件夹中,创建一个名为 lambda 的新文件夹。在这个新文件夹中,创建一个名为 index.py 的文件。该文件将包含丰富 lambda 函数的代码。再一次,这段代码可以在 GitHub 存储库中找到。以下是 Lambda 代码的一部分:

def lambda_handler(event, context):

    message = json.loads(event[0]['body'])

    id = message['id']
    order_content = message['order_content']
    
    nmb_orders = get_number_of_orders(id)
    
    # Calculate orders left
    orders_left = MAX_ORDERS - nmb_orders
    
    # Update the DynamoDB table with the new number of orders
    if nmb_orders == MAX_ORDERS:
        update_table(id, 0)
    else:
        update_table(id, nmb_orders)
    
    if orders_left == 0:
        return [f"Thank you for your order of {order_content}. You have earned a 10% discount code on your next order: XA5GT2SF"]
    else:  
        # Return the confirmation message
        return [f"Thank you for your order of {order_content}. This is your confirmation message! Only {orders_left} orders left until a 10% discount!"]

Lambda 函数的工作方式如下:

  1. 它从 EventBridge 管道接收一个事件,该事件由订单和下单用户的 ID 组成
  2. 它通过在 DynamoDB 表上调用 getItem 命令来获取用户已经下达的订单数量。
  3. 它计算在用户获得折扣码之前还剩下多少订单。
  4. 它使用新的订单数量更新 DynamoDB 表,以考虑刚刚下达的订单。
  5. 如果用户下了正确数量的订单,它将返回一条带有折扣码的确认消息。否则,它会通知用户仍需要下多少订单才能获得折扣。

现在,将 CDK 堆栈部署到您的账户。确保你在项目的根目录中:

cdk bootstrap
cdk deploy

堆栈完成部署后,您将在控制台上看到一个 EventBridge 管道,方法是前往 EventBridge 服务页面,点击左侧面板中的管道。

测试解决方案

要测试该解决方案,必须先设置 SNS 主题的订阅才能接收通知。为了简单起见和测试目的,建议设置电子邮件通知。为此,请按照名为 targetTopic 的主题的 Amazon SNS 文档中的说明进行操作。设置订阅后,不要忘记查看您的电子邮件收件箱并确认订阅。

设置通知后,访问 DynamoDB 控制台页面。你需要手动在资格表中添加一个条目来模仿真实环境:

  1. 点击左侧面板中的表格
  2. 选择 "资格表" 表。
  3. 单击 "浏览表格项目",然后单击 "创建项目"
  4. 输入值为 01 的 ID。
  5. 单击 "添加新属性",然后选择 "字符串"。
  6. 在属性名称下输入订单,然后在值下方输入 8
  7. 单击 "创建项目"。

退回的商品表应如下所示。这假设客户已经下了 8 个订单。

添加新项目后,项目返回表格。

图 4:添加新项目后返回的项目表

现在,访问 SQS 控制台页面。你需要向队列发送一条消息,模仿正在下的新订单。

  1. 点击名为 SourceQueue 的队列
  2. 单击 "发送和接收消息"。
  3. 在 "邮件正文" 下,粘贴以下消息,然后单击 "发送消息":
{
    "order_content": "large shirt",
    "id": "01",
    "username": "johndoe01",
    "transaction_time": "10:04:00"
  }

几分钟后,您应该会收到一封确认订单的电子邮件,因为您的订单消息被视为第 9 个订单。再次发送消息下第 10 个订单,您应该会收到折扣码!

收到的带有折扣码的电子邮件。

图 5:收到的带有折扣码的电子邮件

清理

要删除账户中的资源,请在项目的根目录中运行以下命令:

cdk destroy

结论

这篇博文展示了 Amazon EventBridge Pipes 及其丰富功能如何帮助您创建量身定制的通知。首先,它讨论了如何使用 EventBridge 实现它,然后介绍了使用 EventBridge 管道的简化实现。

有关事件桥管道常用模式的更多信息,您可以查看使用 Amazon EventBridge 管道实现架构模式。

如需更多无服务器学习资源,请访问 Serverless Land。要查找更多模式,请直接访问无服务器模式集合。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您发展海外业务和/或了解行业前沿技术选择推荐该服务。