Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

【R&D DevOps通信】GCP Workflows を業務で使ってみた

こんにちは、研究開発部 Architect グループで DevOps エンジニアをやっている八藤丸です。 今回は【R&D DevOps通信】連載の4回目、データ同期システムを構築した際に利用した GCP の Workflows*1 について紹介します。同じグループの張が最近 GitHub Actions についての取り組みを過去の連載で紹介しているので、ぜひご覧ください。
buildersbox.corp-sansan.com

背景

他グループ管轄の BigQuery*2 のデータを新たに作成した研究開発部用の Cloud SQL に定期的に同期するシステムを Workflows で実装しました。
ワークフローの処理の流れは以下のようになっています。

  1. 現在時刻のフォルダを GCS のデータ同期用のバケットに作成
  2. BigQuery から GCS に csv をエクスポート
  3. GCS の csv をBigQuery にインポート

Workflows 自体が比較的まだ新しいサービスだったため、あまり情報が見つからず、実装する際にいくつかハマったところがあったため、今回はその対応策を Tips として紹介したいと思います。

Tips

現在時刻の作成

GCS のバケットに現在時刻を名前としたフォルダを作成するために、現在時刻を Cloud Functions の関数を呼び出して受け取る方法なども考えたのですが、調査したところワークフローの組み込み関数で生成可能でした。
実際のコードは以下のようになりました。

assign:
  # JST にするために+9時間している
  - target_date: ${text.split(time.format(sys.now() + 32400), "T")[0]}
  - target_date_yyyymmdd: ${text.replace_all(target_date, "-", "")}
  # JST にするために+9時間している
  - target_time: ${text.split(time.format(sys.now() + 32400), "T")[1]}
  - target_hour: ${text.split(target_time, ":")[0]}
  - target_date_time: ${target_date_yyyymmdd + target_hour}

環境毎に差分のあるクエリをワークフロー内で管理する

BigQuery から GCS に csv をエクスポートする際に、エクスポートを行うクエリ*3 を BigQuery に対して実行するのですが、テーブル名が環境毎に違っていたため、クエリに Expression*4 を利用して Cloud Scheduler*5 から渡された引数をテーブルに指定していたのですが、以下のエラーに遭遇しました。

ワークフローをデプロイできませんでした: main.yaml:21:17: parse error: in workflow 'main', step 'assign_variables': maximum length for an expression is 400 characters, the provided expression has a length of 1445

Expression は400文字以内に収めなければならない制限があるようで、一つのクエリの中で環境毎にテーブルを分岐させることができませんでした。
以下のようなコードです。

Expression を使った書き方
assign:
  # 400字を超えるクエリ
  - query:
      - '${"
            SELECT
            ~
            FROM hoge-" + arg.environment + ".fuga.piyo
         "}'

そこで、行った対応としては、環境毎にクエリを代入するサブワークフローを作成して、環境毎に対象のサブワークフローを呼び出すようにして対応しました。

呼び出し側
- switch_query:
    switch:
      - condition: ${args.environment == "stg"}
        steps:
          - assign_query_stg:
              call: assign_query_variable_stg
              result: query
      - condition: ${args.environment == "prod"}
        steps:
          - assign_query_prod:
              call: assign_query_variable_prod
              result: query
サブワークフロー (stg)
assign_query_variable_stg:
  steps:
    - assign_query_stg:
        assign:
          - query:
              - "
                SELECT
                id,
                name
                FROM hoge-stg.fuga.piyo
                "
    - return_assigned_query_stg:
        return: ${query}

(ワークフローは YAML か JSON の形式で書くことができますが、今回は YAML パターンで紹介しています。)*6

BigQuery のエクスポート

BigQuery からエクスポートした csv をインポートする動作確認をしていた際に、いくつかの csv がインポートされていないことに気が付きました。
調査したところ、全ての csv のエクスポートが完了するのを待つ必要があったため、エクスポートのステータスを1秒おきに確認するステップを実装することで同期に成功しました。
以下のようなワークフローになりました。

steps:
  - call_export:
      call: googleapis.bigquery.v2.jobs.query
      args:
        projectId: ${test_project_id}
        body:
          query: ${"EXPORT DATA OPTIONS( uri='gs://" + bucket + "/" + folder + "/" + "*.csv', format='CSV', overwrite=true ,header=false) AS " + query}
          useLegacySql: false
      result: response
  # BigQuery からのエクスポートが全て完了するのを待っている
  - check_response:
      switch:
        - condition: ${response.jobComplete == true}
          return: "done"
      next: wait
  - wait:
      call: sys.sleep
      args:
        seconds: 1
      next: get_response
  - get_response:
      call: googleapis.bigquery.v2.jobs.getQueryResults
      args:
        jobId: ${response.jobReference.jobId}
        projectId: ${response.jobReference.projectId}
        location: ${response.jobReference.location}
      result: response
      next: check_response

異常終了時の Slack でのアラート通知

ワークフローが異常終了した時に Slack のアラート用のチャンネルに通知を送るように対応しました。
(アラートだけでなく、単純に Slack に何かメッセージを投稿する際にも使えます)
ここはハマりどころはなかったのですが、同じような実装がやりたい場合もあるかと思ったので、併せてご紹介しておきます。
手順としては、以下のようになりました。

  1. Slack API 用のトークンを SecretManager から取得
  2. アラート通知用のサブワークフローを作成
  3. (上記のサブワークフローを呼び出す)
Slack API 用のトークンを SecretManager から取得
- get_slack_token_secret:
   try:
      call: googleapis.secretmanager.v1.projects.secrets.versions.access
      args:
        name: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/secrets/test-alert-slack-token/versions/latest"}
      result: secret_result
    except:
      as: e
      steps:
        - handle_secret_manager_error:
            switch:
              - condition: ${e.code == 404}
                raise: "Secret not found"
              - condition: ${e.code == 403}
                raise: "Error authenticating to Secret Manager"
        - unhandled_exception:
            raise: ${e}
アラート通知用のサブワークフローを作成
send_alert:
  params:
    - token
    - error_message
  steps:
    - post_message:
        call: http.post
        args:
          url: https://slack.com/api/chat.postMessage
          headers:
            Authorization: ${"Bearer " + token}
          body:
            channel: "#alerts-test"
            text: ${error_message}

おわりに

想定していたよりも複雑なワークフローになってしまいましたが、何とかプロダクション利用することができました。
こちらの情報が、これから Workflows を利用しようとされている方の参考になれば幸いです。
また、公式の Workflows 用のチートシートもあるので、併せてこちらも参考にしてみてはいかがでしょうか?
cloud.google.com


最後まで読んでいただき、ありがとうございました!

おまけ

研究開発部アーキテクトグループでは一緒に働く仲間を募集しています。
R&D MLOps/DevOpsエンジニア / Sansan株式会社

*1:API 呼び出しや Cloud Functions, Cloud Run などのサービスを組み合わせてワークフローを構築することができるサービスです

*2:本来の用途である分析用に使っていたわけではなく、欲しいデータが揃っていたため、一時的に他グループのものをエンジンから参照している状態でした(分析の仕組みは研究開発部門で別にあります)

*3:Other statements in GoogleSQL  |  BigQuery  |  Google Cloud

*4:Expressions  |  Workflows  |  Google Cloud

*5:定期実行用の cron ジョブ スケジューラです

*6:JSON はコメントアウトが書けないということもあり、YAML を採用しました

© Sansan, Inc.