技術本部 Mobile Applicationグループに所属する北村です。SansanとEightの両プロダクトをまたぐプロダクト横断チームの一員として、モバイル領域の中長期的な技術課題の解決や、PoCの開発を担当しています。
今回は昨年9月にリリースした、Eightのタッチ名刺交換機能の開発時に遭遇したマルチスレッドで発生する問題に関連して、Kotlin Coroutinesを使った際にその問題を回避する方法を紹介します。
タッチ名刺交換機能はBluetoothの機能を使って実現していますが、複数端末と同時にタッチが成立しないよう、マルチスレッド環境で発生する問題を解決することが重要でした。
マルチスレッド環境で発生する問題とは、スレッドセーフでないクラスを複数のスレッドから使用することで発生するさまざまなバグを指します。
スレッドセーフであるとは、オブジェクトがランタイムのスレッドスケジューリングや実行タイミングに関係なく、呼び出し元で同期を行わなくてもマルチスレッド環境で意図したとおりに動作することを意味します。
この記事では、並行処理プログラミングの本質と、Kotlin Coroutinesを用いた際に並行処理プログラミングの落とし穴を回避する方法を紹介します。
並行処理プログラミングの本質は「データを守る」
スレッドやロックなどのコードは並行処理プログラミングの本質ではありません。並行処理プログラミングの本質はデータをいかに守るかという事です。データを無秩序なアクセスから守るためにロックなどのコードが必要となります。
まず、守るべきデータがどのようなデータなのかを理解し、スレッドセーフではない事によって、そのデータにどのようなリスクがあるのかを把握することが重要です。そして、そのリスクからどのように守るかを理解する必要があります。
1つのプログラムにはさまざまなデータが存在しますが、すべてのデータをスレッドセーフにする必要はありません。実際に、保護すべきデータはそれほど多くありません。
保護すべきデータは、次の2つの条件を満たすデータです。
保護すべきデータ
条件1. 複数のスレッドから参照される
条件2. そのうちのいずれかのスレッドから更新される
この2つの条件を満たすデータは、ロックなどの手段で保護しないとデータの破壊や記述したコード通りに動作しないバグが発生します。逆に言えば、この2つの条件を満たさないデータは、保護する必要がありません。
スレッドセーフではない事が原因でトラブルが発生するケース
アトミック性の喪失
ここで言うアトミック性とは、データを読み込み、何らかの処理を行い、データを更新するという一連の処理を単一不可分(アトミック)に行う性質のことを指します。
アトミック性がない状態で2つのスレッドから同時に処理を行うと、処理の競合状態(レースコンディション)が発生し、値の更新が喪失してしまうことがあります。その結果、データの整合性が保たれない状態になることがあります。
具体的な例を見ていきましょう。
class Counter { var count = 0 private set // countUp()はスレッドセーフではない suspend fun countUp() { count++ // このコードがアトミックではない } } // Counterを実行するコード fun main() = runBlocking { val n = 10000 val counter = Counter() val jobs = List(2) { GlobalScope.launch { repeat(n) { counter.countUp() } } } jobs.forEach { it.join() } println("Counter = ${counter.count}") // 20000を期待するが15513が出力される }
このコードのcount++は1つの処理のように見えますが、実際には次の3つのステップで構成されています。
- countの現在の値を取得する
- その値に1を足す
- 足した値でcountを更新する
このように、データを読み込み、処理を行い、データを更新する一連の処理はアトミックではないため、2つのスレッドから同時に実行されると、次の図のように更新の喪失が発生することがあります。
このようなレースコンディションは、後述するロックやAtomicIntegerなどのアトミックな更新が可能なクラスを用いて解決します。
これはスレッドセーフでないオブジェクトをマルチスレッドから扱う際によく発生する問題なので、注意が必要です。
メモリの可視性の問題
Java仮想マシンを含む一般的なメモリモデルでは、CPUのキャッシュやレジスタなどスレッドの作業メモリとメインメモリを別々に持つ都合上、あるスレッドで行った変数への書き込みが、別スレッドから見えることは保証されません。
すると、明らかに片方のスレッドでデータを更新したにもかかわらず、別のスレッドからその更新が見えないという直感と反した奇妙なバグに遭遇することがあります。
具体的には、次のコードは意図した動作をしません。フラグがtrueになるのをスレッドAが無限ループで待ち、別のスレッド(スレッドB)からフラグをtrueにしますが、その変更がスレッドAに見えることは保証されていません。つまり、無限ループから抜ける保証がなく、別スレッドからsetReadyを実行しても、実際には無限ループが継続します。
class WaitTest { var count: Int = 0 // 可視性が保証されない var ready: Boolean = false // スレッドAから実行 fun waitForReady(): Int { // 他のスレッドから更新した値が見えない場合に無限ループする while (!ready) { // NOP } return count } // スレッドBから実行するが、readyをtrueにした変更がスレッドAから見えず無限ループが継続する fun setReady() { count = 100 ready = true } }
また、恐ろしいことに、ready()関数内で実行するcountとreadyの実行順も最適化により順番が保証されないため、readyがtrueになっても、スレッドAから実行したwaitForReady()の戻り値が0のままとなるケースもあり得ます。
このような問題は、volatile変数を用いてメモリの同期を強制するか、ロックを用いることで解決します。ロックの取得や解放時にはvolatileと同じくメモリバリアが挿入され、メモリの可視性による問題は解決します。メモリバリアとは、メモリ操作の順序を保証し、プログラムが意図した通りに動作するようにするための仕組みです。
なお、volatile変数はメモリの可視性のみを解決するので、レースコンディションは解決しません。メモリの可視性のみが問題となるケースは少ないため、volatile変数を使いたくなったときは、本当にメモリの可視性だけの問題か慎重に確認しましょう。Kotlinでvolatile変数にするには、@Volatileアノテーションをプロパティに設定します。
class WaitTest { // 可視性が保証される @Volatile var ready: Boolean = false // 略
アトミック性を確保しスレッドセーフにする方法
スレッドセーフではないことで発生する主な問題を確認したところで、それらをどのように解決するかを見ていきましょう。それぞれの解決方法には適用可能なケースやメリット・デメリットがありますので、どの解決方法が最適かを検討して適用しましょう。
不変
不変にすることで、データを更新させないことを強制できるため、そのオブジェクトをスレッドセーフにできます。不変なクラスを利用するコードを増やすことで、結果として可変データを扱うコードの割合が減り、可変データの保持責務が凝集され、管理がしやすくなります。クラスにvarで可変変数を追加する際は、それが本当に必要か、可変データを別の場所で持ち、その保持責務を凝集できないかを検討しましょう。
同期化
データへのアクセスを同期化し、実行を直列化するにはロックを用います。Kotlin CoroutinesではMutexを使用します。Mutexのロック取得は中断可能であるため、ロック取得待ちでもスレッドをブロックしません。
class Counter { // Kotlin CoroutinesのロックはMutexを使う private val mutex = Mutex() var count = 0 private set // countUp()はスレッドセーフ suspend fun countUp() { mutex.withLock { count++ } } } // 1万回を2スレッドから実行すると期待通り20000となる
シンプルなコードでは非常に簡単ですが、データの取得や更新が複雑になると、「取りあえずMutexでロックしたから安全」とはならないケースがあるので注意が必要です。
どのデータを無秩序なアクセスから保護する必要があるのか、どのデータがどこで取得されてどこで更新されているのかを丁寧に精査する必要があります。また、ロックの取得には一定のコストがかかり、同時に実行できるスレッドを1つに限定します。その結果、パフォーマンスの低下が予想されるため、パフォーマンス要件が重要な場合はパフォーマンスの低下が最低限となるような設計とパフォーマンスの確認が必要です。
※ 同期化の注意点:複数のロックの依存関係によるデッドロックに注意が必要
ロックによる同期化は非常に便利ですが、複数のロックを扱うときにはデッドロックの回避が重要です。たとえば、ロックAとロックBの2つのロックがある場合、ロックAを取得した後にロックBを取得してデータを更新する処理と、ロックBを取得した後にロックAを取得してデータを更新する処理を同時に実行すると、互いに2つ目のロックを取得しようとして待ち続け、デッドロックが発生します。
複数のロックを取得して実行する処理がある場合は、ロックを取得する順番を統一するなど、デッドロックを回避する工夫が必要です。
ノンブロッキングなアルゴリズムを用いたライブラリの利用
Javaには従来のブロッキング同期化コレクションクラスと、CPUのCAS(Compare-and-Swap)命令で実現するLock-freeとWait-freeアルゴリズムを用いた並行コレクションクラスがあります。Lock-freeとWait-freeアルゴリズムを用いた並行コレクションクラスは内部でロックを使用しておらず、パフォーマンスの低下を最小限に抑えながらスレッドセーフを実現できます。これらのクラスはKotlinからも使用でき、Kotlin Coroutinesにも有効ですので、場合によっては選択肢に入れましょう。
1つのスレッドからアクセスを強制
データへのアクセスを複数のスレッドから無秩序に行うことで、さまざまな問題が発生します。そこで、データへのアクセスを1つのスレッドから行うようにすることで、スレッドセーフではない事による問題を回避できます。
Coroutineでは、withContext(Dispatchers.Main)を使用して必ずメインスレッドで実行するようにできます。また、シングルスレッドで動作するCoroutineDispatcherを作り利用するのも良いアイデアです。ただし、Coroutineは中断可能なので、中断する可能性があるsuspend関数を実行すると、中断によってアトミック性が崩れる可能性があります。具体的には、次のコードはレースコンディションが発生する可能性があります。
interface Api { suspend fun getAddValue(): Int } suspend fun unsafeAddValue() { // ① 1つのスレッドからアクセスを強制しているつもり withContext(Dispatchers.Main) { val nowValue = state.value // ② suspend関数を実行 val addValue = api.getAddValue() // ③ 状態を更新 val nextValue = nowValue + addValue state.value = nextValue } }
①でメインスレッドを指定しているため、withContextの中は1つのスレッドで実行されますが、②でsuspend関数を実行しています。suspend関数は中断可能であるため、中断中に①が別スレッドから実行される可能性があります。すると、複数の処理が②に同時に到達し、アトミック性が失われます。
まとめ
スレッドセーフではない事により発生するさまざまな問題はデバッグが困難な事が多いです。そのため、通常はそれらの問題に遭遇しないアーキテクチャを設計します。このようなアーキテクチャを利用する場合、個々の画面の実装などではマルチスレッドで発生する問題を意識する必要はあまりありませんが、自分でアーキテクチャを設計する場合は、マルチスレッドで発生する問題がないように注意して設計する必要があります。
今回は、その際にどのような部分に気をつける必要があるかを解説しました。おさらいですが、気をつけるべきポイントを一言で言うと、「複数スレッドで共有される可変なデータはロックする必要がある」ということです。Kotlin Coroutinesを扱う際も、どのデータが複数スレッドで共有される可変なデータかを注意深く設計・実装しましょう。
そして、複数スレッドで共有される可変なデータを見つけたら、データの読み込み、そのデータを使った処理、データの更新がアトミックに行われるようにロックをかけるようにしましょう。ロックをかける際、Kotlin Coroutinesの場合はMutexを使います。
最後に
Sansanの技術本部では、一緒にSansan / Eightのモバイルアプリを開発する仲間を募集中です。
選考評価無しで現場のエンジニアのリアルな声が聞けるカジュアル面談もあるので、ご興味ありましたらぜひ面談だけでもお越しいただけたら幸いです。