使用亚马逊 SNS FIFO 存档和重播消息

作者: 本杰明·史密斯 |

这篇文章由解决方案架构师穆罕默德·阿蒂克和 Serverle ss 首席解决方案架构师 Mithun Mallick 撰写

亚马逊简单通知服务 (SNS) Simple Notification S ervic e 提供灵活、完全托管的消息服务,允许应用程序发送和接收消息。SNS 充当渠道,将活动从发布商传递给订阅者。

今天,亚马逊云科技 宣布了一项新功能,使您能够存档和重播发布到 SNS FIFO(先入先出)主题的消息。现在,启用存档策略后,SNS FIFO 主题将自动:

  • 存档事件 ,使用不需要任何外部资源的无代码就地消息存档。您只需要为主题定义存档策略,包括所需的保留期(从 1 天到 365 天)。
  • 重播事件: 订阅者受益于托管的无代码消息重播功能,以及内置的进度报告和消息筛选功能。要启动重播,订阅者只需将重播策略应用于他们的订阅,使用时间戳定义起点和终点。

此功能在故障恢复和状态复制场景中可能很有用。

故障恢复

在故障恢复场景中,开发人员可以使用它来重新处理消息子集,并从下游应用程序故障或依赖关系问题中恢复。

假设搜索应用程序需要重新处理消息的情况,因为搜索引擎的索引已被删除。为了启动恢复,搜索应用程序将使用 setSubs criptionAttributionAttributes API 操作更新其现有订阅中的 ReplayP olicy 属性 ,以开始接收来自特定时间点的消息,而不是从将存档策略应用到该主题时开始接收消息。

状态复制

对于状态复制方案,此功能使新应用程序能够复制先前订阅的应用程序的状态。

以内部数据仓库应用程序为例,它必须复制外部搜索应用程序的状态,以便产品经理和其他内部员工可以使用在搜索引擎中索引的数据。数据仓库应用程序使用订阅 API 操作将其新创建的终端节点(例如, Amazon SQS FIFO 队列) 订阅 该主题,并设置 Re playPolicy 订阅属性。

如果它选择复制搜索引擎的完整状态,它可能会在其 ReplayPol icy 中将时间戳设置 为与搜索引擎订阅的创建日期和时间一致,从而确保曾经发送到搜索引擎的所有数据也被传送到数据仓库工具。

通过 SNS 控制台启用存档策略

创建新的 SNS FIFO 主题时,您会看到存档策略选项。此政策决定了 SNS 将您的消息存储多长时间,以便在必要时重新发送这些消息以供订阅。默认情况下,存档策略不会激活,您必须为每个主题手动启用该策略或自动执行操作。

例如,此 FIFO 主题的保留期设置为 30 天。但是,您可以将此持续时间从 1 天调整到 365 天不等。激活存档策略后,发送到该主题的消息将在定义的时间段内存档。

要在创建主题后确认存档策略是否生效,请查看主题详细信息。在保留策略旁边,其状态显示为 “ 有效 ” 。

通过为 SQS FIFO 队列订阅 SNS FIFO 主题,您可以重播消息,并且重播状态显示未运行 。 您可以订阅 FIFO 和标准 SQS 队列的 SNS FIFO 主题,从而灵活地满足各种用例要求。要启动重播,请导航到 SNS 控制台,选择 “ 重播 ” ,然后选择 “ 开始重播 ”。

启动重播时,会出现一个窗口,允许您指定开始和结束日期,以及存档消息的确切时间。此功能允许您设置特定的时间表,从而可以灵活地仅重播感兴趣的消息,而不是每封存档的消息。当您选择 “ 开始重播 ” 时 ,该服务将开始向订阅者发送消息。

您还可以使用 亚马逊云科技 CloudFormation 和 亚马逊云科技 无服务器应用程序模型 (亚马逊云科技 SA M) 为 SNS FIFO 存档和 重播功能定义设置。

用例

在微服务中重放事件以恢复错误

在保险应用程序使用多个微服务的场景中,假设一个索赔处理微服务遇到错误并撤销了索赔。这种疏忽可能会导致工作负载不同步。

借助存档和重播功能,您可以重新访问和重播从检测到错误之时起的事件。这使微服务能够识别错过的事件并完成必要的操作,从而确保系统保持更新和准确。

  1. 消息从应用程序发布到 SNS FIFO 主题。
  2. 消息被传送到包含索赔详细信息的 SQS FIFO 队列,供下游微服务处理。
  3. 由于异常,微服务无法处理一系列消息,并丢弃所有消息。
  4. 然后,用户从 SNS FIFO 主题启动重播,根据故障发生的时间指定要重播的消息的时间范围。
  5. 微服务现在能够成功处理重播的消息,并将数据保存到 DynamoDB 表中。

跨区域复制状态

如果应用程序跨越多个区域,而微服务在其主区域遇到困难,则可以使用活动/备用设置将基础架构复制到另一个区域。

您可以将流量重新路由到辅助区域中的备用微服务,从而通过事件重播保持同步。您可以在 SNS 重播策略中设置结束时间,但如果未定义结束时间,则重播将持续到发送所有最新消息为止。

之后,SNS 订阅恢复正常运行,捕获所有新消息。这种方法适用于许多状态复制场景,例如跨区域备份策略,因为它有助于最大限度地减少停机时间并防止消息丢失。

  1. 消息从应用程序发布到 SNS FIFO 主题。
  2. 消息被传送到包含索赔详细信息的 SQS FIFO 队列,供下游微服务处理。
  3. 由于异常,微服务未能处理一系列消息,并丢弃了所有消息。
  4. 然后,用户在另一个区域订阅新的 SQS FIFO 队列,从 SNS FIFO 主题启动重播,并根据故障发生的时间指定要重播的消息的时间范围。
  5. 位于不同区域的微服务能够从新的 SQS FIFO 队列中检索重播的消息,成功处理一系列消息,并将数据保存到 DynamoDB 表中。

配置 SNS FIFO 存档和重播以进行汽车保险处理

管理汽车保险索赔需要及时的协调。本演练显示了 SNS FIFO 和 SQS FIFO 在按正确顺序处理索赔方面的综合优势。

SQS FIFO 和 SQS 标准队列均可订阅 SNS FIFO 主题,从而提供处理索赔的多功能性。SNS FIFO 的存档和重播功能至关重要;由于重播功能,下游微服务的中断不会损害索赔的完整性。

本演练指导您使用 亚马逊云科技 CLI 部署汽车保险索赔处理示例。您可以为索赔提交创建一个 SNS FIFO 主题和两个 SQS FIFO 队列。第一个队列用于主张处理,而第二个队列专门用于消息重放,以支持在各种系统实例之间复制应用程序状态。

先决条件

  • 亚马逊云科技 命令行接口 (CLI): 您可以使用 文档下载并进行设置。
  • JQ :要安装,请按照安装 说明进行 安装

第 1 步-使用 亚马逊云科技 CLI 创建资源并存储变量

在终端中运行以下命令。


# Define the AWS Region
REGION=$(aws configure get region)

# Create an SNS FIFO topic for auto insurance claims
AUTO_INSURANCE_TOPIC_ARN=$(aws sns create-topic --name "AutoInsuranceClaimsTopic.fifo" --attributes "FifoTopic=true,ContentBasedDeduplication=true,DisplayName=Auto Insurance Claims Topic" --region $REGION | jq -r '.TopicArn')

# Create primary and replay SQS FIFO queues
AUTO_INSURANCE_QUEUE_URL=$(aws sqs create-queue --queue-name "AutoInsuranceClaimsQueue.fifo" --attributes "FifoQueue=true" --region $REGION | jq -r '.QueueUrl')
AUTO_INSURANCE_REPLAY_QUEUE_URL=$(aws sqs create-queue --queue-name "AutoInsuranceReplayQueue.fifo" --attributes "FifoQueue=true" --region $REGION | jq -r '.QueueUrl')

# Get ARNs for both SQS queues
AUTO_INSURANCE_QUEUE_ARN=$(aws sqs get-queue-attributes --queue-url $AUTO_INSURANCE_QUEUE_URL --attribute-names QueueArn --region $REGION | jq -r '.Attributes.QueueArn')
AUTO_INSURANCE_REPLAY_QUEUE_ARN=$(aws sqs get-queue-attributes --queue-url $AUTO_INSURANCE_REPLAY_QUEUE_URL --attribute-names QueueArn --region $REGION | jq -r '.Attributes.QueueArn')

# Define a policy allowing the topic to publish to both queues
SQS_POLICY_TEMPLATE="{\"Policy\" : \"{ \\\"Version\\\": \\\"2012-10-17\\\", \\\"Statement\\\": [ { \\\"Sid\\\": \\\"1\\\", \\\"Effect\\\": \\\"Allow\\\", \\\"Principal\\\": { \\\"Service\\\": \\\"sns.amazonaws.com\\\" }, \\\"Action\\\": [\\\"sqs:SendMessage\\\"], \\\"Resource\\\": [\\\"$AUTO_INSURANCE_QUEUE_ARN\\\", \\\"$AUTO_INSURANCE_REPLAY_QUEUE_ARN\\\"], \\\"Condition\\\": { \\\"ArnLike\\\": { \\\"aws:SourceArn\\\": [\\\"$AUTO_INSURANCE_TOPIC_ARN\\\"] } } } ]}\"}"

# Apply the access policy to the queues
aws sqs set-queue-attributes --queue-url $AUTO_INSURANCE_QUEUE_URL --attributes file://<(echo $SQS_POLICY_TEMPLATE)
aws sqs set-queue-attributes --queue-url $AUTO_INSURANCE_REPLAY_QUEUE_URL --attributes file://<(echo $SQS_POLICY_TEMPLATE)

# Subscribe the primary queue to the created SNS FIFO topic
aws sns subscribe --topic-arn $AUTO_INSURANCE_TOPIC_ARN --protocol sqs --notification-endpoint $AUTO_INSURANCE_QUEUE_ARN --region $REGION

步骤 2-在 SNS FIFO 主题上设置存档策略

修改 SNS FIFO 主题的属性以设置保留期。这决定了消息在主题存档中保留多长时间。此示例使用 30 天。

# Set a 30-day retention period for the SNS FIFO topic

aws sns set-topic-attributes --region $REGION --topic-arn $AUTO_INSURANCE_TOPIC_ARN --attribute-name ArchivePolicy --attribute-value "{\"MessageRetentionPeriod\":\"30\"}"

第 3 步-发布汽车保险索赔详情

发布有关 SNS FIFO 主题的声明示例。此步骤模仿了现实场景,即保险索赔必须由该主题的订阅者处理。

# Get the current timestamp and publish a sample insurance claim
TIMESTAMP_START=$(date -u +%FT%T.000Z)
aws sns publish --region $REGION --topic-arn $AUTO_INSURANCE_TOPIC_ARN --message "{ \"claim_type\": \"collision\", \"registration\": \"AB123CDE\" }" --message-group-id "group1"

第 4 步 — 阅读汽车保险索赔详情

从 SQS FIFO 主队列中检索保险索赔详情。这模拟了阅读保险索赔以采取行动的过程。读取消息后,声明将从队列中删除,以避免重新处理。

# Fetch the claim details from the primary queue, then delete to avoid redundancy
MESSAGE=$(aws sqs receive-message --region $REGION --queue-url $AUTO_INSURANCE_QUEUE_URL --output json)
MESSAGE_TEXT=$(echo "$MESSAGE" | jq -r '.Messages[0].Body')
MESSAGE_RECEIPT=$(echo "$MESSAGE" | jq -r '.Messages[0].ReceiptHandle')
aws sqs delete-message --region $REGION --queue-url $AUTO_INSURANCE_QUEUE_URL --receipt-handle $MESSAGE_RECEIPT
echo "Received claim details: ${MESSAGE_TEXT}"

步骤 5-为重播 SQS 队列订阅 SNS FIFO 主题

为确保索赔不会丢失,请为您的 SQS FIFO 队列订阅配置重播策略。此策略设置了将消息重播到 SQS FIFO 队列的时间表。在这里,您可以使用重播策略订阅重播队列,然后监控重播队列的状态。完成后,从辅助 SQS FIFO 队列中读取重播的索赔详情。如果最初出现任何处理问题,则有第二次机会处理索赔。

订阅 SNS FIFO 主题的重播队列:

# Subscribe the replay queue to the topic and define its replay policy
NEW_SUBSCRIPTION_ARN=$(aws sns subscribe --region $REGION --topic-arn $AUTO_INSURANCE_TOPIC_ARN --protocol sqs --return-subscription-arn --notification-endpoint $AUTO_INSURANCE_REPLAY_QUEUE_ARN --attributes "{\"ReplayPolicy\":\"{\\\"PointType\\\":\\\"Timestamp\\\",\\\"StartingPoint\\\":\\\"$TIMESTAMP_START\\\"}\"}" --output json | jq -r '.SubscriptionArn')

要监控重播状态,请执行以下操作:

# Wait for the replay to complete
while [[ $(aws sns get-subscription-attributes --region $REGION --subscription-arn $NEW_SUBSCRIPTION_ARN --output text | awk 'END{print $9}') != 'Completed' ]]; do printf "."; sleep 5; done; echo "Replay complete";

要阅读重播的消息并从队列中删除该消息,请执行以下操作:

# Fetch the replayed message and then remove it from the queue
REPLAYED_MESSAGE=$(aws sqs receive-message --region $REGION --queue-url $AUTO_INSURANCE_REPLAY_QUEUE_URL --output json)
REPLAYED_MESSAGE_TEXT=$(echo "$REPLAYED_MESSAGE" | jq -r '.Messages[0].Body')
REPLAYED_MESSAGE_RECEIPT=$(echo "$REPLAYED_MESSAGE" | jq -r '.Messages[0].ReceiptHandle')
aws sqs delete-message --region $REGION --queue-url $AUTO_INSURANCE_REPLAY_QUEUE_URL --receipt-handle $REPLAYED_MESSAGE_RECEIPT
echo "Received replayed claim details: ${REPLAYED_MESSAGE_TEXT}"

正在清理

为避免产生不必要的成本,请清理本演练中创建的资源:

# Delete the primary SQS FIFO queue
aws sqs delete-queue --queue-url $AUTO_INSURANCE_QUEUE_URL --region $REGION

# Delete the replay SQS FIFO queue
aws sqs delete-queue --queue-url $AUTO_INSURANCE_REPLAY_QUEUE_URL --region $REGION

# Unset the 'ArchivePolicy' attribute
aws sns set-topic-attributes --region $REGION --topic-arn $AUTO_INSURANCE_TOPIC_ARN --attribute-name ArchivePolicy --attribute-value "{}"

# Delete the SNS FIFO topic
aws sns delete-topic --topic-arn $AUTO_INSURANCE_TOPIC_ARN --region $REGION

结论

新的 SNS FIFO 存档和重播功能为事件驱动的应用程序提供了坚实的基础,强调故障恢复和应用程序状态复制。这些功能使开发人员能够高效地管理和从中断中恢复,并确保在不同的应用程序实例或环境中复制状态。

使用 亚马逊云科技 管理控制台、亚马逊云科技 CLI、亚马逊云科技 软件开发套件 (SDK) 或 亚马逊云科技 Cl oudF or mat ion 开始使用这项新的 SNS FIFO 功能。 有关成本的信息,请参阅 SNS 定价 和 S QS 定价。

如需更多无服务器学习资源,请访问 无服务器世界


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您发展海外业务和/或了解行业前沿技术选择推荐该服务。