Implementing architectural patterns with Amazon EventBridge Pipes
This post is written by Dominik Richter (Solutions Architect)
Architectural patterns help you solve recurring challenges in software design. They are blueprints that have been used and tested many times. When you design distributed applications, enterprise integration patterns can help you solve common messaging challenges.

This post shows you how to implement four common enterprise integration patterns using Amazon EventBridge Pipes.
Content filter and message filter pattern
To remove unwanted messages and content, combine the message filter pattern with the content filter pattern.
In the following example, the goal is to retain only non-PII data from “ORDER” events. To achieve this, you must remove all events that aren’t “ORDER” events. In addition, you must remove any field in the “ORDER” events that contains PII.
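Conceptually, the two filters work like the following sketch. The “ORDER”, “currency”, and “sum” fields come from the example; the “customer_name” PII field is a hypothetical illustration:

```typescript
// Sketch of the combined message filter and content filter.
// The message filter drops events that are not "ORDER" events;
// the content filter projects each event down to its non-PII fields.

interface SourceEvent {
  event_type: string;
  currency?: string;
  sum?: number;
  customer_name?: string; // hypothetical PII field
}

// Message filter: keep only "ORDER" events.
const isOrderEvent = (event: SourceEvent): boolean =>
  event.event_type === 'ORDER';

// Content filter: retain only the non-PII fields.
const stripPii = (event: SourceEvent) => ({
  event_type: event.event_type,
  currency: event.currency,
  sum: event.sum,
});

const events: SourceEvent[] = [
  { event_type: 'ORDER', currency: 'EUR', sum: 42, customer_name: 'Jane Doe' },
  { event_type: 'SIGNUP', customer_name: 'John Doe' },
];

const filtered = events.filter(isOrderEvent).map(stripPii);
// filtered holds one event: { event_type: 'ORDER', currency: 'EUR', sum: 42 }
```

In the pipe itself, the message filter corresponds to the filter criteria and the content filter to the input template shown in the CDK code below.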
While you can use these patterns with various sources and targets, the following architecture shows this pattern with Amazon Kinesis Data Streams as both source and target.
Instead of using Pipes, you could connect the streams with an AWS Lambda function that performs the integration.
Some situations require an enrichment function: for example, when your goal is to mask an attribute without removing it entirely. You could, for instance, replace the attribute “birthday” with an “age_group” attribute.
In this case, if you use Pipes for integration, the Lambda function contains only your business logic. On the other hand, if you use Lambda for both integration and business logic, you do not pay for Pipes. At the same time, you add complexity to your Lambda function, which now contains integration code. This can increase its execution time and cost. Therefore, your priorities determine the best option and you should compare both approaches to make a decision.
To implement the pipe using the AWS Cloud Development Kit (AWS CDK), use the following source code:
const filterPipe = new pipes.CfnPipe(this, 'FilterPipe', {
  roleArn: pipeRole.roleArn,
  source: sourceStream.streamArn,
  target: targetStream.streamArn,
  sourceParameters: {
    filterCriteria: {
      filters: [{ pattern: '{"data" : {"event_type" : ["ORDER"] }}' }],
    },
    kinesisStreamParameters: { startingPosition: 'LATEST' },
  },
  targetParameters: {
    inputTemplate: '{"event_type": <$.data.event_type>, "currency": <$.data.currency>, "sum": <$.data.sum>}',
    kinesisStreamParameters: { partitionKey: 'event_type' },
  },
});
To allow access to source and target, you must assign the correct permissions:
const pipeRole = new iam.Role(this, 'FilterPipeRole', { assumedBy: new iam.ServicePrincipal('pipes.amazonaws.com') });
sourceStream.grantRead(pipeRole);
targetStream.grantWrite(pipeRole);
Message translator pattern
In an event-driven architecture, event producers and consumers are independent of each other. Therefore, they may exchange events of different formats. To enable communication, the events must be translated. This is known as the message translator pattern.
If a computation is required to translate messages, use the enrichment step. The following architecture diagram shows how to accomplish this enrichment via API destinations. In the example, you can call an existing geocoding service to resolve addresses to coordinates.
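The translation logic of that enrichment step can be sketched as follows. The geocodeAddress stub stands in for the external geocoding service called via the API destination; its field names and coordinates are hypothetical:

```typescript
interface AddressEvent { street: string; city: string; }
interface GeocodedEvent extends AddressEvent { lat: number; lon: number; }

// Hypothetical stand-in for the external geocoding service that the
// API destination would call in the architecture above.
function geocodeAddress(event: AddressEvent): { lat: number; lon: number } {
  const known: Record<string, { lat: number; lon: number }> = {
    Berlin: { lat: 52.52, lon: 13.405 },
  };
  return known[event.city] ?? { lat: 0, lon: 0 };
}

// Enrichment step: translate an address-only event into one that also
// carries coordinates, so consumers need no further lookup.
function enrich(event: AddressEvent): GeocodedEvent {
  return { ...event, ...geocodeAddress(event) };
}

const enriched = enrich({ street: 'Unter den Linden 1', city: 'Berlin' });
// enriched now contains street, city, lat, and lon
```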
There may be cases where the translation is purely syntactical. For example, a field may have a different name or structure.
You can achieve these translations without enrichment by using input transformers.
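The effect of an input transformer can be sketched as a simple template substitution. The helper below only mimics the `<$.path>` placeholder syntax for flat JSON paths; it is not the service implementation, and the field rename is a hypothetical example:

```typescript
// Minimal simulation of an input transformer: replace <$.a.b> placeholders
// in the template with the corresponding values from the event.
function applyInputTemplate(
  template: string,
  event: Record<string, any>
): string {
  return template.replace(/<\$\.([\w.]+)>/g, (_match, path: string) => {
    // Walk the dotted path into the event object.
    const value = path
      .split('.')
      .reduce((obj: any, key: string) => obj?.[key], event);
    return JSON.stringify(value);
  });
}

// Purely syntactic translation: rename "event_type" to "type".
const template = '{"type": <$.data.event_type>}';
const event = { data: { event_type: 'ORDER' } };
const translated = applyInputTemplate(template, event);
// translated === '{"type": "ORDER"}'
```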
Here is the source code for the pipe, including the role with the correct permissions:
const pipeRole = new iam.Role(this, 'MessageTranslatorRole', { assumedBy: new iam.ServicePrincipal('pipes.amazonaws.com'), inlinePolicies: { invokeApiDestinationPolicy } });
sourceQueue.grantConsumeMessages(pipeRole);
targetStepFunctionsWorkflow.grantStartExecution(pipeRole);
const messageTranslatorPipe = new pipes.CfnPipe(this, 'MessageTranslatorPipe', {
roleArn: pipeRole.roleArn,
source: sourceQueue.queueArn,
target: targetStepFunctionsWorkflow.stateMachineArn,
enrichment: enrichmentDestination.apiDestinationArn,
sourceParameters: { sqsQueueParameters: { batchSize: 1 } },
});
Normalizer pattern
The normalizer pattern translates messages that arrive in different formats into a common format before they are processed further.

The example shows a system where different source systems store the name property differently. To process the messages differently based on their source, use an enrichment step that unifies the different formats.
In the example, you unify the events using Step Functions before putting them on your event bus. As is often the case with architectural choices, there are alternatives. Another approach is to introduce separate queues for each source system, connected by its own pipe containing only its unification actions.
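The unification logic inside such an enrichment can be sketched as follows. The three source formats and the common output shape are hypothetical examples of systems that store the name property differently:

```typescript
// Hypothetical source formats: one system sends a single "name" field,
// another sends "firstName"/"lastName", a third nests it under "user".
type SourceA = { source: 'A'; name: string };
type SourceB = { source: 'B'; firstName: string; lastName: string };
type SourceC = { source: 'C'; user: { name: string } };
type AnySource = SourceA | SourceB | SourceC;

// Common format that the normalizer puts on the event bus.
interface NormalizedEvent { source: string; name: string; }

// Branch on the source system, much like a Choice state would in the
// Step Functions workflow used as enrichment.
function normalize(event: AnySource): NormalizedEvent {
  switch (event.source) {
    case 'A':
      return { source: event.source, name: event.name };
    case 'B':
      return { source: event.source, name: `${event.firstName} ${event.lastName}` };
    case 'C':
      return { source: event.source, name: event.user.name };
  }
}

const normalized = normalize({ source: 'B', firstName: 'Ada', lastName: 'Lovelace' });
// normalized === { source: 'B', name: 'Ada Lovelace' }
```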
This is the source code for the normalizer pattern using a Step Functions workflow as enrichment:
const pipeRole = new iam.Role(this, 'NormalizerRole', { assumedBy: new iam.ServicePrincipal('pipes.amazonaws.com') });
sourceQueue.grantConsumeMessages(pipeRole);
enrichmentWorkflow.grantStartSyncExecution(pipeRole);
normalizerTargetBus.grantPutEventsTo(pipeRole);
const normalizerPipe = new pipes.CfnPipe(this, 'NormalizerPipe', {
roleArn: pipeRole.roleArn,
source: sourceQueue.queueArn,
target: normalizerTargetBus.eventBusArn,
enrichment: enrichmentWorkflow.stateMachineArn,
sourceParameters: { sqsQueueParameters: { batchSize: 1 } },
});
Claim check pattern
To reduce the size of the events in your event-driven application, you can temporarily remove attributes. This approach is known as the claim check pattern.
The claim check pattern has two parts. First, when an event is received, you split it and store the payload elsewhere. Second, when the event is processed, you retrieve the relevant information. You can implement both aspects with a pipe.
In the first pipe, you use the enrichment to split the event, in the second to retrieve the payload. There are several enrichment options, such as calling an external API via API destinations, or using a Lambda function.
Using a pipe to split and retrieve messages has three advantages. First, you keep events concise as they move through the system. Second, you ensure that the event contains all relevant information when it is processed. Third, you encapsulate the complexity of splitting and retrieving within the pipe.
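Both halves of the pattern can be sketched with an in-memory map standing in for the real payload store (for example, an Amazon S3 bucket or a DynamoDB table); the store, the ID scheme, and the field names here are illustrative:

```typescript
// In-memory stand-in for the payload store shared by both pipes.
const payloadStore = new Map<string, unknown>();
let nextId = 0;

interface ClaimCheckEvent { claimCheckId: string; }

// First pipe's enrichment: split the event, store the payload,
// and pass on only a lightweight claim check.
function checkIn(payload: unknown): ClaimCheckEvent {
  const claimCheckId = `claim-${nextId++}`;
  payloadStore.set(claimCheckId, payload);
  return { claimCheckId };
}

// Second pipe's enrichment: retrieve the payload so the target
// receives the full event again.
function checkOut(event: ClaimCheckEvent): unknown {
  const payload = payloadStore.get(event.claimCheckId);
  if (payload === undefined) {
    throw new Error(`unknown claim check: ${event.claimCheckId}`);
  }
  return payload;
}

const slim = checkIn({ orderId: 123, largeAttachment: 'x'.repeat(1000) });
// Only the small claim check event travels through the system...
const full = checkOut(slim) as { orderId: number };
// ...and the full payload is restored before processing.
```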
The following code implements a pipe for the claim check pattern using the CDK:
const pipeRole = new iam.Role(this, 'ClaimCheckRole', { assumedBy: new iam.ServicePrincipal('pipes.amazonaws.com') });
claimCheckLambda.grantInvoke(pipeRole);
sourceQueue.grantConsumeMessages(pipeRole);
targetWorkflow.grantStartExecution(pipeRole);
const claimCheckPipe = new pipes.CfnPipe(this, 'ClaimCheckPipe', {
roleArn: pipeRole.roleArn,
source: sourceQueue.queueArn,
target: targetWorkflow.stateMachineArn,
enrichment: claimCheckLambda.functionArn,
sourceParameters: { sqsQueueParameters: { batchSize: 1 } },
targetParameters: { stepFunctionStateMachineParameters: { invocationType: 'FIRE_AND_FORGET' } },
});
Conclusion
This blog post shows how you can implement four enterprise integration patterns with Amazon EventBridge Pipes: the message filter and content filter patterns, the message translator pattern, the normalizer pattern, and the claim check pattern.
You can find the source code for all the patterns in the accompanying GitHub repository.
For more serverless learning resources, visit Serverless Land.