发布于: Jun 17, 2022

Amazon Web Services 在中国境内有两个区,分别是北京区域和宁夏区域。假定复制路径是从北京到宁夏,那么根据复制应用所处区的不同,至少有三种配置方式,罗列如下:

  • 左图是复制应用与源表在同一区域的情况
  • 中图是复制应用与目标表在同一区域的情况
  • 右图是复制应用在云下的情况

建议是采用复制应用在云端的配置,以降低网络延迟。故在后面的测试环节,不对第三种架构进行测试。

表终端节点与流终端节点是相互独立的。要实现表复制,除需要开启表的流功能外,还需要进行必要的设置。有四种写入流的信息类型可以设置:

  • 键值:仅传输更改表项的键值信息
  • 新印象:传输更改表项的键值和更改后的信息
  • 旧印象:传输更改表项的键值和更改前的信息
  • 新旧印象:传输更改表项的键值和更改前后的信息

如果流功能关闭后再次开启,其终端节点会变动。

流由流记录构成。一条流记录包含该流所属表的一个数据项的更改。按组来组织流记录,又称为分区。分区类似于容器,包含多条流记录以及读取和遍历这些记录的必要信息。分区中的流记录会在二十四小时后删除。

流结构不是简单的线性结构。分区会按需被自动创建、拆分和删除。一个分区有可能有一个或者数个子分区,形成父子结构。访问时,必须按先父后子的顺序来处理流,以保证其正确性。在此,推荐使用 DynamoDB 流 Kinesis 适配器(以下简称流适配器)来简化流处理的复杂性。该适配器还可以正确处理新的或者过期的分区。本文不讨论如何使用更初级的流函数接口来处理流。

流函数接口的设计与 Kinesis 的函数接口很相似,例如下述四个关键函数:  ListStreams, DescribeStream, GetShards 和 GetShardIterator。因此,在使用了流适配器后, Kinesis 客户端(以下简称流客户端)就可以匹配及处理流信息。该客户端就顺理成章的可以用来进一步简化流处理操作,如下图所示。

组件 版本

Java

8

Dagger

2.x

Amazon Kinesis Client Library For Java

1.11.x

DynamoDB Streams Adapter For Java

1.5.x

DynamoDBLocal

1.11.x

本节基于 Java 语言和 Dagger 依赖注入机制,罗列相关源代码片段,构建一个跨区域 DynamoDB 表复制的应用程序。请注意,现阶段只支持单表的复制,未来可以拓展至多表的复制,其实现核心与机制是类似的。此外,囿于篇幅,以下仅罗列关键代码片段,省略了譬如异常处理、日志记录等代码片段。具体来说,主要依赖关系如下:

首先注册 Amazon Web Services 账号,拿到相应用户的访问密钥。然后构建密钥供应器 Amazon Credentials Provider

关于数据模型,由于 DynamoDB 表是无结构的存储,为了简化测试,假定该表只有一个字符串类型的主键 id 。用户可以根据实际表结构及主键信息,对代码做相应修改。事实上,这个表可以存储任意数据。

@DynamoDBTable(tableName = "BCSReplicateTestTable")public class Item {
    private String id;
    
    @DynamoDBHashKey
    @DynamoDBAutoGeneratedKey
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }}

本文构造的复制器基于 Kinesis 流复制工作器进行复制,即具体的复制工作,是有该工作器完成的。具体实现类是:

  • amazonaws.services.kinesis.clientlibrary.lib.worker.Worker  

 需要在源表部分准备数个不同的组件,用以创建该复制工作器,包括:

  1. 目的 DynamoDB 客户端
  2. 目的 CloudWatch 客户端
  3. 流记录处理器
  4. Kinesis 客户端配置信息
  5. DynamoDB 流适配器客户端

前两项是常规的客户端,指定区域和密钥后即可创建。

以下创建流记录处理器。这里对于 DynamoDB 流的处理方法和对于 Kinesis 流的处理方法是一样的。当修改和删除信息事件发生时,调用目的表客户端对目的表进行相应修改,以达到复制的目的。注意这里只列出关键功能代码。

@Singletonpublic class SourceRecordReplicator implements IRecordProcessor {
    private final AmazonDynamoDB dynamoDB;
    private final String tableName;

    @Inject
    SourceRecordReplicator(
            @Destination AmazonDynamoDB dynamoDB,
            @Named("dynamodb.table.name") String tableName) {
        this.dynamoDB = dynamoDB;
        this.tableName = tableName;
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (com.amazonaws.services.kinesis.model.Record record :
             processRecordsInput.getRecords()) {
            final String data = new String(record.getData().array());

            if (record instanceof RecordAdapter) {
                final Record streamRecord = ((RecordAdapter) record).getInternalObject();
                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        modify(streamRecord);
                        break;

                    case "REMOVE":
                        remove(streamRecord);
                        break;
                }
            }
        }
    }

    private void modify(Record record) {
        dynamoDB.putItem(new PutItemRequest()
                .withTableName(tableName)
                .withItem(record.getDynamodb().getNewImage()));
    }

    private void remove(Record record) {
        final String id = Utils.getId(record);
        dynamoDB.deleteItem(new DeleteItemRequest()
                .withTableName(tableName)
                .withKey(Utils.asId(id)));
    }}
 
以下创建源表流的资源信息。该信息用以创建流客户端配置信息。
@Named("source.stream.arn")@Provides@Singletonstatic String getStreamArn(
        @Source AmazonDynamoDBStreams stream,
        @Named("dynamodb.table.name") String tableName) {
    final List<Stream> streams = stream
        .listStreams(new ListStreamsRequest().withTableName(tableName))
        .getStreams();
    return streams.get(0).getStreamArn();}

以下创建源表的流适配器。

@Source@Provides@Singletonstatic AmazonDynamoDBStreamsAdapterClient getStreamAdapter(
        AWSCredentialsProvider credentialsProvider,
        @Source Regions region) {
    return new AmazonDynamoDBStreamsAdapterClient(
            AmazonDynamoDBStreamsClientBuilder.standard()
                    .withCredentials(credentialsProvider)
                    .withRegion(region).build());}

以下创建流客户端配置信息。

@Source@Provides@Singletonstatic KinesisClientLibConfiguration getKclConfiguration(
        @Named("source.stream.arn") String streamArn,
        AWSCredentialsProvider credentialsProvider,
        @Named("worker.id") String workerId) {
    return new KinesisClientLibConfiguration(
            "ddb-replicator",
            streamArn,
            credentialsProvider,
            workerId)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);}

至此,所有创建流复制工作器所需要的组件都准备好了。

以下创建目的表流复制工作器。

@Destination@Provides@Singletonstatic Worker getWorker(
        @Source IRecordProcessorFactory processorFactory,
        @Source KinesisClientLibConfiguration configuration,
        @Source AmazonDynamoDBStreamsAdapterClient adapterClient,
        @Destination AmazonDynamoDB dynamoDB,
        @Destination AmazonCloudWatch cloudWatch) {
    return new Worker.Builder()
            .recordProcessorFactory(processorFactory)
            .config(configuration)
            .kinesisClient(adapterClient)
            .dynamoDBClient(dynamoDB)
            .cloudWatchClient(cloudWatch)
            .build();}

以上准备好各个工作组件,现在将其装配起来,开始表复制工作。

@Singletonpublic class Replicator implements AutoCloseable {
    private final ExecutorService executor;
    private final Worker replicator;
    @Inject
    Replicator(@Destination Worker replicator) {
        this.replicator = replicator;
        executor = Executors.newSingleThreadExecutor();
    }
    public void replicate() {
        executor.execute(replicator);
    }
    @Override
    public void close() {
        replicator.shutdown();
        executor.shutdown();
    }}

对于表的跨区域复制,测试侧重复制延迟及复制器性能。本节测试在预置模式下,采用不同写容单位(WCU, Write Capacity Unit)时延迟的变化情况,同时监控复制器的性能指标。一个写容单位表示对大小最多为 1 KB 的项目每秒执行一次写入。而事务写入请求需要 2 个写容单位才能对大小最多为 1 KB 的项目每秒执行一次写入。考虑到各种不稳定因素可能会对测试产生影响,如网络异常等,测试程序并不会就单个表项目的复制计算延迟,而是在固定写容单位下随机测试部分表项目的延迟,以尽可能接近真实情况。

以下编写两个测试程序,分别运行在源端和目的端。

  • 运行在源端的测试程序负责按照指定的写容单位向源表中写入测试数据,以确保相应的写压力;
  • 运行在目的端的测试程序向源端写入测试数据,写入后开始计时直至从目的端读出数据作为复制延迟。

测试分两大部分,即北京复制到宁夏,及宁夏复制到北京。测试数据复制延迟的情况。按照不同的写容单位共进行五次复制测试,设置其分别为 30, 90, 180, 300 和 7000,前四个每次都只运行十分钟,最后一个运行一小时。

本节测试数据从北京复制到宁夏的情况。下表计算了在不同写容单位下复制器在不同区域的平均、最大及最小延迟时间,单位是毫秒。从表中可以看出,就最大延迟而言,大部分情况下是小于 3000 毫秒的。

测试延迟和数据复制完成度的关系。给定一定的延迟,计算完成数据复制的百分比。横轴是延迟时间,单位是毫秒,纵轴是数据复制完成百分比。如下面两图所示,无论复制器在源端(北京)还是在目的端(宁夏),当延迟时间为 1200 毫秒时,70% 以上的表项目都已复制完成。当延迟时间为 2000 毫秒时,90%以上的表项目都已复制完成。当延迟为 3000 毫秒时,几乎所有的数据(99%)都复制完成了。总体来看,复制器在源端(北京)对复制延迟影响最小,受写容单位影响也较小。同时对复制器运行环境的监控未发现明显异常。

本节测试数据从宁夏复制到北京的情况,和上面相反。和上面数据类似的是,绝大多数最大延迟都在 3000 毫秒以内。有趣的是,对于从宁夏复制到北京的情况,和从北京复制到宁夏一样,还是复制器在北京的时候,延迟情况比较好。

测试延迟和数据复制完成度的关系。和上面的测试类似,在 1200 毫秒之后,约 70% 的数据项都复制完成。在 3000 毫秒之后,绝大部分(99%)数据项都复制完成。所以可以推论,无论数据从北京到宁夏,还是反之,性能上未见明显差异。说明在中国区利用流对表进行复制的性能是非常稳定和可靠的。

本文探讨了利用 DynamoDB 的流对其表在中国区进行跨区域复制的实践,并对相关实践进行了具体测试。数据复制的核心思想是,当数据更改信息按序出现在流中以后,复制器读取该更改信息并施行在目的表。从测试结果来看,跨区域的复制延迟非常小,基本上都在秒级。在 DynamoDB 全局表尚未在中国区就绪之前,可以考虑使用该方式进行跨区域数据同步,作为一项有益补充。

需要指出的是,本文所罗列的代码仅针对单表的复制,尚不支持多张表的并发复制。但是对多表的复制,其原理是一样的。只不过需要对多线程的并发操作做出相应正确和有效的管理。笔者未来可以针对多表复制构建并发机制,提高复制的效率与可扩展性。另一个发展方向是通过自动化工具,对复制的各方面进行高效管理,包括前期的配置过程,复制中间的监控预警等事务。

相关文章