推出适用于 Apache Spark 的云随机存储插件

亚马逊云科技 Glu e 是一项无服务器数据集成服务,可轻松发现、准备和组合用于分析、机器学习 (ML) 和应用程序开发的数据。在 亚马逊云科技 Glue 中,您可以使用 A pache Spark ,这是一个开源的分布式处理系统,用于执行数据集成任务和大数据工作负载。

Apache Spark 利用内存缓存和优化的查询执行来对您的数据集进行快速分析查询,这些数据集在不同的节点上分成多个 Spark 分区,因此您可以并行处理大量数据。在 Apache Spark 中,当需要在集群中重新分配数据时, 就会发生 洗牌 。在洗牌期间,数据被写入本地磁盘并通过网络传输。shuffle 操作通常受可用本地磁盘容量或数据偏差的限制,这可能会导致执行器滞后。当执行器上剩余的磁盘空间不足且无法恢复时,Spark 经常会抛出 “设备上 没有剩余空间” 或 MetadataFetchFailedException 错误。如果不增加额外的计算和附加存储,这样的 Spark 任务通常无法成功,在这种情况下,计算通常处于空闲状态,从而产生额外的成本。

2021 年,我们推出了 搭载 Spark 2.4 的 亚马逊云科技 Glue 2.0 版 亚马逊 S3 洗牌 游戏。该功能利用 亚马逊简单存储服务 (Amazon S3)来存储 Spark 洗牌文件,对 Spark 计算和洗牌存储 空间进行了分解。使用 Amazon S3 进行 Spark 随机存储,您可以更可靠地运行数据密集型工作负载。发布后,我们继续在这一领域进行投资,并收集了客户反馈。

今天,我们很高兴发布适用于 Apache Spark 的 Cloud Shuffle 存储插件 。 它支持最新的 Apache Spark 3.x 发行版,因此你可以在 亚马逊云科技 Glue 或任何其他 Spark 环境中使用该插件。现在,它还可以在 亚马逊云科技 Glue 3.0 和最新 亚马逊云科技 Glue 4.0 版本上的 亚马逊云科技 Glu e Spark 任务中原生使用, 无需任何额外的设置或引入外部库。就像 亚马逊云科技 Glue 2.0 的 A mazon S3 洗 牌一样 ,Cloud Shuffle 存储插件可以帮助你解决在无服务器 Spark 环境中洗牌期间出现的磁盘空间有限错误。

我们还很高兴地宣布,在Apache 2.0许可下发布适用于Apache Spark的Cloud Shuffle存储插件的软件二进制文件。你可以 下载二进制文件 并在任何 Spark 环境中运行它们。新插件是开放云的,对亚马逊 S3 提供开箱即用的支持,并且可以轻松配置为使用其他形式的云存储,例如 谷歌云存储 和 微软 Azure Bl ob 存储

了解 Apache Spark 中的洗牌操作

在 Apache Spark 中,有两种类型的转换:

  • 狭义转换 -这包括 地图 过滤器 联合 Map Partition ,其中每个输入分区仅贡献一个输出分区。
  • 广泛转换 — 这包括 联接 GroupByKey 、redu ceByKey 重新分区 ,其中每个输入分区都构成许多输出分区。 包括 联接 排序依据、分依据在内的 Spark SQL 查询 需要进行广泛的转换。

宽转换会触发洗牌,每当将数据重组为新分区并将每个密钥分配给其中一个分区时,就会发生洗牌。在洗牌阶段,所有 Spark 地图任务都会将洗牌数据写入本地磁盘,然后通过网络传输并由 Spark reduce 任务获取。在 Spark 用户界面中可以看到洗牌后的数据量。当 shuffle 写入占用的空间大于本地可用磁盘容量时,会导致 “设备上 没有剩余空间” 错误。

为了说明其中一个典型场景,让我们使用标准 TPC-DS 3 TB 数据集中的查询 q80.sql 作为示例。此查询尝试计算在特定时间段内实现的总销售额、回报和最终利润。它涉及由 左外连接和分组依据引起的多个宽变换(洗牌) 。

让我们在包含 10 个 G1.X 工作人员的 亚马逊云科技 Glue 3.0 任务上运行以下查询,其中总共有 640 GB 的可用本地磁盘空间:

with ssr as
 (select  s_store_id as store_id,
          sum(ss_ext_sales_price) as sales,
          sum(coalesce(sr_return_amt, 0)) as returns,
          sum(ss_net_profit - coalesce(sr_net_loss, 0)) as profit
  from store_sales left outer join store_returns on
         (ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number),
     date_dim, store, item, promotion
 where ss_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and ss_store_sk = s_store_sk
       and ss_item_sk = i_item_sk
       and i_current_price > 50
       and ss_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by s_store_id),
 csr as
 (select  cp_catalog_page_id as catalog_page_id,
          sum(cs_ext_sales_price) as sales,
          sum(coalesce(cr_return_amount, 0)) as returns,
          sum(cs_net_profit - coalesce(cr_net_loss, 0)) as profit
  from catalog_sales left outer join catalog_returns on
         (cs_item_sk = cr_item_sk and cs_order_number = cr_order_number),
     date_dim, catalog_page, item, promotion
 where cs_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and cs_catalog_page_sk = cp_catalog_page_sk
       and cs_item_sk = i_item_sk
       and i_current_price > 50
       and cs_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by cp_catalog_page_id),
 wsr as
 (select  web_site_id,
          sum(ws_ext_sales_price) as sales,
          sum(coalesce(wr_return_amt, 0)) as returns,
          sum(ws_net_profit - coalesce(wr_net_loss, 0)) as profit
  from web_sales left outer join web_returns on
         (ws_item_sk = wr_item_sk and ws_order_number = wr_order_number),
     date_dim, web_site, item, promotion
 where ws_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and ws_web_site_sk = web_site_sk
       and ws_item_sk = i_item_sk
       and i_current_price > 50
       and ws_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by web_site_id)
 select channel, id, sum(sales) as sales, sum(returns) as returns, sum(profit) as profit
 from (select
        'store channel' as channel, concat('store', store_id) as id, sales, returns, profit
      from ssr
      union all
      select
        'catalog channel' as channel, concat('catalog_page', catalog_page_id) as id,
        sales, returns, profit
      from csr
      union all
      select
        'web channel' as channel, concat('web_site', web_site_id) as id, sales, returns, profit
      from  wsr) x
 group by rollup (channel, id)
 order by channel, id

以下屏幕截图显示了 Spark 用户界面 中的 “执行器” 选项卡
Spark UI Executor Tab

以下屏幕截图显示了 亚马逊云科技 Glue 任务运行中包含的 Spark 任务的状态。
Spark UI Jobs
在失败的 Spark 作业(作业 ID=7)中,我们可以在 Spark 用户界面中看到失败的 Spark 阶段。
Spark UI Failed stage
该阶段有 167.8Gib 随机写入,由于错误 java.io.ioException,有 14 个任务失败:由于主机 172.34.97.212 的本地空间用完,设备上没有剩余空间 磁盘。
Spark UI Tasks

适用于 Apache Spark 的云端随机存储

适用于 Apache Spark 的 Cloud Shuffle Storage 允许您在亚马逊 S3 或其他云存储服务上存储 Spark 随机播放文件。这为 Spark 作业提供了完全的弹性,从而使您能够可靠地运行数据密集度最高的工作负载。下图说明了 Spark 地图任务如何将随机播放文件写入云随机存储。Reducer 任务将洗牌块视为远程块,并从同一个洗牌存储空间中读取它们。

此架构使您的无服务器 Spark 任务能够使用 Amazon S3,而无需运行、操作和维护额外的存储或计算节点的开销。
Chopper diagram
以下 Glue 作业参数允许和调整 Spark 以使用 S3 存储桶存储随机播放数据。在向 Amazon S3 写入 shuffle 数据时,您还可以使用 安全配置设置 启用静态加密。

Key Value Explanation
--write-shuffle-files-to-s3 TRUE This is the main flag, which tells Spark to use S3 buckets for writing and reading shuffle data.
--conf spark.shuffle.storage.path=s3://<shuffle-bucket> This is optional, and specifies the S3 bucket where the plugin writes the shuffle files. By default, we use –TempDir/shuffle-data.

shuffle 文件被写入该位置并创建如下文件:

s3:////[0-9]//shuffle_ _ _0.data

启用 Cloud Shuffle Storage 插件并使用相同的 亚马逊云科技 Glue 任务设置后,TPC-DS 查询现在可以成功完成,没有任何任务或阶段失败。
Spark UI Jobs with Chopper plugin

Cloud Shuffle 存储插件的软件二进制文件

现在,您还可以在自己的Spark环境和其他云存储服务中下载和使用该插件。插件二进制文件可在 Apache 2.0 许可下使用。

将插件与您的 Spark 应用程序捆绑在一起

在开发 Spark 应用程序时,您可以将插件与 Spark 应用程序捆绑在一起,方法是将其作为依赖项添加到 Maven pom.xml 中,如以下代码所示。有关插件和 Spark 版本的更多详细信息,请参阅 插件版本

<repositories>
   ...
    <repository>
        <id>aws-glue-etl-artifacts</id>
        <url>https://aws-glue-etl-artifacts.s3.amazonaws.com/release/</url>
    </repository>
</repositories>
...
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>chopper-plugin</artifactId>
    <version>3.1-amzn-LATEST</version>
</dependency>

或者,您可以直接从 亚马逊云科技 Glue Maven 工件中下载二进制文件,并将它们包含在您的 Spark 应用程序中,如下所示:

#!/bin/bash
sudo wget -v https://aws-glue-etl-artifacts.s3.amazonaws.com/release/com/amazonaws/chopper-plugin/3.1-amzn-LATEST/chopper-plugin-3.1-amzn-LATEST.jar -P /usr/lib/spark/jars

通过在类路径中添加 JAR 文件并为插件指定两个 Spark 配置来提交 Spark 应用程序:

spark-submit --deploy-mode cluster \
--conf spark.shuffle.sort.io.plugin.class=com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin \
--conf spark.shuffle.storage.path=s3://<s3 bucket>/<shuffle-dir> \
 --class <your class> <your application jar> 

以下 Spark 参数允许和配置 Spark 使用外部存储 URI(例如 Amazon S3)来存储随机播放文件;URI 协议决定使用哪个存储系统。

Key Value Explanation
spark.shuffle.storage.path s3://<shuffle-storage-path> It specifies an URI where the shuffle files are stored, which much be a valid Hadoop FileSystem and be configured as needed
spark.shuffle.sort.io.plugin.class com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin The entry class in the plugin

其他云存储集成

该插件为亚马逊 S3 提供开箱即用的支持,也可以配置为使用其他形式的云存储,例如 谷歌云存储 和 微软 Azure Blob 存储 。 要启用其他与 Hadoop File System 兼容的云存储服务,您只需为相应的服务方案添加存储 URI,例如为 G oogle Cloud Storage 添加存储 URI,而不是 Amazon S3 的 s3://, 为该服务添加文件系统 JAR 文件,然后设置相应的身份验证配置。

有关如何将插件与谷歌云存储和微软 Azure Blob 存储集成的更多信息,请参阅将 适用于 Apache Spark 的 亚马逊云科技 Glue Cloud Shuffle 插件与其他云存储服务一起 使用

最佳做法和注意事项

请注意以下注意事项:

  • 此功能用 Amazon S3 取代了本地随机播放存储。您可以使用它来解决常见故障,为您的无服务器分析任务和管道提供性价比优势。当你想要确保数据密集型工作负载可靠运行从而产生大量随机播放数据时,或者当你遇到 “设备 上 没有剩余空间” 错误时,我们建议启用此功能。如果你的任务遇到提取失败 org.apache.spark.shuffle.metadatafetchFailedException 或者你的数据存在偏差,你也可以使用这个插件。
  • 我们建议在洗牌 存储 桶(sp ark.shuffle.storage.s3.path) 上设置 S3 存储桶生命周期策略 ,以便自动清理旧的洗牌 数据。
  • 默认情况下,Amazon S3 上的洗牌数据是加密的。您还可以使用自己的 亚马逊云科技 密钥管理服务 (亚马逊云科技 KMS) 密钥加密数据。

结论

这篇文章介绍了适用于 Apache Spark 的新的 Cloud Shuffle 存储插件,并描述了在不增加额外工作程序的情况下独立扩展 Spark 作业中的存储空间的好处。有了这个插件,你可以预期处理千兆字节数据的作业可以更可靠地运行。

该插件可在所有支持 亚马逊云科技 Glue 的区域的 亚马逊云科技 Glue 3.0 和 4.0 Spark 任务中使用。我们还将在 Apache 2.0 许可下发布该插件的软件二进制文件。你可以在 亚马逊云科技 Glue 或其他 Spark 环境中使用该插件。我们期待听到您的反馈。


作者简介

关山则隆是 AW S Glue 团队的首席大数据架构师。他负责构建软件工件,帮助客户在云端构建数据湖。

Rajendra Gujja 是 亚马逊云科技 Glue 团队的高级软件开发工程师。他热衷于分布式计算以及与数据有关的一切事物。

刘楚涵 是 亚马逊云科技 Glue 团队的软件开发工程师。

贡萨洛·埃雷罗斯 是 亚马逊云科技 Glue 团队的高级大数据架构师。

莫希特·萨克森纳 是 亚马逊云科技 Glue 团队的高级软件开发经理。他的团队专注于构建分布式系统,使客户能够集成数据并连接到各种来源,高效地管理 Amazon S3 上的数据湖,并优化 Apache Spark 以实现 ETL 工作负载的容错能力。