Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

Cloud ComposerとCloud Run Jobsを組み合わせたデータパイプラインの構築

こんにちは。研究開発部 Architectグループの中村です。
本記事は、Sansan Advent Calendar 2024の2日目の記事です。
今回は、Cloud ComposerとCloud Run Jobsを組み合わせたデータパイプラインの実装について紹介します。

背景

弊チームでは、Sansanにおける全社横断データ基盤の構築とデータ利活用の推進をしています。
全体のアーキテクチャは以下のようになっています。

全社横断データ基盤アーキテクチャ概要図

さて、とあるデータパイプラインでは、API経由でデータを取得し、返却されたJSONをCSV形式に変換→GCSにアップロード(これを全件取得するまで1ページずつ繰り返す)というDAGをCloud Composer上で実施していました。
また、このデータソースから取得する対象のデータも複数種あり、異なるエンドポイントからそれぞれデータを取得→アップロードするという処理を行うDAGを動的に生成していました。

課題

この設計では、
①それぞれのDAGを同時に実行しようとした場合、リソース不足になってDAGが失敗する。(そのため、それぞれのDAGの処理時間をもとに、起動時刻を分散させる必要がある)
②Composer上では他にも実行するDAGがあるが、実行時刻が被った場合、このDAGがリソースを占有することにより、失敗することがある。(そのため、再実行するときにも、同時に実行するDAGがないことを確認する必要がある)
という課題がありました。
暫定処置としてワーカーリソースの増強で対応は可能なのですが、根本的な解決策ではないため、この処理自体をCloud Run Jobs上で実行するようにし、それらをCloud Composerがトリガーするというように設計を変更しました。

Cloud Run Jobsの実装(インフラ)

Cloud Run Jobs内のコードはPythonで実装しました。ざっくりとしたイメージとしては↓のような感じです。

import click
from task import Task


@click.command()
@click.option("--object_name", required=True, type=str, help="target object name")
def main(
    object_name: str,
) -> None:
    task_instance = Task(object_name)
    task_instance.run() # データを取得して、GCSにアップロードする


if __name__ == "__main__":
    main()

Cloud Run Jobsのterraformの実装部分としては以下のような感じです。
各設定内容についてはterraformのドキュメントを参照してください。

resource "google_cloud_run_v2_job" "sample" {
  name                = "sample"
  project             = var.project_id
  location            = var.location
  deletion_protection = false
  template {
    task_count  = 1
    parallelism = 0
    template {
      service_account = var.service_account
      containers {
        image = var.image
        args = [
          "--object", "sample_object",
        ]
        env {
          name = "SAMPLE_SECRET_VAR"
          value_source {
            secret_key_ref {
              secret  = var.secret_name
              version = "latest"
            }
          }
        }
      }
    }
  }
}

ポイントとしては、secret_key_refブロックで環境変数として、secret managerの値を参照するようにしています。
これにより、API_KEY=os.env("SAMPLE_SECRET_VAR")のように記載するだけで、secret情報を変数にいれられます。詳細は公式ドキュメント

Cloud Composer(Airflow)側の実装

Cloud Composer側からの呼び出し部分の実装は以下のような形です。

from airflow.decorators import dag
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

target_objects = [
    {"name": "sample1", "task_count": 1},
    {"name": "sample2", "task_count": 2},
    {"name": "sample3", "task_count": 3},
]


def create_dag(object_name: str, task_count: int) -> DAG:
    @dag(
        dag_id=f"sample__{object_name}",
    )
    def sample_dag(object_name: str, task_count: int):
        op1 = EmptyOperator(task_id="start")
        op2 = CloudRunExecuteJobOperator(
            task_id="el_task",
            project_id="sample",
            region="asia-northeast1",
            job_name="sample",
            deferrable=True,
            overrides={
                "container_overrides": [
                    {
                        "args": ["--object_name", object_name],
                    },
                ],
                "task_count": task_count,
            },
        )
        op3 = EmptyOperator(task_id="end")
        return op1 >> op2 >> op3

    return sample_dag(object_name=object_name, task_count=task_count)


for target_object in target_objects:
    dag_name = f"sample__{target_object['name']}"
    globals()[dag_name] = create_dag(object_name=target_object["name"], task_count=target_object["task_count"])
  • CloudRunExecuteJobOperatorを用いることで、Cloud Run Jobsをトリガーしています。
  • またCloud Run Jobsで処理を時間している最中にCloud Composerのワーカースロットを占有しないためdefferableモードで実行するようにしています。
  • ovverridesブロックでは、ジョブ実行の設定をオーバーライドしています。データ量の多いジョブはタスクの並列数を増やして処理することで、実行時間の短縮しています。
    • Cloud Run Jobsでの環境変数、CLOUD_RUN_TASK_COUNT(タスク数)とCLOUD_RUN_TASK_INDEX(タスクのインデックス)をベースに各タスクで取得対象のデータ量が均等になるように調整しています。
  • terraform側の設定でparallelism=0としているため、全タスクはジョブ実行時に一斉に起動します。
    • 本当はAPIのRate Limitなどを考慮し、1ジョブ内で起動するタスクの上限(parallelism)もオーバーライドしたかったのですが、記事執筆時点ではCloud Run Jobsでは未対応でした。

まとめ

Cloud Run Jobsに切り出すことで、当該ジョブがCloud Composerのワーカーリソースを占有することがなくなったため、よりパイプラインがスケールできるようになりました。 Cloud ComposerやAirflowをGoogle Cloudで利用している方の参考となれば幸いです。

© Sansan, Inc.