この記事は「フリュー Advent Calendar 2023」13日目の記事です。
はじめに
こんにちは、ピクトリンク事業部の竹本です。
最近業務でサーバレスアプリケーションを構築していることもあり、学習の一環としてAPI GatewayのWebSocket APIを用いてチャットアプリケーションを作成してみました。
機能
複数のブラウザで同じURLにアクセスすることで、アクセスした人同士でチャットを送り合うことができます。
また、パスパラメータがチャットルームを識別するIDとなっており、https://sample.cloudfront.net/{room_id}
の{room_id}
部分を変えることで、異なる部屋を作成することができます。
また、左上のハンバーガーメニューを開いて、表示名を変更することができます。
機能としては以上の、非常にシンプルなアプリとなっています。
アーキテクチャ構成
今回作成したアプリのアーキテクチャ構成は下図のようになります。
また、それぞれのサービスの役割を下に記載します。
フロントエンド
S3・CloudFront
- SPAで構築したフロントエンドアプリのホスティングのために使用
バックエンド
API Gateway・Lambda
- WebSocket APIのエンドポイント作成のために使用
- 各Lambda Handlerの説明
- on-connect/on-disconnect
- ソケット接続時と切断時に実行される処理を記述しています。ここでは、接続毎のconnectionIdをDynamoDBに保存・削除する処理を行なっています。
- on-enter-room
- ユーザーがチャットルームに入ったときと、ユーザー名が変更されたときに実行されるカスタムルートです。グループIDやユーザーIDの保存を行います
- on-mesasage
- メッセージが送信されたときに実行されるカスタムルートです。グループIDに基づいてメッセージを配信します。
- on-connect/on-disconnect
DynamoDB
- connectionId・グループID・ユーザー名の永続化のために使用
アプリケーションの実装について
- アプリケーションのインフラ部分の構築は、AWS CDKを用いてIaCで構築しました。また、フロントエンドは
client/
ディレクトリ内に、Reactを用いて構築しました。 - 下記が、ディレクトリ構成(抜粋)になります。
. ├── bin │ └── serverless-chat-app.ts ├── cdk.json ├── cdk.out ├── client │ ├── README.md │ ├── node_modules │ ├── package-lock.json │ ├── package.json │ ├── public │ └── src │ ├── App.css │ ├── App.js │ ├── component │ │ └── Chat.jsx │ ... ├── jest.config.js ├── lib │ ├── database-stack.ts │ ├── s3-stack.ts │ └── websocket-api-stack.ts ├── node_modules ├── package-lock.json ├── package.json ├── resources │ └── handlers │ ├── on-connect.ts │ ├── on-disconnect.ts │ ├── on-enter-room.ts │ └── on-message.ts ├── test └── tsconfig.json
ソースコード(抜粋)
websocket-api-stack.ts
AWS上にWebSocket APIを構築するためのAPI Gatewayの定義、および、それらを構築するためのLambda関数の定義を行なっています。
import { WebSocketLambdaIntegration } from "@aws-cdk/aws-apigatewayv2-integrations-alpha"; import * as cdk from "aws-cdk-lib"; import { Runtime } from "aws-cdk-lib/aws-lambda"; import { Construct } from "constructs"; import { WebSocketApi, WebSocketStage } from "@aws-cdk/aws-apigatewayv2-alpha/lib/websocket"; import { RetentionDays } from "aws-cdk-lib/aws-logs"; export interface WebSocketApiStackProps extends cdk.StackProps { readonly table: cdk.aws_dynamodb.Table; } export class WebSocketApiStack extends cdk.Stack { constructor(scope: Construct, id: string, props: WebSocketApiStackProps) { super(scope, id, props); // Lambda handler definition const connectHandler = new cdk.aws_lambda_nodejs.NodejsFunction(this, "OnConnectHandler", { environment: { CONNECTIONS_TABLE_NAME: props.table.tableName, }, runtime: Runtime.NODEJS_18_X, entry: "resources/handlers/on-connect.ts", logRetention: RetentionDays.ONE_WEEK, }); const disconnectHandler = new cdk.aws_lambda_nodejs.NodejsFunction(this, "OnDisconnectHandler", { environment: { CONNECTIONS_TABLE_NAME: props.table.tableName, }, runtime: Runtime.NODEJS_18_X, entry: "resources/handlers/on-disconnect.ts", logRetention: RetentionDays.ONE_WEEK, }); const onMessageHandler = new cdk.aws_lambda_nodejs.NodejsFunction(this, "OnMessageHandler", { environment: { CONNECTIONS_TABLE_NAME: props.table.tableName, }, runtime: Runtime.NODEJS_18_X, entry: "resources/handlers/on-message.ts", logRetention: RetentionDays.ONE_WEEK, }); const onEnterRoomHandler = new cdk.aws_lambda_nodejs.NodejsFunction(this, "OnEnterRoomHandler", { environment: { CONNECTIONS_TABLE_NAME: props.table.tableName, }, runtime: Runtime.NODEJS_18_X, entry: "resources/handlers/on-enter-room.ts", logRetention: RetentionDays.ONE_WEEK, }); props.table.grantReadWriteData(connectHandler); props.table.grantReadWriteData(disconnectHandler); props.table.grantReadWriteData(onEnterRoomHandler); props.table.grantReadWriteData(onMessageHandler); // WebSocket API definition const webSocketApi = new WebSocketApi(this, "MessageApi", { routeSelectionExpression: "$request.body.action", connectRouteOptions: { integration: new WebSocketLambdaIntegration("MessageApiConnectIntegration", connectHandler), }, disconnectRouteOptions: { integration: new WebSocketLambdaIntegration("MessageApiDisconnectIntegration", disconnectHandler), }, }); webSocketApi.addRoute("on-enter-room", { integration: new WebSocketLambdaIntegration("MessageApiSendIntegration", onEnterRoomHandler), }); webSocketApi.addRoute("on-message", { integration: new WebSocketLambdaIntegration("MessageApiSendIntegration", onMessageHandler), }); webSocketApi.grantManageConnections(onEnterRoomHandler); webSocketApi.grantManageConnections(onMessageHandler); new WebSocketStage(this, "MessageApiProd", { webSocketApi, stageName: "test", autoDeploy: true, }); } }
on-message.ts
WebSocket接続において受信したメッセージを同じグループに属する他の接続にブロードキャストしています。
DynamoDBテーブルを使用して接続情報を管理し、各接続に対して送信元と同じグループIDであるか検証し、同じグループの場合にApiGatewayManagementApi
を介してメッセージを送信しています。
import { APIGatewayProxyResult, APIGatewayProxyWebsocketHandlerV2 } from "aws-lambda"; import { DynamoDBClient } from "@aws-sdk/client-dynamodb"; import { ScanCommand } from "@aws-sdk/lib-dynamodb"; import * as AWS from "aws-sdk"; const ddb = new DynamoDBClient(); export const handler: APIGatewayProxyWebsocketHandlerV2 = async (event) => { let response: APIGatewayProxyResult = { statusCode: 200, body: "OK" }; let errorResponse: APIGatewayProxyResult = { statusCode: 500, body: "internal server error" }; const groupId = JSON.parse(event.body ?? "{}").groupId; let connections; // 全ての接続を取得 try { connections = ( await ddb.send(new ScanCommand({ TableName: process.env.CONNECTIONS_TABLE_NAME ?? "" })) ).Items as { connectionId: string, groupId?: string, username?: string }[]; } catch (e) { console.warn(e); return errorResponse; } // 送信者名を取得 const senderName = connections.filter(item => item.connectionId === event.requestContext.connectionId && item.groupId == groupId).map(item => item.username); const sendMessages = connections.map(async (item) => { let isOwnMessage = false; // 送信者自身の接続の時判別フラグを立てる if (item.connectionId === event.requestContext.connectionId) isOwnMessage = true; // リクエストと同じグループIDに属する接続全員にメッセージを送信する if (item.groupId === groupId) { await new AWS.ApiGatewayManagementApi({ apiVersion: '2018-11-29', endpoint: event.requestContext.domainName + "/" + event.requestContext.stage }) .postToConnection({ ConnectionId: item.connectionId, Data: JSON.stringify({ message: JSON.parse(event.body ?? "{}").message, username: senderName, isOwnMessage: isOwnMessage }) }) .promise() .catch(e => console.warn(e)); } }); await Promise .all(sendMessages) .catch((e) => { console.warn(e); return errorResponse; }); return response; };
Chat.jsx
WebSocketを使用してAWS Lambdaでホストされるサーバーレスチャットアプリケーションに接続し、パスパラメータで指定されたグループに入室し、メッセージの送受信を行う機能を提供するコンポーネントです。
useEffectを用いてコンポーネントがマウントされたときにWebSocket接続を確立し、アンマウントされたときに接続を閉じています。
function Chat() { const [ws, setWs] = useState(null); const [messages, setMessages] = useState([]); const [input, setInput] = useState(''); const [username, setUsername] = useState(''); const { groupId } = useParams();//パスパラメータを取得 useEffect(() => { // Open websocket connection. const websocket = new WebSocket('wss://sample.execute-api.ap-northeast-1.amazonaws.com/test/'); setWs(websocket); websocket.addEventListener("open", (event) => { if (websocket) { websocket.send(JSON.stringify({ action: "on-enter-room", groupId: groupId ?? "dummy", username: username ?? "no_name", })); } }); websocket.onmessage = (event) => { setMessages((prevMessages) => [...prevMessages, event.data]); }; return () => { websocket.close(); }; }, []); //usernameが更新された時に実行される useEffect(() => { if (ws) { ws.send(JSON.stringify({ action: "on-enter-room", groupId: groupId ?? "dummy", username: username ?? "no_name", })); } }, [username]); //メッセージ送信ボタンを押した時に実行される const sendMessage = () => { if (ws) { ws.send(JSON.stringify({ action: "on-message", message: input, groupId: groupId })); setInput(''); } }; //以下略
まとめ
今回はAPI GateWay + Lambdaを用いて、WebSocketを用いたチャットアプリケーションをAWSマネージドの環境で構築してみました。
感想としては、アーキテクチャの構成を理解した後は従来のREST APIベースの構成と同じ感覚で実装していけたかなという感じです。また、通常のREST APIを用いたサーバレス構成や認証機能等と組み合わせることで、より多くの機能を実現できそうだと感じました。
また、開発者体験的な話では、変更のたびにデプロイしてログを確認するのが手間だと感じたので、あらかじめDynamoDB LocalやAWS SAM等を利用してローカル実行環境を用意しておいた方がストレスフリーに開発できそうだと思いました。