こんにちは。研究開発部でデータエンジニアをしている鈴木翔大です。
今回は【R&D DevOps通信】連載 9回目として、以前構築したデータ基盤の転送パイプラインについて書こうと思います。
AWSのS3上に存在するデータをGCPのGCSに転送して、データのETL処理をしながらBigQueryのネイティブテーブルに同期するような仕組みです。この一連の処理の流れ(パイプライン)をCloud Composer上で管理・運用する方法について紹介します。
背景
Sansanが持っているデータは非常に大規模で、かつ機密性が高いデータも含まれています。そのため、基本的にデータを全社員が見れないように、必要なチームのみにアクセス権限を付与しています。現在構築を進めている全社横断データ基盤でも、列レベル・行レベルできめ細やかなアクセス制御を行う必要があり、これを実現できるデータウェアハウスとしてBigQueryを採用しました。
しかし、AWS上で構築されているサービスもあるため、これらのサービスのログをAWSからGCPに転送する必要があります。今回はこちらのAWSからGCPにデータを同期するための転送パイプラインをCloud Composerで構築しました。
全社横断データ基盤のインフラ構築・CI/CDパイプラインについては張さんが以前紹介してくれているので、こちらもぜひご覧ください。
buildersbox.corp-sansan.com
Cloud Composer(Airflow)の良いところ
Cloud Composerの利点は、GUI上でパイプラインの実行状況を確認して、再実行などの運用ができる点です。データの転送やETLは複雑なロジックや処理が行われることが多く、管理や運用が煩雑になりがちです。Cloud Composerを使えばタスク間の依存関係を可視化することができ、エラーの発生原因を特定したり、失敗したタスクから再実行したりといった運用を画面上から簡単に行うことができます。
他にも、パイプライン処理をコード管理できたり、Googleのマネージドサービスと相性の良いOperatorが用意されていたりと、さまざまなメリットがあります。
Cloud Composerの構成
今回実装したCloud Composerのディレクトリ構成は以下のようになっています。
. ├── Dockerfile ├── Makefile ├── README.md ├── docker-compose.yml ├── config │ ├── airflow.cfg │ ├── connections.json │ ├── requirements.txt │ ├── variables.json ├── dags ├── s32bq ├── dag.py ├── models │ └── settings.py ├── settings │ ├── dataset_hoge │ │ ├── table_hoge.yml │ │ ├── table_fuga.yml └── utils.py
新規データ転送DAGを開発する際はAirflowで実行テストを行うため、Docker環境を用意しています。config以下にairflowの設定や接続情報などの構成ファイルを配置しています。AWSからGCPにデータを転送するためのDAGファイルはdags/s32bq以下に配置していて、dag.pyではsettings以下にある各テーブル毎の転送設定ファイルを読み込むことでymlファイルの数だけ転送DAGを作成しています。
また、各テーブル毎の転送設定のymlファイルの中身は以下のようになっています。
job: dag_name: dag_hoge data_source: fuga description: hogeテーブルの週次転送タスク schedule: 5 5 * * 1 tags: - product_1 s3: aws_role_arn: arn:aws:iam::{aws_account_id}:role/**** source_bucket: ****** source_path: ****** is_partitioned: true date_format: '%Y-%m-%d' gcs: dest_bucket: ****** dest_path: ****** bq: dataset: fuga table_name: hoge etl_query: | DELETE FROM {dataset}.{table} WHERE true; INSERT INTO {dataset}.{table} ( column1, column2, ... ) SELECT column1, column2, ... FROM piyo.{table} extable_config: source_format: PARQUET compression: GZIP skip_leading_rows: null field_delimiter: null quote_character: null
転送時間や転送元のバケット情報など、データ転送に必要な設定情報をこの設定ファイルに記載しています。このような設計にすることで、新しいデータソースをBigQueryに転送するとなった場合でも新しい処理を追加する必要はなく、ymlファイルを追加するだけでデータ転送が実行できるため、開発やテストの工数を大幅に削減することができます。
パイプラインの構成
Cloud Composer(Airflow)では、パイプラインを構成するタスクを定義した後、それらを>>という記号で紐づけることでタスク間の依存関係を定義することができます。
今回作成した例だと、以下のように定義した2つのタスクを>>で紐づけることで、push_xcomの後にcreate_transfer_jobを実行するというパイプラインを作成しています。
# xcomに設定情報を追加するタスク push_xcom = PythonOperator( task_id="push_xcom", python_callable=push_setting_to_xcom, op_kwargs={"setting": setting}, provide_context=True, ) # Data Transfer Jobを作成するタスク create_transfer_job = PythonOperator( task_id="create_transfer_job", python_callable=create_transfer_job_operator, provide_context=True, ) # push_xcomの後にcreate_transfer_jobを実行する push_xcom >> create_transfer_job
並列して実行するタスクの場合は、タスク順の記述を配列で表現するだけです。
# Data Transfer Serviceの完了を待機するタスク wait_for_transfer_to_complete = CloudDataTransferServiceJobStatusSensor( task_id="wait_for_transfer_to_complete", job_name="{{ ti.xcom_pull(key='transfer_job_name') }}", project_id=project_id, expected_statuses={GcpTransferOperationStatus.SUCCESS}, poke_interval=10, ) # 以前作成した外部テーブルを削除するタスク delete_external_table = BigQueryDeleteTableOperator( task_id="delete_external_table", deletion_dataset_table=f"{project_id}.{tmp_dataset_id}.{setting.bq.table_name}", ignore_if_missing=True, ) # create_transfer_jobの後にwait_for_transfer_to_completeとdelete_external_tableを同時に実行する create_transfer_job >> [wait_for_transfer_to_complete, delete_external_table]
途中のタスク定義は省きますが、最終的には以下のようなタスクの依存関係を定義しました。
push_xcom >> create_transfer_job create_transfer_job >> [wait_for_transfer_to_complete, delete_external_table] [wait_for_transfer_to_complete, delete_external_table] >> create_external_table create_external_table >> execute_etl_query execute_etl_query >> delete_transfer_job delete_transfer_job >> [notify_error_to_slack, success]
上記のタスク定義と設定情報を記載したymlファイルを元に、以下のようなデータ転送パイプラインを作成しています。
処理の流れとしては、以下のようになっています。
- push_xcomで設定情報をxcomに格納
- create_transfer_jobでS3からGCSにデータを転送するData Transfer Serviceを作成・実行
- wait_for_transfer_to_completeでデータ転送が完了するまで待機
- create_external_tableでGCSのデータをBigQueryの外部テーブルとして読み込み
- execute_etl_queryで外部テーブルからデータの前処理をしつつネイティブテーブルに書き込み
- エラーが発生した場合はnotify_error_to_slackでSlackにエラー通知
S3からBigQueryに直接データを転送するBigQuery Data Transfer Serviceを利用する方法もありますが、今回構築したパイプラインではS3からGCSにデータを転送するData Transfer Serviceを利用しています。BigQuery Data Transfer Serviceはデータ転送の設定をするためにAWSのアクセスキーが必須パラメータになっていますが、アクセスキーが流出した場合のセキュリティのリスクが大きいというデメリットがあります。
Data Transfer Serviceの方では転送用のIAMロールを指定するだけでデータ転送をすることができるので、アクセスキーを払い出す必要がなく、セキュリティ上のメリットが大きいためこちらを採用しています。
全てのタスクが正常に完了すると、画面上のタスクの色が緑色に変化するので、正常に完了しているかどうかが分かりやすいです。途中のタスクでエラーが発生した場合は、以下のようにタスクの色が赤色に変化するので、ログを参照してエラーの原因を特定した後に失敗したタスクから実行することができるので、運用作業も非常に行いやすくなっています。
最後に
Cloud Composerを使って、AWSからGCPにデータを転送するパイプラインを構築し、管理・運用することができました。
今後データの追加が発生する場合は、転送設定を記載したymlファイル一つを追加するだけで良いので、開発・レビューの工数を大きく削減することができています。
Cloud ComposerはAirflowのマネージドサービスですが、AirflowとDAGの配置場所が違ったりと、少しハマるポイントがあるので注意が必要です。
しかし、一度処理の書き方を覚えてしまえば他のパイプラインを作る際も楽になるので、今後もこちらを活用していきたいと考えています。
以上、最後まで読んでいただきありがとうございました。
Architectグループでは一緒に働く仲間を募集しています。
データエンジニア(CTO直轄「全社横断データ分析基盤構築プロジェクト」リーダー候補) | Sansan株式会社
R&D データエンジニア(データ分析基盤開発) | Sansan株式会社