こんにちは。研究開発部 Architectグループの中村です。
本記事は、Sansan Advent Calendar 2024の2日目の記事です。
今回は、Cloud ComposerとCloud Run Jobsを組み合わせたデータパイプラインの実装について紹介します。
背景
弊チームでは、Sansanにおける全社横断データ基盤の構築とデータ利活用の推進をしています。
全体のアーキテクチャは以下のようになっています。
さて、とあるデータパイプラインでは、API経由でデータを取得し、返却されたJSONをCSV形式に変換→GCSにアップロード(これを全件取得するまで1ページずつ繰り返す)というDAGをCloud Composer上で実施していました。
また、このデータソースから取得する対象のデータも複数種あり、異なるエンドポイントからそれぞれデータを取得→アップロードするという処理を行うDAGを動的に生成していました。
課題
この設計では、
①それぞれのDAGを同時に実行しようとした場合、リソース不足になってDAGが失敗する。(そのため、それぞれのDAGの処理時間をもとに、起動時刻を分散させる必要がある)
②Composer上では他にも実行するDAGがあるが、実行時刻が被った場合、このDAGがリソースを占有することにより、失敗することがある。(そのため、再実行するときにも、同時に実行するDAGがないことを確認する必要がある)
という課題がありました。
暫定処置としてワーカーリソースの増強で対応は可能なのですが、根本的な解決策ではないため、この処理自体をCloud Run Jobs上で実行するようにし、それらをCloud Composerがトリガーするというように設計を変更しました。
Cloud Run Jobsの実装(インフラ)
Cloud Run Jobs内のコードはPythonで実装しました。ざっくりとしたイメージとしては↓のような感じです。
import click from task import Task @click.command() @click.option("--object_name", required=True, type=str, help="target object name") def main( object_name: str, ) -> None: task_instance = Task(object_name) task_instance.run() # データを取得して、GCSにアップロードする if __name__ == "__main__": main()
Cloud Run Jobsのterraformの実装部分としては以下のような感じです。
各設定内容についてはterraformのドキュメントを参照してください。
resource "google_cloud_run_v2_job" "sample" { name = "sample" project = var.project_id location = var.location deletion_protection = false template { task_count = 1 parallelism = 0 template { service_account = var.service_account containers { image = var.image args = [ "--object", "sample_object", ] env { name = "SAMPLE_SECRET_VAR" value_source { secret_key_ref { secret = var.secret_name version = "latest" } } } } } } }
ポイントとしては、secret_key_refブロックで環境変数として、secret managerの値を参照するようにしています。
これにより、API_KEY=os.env("SAMPLE_SECRET_VAR")
のように記載するだけで、secret情報を変数にいれられます。詳細は公式ドキュメント
Cloud Composer(Airflow)側の実装
Cloud Composer側からの呼び出し部分の実装は以下のような形です。
from airflow.decorators import dag from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator target_objects = [ {"name": "sample1", "task_count": 1}, {"name": "sample2", "task_count": 2}, {"name": "sample3", "task_count": 3}, ] def create_dag(object_name: str, task_count: int) -> DAG: @dag( dag_id=f"sample__{object_name}", ) def sample_dag(object_name: str, task_count: int): op1 = EmptyOperator(task_id="start") op2 = CloudRunExecuteJobOperator( task_id="el_task", project_id="sample", region="asia-northeast1", job_name="sample", deferrable=True, overrides={ "container_overrides": [ { "args": ["--object_name", object_name], }, ], "task_count": task_count, }, ) op3 = EmptyOperator(task_id="end") return op1 >> op2 >> op3 return sample_dag(object_name=object_name, task_count=task_count) for target_object in target_objects: dag_name = f"sample__{target_object['name']}" globals()[dag_name] = create_dag(object_name=target_object["name"], task_count=target_object["task_count"])
- CloudRunExecuteJobOperatorを用いることで、Cloud Run Jobsをトリガーしています。
- またCloud Run Jobsで処理を時間している最中にCloud Composerのワーカースロットを占有しないためdefferableモードで実行するようにしています。
- ovverridesブロックでは、ジョブ実行の設定をオーバーライドしています。データ量の多いジョブはタスクの並列数を増やして処理することで、実行時間の短縮しています。
- Cloud Run Jobsでの環境変数、CLOUD_RUN_TASK_COUNT(タスク数)とCLOUD_RUN_TASK_INDEX(タスクのインデックス)をベースに各タスクで取得対象のデータ量が均等になるように調整しています。
- terraform側の設定で
parallelism=0
としているため、全タスクはジョブ実行時に一斉に起動します。- 本当はAPIのRate Limitなどを考慮し、1ジョブ内で起動するタスクの上限(parallelism)もオーバーライドしたかったのですが、記事執筆時点ではCloud Run Jobsでは未対応でした。
まとめ
Cloud Run Jobsに切り出すことで、当該ジョブがCloud Composerのワーカーリソースを占有することがなくなったため、よりパイプラインがスケールできるようになりました。 Cloud ComposerやAirflowをGoogle Cloudで利用している方の参考となれば幸いです。