Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

Vol. 21 Google Cloud Pub/Sub利用時における分散トレーシングの断絶を防ぐコンテキスト伝搬手法

この記事は、Bill One開発Unit ブログリレー2025の第21弾になります!

こんにちは。技術本部 Bill One Engineering Unitの前田です。

今回は、前回の記事で宣言したとおり、非同期処理の計装について記事を書きます。今回はコンテキスト伝搬に着目しています。

はじめに

この記事では、OpenTelemetryで非同期処理(Messaging System)計装を行う際、Ktor + Google Cloud Pub/Sub(以降、Pub/Subと記載)環境で実施した知見を記載します。

これまで何度かOpenTelemetry関連で登壇の機会があり、非同期処理の計装について話をした際、Pub/Sub周りで質問いただくことが多くありました。加えて、登壇では詳細を話しきれないという状況でした。そこで、そういった内容を補完しつつ、語りきれなかったことを語ろう、という意図でこの記事を書きました。今回はOpenTelemetryのContext(以降、「コンテキスト」と記載する)を、非同期処理でどのように伝搬させるかについて話します。

全体の流れとして、非同期処理でコンテキストが伝搬しない理由を解説した後に、Pub/Subの話をします。

過去の登壇内容は次のとおりです。

想定読者・前提知識

本記事では、Ktor + Pub/Sub環境において、OpenTelemetryのゼロコード計装だけではトレースが分断される問題と、その解決策を解説します。記事を読むに当たり、次の前提知識が必要です。

  • Webフレームワークを用いたサーバーサイドアプリケーションへの理解
  • OpenTelemetryの基本的な概念(Trace, Span, Contextなど)への理解
  • Google Cloud Pub/Subの基本的な仕組み(Topic, Subscription)への理解

なお、OpenTelemetryは言語別に実装があり、そのすべてを把握できていません。その点はご了承ください。OpenTelemetry Javaを前提として説明しますが、基本的な仕様は似通っていると思います。できる限り、他の言語や非同期処理のシステムでも転用できる内容を目指しています。

環境の前提

今回の記事に記載した内容は、次の環境で検証したものになっています。

  • アプリケーションはCloud Runで稼働するKtorのアプリケーションで、HTTPリクエストを受けるものとする。ただし、実行はローカル環境で実施する
  • Messaging SystemとしてPub/Subを使用する
  • Pub/Subはエミュレータを使用する
  • APMツールは、ローカルで起動しているAspire Dashboardを使用する
  • 基本的に本番環境で動作しているコードを例にするが、一部私が試しただけのコードが存在する

docs.cloud.google.com

aspire.dev

同期処理と非同期処理のコンテキスト伝搬

最初にトレースが分断される問題について解説します。

ここでいう「同期処理」とは、他のマイクロサービスなどに対して直接HTTPやgRPCのリクエストを実行し、その結果を待つ処理を指します。async/awaitなどを使っていたとしても、処理全体としてクライアントがレスポンスを待っている場合は、同期処理と扱います。一方、「非同期処理」とは、自身もしくは他のマイクロサービスにメッセージを送信する際、Pub/Subなどの外部にあるメッセージングシステムを挟み、クライアントがレスポンスを待たない処理を指します。

同期処理の場合

ゼロコード計装の場合、同期処理のコンテキストは設定に基づいて自動的に伝搬されます。HTTPで同期リクエストを送信した場合の例を示します。

同期処理の流れ

非同期処理の場合

非同期処理の場合、間に外部システムを挟みます。そのため、コンテキスト伝搬の仕様は外部システム次第になります。Pub/Subの場合、全く別のコンテキストになります。図にするとこうなります。Service Bには、Service Aが扱っていたTraceIDとは別のTraceIDが伝搬されます。

非同期処理の流れ

トレースが分断される場合の対処法

コンテキストが伝播されず繋がらないのであれば、外部システムの影響を受けない形式でコンテキストを送信し、受信する際にコンテキストを解釈する必要があります。

こういったケースのために、Pub/Subではカスタム属性という仕組みが用意されています。

属性は、優先度、送信元、宛先など、メッセージに関する追加情報を提供するために使用されます。

このように書いてある通り、コンテキストを伝搬させる用途にはちょうど良さそうです。

docs.cloud.google.com

docs.cloud.google.com

Pub/Sub以外のシステムであっても、コンテキスト伝搬されない場合はこのような仕組みを探して利用する必要があります。

Pub/Subでコンテキストを伝搬させる

Pub/Subの場合のコンテキスト伝搬方法について記載します。

カスタム属性を送信する

Pub/Subのパブリッシャーとなるサービスは、メッセージの送信時にデータとカスタム属性を送信できます。カスタム属性を設定するには、各言語のSDKを使用します。Kotlin(OTel Java)の例は次の通りです。コンテキストをカスタム属性に対して注入(inject)するコードです。Pub/Subのカスタム属性は単純なKeyValue構造となっています。

import com.google.pubsub.v1.PubsubMessage
import io.opentelemetry.context.Context

fun PubsubMessage.Builder.injectTraceContext(): PubsubMessage.Builder {
    val propagator = openTelemetry.propagators.textMapPropagator
    val context = Context.current()
    propagator.inject(
        context,
        this,
    ) { carrier, key, value ->
        // key: traceparent や b3 といった、コンテキスト伝搬用のヘッダーの名前
        // value: 00-6442ca0b5032d17b1001685547128879-11da70cd0b456486-01 といった値
        carrier?.putAttributes(key, value)
    }
    return this
}

突然propagatorというものが出てきました。これはコンテキスト伝搬に使われる仕組みで、ここでは現在のコンテキスト(Context.current())をPubsubMessage.Builderに対して注入しています。このページにある「手動でコンテキストを伝搬する必要がある場合」に該当します。propagatorで伝搬される値は、ゼロコード計装の場合はOTEL_PROPAGATORS環境変数によって自動的に決まります。

この関数を利用する場合はこうなります。

import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage

val message = PubsubMessage
    .newBuilder()
    .setData(ByteString.copyFromUtf8(data))
    .injectTraceContext() //これを呼ぶだけで良い
    .putAllAttributes(...) // 別途追加したい属性がある場合
    .build()

カスタム属性を送信する場合はこれだけで良いです。後述するサブスクライバーと異なり、考慮事項は少ないです。

カスタム属性からコンテキストを設定する

サブスクライバーとなるサービスは、カスタム属性を解釈して伝搬されたコンテキストを取得し、必要な設定を行います。ここで問題になるのは、サブスクリプションの方法がいくつかあることです。

まず、Pub/Subからメッセージを受信する「サブスクリプション」には、3つの種類*1があります。そのうち、アプリケーションでよく使われるのは「pullサブスクリプション*2」と「pushサブスクリプション」です。さらにpushサブスクリプションには、「ペイロードのラップ解除」という仕組みがあります。これは、通常であればデータがBase64エンコーディングされてくるものを、デコードした上でHTTPリクエストとして送信してくれる機能です。

docs.cloud.google.com

これらの設定により、次のようにカスタム属性の受信方法が変わります。

サブスクリプションの種類 ペイロードのラップ解除 メッセージ本体 カスタム属性
push なし Base64エンコードされている リクエストボディ
push あり デコードされている ヘッダー
pull なし(できない) Base64エンコードされている リクエストボディ

カスタム属性を扱う場合は、これらの違いを考慮する必要があります。それぞれの違いを見ていきます。

pushサブスクリプション

pushサブスクリプションとは、Pub/Subからサービスに対して、メッセージを含むHTTPリクエストを送信する方式です。サービスはリクエストを待つだけでよいため、通常のHTTPリクエストなどと同様に扱えます。

docs.cloud.google.com

ペイロードのラップ解除を行わない(ラップ済み)場合

この場合、カスタム属性はリクエストボディに格納されています。公式ドキュメントの例ではこういった内容になります。

  {
      "deliveryAttempt": 5,
      "message": {
          "attributes": {
              "key": "value"
          },
          "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
          "messageId": "2070443601311540",
          "message_id": "2070443601311540",
          "orderingKey": "key",
          "publishTime": "2021-02-26T19:13:55.749Z",
          "publish_time": "2021-02-26T19:13:55.749Z"
      },
    "subscription": "projects/myproject/subscriptions/mysubscription"
}

このJSONからattributesを取り出して、そこからコンテキストを抜き出せば良いです。コードのサンプルは次の通りです。injectはKotlinの仕組みを活用することでコードを短くできますが、extractは少しコード量が増えます。

import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapGetter

// attributesを単純な Map<String, String> で受けたものとする

// 伝搬されたカスタム属性からコンテキストを取得する
val mapGetter = object : TextMapGetter<Map<String, String>> {
    override fun keys(carrier: Map<String, String>): Iterable<String?>? {
        return carrier.keys
    }

    override fun get(
        carrier: Map<String, String>?,
        key: String,
    ): String? {
        return carrier?.get(key)
    }
}

val propagator = openTelemetry.propagators.textMapPropagator
val extractedContext = propagator.extract(
    Context.current(),
    attributes,
    mapGetter,
)

この後、このコンテキストをどのように設定するかは用途によります。現在のスパンにリンクを追加するなら、このように書けます。

import io.opentelemetry.api.trace.Span

val spanContext = Span.fromContext(extractedContext).spanContext
Span.current().addLink(spanContext)

すると、このように異なるTraceID同士のリンクが作られます。画像上部にそのスパンのSpanIDが書いてあり、「詳細」にはリンク先のSpanIDがあります。

パブリッシャーのスパン

サブスクライバーのスパン

理論上はextractedContextを親としてスパンを開始すれば親子関係になります。しかし、ゼロコード計装では手動スパンと自動生成スパンのスコープ管理が競合しやすく、実装難易度が高いです。

また、リクエストボディからカスタム属性を抽出する場合、リクエストボディは何もしなければ1度しか読み込むことはできないことが多い点に注意が必要です。カスタム属性の抽出処理を共通化する際に問題になると考えています。回避策として、Webフレームワークには「リクエストボディを2回読み込める」仕組みも用意されています。Ktorの場合はDoubleReceiveというプラグインがそれに該当します。ただし、これはこれでメモリ使用量などが増えます。共通化を考える際はこれらの点にもご注意ください。

ペイロードのラップ解除を有効にした場合

ペイロードのラップ解除を有効にした場合、パブリッシャーが付与したカスタム属性はリクエストヘッダーに付与されます。具体例は次のページにあります。

docs.cloud.google.com

ペイロードのラップ解除を使うと「dataをデコードする」という手間から解放されるため、プログラムがシンプルになります。さらに、HTTPヘッダーにはパブリッシャーで設定したb3などのコンテキストが付与される*3ため、OpenTelemetryのコンテキスト伝搬の仕組みがそのまま利用できます。ペイロードのラップ解除を利用する場合は、コンテキスト伝搬のためにカスタム属性を取得する必要はありません。Bill Oneでは、Pub/Subを利用する場合この方法でコンテキスト伝搬しています。デメリットとしては、バッチ処理などトレース全体が長い場合やサブスクリプションが多すぎる場合などに、トレース全体が長くなることです。トレースが長い場合など、自動的にコンテキスト伝搬されてしまう挙動が問題になるケースはありそうです。

自動的にコンテキスト伝搬される挙動を回避するには「標準のコンテキストヘッダーに何らかの接頭辞をつける」といった方法が考えられます。また、TextMapPropagatorを拡張してライブラリに定義し、双方で使うということもできそうです。前者の方法で行うとしたら次のようなイメージです。受信側は動作検証ができていないため省いています。ご了承ください。

送信側

const val PROPAGATOR_PREFIX = "x_app_"
fun PubsubMessage.Builder.injectPropagationTraceHeaders(): PubsubMessage.Builder {
    val propagator = openTelemetry.propagators.textMapPropagator
    val context = Context.current()
    propagator.inject(
        context,
        this
    ) { carrier, key, value ->
        carrier?.putAttributes("$PROPAGATOR_PREFIX$key", value)
    }
    return this
}

pullサブスクリプション

pullサブスクリプションとは、アプリケーションがPub/Subに対しリクエストし、メッセージを取得する方式です。

docs.cloud.google.com

細かい説明は省きますが、pullサブスクリプションでメッセージを処理する場合、Pub/Subクライアントライブラリを利用した方が良いと考えています。Pub/SubのライブラリにはOpenTelemetryのサポートが入っています。pullサブスクリプションのメッセージ取得などだけでなく、コンテキスト伝搬もライブラリでやれます。

docs.cloud.google.com

Cloud Traceが例として上がっていますが、OpenTelemetryのインスタンスが取得できればこの仕組みは動作します。ゼロコード計装の場合でもGlobalOpenTelemetry.get()で取得したインスタンスを利用できます。

Pub/Subクライアントライブラリについて

これまで何度か出てきていますが、Pub/Subにはクライアントライブラリが用意されています。

docs.cloud.google.com

今回は手動でコンテキストを伝搬させる方法を紹介しましたが、ライブラリにも近しい機能が提供されています。Publisherの構築時に、次のように書くことでgoogclient_traceparentという名前のカスタム属性が追加されます。googclient_traceparentをサブスクライバで取得*4すればコンテキスト伝搬に必要な情報が揃います。

Publisher
    .newBuilder(topicName)
    .setRetrySettings(retrySetting)
    .setOpenTelemetry(openTelemetry) // GlobalOpenTelemetry.get() などから取得する
    .setEnableOpenTelemetryTracing(true)
    .build()

今回の記事では「コンテキスト伝搬のやり方を知っておくと他の非同期処理の機構を使う場合に応用が利く」と考えたため、あえてこの紹介を最後に持ってきました。

Pub/SubでOpenTelemetryトレースを使用する方法として、次のページで紹介されています。Bill Oneではpullサブスクリプションを使用しておらず、コード上で試せなかったため、コード例は割愛します。

docs.cloud.google.com

考察: スパンは親子にするかリンクさせるか

非同期処理は、何もしないとトレースが途切れるとお話ししました。じゃあ、それを繋げる際に「スパンを親子関係にする」のか「スパンリンクにする」のかという疑問が生まれます。

この問いに関して、唯一解は無いと思っています。次のような内容を考慮し、適切な方法を選択して実現することになると考えています。

  • APMツールの仕様
  • トレースの長さ
  • 実現手段のシンプルさ(プログラムを書いた場合の保守性など)

とはいえ、どのような方法であっても非同期処理同士を繋げること自体に価値があります。まずは関連を辿れるようにしてからどうするか考える、という流れでも良いと私は思っています。

終わりに

感想

今回は、非同期処理の場合にどうやってコンテキストを伝搬させるかについて、Pub/Subを例に紹介しました。今回の記事で言いたかったことは2つです。

  • 非同期処理は何もしないとトレースが途切れてしまう
  • 非同期処理でもコンテキスト伝搬できる手段を探して、トレースを繋げよう

なお、当初この記事はもっと長く、コンテキスト伝搬などの要素について細かく説明するつもりで書いていました。しかし、記事が長くなりすぎてしまったため大きく削りました。Bill Oneでは採用していない方式の検証についても、うまくいかなかった検証事項は省いています。

何かの参考になれば幸いです。

展望

コンテキスト伝搬でこれだけの量になったので、こういう話は省きました。記事として学びがある内容になりそうなら、いずれ書くかもしれません。

また、過去の登壇内容から「この辺りの話が聞きたい!」といった要望があれば、書くかもしれません。

Sansan技術本部ではカジュアル面談を実施しています

Sansan技術本部では中途の方向けにカジュアル面談を実施しています。Sansan技術本部での働き方、仕事の魅力について、現役エンジニアの視点からお話しします。「実際に働く人の話を直接聞きたい」「どんな人が働いているのかを事前に知っておきたい」とお考えの方は、ぜひエントリーをご検討ください。

参考資料

speakerdeck.com

speakerdeck.com

buildersbox.corp-sansan.com

*1:https://docs.cloud.google.com/pubsub/docs/subscription-overview?hl=ja

*2:pullサブスクリプションには、pullとStreamingPullの2種類がある。この記事での解説は省略する。

*3:traceparentは標準ヘッダーでPub/Subが書き換える可能性があるため、b3などのヘッダーを利用することを推奨する。

*4:属性名がライブラリ依存になるため、プロダクトで利用することのリスクは存在する。

© Sansan, Inc.