将活动发布与亚马逊 EventBridge Pipes 脱钩

作者: 詹姆斯·贝斯威克 | 2023

这篇文章由无服务器首席福音师格雷戈尔·霍普撰写。

事件驱动架构 (EDA) 有助于将应用程序或应用程序组件相互分离。使用事件可以降低组件对发送者位置或实现细节的依赖。由于事件是独立的,因此可以异步使用,这允许发送者和接收者遵循独立的计时注意事项。在构建精细分布式应用程序时,通过事件解耦可以提高开发灵活性和运营弹性,而精细分布式应用程序是无服务器应用程序的首选风格。

许多 亚马逊云科技 服务通过内置机制发布事件,以支持使用最少量的自定义编码构建事件驱动架构。建立在这些服务之上的现代应用程序也可以根据其特定的业务逻辑发送和使用事件。A mazon Ev entBridge 或 Amazon SNS(一种托管的发布-订阅服务)等 亚马逊云科技 应用程序集成服务会筛选这些事件并将其路由到预定目的地,从而在事件创建者和使用者之间提供额外的解耦关系。

发布活动

充当事件生成器的自定义应用程序通常使用支持 12 种编程语言的 亚马逊云科技 SDK 库 来发送事件消息。应用程序代码将事件构造为本地数据结构并指定将其发送到何处,例如发送到 EventBridge 事件总线。

向 EventBridge 发送事件所需的应用程序代码非常简单,只需要几行代码,如发布应用程序生成的订单事件的这个(简化的)辅助方法所示:

async function sendEvent(event) {
    const ebClient = new EventBridgeClient();
    const params = { Entries: [ {
          Detail: JSON.stringify(event),
          DetailType: "newOrder",
          Source: "orders",
          EventBusName: eventBusName
        } ] };
    return await ebClient.send(new PutEventsCommand(params));
}

应用程序很可能在另一个操作的上下文中调用此类方法,例如,将收到的订单保存到数据存储库时。执行这些任务的代码可能如下所示:

const order = { "orderId": "1234", "Items": ["One", "Two", "Three"] }
await writeToDb(order);
await sendEvent(order);

该代码使用多个行项目填充订单对象(实际上,这将基于用户输入的数据或通过 API 调用接收的数据),将其写入数据库(通过另一个未显示实现的辅助方法),然后通过前面的方法将其发送到 EventBridge 总线。

代码导致耦合

尽管这段代码并不复杂,但从架构的角度来看,它有缺点:

  • 它将应用程序逻辑与解决方案的拓扑交织在一起,因为事件的目的地,包括服务(例如,EventBridge 与 SNS)和实例(本例中为服务总线名称),都是在应用程序的源代码中定义的。如果事件目标发生变化,则必须更改源代码,或者至少知道哪个字符串常量是通过环境变量传递给函数的。这两个方面都违背了最大限度地减少组件间耦合的 EDA 原则,因为通信结构的变化会传播到生产者的源代码中。
  • 更新数据库后发送事件很脆弱,因为它在两个步骤中都缺乏交易保证。您必须实现错误处理和重试逻辑来处理发送事件失败的情况,甚至撤消数据库更新。编写这样的代码可能很乏味且容易出错。
  • 代码是一种责任。毕竟,这就是错误的来源。在现实生活中的示例中,类似于前面代码的辅助方法错误地在事件日期交换了日和月,这导致了调试周期的艰巨性。因此,最好避免使用发送事件的样板代码。

在 EventBridge 中执行事件路由可以减少第一个顾虑。您可以重新配置 EventBridge 的规则和目标,将具有指定类型和源的事件路由到不同的目的地,前提是保持事件总线名称稳定。但是,其他问题仍然存在。

无服务器:更少的基础设施,更少的代码

亚马逊云科技 无服务器集成服务可以减少编写自定义应用程序代码以完全发布事件的需求。

无服务器应用程序的一个关键优势是,您可以让 亚马逊云科技 云为您完成无差别的繁重工作。传统上,我们将无服务器与预置、扩展和运营计算基础设施相关联,以便开发人员可以专注于编写可产生业务价值的代码。

无服务器应用程序集成服务还可以为您处理应用程序级任务,包括发布事件。大多数应用程序将数据存储在亚马逊云科技数据存储中,例如 亚马逊简单存储服务(S3 )或 Ama zon DynamoDB,它们可以在更新时自动发出事件,无需任何应用程序代码。

EventBridge 管道:没有应用程序代码的事件

EventBridge Pipes 允许您使用可选的转换、筛选和丰富步骤在活动制作者和消费者之间创建点对点集成。与过去相比,无服务器集成服务与云自动化相结合,可以更轻松地管理 “点对点” 集成,这使其非常适合此用例。

此示例利用了 EventBridge Pipes 主动从 DynamoDB Streams 等来源获取事件的能力。 DynamoDB Streams 在任何 DynamoDB 表中捕获按时间顺序排列的项目级修改序列,并将这些信息存储在日志中长达 24 小时。 EventBridge Pipes 从该日志中提取事件并将其推送到超过 14 个事件目标之一,包括 EventBridge 总线、SNS、SQS 或 API 目的地。 它还可以在需要时适应批量大小、超时和速率限制。

EventBridge Pipes example

通过 EventBridge Pipes 进行的集成可以替换发送事件的自定义应用程序代码,包括任何重试或错误逻辑。只剩下以下代码:

const order = { "orderId": "1234", "Items": ["One", "Two", "Three"] }
await writeToDb(order);

自动化即代码

EventBridge Pipes 可以通过 CLI、亚马逊云科技 管理控制台进行配置,也可以使用 亚马逊云科技 C loudFormation 或 亚马逊云科技 CDK 通过自动化代码进行配置。 通过使用 亚马逊云科技 CDK,您可以使用与编写应用程序逻辑相同的编程语言来编写自动化代码。

例如,以下 CDK 代码段将 EventBridge 管道配置为从附加到订单表的 DynamoDB 流中读取事件,并将其传递给 EventBridge 事件总线。

此代码通过 ordersTab le 变量引用 DynamoDB 表,该 变量将在创建表时设置:

const pipe = new CfnPipe(this, 'pipe', {
  roleArn: pipeRole.roleArn,
  source: ordersTable.tableStreamArn!,
  sourceParameters: {
    dynamoDbStreamParameters: {
      startingPosition: 'LATEST'
    },
  },
  target: ordersEventBus.eventBusArn,
  targetParameters: {
    eventBridgeEventBusParameters: {
      detailType: 'order-details',
      source: 'my-source',
    },
  },
}); 

自动化代码清晰地定义了 DynamoDB 表和事件目标之间的依赖关系,与应用程序逻辑无关。

与数据转换解耦

耦合不仅限于事件来源和目的地。源的数据格式可以决定事件格式,如果数据格式或数据源发生变化,则需要进行下游更改。EventBridge Pipes 也可以缓解这种考虑。

从 DynamoDB 流发出的事件使用原生、 封送的 DynamoDB 格式 ,该格式 包含类型信息,例如字符串为 “S” 或列表为 “L”。

例如,此示例中 DynamoDB 流中的订单事件如下所示。为了便于阅读,省略了一些字段:

{
  "detail": {
    "eventID": "be1234f372dd12552323a2a3362f6bd2",
    "eventName": "INSERT",
    "eventSource": "aws:dynamodb",
    "dynamodb": {
      "Keys": { "orderId": { "S": "ABCDE" } },
      "NewImage": { 
        "orderId": { "S": "ABCDE" },
        "items": {
            "L": [ { "S": "One" }, { "S": "Two" }, { "S": "Three" } ]
        }
      }
    }
  }
} 

这种格式不太适合下游处理,因为它会不必要地将事件使用者与该事件源自 DynamoDB 流的事实联系起来。EventBridge Pipes 可以将此事件转换为更易于使用的格式。转换是通过 InputTemp late 参数使用 JsonPath 表达式指定的。事实证明,EventBridge Pipes 增加了对使用通配符进行列表处理的支持,非常适合这种情况。

在此示例中,将目标参数内的以下转换模板添加到前面的 CDK 代码中(星号字符与完整的元素列表相匹配):

targetParameters: {
  inputTemplate: '{"orderId": <$.dynamodb.NewImage.orderId.S>,' + 
                 '"items": <$.dynamodb.NewImage.items.L[*].S>}'
}

这种转换将 EventBridge Pipes 发布的事件设置为常规商业活动,将任何活动消费者与其源自 DynamoDB 表的事实分开:

{
  "time": "2023-06-01T06:18:10Z",
  "detail": {
    "orderId": "ABCDE",
    "items": ["One", "Two", "Three" ]
  }
}

结论

在构建事件驱动的应用程序时,请考虑是否可以将应用程序代码替换为无服务器集成服务,以提高应用程序的弹性,并在应用程序逻辑和系统依赖关系之间提供清晰的区分。

在这种情况下,EventBridge Pipes 可能是一项有用的功能,例如根据 DynamoDB 表更新捕获和发布事件。

要了解有关 EventBridge Pipes 的更多信息, 请访问 https://aws.amazon.com/eventbridge/pipes/, 并访问 https://serverlessland.com/refactoring-serverless 了解减少无服务器应用程序代码的其他方法。 有关完整的代码示例,请参阅 https://github.com/aws-samples/aws-refactoring-to-serverless