了解亚马逊 SQS 和 亚马逊云科技 Lambda 事件源映射以实现高效的消息处理

作者:Tushar Sharma,员工无服务器开发人员 — 无服务器大师作者: Shaun Wang,高级合 人解决方案架构师
— A WS

Serverless-Guru-AWS-Partners-2023
Serverless Guru
Serverless-Guru-APN-Blog-CTA-2023

亚马逊简单队列服务 (SQS) 和 AW S Lambda 之间的集成是 S erverless Guru 为客户使用的 架构 的重要组成 部分。要充分利用事件源映射的 SQS-Lambda 集成,了解集成的工作原理非常重要。

例如,如果您曾经使用过 SQS-Lambda 集成,则可能有一个用例,即您想要设置将要调用的 Lambda 函数的最大数量,但没有这样的选项可用。

在这篇文章中,我们将分享为什么会这样,并探讨解决这个限制的方法。Serverless Guru 是 亚马逊云科技 高级等级服务合作伙伴 ,可帮助公司建立、迁移和培训团队进行亚马逊网络服务 (亚马逊云科技) 无服务器开发。无服务器 Guru 拥有 亚马逊云科技 Lambda 服务交付专业证书。

亚马逊 SQS-亚马逊云科技 Lambda 事件源映射

假设我们有一个名为 用户点击 的 SQS 队列,想要处理 Lambda 函数中的点击事件,因此我们在 SQS 队列和 Lambda 函数(我们称之为用户点击消费者)之间配置事件源映射。

以下是 亚马逊云科技 在幕后为提供这种集成所做的工作:

Serverless-Guru-SQS-Lambda-1

图 1 — SQS-Lambda 事件源映射。

如上图所示,亚马逊云科技 每个区域最多有 1,000 个并行线程(此处仅显示五个)轮询 SQS 队列以获取消息。每个线程都将轮询我们的 SQS 队列,如果有消息可用,它将从队列中接收一批消息。

然后,它使用此批处理同步调用 Lambda 函数。如果 Lambda 函数成功返回,则线程将从队列中删除批处理中的消息。如果 Lambda 函数返回错误(不是节流错误),则线程不执行任何操作,消息会在可见性超时结束后再次出现在队列中。

配置 SQS-Lambda 集成

亚马逊云科技 提供以下选项来配置此类集成:

  • 批次大小: 每个批次中要发送给函数的记录数。对于标准队列,最多可以有 10,000 条记录。
  • 批处理窗口: 在调用 Lambda 函数之前,亚马逊云科技 轮询 SQS 队列的最大时间(以秒为单位)。

提供的选项确实可以让你在一定程度上控制你希望 亚马逊云科技 如何处理轮询和调用。

扩展 SQS-Lambda 集成

如果队列大小很大,亚马逊云科技 可以在任何给定时间启动最多 1,000 个并行线程轮询队列。这听起来可能不错,因为它可以帮助快速清空队列,但是你必须考虑下游服务能否承受这种负载。

让我们通过几个场景来理解这个用例。

处理大型队列

场景 #1: 如果队列很大,则可以增加批量大小并缩短批处理窗口。这样,轮询线程可以快速将消息从 SQS 中出队并发送到您的 Lambda 函数。如果你想尽可能快地排干队列,这种方法非常适合。

处理小到中等规模的队列

场景 #2: 如果队列大小较小,则可以将批量大小调整为更小,将批处理窗口调整为更长。这样,Lambda 线程可以高效地对 SQS 消息进行批处理,然后再将批处理发送到函数。

例如,如果您的队列每秒收到一条新消息,而您的批处理窗口大小为一秒,则 Lambda 线程将在一分钟内调用您的函数 60 次,每次调用仅包含批处理中的一条消息。

如果您将批处理窗口增加到 60 秒,则它会轮询队列 60 秒,并在批处理中包含 60 条消息的情况下调用您的 Lambda 函数。这更有效率,因为它每分钟只调用一次 Lambda,而不是每分钟 60 次调用。

留意能见度超时

要记住的另一个配置选项是 SQS 队列上的可见性超时。当一条消息离开 SQS 队列时,它不会从队列中删除;它只是在一段时间内对队列中的其他使用者隐藏。该时间段被称为 “可见性超时”。

可见性超时结束后,队列中的所有消费者都可以再次看到该消息。这意味着,要成功处理消息,您必须在可见性超时内处理该消息。在消息可见之前,您还必须进行 API 调用以将其从队列中删除。

为了更好地理解这种行为,下面是一个包含六条消息和两个消费者的示例队列:A 和 B。他们都一次轮询队列中的一条消息——获取 ID 为 1 的消息,B 获取 ID 为 2 的消息。

Serverless-Guru-SQS-Lambda-2

图 2 — 包含两个使用者的 SQS 队列。

如您所见,当 A 和 B 处理消息时,消息并未从队列中删除;它们只是隐藏而已。因此,如果还有另一个消费者,如果它对队列进行轮询,它将永远不会收到 #1 和 #2 消息。

Serverless-Guru-SQS-Lambda-3

图 3 — SQS 使用者 A 和 B 处理消息。

假设队列中的可见性超时为 60 秒。消费者 A 在 30 秒内处理了该消息,然后从队列中删除了该消息,所有这一切都在 60 秒超时之前。该消息再也没有出现在队列中,只处理过一次。

Serverless-Guru-SQS-Lambda-4

图 4 — SQS 使用者 B 正在处理消息。

另一方面,消费者 B 花了 60 多秒钟处理该消息,然后将其删除。消费者 B 会在 80 秒后删除该消息,但该消息很有可能也被消费者 A 处理。让我们试着理解为什么会发生这种情况。

队列的可见性超时为 60 秒,因此 60 秒后,消息再次出现在队列中,而消费者 B 仍在处理该消息。

Serverless-Guru-SQS-Lambda-5

图 5 — ID 为 2 的 SQS 消息在队列中可见,同时消费者 B 仍在处理中。

现在,由于消息在队列中可见,因此消费者 A 会批量接收该消息(而 B 仍在处理该消息)。

Serverless-Guru-SQS-Lambda-6

图 6 — 使用者 A 和 B 正在处理 ID 为 2 的 SQS 消息。

尽管消费者 B 稍后删除了该消息,但消费者 A 已经处理了该消息。

Serverless-Guru-SQS-Lambda-7

图 7 — SQS 使用者 A 正在处理 ID 为 2 的消息。

这就是为什么对于 SQS-Lambda 集成,建议将可见性超时保留为 Lambda 函数超时、批处理窗口时间以及另外 30 秒作为缓冲区的总和。

这可确保 您分配足够的时间让轮询线程等待(批处理窗口时间)、调用 Lambda 函数(Lambda 超时),然后从队列中删除消息(缓冲时间)。否则,正如我们在上面的示例中看到的那样,您可能会开始看到队列中的许多消息被多次处理。

逐条出队消息

在某些用例中,你希望一个接一个地同步使用队列中的消息。例如,如果您的应用程序有多个实例,并且每个实例都对数据库进行了大量写入,则数据库可能无法处理那么多的并行连接。

想象一下 1,000 个并行 Lambda 函数创建传输控制协议 (TCP) 连接并向您的数据库发送请求。对于Serverless Guru的一位客户,该团队使用 Promise.all 向每个Lambda的数据库发出了200多个并发请求,因此总共有1,000*200(即20万个)的并发请求。数据库服务器可能无法处理那么多的并发请求,或者您可能会看到很多限制错误。

Serverless-Guru-SQS-Lambda-8

图 8-写入数据库的多个应用程序实例。

在这样的场景中,有时将队列放在数据库之前,让一个消费者将消息从队列中出队并写入数据库是有意义的。

Serverless-Guru-SQS-Lambda-9

图 9 — 向数据库写入数据的单个 SQS 使用者。

这是一种相当常见的模式。关键在于有有限数量的消费者(通常只有一个)来限制并行数据库连接的数量。

根据我们刚才看到的 SQS-Lambda 集成架构,很明显,最多可以有 1,000 个 Lambda 并行函数处理消息。如果它们中的每一个都创建一个与数据库的新 TCP 连接,想象一下,其中成千上万个会生成,每个都建立了几个连接。

亚马逊云科技 Lambda 预留并发可以提供帮助吗?

解决此问题的一种方法是在使用者 Lambda 函数上设置预留并发性。在函数上配置预留并发时,可以将该函数的最大并行调用次数限制为特定的次数。无论谁调用了该函数,亚马逊云科技 Lambda 都会在其保留的并发性用完后限制该函数的所有调用。

对于场景 #3,您可以将使用者 Lambda 函数的预留并发设置为一 (1),然后启用 SQS-Lambda 集成。你可以放心地假设,无论队列有多大,在任何给定时间最多只能运行一次该函数的调用。

尽管此解决方案有效,但如果您查看 Lambda 函数的指标,您会发现很多限制错误。你可能还会开始看到很多 SQS 消息进入死信队列 (DLQ)。

Lambda 预留并发和 SQS 集成

亚马逊云科技 Lambda 预留并发仅控制 Lambda 函数的调用,并将活动调用保持在预留的并发限制内。如果您尝试在预留并发已满后调用 Lambda,则会看到限制错误。在 SQS-Lambda 集成的情况下,轮询线程只是另一个调用 Lambda 函数的客户端。

在这种情况下,还将有多个轮询线程(实际数量将取决于队列的大小),并且每个线程仍将尝试调用我们的 Lambda 函数。这次的区别在于,一旦我们的使用者 Lambda 的预留并发已满,所有线程都将开始收到任何后续调用的限制错误。

在这种情况下,发生的情况是,在收到来自 Lambda 函数的节流错误时,轮询线程会再次尝试使用 SQS 批次调用该函数,然后才会拒绝该批次。批处理中消息的可见性超时结束后,它们会再次出现在队列中。

这种行为的问题在于,如果你的队列上设置了重新驱动策略(根据 亚马逊云科技 文档 ,你应该这样做) ,你可能会看到很多发送到 DLQ 的消息。之所以发生这种情况,是因为一旦轮询线程未能通过 SQS 消息调用您的 Lambda 函数,该消息就会再次出现在队列中,其接收次数增加一。该周期可能会重复多次,一旦消息的接收次数达到重启策略中设置的限制,该消息就会发送到 DLQ。

如果您没有重启政策,则该消息只有在保留期结束后才会被删除。

请注意,您现在可以直接 在事件源映射中 配置 Lambda 函数的最大并发 性。

FIFO 排队等候救援

解决这个问题的一个好方法是使用 SQS FIFO 队列。亚马逊云科技 甚至支持 高吞吐量 FIFO 队列。

FIFO 队列的工作方式与普通队列不同,因为 亚马逊云科技 需要确保按发送顺序处理每条消息,因此命名为 FIFO。如果您使用事件源映射将 FIFO SQS 队列与 亚马逊云科技 Lambda 集成,则对于每个消息 组,亚马逊云科技 将生成一个 从队列中提取的线程。

以下是 亚马逊云科技 文档 的摘录 :“FIFO 队列逻辑仅适用于每个消息组 ID。每个消息组 ID 代表 Amazon SQS 队列中一个不同的有序消息组。”

这意味着,亚马逊云科技 不会将 FIFO 逻辑应用于所有进入 FIFO 队列的消息;它会将该逻辑应用于属于同一消息组的消息。消息组 ID 是将消息发送到 FIFO 队列时可以对其设置的属性。

轮询 FIFO SQS 队列时,Lambda 知道该消息组。如果您为所有消息输入一个消息组 ID,则 亚马逊云科技 将仅使用一个线程来轮询队列,并且一次只会调用一次 Lambda。

Serverless-Guru-SQS-Lambda-10

图 10 — FIFO 队列的 SQS 事件源映射。

如果您知道拥有两个并行 Lambda 函数不会影响下游服务,则可以使用随机分配给每条消息的两个唯一消息组 ID,亚马逊云科技 只会使用两个线程来轮询消息。在任何给定时间最多调用两次 Lambda。

FIFO 队列与预留并发性对比

这比在 亚马逊云科技 Lambda 上设置预留并发性有什么好处?使用 FIFO 队列事件源映射,轮询线程不会尝试积极调用 Lambda 函数。它只会调用与 FIFO 队列中唯一组 ID 一样多的并行函数。

需要注意的一点是,使用 FIFO 队列时,常规 SQS 队列的吞吐量确实会丢失。此外,FIFO队列的使用费用比常规队列高,因此在做出此决定时请考虑这些因素。

如果使用 FIFO 队列不符合你的用例,你可以随时使用 亚马逊云科技 Batch 亚马逊弹性容器服务 (Amazon ECS) 任务来手动使用队列。

结论

本文中详细介绍的 Amazon SQS 和 亚马逊云科技 Lambda 集成是一项强大功能,对于中小型队列,您无需过多考虑集成,因为它开箱即用。

如果您对如何实现 SQS-Lambda 集成或与无服务器相关的任何问题有疑问,请联系无服务器 Guru


Serverless-Guru-APN-Blog-Connect-2023


无服务器 Guru — 亚马逊云科技 合作伙伴聚焦

Serverless Guru 是 亚马逊云科技 高级等级服务合作伙伴 ,可帮助公司建立、迁移和培训 亚马逊云科技 无服务器开发团队。

联系无服务器 Guru | 合作伙伴概


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您发展海外业务和/或了解行业前沿技术选择推荐该服务。