Ingesting enriched IoT data into Amazon S3 using Amazon Kinesis Data Firehose
Introduction
When sending data from Internet of Things (IoT) devices to a data lake, you may need to enrich the device data payload with additional metadata in the cloud for further data processing and visualization. There are multiple reasons this data might not exist in the device payload, such as minimizing the device payload in limited bandwidth environments or modifying it with business inputs in the cloud. For example, a machine on the factory floor might be assigned to different operators during the day. This variable business data would be stored in a database. In your data lake, you might need this information to be stored along with the payload.
In this blog post, you will learn how to ingest enriched IoT data into a data lake in near real-time.
Prerequisites
- An Amazon Web Services account
- Amazon Web Services Command Line Interface (Amazon Web Services CLI). See the Amazon Web Services CLI quick setup documentation for configuration.
Use case definition
Let’s assume that in your logistics company, you have containers equipped with sensor-enabled IoT devices. When the container is loaded into a ship, the container ID is associated with the ship ID. You need to store the IoT device payload with the ship ID in your data lake.
In such a use case, the sensor payload comes from the IoT device attached to the container. However, the associated ship ID is only stored in the metadata store. Therefore, the payload must be enriched with the ship ID before putting it into the data lake.
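For illustration, a record in such a metadata store can be as simple as the following mapping; the field names mirror the ones used later in this post:

# Illustrative only: the metadata store maps a container to its current ship.
metadata_item = {"containerId": "cont1", "shipId": "ship1"}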
Solution architecture
In the architecture diagram,
- The IoT devices stream payloads to the Amazon Web Services IoT Core message broker on a specific MQTT topic, device/data/DEVICE_ID. The Amazon Web Services IoT Core message broker allows devices to publish and subscribe to messages by using supported protocols.
- The Amazon Web Services IoT rule is triggered when there is a payload on its topic. In this use case, it is configured with an Amazon Kinesis Data Firehose action. You can use Amazon Web Services IoT rules to interact with Amazon Web Services services by calling them when there is a message on a specific MQTT topic, or directly by using the Basic Ingest feature.
- Amazon Kinesis Data Firehose buffers the device payloads before delivering them to the data store, based on either size or time, whichever threshold is hit first. Kinesis Data Firehose delivers real-time streaming data to destinations for storing or processing.
- Once the buffer hits the size or the time threshold, Kinesis Data Firehose calls an Amazon Web Services Lambda function to enrich the device payloads in batches with the metadata retrieved from an Amazon DynamoDB table. Amazon Web Services Lambda is a serverless compute service that runs your code for any type of application. Amazon DynamoDB is a fully managed NoSQL database that provides fast performance.
- The enriched payloads are returned to Kinesis Data Firehose to deliver to the destination.
- The enriched payloads are put into an Amazon Simple Storage Service (Amazon S3) bucket as the destination. Amazon S3 is an object storage service which stores any amount of data for a range of use cases.
Amazon Web Services CloudFormation template
Download the Amazon Web Services CloudFormation template.
The Amazon Web Services CloudFormation template deploys all the necessary resources to run this example use case. Let’s have a closer look at Amazon Web Services IoT rules, Kinesis Data Firehose, and Amazon Web Services Lambda function resources.
Amazon Web Services IoT rules resource
IoTToFirehoseRule:
  Type: AWS::IoT::TopicRule
  Properties:
    TopicRulePayload:
      Actions:
        - Firehose:
            RoleArn: !GetAtt IoTFirehosePutRecordRole.Arn
            DeliveryStreamName: !Ref FirehoseDeliveryStream
            Separator: "\n"
      AwsIotSqlVersion: '2016-03-23'
      Description: This rule logs IoT payloads to S3 Bucket by aggregating in Kinesis Firehose.
      RuleDisabled: false
      Sql: !Ref IotKinesisRuleSQL
The Amazon Web Services IoT rule takes a SQL parameter which defines the IoT topic that triggers the rule and the data to extract from the payload.
- In the example, the SQL parameter is set to SELECT *, topic(3) as containerId FROM 'device/data/+' by default. SELECT * means the whole payload is taken as it is, and containerId is generated from the third element of the MQTT topic and added to the payload (see the sketch after this list).
- FROM 'device/data/+' describes the IoT topic that will trigger the Amazon Web Services IoT rule. + is a wildcard character for MQTT topics, and the IoT devices will publish data payloads to the device/data/DEVICE_ID topic to trigger this rule.
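To make this concrete, here is a minimal Python sketch of what the rule's SQL does with an incoming message. It is purely illustrative, not how the Rules Engine is implemented:

import json

# Illustrative only: emulate SELECT *, topic(3) as containerId on one message.
def apply_rule_sql(topic, payload_json):
    enriched = json.loads(payload_json)            # SELECT * keeps the payload as is
    enriched["containerId"] = topic.split("/")[2]  # topic(3) is the third topic segment
    return enriched

print(apply_rule_sql("device/data/cont1", '{"temperature": 20, "humidity": 80}'))
# {'temperature': 20, 'humidity': 80, 'containerId': 'cont1'}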
The Amazon Web Services IoT rule also defines actions. In the example, you can see a Kinesis Data Firehose action which defines the target Kinesis Data Firehose delivery stream and the IAM role that grants the rule permission to put records into that stream.
Kinesis Data Firehose delivery stream resource
FirehoseDeliveryStream:
  Type: AWS::KinesisFirehose::DeliveryStream
  Properties:
    ExtendedS3DestinationConfiguration:
      BucketARN: !GetAtt IoTLogBucket.Arn
      BufferingHints:
        IntervalInSeconds: 60
        SizeInMBs: 1
      Prefix: device-data/
      RoleARN: !GetAtt FirehosePutS3Role.Arn
      ProcessingConfiguration:
        Enabled: true
        Processors:
          - Type: Lambda
            Parameters:
              - ParameterName: LambdaArn
                ParameterValue: !Sub '${FirehoseTransformLambda.Arn}:$LATEST'
              - ParameterName: RoleArn
                ParameterValue: !GetAtt FirehoseLambdaInvokeRole.Arn
A Kinesis Data Firehose delivery stream must define a destination to put the stream into. It supports different types of destinations. You can find the available destination types and their usage in this documentation.
The example Delivery Stream resource defines the following properties:
- BucketARN: the destination bucket which will store the aggregated data. The destination bucket is created by the CloudFormation stack.
- BufferingHints: the size and time thresholds for data buffering. In this example, they are set to 1 MB and 60 seconds respectively to see the results faster. They can be adjusted according to business needs. Keeping these thresholds low causes the Lambda function to be invoked more frequently. If the thresholds are high, the data is ingested into the data store less frequently, and it therefore takes longer for the latest data to appear there.
- Prefix: the created objects will be put under this prefix. Kinesis Data Firehose partitions the data based on the timestamp by default. In this example, the objects will be put under the device-data/YYYY/MM/dd/HH folder. Kinesis Data Firehose has advanced features for data partitioning, such as dynamic partitioning. Partitioning matters when querying the data lake. For example, if you need to query the data on a per-device basis by using Amazon Athena, scanning only the partition of the relevant device ID will significantly reduce the scan time and the cost. You can find details on partitioning in this documentation.
- RoleARN: the IAM role that gives Kinesis Data Firehose PutObject permission so it can put aggregated data into the Amazon S3 bucket.
- ProcessingConfiguration: as described in the use case, a transform Lambda function will enrich the IoT data with the metadata. ProcessingConfiguration defines the processor, which is a Lambda function in this example. For each batch of data, Kinesis Data Firehose will call this Lambda function to transform the data (a sample of the event it sends is shown after this list). You can read more about data processing in this documentation.
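Before looking at the transformation function itself, it helps to know the shape of a transformation invocation. The following is an abridged, illustrative Python rendering of the event Kinesis Data Firehose sends to the processor Lambda function and of the response it expects; all field values here are placeholders:

# Abridged, illustrative shapes only; the data field carries the base64-encoded payload.
event = {
    "invocationId": "...",
    "deliveryStreamArn": "arn:aws-cn:firehose:...",
    "records": [
        {
            "recordId": "49590...",
            "approximateArrivalTimestamp": 1700000000000,
            "data": "eyJ0ZW1wZXJhdHVyZSI6IDIwfQ==",  # base64 of {"temperature": 20}
        }
    ],
}

# The function must return exactly one entry per recordId, with result set to
# Ok, Dropped, or ProcessingFailed.
response = {
    "records": [
        {"recordId": "49590...", "result": "Ok", "data": "<base64 of the enriched payload>"}
    ]
}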
Transformation Lambda Function
As you can see in the following Python code, Kinesis Data Firehose sends a batch of records, where each record is a payload from the IoT devices. First, the base64-encoded payload data is decoded. Then, the corresponding ship ID is retrieved from the DynamoDB table based on the container ID. The payload is enriched with the ship ID and encoded back to base64. Lastly, the record list is returned to Kinesis Data Firehose.
Once Kinesis Data Firehose receives the records, it puts them as an aggregated file into the Amazon S3 bucket.
import os
import boto3
import json
import base64

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['METADATA_TABLE'])

def function_handler(event, context):
    # Collect the transformed records inside the handler so state does not
    # leak between invocations when the Lambda execution environment is reused.
    records = []
    for record in event["records"]:
        # Get data field of the record in json format. It is a base64 encoded string.
        json_data = json.loads(base64.b64decode(record["data"]))
        container_id = json_data["containerId"]
        # Get corresponding shipId from the DynamoDB table
        res = table.get_item(Key={'containerId': container_id})
        ddb_item = res["Item"]
        ship_id = ddb_item["shipId"]
        # Append shipId to the actual record data
        enriched_data = json_data
        enriched_data["shipId"] = ship_id
        # Encode the enriched record back to base64
        json_string = json.dumps(enriched_data).encode("ascii")
        b64_encoded_data = base64.b64encode(json_string).decode("ascii")
        # Create a record with enriched data and return back to Firehose
        rec = {'recordId': record["recordId"], 'result': 'Ok', 'data': b64_encoded_data}
        records.append(rec)
    return {'records': records}
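To try the function locally, a minimal harness like the following can be used. It assumes the code above is saved as transform.py and stubs out the DynamoDB table, so no Amazon Web Services credentials are needed; the module name, region, and table name are assumptions for illustration, not part of the stack:

import base64
import json
import os

# Set the environment before importing the handler module (illustrative values).
os.environ.setdefault("METADATA_TABLE", "IoTKinesisDataPath-MetadataTable-SAMPLE")
os.environ.setdefault("AWS_DEFAULT_REGION", "cn-north-1")

import transform  # hypothetical module name for the code above

class StubTable:
    def get_item(self, Key):
        # Pretend every container is on ship1, mirroring the put-item example below.
        return {"Item": {"containerId": Key["containerId"], "shipId": "ship1"}}

transform.table = StubTable()  # replace the real DynamoDB table with the stub

payload = {"temperature": 20, "humidity": 80, "containerId": "cont1"}
event = {"records": [{
    "recordId": "1",
    "data": base64.b64encode(json.dumps(payload).encode("ascii")).decode("ascii"),
}]}

result = transform.function_handler(event, None)
print(json.loads(base64.b64decode(result["records"][0]["data"])))
# Expected: {'temperature': 20, 'humidity': 80, 'containerId': 'cont1', 'shipId': 'ship1'}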
Deployment
Run the following command in a terminal to deploy the stack.
aws cloudformation deploy --stack-name IoTKinesisDataPath --template-file IoTKinesisDataPath.yml --parameter-overrides IotKinesisRuleSQL="SELECT *, topic(3) as containerId FROM 'device/data/+'" --capabilities CAPABILITY_NAMED_IAM
After the deployment is complete, run the following command in a terminal to see the output of the deployment.
aws cloudformation describe-stacks --stack-name IoTKinesisDataPath
Note the IoTLogS3BucketName and MetadataTableName output parameters.
Testing
After the deployment is complete, the first thing you need to do is create a metadata item for data enrichment. Run the following command to create an item in the DynamoDB table. It will create an item with cont1 as the containerId and ship1 as the shipId. Replace the IoTKinesisDataPath-MetadataTable-SAMPLE parameter with the MetadataTableName output parameter from the CloudFormation stack deployment.
aws dynamodb put-item --table-name IoTKinesisDataPath-MetadataTable-SAMPLE --item '{"containerId":{"S":"cont1"},"shipId":{"S":"ship1"}}'
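If you prefer Python over the CLI, the following boto3 sketch creates the same item; the table name is a placeholder to be replaced with the MetadataTableName output:

import boto3

# Same metadata item as the CLI command above; the table name is a placeholder.
table = boto3.resource("dynamodb").Table("IoTKinesisDataPath-MetadataTable-SAMPLE")
table.put_item(Item={"containerId": "cont1", "shipId": "ship1"})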
In a real-life scenario, the devices publish the payloads to a specific MQTT topic. In this example, instead of creating IoT devices, you will use the Amazon Web Services CLI to publish payloads to MQTT topics. Run the following command in a terminal to publish a sample data payload to Amazon Web Services IoT Core. Pay attention to the payload field of the command: the only data provided by the device is the dynamic sensor data.
aws iot-data publish --topic "device/data/cont1" --payload '{"temperature":20,"humidity":80,"latitude":0,"longitude":0}' --cli-binary-format raw-in-base64-out
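Equivalently, you can publish sample payloads from Python with boto3, as in the following illustrative sketch. Note that every container ID you publish for must have a matching item in the metadata table, otherwise the transform function will not find a shipId:

import json
import boto3

iot_data = boto3.client("iot-data")
# cont1 is the only container ID with a metadata item created above,
# so we publish only on its topic.
for reading in [{"temperature": 20, "humidity": 80}, {"temperature": 21, "humidity": 78}]:
    iot_data.publish(
        topic="device/data/cont1",  # the third topic segment becomes containerId
        payload=json.dumps({**reading, "latitude": 0, "longitude": 0}).encode("utf-8"),
    )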
Now, navigate to Amazon S3 from the Amazon Web Services Management Console and open the bucket given by the IoTLogS3BucketName output parameter. Under the device-data prefix, you will find the aggregated object, whose content looks like the following:
{"temperature": 20, "humidity": 80, "latitude": 0, "longitude": 0, "containerId": "cont1", "shipId": "ship1"}
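If you prefer to check the delivered objects programmatically rather than through the console, a small sketch like the following lists and prints the aggregated files; the bucket name is a placeholder for the IoTLogS3BucketName output:

import boto3

s3 = boto3.client("s3")
bucket = "IOT_LOG_BUCKET_NAME"  # replace with the IoTLogS3BucketName output
response = s3.list_objects_v2(Bucket=bucket, Prefix="device-data/")
for obj in response.get("Contents", []):
    body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
    print(obj["Key"], body.decode("utf-8"))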
Troubleshooting
In case of failure in the system, the following resources can be useful for analyzing the source of the problem.
To monitor the Amazon Web Services IoT Core Rules Engine, you need to enable Amazon CloudWatch logging for Amazon Web Services IoT Core.
Amazon Web Services Lambda can be monitored by using Amazon CloudWatch metrics and logs.
In case of a failure in the Amazon Web Services IoT rule action, the transform Lambda function, or delivery to the Amazon S3 bucket, Kinesis Data Firehose will create a processing-failed folder under the device-data prefix in the Amazon S3 bucket, where the details of the failure can be read as JSON objects. You can find more information in this documentation.
Clean up
To clean up the resources that have been created, first empty the Amazon S3 bucket. Run the following command, replacing the bucket-name parameter with the name of the bucket deployed by the CloudFormation stack. Important: this command will irreversibly delete all the data inside the bucket.
aws s3 rm s3://bucket-name --recursive
Then, you can delete the CloudFormation stack by running the following command in a terminal.
aws cloudformation delete-stack --stack-name IoTKinesisDataPath
Conclusion
In this blog post, you have learned a common pattern of enriching IoT payloads with metadata and storing them cost-effectively in a data lake in near real-time by using the Amazon Web Services IoT Rules Engine and an Amazon Kinesis Data Firehose delivery stream. The proposed solution and the CloudFormation template can be used as a baseline for a scalable IoT data ingestion architecture.
You can read further about the Amazon Web Services IoT Core Rules Engine and Amazon Kinesis Data Firehose in their documentation.