Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

Firehoseを介したS3への中間データ保管と検索性強化

はじめに

こんにちは、研究開発部 Architectグループの辻田です。

SansanではユーザーフィードバックがSlackのフィードバックチャンネルに集まります。研究開発部ではこれらからモデルやアルゴリズムなどの改善をしています。

そのため、より多くのフィードバックに早く対応することが改善サイクルを回していく上で重要な活動となります。

本記事では、改善対象のデータ抽出を効率化するために、Amazon Data Firehoseを使用してS3に中間データをストリーミングし、Athenaを使ってデータ検索を実現する方法を紹介します。さらに、パーティショニングの適用によって検索性を向上させる手法も紹介します。

この記事は、【R&D DevOps通信】の連載記事のひとつです。

背景

フィードバック対応の手順は前任者がメモ程度に残したもので整備されておらず、手順自体も煩雑で属人性の高い作業になっていました。

具体的には、

  • フィードバック内容にある文字列からAthenaで検索
  • Athenaで特定したユニークIDを元にS3上の該当データを探す
  • S3上のデータをダウンロードしてからローカルでデバッグする

といった内容でした。また、S3のファイルは別チームが管理しており、データ連携が自動化されていない状況でした。そのためデータが欲しいときは別チームに連絡を取り、手動でdumpしてもらわないとAthenaで検索ができず、コミュニケーションコストとリードタイムがかさばっていました。

目的

フィードバックから必要なデータの収集〜デバッグを容易にできる仕組みを作り、1日以内には原因の特定、修正が完了できる状態にすることを目指しました。

アーキテクチャ

アーキテクチャは以下です。抽出APIの中間データをFirehose経由で出力し、Athenaから検索できるようにしました。クエリを実行してデバッグに必要なデータを取得するスクリプトはコンテナ化して、誰の環境でもすぐ調査ができるようにしました。

※ 今回のサービスで扱うデータは一部ログに出力できない事情があったため、Firehoseに直接PUTしています。特別な事情がない場合はログに出力し、CloudWatch Logs -> Firehoseに連携するほうが、アプリケーションがFirehoseに依存せず綺麗だと思います。

Amazon Data FirehoseとS3の統合

Amazon Data Firehoseは、リアルタイムでのデータストリーミングを可能にし、さまざまなデータストアにデータを配信します。2024/02に、サービス名がAmazon Kinesis Data Firehose から Amazon Data Firehose に変更されました。

詳しくはドキュメントを参照ください。 https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/what-is-this-service.html

本プロジェクトでは、Firehoseを使用して抽出APIの中間データをS3にストリーミングします。これにより、データのリアルタイム処理と効果的な保存が可能となります。

バッファサイズとバッファ間隔

S3へのデータ配信頻度は、バッファサイズ (1 MB~128 MB) とバッファ間隔 (60~900 秒) の値を指定できます。最初に満たされた条件によってS3へのデータ配信がトリガーされます。

S3に配信したファイルはAthenaから高速にクエリする必要があります。ファイルが細切れだとクエリのパフォーマンスが下がってしまうため、ある程度のサイズごとにまとめることが重要です。

ファイルサイズが非常に小さい場合、特に 128MB 未満の場合には、実行エンジンは S3ファイルのオープン、ディレクトリのリスト表示、オブジェクトメタデータの取得、データ転送のセットアップ、ファイルヘッダーの読み込み、圧縮ディレクトリの読み込み、といった処理に余分な時間がかかります。
引用:Amazon Athena のパフォーマンスチューニング Tips トップ 10 | Amazon Web Services ブログ

JSON Lines形式

JSON Linesは、複数のJSONオブジェクトを1行ずつ記述した形式であり、データの取り扱いが容易で効率的です。JSONオブジェクト間に改行を挿入してバッファごとにまとまったファイルを出力します。

Terraformを使用したFirehoseの構築方法

以下は、TerraformでS3バケットを作成し、Firehoseを設定して中間データをS3にストリーミングするコードの例です。

まずはS3と、FirehoseがS3にファイルを配信するためのIAMロールを作成します。

# S3バケット
resource "aws_s3_bucket" "example_bucket" {
  bucket = "example-bucket"
  acl    = "private"
}

# IAMロール
resource "aws_iam_role" "example_role" {
  name               = "firehose-role"
  assume_role_policy = data.aws_iam_policy_document.example_assume_role_policy_document.json
}

# ポリシーのアタッチ
resource "aws_iam_role_policy" "example_policy" {
  name   = "${aws_iam_role.example_role.name}-policy"
  role   = aws_iam_role.example_role.id
  policy = data.aws_iam_policy_document.example_policy_document.json
}

# IAMポリシー
data "aws_iam_policy_document" "example_assume_role_policy_document" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["firehose.amazonaws.com"]
    }
  }
}

# IAMポリシー
data "aws_iam_policy_document" "example_policy_document" {
  statement {
    actions = [
      "s3:AbortMultipartUpload",
      "s3:GetBucketLocation",
      "s3:GetObject",
      "s3:ListBucket",
      "s3:ListBucketMultipartUploads",
      "s3:PutObject",
    ]
    resources = [
      aws_s3_bucket.example_bucket.arn,
      "${aws_s3_bucket.example_bucket.arn}/*"
    ]
  }
}

次にFirehoseを作成します。

# Firehose
resource "aws_kinesis_firehose_delivery_stream" "example_stream" {
  name        = "example-stream""
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn            = aws_iam_role.example_role.arn
    bucket_arn          = aws_s3_bucket.example_bucket.arn
    prefix              = "!{partitionKeyFromQuery:category}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/"
    error_output_prefix = "errors/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{firehose:error-output-type}/"
    buffer_size         = 128 # MB
    buffer_interval     = 900 # 15分

    processing_configuration {
      enabled = "true"
      processors {
        type = "RecordDeAggregation"
        parameters {
          parameter_name  = "SubRecordType"
          parameter_value = "JSON"
        }
      }
      processors {
        type = "MetadataExtraction"
        parameters {
          parameter_name  = "MetadataExtractionQuery"
          parameter_value = "{category:.category}"
        }
        parameters {
          parameter_name  = "JsonParsingEngine"
          parameter_value = "JQ-1.6"
        }
      }
      processors {
        type = "AppendDelimiterToRecord"
      }
    }

    dynamic_partitioning_configuration {
      enabled = true
    }
  }
}

prefixでS3のパスの指定、バッファサイズ、バッファ間隔の指定や、オブジェクト間の区切りなどを指定して作成します。

各設定値の詳細はTerraformのドキュメントを参照ください。 https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/kinesis_firehose_delivery_stream.html

Athenaによる検索の実現

Athenaは、S3に保存されたデータに対してSQLクエリを使って分析を行うことができるサービスです。FirehoseがS3にストリーミングした中間データに対し、Athenaを介して効果的に検索できます。これにより、データの柔軟な分析と検索が可能となります。

パーティショニングによる検索性の強化

パーティショニングは、データを論理的なセクションに分割し、検索性を向上させる手法です。データを年月日でパーティション分割しました。これによりクエリの効率が向上し、検索速度が向上します。

Athenaでは Apache Hiveスタイルのパーティションを使用できます。このスタイルのパスには、等号で連結されたキーと値のペア (year=2024/month=03/day=01/...) が含まれています。Hiveスタイルを利用したほうがCREATE TABLRE->ロード(MSCK REPAIR TABLEコマンド)で簡単にパーティションを設定できるようです。

今回はFirehose側の設定をHiveではないスタイル(2024/03/01/...)で出力するように作成していたので、非Hiveでの設定方法を紹介します。

DDLは以下です。

CREATE EXTERNAL TABLE `sample`(
  `id` string, 
  `name` string)
PARTITIONED BY ( 
  `year` int, 
  `month` int, 
  `day` int)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'case.insensitive'='TRUE', 
  'dots.in.keys'='FALSE', 
  'ignore.malformed.json'='FALSE', 
  'mapping'='TRUE') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://sample'
TBLPROPERTIES (
  'classification'='json', 
  'projection.enabled'='true', 
  'projection.day.digits'='2', 
  'projection.day.range'='1,31', 
  'projection.day.type'='integer', 
  'projection.month.digits'='2', 
  'projection.month.range'='1,12', 
  'projection.month.type'='integer', 
  'projection.year.digits'='4', 
  'projection.year.range'='2023,2100', 
  'projection.year.type'='integer', 
  'storage.location.template'='s3://sample/${year}/${month}/${day}')

年月日でパーティショニングしたいため、year, month, dayをPARTITIONED BYで宣言します。

PARTITIONED BY ( 
  `year` int, 
  `month` int, 
  `day` int)

TBLPROPERTIESでそれぞれのパーティションキーについての詳細を設定します。

projection.*がパーティショニングで使用されるカスタムプロパティです。Athenaがテーブルでクエリを実行するときに、どのパーティションパターンを期待すべきかを把握できるようにします。

  'classification'='json', 
  'projection.enabled'='true', 
  'projection.day.type'='integer',
  'projection.day.range'='1,31',
  'projection.day.digits'='2', 
  'projection.month.type'='integer', 
  'projection.month.range'='1,12', 
  'projection.month.digits'='2', 
  'projection.year.type'='integer', 
  'projection.year.range'='2023,2100',
  'projection.year.digits'='4', 
  'storage.location.template'='s3://sample/${year}/${month}/${day}'
  • projection.columnName.type: キーの型
  • projection.columnName.range: 最小値と最大値の範囲。2要素のカンマ区切りリストで表す。
  • projection.columnName.digits: 値の桁数

詳しくは以下を参照ください。 https://docs.aws.amazon.com/ja_jp/athena/latest/ug/partition-projection-supported-types.html

まとめ

クエリを実行してデバッグに必要なデータを取得するスクリプトは、簡単なものなので紹介を省きます。以上でフィードバックから必要なデータの収集〜デバッグを容易にできる仕組みが完成しました。

実際にフィードバック対応をやってみた結果、クエリの実行時間は3秒ほどで、すぐに必要なデータを取得できました。(実はFirehoseを使う前はリクエストごとの細切れのファイルでした。クエリに10分超かかっていたので大幅な改善です)

これで目的通り、原因の特定、修正をして1日以内にプルリクエストが出せるようになりました。

中間データを保存しておき後から検索したい場面があれば、参考にしてみてください。 以上、最後まで読んでいただきありがとうございました。

Architectグループでは一緒に働く仲間を募集しています。

R&D MLOps/DevOpsエンジニア



20240312182329

© Sansan, Inc.