为 亚马逊云科技 Glue Studio 创建您自己的可重复使用的视觉变换

亚马逊云科技 Glue Studio 最近 增加了添加自定义转换的可能性,您可以使用这些转换来构建可视化作业,将其与开箱即用的 亚马逊云科技 Glue Studio 组件结合使用。现在,您只需将 JSON 文件和 Python 脚本拖放到 Amazon S3 上即可定义自定义可视化转换,后者分别定义组件和处理逻辑。

自定义可视化转换允许您在团队之间定义、重复使用和共享特定业务的 ETL 逻辑。借助这项新功能,数据工程师可以为 亚马逊云科技 Glue 可视化作业编辑器编写可重复使用的转换。可重复使用的转换可提高团队之间的一致性,并通过最大限度地减少重复工作和代码来帮助保持工作的最新状态。

在这篇博客文章中,我将向你展示一个虚构的用例,它需要创建两个自定义转换来说明你可以用这个新功能完成什么。一个组件将即时生成合成数据用于测试目的,另一个组件将准备数据以将其分区存储。

用例:即时生成合成数据

你想要一个生成合成数据的组件有多种原因。可能是真实数据受到严重限制或尚不可用,或者目前没有足够的数量或种类来测试性能。或者,使用真实数据可能会给真实系统带来一些成本或负载,我们希望在开发过程中减少其使用量。

使用新的自定义视觉转换框架,让我们创建一个组件,为自然年份的虚构销售量生成合成数据。

定义生成器组件

首先,通过为组件指定名称、描述和参数来定义组件。在这种情况下,使用 s alesdata_gen erator 作为名称和函数,并使用两个参数:要生成多少行以及生成哪一年。

对于参数,我们将它们都定义为 int,您可以添加正则表达式验证以确保用户提供的参数格式正确。

还有更多配置选项可用;要了解详情,请参阅 亚马逊云科技 Glue 用户指南

这就是组件定义的样子。将其另存为 s alesdata_generator.json 。为方便起见,我们将匹配 Python 文件的名称,因此选择与现有 Python 模块不冲突的名称非常重要。
如果未指定年份,则脚本将默认为去年。

{
  "name": "salesdata_generator",
  "displayName": "Synthetic Sales Data Generator",
  "description": "Generate synthetic order datasets for testing purposes.",
  "functionName": "salesdata_generator",
  "parameters": [
    {
      "name": "numSamples",
      "displayName": "Number of samples",
      "type": "int",
      "description": "Number of samples to generate"
    },
    {
      "name": "year",
      "displayName": "Year",
      "isOptional": true,
      "type": "int",
      "description": "Year for which generate data distributed randomly, by default last year",
      "validationRule": "^\\d{4}$",
      "validationMessage": "Please enter a valid year number"
    }
  ]
}

实现生成器逻辑

现在,你需要使用实现逻辑创建一个 Python 脚本文件。
将以下脚本另存为 salesdata_generator.py 。请注意,名称与 JSON 相同,只是扩展名不同。

from awsglue import DynamicFrame
import pyspark.sql.functions as F
import datetime
import time

def salesdata_generator(self, numSamples, year=None):
    if not year:
        # Use last year
        year = datetime.datetime.now().year - 1
    
    year_start_ts = int(time.mktime((year,1,1,0,0,0,0,0,0)))
    year_end_ts = int(time.mktime((year + 1,1,1,0,0,0,0,0,0)))
    ts_range = year_end_ts - year_start_ts
    
    departments = ["bargain", "checkout", "food hall", "sports", "menswear", "womenwear", "health and beauty", "home"]
    dep_array = F.array(*[F.lit(x) for x in departments])
    dep_randomizer = (F.round(F.rand() * (len(departments) -1))).cast("int")

    df = self.glue_ctx.sparkSession.range(numSamples) \
      .withColumn("sale_date", F.from_unixtime(F.lit(year_start_ts) + F.rand() * ts_range)) \
      .withColumn("amount_dollars", F.round(F.rand() * 1000, 2)) \
      .withColumn("department", dep_array.getItem(dep_randomizer))  
    return DynamicFrame.fromDF(df, self.glue_ctx, "sales_synthetic_data")

DynamicFrame.salesdata_generator = salesdata_generator

脚本 中的函数 s alesdata_gen erator 将源 DynamicFrame 作为 “自我” 接收,并且参数必须与 JSON 文件中的定义相匹配。请注意,“ 年份 ” 是一个可选参数,因此它分配了一个默认调用函数,该函数会检测该函数并将其替换为前一年。该函数返回转换后的动态帧。在这种情况下,它不是从源代码中衍生出来的,这很常见,而是被新的情况所取代。

该转换利用了 Spark 函数和 Python 库来实现这个生成器。
为了简单起见,这个例子只生成四列,但我们可以通过硬编码值、从列表中分配值、寻找其他输入或做任何有意义的事情来使数据变得现实,对更多列进行同样的处理。

部署和使用生成器变换

现在我们已经准备好了两个文件,我们要做的就是将它们上传到Amazon S3的以下路径下。

s3://aws-glue-assets--/transforms/

如果账户和区域中从未使用过 亚马逊云科技 Glue,则该存储桶可能不存在,需要创建。当您创建第一个任务时,亚马逊云科技 Glue 将自动创建此存储桶。

您将需要在该存储桶中手动创建一个名为 “transf or ms” 的文件夹,以便将文件上传到该文件中。

您上传了两个文件后,下次我们在 亚马逊云科技 Glue Studio 可视化编辑器上打开(或刷新)页面时,转换应与其他转换一起列出。您可以按名称或描述进行搜索。

因为这是转换而不是来源,所以当我们尝试使用该组件时,用户界面将需要一个父节点。你可以使用真实的数据源作为父级(这样你就可以轻松地删除生成器并使用真实数据),也可以只使用占位符。我会告诉你怎么做:

  1. 前往 亚马逊云科技 Glue,然后在左侧菜单中选择 A WS Glue Studio 下的 工作” 。
  2. 保留默认选项(带有源和目标的 Visual 选项 以及 S3 源和目标选项),然后选择 “ 创建” 。
  3. 通过编辑左上角的 无标题作业来为该作业命名;例如, customTransformsDemo
  4. 前往 任务详情 选项卡,选择具有 亚马逊云科技 Glue 权限的角色作为 IAM 角色 。如果下拉列表中未列出任何角色,请按照 以下说明 创建一个。
    在此演示中,您还可以将 请求的工作人员数量减少 到 2,将重试次 数减少到 0, 以最大限度地降低成本。
  5. 选择图表底部的 数据目标 节点 S3 存储桶 ,然后选择 “删除”,将其 删除 。稍后我们会在需要时将其恢复。
  6. 通过在 数据源属性 选项卡中选择 S3 源节点并选择源类型 S3 位置来编辑 S3 源节点 。
    S3 URL 框中,输入选定角色可以访问的存储桶上不存在的路径,例如: s3://aws-glue-assets--/fil e_that_doesnt_exist。注意尾部没有斜杠。
    选择 JSON 作为具有默认设置的数据格式;没关系。
    你可能会收到警告,说它无法推断架构,因为文件不存在;没关系,我们不需要它。
  7. 现在,通过在变换搜索框中键入 “合成” 来搜索变换。结果出现(或者在列表中滚动搜索)后,选择它以将其添加到作业中。
  8. 节点属性 选项卡中,将刚刚添加的转换的父项设置为 S3 存储桶 源。然后,对于 ApplyMapping 节点, 使用转换后的 合成销售数据生成器替换父 S3 存储桶。 请注意,这个长名称来自之前上传的 JSON 文件中 定义的 displayNam e。
  9. 进行这些更改后,您的作业图应如下所示(如果您尝试保存,可能会出现一些警告;没关系,接下来我们将完成配置)。
  10. 选择 “ 综合销售” 节点,然后转到 “ 转换” 选项卡。输入 10000 作为样本数,默认情况下保留年份,因此使用去年。
  11. 现在我们需要应用生成的架构。如果我们的源代码与生成器架构相匹配,则需要这样做。
    在同一个节点中,选择 数据预览选项卡 并启动会话。运行后,您应该会看到合成数据示例。请注意,销售日期是全年随机分配的。
  12. 现在选择 “ 输出架构” 选项卡, 然后选择 “ 使用 datapreview 架构” 这样,节点生成的四个字段将被传播,我们可以基于此架构进行映射。
  13. 现在我们要将生成的 sale_dat e 时间戳转换为日期列,这样我们就可以用它按天对输出进行分区。在 “ 转换 ” 选项卡中选择节点 applyMapping 。对于 sale_dat e 字段,选择 日期 作为目标类型。这会将时间戳截断为仅限日期。
  14. 现在是保存这份工作的好时机。它应该可以让你成功保存。

最后,我们需要配置水槽。请按照以下步骤操作:

  1. 选择 ApplyMapping 节点后,转到 目标 下拉列表并选择 Amazon S3。 接收器将添加到 ApplyMapping 节点 。如果您在添加接收器之前没有选择父节点,则仍然可以在接收器的 节点详细信息 选项卡中进行设置。
  2. 在运行任务的同一区域创建 S3 存储桶。我们将使用它来存储输出数据,因此最后我们可以轻松地进行清理。如果您通过控制台创建,则默认存储桶配置正常。
    您可以在 Amazon S3 文档 中 阅读有关存储桶创建的更多信息
  3. 数据目标属性 选项卡中,在 S3 目标位置 中输入存储桶的 URL 以及一些路径和尾部斜杠,例如:s 3:///output/
    其余部分 保留提供的默认值。
  4. 选择底部的 “ 添加分区键 ”,然后选择 s ale_d ate 字段。

我们只需选择相应的目录更新选项即可同时创建分区表。为简单起见,此时生成分区文件而不更新目录,这是默认选项。

现在,您可以保存然后运行作业。

作业完成后,几分钟后(您可以在 “ 运行” 选项卡中对此进行验证),浏览上面输入的 S3 目标位置。您可以使用亚马逊 S3 控制台或 亚马逊云科技 CLI。 你会看到这样命名的文件:s 3:///output/sale_date =/

如果您计算文件,则应接近但不超过 1,460 个(视使用年份而定,假设您使用的是 2 个 G.1X 工作人员和 亚马逊云科技 Glue 3.0 版)

用例:改进数据分区

在上一节中,您使用自定义可视化组件创建了一个作业,该组件生成了合成数据,对日期进行了小幅转换,并按天将其分区保存在 S3 上。

你可能想知道为什么这项工作为合成数据生成了这么多文件。这并不理想,尤其是当它们像这种情况一样小时。如果将这些数据保存为具有多年历史记录的表,则生成小文件会对使用这些数据的工具(例如Amazon Athena)产生不利影响。

其原因是,当生成器在不指定内存分区数量的情况下调用 Apache Spark 中的 “范围” 函数时(请注意,这些分区与保存到 S3 的输出分区不同),它默认为集群中的内核数,在本示例中只有 4 个。

由于日期是随机的,因此每个内存分区可能包含代表一年中所有日期的行,因此,当接收器需要将日期拆分为输出目录以对文件进行分组时,每个内存分区需要为现在的每一天创建一个文件,因此可以有 4 * 365(不在闰年中)为 1,460。

这个例子有点极端,通常从源读取的数据不会随着时间的推移而分散。当你添加其他维度(例如输出分区列)时,通常会发现这个问题。

现在,您将构建一个对此进行优化的组件,尝试尽可能减少输出文件的数量:每个输出目录一个。
另外,让我们想象一下,在您的团队中,您的策略是生成按年、月和日作为字符串分隔的 S3 日期分区,因此无论是否使用顶部的表格,都可以高效地选择文件。

我们不希望个人用户必须单独处理这些优化和惯例,而是希望他们可以在工作中添加一个组件。

定义重新分区器转换

对于这个新的转换,创建一个单独的 JSON 文件,我们称之为 repartition_date.json ,我们在其中定义新的转换及其所需的参数。

{
  "name": "repartition_date",
  "displayName": "Repartition by date",
  "description": "Split a date into partition columns and reorganize the data to save them as partitions.",
  "functionName": "repartition_date",
  "parameters": [
    {
      "name": "dateCol",
      "displayName": "Date column",
      "type": "str",
      "description": "Column with the date to split into year, month and day partitions. The column won't be removed"
    },
    {
      "name": "partitionCols",
      "displayName": "Partition columns",
      "type": "str",
      "isOptional": true,
      "description": "In addition to the year, month and day, you can specify additional columns to partition by, separated by commas"
    },
    {
      "name": "numPartitionsExpected",
      "displayName": "Number partitions expected",
      "isOptional": true,
      "type": "int",
      "description": "The number of partition column value combinations expected, if not specified the system will calculate it."
    }
  ]
}

实现转换逻辑

该脚本将日期拆分为多列,前导为零,然后根据输出分区重新组织内存中的数据。将代码保存在名为 repartition_date.py 的文件中 :

from awsglue import DynamicFrame
import pyspark.sql.functions as F

def repartition_date(self, dateCol, partitionCols="", numPartitionsExpected=None):
    partition_list = partitionCols.split(",") if partitionCols else []
    partition_list += ["year", "month", "day"]
    
    date_col = F.col(dateCol)
    df = self.toDF()\
      .withColumn("year", F.year(date_col).cast("string"))\
      .withColumn("month", F.format_string("%02d", F.month(date_col)))\
      .withColumn("day", F.format_string("%02d", F.dayofmonth(date_col)))
    
    if not numPartitionsExpected:
        numPartitionsExpected = df.selectExpr(f"COUNT(DISTINCT {','.join(partition_list)})").collect()[0][0]
    
    # Reorganize the data so the partitions in memory are aligned when the file partitioning on s3
    # So each partition has the data for a combination of partition column values
    df = df.repartition(numPartitionsExpected, partition_list)    
    return DynamicFrame.fromDF(df, self.glue_ctx, self.name)

DynamicFrame.repartition_date = repartition_date

像上次转换一样,将两个新文件上传到 S3 转换文件夹。

部署和使用生成器变换

现在编辑作业,使用新组件生成不同的输出。
如果未列出新的转换,请在浏览器中刷新页面。

  1. 选择生成器转换,然后从转换下拉列表中找到 “按日期 分区”, 然后选择它;它应该作为生成器的子项添加。
    现在将 数据目标 节点的父节点更改为添加的新节点,然后删除 ApplyMapping ;我们不再需要它了。
  2. 按日期重新分区 需要您输入包含时间戳的列。
    输入 sale_dat e (框架尚不允许使用下拉列表选择字段),并将另外两个保留为默认值。
  3. 现在我们需要使用新的日期分割字段更新输出架构。为此,请使用 “ 数据预览 ” 选项卡检查其是否正常运行(如果前一个已过期,则启动会话)。然后在 输出架构中 ,选择 使用数据预览架构 以添加新字段。请注意,转换并不会删除原始列,但如果您对其进行更改,则可能会移除原始列。
  4. 最后,编辑 S3 目标以输入不同的位置,这样这些文件夹就不会与之前的运行混在一起,而且更易于比较和使用。将路径更改为 /outp ut2/
    删除现有的分区列,改为添加年、月和日。

保存并运行作业。一两分钟后,完成后,检查输出文件。它们应该更接近最佳数字,即每天一个,也许是两个。假设在这个例子中,我们只有四个分区。在真实的数据集中,没有这种重新分区的文件数量很容易激增。
此外,现在路径遵循传统的日期分区结构,例如: output2/year=2021/month=09/day=01/run-Amazons3_node1669824410-4-part-r-00292

请注意,文件名的末尾是分区号。虽然我们现在有更多的分区,但输出文件更少,因为数据在内存中的组织方式更符合所需的输出。

重新分区转换还有其他配置选项,我们将其留空。现在,你可以继续尝试不同的值,看看它们如何影响输出。
例如,可以在转换中将 “部门” 指定为 “分区列”,然后将其添加到汇分区列列表中。或者,您可以输入 “预期的分区数”,然后查看它如何影响运行时间(它不再需要在运行时确定这一点),以及输入更高的数字(例如,3,000)时生成的文件数。

此功能在幕后是如何运作的

  1. 加载 亚马逊云科技 Glue Studio 可视化作业创作页面后,存储在前述 S3 存储桶中的所有转换都将加载到用户界面中。亚马逊云科技 Glue Studio 将解析 JSON 定义文件 以显示转换元数据,例如名称、描述和参数列表。
  2. 用户使用自定义可视化转换完成创建和保存任务后,亚马逊云科技 Glue Studio 将生成任务脚本并使用以逗 号分隔的 Python 文件 S3 路径列表更新 Python 库路径(也称为 —extra-py-files 任务参数 )。
  3. 在运行脚本之前,亚马逊云科技 Glue 会将存储在 —extra-py-files 作业参数中的所有文件路径添加 到 P ython 路径中,从而允许您的脚本运行您定义的所有自定义可视化转换函数。

清理

为了避免运行成本,如果您不想保留生成的文件,则可以清空并删除为此演示创建的输出存储桶。您可能还想删除创建的 亚马逊云科技 Glue 任务。

结论

在这篇文章中,您已经看到了如何创建自己的可重复使用的可视化转换,然后在 亚马逊云科技 Glue Studio 中使用它们来改善工作和团队的工作效率。

您首先创建了一个组件来按需使用合成生成的数据,然后又创建了一个转换来优化在 Amazon S3 上进行分区的数据。


作者简介

贡萨洛·埃雷罗斯 是 亚马逊云科技 Glue 团队的高级大数据架构师。

迈克尔·贝纳塔尔 是 A WS Glue Studio 团队的高级软件工程师。他领导了自定义视觉变换功能的设计和实现。