Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

Kotlin × OpenTelemetry で実現する LLM observability - Contract One での実装と運用

本記事はSansan Advent Calendar 2025、18日目の記事です。

こんにちは。技術本部Contract One Engineering Unitの伊藤です。取引管理サービス「Contract One」の開発を担当しています。

約1年前、前回の記事でLLM評価基盤のLangfuseについて紹介しました。当時は検証フェーズでしたが、現在は本番環境で運用しています。本記事では、Contract OneにおけるLLM observabilityの全体像と、Kotlin × OpenTelemetryでの実装について紹介します。

背景

Contract Oneは最近「AI契約データベース」から「取引管理サービス」へプロダクト定義を変更し、契約書に限らずより広い意味でのデータを整理・活用できるプロダクトを目指す方向へ進化しています。この変化に伴い、LLMを使った機能も拡大・強化しています。

機能の精度を向上させるためのロジックが複雑化していく中で、LLM基盤の長期的なメンテナンス性や、本番データの監視・管理に関する課題が顕在化してきました。この課題に対応するため、Langfuseを本番環境に導入しました。

チーム体制

Contract OneのLLM observabilityは、Contract One Engineering Unitと研究開発部(R&D)の協業によって実現しています。

  • Contract One Engineering Unit: 2名 - アプリケーションへの計装実装
  • R&D(基盤管理): 2名 - Langfuseのホスティング・運用
  • R&D(研究員): 3名 - LLMを使った機能のアルゴリズム設計・検証

アーキテクチャ全体像

全体の構成は以下のようになっています。

  • Contract One(アプリケーション側): Cloud Run上で稼働し、OpenTelemetryでTraceを送信
  • Langfuse基盤(R&D管理): AWS EKS上でセルフホスティング

Kotlinでの実装詳細

OpenTelemetryで計装しLangfuseに送信する際の注意点

Langfuseのデータモデル

LangfuseはOpenTelemetry(OTLP)でトレースを受信できるバックエンドですが、LLM observabilityのために独自のデータモデルを持っています。

Observations Data Model(出典: Langfuse Documentation / MIT License)

LangfuseではTraceが明示的なエンティティとして存在し、以下のような独自の属性を持ちます。

  • input / output: Trace全体の入出力
  • userId / sessionId: ユーザー・セッション識別子
  • tags / metadata: 分類・付加情報

これらはLangfuseのUI上でTrace一覧として表示され、フィルタリングや分析の対象になります。OpenTelemetryのSpanをそのまま送信しても、これらのTraceレベル属性は自動では設定されないため、Langfuse固有の属性を使って明示的に設定する必要があります。

Trace属性の設定

Langfuseはlangfuse.trace.*というプレフィックスを持つ属性を特別に扱います。任意のSpanでこれらの属性を設定すると、Langfuse側でTraceレベルの属性として反映されます。

// どのSpanからでもTraceレベルの属性を設定可能
span.setAttribute("langfuse.trace.input", userQuery)
span.setAttribute("langfuse.trace.tags", listOf("production", "gpt-4"))

OpenTelemetry自体にはLangfuseのTraceに相当する概念がありませんが、この属性マッピングにより、OpenTelemetryのSpanを通じてLangfuse固有のTraceメタデータを操作できます。

Kotlin/JVMでの実装設計

JVM系のSDKとしてlangfuse-javaがありますが、tracingの計装はOpenTelemetryで行うことが推奨されています。そのため今回はlangfuse-java SDKは利用せず、OpenTelemetry SDKで直接計装を行いました。上記のLangfuse固有の仕様を踏まえた設計が必要でした。

TraceレベルをどのSpanに付与するか

Traceレベル属性の付与方法として、Traceの最初と最後にTrace属性設定用の短いSpan(request / response)を明示的に作成する方法を採用しました。

他に2案を検討しましたが、それぞれ以下の理由で採用しませんでした。

  • 既存のSpanに暗黙的に付与する方法: Trace内の処理が複雑な場合に複数の子Spanに属性が付与されてしまい、意図しない動作が起きる可能性がある
  • Root SpanをTrace全体で開いておく方法: Root SpanのdurationがTrace全体の処理時間と一致してしまい、LangfuseのUI上で常に「時間がかかっているSpan」として表示されるため、実際の処理時間を正確に把握しづらい

採用したRequest / Response Spanパターンは以下のようなコードになります。

// Trace開始時: Request Spanでinputやmetadataを設定
tracer.spanBuilder("request")
    .startSpan()
    .use { span ->
        span.setAttribute("langfuse.trace.input", requestBody.toJson())
        span.setAttribute("langfuse.trace.user.id", userId)
        span.setAttribute("langfuse.trace.session.id", sessionId)
        span.setAttribute("langfuse.trace.tags", listOf("contract-extraction"))
    }

// ... 実際の処理(LLM呼び出しなど)...

// Trace終了時: Response Spanでoutputを設定
tracer.spanBuilder("response")
    .startSpan()
    .use { span ->
        span.setAttribute("langfuse.trace.output", response.toJson())
    }

この方法のメリットは以下の通りです。

  • 明示性: どこでTraceレベル属性が設定されるかがコード上で明確
  • UIへの影響が少ない: 専用Spanは一瞬で終わるため、UI上でノイズにならない
  • 責務の分離: 実際の処理を行うSpanと、Traceメタデータを設定するSpanが分離される
Semantic Conventionsの併用

Langfuse固有の属性(langfuse.*)を送信しつつ、加えてOpenTelemetryのSemantic Conventions for Generative AI Systems(gen_ai.*)も送信する構成にしています。

// Langfuse固有属性
span.setAttribute("langfuse.trace.session.id", sessionId)
span.setAttribute("langfuse.trace.user.id", userId)
span.setAttribute("langfuse.trace.tags", listOf("contract-extraction"))
span.setAttribute("langfuse.usage.prompt_tokens", promptTokens)
span.setAttribute("langfuse.usage.completion_tokens", completionTokens)

// GenAI Semantic Conventions
span.setAttribute("gen_ai.system", "openai")
span.setAttribute("gen_ai.request.model", "gpt-4")
span.setAttribute("gen_ai.usage.input_tokens", inputTokens)
span.setAttribute("gen_ai.usage.output_tokens", outputTokens)

Langfuseはトークン使用量の属性としてlangfuse.usage.*とgen_ai.usage.*の両方に対応しており、langfuse.usage.*が優先されます。また、LangfuseにはOpenAIやAnthropicなどの主要プロバイダのモデル単価があらかじめ設定されているため、モデル名を記録するだけで自動的にコストが計算され、機能別・期間別のコストが可視化されます。

Contract OneではAzure OpenAI Serviceを利用しており、同サービスでもコストの確認は可能ですが、deploymentごとの集計になるため、同じdeploymentを複数の機能で共有している場合に機能別のコストを把握できません。また、機能で使うモデルを変更した場合にコストの推移を追うのが難しくなります。LangfuseではTraceにタグを付与することで機能別のコストを集計でき、モデル名はgen_ai.response.modelで記録されるためdeploymentの変更に関係なくモデル単位での分析が可能です。

基盤クラスの実装

LLM呼び出しをOpenTelemetryで計装するために、LangfuseTrace、LangfuseGeneration、LangfuseSpanという3つのクラスを実装しました。

LangfuseTrace

LangfuseTraceはTrace全体を管理するクラスです。前述のRequest / Response Spanパターンを実装しています。

class LangfuseTrace private constructor(
    private val traceContext: TraceContext
) {
    private var outputString: String? = null

    data class TraceContext(
        val span: Span,
        val tracer: Tracer
    )

    companion object {
        fun new(
            operationName: LangfuseOperationName,
            tenantId: String,
            userId: String,
            tracer: Tracer,
            input: String?,
            metadata: Map<String, String> = emptyMap()
        ): LangfuseTrace {
            val current = Span.current()

            // request spanを作成してTrace属性を設定
            tracer.spanBuilder("request")
                .setParent(Context.current().with(current))
                .apply {
                    setAttribute("langfuse.trace.name", operationName.featureName)
                    setAttribute("langfuse.observation.type", LangfuseObservationType.EVENT.lowercase())
                    setAttribute("langfuse.session.id", current.spanContext.traceId)
                    setAttribute("langfuse.user.id", tenantId)
                    setAttribute("langfuse.trace.tags", operationName.featureName)
                    setAttribute("langfuse.trace.metadata.tenant_id", tenantId)
                    setAttribute("langfuse.trace.metadata.user_id", userId)
                    metadata.forEach { (key, value) ->
                        setAttribute("langfuse.trace.metadata.$key", value)
                    }
                    input?.let { setAttribute("langfuse.trace.input", it) }
                }
                .startSpan().end()

            return LangfuseTrace(TraceContext(span = current, tracer = tracer))
        }
    }

    // 子Spanを作成するメソッド
    fun <T> span(name: String, type: LangfuseObservationType, input: Any? = null, block: (LangfuseSpan) -> T): T
    fun <T> generation(name: String, input: Any, modelParameters: Map<String, Any> = emptyMap(), block: (LangfuseGeneration) -> T): T

    fun end() {
        // response spanを作成してTrace outputを記録
        traceContext.tracer.spanBuilder("response")
            .setParent(Context.current().with(traceContext.span))
            .apply {
                setAttribute("langfuse.observation.type", LangfuseObservationType.EVENT.lowercase())
                outputString?.let { setAttribute("langfuse.trace.output", it) }
            }
            .startSpan().end()
    }
}

Traceの開始時にrequest spanを作成してinputやmetadataを設定し、終了時にresponse spanを作成してoutputを記録します。span()とgeneration()メソッドで子Spanを作成できます。

LangfuseGeneration

LangfuseGenerationはLLM呼び出しを記録するクラスです。GenAI Semantic Conventionsに対応した属性を設定します。

class LangfuseGeneration private constructor(
    internal val span: Span
) {
    companion object {
        fun new(
            parentSpan: Span,
            tracer: Tracer,
            name: String,
            input: Any,
            modelParameters: Map<String, Any> = emptyMap(),
            metadata: Map<String, Any> = emptyMap()
        ): LangfuseGeneration {
            val childSpan = tracer.spanBuilder(name)
                .setParent(Context.current().with(parentSpan))
                .setSpanKind(SpanKind.CLIENT)
                .apply {
                    // GenAI Semantic Conventions
                    setAttribute("gen_ai.operation.name", "chat")
                    setAttribute("gen_ai.provider.name", "azure.ai.openai")

                    // Model parametersをGenAI Semantic Conventionsにマッピング
                    modelParameters.forEach { (key, value) ->
                        when (key) {
                            "temperature" -> setAttribute("gen_ai.request.temperature", value.toString().toDoubleOrNull() ?: 0.0)
                            "max_tokens" -> setAttribute("gen_ai.request.max_tokens", value.toString().toLongOrNull() ?: 0L)
                            // ... その他のパラメータ
                        }
                    }

                    setAttribute("langfuse.observation.type", LangfuseObservationType.GENERATION.lowercase())
                    setAttribute("langfuse.observation.input", objectMapper.writeValueAsString(input))
                }
                .startSpan()

            return LangfuseGeneration(span = childSpan)
        }
    }

    fun update(
        output: Any,
        usage: TokenUsage,
        responseId: String,
        requestModel: String,
        responseModel: String,
        finishReasons: List<String>,
        outputType: OutputType
    ) {
        span.setAttribute("langfuse.observation.output", objectMapper.writeValueAsString(output))

        // Token usage(GenAI Semantic Conventions + Langfuse attributes)
        span.setAttribute("gen_ai.usage.input_tokens", usage.promptTokens.toLong())
        span.setAttribute("gen_ai.usage.output_tokens", usage.completionTokens.toLong())

        // Request and Response attributes
        span.setAttribute("gen_ai.request.model", requestModel)
        span.setAttribute("gen_ai.response.id", responseId)
        span.setAttribute("gen_ai.response.model", responseModel)
    }

    data class TokenUsage(val promptTokens: Int, val completionTokens: Int, val totalTokens: Int)
}

update()メソッドでLLMのレスポンス情報を記録します。gen_ai.usage.*属性により、Langfuseがトークン数からコストを自動計算してくれます。

LangfuseSpan

LangfuseSpanはLLM以外の処理(データ取得、前処理など)を記録するクラスです。

class LangfuseSpan private constructor(
    internal val span: Span
) {
    companion object {
        fun new(
            parentSpan: Span,
            tracer: Tracer,
            name: String,
            observationType: LangfuseObservationType = LangfuseObservationType.SPAN,
            input: Any? = null,
            metadata: Map<String, Any> = emptyMap()
        ): LangfuseSpan {
            val childSpan = tracer.spanBuilder(name)
                .setParent(Context.current().with(parentSpan))
                .setSpanKind(SpanKind.INTERNAL)
                .apply {
                    setAttribute("langfuse.observation.type", observationType.lowercase())
                    metadata.forEach { (key, value) ->
                        setAttribute("langfuse.observation.metadata.$key", value.toString())
                    }
                    input?.let {
                        setAttribute("langfuse.observation.input", objectMapper.writeValueAsString(it))
                    }
                }
                .startSpan()

            return LangfuseSpan(span = childSpan)
        }
    }

    fun update(output: Any? = null, statusMessage: String? = null) {
        output?.let {
            span.setAttribute("langfuse.observation.output", objectMapper.writeValueAsString(it))
        }
    }
}

LangfuseGenerationと同様の構造ですが、GenAI Semantic Conventionsは使用せず、Langfuse固有の属性のみを設定します。

利用側のコード

これらの基盤クラスを使った実際のUseCaseでの使い方を紹介します。

UseCase層でのTraceの開始

UseCaseではLangfuseTracingService.trace()で処理全体をラップします。

class CreateAiChatMessageUseCase {
    fun execute(input: Input, userContext: UserContext): Output {
        // LLMを使う処理をTraceでラップ
        val result = LangfuseTracingService.trace(
            operationName = LangfuseOperationName.CHAT_COMPLETION,
            userContext = userContext,
            input = input.userContent,
            metadata = mapOf("feature" to "chat"),
            outputMapper = { result ->
                result.fold(success = { it.content }, failure = { null })
            }
        ) { trace ->
            createAiMessage(trace, input, userContext)
        }
        // ...
    }
}
非LLM処理のspan記録

データ取得などの非LLM処理はtrace.span()で記録します。

val documents = trace.span(
    name = "retrieve-documents",
    type = LangfuseObservationType.RETRIEVER,
    input = mapOf("query" to query)
) { span ->
    val result = documentRepository.findByQuery(query)

    span.update(
        output = mapOf(
            "count" to result.size,
            "ids" to result.map { it.id }
        )
    )
    result
}
LLM呼び出しの計装

Contract Oneでは、LLMクライアント層にtrace.generation()を組み込んでいます。そのため、UseCase層ではtraceを渡すだけでLLM呼び出しの計装が完了します。

// UseCase層: traceを渡すだけで計装が完了
val response = openAIClient.createCompletion(
    request = completionRequest,
    trace = trace
)

LLMクライアント層では、受け取ったtraceを使ってgenerationを記録します。

// LLMクライアント層
fun createCompletion(request: Request, trace: LangfuseTrace): Response {
    return trace.generation(
        name = "openai-completion",
        input = request.messages.map { LangfuseGeneration.LangfuseMessage(it.role, it.content) },
        modelParameters = mapOf("temperature" to request.temperature)
    ) { generation ->
        val response = callOpenAIAPI(request)

        generation.update(
            output = response.choices.map { LangfuseGeneration.LangfuseChoice(it.role, it.content) },
            usage = LangfuseGeneration.TokenUsage(
                promptTokens = response.usage.promptTokens,
                completionTokens = response.usage.completionTokens,
                totalTokens = response.usage.totalTokens
            ),
            responseId = response.id,
            requestModel = response.model,
            responseModel = response.model,
            finishReasons = response.choices.map { it.finishReason },
            outputType = LangfuseGeneration.OutputType.TEXT
        )
        response
    }
}

この設計により、新しいAI機能を追加する際もUseCase層でtraceを渡すだけでobservabilityが担保されます。

評価体制の構築

前回の記事で紹介した内容に加えて、LLM as a Judgeによる自動評価機能やプロンプト管理機能も利用しています。

2025年6月にLangfuseは多くの機能をMITライセンスで公開しました。LLM as a Judge、Annotation Queues、Prompt Experiments、Playgroundなどが対象で、以前は商用ライセンスが必要だった機能もセルフホスト環境で自由に利用できるようになっています。

導入による成果

Langfuse導入によって得られた成果を紹介します。

コスト可視化

機能別にコストが見えるようになり、運用コストの最適化が可能になりました。Contract Oneでは現在6個ほどのAI機能を提供していますが、それぞれにタグを付与することで機能カットでの利用状況やコストを確認できます。どの機能にどれだけのコストがかかっているかが明確になり、開発計画の優先度判断にも活用できています。

アップデート時の評価

同じ機能のモデルを変更した際に、レイテンシやコスト、精度などを横並びで比較できるようになりました。これにより、アップデートの評価やパフォーマンス劣化の検知にかかる運用コストを大幅に削減できています。

検証フローの効率化

これまでは生成がうまくいかないケースがあった場合、エンジニアがログからデータや状況を取得し、それをR&Dに連携し、R&D環境で再現してから検証をスタートするというフローでした。Langfuse導入後は、Langfuse上で状況を把握し、出力されている完全なpromptを使ってすぐに検証を開始できるようになりました。処理が複雑になるほど従来のフローでは運用コストがかさむため、ツールによる効率化の効果は大きいです。

まとめ

本記事では、Contract OneにおけるLLM observabilityの実装と運用について紹介しました。LangfuseとOpenTelemetryを組み合わせることで、Kotlin環境でも効果的なobservabilityを実現できています。OpenTelemetryを使ってLangfuseに計装を行おうとしている方の参考になれば幸いです。

EngineeringとR&Dの協業により、基盤の安定運用と機能開発を両立させています。今後も評価体制の強化を進め、より高品質なAI機能を提供していきます。

Contract Oneでは現在、絶賛採用強化中です!少しでも興味がある方はぜひご連絡ください。

media.sansan-engineering.com

© Sansan, Inc.