发布于: Aug 18, 2022

与其他部署方法类似,首先构建 Flink 应用程序,并将其打包到一个 JAR 中,该 JAR 包含应用程序运行所需的所有依赖项。然后,将生成的 JAR 上传到 Amazon S3。接下来,使用 S3 上 JAR 的位置和一些其他配置参数来创建一个可由 Kinesis Data Analytics for Java 应用程序执行的应用程序。因此,无需登录到集群并直接将作业提交到 Flink 运行,而是将相应的 JAR 上传到 S3。然后,您可以创建 Kinesis Data Analytics for Java 应用程序,分别使用 API 调用、控制台和 Amazon Web Services CLI 进行交互。

要获得有效的 Kinesis Data Analytics for Java 应用程序,Flink 应用程序的 JAR 必须包含某些依赖项。当您使用 Apache Maven 构建 Flink 应用程序时,只需将另一个依赖项添加到项目的 .pom 文件中。

<!—pom.xml -><project>
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>1.0.1</version>
        </dependency>
    </dependencies>
    ...</project>

然后,您可以指定在创建或更新时传递给生成的 Kinesis Data Analytics for Java 应用程序的参数。这些参数基本上是键值对,包含在作为属性组一部分的属性映射中。

"ApplicationConfiguration": {
    "EnvironmentProperties": {
        "PropertyGroups": [
            {
                "PropertyGroupId": "FlinkApplicationProperties",
                "PropertyMap": {
                    "InputStreamName": "...",
                    ...
                }
            }
        ]
    },
    ...
}

然后,您可以从源自 Kinesis Data Analytics for Java 应用程序运行时的应用程序代码中,获取这些参数的值。例如,以下代码段可获取应用程序应从 FlinkApplicationProperties 属性组连接到的 Kinesis 数据流的名称。

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

Properties flinkProperties = applicationProperties.get("FlinkApplicationProperties");

String kinesisStreamName = flinkProperties.getProperty("InputStreamName");

您可以使用相同的机制为 Kinesis Data Analytics for Java 应用程序配置其他属性(例如,检查点和应用程序的并行性),这些属性通常直接指定为 Flink 运行时的参数或配置选项。

"ApplicationConfiguration": {
    "FlinkApplicationConfiguration": {
        "CheckpointConfiguration": {
            "ConfigurationType": "DEFAULT"
        },
        "MonitoringConfiguration": {
            "ConfigurationType": "CUSTOM",
            "MetricsLevel": "TASK",
            "LogLevel": "INFO"
        },
        "ParallelismConfiguration": {
            "ConfigurationType": "DEFAULT"
        }
    },
    ...
}

使用此配置,检查点和并行度设置可保留默认值。这样可以实现检查点和自动扩展,并将 Kinesis Data Analytics for Java 应用程序的初始并行性设置为一个。此外,日志级别增加到 INFO,并为应用程序的每个子任务收集 CloudWatch 指标。

在构建 Kinesis 数据流的 Flink 应用程序时,您可能会发现,在 Maven Central 中无法使用 Flink Kinesis 连接器。因为实际上需要自己来构建它。以下步骤为最近的 Apache Flink 版本构建连接器。由于 Kinesis Data Analytics for Java 应用程序是基于 Flink 1.6.2 的,您现在可以使用此特定版本。

$ wget -qO- https://github.com/apache/flink/archive/release-1.6.2.zip | bsdtar -xf-

$ cd flink-release-1.6.2

$ mvn clean package -B -DskipTests -Dfast -Pinclude-kinesis -pl flink-connectors/flink-connector-kinesis

请注意,连接器已经可以通过 Amazon Web Services CloudFormation 模板构建,并已经存储在 S3 上。从那里即可下载连接器的 JAR 文件,并使用以下 Maven 命令将其放在本地 Maven 存储库中:

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.6.2.jar -DpomFile flink-connector-kinesis_2.11-1.6.2.pom.xml

从 1.6 版本开始,Apache Flink 附带一个 Elasticsearch 连接器,该连接器支持 HTTP 上的 Elasticsearch API。因此,它可以与 Amazon Elasticsearch Service 提供的终端节点进行本地通信。

您只需要决定如何针对 Elasticsearch 集群的公共终端节点对请求进行身份验证。您可以将单个 IP 列入白名单以访问集群。针对 Amazon Elasticsearch Service 终端节点进行身份验证的推荐方法是使用 IAM 凭证和签名版本 4 签名过程向 Amazon Web Services 请求添加身份验证信息。

您也可以使用可从 Maven central 获得的 open-source Amazon Web Services-signing-request-interceptor。只需在将请求发送到 Amazon Elasticsearch Service 终端节点之前调用的 Elasticsearch 接收器中添加拦截器。然后,拦截器可以使用为 Kinesis Data Analytics for Java 应用程序配置的角色权限对请求进行签名。

final List<HttpHost> httpHosts = Arrays.asList(HttpHost.create("https://...")));

ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<T>() {
      ...
    });

final Supplier<LocalDateTime> clock = () -> LocalDateTime.now(ZoneOffset.UTC);
final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
final AWSSigner awsSigner = new AWSSigner(credentialsProvider, "eu-west-1", "es", clock);

esSinkBuilder.setRestClientFactory(
    restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(
        callback -> callback.addInterceptorLast(new AWSSigningRequestInterceptor(awsSigner))
    ));

esSinkBuilder.build();

请注意,GitHub 存储库中的实际代码有点复杂,因为您需要获取可序列化的请求拦截器。但签署请求的基本方法仍然相同。

当我们运行 Kinesis Data Analytics for Java 应用程序时,需直接访问运行 Flink 的集群。这是因为底层基础设施完全由服务托管。您仅需要通过 API 与服务进行交互即可。您也可以分别通过 CloudWatch 和 CloudWatch Logs 获取指标和日志记录信息。

Kinesis Data Analytics for Java 应用程序公开了许多操作指标,从整个应用程序的指标到应用程序运算符单个进程的指标。您可以根据您的目的,控制适合或需要的详细程度。事实上,上一节中使用的指标都是通过 CloudWatch 获得的。

除了操作指标之外,您还可以配置 Kinesis Data Analytics for Java 应用程序以将消息写入 CloudWatch Logs。此功能与常见的日志记录框架无缝集成,例如 Apache Log4j 和 Simple Logging Facade for Java (SLF4J)。因此,它对于调试和识别操作问题的原因很有用。

如需为您的 Kinesis Data Analytics for Java 应用程序启用日志记录,只需在启动应用程序时将现有 CloudWatch 日志流指定为日志记录选项即可,如下所示:

final Logger LOG = LoggerFactory.getLogger(...);
LOG.info("Starting to consume events from stream {}", flinkProperties.getProperty("InputStreamName"));

将日志消息持久保存到 CloudWatch Logs 后,可通过 CloudWatch Logs Insights

相关文章