使用亚马逊 DynamoDB 交易实现具有筛选功能的无服务器 FIFO 队列

消息队列允许系统的不同部分异步通信和处理操作。 亚马逊简单队列服务 (Amazon SQS) 是一项完全托管的消息队列服务,提供两种类型的消息队列:标准队列和 先进先出 (FIFO) 队列。 对于某些应用程序,例如呼叫中心应用程序,您需要进行消息筛选以及对消息进行FIFO排序。这种额外的筛选功能可以使用 亚马逊简单通知服务 (Amazon SNS) FIFO 主题 和亚马逊 SQS FIFO 队 列的组合来实现。但是,此选项可能会导致主题、队列激增,并增加跨这些不同队列实现事务的额外开销。

在这篇文章中,我们将向您介绍一个用例和一个基于 Amazon Dynam oDB 事务的队列实现,它演示了消息筛选和有序消息处理功能。DynamoDB 是一个完全托管的无服务器数据库,支持键值和文档数据模型。我们选择基于 DynamoDB 的实现有三个原因。 首先,DynamoDB 是无服务器的;其次,DynamoDB 采用按 需定价模式 ,该模式提供简单的按请求付费定价,因此用户按实际使用量付费,而不是提前预置容量;第三,DynamoDB 在单个 亚马逊云科技 区域中高度可用,并且具有用于多区域容错的全局表选项。

解决方案概述

我们使用呼叫中心用例来演示消息队列处理解决方案。呼叫中心排队系统必须根据请求者的首选语言和请求类型等规则,将客户支持请求按收到的顺序与可用的最佳客服进行匹配。对于我们的用例,我们应用了一条规则,即支持人员的语言和性别属性与来电者个人资料中的偏好保持一致,如下面的图 1 所示。我们实现了代理队列,该队列提供筛选功能,这样,请求说西班牙语的女性代理的来电者就会与具有相应配置文件且最早被添加到队列中的客服(FIFO)进行匹配。

SQS based implementation

图 1:客服的语言和性别属性与来电者个人资料中的偏好相符

亚马逊 SQS FIFO 队列非常适合呼叫队列。但是,SQS 不为代理队列提供筛选功能。下面的图 2 显示了一种使用 DynamoDB 作为代理队列的方法。DynamoDB 支持两种不同的 主键 :简单主键,由一个称为分区键的属性组成;由两个属性组成的复合主键。在复合主键中,第一个属性是分区键,第二个属性是排序键。此实现中的代理队列表使用复合主键,因为它在查询数据时提供了额外的灵活性,例如能够同一分区键的排序键值对结果进行排序。

代理队列表的分区键是一个位掩码,掩码中的每个位代表某种代理能力,例如西班牙语能力。排序键是代理可用性时间戳。这两个属性共同确保了物品的唯一性。当新的调用进入队列时,匹配器会构造位掩码并要求提供与位掩码匹配的第一条记录。由于 DynamoDB 根据排序键对分区进行排序,因此该查询会返回第一个可用代理。

图 2:使用 DynamoDB 实现代理队列

当我们对代理队列使用 DynamoDB 方法时,我们会做出两个重要的假设:

  • 首先,我们假设可能有多个代理匹配请求并行运行。为了防止第二个线程读取另一个线程正在处理的项目,我们结合使用 DynamoDB 事务和乐观锁定来强制执行等效的项目可见性。
  • 其次,我们要求在表格中插入代理可用性记录的流程为具有多种可能技能组合的客服(例如会说西班牙语和英语的客服)重复录入。

DynamoDB 表格设计

我们使用 单表设计 来实现代理队列。该表代表三种类型的项目:代理、队列信息和代理队列。

以下示例说明了代理表的设计。

Primary key Attributes
Partition key: pk Sort key: sk AgentName AgentID Gender Languages AgentStatus
AllAgents Agent#Andrew Andrew 929bb98b85fe M [“English”,”French”] attendingCall
Agent#Billy Billy 836c66055b77 M [“English”,”French”,”Spanish”] queued
Agent#Christine Christine bd14b59ac8e4 F [“Spanish”] queued
Agent#Courtney Courtney 43156c44832a F [“English”,”Spanish”] queued
Agent#Ellen Ellen 60f6a95b747c F [“English”,”French”,”Spanish”] queued
Agent#Remy Remy 56c4c66 bb98 T [“English”] queued

每个座席项目代表一个 客服、他们的性别、他们说的一组语言以及他们的当前状态(在线、排队 或Att ending Call)。为了解释解决方案,使用 agentN am e 属性而不是 Agent I d 作为排序键 ,这保证了唯一性。

下表说明了我们的队列信息设计。

Primary key Attributes
Partition key: pk Sort key: sk QueueVersionId QueueDepth
AllQueues Q#English#F 12 2
Q#English#M 14 1
Q#English#T 13 1
Q#French#F 44 3
Q#French#M 35 1
Q#Spanish#F 46 1
Q#Spanish#M 15 2

队列信息项代表每个性别和语言组合队列 的当前深度和 versionID 队列的 queueVersionID 用于实现乐观锁定。 queueDe pth 属性不是必需的,但包含该属性是为了演示整体解决方案。

以下示例显示代理队列表的设计。

Primary key Attributes
Partition key: pk Sort key: sk AgentName AgentID Gender Languages
Q#English#F 2022/01/05-14:01:37.370825 Courtney 43156c44832a F [“English”,”Spanish”]
Q#English#F 2022/01/05-14:01:39.416341 Ellen 60f6a95b747c F [“English”,”French”,”Spanish”]
Q#English#T 2022/01/05-14:01:19.416341 Remy 56c4c66 bb98 T [“English”]
Q#English#M 2022/01/05-14:01:29.257873 Billy 836c66055b77 M [“English”,”French”,”Spanish”]
Q#French#F 2022/01/05-14:01:39.416341 Ellen 60f6a95b747c F [“English”,”French”,”Spanish”]
Q#French#M 2022/01/05-14:01:29.257873 Billy 836c66055b77 M [“English”,”French”,”Spanish”]
Q#Spanish#F 2022/01/05-14:01:34.330782 Christine bd14b59ac8e4 F [“Spanish”]
Q#Spanish#F 2022/01/05-14:01:37.370825 Courtney 43156c44832a F [“English”,”Spanish”]
Q#Spanish#F 2022/01/05-14:01:39.416341 Ellen 60f6a95b747c F [“English”,”French”,”Spanish”]
Q#Spanish#M 2022/01/05-14:01:29.257873 Billy 836c66055b77 M [“English”,”French”,”Spanish”]

代理队列项目代表队列中的消息。会说英语和西班牙语的女特工将在表中 显示两个留言 项目。一个 消息 项的分区键是 Q #English #F 。另一个 消息 项的分区键是 Q #Spanish #F 。这两个项目具有相同的排序键,即将该代理添加到代理队列的时间戳。

将消息出队时使用这些 消息 项中的 AgentName 、性别 语言 属性。要使代理的消息出队,需要对表中的其他项目进行以下相应的更改:

  • 必须删除同一代理的其他 消息 项目。
  • 必须更新队列属性项以反映 Queue VersionID 和 Q ueueDepth 属性的变化。
  • 必须更新代理项目以反映 Agent Status 的变化。

我们将在后续章节中介绍这些细节。

在此示例中,我们使用使用 亚马逊云科技 开发工具包的 Python 脚本来创建表并填充各种项目类型。DynamoDB 表和项目类型代表代理队列,该队列提供 FIFO 和筛选功能。然后,我们向该代理队列查询具体标准,以演示实际解决方案。

先决条件

对于此解决方案,您必须首先完成以下先决条件:

  1. 拥有 Python 版本 3.7 及更高版本。
  2. 配置亚马逊网络服务 (亚马逊云科技) 证书 和 亚马逊云科技 区域。
  3. 确保 亚马逊云科技 身份和访问管理 (IAM ) 委托人有权对 DynamoDB 表执行操作。代码存储库中提供了示例身份策略。
  4. 克隆 GitHub 存储库:
    
    git clone github.com: aws-samples/aws-dynamodb-fifo.git 
     
    
  5. 更改目录:c
    
    d aw
    s-dynamodb-fifo
  6. 安装依赖包:
    
    pip install-r requirements.txt 
     
    

现在,你可以使用三个不同的脚本来设置代理队列功能。

创建表

第一个脚本使用复合主键和按需容量模式创建一个 DynamoDB 表。

运行以下命令:

python createTable.py

下面的图 3 显示了验证表是否已创建的输出。

Creating AgentQueueFIFO table
图 3:DynamoDB 表 AgentQueueFifo 已创建

初始化表

第二个脚本使用以下方法初始化表:

  • 代理池 — 该脚本使用 Faker 包 生成随机代理。每个代理都有一个随机的名字、 AgentID 、性别以及他们会说的最多三种语言的列表。默认情况下,该脚本会创建五个代理。
  • 队列属性 -创建了六个队列属性项目。每个项目代表语言和性别的组合。

运行以下命令:

python initializeAgentPoolAndQueueMetaData.py

下面的图 4 显示了 DynamoDB 控制台上列出的五个代理项目和六个队列属性。

Agents and their properties
图 4:代理及其属性

将可用代理分配给队列

对于每个 Ag entStat us 属性值为 available 的 代理 ,脚本会创建新项目并更新现有项目。

使用以下命令运行第三个脚本:

python addAvailAgentsToQueue.py

下面的图 5 显示了即将排队的每个可用代理所涉及的步骤。

Steps for queuing each available agent

图 5:每个代理的步骤

该脚本完成以下步骤:

  1. 为代理使用的每种语言创建消息项目类型。

例如,探员艾丽西亚会说两种语言,英语和西班牙语。表中添加了两个新项目,如下面的图 6 所示。这两个项目的排序键是将代理添加到可用性队列时的时间戳。
Two items for agent Alicia, who speaks English and Spanish
图 6:会说英语和西班牙语的特工艾丽西亚的两件物品

  1. 代理 项目的 Ag entStat us 属性值更新为已排队。

Agent status updated to queued
图 7:代理状态已更新为排队

  1. 增加相应 队列元数据项的 queueDepth queueversionID

QueueDepth and QueueVersionId updated
图 8:队列深度和队列版本 ID 已更新

这三个操作都是 DynamoDB TransactWr iteItems 操作的一部分。这可确保将可用代理添加到队列的单一逻辑业务操作的所有操作一起成功或一起失败。

该脚本循环遍历 Ag entStat us 属性值为 可用的所有代理项目, 并创建其他相应的项目。下面的图 9 是 DynamoDB 表的视图,其中所有五个代理都排在队列。

DynamoDB table showing all agents queued
图 9:显示所有排队代理的 DynamoDB 表

解决方案现已准备就绪。

按照 FIFO 顺序筛选消息

为了演示过滤功能和先进先出功能,让我们假设来电者想要与会说西班牙语且是男性的客服通话。解决方案应返回与这些条件匹配的第一条消息。

对于组合密钥为 allQueues+Q #Spanish #M 的项目 ,有四个代理符合该标准,如下面的图 10 所示。

Four agents that match the composite key
图 10:与复合密钥匹配的四个代理 allQueues+Q #Spanish #M

用例要求将呼叫者与符合条件的代理(讲西班牙语的男性)进行匹配,并且是第一个添加到队列的代理(FIFO 排序)。特工 Chad 的物品符合此要求,因为该物品符合筛选条件(讲西班牙语的男性),并且是第一个根据排序键中的时间戳添加到队列的项目。如下面的图 11 所示。

Agent Chad matches all criteria and was first added to the queue
图 11:Agent Chad 匹配所有条件并首先被添加到队列中

当将代理 Chad 分配给呼叫者时,Chad 必须出队,这样其他呼叫者就不会被分配到同一个代理。例如,如果第二个来电者同时在寻找一位讲法语的男性,则该标准也与查德相匹配。

当代理请求同时来自多个调用方线程时,可能会出现这种争用情况。我们通过使用 DynamoDB 事务强制锁定和使用 QueueversionID 进行乐观锁定来解决这个问题。

让我们来看看这是如何使用示例调用者匹配流程来实现的。来电者正在请求一位会说西班牙语的男性特工( Q #Spanish #M)。 后面的图 12 中的流程图显示了所发生的步骤。

Agent matching request flowchart
图 12:代理匹配请求流程图

步骤如下:

  1. DynamoDB TransactGetItems 操作可捕获所有队列的当前元数据的快照。此事务操作可确保在读取项目时没有其他冲突的更新正在进行中。在此步骤中 检索到的 queueVersionID 用于在后续步骤中实现乐观锁定。
  2. 使用 强一致性读取的查询操作会 检索匹配的代理。强一致性读取可确保查询返回最新数据,反映所有先前写入操作的更新。要找到会说西班牙语且是男性的特工,我们使用 Q #Spanish #M 的分区键进行查询 。返回的结果始终按排序键值排序,在本例中,排序键值是代理状态设置为排队的时间戳。
  3. 查询结果集中的第一项是应分配给调用者的代理,如下面的图 13 所示。

First item that matches the query

图 13:与查询匹配的第一项

为了完成识别和分配代理的步骤,我们使用该项目中的 性别、语言、sk (代理可用性时间戳)和 AgentNam e 属性值对表中的其他相应项目执行删除和更新。

    1. 删除匹配代理的所有队列项目,如下面的图 14 所示。对于特工 Chad,删除三个项目(每种语言一个项目)。这三个需要删除的项目的分区键是使用语言属性值 {'英语'、'法语'、'西班牙语'} 和 “ 性别” 属性计算得出的。删除操作的排序键源自 s k 值。
    2. Items marked for removal
      图 14 — 标记为要删除的项目
    3. 更新队列信息项目,如下面的图 15 所示。对于特工 Chad 来说,必须更新三个队列的信息: Q #English #M Q #French #M Q #Spanish #M 队列版本 ID 递增 1, 队列 深度 递减 1。 Cond it ionExpression 用于验证项目的当前 queueversionID 是否与代理匹配过程开始之前拍摄的快照中 捕获的对应 queueversionID 相同(步骤 1)。这种 乐观的锁定 策略以及交易操作可确保所有竞争条件都得到解决。 Update queue information
      图 15:更新队列信息将
    4. Ag
    5. entI tem 的代理状态 排队 更新为 A tten dingCall ,如下面的图 16 所示。

Update agent status

图 16:更新代理状态

要查看其实际运行效果,请运行以下命令:

python assignAgentToCallRequest.py Q#Spanish#M

该脚本会找到匹配的代理并执行上一节中讨论的删除和更新。下面的图 17 显示了运行脚本后对项目所做的更改:

  • 表中的项目数量已从 23 变为 20。这反映了已删除的三个 AgentQueue 项目。
  • Q #English #M 、Q #French #M 和 Q #Spanish #M 的队列元数据 已更新。 队列深度减少 了 1, 队列版本 ID 增加 了 1。
  • 代理的代理状态已更新为 Att endingCall。

Changes following deletes and updates
图 17:删除和更新后的更新

清理

要删除 DynamoDB 表,请运行以下命令:

python deleteTable.py

注意事项

通过在执行查询操作时将 ScanIndexForward 参数设置为 false,可以将此解决方案扩展到具有筛选功能的后进 先出 (LIFO) 队列。 这会按降序而不是默认的升序对查询操作的结果进行排序。

本文中的解决方案使用单表设计,但由于 DynamoDB 交易操作可以跨越同一 亚马逊云科技 账户和区域内的多个表,因此您可以使用多个表创建类似的解决方案。

目前,可以 作为 TransactWriteItems 或 t ransactGetItems 操作一部分的独特物品数量 限制 为 100。 在这篇文章中演示了三种语言 (n) 和三种性别 (m) 的组合后,写入交易最多可以有 10 个项目(n*m +1)。对于其他用例,必须考虑属性的数量和每个属性的可能值,以确保组合不会超过对 100 个项目的运算。

此解决方案使用 DynamoDB 事务读写请求和强一致性读取请求。交易读取和写入请求 的价格 是标准读取和写入请求的两倍。强一致性读取请求的价格是最终一致性读取请求的两倍。大规模使用此解决方案时,应考虑这些额外成本。

摘要

在这篇文章中,我们向您展示了如何使用 DynamoDB 实现具有过滤功能的无服务器 FIFO 队列的示例。您还了解到,当多个客户端与该队列实现进行交互时,会满足队列系统的标准特征,例如消息排序、可见性、入队和出队。你可以使用这篇文章中的代码示例作为实现用例的起点。

试试这个解决方案,并在评论部分分享你的反馈。


作者简介

Nikhil Penmetsa 是亚马逊网络服务的高级解决方案架构师。他帮助组织了解有关基于云的高级解决方案的最佳实践。他热衷于与客户深入合作,以创建具有成本效益、安全和高性能的解决方案。

Randy DeFauw 是 亚马逊云科技 的高级首席解决方案架构师。他拥有密歇根大学电气工程硕士学位,在那里他从事自动驾驶汽车的计算机视觉研究。他还拥有科罗拉多州立大学的工商管理硕士学位。兰迪在技术领域担任过各种职位,从软件工程到产品管理。In 于 2013 年进入大数据领域,并继续探索该领域。他积极参与机器学习领域的项目,并在包括Strata和GlueCon在内的许多会议上发表过演讲。