FURYU Tech Blog - フリュー株式会社

フリュー株式会社の開発者が技術情報を発信するブログです。

Lambda(Python)を使ってCloudWatchLogsをS3にエクスポートする

はじめに

ピクトリンク事業部でSREエンジニアをしている山根です。
今回は、Lambdaを使ってCloudWatchLogsをS3にエクスポートする方法をご紹介します。
CloudWatchLogsをS3にエクスポートするということで、CreateExportTaskを利用します。 docs.aws.amazon.com また、CreateExportTaskが完了したかどうかをチェックするためにDescribeExportTasksを利用します。 docs.aws.amazon.com

構成の全体像

今回の構造の全体像は、AWS CloudFormationとして、以下のような構成になります。

Parameters:
  LambdaFunctionS3Bucket:
    NoEcho: "true"
    Type: "String"
    Description:
      "The Amazon S3 key of the deployment package.\nThis property can\
      \ be replaced with other exclusive properties"
  LambdaFunctionS3Key:
    NoEcho: "true"
    Type: "String"
    Description:
      "An Amazon S3 bucket in the same AWS-Region as your function. The\
      \ bucket can be in a different AWS-account.\nThis property can be replaced with\
      \ other exclusive properties"
Resources:
  IAMRoleLambdaRole:
    UpdateReplacePolicy: "Retain"
    Type: "AWS::IAM::Role"
    DeletionPolicy: "Retain"
    Properties:
      Path: "/service-role/"
      ManagedPolicyArns:
        - "arn:aws:iam::0123456789:policy/service-role/AWSLambdaBasicExecutionRole-XXXXX"
      MaxSessionDuration: 3600
      RoleName: "export-logs-role-abcdefg"
      Policies:
        - PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Resource: "arn:aws:logs:*:0123456789:log-group:*"
                Action: "logs:CreateExportTask"
                Effect: "Allow"
                Sid: "Statement1"
              - Resource: "arn:aws:logs:*:0123456789:log-group:*"
                Action:
                  - "logs:DescribeExportTasks"
                Effect: "Allow"
                Sid: "Statement2"
          PolicyName: "CloudWatchLogsExportPolicy"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action: "sts:AssumeRole"
            Effect: "Allow"
            Principal:
              Service: "lambda.amazonaws.com"
  LambdaFunctionRole:
    UpdateReplacePolicy: "Retain"
    Type: "AWS::Lambda::Function"
    DeletionPolicy: "Retain"
    Properties:
      MemorySize: 128
      Description: ""
      TracingConfig:
        Mode: "PassThrough"
      Timeout: 900
      RuntimeManagementConfig:
        UpdateRuntimeOn: "Auto"
      Handler: "lambda_function.lambda_handler"
      Code:
        S3Bucket:
          Ref: "LambdaFunctionS3Bucket"
        S3Key:
          Ref: "LambdaFunctionS3Key"
      Role:
        Fn::GetAtt:
          - "IAMRoleLambdaRole"
          - "Arn"
      FileSystemConfigs: []
      FunctionName: "export-logs"
      Runtime: "python3.12"
      PackageType: "Zip"
      LoggingConfig:
        LogFormat: "Text"
        LogGroup: "/aws/lambda/export-logs"
      EphemeralStorage:
        Size: 512
      Architectures:
        - "x86_64"
  S3BucketPolicytestlogs:
    UpdateReplacePolicy: "Retain"
    Type: "AWS::S3::BucketPolicy"
    DeletionPolicy: "Retain"
    Properties:
      Bucket: "test-logs"
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Condition:
              ArnLike:
                aws:SourceArn: "arn:aws:logs:ap-northeast-1:0123456789:log-group:*"
            Resource: "arn:aws:s3:::test-logs"
            Action: "s3:GetBucketAcl"
            Effect: "Allow"
            Principal:
              Service: "logs.ap-northeast-1.amazonaws.com"
            Sid: "Statement1"
          - Condition:
              ArnLike:
                aws:SourceArn: "arn:aws:logs:ap-northeast-1:0123456789:log-group:*"
            Resource: "arn:aws:s3:::test-logs/*"
            Action: "s3:PutObject"
            Effect: "Allow"
            Principal:
              Service: "logs.ap-northeast-1.amazonaws.com"
            Sid: "Statement2"

Roleの設定

CloudWatchLogsをS3にエクスポートするCreateExportTask、CreateExportTaskの状態を取得するDescribeExportTasks
それぞれを利用するために、LambdaのRoleに以下のポリシーを追加します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": "logs:CreateExportTask",
            "Resource": "arn:aws:logs:*:0123456789:log-group:*"
        },
        {
            "Sid": "Statement2",
            "Effect": "Allow",
            "Action": [
                "logs:DescribeExportTasks"
            ],
            "Resource": "arn:aws:logs:*:0123456789:log-group:*"
        }
    ]
}

また、CreateExportTaskのエクスポート先となるS3バケット(ここでは、test-logsとします)に対して、
バケット情報の取得とログファイルをバケットにアップロードするために、以下のようなバケットポリシーを設定する必要があります。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Principal": {
                "Service": "logs.ap-northeast-1.amazonaws.com"
            },
            "Action": "s3:GetBucketAcl",
            "Resource": "arn:aws:s3:::test-logs",
            "Condition": {
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:logs:ap-northeast-1:0123456789:log-group:*"
                }
            }
        },
        {
            "Sid": "Statement2",
            "Effect": "Allow",
            "Principal": {
                "Service": "logs.ap-northeast-1.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::test-logs/*",
            "Condition": {
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:logs:ap-northeast-1:0123456789:log-group:*"
                }
            }
        }
    ]
}

Lambda関数

エクスポート対象とするCloudWatchLogsのロググループをtest/logs/access-log,test/logs/application-logと仮定し、実行日の前日分をエクスポートすることにします。
また、エクスポート先のS3バケットは、test-logsとします。

以上を踏まえ、関数は以下のようになります。

import boto3
import datetime
import zoneinfo
import time

client = boto3.client('logs')

def lambda_handler(event, context):
    # タイムゾーン情報を取得
    tokyo = zoneinfo.ZoneInfo("Asia/Tokyo")
    # JSTの現在日時をdateで取得
    today = datetime.datetime.now(tokyo).date()
    yesterday = (today + datetime.timedelta(days=-1))
    start_time = int(time.mktime(yesterday.timetuple()) * 1000)
    end_time = int(time.mktime(today.timetuple()) * 1000)
    # S3のPrefix(yyyyMM/yyyyMMdd形式)
    s3_prefix = yesterday.strftime('%Y%m') + "/" + yesterday.strftime('%Y%m%d')

    # アクセスログをS3にエクスポート
    export_access_log_task_id = export_log(
        logGroupName="/test/logs/access-log",
        destinationPrefix="access-log/" + s3_prefix,
        fromTime=start_time,
        toTime=end_time
    )

    # アプリケーションログをS3にエクスポート
    export_application_log_task_id = export_log(
        logGroupName="/test/logs/application-log",
        destinationPrefix="application-log/" + s3_prefix,
        fromTime=start_time,
        toTime=end_time
    )

    return {
        'statusCode': 200
    }

def export_log(logGroupName, destinationPrefix, fromTime, toTime):
    # S3にエクスポート
    export_log_task = client.create_export_task(
        logGroupName=logGroupName,
        fromTime=fromTime,
        to=toTime,
        destination="test-logs",
        destinationPrefix=destinationPrefix
    )
    # エクスポートタスクIDを取得
    export_log_task_id = export_log_task['taskId']
    # エクスポートタスクIDからエクスポートタスクの詳細を取得
    export_log_task_desc = client.describe_export_tasks(taskId=export_log_task_id)
    # エクスポートタスクの状態を取得
    status = export_log_task_desc['exportTasks'][0]['status']['code']
    # S3へのエクスポートが完了するまで待つ
    while status != 'COMPLETED':
        export_log_task_desc = client.describe_export_tasks(
            taskId=export_log_task_id
        )
        status = export_log_task_desc['exportTasks'][0]['status']['code']

    return export_log_task_id

CreateExportTaskの説明に.
Each account can only have one active (RUNNING or PENDING) export task at a time.
と記載されている通り、 1回目の処理が完了してから2回目を実行しないとResource limit exceededとエラーが発生してしまいます。
そのため、DescribeExportTasksを使ってエスクポートが完了するまで待つようにしています。
また、CreateExportTaskは数分かかる場合もあるので、Lambdaのタイムアウト設定は長め(今回は、15分としています)にしておくことをおすすめします。
定期的に実行する場合は、EventBridgeを使うことで簡単に実現できます。

まとめ

Lambdaを使ってCloudWatchLogsをS3にエクスポートする方法を紹介しました。
CreateExportTaskで期間を指定できるので、ニーズに合わせた柔軟な対応が可能で便利に使えます。
ただし、ロググループの保持期間を1日としている場合、CreateExportTaskで期間を1週間と指定したとしても1日分しか取得できないのでご注意ください。