こんにちは、研究開発部 Architect グループで DevOps エンジニアをやっている八藤丸です。 今回は【R&D DevOps通信】連載の4回目、データ同期システムを構築した際に利用した GCP の Workflows*1 について紹介します。同じグループの張が最近 GitHub Actions についての取り組みを過去の連載で紹介しているので、ぜひご覧ください。
buildersbox.corp-sansan.com
背景
他グループ管轄の BigQuery*2 のデータを新たに作成した研究開発部用の Cloud SQL に定期的に同期するシステムを Workflows で実装しました。
ワークフローの処理の流れは以下のようになっています。
- 現在時刻のフォルダを GCS のデータ同期用のバケットに作成
- BigQuery から GCS に csv をエクスポート
- GCS の csv をBigQuery にインポート
Workflows 自体が比較的まだ新しいサービスだったため、あまり情報が見つからず、実装する際にいくつかハマったところがあったため、今回はその対応策を Tips として紹介したいと思います。
Tips
現在時刻の作成
GCS のバケットに現在時刻を名前としたフォルダを作成するために、現在時刻を Cloud Functions の関数を呼び出して受け取る方法なども考えたのですが、調査したところワークフローの組み込み関数で生成可能でした。
実際のコードは以下のようになりました。
assign: # JST にするために+9時間している - target_date: ${text.split(time.format(sys.now() + 32400), "T")[0]} - target_date_yyyymmdd: ${text.replace_all(target_date, "-", "")} # JST にするために+9時間している - target_time: ${text.split(time.format(sys.now() + 32400), "T")[1]} - target_hour: ${text.split(target_time, ":")[0]} - target_date_time: ${target_date_yyyymmdd + target_hour}
環境毎に差分のあるクエリをワークフロー内で管理する
BigQuery から GCS に csv をエクスポートする際に、エクスポートを行うクエリ*3 を BigQuery に対して実行するのですが、テーブル名が環境毎に違っていたため、クエリに Expression*4 を利用して Cloud Scheduler*5 から渡された引数をテーブルに指定していたのですが、以下のエラーに遭遇しました。
ワークフローをデプロイできませんでした: main.yaml:21:17: parse error: in workflow 'main', step 'assign_variables': maximum length for an expression is 400 characters, the provided expression has a length of 1445
Expression は400文字以内に収めなければならない制限があるようで、一つのクエリの中で環境毎にテーブルを分岐させることができませんでした。
以下のようなコードです。
Expression を使った書き方
assign: # 400字を超えるクエリ - query: - '${" SELECT ~ FROM hoge-" + arg.environment + ".fuga.piyo "}'
そこで、行った対応としては、環境毎にクエリを代入するサブワークフローを作成して、環境毎に対象のサブワークフローを呼び出すようにして対応しました。
呼び出し側
- switch_query: switch: - condition: ${args.environment == "stg"} steps: - assign_query_stg: call: assign_query_variable_stg result: query - condition: ${args.environment == "prod"} steps: - assign_query_prod: call: assign_query_variable_prod result: query
サブワークフロー (stg)
assign_query_variable_stg: steps: - assign_query_stg: assign: - query: - " SELECT id, name FROM hoge-stg.fuga.piyo " - return_assigned_query_stg: return: ${query}
(ワークフローは YAML か JSON の形式で書くことができますが、今回は YAML パターンで紹介しています。)*6
BigQuery のエクスポート
BigQuery からエクスポートした csv をインポートする動作確認をしていた際に、いくつかの csv がインポートされていないことに気が付きました。
調査したところ、全ての csv のエクスポートが完了するのを待つ必要があったため、エクスポートのステータスを1秒おきに確認するステップを実装することで同期に成功しました。
以下のようなワークフローになりました。
steps: - call_export: call: googleapis.bigquery.v2.jobs.query args: projectId: ${test_project_id} body: query: ${"EXPORT DATA OPTIONS( uri='gs://" + bucket + "/" + folder + "/" + "*.csv', format='CSV', overwrite=true ,header=false) AS " + query} useLegacySql: false result: response # BigQuery からのエクスポートが全て完了するのを待っている - check_response: switch: - condition: ${response.jobComplete == true} return: "done" next: wait - wait: call: sys.sleep args: seconds: 1 next: get_response - get_response: call: googleapis.bigquery.v2.jobs.getQueryResults args: jobId: ${response.jobReference.jobId} projectId: ${response.jobReference.projectId} location: ${response.jobReference.location} result: response next: check_response
異常終了時の Slack でのアラート通知
ワークフローが異常終了した時に Slack のアラート用のチャンネルに通知を送るように対応しました。
(アラートだけでなく、単純に Slack に何かメッセージを投稿する際にも使えます)
ここはハマりどころはなかったのですが、同じような実装がやりたい場合もあるかと思ったので、併せてご紹介しておきます。
手順としては、以下のようになりました。
- Slack API 用のトークンを SecretManager から取得
- アラート通知用のサブワークフローを作成
- (上記のサブワークフローを呼び出す)
Slack API 用のトークンを SecretManager から取得
- get_slack_token_secret: try: call: googleapis.secretmanager.v1.projects.secrets.versions.access args: name: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/secrets/test-alert-slack-token/versions/latest"} result: secret_result except: as: e steps: - handle_secret_manager_error: switch: - condition: ${e.code == 404} raise: "Secret not found" - condition: ${e.code == 403} raise: "Error authenticating to Secret Manager" - unhandled_exception: raise: ${e}
アラート通知用のサブワークフローを作成
send_alert: params: - token - error_message steps: - post_message: call: http.post args: url: https://slack.com/api/chat.postMessage headers: Authorization: ${"Bearer " + token} body: channel: "#alerts-test" text: ${error_message}
おわりに
想定していたよりも複雑なワークフローになってしまいましたが、何とかプロダクション利用することができました。
こちらの情報が、これから Workflows を利用しようとされている方の参考になれば幸いです。
また、公式の Workflows 用のチートシートもあるので、併せてこちらも参考にしてみてはいかがでしょうか?
cloud.google.com
最後まで読んでいただき、ありがとうございました!
おまけ
研究開発部アーキテクトグループでは一緒に働く仲間を募集しています。
R&D MLOps/DevOpsエンジニア / Sansan株式会社
*1:API 呼び出しや Cloud Functions, Cloud Run などのサービスを組み合わせてワークフローを構築することができるサービスです
*2:本来の用途である分析用に使っていたわけではなく、欲しいデータが揃っていたため、一時的に他グループのものをエンジンから参照している状態でした(分析の仕組みは研究開発部門で別にあります)
*3:Other statements in GoogleSQL | BigQuery | Google Cloud
*4:Expressions | Workflows | Google Cloud
*5:定期実行用の cron ジョブ スケジューラです
*6:JSON はコメントアウトが書けないということもあり、YAML を採用しました