AWS Compute Blog

Modernizing applications with AWS AppSync Events

Organizations that modernize their applications often need to move from a synchronous communication model to an asynchronous one. This transition can be complex when the client or frontend still communicates synchronously, because adapting the existing code for asynchronous communication demands significant time and resources. The challenge is to make the shift without substantial client or frontend changes.

AWS AppSync Events helps address this challenge by enabling you to build event-driven APIs that can bridge between synchronous and asynchronous communication models. With AppSync Events, you can modernize your backend architecture to leverage asynchronous patterns while maintaining compatibility with existing synchronous clients.

Overview

The solution comprises an API that converts client synchronous requests to asynchronous backend requests using AppSync Events.

To demonstrate the integration between the API and the backend, I simulate the backend processing with an asynchronous AWS Step Functions workflow. This workflow receives an event containing a name and a surname, waits 10 seconds, and posts a full-name event to the AppSync Events channel. The API subscribes to that channel to receive the notification, while the backend handles the event asynchronously.

Figure 1: Representation of an API integrating a synchronous frontend with an asynchronous backend using AWS AppSync Events.

  1. Amazon API Gateway makes a synchronous request to AWS Lambda and waits for the response.
  2. The Lambda function starts the execution of the asynchronous workflow.
  3. After starting the workflow execution, Lambda connects to AppSync and creates a channel to receive asynchronous notifications. Channels are ephemeral and unlimited; this solution creates one channel per request, named after the workflow execution ID.
  4. The workflow executes asynchronously, calling other workflows.
  5. When the main workflow completes, it sends a POST request with the processing result to the AppSync Events API. The POST targets the channel that the Lambda function created using the workflow execution ID.
  6. AppSync receives the POST request and notifies the subscriber, which in this case is the Lambda function. The entire process must finish within the timeout you configured for the Lambda function.
  7. Lambda sends the response to API Gateway, which has been waiting for the synchronous response.
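
The flow above can be tried locally without any AWS services. The following is a minimal sketch that only illustrates the pattern: an in-memory dictionary of queues stands in for AppSync channels and a thread stands in for the workflow. All names here (`subscribe`, `publish`, `handle_request`, the `full_name` key) are hypothetical and are not taken from the repository.

```python
import queue
import threading
import time
import uuid

# In-memory stand-in for AppSync channels: one queue per execution ID.
_channels: dict = {}
_lock = threading.Lock()


def subscribe(execution_id: str) -> queue.Queue:
    """Create the per-request channel and return it to the subscriber."""
    with _lock:
        return _channels.setdefault(execution_id, queue.Queue())


def unsubscribe(execution_id: str) -> None:
    """Drop the channel once the request has been answered or has timed out."""
    with _lock:
        _channels.pop(execution_id, None)


def publish(execution_id: str, event: dict) -> None:
    """Deliver an event if someone is subscribed; otherwise drop it."""
    with _lock:
        channel = _channels.get(execution_id)
    if channel is not None:
        channel.put(event)


def backend_workflow(execution_id: str, name: str, surname: str, delay_s: float) -> None:
    """Simulated asynchronous backend: wait, then publish the result."""
    time.sleep(delay_s)
    publish(execution_id, {"full_name": f"{name} {surname}"})


def handle_request(name: str, surname: str, timeout_s: float = 2.0, delay_s: float = 0.1) -> dict:
    """Synchronous facade: start the async work, then block on the channel."""
    execution_id = str(uuid.uuid4())
    channel = subscribe(execution_id)
    threading.Thread(
        target=backend_workflow,
        args=(execution_id, name, surname, delay_s),
        daemon=True,
    ).start()
    try:
        result = channel.get(timeout=timeout_s)
        return {"statusCode": 200, "body": {"id": execution_id, **result}}
    except queue.Empty:
        return {"statusCode": 504, "body": {"id": execution_id, "error": "timed out"}}
    finally:
        unsubscribe(execution_id)
```

Note one deliberate difference from the steps above: the sketch subscribes before starting the worker so that a fast result cannot be missed. In the post's flow the workflow is started first, which is safe there because the simulated workflow waits 10 seconds before publishing.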

To better understand the Event API WebSocket Protocol used in this solution, refer to this AppSync documentation.

You can access the GitHub repo through this link: AppSync_Sync_Async_Integration.

The repository includes a comprehensive README file that walks you through the process of setting up and configuring the preceding solution.

Prerequisites

To follow this walkthrough, you need the following prerequisites:

The full code, including API Gateway and Step Functions, is available on GitHub, so this post covers only the core components: the AppSync Events API and the Lambda function.

Walkthrough

The following steps walk you through this solution.

Creating an AppSync event API with API Key Authorization

An AppSync Event API allows calls using API key, Amazon Cognito user pools, Lambda authorizer, OIDC, or AWS Identity and Access Management (IAM). This solution uses API key authorization.

The infrastructure as code (IaC) was created using Terraform. However, at the time of writing, no Terraform resource was available for the AppSync Event API. Therefore, the AppSync Event API resources are defined in AWS CloudFormation, and the CloudFormation template is imported and deployed by Terraform.

In the resource AWS::AppSync::Api, define the API name and Auth method:

Resources:
  #Creating the AppSync Events API
  EventAPI:
    Type: AWS::AppSync::Api
    Properties:
      Name: SyncAsyncAPI
      EventConfig:
        AuthProviders:
          - AuthType: API_KEY
        ConnectionAuthModes:
          - AuthType: API_KEY
        DefaultPublishAuthModes:
          - AuthType: API_KEY
        DefaultSubscribeAuthModes:
          - AuthType: API_KEY
  #Creating the Events API Namespace
  DefaultNamespace:
    Type: AWS::AppSync::ChannelNamespace
    Properties:
      Name: AsyncEvents
      ApiId: !GetAtt EventAPI.ApiId
  
  #Creating the Events API APIKey
  EventAPIKey:
    Type: AWS::AppSync::ApiKey
    Properties:
      ApiId: !GetAtt EventAPI.ApiId
      Expires: 1748950672
      Description: 'API Key for Event API'

  #Creating the SecretsManager to store the APIKey
  SecretsManagerAPIKey:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: 'AppSyncEventAPIKEY'
      SecretString: !GetAtt EventAPIKey.ApiKey
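
The Expires value in the template is a fixed epoch timestamp, so it has to be refreshed whenever the template is redeployed after that date. The following sketch shows one way to compute a fresh value. It assumes, based on the CloudFormation documentation as I understand it, that the value is expressed in seconds since the epoch rounded down to the hour and that a key may live between 1 and 365 days; the function name `api_key_expiry` is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def api_key_expiry(days_valid: int, now: Optional[datetime] = None) -> int:
    """Return an epoch timestamp `days_valid` days ahead, rounded down to the hour."""
    # Assumed limits for AppSync API keys; check the current service documentation.
    if not 1 <= days_valid <= 365:
        raise ValueError("days_valid must be between 1 and 365")
    now = now or datetime.now(timezone.utc)
    expiry = now + timedelta(days=days_valid)
    # Integer division drops the minutes and seconds within the hour.
    return int(expiry.timestamp()) // 3600 * 3600


if __name__ == "__main__":
    print(api_key_expiry(90))
```

The printed number can be passed to the template as a parameter instead of being hard-coded.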

So that the Terraform template can reference the host DNS, the real-time endpoint, and the Secrets Manager secret, expose them as outputs:

Outputs:
  ApiARN:
    Description: 'The ARN ID'
    Value: !GetAtt EventAPI.ApiArn

  AppSyncHost:
    Description: 'The API Endpoint'
    Value: !GetAtt EventAPI.Dns.Http

  AppSyncRealTimeEndpoint:
    Description: 'The Real-time Endpoint'
    Value: !GetAtt EventAPI.Dns.Realtime

  SecretsManagerARN:
    Description: 'The ARN of the Secrets Manager entry'
    Value: !Ref SecretsManagerAPIKey

The key information needed from the AppSync Event API is:

  1. Host DNS: This DNS name is used to send events to the API channel through HTTP POST requests.
  2. Realtime endpoint: This is the WebSocket endpoint to which the Lambda function connects to receive the events posted to the AppSync channel.
  3. API key: This key is used in the HTTP POST requests and also to connect and subscribe to the AppSync channel.
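
To make the role of the host DNS and the API key concrete, here is a minimal sketch of how a publisher could build the HTTP POST request. It assumes the publish path is `/event` and that the body carries a `channel` string and an `events` list of JSON-encoded strings, which is how I read the AppSync Events documentation. The function name and the example values are hypothetical, and the request is only built, not sent.

```python
import json
import urllib.request


def build_publish_request(host: str, api_key: str, channel: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request that publishes one event."""
    body = {
        "channel": channel,
        # Assumption: each event is itself a JSON-encoded string.
        "events": [json.dumps(payload)],
    }
    return urllib.request.Request(
        url=f"https://{host}/event",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "x-api-key": api_key},
        method="POST",
    )


if __name__ == "__main__":
    request = build_publish_request(
        host="example.appsync-api.us-east-1.amazonaws.com",  # placeholder host
        api_key="da2-example",                               # placeholder key
        channel="/AsyncEvents/1234",
        payload={"nome_completo": "Ada Lovelace"},
    )
    print(request.get_method(), request.full_url)
    # urllib.request.urlopen(request) would actually send it.
```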

Lambda Sync/Async API

In this solution, the Lambda function runs two tasks:

  1. Start an asynchronous workflow
  2. Subscribe to an event channel through WebSocket

To handle the WebSocket connection, use the websocket-client library, a Python library for working with WebSockets.

Request isolation is maintained by using the same UUID for workflow name and AppSync channel name.
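
As an illustration of that isolation, the sketch below derives both names from one UUID. How the repository generates the identifier is not shown in this post, so the helper name `new_request_names` and the leading-slash channel format are assumptions.

```python
import uuid


def new_request_names(namespace: str) -> tuple:
    """Return (execution_name, channel_path) that share one UUID."""
    execution_name = str(uuid.uuid4())
    # Assumed channel format: /<namespace>/<segment>.
    channel = f"/{namespace.strip('/')}/{execution_name}"
    return execution_name, channel


if __name__ == "__main__":
    name, channel = new_request_names("AsyncEvents")
    print(name, channel)
```

Because every request gets its own channel, a subscriber never sees results that belong to another request, and no filtering is needed on the receiving side.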

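try:
    handler = WebSocketHandler()
    # Start the Step Functions workflow without waiting for it to finish.
    sfn_response = wf.start_workflow_async(event["body"])

    if sfn_response["status"] == "started":
        # The execution ID doubles as the channel name, isolating this request.
        handler.execution_name = sfn_response["id"]
        # Blocks until on_message receives the result and closes the socket.
        handler.start_websocket_connection()

        return {
            'statusCode': 200,
            'body': json.dumps({
                "id": handler.execution_name,
                # "nome completo" is Portuguese for "full name".
                "nome completo": handler.final_name
            })
        }
    else:
        raise ValueError("Workflow failed to start")
except Exception as e:
    # The excerpt ends before the except clause; this handler is an assumption.
    logger.error("Request failed: %s", e)
    return {'statusCode': 500, 'body': json.dumps({"error": str(e)})}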

First, to initialize the WebSocket connection, define the subprotocols:

  • WEBSOCKET_PROTOCOL
  • Headers:
    • Host: The AppSync Host DNS (even with a WebSocket connection, the HTTP Host must be sent)
    • x-api-key: The API key created for the Event API.
    • Sec-Websocket-Protocol: WEBSOCKET_PROTOCOL

def start_websocket_connection(self) -> None:
    """Initialize and start WebSocket connection."""
    try:
        header_str = self._create_connection_header()

        self.ws = websocket.WebSocketApp(
            os.environ["API_URL"],
            subprotocols=[WEBSOCKET_PROTOCOL, f'header-{header_str}'],
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        # run_forever() blocks until the connection is closed.
        self.ws.run_forever()
    except Exception as e:
        # Log and re-raise so the caller sees the failure.
        logger.error("WebSocket connection failed: %s", e)
        raise

def _create_connection_header(self) -> str:
    """Create and encode connection header."""
    connection_header = {
        "host": os.environ["API_HOST"],
        "x-api-key": APIKEY,
        "Sec-WebSocket-Protocol": WEBSOCKET_PROTOCOL
    }
    return base64.b64encode(json.dumps(connection_header).encode()).decode()
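
The excerpt above uses standard Base64. As I read the AppSync Events WebSocket documentation, the value after `header-` is expected to be base64URL-encoded without padding, since `+`, `/`, and `=` are not safe in a subprotocol token. The following sketch shows that variant under this assumption; the helper names are hypothetical, and the decoder exists only to make the round trip visible.

```python
import base64
import json


def encode_auth_header(host: str, api_key: str) -> str:
    """Return the authorization header as unpadded, URL-safe Base64."""
    raw = json.dumps({"host": host, "x-api-key": api_key}).encode("utf-8")
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")


def decode_auth_header(token: str) -> dict:
    """Reverse encode_auth_header by restoring the stripped padding."""
    padded = token + "=" * (-len(token) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


if __name__ == "__main__":
    token = encode_auth_header("example.appsync-api.us-east-1.amazonaws.com", "da2-example")
    print(f"header-{token}")
```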

Once the WebSocket connection is established, a first message with the type CONNECTION_INIT_TYPE must be sent.

To subscribe to the channel by which our function is notified when the Step Functions workflow finishes, send a second message with the type SUBSCRIBE_TYPE, an ID, the channel name and authorization.

For more information about types of message, read this AppSync documentation.
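
The two client messages described above can be sketched as plain JSON builders. The values of CONNECTION_INIT_TYPE and SUBSCRIBE_TYPE are not shown in this post; the strings below are assumptions based on the AppSync Events WebSocket documentation, and the function names are hypothetical.

```python
import json

# Assumed values; the post references these constants without showing them.
CONNECTION_INIT_TYPE = "connection_init"
SUBSCRIBE_TYPE = "subscribe"


def connection_init_message() -> str:
    """First message sent after the socket opens."""
    return json.dumps({"type": CONNECTION_INIT_TYPE})


def subscribe_message(subscription_id: str, channel: str, host: str, api_key: str) -> str:
    """Second message: subscribe to one channel with API key authorization."""
    return json.dumps({
        "type": SUBSCRIBE_TYPE,
        "id": subscription_id,
        "channel": channel,
        "authorization": {"x-api-key": api_key, "host": host},
    })


if __name__ == "__main__":
    print(connection_init_message())
    print(subscribe_message("1234", "/AsyncEvents/1234", "example-host", "da2-example"))
```

Keeping the builders free of socket code makes them easy to unit test separately from the connection handling.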

def on_open(self, ws: websocket.WebSocketApp) -> None:
    """Handle WebSocket connection opening and send initial messages."""
    try:
        logger.info("Connection opened")

        # Send connection initialization
        connection_init = {"type": CONNECTION_INIT_TYPE}
        ws.send(json.dumps(connection_init))

        # Send subscription; one channel per workflow execution
        namespace = os.environ["APPSYNC_NAMESPACE"]
        subscription_msg = {
            "type": SUBSCRIBE_TYPE,
            "id": self.execution_name,
            "channel": f"{namespace}/{self.execution_name}",
            "authorization": {
                "x-api-key": APIKEY,
                "host": os.environ["API_HOST"]
            }
        }

        logger.info("Sending subscription")
        ws.send(json.dumps(subscription_msg))
    except Exception as e:
        # Log and close so run_forever() returns instead of waiting forever.
        logger.error("Failed to subscribe: %s", e)
        ws.close()

After receiving the message confirming the subscription, wait for messages with the type data. Whenever a message with this type arrives, execute the logic to identify if the workflow was successfully executed, and then close the connection.

def on_message(self, ws: websocket.WebSocketApp, message: str) -> None:
        """Handle incoming WebSocket messages."""
        logger.info("Message received: %s", message)
        try:
            message_dict = json.loads(message)
            required_keys = ["id", "type", "event"]
            
            if all(key in message_dict for key in required_keys):
                event_json = json.loads(message_dict["event"])
                
                if (message_dict["id"] == self.execution_name and 
                    message_dict["type"] == "data"):
                    
                    self.final_name = event_json["nome_completo"]
                    logger.info("Message received: %s", self.final_name)
                    logger.info("Successfully received return message")
                    logger.info("Ending processing")
                    
                    self.message_queue = {
                        "status": SUCCESS_STATUS,
                        "executionID": message_dict["id"]
                    }
                    ws.close()
        except json.JSONDecodeError as e:
            logger.error("Failed to parse message: %s", str(e))
        except Exception as e:
            logger.error("Error processing message: %s", str(e))

Conclusion

In this post, you learned how to use event-driven architectures and the capabilities of AWS AppSync Events to integrate synchronous and asynchronous communication patterns in your applications. This allows you to modernize your systems without the need for extensive modifications to your existing frontend codebase. Explore the demonstrations and documentation provided in the GitHub repository to gain a deeper understanding of how AppSync Events can be applied to your specific use cases.

To learn more about serverless architectures and asynchronous invocation patterns, see Serverless Land.