发布于: Oct 10, 2021

如何构建基于 Apache Flink 和 Kinesis Data Analytics for Java 应用程序的流应用程序,下面就以纽约市机场的打车请求状况为例进行说明。要查看所描述的架构,请在您自己的 Amazon Web Services 账户中执行以下 Amazon CloudFormation 模板。该模板首先构建分析传入的出租车行程的 Flink 应用程序,包括读取来自 Kinesis 数据流的数据所需的 Flink Kinesis 连接器。然后,它创建基础设施并将 Flink 应用程序提交到 Kinesis Data Analytics for Java Applications。

构建应用程序和创建基础设施的整个过程大约需要 20 分钟。创建 Amazon CloudFormation 堆栈后,Flink 应用程序已部署为 Kinesis Data Analytics for Java 应用程序。然后它等待数据流中的事件到达。启用检查点,以便应用程序可以从底层基础设施的故障中无缝恢复,而 Kinesis Data Analytics for Java 应用程序代表您管理检查点。此外,还配置了自动扩展,以便 Kinesis Data Analytics for Java 应用程序自动分配或删除资源并扩展应用程序(即调整其并行性)以响应传入流量的变化。

为了填充 Kinesis 数据流,我们使用 Java 应用程序将纽约市历史出租车行程的公共数据集回放到数据流中。Java 应用程序已下载到Amazon CloudFormation 配置的 Amazon EC2实例。您只需连接到实例并执行 JAR 文件即可开始将事件提取到流中。

您可以从先前执行的 Amazon CloudFormation 模板的输出部分获取以下所有命令及参数。

$ ssh ec2-user@«Replay instance DNS name»

java -jar amazon-kinesis-replay-1.0-SNAPSHOT.jar -streamName «Kinesis data stream name» -streamRegion «AWS region» -speedup 3600

加速参数确定相对于实际发生的历史事件,将数据导入 Kinesis 数据流的速度加快多少。使用给定的参数,Java 应用程序可在一秒内提取一小时的历史数据。这会带来大约 13k 个事件的吞吐量和每秒 6MB 的数据,从而使 Kinesis 数据流完全饱和(稍后将详细介绍)。

然后,您可以继续通过已创建的 Kibana 控制面板检查派生数据。或者,您可以创建自己的可视化以探索 Kibana 中的数据。

https://«Elasticsearch endpoint»/_plugin/kibana/app/kibana#/dashboard/nyc-tlc-dashboard

准备的 Kibana 控制面板包括一幅热图和一幅折线图。热图可视化当前叫车位置,并显示叫车需求最高的位置是曼哈顿。此外,地图上还标出了肯尼迪国际机场和拉瓜迪亚机场,与其直接社区相比,这两个机场的叫车需求高很多。折线图可视化这两个机场的平均行程持续时间。下图显示了叫车量在日间如何稳定增加,直到晚间突然下降。

在本博文中,Elasticsearch 集群被配置为接受来自指定为 Amazon CloudFormation 模板参数的 IP 地址范围的连接。对于生产工作负载,更需要进一步加强 Elasticsearch 域的安全性,例如,使用 Amazon Cognito 进行 Kibana 访问控制。

在本博文中,Kinesis 数据流被故意预置不足,因此 Java 应用程序完全使数据流饱和。如果仔细检查 Java应用程序的输出,您会注意到“回放延迟”不断增加。这意味着生产者无法根据指定的加速参数尽快提取事件。

您可以通过 Amazon CloudWatch 控制面板访问数据流,从而深入了解数据流的指标。然后,您可以看到,WriteProvisionedThroughputExceeded 指标略有增加:由于相应请求受限,大约 0.4% 的记录不会被接受到流中。另一方面,数据流预置不足,特别是在飞行中发生太多事件时生产者暂停提取新事件的情况下。

要增加数据流的吞吐量,只需在控制台上点击几次和通过 API 调用,就可以将分片数量从 6 更新为 12。对于生产环境,您甚至可能希望自动执行此过程。有关如何自动扩展 Kinesis 数据流的详细信息,请参阅博文使用 Amazon Application Auto Scaling 扩展 Amazon Kinesis 数据流。

当流分片的扩展操作完成时,您可以观察“回放延迟”如何减少以及更多事件如何被导入流中。

但是,直接结果是需要处理更多事件。因此,现在 Kinesis Data Analytics for Java 应用程序变得过载,无法再跟上增加的传入事件数量。您可以通过发布到 CloudWatch 的 millisBehindLatest 指标来观察此情况。指标根据提取时间(以毫秒为单位),报告 Kinesis Data Analytics for Java 应用程序当前读取的最旧记录与流中的最新记录之间的时间差。因此,它表明流提示中有多少事件的处理滞后。

正如这些指标所示,扩展操作完成 10 分钟后,流分片中最新事件的处理已滞后 3 分钟以上。更糟糕的是,它不断变得更加延后,并不断扩大此差距。
但是,与 Kinesis Data Streams 相比,Kinesis Data Analytics for Java 应用程序本身支持自动扩展。几分钟后,您可以在指标中看到扩展活动的效果。millisBehindLatest 指标开始减少,直到达到零,此时处理已赶上 Kinesis 数据流的提示。

但是,请注意 millisBehindLatest 指标在开始下降之前是如何激增的。这是由今天扩展 Kinesis Data Analytics for Java 应用程序的方式导致的。要扩展正在运行的应用程序,应用程序的内部状态将持久保存为所谓的保存点。此保存点由 Kinesis Data Analytics for Java 应用程序公开为快照。随后,终止应用程序的运行实例,并创建一个具有更多资源和更高并行性的相同应用程序的新实例。然后,应用程序的新实例从快照填充其内部状态,并从当前终止的实例中断位置开始恢复处理。

因此,扩展操作导致处理短暂中断,这解释了指标的激增。但是,此操作对生产者和消费者是透明的。生产者可以继续写入 Kinesis 数据流,因为它们与应用程序是松耦合的。同样,消费者仍然可以使用 Kibana 查看他们的控制面板,不过他们可能看不到最新数据,因为其尚未处理。

让我们退一步,回顾一下您刚刚做了什么:您创建了一个完全托管、高度可用、可扩展的流式架构。您每秒提取和分析多达 25,000 个事件。通过几次单击即可扩展 Kinesis 数据流和 Kinesis Data Analytics for Java 应用程序,从而将架构的吞吐量翻倍。操作完成后,架构仍然完全正常运行,并且不断接收和处理事件,而不会丢失单个事件。您还可以像其他组件一样无缝扩展 Elasticsearch 集群。但是我们会把它留给感兴趣的读者练习。

在本文中,您不仅构建了基于 Apache Flink 和 Kinesis Data Analytics for Java 应用程序的可靠、可扩展且高度可用的流应用程序,还可以扩展不同的组件,同时近乎实时地每秒提取和分析多达 25k 个事件。大部分情况下,通过使用托管服务启用此场景,因此您无需花时间配置和配置底层基础架构。

相关文章