Implement a serverless CDC process with Apache Iceberg using Amazon DynamoDB and Amazon Athena

by Vijay Velpula, Karthikeyan Ramachandran, and Sriharsh Adari

Apache Iceberg is an open table format for very large analytic datasets. Iceberg manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. The Iceberg specification allows seamless table evolution such as schema and partition evolution, and its design is optimized for usage on Amazon Simple Storage Service (Amazon S3). Iceberg also helps guarantee data correctness under concurrent write scenarios.

Most businesses store their critical data in a data lake, which brings data from various sources into centralized storage. Change data capture (CDC) in the context of a data lake refers to the process of capturing and propagating changes made to source data. Source systems often lack the capability to publish only the data that was modified. This forces data pipelines to consume full-load datasets every day, increasing both the processing duration and the storage cost. If the source is in tabular format, there are mechanisms to identify the data changes easily. However, the complexity increases if the data is in a semi-structured format and the changes must be propagated into the data lake in near-real time.

This post presents a solution to handle incoming semi-structured datasets from source systems, determine which records changed, and load them into Iceberg tables. With this approach, we not only use Athena to query data source files in Amazon S3, but also achieve ACID compliance.

Solution overview

We demonstrate this solution with an end-to-end serverless CDC process. We use a sample JSON file as input to Amazon DynamoDB. We identify changed records by using Amazon DynamoDB Streams and Amazon Web Services Lambda to update the data lake with the changed records. We then use an Iceberg table to demonstrate CDC functionality for a sample employee dataset. This data represents employee details such as name, address, date joined, and other fields.

The architecture is implemented as follows:

  1. Source systems ingest a semi-structured (JSON) dataset into a DynamoDB table.
  2. The DynamoDB table stores the semi-structured dataset and has DynamoDB Streams enabled. DynamoDB Streams identifies whether the incoming data is new, modified, or deleted based on the keys defined, and delivers the ordered messages to a Lambda function.
  3. For every stream event, the Lambda function parses the event and builds the dynamic DML SQL statements.
  4. The constructed DML SQL statements are run on the corresponding Iceberg tables to reflect the changes.
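Steps 2 through 4 above can be sketched as a small dispatch loop. This is a minimal illustration rather than the deployed function: `process_stream_batch`, the `builders` mapping, and the `run_sql` callback are hypothetical names standing in for the statement builders and the Athena call described later in this post.

```python
def process_stream_batch(event, builders, run_sql):
    """Apply each DynamoDB Streams record in arrival order.

    builders: maps an eventName (INSERT, MODIFY, REMOVE) to a function
              that takes one record and returns a SQL string (hypothetical)
    run_sql:  callable that executes one SQL string, for example via Athena
    Returns a list of (eventName, sql) pairs; sql is None for skipped records.
    """
    results = []
    for record in event.get("Records", []):
        event_name = record.get("eventName")
        builder = builders.get(event_name)
        if builder is None:
            # Unknown event types are skipped rather than failing the batch.
            results.append((event_name, None))
            continue
        sql = builder(record)
        run_sql(sql)
        results.append((event_name, sql))
    return results
```

Passing the builders and the executor in as arguments keeps the loop easy to test locally without an Athena connection.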

The following diagram illustrates this workflow.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • An Amazon Web Services account
  • Appropriate Amazon Web Services Identity and Access Management (IAM) permissions to deploy Amazon Web Services CloudFormation stack resources

Deploy the solution

For this solution, we provide a CloudFormation template that sets up the services included in the architecture, to enable repeatable deployments.

Note: Deploying the CloudFormation stack in your account incurs Amazon Web Services usage charges.

To deploy the solution, complete the following steps:

  1. Choose Launch Stack to launch the CloudFormation stack.
  2. Enter a stack name.
  3. Select I acknowledge that Amazon Web Services CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

After the CloudFormation stack deployment is complete, navigate to the Amazon Web Services CloudFormation console and note the following resources on the Outputs tab:

  • Data lake S3 bucket iceberg-cdc-xxxxx-us-east-1-xxxxx
  • AthenaWorkGroupName AthenaWorkgroup-xxxxxx
  • DataGeneratorLambdaFunction UserRecordsFunction-xxxxxx
  • DynamoDBTableName users_xxxxxx
  • LambdaDMLFunction IcebergUpsertFunction-xxxxxx
  • AthenaIcebergTableName users_xxxxxx

Generate sample employee data and load into the DynamoDB table using Lambda

To test the solution, trigger the UserRecordsFunction-XXXXX function by creating a test event, which loads sample data into the DynamoDB table.

  1. On the Lambda console, open the Lambda function with the name UserRecordsFunction-XXXXX.
  2. On the Code tab, choose Test, then Configure test event.
  3. Configure a test event with the default hello-world template event JSON.
  4. Provide an event name without any changes to the template and save the test event.
  5. On the Test tab, choose Test to trigger the SampleEvent test event. This will invoke the data generator Lambda function to load data into the users_xxxxxx DynamoDB table. When the test event is complete, you should notice a success notification as shown in the following screenshot.
  6. On the DynamoDB console, navigate to the users_XXXXXX table and choose Explore table items to verify the data loaded into the table.

The data loads performed on the DynamoDB table are cascaded to the Athena table by the IcebergUpsertFunction-xxxxx Lambda function deployed by the CloudFormation template.

In the following sections, we simulate and validate various scenarios to demonstrate Iceberg capabilities, including DML operations, time travel, and optimizations.

Simulate the scenarios and validate CDC functionality in Athena

After the first run of the data generator Lambda function, navigate to the Athena query editor, choose the AthenaWorkgroup-XXXXX workgroup, and preview the users_XXXXXX Iceberg table to query the records.

With the data inserted into the DynamoDB table, all data change activities such as inserts, updates, and deletes are captured in DynamoDB Streams. DynamoDB Streams triggers the IcebergUpsertFunction-xxxxx Lambda function, which processes the events in the order they are received. The function performs the following steps:

  • Receives the stream event
  • Parses the stream event based on the DynamoDB eventName (INSERT, MODIFY, or REMOVE) and generates the corresponding Athena DML SQL statement
  • Runs the SQL statement in Athena

Let’s dive into the IcebergUpsertFunction-xxxxx function code and how it handles various scenarios.

IcebergUpsertFunction-xxxxx function code

As shown in the following Lambda function code block, the function categorizes each DynamoDB Streams event it receives based on eventName: INSERT, MODIFY, or REMOVE. Any other event raises InvalidEventTypeException. MODIFY is treated as an UPDATE, and REMOVE is treated as a DELETE.

All the DML operations are run on the users_XXXXXX table in Athena. We fetch the metadata of the users_XXXXXX table from Athena. The following are a few important considerations regarding how the Lambda function handles Iceberg table metadata changes:

  • In this approach, target metadata takes precedence during DML operations.
  • Any columns that are missing in the target will be excluded in the DML command.
  • Ideally, the source and target metadata match. If new columns or attributes are added to the source table, the current solution skips them.
  • This solution can be enhanced further to cascade source system metadata changes to the target table in Athena.
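The precedence rules above can be sketched as follows. The post does not show the helper that aligns stream data with the target columns, so `align_to_target` is a hypothetical stand-in written under the assumption that the metadata is a list of dictionaries with `Name` and `Type` keys, as returned for Athena table columns.

```python
def align_to_target(new_image, target_columns):
    """Keep only the source attributes that exist in the target table.

    new_image:      DynamoDB-typed item, e.g. {"first_name": {"S": "Tom"}}
    target_columns: column metadata, e.g. [{"Name": "first_name", "Type": "string"}]
    Returns (matched, skipped). matched is a list of new column dicts with a
    'data' entry added; skipped lists source attributes absent from the target.
    """
    # Lowercase source names so the comparison is case-insensitive.
    source = {name.lower(): typed for name, typed in new_image.items()}
    matched = []
    for column in target_columns:
        typed = source.pop(column["Name"].lower(), None)
        if typed is None:
            continue  # the event carries no value for this target column
        value = next(iter(typed.values()))  # unwrap {"S": "Tom"} -> "Tom"
        matched.append({**column, "data": value})
    # Whatever is left in source has no matching target column.
    return matched, sorted(source)
```

Returning the skipped attribute names makes it possible to log schema drift, which is one way the cascading enhancement mentioned above could begin.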

The following is the Lambda function code:

def iceberg_upsert(event, database, tablename):
    response ={}
    logger.info(f'Started iceberg_upsert executing.')
    logger.info(f'Started parsing received event.')
    
    # Determine type of event
    resp=event
    eventName=resp['eventName']
    
    # call for athena function 
    athresp=retrieve_athena_table_metadata(database,tablename) 
    try:
        AthenTblMd=athresp['TableMetadata']['Columns']
    except Exception as e:
        logger.error(f"Athena Metadata does not have column information. Please check table {tablename} and database {database} ")
        raise(e)
    else: # else block for try/except
        logger.info(f"{AthenTblMd}")
        
    try:
        if eventName == "INSERT":
            sqlstmt=insert_stmt(resp,AthenTblMd,database,tablename)
            logger.info(sqlstmt)
            response=run_query(sqlstmt, database_name, athena_workgroup, output_location,wait_time)
        elif eventName == "MODIFY":
            sqlstmt=update_stmt(resp,AthenTblMd,database,tablename)
            logger.info(sqlstmt)
            response=run_query(sqlstmt, database_name, athena_workgroup, output_location,wait_time)
        elif eventName == "REMOVE":
            sqlstmt=del_stmt(resp,database,tablename)
            logger.info(sqlstmt)
            response=run_query(sqlstmt, database_name, athena_workgroup, output_location,wait_time)
        else:
            raise InvalidEventTypeException
        
    except InvalidEventTypeException:
        logger.warning(f'Event type should be INSERT/MODIFY/REMOVE. Received event type is : {eventName}.')
        logger.warning('Skipping this event; no DML statement was run.')
    except Exception as e:
        logger.error("iceberg_upsert function failed with error")
        raise(e)
    else : # else block for try/except
        return response

The following code uses the Athena Boto3 client to fetch the table metadata:

def retrieve_athena_table_metadata(databaseName, tableName, catalogName=None):
    if catalogName is None:
        catalogName='AWSDATACATALOG' # default value 
    try:
        athenaTblMd=client.get_table_metadata(CatalogName=catalogName,DatabaseName=databaseName,TableName=tableName)
    except Exception as e:
        logger.error("Athena table metadata retrieval failed. Please check exception: %s", e)
        raise(e) 
    else: # else block for try except
        return athenaTblMd
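The `run_query` helper called by `iceberg_upsert` is not shown in this post. The following is a minimal sketch of how such a helper might submit a statement and wait for it to finish, using the Athena client's `start_query_execution` and `get_query_execution` calls. The signature is an assumption: this version takes the client as its first argument so it can be exercised with a stub, and the `poll_interval` parameter is hypothetical.

```python
import time


def run_query(client, sql, database, workgroup, output_location,
              wait_seconds=60, poll_interval=1.0):
    """Start an Athena query and poll until it finishes or the wait expires."""
    started = client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
        WorkGroup=workgroup,
    )
    query_id = started["QueryExecutionId"]
    deadline = time.monotonic() + wait_seconds

    while True:
        execution = client.get_query_execution(QueryExecutionId=query_id)
        status = execution["QueryExecution"]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            return {"QueryExecutionId": query_id, "State": state}
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "")
            raise RuntimeError(f"Query {query_id} {state}: {reason}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Query {query_id} still {state} after {wait_seconds}s")
        time.sleep(poll_interval)
```

In a Lambda function, the client would typically be created once outside the handler, for example with `boto3.client('athena')`, and reused across invocations.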

Insert operations

Now let’s see how insert operations are handled with the sample data generated in the DynamoDB table.

  1. On the DynamoDB console, navigate to the users_XXXXX table.
  2. Choose Create item .
  3. Enter a sample record with the following code:
    {
      "emp_no": {
         "N": "11"
      },
      "country": {
         "S": "USA"
      },
      "dateOfBirth": {
         "S": "1991-10-23"
      },
      "first_name": {
         "S": "Tom"
      },
      "isContractAthlete": {
         "BOOL": false
      },
      "job": {
         "S": "Sr Manager"
      },
      "last_name": {
         "S": "Carter"
      },
      "phone_number": {
         "S": "+1-226-333-789"
      },
      "sex": {
         "S": "male"
      },
      "ssn": {
         "S": "434-98-2345"
      }
    }
    
  4. Choose Create item to insert the new record into the DynamoDB table.

After the item is created in the DynamoDB table, a stream event is generated in DynamoDB Streams, which triggers the Lambda function. The function processes the event and generates an equivalent INSERT SQL statement to run on the Athena table. The following screenshot shows the INSERT SQL that was generated by the Lambda function on the Athena console in the Recent queries section.

The IcebergUpsertFunction-xxxxx Lambda code has a modularized function for each event type. The following code shows the function that processes INSERT events:

def insert_stmt(insert_event_resp,AthenTblMd,database,tablename):
    resp=insert_event_resp
    
    Tablevalues=resp['dynamodb']['NewImage']
    Tblvalues={ k.lower():v for k,v in Tablevalues.items()} # converting key names to lowercase to prevent case-sensitive mismatches
    
    val_list=unpack_dict(Tblvalues,AthenTblMd)
    col_nm,val_for_col=[],[]
 
    for item in val_list:
        
        if item.get('data') is not None:
            col_nm.append(item['Name'])
            if item['Type'] != 'string':
                val_for_col.append(f"CAST ({(item['data'])} AS {item['Type']})" )
            else:
                val_for_col.append(str((item['data'])))
 
    colnames_with_doublequotes=",".join([f'"{i}"' for i in col_nm])
    values_formatted=",".join([f"{i}" if i.startswith('CAST') else f"'{i}'" for i in val_for_col] )
 
    return f"insert into {database}.{tablename} ({colnames_with_doublequotes}) values ({values_formatted})"

This function parses the create item stream event and constructs an INSERT SQL statement in the following format:

INSERT INTO <tablename> (col1, col2, ...) VALUES (val1, val2, ...)

The function returns a string, which is an ANSI SQL compliant statement that can be run directly in Athena.
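Because the statement is assembled by string interpolation, a value that contains a single quote (for example, a last name such as O'Brien) would break the generated SQL. The following sketch shows one way to render a DynamoDB-typed value as a SQL literal with quote escaping. `to_sql_literal` is a hypothetical helper, not part of the deployed function, and it only targets the scalar types S, N, BOOL, and NULL; nested types such as M and L are not handled.

```python
def to_sql_literal(typed_value, column_type):
    """Render one DynamoDB-typed value as a SQL literal for a target column.

    typed_value: single-entry dict such as {"S": "Tom"} or {"N": "11"}
    column_type: target column type from the table metadata, e.g. "string"
    """
    dynamo_type, value = next(iter(typed_value.items()))
    if dynamo_type == "NULL":
        return "NULL"
    if dynamo_type == "BOOL":
        text = "true" if value else "false"
    else:
        text = str(value)
    # Double any embedded single quotes, as standard SQL requires.
    escaped = text.replace("'", "''")
    if column_type == "string":
        return f"'{escaped}'"
    # Non-string columns are cast from a quoted literal.
    return f"CAST('{escaped}' AS {column_type})"
```

For user-controlled data, escaping alone is a narrow safeguard; validating values against the expected column type before building the statement is a sensible additional step.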

Update operations

For our update operation, let’s identify the current state of a record in the Athena table. We see emp_no=5 and its column values in Athena and compare them to the DynamoDB table. If there are no changes, the records should be the same, as shown in the following screenshots.

Let’s initiate an edit item operation in the DynamoDB table. We modify the following values:

  • IsContractAthlete – True
  • Phone_number – 123-456-789

After the item is edited in the DynamoDB table, a MODIFY stream event is generated in DynamoDB Streams, which triggers the Lambda function. The function processes the event and generates the equivalent UPDATE SQL statement to run on the Athena table.

MODIFY DynamoDB Streams events have two components: the old image and the new image. Here we parse only the new image data section to construct an UPDATE ANSI SQL statement and run it on the Athena tables.
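The solution uses only the new image, so every non-key column present in the event ends up in the SET clause. As an illustration of what the old image makes possible, the following sketch compares the two images and returns only the attributes that changed. `changed_attributes` is a hypothetical helper, and it assumes the stream is configured to include both images (the NEW_AND_OLD_IMAGES view type).

```python
def changed_attributes(record):
    """Return {attribute: new_typed_value} for attributes whose value changed.

    Attributes that were removed in the new image are reported with None.
    """
    old = record["dynamodb"].get("OldImage", {})
    new = record["dynamodb"].get("NewImage", {})
    changes = {}
    for name in sorted(set(old) | set(new)):
        if old.get(name) != new.get(name):
            changes[name] = new.get(name)
    return changes
```

A narrower SET clause built from this result would touch fewer columns, at the cost of depending on the old image being present in every MODIFY event.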

The following update_stmt code block parses the modify item stream event and constructs the corresponding UPDATE SQL statement with new image data. The code block performs the following steps:

  • Finds the key columns for the WHERE clause
  • Finds columns for the SET clause
  • Ensures key columns are not part of the SET command

The function returns a string that is an ANSI SQL compliant statement that can be run directly in Athena. For example:

UPDATE <TABLENAME> SET col = value where key = value

See the following code:

def update_stmt(update_event_resp,AthenTblMd,database,tablename):
    resp=update_event_resp
    
    Tablevalues=resp['dynamodb']['NewImage']
    primary_key_col_names=resp['dynamodb']['Keys']     
    
    Tblvalues={ k.lower():v for k,v in Tablevalues.items()} # converting key names to lowercase to prevent case-sensitive mismatches
    
    new_upd_AthenaTblMd=AthenTblMd.copy()
    where_nm,set_nm=[],[]
    forUpdate=Tblvalues.copy()
 
    # removing primary keys from the stream dictionary so that SET command for Update can be constructed.
    for col_pkey in primary_key_col_names.keys():
        forUpdate.pop(col_pkey.lower(),None) # Tblvalues keys were lowercased above, so match on the lowercase name
    
 
    for position,item in enumerate(AthenTblMd):
        if forUpdate.get(item.get('Name')) is not None:
            datafromsource=(list(forUpdate.get(item.get('Name')).values())[0])
            new_upd_AthenaTblMd[position]['data']=datafromsource
 
    # For set clause
    for item in new_upd_AthenaTblMd:
        if item.get('data') is not None:
            if item['Type'] != 'string':
                set_nm.append(f"{item['Name']} = CAST ('{(item['data'])}' AS {item['Type']})")
            else:
                set_nm.append(f" {item['Name']} = '{item['data']}' ")
    
    set_cmd=f" set {','.join(set_nm)}"
    
    # for where clause
    for key, val in primary_key_col_names.items():
        where_nm.append(f" {key} = {list(val.values())[0]}")
 
    where_cmd=f" where {' and '.join(where_nm)}"
 
    return (f" UPDATE {database}.{tablename} {set_cmd}  {where_cmd}")

In the Athena table, we can see the columns IsContractAthlete and Phone_number have been updated to the recent values. The other column values remain the same because they weren’t modified.

Delete operations

For delete operations, let’s identify the current state of a record in the Athena table. We choose emp_no=6 for this activity.

  1. On the DynamoDB console, navigate to the users_XXXXXX table.
  2. Select the record for emp_no=6.
  3. On the Actions menu, choose Delete items.

After the delete item operation is performed on the DynamoDB table, it generates a REMOVE event in the DynamoDB stream, which triggers the IcebergUpsertFunction-xxxxx Lambda function.

The DELETE function removes the data based on key columns in the stream. The following function parses the stream to identify key columns of the deleted item. We construct a DELETE DML SQL statement with a WHERE clause of emp_no=6:

DELETE FROM <TABLENAME> WHERE key = value

See the following code:

def del_stmt(del_event_resp,database,tablename):
    
    resp=del_event_resp
    
    primary_key_col_names=resp['dynamodb']['Keys'] 
    del_where_nm=[]
    
    for key, val in primary_key_col_names.items():
        del_where_nm.append(f" {key} = {list(val.values())[0]}")
 
    del_where_cmd=f" where {' and '.join(del_where_nm)}"
    return f" DELETE FROM {database}.{tablename} {del_where_cmd} "   

The function returns a string, which is an ANSI SQL compliant statement that can be run directly in Athena. The following screenshot shows the DELETE statement that was run in Athena.

As you can see from the following screenshot, the emp_no=6 record no longer exists in the Iceberg table when queried with Athena.

Time travel

Time travel queries in Athena query Amazon S3 for historical data from a consistent snapshot as of a specified date and time. Iceberg tables provide this capability because each Iceberg table maintains a versioned manifest of the S3 objects that it contains. Previous versions of the manifest can be used for time travel and version travel queries. Version travel queries in Athena query Amazon S3 for historical data as of a specified snapshot ID. Iceberg tracks every change to the table in the tablename$iceberg_history table. When you query it, it shows the timestamps at which the changes occurred.

Let’s find the timestamp when the DELETE statement was applied to the Athena table. In our query, it corresponds to the time 2023-04-18 21:34:13.970. With this timestamp, let’s query the main table to see whether the emp_no=6 record exists in it.

As shown in the following screenshot, the query result shows that the deleted record exists, and this can be used to reinsert data if required.
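The time travel and version travel queries described above can be sketched as small statement builders. Both function names are hypothetical, and the sketch assumes the history timestamp is in UTC and that you query a moment just before the DELETE was committed. The snapshot ID in the usage note is a placeholder, not a value from this walkthrough.

```python
from datetime import datetime, timedelta


def time_travel_query(table, as_of, where=None):
    """Build a SELECT that reads the table as of a UTC timestamp."""
    # Keep millisecond precision: trim the last three microsecond digits.
    stamp = as_of.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    sql = f"SELECT * FROM {table} FOR TIMESTAMP AS OF TIMESTAMP '{stamp} UTC'"
    if where:
        sql += f" WHERE {where}"
    return sql


def version_travel_query(table, snapshot_id):
    """Build a SELECT that reads the table as of a specific snapshot ID."""
    return f"SELECT * FROM {table} FOR VERSION AS OF {int(snapshot_id)}"


# One second before the DELETE recorded in the history table.
before_delete = datetime(2023, 4, 18, 21, 34, 13, 970000) - timedelta(seconds=1)
print(time_travel_query("users_73591300", before_delete, where="emp_no = 6"))
```

A version travel query is built the same way, for example `version_travel_query("users_73591300", 1234567890)`, with the snapshot ID taken from the history table.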

Optimize Iceberg tables

Every insert and update operation on an Iceberg table creates separate data and metadata files. Many such operations can leave the table with a large number of small, fragmented files, which inflates the metadata and makes queries less efficient. Use the Athena OPTIMIZE command to compact these small files.

OPTIMIZE

The OPTIMIZE table REWRITE DATA compaction action rewrites data files into a more optimized layout based on their size and number of associated delete files.

The following query shows the number of data files that exist before the compaction process:

SELECT * FROM "users_73591300$iceberg_files"

The following query performs compaction on the Iceberg table:

OPTIMIZE users_73591300 REWRITE DATA USING BIN_PACK

We can observe that the compaction process merged multiple data files into a larger file.

VACUUM

The VACUUM statement on Iceberg tables removes data files that are no longer relevant, which reduces metadata size and storage consumption. VACUUM removes unwanted files older than the amount of time that is specified by the vacuum_max_snapshot_age_seconds table property (default 432000 seconds, or 5 days). The following code lowers the property to 259200 seconds (3 days):

ALTER TABLE users_73591300 SET TBLPROPERTIES ('vacuum_max_snapshot_age_seconds'='259200')

The following query performs a vacuum operation on the Iceberg table:

VACUUM users_73591300
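If you run compaction and cleanup on a schedule, the statements above can be generated from a retention period expressed in days. The following is a minimal sketch; `maintenance_statements` is a hypothetical helper, and each returned statement is meant to be submitted to Athena as its own query.

```python
def maintenance_statements(table, retention_days):
    """Return the SQL statements for one compaction-and-cleanup pass."""
    retention_seconds = int(retention_days * 24 * 60 * 60)
    return [
        # Compact small data files into larger ones.
        f"OPTIMIZE {table} REWRITE DATA USING BIN_PACK",
        # Set how long snapshots are retained before VACUUM may remove them.
        f"ALTER TABLE {table} SET TBLPROPERTIES "
        f"('vacuum_max_snapshot_age_seconds'='{retention_seconds}')",
        # Remove files that are no longer referenced by retained snapshots.
        f"VACUUM {table}",
    ]


for statement in maintenance_statements("users_73591300", 3):
    print(statement)
```

Keep in mind that a shorter retention period also shortens the window available for time travel queries.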

Clean up

When you have finished experimenting with this solution, clean up your resources to prevent Amazon Web Services charges from being incurred:

  1. Empty the S3 buckets.
  2. Delete the stack from the Amazon Web Services CloudFormation console.

Conclusion

In this post, we introduced a serverless CDC solution for semi-structured data that captures changes with DynamoDB Streams and applies them to Iceberg tables. We demonstrated how to ingest semi-structured data in DynamoDB, identify changed data using DynamoDB Streams, and apply the changes to Iceberg tables. We can expand the solution to build SCD type-2 functionality in data lakes to track historical data changes. This solution is appropriate for a low frequency of updates. For a high frequency of updates and larger volumes of data, we can aggregate the changes in a separate intermediate table using DynamoDB Streams and Amazon Kinesis Data Firehose, and then run periodic MERGE operations into the main Iceberg table.

We hope this post provided insights on how to process semi-structured data in a data lake when source systems lack CDC capability.


About the authors

Vijay Velpula is a Data Lake Architect with Amazon Web Services Professional Services. He helps customers build modern data platforms by implementing Big Data & Analytics solutions. Outside of work, he enjoys spending time with family, traveling, hiking, and biking.

Karthikeyan Ramachandran is a Data Architect with Amazon Web Services Professional Services. He specializes in MPP systems, helping customers build and maintain data warehouse environments. Outside of work, he likes to binge-watch TV shows and loves playing cricket and volleyball.

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services, where he helps customers work backwards from business outcomes to develop innovative solutions on Amazon Web Services. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core areas of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing Tabla.

