使用 Apache Iceberg 和 Amazon Data Firehose 简化 Amazon WAF 日志分析

作者: Charishma Makineni, Phaneendra Vuliyaragoli |

各组织正在迅速扩大其数字影响力,为通过网络应用程序更好地为客户提供服务创造机会。Amazon WAF 日志使组织能够主动监控安全、强制合规并加强应用防御,从而在这次扩展中发挥着至关重要的作用。Amazon WAF 日志分析对许多行业至关重要,包括银行、零售和医疗保健,每个行业都需要提供安全的数字体验。

为了优化其安全运营,各组织正在采用现代方法,将实时监控与可扩展的数据分析相结合。他们正在使用数据湖架构和 Apache Iceberg 来高效处理大量安全数据,同时最大限度地减少运营开销。在处理存储在 Amazon Simple Storage Service(Amazon S3)中的安全数据时,Apache Iceberg 将企业级可靠性与 SQL 简单性相结合,使组织能够专注于安全洞察而不是基础设施管理。

Apache Iceberg 通过多项关键功能增强了安全分析。它与各种亚马逊云科技服务和分析工具无缝集成,同时支持并行读写操作,以同步提取和分析日志。其时空旅行功能可实现全面的安全取证和事件调查,其架构演变支持使团队能够在不中断现有工作流程的情况下适应新兴的安全模式。这些功能使 Apache Iceberg 成为构建强大的安全分析解决方案的理想选择。但是,组织在构建自己的解决方案以向 Apache Iceberg 表提供数据时经常会遇到困难。其中包括管理复杂的提取、转换和加载 (ETL) 流程、处理架构验证、提供可靠的交付以及维护用于数据转换的自定义代码。团队还必须建立弹性错误处理、实施重试逻辑和管理扩展基础架构,同时保持数据一致性和高可用性。这些挑战占用了分析安全数据和得出见解的宝贵时间。

为了应对这些挑战,Amazon Data Firehose 可在几秒钟内向 Apache Iceberg 表提供实时数据交付。Firehose 在多个可用区提供高可靠性,同时可自动扩展以满足吞吐量需求。它是完全托管的,不需要基础架构管理或自定义代码开发。Firehose 通过可配置的缓冲选项提供流媒体数据,这些选项可以针对接近零的延迟进行优化。它还提供内置的数据转换、压缩和加密功能,以及自动重试机制,以提供可靠的数据传输。这使其成为将 Amazon WAF 日志直接传输到数据湖的理想选择,同时最大限度地减少运营开销。

在这篇文章中,我们演示了如何使用 Firehose 和 Apache Iceberg 构建可扩展的 Amazon WAF 日志分析解决方案。Firehose 允许您配置交付流,将 Amazon WAF 日志直接传送到 Amazon S3 中的 Apache Iceberg 表,从而简化了从日志摄取到存储的整个流程。该解决方案无需设置基础架构,您只需为处理的数据付费。

解决方案概述

要实施此解决方案,您首先配置 Amazon WAF 日志以捕获网络流量信息。这会捕获有关网络访问控制列表 (ACL) 分析的流量的详细信息。每个日志条目都包括请求时间戳、详细的请求信息以及触发的规则匹配项。这些日志会持续实时传输到 Firehose。

Firehose 将这些日志写入 Apache Iceberg 表,该表存储在 Amazon S3 中。当 Firehose 向 S3 表传输数据时,它使用 Amazon Glue 数据目录来存储和管理表元数据。这些元数据包括架构信息、分区详细信息和文件位置,可在亚马逊云科技分析服务中实现无缝数据发现和查询。

最后,安全团队可以使用各种亚马逊云科技服务分析 Apache Iceberg 表中的数据,包括 Amazon Redshift、Amazon Athena、Amazon EMR 和 Amazon SageMaker。在本演示中,我们使用 Athena 对安全日志运行 SQL 查询。

下图说明了解决方案架构。

实施包括四个步骤:

  1. 使用 Amazon CloudFormation 部署基础架构。
  2. 使用 Amazon Glue 笔记本创建 Apache Iceberg 表。
  3. 创建一个 Firehose 直播来处理日志数据。
  4. 配置 Amazon WAF 日志,通过 Firehose 流向 Apache Iceberg 表发送数据。

您可以使用 CloudFormation 模板将所需的资源部署到美国东部(弗吉尼亚北部)亚马逊云科技区域的亚马逊云科技环境中。该模板创建了一个用于存储 Amazon WAF 日志的 S3 存储桶,一个用于 Apache Iceberg 表的 Amazon Glue 数据库,以及该解决方案所需的 Amazon Identity and Access Management (IAM) 角色和策略。

先决条件

在开始之前,请确保您具备以下先决条件:

  • 可以访问美国东部(弗吉尼亚北部)区域的亚马逊云科技账户
  • 在美国东部(弗吉尼亚北部)区域配置了网络 ACL 的 Amazon WAF

如果您尚未设置 Amazon WAF,请参阅 Amazon WAF 研讨会,使用 Amazon WAF 创建示例 Web 应用程序。

Amazon WAF 日志使用区分大小写的字段名称(如httpRequestwebaclId)。为了成功提取日志,该解决方案通过 Amazon Glue 任务使用 Apache Iceberg API 来创建表,这是一种保留 Amazon WAF 日志中确切字段名称的可靠方法。尽管 Amazon Glue 爬虫和 Athena DDL 为创建 Apache Iceberg 表提供了便捷的方式,但它们会将混合大小写的列名转换为小写,这可能会影响 Amazon WAF 日志处理。通过将 Amazon Glue 任务与 Apache Iceberg API 结合使用,可以保留列名称的区分大小写,从而在 Amazon WAF 日志字段和表列之间提供正确的映射。

部署 CloudFormation 堆栈

完成以下步骤,使用 Amazon CloudFormation 部署解决方案资源:

  1. 登录 Amazon CloudFormation 控制台。
  2. 选择 "启动堆栈"。
    启动云端堆栈
  3. 选择 "下一步"。
  4. 对于堆栈名称,保留为WAF-Firehose-Iceberg-Stack
  5. 在 "参数" 下,指定是否将 Amazon Lake Formation 权限用于 Amazon Glue 表。
  6. 选择 "下一步"。

  1. 选择 "我确认 Amazon CloudFormation 可能会使用自定义名称创建 IAM 资源",然后选择 "下一步"。

  1. 查看部署并选择提交

该堆栈需要几分钟才能部署。部署完成后,您可以通过导航到 CloudFormation 堆栈上的 "资源" 选项卡来查看创建的资源。

创建 Apache Iceberg 表

在设置 Firehose 交付流之前,必须在数据目录中创建目标 Apache Iceberg 表。如前所述,这是使用 Amazon Glue 任务和 Apache Iceberg API 完成的。完成以下步骤以创建 Apache Iceberg 表:

  1. 在 Amazon Glue 控制台上,选择导航窗格中 ETL 任务下的笔记本电脑

  1. 在 "创建作业" 下选择 "笔记本" 选项。

  1. 在 "选项" 下,选择 "重新开始"。
  2. 对于 IAM 角色,选择WAF-Firehose-Iceberg-Stack-GlueServiceRole-*
  3. 选择 "创建笔记本"。

  1. 在笔记本中输入以下配置命令,使用 Apache Iceberg 扩展配置 Spark 会话。请务必更新由 CloudFormation 模板创建的 S3 存储桶的配置。sql.catalog.glue_catalog.warehouse
%%configure
{
    "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse=s3://<S3BucketName>/waflogdata --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO",
    "--datalake-formats": "iceberg"
}

  1. 在 Amazon Glue 笔记本中输入以下 SQL 来创建 Apache Iceberg 表:
# Note: This code uses Glue version 5.0 (as of April 2024)
# Please check Amazon Glue release notes for the latest version and update accordingly:
# https://docs.our website.html
# To update: Change the %glue_version parameter below to the latest version

%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.conf import SparkConf

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

spark.sql(""" CREATE TABLE glue_catalog.waf_logs_db.firehose_waf_logs(
  `timestamp` bigint,
  `formatVersion` int,
  `webaclId` string,
  `terminatingRuleId` string,
  `terminatingRuleType` string,
  `action` string,
  `terminatingRuleMatchDetails` array <
                                    struct <
                                        conditiontype: string,
                                        sensitivitylevel: string,
                                        location: string,
                                        matcheddata: array < string >
                                          >
                                     >,
  `httpSourceName` string,
  `httpSourceId` string,
  `ruleGroupList` array <
                      struct <
                          rulegroupid: string,
                          terminatingrule: struct <
                                              ruleid: string,
                                              action: string,
                                              rulematchdetails: array <
                                                                   struct <
                                                                       conditiontype: string,
                                                                       sensitivitylevel: string,
                                                                       location: string,
                                                                       matcheddata: array < string >
                                                                          >
                                                                    >
                                                >,
                          nonterminatingmatchingrules: array <
                                                              struct <
                                                                  ruleid: string,
                                                                  action: string,
                                                                  overriddenaction: string,
                                                                  rulematchdetails: array <
                                                                                       struct <
                                                                                           conditiontype: string,
                                                                                           sensitivitylevel: string,
                                                                                           location: string,
                                                                                           matcheddata: array < string >
                                                                                              >
                                                                   >,
                                                                  challengeresponse: struct <
                                                                            responsecode: string,
                                                                            solvetimestamp: string
                                                                              >,
                                                                  captcharesponse: struct <
                                                                            responsecode: string,
                                                                            solvetimestamp: string
                                                                              >
                                                                    >
                                                             >,
                          excludedrules: string
                            >
                       >,
`rateBasedRuleList` array <
                         struct <
                             ratebasedruleid: string,
                             limitkey: string,
                             maxrateallowed: int
                               >
                          >,
  `nonTerminatingMatchingRules` array <
                                    struct <
                                        ruleid: string,
                                        action: string,
                                        rulematchdetails: array <
                                                             struct <
                                                                 conditiontype: string,
                                                                 sensitivitylevel: string,
                                                                 location: string,
                                                                 matcheddata: array < string >
                                                                    >
                                                             >,
                                        challengeresponse: struct <
                                                            responsecode: string,
                                                            solvetimestamp: string
                                                             >,
                                        captcharesponse: struct <
                                                            responsecode: string,
                                                            solvetimestamp: string
                                                             >
                                          >
                                     >,
  `requestHeadersInserted` array <
                                struct <
                                    name: string,
                                    value: string
                                      >
                                 >,
  `responseCodeSent` string,
  `httpRequest` struct <
                    clientip: string,
                    country: string,
                    headers: array <
                                struct <
                                    name: string,
                                    value: string
                                      >
                                 >,
                    uri: string,
                    args: string,
                    httpversion: string,
                    httpmethod: string,
                    requestid: string
                      >,
  `labels` array <
               struct <
                   name: string
                     >
                >,
  `CaptchaResponse` struct <
                        responsecode: string,
                        solvetimestamp: string,
                        failureReason: string
                          >,
  `ChallengeResponse` struct <
                        responsecode: string,
                        solvetimestamp: string,
                        failureReason: string
                        >,
  `ja3Fingerprint` string,
  `overSizeFields` string,
  `requestBodySize` int,
  `requestBodySizeInspectedByWAF` int
)
USING iceberg
TBLPROPERTIES ("format-version"="2")
""")
job.commit()

  1. 导航到数据目录和waf_logs_db数据库以确认表firehose_waf_logs已创建。

创建 Firehose 直播

完成以下步骤以创建 Firehose 直播:

  1. 在 Data Firehose 控制台上,选择创建 Firehose 直播

  1. 选择直接放置,为目标选择 Apache Iceberg 表

  1. 对于 Firehose 直播名称,请输入aws-waf-logs-firehose-iceberg-1
  1. 目标设置部分中,启用路由信息的内联解析。因为我们要将所有记录发送到一个表,所以请指定目标数据库和表名称:
    1. 对于数据库表达式,输入"waf_logs_db"
    2. 对于表表达式,输入"firehose_waf_logs"

确保包含双引号以使用文本值作为数据库和表名称。如果您不使用双引号,Firehose 会假定这是一个 JSON 查询表达式,并会在处理您的流时尝试解析该表达式但失败。Firehose 还可以根据数据内容路由到不同的 Apache Iceberg 表。有关更多信息,请参阅将传入记录路由到不同的 Iceberg 表。

  1. 对于 S3 备份存储桶,输入由 CloudFormation 模板创建的 S3 存储桶。
  2. 对于 S3 备份存储桶错误输出前缀,请输入error/events-1/

  1. 在 "高级设置" 下,选择 "为 Firehose 直播中的源记录启用服务器端加密"。

  1. 对于现有 IAM 角色,选择以 WAF-Firehose-Iceberg-stack-FirehoseIAMRole-* CloudFormation 模板创建的开头的角色。
  2. 选择创建 Firehose 直播

将 Amazon WAF 日志配置到 Firehose 数据流

完成以下步骤,将 Amazon WAF 日志配置到 Firehose 数据流。

  1. 在 Amazon WAF 控制台上,在导航窗格中选择 Web ACL

  1. 选择您的网页 ACL。
  2. 日志记录和指标选项卡上,选择启用

  1. 对于 Amazon Data Firehose 直播,请选择该直播。aws-waf-logs-firehose-iceberg-1
  2. 选择 "保存"。

查询和分析日志

你可以使用不同的处理引擎(例如 Apache Spark、Apache Flink 或 Trino)来查询写入 Apache Iceberg 表的数据。在此示例中,我们使用 Athena 来查询存储在 Apache Iceberg 表中的 Amazon WAF 日志数据。完成以下步骤:

  1. 在 Athena 主机上,选择右上角的设置
  2. 在 "查询结果的位置" 中,输入由 CloudFormation 模板创建的 S3 存储桶

s3://<S3BucketName>/athena/

  1. 输入预期存储桶所有者的亚马逊云科技账户 ID,然后选择保存。

  1. 在查询编辑器的 "表格和视图" 中,选择旁边的选项菜单,firehose_waf_logs然后选择 "预览表格"。

你应该能够使用 Athena 在 Apache Iceberg 表中查看 Amazon WAF 日志。

以下是一些其他有用的示例查询:

  • 通过分析封锁的 IP 地址来识别潜在的攻击来源:
-- Top 10 blocked IP addresses
SELECT httpRequest.clientip, COUNT() as block_count
FROM waf_logs_db.firehose_waf_logs
WHERE action = 'BLOCK'
GROUP BY httpRequest.clientip
ORDER BY block_count DESC
LIMIT 10;
  • 监控一段时间内的攻击模式和趋势:
-- Rate of blocked requests over time
SELECT DATE_TRUNC('hour', FROM_UNIXTIME(timestamp/1000)) as hour,
       COUNT() as request_count
FROM waf_logs_db.firehose_waf_logs
WHERE action = 'BLOCK'
GROUP BY DATE_TRUNC('hour', FROM_UNIXTIME(timestamp/1000))
ORDER BY hour;

Apache Iceberg 表优化

尽管 Firehose 可以高效地将 Amazon WAF 日志流式传输到 Apache Iceberg 表中,但流式写入的本质可能会导致创建许多小文件。这是因为 Firehose 根据其缓冲配置提供数据,这可能会导致查询性能不佳。为了解决这个问题,建议定期进行表格优化。

推荐的表格优化方法有两种:

  • 压缩 — 数据压缩合并小型数据文件以减少存储使用量并提高读取性能。合并和重写数据文件以删除过时的数据并将分散的数据合并为更大、更有效的文件。
  • 存储优化 - 您可以通过删除较旧、不必要的快照及其关联的基础文件来管理存储开销。此外,这包括定期删除孤立文件以保持高效的存储利用率和优秀的查询性能。

这些优化可以使用数据目录或 Athena 来实现。

使用数据目录优化表

数据目录提供自动表格优化功能。在表优化功能中,您可以为压缩、快照保留和孤立文件删除配置特定的优化器。可以从 Amazon Glue 控制台管理表格优化计划并监控状态。

使用 Athena 进行表格优化

Athena 支持通过 SQL 命令进行手动优化。该OPTIMIZE命令将小文件重写成更大的文件并应用文件压缩:

OPTIMIZE waf_logs_db.firehose_waf_logs REWRITE DATA USING BIN_PACK 

VACUUM命令删除旧快照并清理过期的数据文件:

ALTER TABLE waf_logs_db.firehose_waf_logs SET TBLPROPERTIES (
  'vacuum_max_snapshot_age_seconds'='259200'
)
VACUUM waf_logs_db.firehose_waf_logs

您可以使用以下查询监控表的优化状态:

SELECT * FROM "waf_logs_db"."firehose_waf_logs$files"

清理

为避免将来收费,请完成以下步骤:

  1. 清空 S3 存储桶。
  2. 删除 CloudFormation 堆栈。
  3. 删除 Firehose 直播。
  4. 禁用 Amazon WAF 日志。

结论

在这篇文章中,我们演示了如何使用 Firehose 构建 Amazon WAF 日志分析管道,将 Amazon WAF 日志传送到 Amazon S3 上的 Apache Iceberg 表。该解决方案无需复杂代码或基础设施管理即可处理大规模 Amazon WAF 日志处理。尽管这篇文章侧重于将 Apache Iceberg 表作为目标,但 Data Firehose 也与 Amazon S3 表格无缝集成。为了优化您的查询表,Amazon S3 Tables 会持续执行自动维护操作,例如压缩、快照管理和删除未引用的文件。这些操作通过将较小的对象压缩成更少、更大的文件来提高表性能。

要开始自己实施,请在您的亚马逊云科技账户中试用该解决方案,并浏览以下资源以获取其他功能和优秀实践:

  • Data Firehose:注意事项和限制
  • Amazon Glue:优化冰山桌
  • Athena:优化冰山牌桌
  • Amazon S3 表:使用 Amazon Data Firehose 将数据流式传输到表


作者简介

Charishma Makineni 是亚马逊云科技的高级技术客户经理。她为独立软件供应商 (ISV) 提供战略技术指导,帮助他们在亚马逊云科技上构建和优化解决方案。她专门研究大数据和分析技术,帮助组织在亚马逊云科技上优化其数据驱动计划。

Phaneendra Vuliyaragoli 是亚马逊云科技 Amazon Data Firehose 的产品管理主管。在此职位上,Phaneendra 负责领导 Amazon Data Firehose 的产品和市场进入战略。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您发展海外业务和/或了解行业前沿技术选择推荐该服务。