Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

GCP のログを Amazon S3 へ集約する仕組みを作った話

こんにちは!技術本部情報セキュリティ部 CSIRT グループ所属の吉山です。 普段は社員の方々からのセキュリティに関する報告・相談の対応や、ログの収集・検知を行う SIEM 基盤の開発・運用などに向き合っています。

今回は、SIEM 基盤へ GCP の各サービスが出すログを集めるために作った、「GCP のログを Amazon S3 へ集約する仕組み」についてまとめたいと思います。

本記事は Sansan Advent Calendar 2023 - Adventar 17日目の記事です。

概要や目的

Sansan CSIRT では、プロダクトのインフラ系ログやクラウドサービスが吐き出すログ、SaaS の監査ログなどを AWS 上で構築している SIEM 基盤へ集約することで、不審事象の検知やログの分析を行える環境を整えています。

この SIEM 基盤のコンセプトの1つとして「集めたログを全て1度 S3 へ保存する」ことがあげられます。 これにより、ログを長期間安価に保存するとともに、分析を行う基盤のシステムへログを連携しやすい状態を作っています。

AWS のサービスは元から S3 へログを保存する機能を備えているものが多いですが、GCP については自分でこの仕組みを構築する必要があります。 また、「S3 から GCP へログなどを集める方法」はいくつも情報がありますが、「GCP から S3 へログを集める方法」はあまり情報が存在していません。

そこで、今回 GCP から S3 へログを集める仕組みを構築するとともに記事をまとめてみました。

過去に SIEM 自体について取り上げた記事もあるので、こちらも併せてご一読頂ければと思います!

buildersbox.corp-sansan.com

構築した仕組み

GCP の各種サービスが出力するログは基本的に Cloud Logging へ JSON 形式で出力されます。 Cloud Logging にはログルーター という機能があり、これを使い GCS や BigQuery、Pub/Sub など様々なサービスへログを送ることができます。

今回は、このログルーターを使い Pub/Sub へログを出力、Pub/Sub にきたログを Dataflow を使い処理し S3 へ保存する、という仕組みを構築しました。

構成

以下のような順でログを転送し S3 へ集約します。

  1. ログ送信元の各プロジェクトで、収集対象サービスのログを Cloud Logging へ出力する
  2. Cloud Logging のログルーター機能を使い、CSIRT 側プロジェクト上の Pub/Sub へログを送る
  3. Pub/Sub 上のメッセージを Dataflow を使い取得、前処理を行う
  4. 前処理後にログを S3 へ保存する

構成図

Pub/Sub

Pub/Sub は AWS で言うところの SQS のようなメッセージング系サービスです。 Cloud Logging のログルーターというログ転送機能で宛先として Pub/Sub を選択することができます。

加えて、最大7日まで受信したメッセージを Pub/Sub 上で保持しておくことができます。 これにより後続の Dataflow でメッセージが取得できないといった問題が生じても、ログが欠損しづらくなります。

10 時ごろから Dataflow 側エラーで処理が止まっている。Pub/Sub 上でメッセージを保持してくれているため欠損が生じていない。

今回の構成では、各プロダクト側でログルーターの設定を実施、宛先として CSIRT 側プロジェクトの Pub/Sub を選択することで、一旦 CSIRT の GCP プロジェクトへログを集約しています。

宛先として別プロジェクトの Pub/Sub Topic を選択

Dataflow

Dataflow は大規模なデータを並列処理することが得意なマネージドサービスです。 Pub/Sub 上のメッセージをストリーミング処理することができ、フィールドの前処理を行った上で他サービスへデータを送ったりすることができます。

クイックスタート: Dataflow を使用して Pub/Sub からメッセージをストリーミングする  |  Cloud Pub/Sub ドキュメント  |  Google Cloud

Dataflow のパイプラインは Apache Beam という SDK で実装します。 対応している言語は Java, Python, Go です。

beam.apache.org

今回は、Python でパイプラインを作成しました。 作成したパイプラインでは以下のような流れで処理を行います。

  1. Pub/Sub からメッセージを取得
  2. ログを Python で処理しやすい形に成形
  3. 各ログレコード毎に特定フィールドの値を元に Key を決定
  4. Key を元にログをグルーピング
  5. ウィンドウタイム毎にグルーピングしたログを S3 へ保存

上記のパイプラインをコードへ落とし込んだものがこちらです。 全体の処理の流れをそのままパイプラインとして定義しつつ、各フェーズ毎の細かな処理は適宜クラスなどを作り込むことで、全体の処理を実装しています。

# 実際に稼働しているパイプラインのコード(一部抜粋)
def run(secret_id, input_subscription, s3_bucket, window_size=1.0, pipeline_args=None):
    pipeline_options = ...
    ~~~~~
    ~~~~~
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(
                subscription=input_subscription,
                with_attributes=False
            )
            | 'Window into Fixed Intervals' >> beam.WindowInto(
                window.FixedWindows(int(window_size * 60))
            )
            | 'Format Message' >> beam.ParDo(FormatMessage())
            | "Add logName as a Key" >> beam.Map(lambda elem: (elem['logName'], elem))
            | "Groupby" >> beam.GroupByKey()
            | 'Write to S3' >> beam.ParDo(WriteToS3(aws_credentials, s3_bucket, temp_location))
        )

実装したコードを動かすことで Dataflow へパイプラインをデプロイすることができます。 デプロイ後は画像のように Dataflow のコンソール上でも稼働中の処理の流れを確認できます。

実際に Dataflow 上で稼働しているパイプライン

運用してみた

良かった点

Pub/Sub や Dataflow を活用し、シンプルな構成でログの集約を行うシステムを構築することができました。 運用においても、ログの流量に応じて Dataflow をオートスケールする、コードのメンテのためパイプラインを生やし直す、といったことも簡単に行えるため、楽に運用できています。

ログ量が多くなる日中帯に入るとオートスケールしワーカー数が増えている

改善したい点

今回はとりあえず構築し運用し始めることを優先し、Classic Template と呼ばれるテンプレートでパイプラインの構築を行いました。 しかし、本来は Flex Template と呼ばれるテンプレートでパイプラインの構築を行う方が推奨なようです。 Flex Template は柔軟性に優れており Classic Template では備えていない機能もあるため、今後はこちらのテンプレートでの実装に切り替えたいと思っています。

cloud.google.com

まとめ

今回は GCP のログを Amazon S3 へ集約するシステムについてまとめました。 Pub/Sub や Dataflow を活用し、シンプルな構成で S3 までログを集めるシステムを構築・運用できています。

GCP のログを AWS へ集めるという話題はあまりネット上でも情報が無く、今回の記事が何かの参考になれば良いなと思います。

© Sansan, Inc.