Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

AWS Glueを使ってバッチ処理を60倍高速化した話

初めまして、技術本部Digitization部データ化グループ所属の高田です。 今回はAWS GlueのJobを使ってバッチ処理を60倍高速化した話をします。

この記事は以下の内容を共有しています。

  • AWS Glueの概要とメリット
  • Apache Sparkの概要とメリット
  • Pythonを使ったAWS Glue ETL Jobの書き方(一例)

背景

Digitization部は、日々お客さまからお預かりしたアナログ画像をデータ化している部署です。 そのデータを、修正などのために一定期間保持しているのですが、保持しているデータは個人情報保護法に従い削除処理をしています。加えて、削除処理がなんらかの原因で動かなくなった時に備え、存在しているデータが全て保管対象であるかをチェックする処理(削除漏れチェック)もあります。

削除漏れチェックの処理の中には、実行に60日間かかっているものがありました。 このチェックに時間がかかる理由は、開発当初から所持するデータ量が増え続け、膨大なデータの付き合わせが考慮されていなかったためです。具体的にはデータの付き合わせがO(n2)の処理になっていて、ハッシュなどを利用した効率的な検索ができていないためでした。 この処理を、AWS Glueを使うと1日で完了するようになりました。

AWS Glue とは

AWS Glue とは、Amazon Web Services(AWS)が提供する完全マネージド型の抽出、変換、ロード(ETL)サービスです。 さまざまなデータソースから抽出・格納ができ、変換にはApache SparkというOSSの分散処理システムが利用できます。 今回Glueを使った意図は以下です。

  • S3のデータでも、Apache Sparkを使うとRDBMSのように検索ができる(さらに効率的なアルゴリズムで検索されるため早い)
  • Apache Sparkで分散処理ができるので、割り当てるリソースを増やせば短時間で処理が完了する

具体的な処理

今回は「保持データリストが、全て保持対象リストに含まれるか」をチェックするスクリプトを、ECS上のスケジュールタスク(Ruby)からGlueのジョブ(Python)に書き換えました。

前提として、保管データリストと保持対象リストは別プログラムによってS3上に配置しています。 これらのデータはcsv形式でおかれていて、IDを含めたCSV形式で出力されています。 データ群Aとデータ群Bはidのカラムが一致するかを元に判断します。

以下は、今回書いたコードのダミーコードです。

import sys
from datetime import datetime

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from inventory import Inventory


# Glueのジョブ実行時に引数を指定できる。
args = getResolvedOptions(sys.argv, ["JOB_NAME", "BUCKET_A", "BUCKET_B", "OUTPUT_BUCKET"])

# Sparkの初期設定
sc = SparkContext()
glueContext = GlueContext(sc)
glueContext.setConf("spark.sql.broadcastTimeout", "36000")
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# データ群Aの最新のパスを取得
inventory_a = Inventory(args["BUCKET_A"]).most_recent()

# データ群Bの最新のパスを取得
inventory_b = Inventory(args["BUCKET_B"]).most_recent()


# データ群A, Bのテーブルを作成 (Schemaはid以外省略)
schema = "id long"
data_a = spark.read.csv(f"{inventory_a.s3_directory_path}/*", schema=schema)
data_b = spark.read.csv(f"{inventory_a.s3_directory_path}/*", schema=schema)

data_a.createOrReplaceTempView("table_a")
data_b.createOrReplaceTempView("table_b")

# Queryを実行し、対象のデータを取得する。
query = f"""
SELECT id
FROM data_a
WHERE EXISTS (
    SELECT id
    FROM data_b
    WHERE data_b.id = data_a.id
)
"""

undeleted_ids = spark.sql(query)
undeleted_size = undeleted_ids.count()

# 結果を出力
output_path = f"s3://{args['OUTPUT_BUCKET']}/{datetime.today().strftime('%Y-%m-%d-%H-%M-%S')}"
undeleted_ids.write.mode("overwrite").csv(output_path, compression="gzip", header=True)

上記のように、非常に簡単なコードでS3のデータをテーブルのように扱え、SQLで検索できます。 また設定も柔軟で、CPU数やメモリを選べ、従量課金なので今までと違って短時間に多くのリソースを使って処理が終わるようになりました。

この結果、60日かかっていた処理が1日以内で終わるようになりました。

最後に

今回はAWS Glueを利用してバッチ処理を高速化する話を書きました。 Glueを使えば、S3だけでなくRedshiftなど別のデータソースもSQLで簡単に比較できるので、 今回のようなビッグデータの検索処理にはうってつけです。 みなさんも是非使ってみてください。

© Sansan, Inc.