发布于: Jun 28, 2022

基于我们之前的架构设计,那我们分阶段来具体动手实践一个数据跨区域迁移的具体场景, Amazon 官网有很多公开的数据集,我们选定 Next Generation Weather Radar (NEXRAD) 作为数据源,该数据源在美东(us-east-1)区域;目标存储桶我们选择在 BJS 区域。该实验仅仅为了验证技术可行性,完整的参考代码见 s3deepdive github;代码不作为生产用途仅仅用来学习用途。

为了简单地完成技术验证,我们所有的测试环境基于一台 r4.2xlarge 的 Amazon Linux 机型展开,系统需要准备好:

• 在 Amazon Web Services Global 美东区域创建一台 EC2 实例
• 关联一个 IAM Role,需要有访问 S3 及 SQS 相关的管理权限
• Python2.7.x
• Boto3
• 300GB gp2 EBS 磁盘

配置好目标存储桶的 IAM Profile 及修改默认获取 IAM Role 的临时 Token 的默认超时时间和重试次数:

[ec2-user@youserver]$ aws configure --profile bjs
AWS Access Key ID [None]: AKI***************A
AWS Secret Access Key [None]: 7j+R6*****************oDrqUDefault region name [None]: cn-north-1Default output format [None]:[ec2-user@youserver]$ vi ~/.aws/configure[default]
region = us-east-1
metadata_service_timeout = 5
metadata_service_num_attempts = 5[profile bjs]
region = cn-north-1
metadata_service_timeout = 5
metadata_service_num_attempts = 5
与直接在 HDFS 上存储数据相比, 由于数据不是本地缓存,所以需要读取的每个数据必须通过 EMRFS 从 S3 获取。另外,大多数 Hadoop 应用程序都依赖于强一致的存储系统。EMRFS 需要通过缓存 到Amazon Web Services DynamoDB 中的对象元数据来部分解决强一致性问题。但元数据仍仅当对象在 Amazon EMR 作业过程中由 EMRFS 写入或对象使用 EMRFS CLI 同步或导入到 EMRFS 元数据时,元数据才会添加到 EMRFS。

Amazon 公共数据集没有提供清单列表,因此,我们利用前文的逻辑,并尝试利用 Amazon Web Services S3 CLI 命令生成该存储桶的对象清单。该数据集按照年月进行数据分区,我们设定对象的 Prefix 的迭代深度为 3,后台执行以下命令,并观察执行日志:

[ec2-user@youserver]$ cd NEXRAD_Demo/inventory[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_inventory.py -b noaa-nexrad-level2 -r us-east-1 -d 3 > noaa-nexrad-level2.log 2>&1 & 

由于数据集非常大,该命令执行需要点时间,最终,3 层的扫描帮助我们并发生成了 2751 个对象清单文件,总大小 4.5GB:

[ec2-user@youserver]$ cd NEXRAD_Demo/inventory[ec2-user@youserver]$ ls noaa-nexrad-level2.*obj* | wc –l2751

由于该场景下,源存储桶和目标存储桶之间的单次传输的速度非常有限,实测该场景下大概在 9KB/s 左右,而且网络抖动比较厉害,因此,我们尽量缩小单个任务的总数据量大小,并设定大对象的大小阈值设置为 2MB;具体参数需要在 Python 常量参数中修改:

由于清单文件太多,总数据量太大,因此,我们可以数据清单分成多个目录,分别进行计算,比如如下命令:

  • 我们把大小小于 800000
  • bytes 的文件放到目录./1/里面
  • 把大小小于 2MB 的文件放到./2/里面
  • 把大小小于 6MB 的文件放到./3/里面

大家可以根据自己的需要,分成不同的对象清单文件夹

[ec2-user@youserver]$ cd NEXRAD_Demo/inventory[ec2-user@youserver]$ mkdir 1 2[ec2-user@youserver]$ find ./ -size -800000c -print0 | xargs -0 -I {} mv {} ../1/[ec2-user@youserver]$ find ./ -size -2M -print0 | xargs -0 -I {} mv {} ../2/[ec2-user@youserver]$ find ./ -size -6M -print0 | xargs -0 -I {} mv {} ../3/[ec2-user@youserver]$ cd NEXRAD_Demo/tasksubmit[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_submit.py -d ../inventory/1/ -r us-east-1 > noaa-nexrad-level2.task1.log 2>&1 &[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_submit.py -d ../inventory/2/ -r us-east-1 > noaa-nexrad-level2.task2.log 2>&1 &[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_submit.py -d ../inventory/3/ -r us-east-1 > noaa-nexrad-level2.task3.log 2>&1 &

为了演示,我们没有生成所有对象清单的传输任务,仅仅选取了其中某连个文件夹,生成的传输任务如下图所示,有些队列的消息数为 0,表示我们后台还有传输任务消息没有发送到队列中:

我们来看看队列里面的一个任务的结构组成,S3Task_Bigsize*队列中的任务相比于普通队列中的任务多了一组分片的 Range 范围:

在并发执行数据传输任务之前,我们先看看单个任务执行情况,任务执行需要指明任务队列,源和目的存储桶以及访问目标存储桶的 IAM Profile 名:

[ec2-user@youserver]$ cd NEXRAD_Demo/taskexec[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_NormalQueue15098144850121 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_NormalQueue15098144850121.exec1.log 2>&1 &[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_BigSizeQueue1 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_BigSizeQueue1.exec1.log 2>&1 & 

从执行日志可以分析出,对于 NormalQueue 中的单个任务,由于是小对象,而且数量是 10,因此我们的执行代码可以并发执行,总体执行时间是 26 秒;对比 BigsizeQueue 中的任务,虽然总体数据大小和 NormalQueue 差 不多,但由于只有 2 个对象并发复制,该任务的总体执行时间是 363 秒。

关于并发任务执行,本质上是一个批处理的业务逻辑,假定有 1000 个任务列表,

• 每个任务数据量上限 20MB,如果传输速度在10KB/s那么一个任务需要大概需要 2048 秒即 34 分钟,但我们的的任务执行是多线程并发操作,按每个任务最多 10 个对象算,在 10KB/s 的速度下,一个任务最快需要执行 3.4 分钟左右(10 个对象并发上传),最慢34分钟(一个对象的情况下)
• 如果同时 100 个并发执行,完成所有任务,需要至少执行 10 次,总时长在 34 分钟到 340 分钟之间
• 如果并发 1000 个,完成所有任务需要至少执行1次;总时长 3.4 分钟到 34 分钟之间

本实验为了学习的目的,我们在测试机 r4.2xlarge 的机器上,后台并发执行 100 个任务,并观察数据传输的实际状况,

[ec2-user@youserver]$ cd NEXRAD_Demo/taskexec[ec2-user@youserver]$ vi parallel_run.sh
#!/bin/bash
for((i=2; i<52;i++))
do

  nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_BigSizeQueue1 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_BigSizeQueue1.exec_$i.log 2>&1 &

done
for((i=2; i<52;i++))
do

  nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_NormalQueue15098144850121 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_NormalQueue15098144850121.exec_$i.log 2>&1 &

done
[ec2-user@youserver]$ chmod +x parallel_run.sh[ec2-user@youserver]$ ./parallel_run.sh

针对下面这两个队列,每个运行了 50 个并发任务,因此在 SQS 界面上,可以看到传输中的消息是 50,也就是同时有 50 个消息任务正在被处理:

可以看到 BigsizeQueue 队列一次就成功完成的数据传输任务总数为 79949+50-79971=28;NormalQueue 队列一次就成功完成的数据传输任务总数为 79950+50-79958=42;我们定义任务的成功与否,为该任务中所有的对象都成功传输完成;该实验我们对于分段采取的大小是 2MB,在 9KB/s 左右的互联网传输速度下,还是有点大,容易失败;普通队列上中的任务,对象大小都在几百 KB 左右,一次传输成功的概率大很多。

本文不对单个对象的完整性问题展开探讨,对于用户首先最关心的问题是,源存储桶的对象有没有完全迁移到目前存储桶中;因此,可以定期生成目标存储桶的对象清单,并比对源存储桶的对象清单,在自定义的清单程序中,我们是逐级生产对象清单文件,有一定的规律,如果两个存储桶使用同样的 depth 参数生成,生成的对象清单文件个数首先一致的;具体到识别出有没有遗漏的迁移对象,可以进一步对比清单中的对象列表。

本文就跨区域 S3 数据迁移整体架构作了基本探讨,并在架构的基础上,学习和实践了利用 Amazon Web Services S3 CLI 以及 boto3 库如何实现自定义的对象清单,传输任务分解及执行逻辑。现实的生产场景下,还需要更多细节的思考和实践,接下来,我们会继续在大规模批处理,大规模对象集的完整性校验方面和大家继续探讨。

相关文章