Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

Azure Functionsでの大量データ処理とグレースフルシャットダウン(前編)

技術本部Sansan Engieering Unit Data Hubグループの藤原です。普段はプロダクトのアーキテクチャを改善したり、技術的な課題を解決したり、たまにOSSを書いたりコントリビュートしたりしています。

今年はSansan Data Hubの日々の開発や運用で突き当たっている課題をベースに、現在取り組んでいることや、これから取り組みたいことについて紹介していきたいと思います。今回は、Azure Functionsでの大量データ処理をするとき、グレースフルシャットダウン関連で遭遇した問題について、Azure Functionsの内部構造に触れつつ紹介します。一言でいうと、Event Hubトリガーを使っている場合、SDKのバージョンによってはメッセージが処理されないことがあります(後編で説明しますが、Microsoft.Azure.WebJobs.Extensions.EventHubs の v6.1.0 以上を使用する必要があります)。

目次

Azure Functionsとその大まかな構造について

まず、前提として、Azure Functionsとは何かと、その大まかな内部構造について簡単に触れておきます。 Azure FunctionsはいわゆるFaaS(Function as a Service)と呼ばれるサービスです。AWS LambdaやGoogle Cloud Functionsと同様に、何かしらの起動条件(トリガー)に応じて、処理を実行します。その名の通り、それらの処理は「関数(Function)」と呼べる程度の短い処理にすることが多いですが、基本的にはどんな処理でも書けます。その気になればWebサーバーも記述できます。

Azure Functionsの内部構造

さて、その構造ですが、構造を理解するには少し歴史に触れた方がわかりやすいので、歴史を交えて説明します。 Azure Functionsが出た当時、Azureには既にAzure App Serviceというマネージドのアプリケーション実行環境がありました。さらに、Azure Functionsよりも機能が制限されていますが、Azure Web JobsというApp Serviceでジョブ(Webアプリケーションでない処理)を実行するための機能もありました。そして、Azure FunctionsはApp Service上で、Azure Web Jobsの拡張のような形で実装されました。 現在、Azure Functionsは以下のような構造になっています。

ランタイム、トリガー、リスナー

私たちアプリケーション開発者がAzure Functionsの処理を実装するときには、Azure FunctionsのSDKを使用して処理を実装します。具体的には、処理の起動条件であるトリガーとして何を使うのか(キューにメッセージがあるとき、特定の時刻になったとき、等)を関数のパラメーターおよびそのカスタム属性として宣言します。なお、Azure FunctionsランタイムはWeb Jobs SDKのコントラクトのみを参照し、その実装はアプリケーションがDI(Dependency Injection)で設定するようになっており、Azure Functionsの拡張性を担保しています。このあたりの関係性を図示すると以下のようになります。

Azure Functionsの構造

Azure Functionsランタイムは、アプリケーション側で宣言したトリガーに応じたリスナーを作成します。リスナーは、起動条件を満たしているかを監視して、条件を満たしていればアプリケーションの関数を呼び出す役割を持ちます。関数に特定の型のパラメーターを宣言しておくと、リスナーが関数を呼び出すときに、リスナー自身やAzure Functionsランタイムによってパラメーターが提供されます。たとえば、キューのメッセージ、Azure Functionsランタイム経由でログを出力するためのロガー、この後説明するグレースフルシャットダウン通知を受け取るための CancellationToken などです。図にすると以下のような流れです。

Azure Functionsのトリガー実行の流れ

Azure Functionで大量のメッセージを処理するときの選択肢とEventHubについて

非同期メッセージングアーキテクチャ

Data Hubでは、ドメインごとに分割した複数の処理をAzure Functionsで実装し、それらキューで接続してメッセージを渡すことで処理を実行する非同期メッセージングのアーキテクチャを採用しています。これにより、きめ細かなアップデートやスケーリング、万が一の障害時の部分的な再実行によるリカバリを実行しています(流行りのマイクロサービスアーキテクチャ、に見えるかもしれませんが、キューベースの非同期処理による耐障害性やスケーラビリティの確保はWindowsやUnix以前から採用されている伝統的なアーキテクチャだったりします)。

非同期メッセージングを行うためのキューにはさまざまな選択肢がありますが、Data Hubでは主にフルマネージドのメッセージングサービスであるAzure Service Busを使用しています。組み込みのリトライ機能やPub Subのサポートがあることや、Azure PortalをはじめとしたツールセットやManaged Identityサポートなど、Azure組み込みであるメリットも多いことが理由です。また、Azure FunctionsでもService Bus Triggerをはじめとしたサポートがあります。

Azure Event Hubs

しかし、Azure Service Busは基本的にメッセージを1件ずつ処理するためのサービスです。そのため、大量(たとえば月間100億)のメッセージを処理する場合、メッセージ単位での処理を行うことによるオーバーヘッドが避けられません。このような大量メッセージを一気にさばくことに特化したメッセージ処理サービスとして、AzureにはAzure Event Hubs(以下Event Hubs)というサービスがあります。実際に、Azure内でのログの転送や、IoTでのテレメトリデータの処理などではEvent Hubsが使われています。Event Hubsは、以下に示すアーキテクチャにすることで、大量のデータを高速に処理できるようにしています(詳細は公式ドキュメントを参照)。

  • 入出力をパーティション化する。これにより、内部処理の並列性を高めています。
  • メッセージ(正確にはイベントと言いますが、ここではメッセージで統一します)の処理状態をサーバーではなくクライアント側に持たせ、サーバー側は単なる一連のイベントストリームとして管理しています。これによって、状態管理をクライアント側にオフロードできるので、複数のサブスクライバーがいる場合のスケーラビリティが高まります。

このように、Event Hubs自身は個々のメッセージを管理しないので、Event Hubsから(バルクで)取得したメッセージをどこまで処理したのかを記憶し、次の取得時にその位置を指定するのはアプリケーション側の役割になります。このどこまで処理したのか(どこから取得するのか)を表す位置情報を「チェックポイント」といいます。チェックポイントの管理はアプリケーション側の責任なので、チェックポイントの永続化もアプリケーションが行わなければなりません。とはいえ、チェックポイントを使用した取得やチェックポイントの永続化はEvent HubsのSDKとして実装が提供されています。そのため、実際のところ、実装の負荷はそれほど高くありません(動作を理解していないと障害時の対応が難しくなるため、理解しておく必要はあります)。また、Azure FunctionsのEvent Hubsトリガーにはチェックポイント処理を行う機構も用意されており、Azure FunctionsでEvent Hubsを使う場合に明示的にチェックポイント処理を実装する必要はありません。

グレースフルシャットダウンについて

今回話題となるグレースフルシャットダウンについても触れておきます。

グレースフルシャットダウンとは

Azure Functionsの実行基盤となっているApp ServiceはマネージドPaaSであるため、VM(利用者からは一部のデバッグ用の機能を除き隠蔽されています)のメンテナンスはAzure運営側によって実施されます。そのため、Windows Updateやランタイムのアップデートなどの理由により、プロセスがシャットダウンされることがあります。また、専用のApp Service Planで動かしている場合やFlexible Planと言った従量課金制の場合でも、負荷が減ったときにApp Service Planをスケールインすると、減らされるApp Service Planインスタンス上のプロセスはシャットダウンされます。アプリケーションコードの更新でデプロイを行う、環境変数(アプリ設定)を変更するなどの行為によっても、プロセスが再起動するため、シャットダウンが発生します。

このような場合に、プロセスが終了されようとしていることをアプリケーションから検知できると、適切なクリーンアップ処理を実行できて便利ですよね……というより、ないと安定した運用が非常に困難になります。グレースフルシャットダウンとは、このような場合にアプリケーション側でシャットダウンされようとしていることを検知し、クリーンアップを行うことや、そのための仕組みのことです。

Azure Functionsにおけるグレースフルシャットダウン

Azure Functionsランタイム(正確にはそれが利用している ASP.NET Core)では、プロセスの終了通知(Windows版のApp ServiceであればIISからのワーカープロセスシャットダウン通知や構成変更通知)をフックし、関数のパラメーターとしてアプリケーション側が受け取り可能な CancellationToken の状態をキャンセル済にすることでグレースフルシャットダウンを実現可能にしています。

CancellationToken の状態が「キャンセル済」になった場合、アプリケーションや依存先のクラスライブラリのコードでは、ループ処理を完了(CancellationToken.IsCancellationRequested をループの終了条件に組み込んでおく)したり、TaskCancelledException をスローしたりして処理を中断することで、(通常 finally 句に指定してある)クリーンアップコードが実行されるようにします。アプリケーションが例外時のクリーンアップと処理の冪等性を正しく実装しているならば、プロセスが再び起動して処理が再実行されることで、問題なく処理が再開されるはずです。

先ほど説明したように、Azure Functionsでは関数のパラメーターとして CancellationToken を受け取れます。この CancellationToken こそが、今説明したグレースフルシャットダウンを実現するためのものです。つまり、関数で CancellationToken を受け取るようにしておけば、それを使用してグレースフルシャットダウンを実装できるということです。

Azure Functionsでのグレースフルシャットダウンの流れ

EventHubTriggerとリトライ

さて、システムの可用性は構成要素の数が多くなるほど落ちていきます。これはクラウドであろうがオンプレミスであろうが変わらず、確率的に故障します。さらに、前述のようにAzure App Serviceではプラットフォーム側のメンテナンスやデプロイなどによりプロセスがシャットダウンされることもあります。このような一時的な障害がすべてシステムの故障につながっていては可用性を確保できません。そのため、システム、特に非同期メッセージングアーキテクチャを始めとした分散システムでは、処理の冪等性の確保と、リトライの実装が必要になります。そのため、Azure Functionsにもリトライの仕組みが実装されています。

Azure Functionsにおけるリトライ

この後の話題にも関わるので、Azure Functionsのリトライについては少し深く触れておきます(Azure Functionsランタイムv4の2024/1初頭の実装を前提とします)。

先ほど述べたAzure Functionsのリスナーは、基本的に各Azureサービス向けのSDKを活用し、Azure Functionsのリスナーというコントラクトに合わせて調整するような形で実装されています。Event Hubs向けのリスナー(EventHubListener)はEvent HubsのSDKを利用しています。

Event HubsのSDKは、Event Hubsとの通信のリトライ機能を実装しています。ところが、受信した後、アプリケーションの実装がリトライを行うかどうかは関与しません。Event Hubs SDKはEvent Hubsとの通信や、先ほど述べたチェックポイント処理を行うためのSDKであり、アプリケーションフレームワークではないためです。

とはいえ、アプリケーションの実装としては、出力先のデータベースが一時的に故障している場合など、リトライを行うことが多くあります。このようなケースをフォローするために、Azure Functionsランタイムには組み込みのリトライ機能があります。Azure Functionsランタイムは、呼び出し先の関数のメタデータを(皆さんの予想通りリフレクションを使用して)調べ、リトライ属性(たとえば [ExponentialBackoffRetry] カスタム属性)が付与されている場合にリトライを行います。具体的には、FunctionExecutorExtensions.TryExecute() メソッド において、リトライ属性の内容に応じたリトライを行います。ここでは、関数が例外をスローしたならば、次のリトライまでの間隔として指定された時間で待機した後に処理をやり直すようなリトライループが実装されています。

Azure FunctionsのリトライループとEvent Hubリスナーのチェックポイント処理

Azure Functionsのリトライループが終了する条件は2つあります。まず、当然ながら処理が正常終了した場合です。処理が正常終了したならば、Event Hubsリスナーはチェックポイント処理を行い、次のメッセージバッチを処理します。そして、グレースフルシャットダウンが行われている場合です。この場合、リトライループを終了し、Azure Functionsランタイム自身やSDKのクリーンアップを行わなければなりません。

グレースフルシャットダウンが行われているケースでは、チェックポイント処理を行うべきではありません。グレースフルシャットダウンが発生すると、アプリケーションは処理の途中でキャンセルされます。ACIDトランザクションを実行していたならばロールバックされるでしょう。そのため、チェックポイント処理を行ってしまうと、アプリケーションが実際には処理を実行していないのにメッセージが処理済みとされてしまうと、システム全体としてはメッセージがなくなったかのような結果になってしまいます。

そして、チェックポイント処理を行うかどうかの判定はEvent Hubリスナーの役割です。Event Hubリスナーは、Azure Functionsランタイムによる関数実行の結果に応じて、チェックポイントを更新します。具体的には、関数実行が完了した後、自身が渡した CancellationToken の状態を調べ、キャンセル状態でないときにのみチェックポイント処理を行います。

まとめると、Event Hubsトリガーを使う場合、関数にリトライ属性を付けておくことで、関数の実行が失敗した時にAzure Functionsが関数実行をリトライするようにできます。ただし、グレースフルシャットダウンによる失敗のときにはリトライせずに終了します。

グレースフルシャットダウン発生時のチェックポイント処理のバグ

実はこのリトライとチェックポイント処理の組み合わせについて、以前、具体的には2023年の夏ごろまではバグがありました。それは、Event Hubsリスナーの実装が、グレースフルシャットダウンによるリトライループ中断の可能性を考慮していなかったというものです(このPRで修正されています)。つまり、Event Hubsリスナーは、「呼び出し先の FunctionExecutorExtensions.TryExecute はリトライが終わってからアプリケーションが処理を正常終了されたときのみ制御を返すはず」という前提を置き、制御が返ってきたらチェックポイント処理を実行していました。先ほど述べた「自身が渡した CancellationToken の状態を調べ」ることをしていなかったのです。しかし、実際のところ、前述のようにリトライ処理はグレースフルシャットダウンが発生した際には中断されます。そのため、グレースフルシャットダウンのためにアプリケーションによるメッセージの処理が中断されていた場合、その未処理のメッセージに対するチェックポイント処理が走る可能性があったのです。

Event Hubsトリガーと問題

アプリケーション実行がキャンセルされたにもかかわらずチェックポイントが進むと、キャンセルされたときに処理されていたメッセージが実際には処理されずスキップされた状態になります。アプリケーションから見ると、Event Hubsからメッセージを受信できなかった、いわゆるメッセージロスト状態になります。

Data Hubは頻繁にスケーリングやデプロイを行うことから、この問題に遭遇していました。幸いにも、前述のようにリトライ可能な構造にしていたため大きな問題とはなりませんでしたが、リカバリによりデータの反映が遅延していました。そのため、グレースフルシャットダウンを考慮したEvent Hubリスナーのパッチを適用することにしました。

さて、ライブラリやSDKは可能であれば最新版を使用するのが定石です。まだ遭遇していない未知の不具合が解消されているかもしれませんし、セキュリティ上の脆弱性の修正は最新版にしか提供されない可能性があるためです。最新版を適用し、リグレッションテストを行い、問題がないことを確認しました。さらに、当該SDKのパッチをアーキテクトがレビューし、修正前にはグレースフルシャットダウン時のメッセージロストの可能性があること、パッチによって解決することを確認しました。これで、処理が安定して実行できるようになった……はずでした(後編に続く)。 buildersbox.corp-sansan.com

仲間を募集中です!

このBlogを書いていたら、Copilotが有名なAzure Function大好きな方の名前をサジェストしてきました。

同じようにAzure Functionsをどっぷりやりたい方、LambdaやCloud Functionsを使っているけどAzure Functionsもやってみたい方、Azure FunctionsランタイムやSDKや.NETランタイムを深く理解し何ならプルリクエストを送りたい/送ったことのある方、ぜひご連絡ください。一緒に大量データ処理システムを支えていきましょう!



20240312182329
20240315190344

© Sansan, Inc.