使用 Amazon AppSync 事件实现应用程序现代化

作者: Ricardo Marques |

在当今快节奏的数字世界中,组织面临着应用程序现代化的挑战。一个常见的问题是如何在无需对客户端或前端进行实质性更改的情况下,从同步通信平稳过渡到异步通信。对应用程序进行现代化改造时,通常需要从同步通信模型迁移到异步通信模型。但是,这种过渡可能很复杂,尤其是当客户端或前端使用同步通信时。调整当前代码以进行异步通信需要大量的时间和资源。

Amazon AppSync Events 使您能够构建事件驱动的 API,从而在同步和异步通信模型之间架起桥梁,帮助应对这一挑战。借助 Amazon AppSync Events,您可以对后端架构进行现代化改造,以利用异步模式,同时保持与现有同步客户端的兼容性。

概述

该解决方案包括一个使用 Amazon AppSync Events 将客户端同步请求转换为异步后端请求的 API。

为了演示 API 和后端之间的集成,我正在使用异步 Amazon Step Functions 工作流程来模拟后端处理。此工作流程接收"姓名和姓氏"事件,等待 10 秒钟,然后将全名事件发布到 Amazon AppSync 事件频道。为了接收事件通知,该 API 订阅了 Amazon AppSync 频道。同时,后端异步处理事件。

图 1:使用 Amazon AppSync 事件将同步前端与异步后端集成的 API 表示法。

图 1:使用 Amazon AppSync 事件将同步前端与异步后端集成的 API 表示法。

  1. Amazon API Gateway 向 Amazon Lambda 发出同步请求并等待响应。
  2. Lambda 函数开始执行异步工作流程。
  3. 启动工作流程执行后,Lambda 连接到 Amazon AppSync 并创建一个接收异步通知的频道(频道是临时的,没有限制)。在这里,它使用工作流程执行 ID 为每个请求创建一个频道。
  4. 该工作流程异步执行,调用其他工作流程。
  5. 主工作流程完成后,它会向 Amazon AppSync 事件 API 发送一个包含处理结果的 POST 请求。POST 是在 Lambda 函数使用工作流程执行 ID 创建的频道上进行的。
  6. Amazon AppSync 接收 POST 请求并向订阅者发送通知,在本例中为 Lambda 函数。整个过程必须在您定义的 Lambda 函数的超时限制内完成。
  7. Lambda 将响应发送到 API Gateway,该网关一直在等待同步响应。

要更好地了解此解决方案中使用的事件 API WebSocket 协议,请参阅此 Amazon AppSync 文档。

你可以通过以下链接访问 GitHub 存储库:appSync_sync_async_Async_integration。

该存储库包含一个全面的 README 文件,可引导您完成设置和配置上述解决方案的过程。

先决条件

要完成本演练,您需要满足以下先决条件:

  • 一个亚马逊云科技账户
  • 创建所有指定资源所必需的亚马逊云科技权限
  • 使用相应证书配置的亚马逊云科技命令行接口 (亚马逊云科技 CLI)
  • 已安装 Terraform(版本 0.12 或更高版本)

这篇文章在 GitHub 上提供了包括 API Gateway 和 Step Functions 在内的完整代码,仅涵盖核心组件:Amazon AppSync 事件 API 和 Lambda 函数。

演练步骤

以下步骤将引导您完成此解决方案。

使用 API 密钥授权创建 Amazon AppSync 事件 API

Amazon AppSync 事件 API 允许使用 API 密钥、Amazon Cognito 用户池、Lambda 授权方、OIDC 或亚马逊云科技身份和访问管理 (IAM) 进行调用。此解决方案使用 API 密钥。

基础设施即代码 (IaC) 是使用 Terraform 创建的。但是,在撰写这篇文章时,还没有可用的 Terraform Amazon AppSync 事件 API 资源。因此,Amazon AppSync 事件 API 资源是使用 Amazon CloudFormation 制作的,由 Terraform 导入和实现。

在资源 aws:appSync:API 中,定义 API 名称和身份验证方法:

Resources:
  #Creating the AppSync Events API
  EventAPI:
    Type: AWS::AppSync::Api
    Properties:
      Name: SyncAsyncAPI
      EventConfig:
        AuthProviders:
          - AuthType: API_KEY
        ConnectionAuthModes:
          - AuthType: API_KEY
        DefaultPublishAuthModes:
          - AuthType: API_KEY
        DefaultSubscribeAuthModes:
          - AuthType: API_KEY
#Creating the Events API Namespace
  DefaultNamespace:
    Type: AWS::AppSync::ChannelNamespace
    Properties:
      Name: AsyncEvents
      ApiId: !GetAtt EventAPI.ApiId
  
  #Creating the Events API APIKey
  EventAPIKey:
    Type: AWS::AppSync::ApiKey
    Properties:
      ApiId: !GetAtt EventAPI.ApiId
      Expires: 1748950672
      Description: 'API Key for Event API'

  #Creating the SecretsManager to store the APIKey
  SecretsManagerAPIKey:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: 'AppSyncEventAPIKEY'
      SecretString: !GetAtt EventAPIKey.ApiKey

要让 Terraform 模板引用创建的主机 DNS、实时终端节点和密钥管理器,请输出它们:

Outputs:
  ApiARN:
    Description: 'The ARN ID'
    Value: !GetAtt EventAPI.ApiArn

  AppSyncHost:
    Description: 'The API Endpoint'
    Value: !GetAtt EventAPI.Dns.Http

  AppSyncRealTimeEndpoint:
    Description: 'The Real-time Endpoint'
    Value: !GetAtt EventAPI.Dns.Realtime

  SecretsManagerARN:
    Description: 'The ARN of the Secrets Manager entry'
    Value: !Ref SecretsManagerAPIKey

Amazon AppSync 事件 API 所需的关键信息是:

  1. 主机 DNS:此 DNS 用于通过 HTTP 发布请求向 API 频道发送事件。
  2. 实时终端节点:此终端节点是一个 WebSocket 终端节点,Lambda 函数连接到该终端节点以接收在 Amazon AppSync 频道中发布的事件。
  3. API 密钥:此密钥不仅用于发布 HTTP 请求,还用于连接和订阅 Amazon AppSync 频道。

Lambda 同步/异步 API

在此解决方案中,Lambda 函数运行两个任务:

  1. 启动异步工作流程
  2. 通过 WebSocket 订阅活动频道

要处理 WebSocket 连接,请使用 websocket-client lib,这是一款专为使用 WebSockets 而开发的强大 Python 库。

通过对工作流程名称和 Amazon AppSync 频道名称使用相同的 UUID 来保持请求隔离。

try:
        handler = WebSocketHandler()
        sfn_response = wf.start_workflow_async(event["body"])
        
        if sfn_response["status"] == "started":
            handler.execution_name = sfn_response["id"]
            handler.start_websocket_connection()
            
            return {
                'statusCode': 200,
                'body': json.dumps({ 
                        "id": handler.execution_name,
                        "nome completo": handler.final_name
                        })
            }
        else:
            raise ValueError("Workflow failed to start")

首先,要初始化 WebSocket 连接,必须定义子协议:

  • WEBSOCKET_PROTOCOL
  • 标题
    • 主机:Amazon AppSync 主机 DNS(即使使用 WebSocket 连接,也必须发送 HTTP 主机)
    • x-api-key:为事件 API 创建的 API 密钥
    • Sec-Websocket-Protocol:WEBSOCKET_PROTOCOL
def start_websocket_connection(self) -> None:
        try: 
            """Initialize and start WebSocket connection."""
            header_str = self._create_connection_header()
            
            self.ws = websocket.WebSocketApp(
                os.environ["API_URL"],
                subprotocols=[WEBSOCKET_PROTOCOL, f'header-{header_str}'],
                on_open=self.on_open,
                on_message=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close
)
            self.ws.run_forever()
        except Exception as e:
            return e
def _create_connection_header(self) -> str:
        """Create and encode connection header."""
        connection_header = {
            "host": os.environ["API_HOST"],
            "x-api-key": APIKEY,
            "Sec-WebSocket-Protocol": WEBSOCKET_PROTOCOL
        }
        return base64.b64encode(json.dumps(connection_header).encode()).decode()

建立 WebSocket 连接后,必须发送第一条类型为 CONNECTION_INIT_TYPE 的 WebSocket 消息。

要订阅在 Step Functions 工作流程完成后通知我们的函数的频道,请发送第二条类型为 SUBSCRIBE_TYPE、ID、频道名称和授权的消息。

有关消息类型的更多信息,请阅读此 Amazon AppSync 文档。

def on_open(self, ws: websocket.WebSocketApp) -> None:
        try:
            """Handle WebSocket connection opening and send initial messages."""
            logger.info("Connection opened")
            
            # Send connection initialization
            connection_init = {"type": CONNECTION_INIT_TYPE}
            ws.send(json.dumps(connection_init))

            # Send subscription
            subscription_msg = {
                "type": SUBSCRIBE_TYPE,
                "id": self.execution_name,
                "channel": f"{os.environ["APPSYNC_NAMESPACE"]}/{self.execution_name}",
                "authorization": {
                    "x-api-key": APIKEY,
                    "host": os.environ["API_HOST"]
                }
            }
            
            logger.info("Sending subscription")
            ws.send(json.dumps(subscription_msg))
        except Exception as e:
            self.on_error = e

收到确认订阅的消息后,等待类型为 data 的消息。每当有这种类型的消息到达时,执行逻辑以确定工作流程是否成功执行,然后关闭连接。

def on_message(self, ws: websocket.WebSocketApp, message: str) -> None:
        """Handle incoming WebSocket messages."""
        logger.info("Message received: %s", message)
        try:
            message_dict = json.loads(message)
            required_keys = ["id", "type", "event"]
            
            if all(key in message_dict for key in required_keys):
                event_json = json.loads(message_dict["event"])
                
                if (message_dict["id"] == self.execution_name and 
                    message_dict["type"] == "data"):
                    
                    self.final_name = event_json["nome_completo"]
                    logger.info("Message received: %s", self.final_name)
                    logger.info("Successfully received return message")
                    logger.info("Ending processing")
                    
                    self.message_queue = {
                        "status": SUCCESS_STATUS,
                        "executionID": message_dict["id"]
                    }
                    ws.close()
        except json.JSONDecodeError as e:
            logger.error("Failed to parse message: %s", str(e))
        except Exception as e:
            logger.error("Error processing message: %s", str(e))

结论

在这篇文章中,您学习了如何使用事件驱动的架构和 Amazon AppSync Events 的功能将同步和异步通信模式集成到您的应用程序中。这使您可以对系统进行现代化改造,而无需对现有的前端代码库进行大量修改。浏览 GitHub 存储库中提供的演示和文档,以更深入地了解如何将 Amazon AppSync 事件应用于您的特定用例。

要了解有关无服务器架构和异步调用模式的更多信息,请参阅 Serverless Land。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您发展海外业务和/或了解行业前沿技术选择推荐该服务。