初めまして、技術本部Digitization部データ化グループ所属の高田です。 今回はAWS GlueのJobを使ってバッチ処理を60倍高速化した話をします。
この記事は以下の内容を共有しています。
- AWS Glueの概要とメリット
- Apache Sparkの概要とメリット
- Pythonを使ったAWS Glue ETL Jobの書き方(一例)
背景
Digitization部は、日々お客さまからお預かりしたアナログ画像をデータ化している部署です。 そのデータを、修正などのために一定期間保持しているのですが、保持しているデータは個人情報保護法に従い削除処理をしています。加えて、削除処理がなんらかの原因で動かなくなった時に備え、存在しているデータが全て保管対象であるかをチェックする処理(削除漏れチェック)もあります。
削除漏れチェックの処理の中には、実行に60日間かかっているものがありました。 このチェックに時間がかかる理由は、開発当初から所持するデータ量が増え続け、膨大なデータの付き合わせが考慮されていなかったためです。具体的にはデータの付き合わせがO(n2)の処理になっていて、ハッシュなどを利用した効率的な検索ができていないためでした。 この処理を、AWS Glueを使うと1日で完了するようになりました。
AWS Glue とは
AWS Glue とは、Amazon Web Services(AWS)が提供する完全マネージド型の抽出、変換、ロード(ETL)サービスです。 さまざまなデータソースから抽出・格納ができ、変換にはApache SparkというOSSの分散処理システムが利用できます。 今回Glueを使った意図は以下です。
- S3のデータでも、Apache Sparkを使うとRDBMSのように検索ができる(さらに効率的なアルゴリズムで検索されるため早い)
- Apache Sparkで分散処理ができるので、割り当てるリソースを増やせば短時間で処理が完了する
具体的な処理
今回は「保持データリストが、全て保持対象リストに含まれるか」をチェックするスクリプトを、ECS上のスケジュールタスク(Ruby)からGlueのジョブ(Python)に書き換えました。
前提として、保管データリストと保持対象リストは別プログラムによってS3上に配置しています。 これらのデータはcsv形式でおかれていて、IDを含めたCSV形式で出力されています。 データ群Aとデータ群Bはidのカラムが一致するかを元に判断します。
以下は、今回書いたコードのダミーコードです。
import sys from datetime import datetime from awsglue.utils import getResolvedOptions from awsglue.context import GlueContext from awsglue.job import Job from pyspark.context import SparkContext from inventory import Inventory # Glueのジョブ実行時に引数を指定できる。 args = getResolvedOptions(sys.argv, ["JOB_NAME", "BUCKET_A", "BUCKET_B", "OUTPUT_BUCKET"]) # Sparkの初期設定 sc = SparkContext() glueContext = GlueContext(sc) glueContext.setConf("spark.sql.broadcastTimeout", "36000") spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # データ群Aの最新のパスを取得 inventory_a = Inventory(args["BUCKET_A"]).most_recent() # データ群Bの最新のパスを取得 inventory_b = Inventory(args["BUCKET_B"]).most_recent() # データ群A, Bのテーブルを作成 (Schemaはid以外省略) schema = "id long" data_a = spark.read.csv(f"{inventory_a.s3_directory_path}/*", schema=schema) data_b = spark.read.csv(f"{inventory_a.s3_directory_path}/*", schema=schema) data_a.createOrReplaceTempView("table_a") data_b.createOrReplaceTempView("table_b") # Queryを実行し、対象のデータを取得する。 query = f""" SELECT id FROM data_a WHERE EXISTS ( SELECT id FROM data_b WHERE data_b.id = data_a.id ) """ undeleted_ids = spark.sql(query) undeleted_size = undeleted_ids.count() # 結果を出力 output_path = f"s3://{args['OUTPUT_BUCKET']}/{datetime.today().strftime('%Y-%m-%d-%H-%M-%S')}" undeleted_ids.write.mode("overwrite").csv(output_path, compression="gzip", header=True)
上記のように、非常に簡単なコードでS3のデータをテーブルのように扱え、SQLで検索できます。 また設定も柔軟で、CPU数やメモリを選べ、従量課金なので今までと違って短時間に多くのリソースを使って処理が終わるようになりました。
この結果、60日かかっていた処理が1日以内で終わるようになりました。
最後に
今回はAWS Glueを利用してバッチ処理を高速化する話を書きました。 Glueを使えば、S3だけでなくRedshiftなど別のデータソースもSQLで簡単に比較できるので、 今回のようなビッグデータの検索処理にはうってつけです。 みなさんも是非使ってみてください。