今年も残すところあとわずか、急いでふるさと納税の返礼品を選んでいる方も多い今日この頃ですが、皆さんはいかがお過ごしでしょうか。
どうも、技術本部 Contract One Devグループの原です。
今年のContract One Devグループは、大きめのプロジェクトとして「契約書の検索をPostgreSQLからElasticsearchを使ったものに刷新する」というものがありました。 これに関する詳しい話は弊社の川崎が別の記事を書いていますので、ぜひ読んでみてください。
検索のパフォーマンスは上がったものの、Elasticsearchを導入する際には、運用のための仕組みをいろいろと作り込まなければいけません。 今回はその中から、インデックスの再構築についてContract One Devグループが行なっている方法をご紹介します。
インデックスの再構築とは
Elasticsearchはインデックスのマッピング定義を後から変更することが難しく、フィールドの追加や一部のパラメータ類を修正できますが、既存のフィールド定義を変更したり削除したりはできません。*1
また、全文検索に使用するアナライザーなども後から変更できません。パフォーマンスに関係してくるシャード数も後から変更できません。
PostgreSQLなどのRDBMSと比べると不便ですが、実際にサービスを運用していると「新しい機能のためにマッピングの定義を変更したい」「検索結果の質を改善するためにアナライザーを変更したい」などの要求はたびたび発生します。
そのため、Elasticsearchを運用している現場では新しいマッピング定義でインデックスを作成してそこにデータをすべて移行し、利用するインデックスを新しいほうに切り替えるという方法をとります。
これを「インデックスの再構築」または「再インデックス」などと呼んだりもします。
ただ、実際にこれを実現しようとすると、「データの移行はどうやるのか」「作業中はサービスを停止させるのか」「サービス無停止で行うならデータの整合性はどうやって担保し続けるのか」「新しいインデックスへの切り替えはどうやるのか」「作業中のクラスタのパフォーマンスは大丈夫か」などなど、いろいろと考えることは多いです。
また、どのような仕組みを作るべきかは、自分たちが運営しているサービスの要件や開発チームのスキル・文化なども考慮する必要があるため、運用しているインデックス再構築の仕組みは各社でさまざまです。
Contract Oneにおけるインデックス再構築の要件
ユーザーがあまりアクセスしない時間帯はあれど、基本的にContract Oneは24時間365日利用できるサービスです。サービスをメンテナンスモードにするにも計画と実施にコストがかかります。
また、契約書の検索はContract Oneにおける重要な機能であり、今後もさまざまな改修が入る見込みです。そのためマッピング定義もさまざまな変更が入ると思いますが、マッピング定義の変更内容によってインデックス再構築の方法を変えるのは手間ですし、再構築の仕組みを複数持つとメンテナンスコストや認知コストもかかります。よって次の要件を満たせるよう検討しました。
- サービス無停止でインデックスの再構築ができること
- 1つの仕組みだけで様々なマッピング定義変更に対応できること
検討したインデックス再構築の方法
設定した要件を満たせるよう、どのような方法があるかを考えました。
3つほど検討した案が出ましたが、それについて説明します。
1. Reindex APIを使う
Reindex APIを利用すると、とあるインデックスのデータを別のインデックスにコピーできます。その名の通りインデックスの再構築に特化したAPIで、負荷軽減のためのスロットリングや並列処理のためのスライスなど多機能です。また、スクリプト機能を備えており、フィールドの軽微な変更であればデータの前処理をしつつデータ移行することが可能です。
ただし、次のような懸念もあります。
- 無停止で切り替えるには、コピーの開始から終了の間に更新があったデータを移行先インデックスに反映する必要がある
- フィールドの追加・変更などでスクリプト機能を使う場合、変更の規模によってはスクリプト作成が面倒でテストもしにくい
- インデックスの_sourceに無いデータを追加する場合などは、マイグレーション用バッチを別途作成して移行先インデックスに追加データを流し込む必要がある
スクリプト作成やバッチ作成に関してはその都度作成する必要があり、その分コストがかかります。また、フィールド定義に大幅な変更がある場合は結局マイグレーション用バッチを別途作成する必要があり、そのぶんシンプルさは欠けます。
2. アプリケーションにて、新旧2つのリポジトリを使いつつ再構築する
Contract Oneではドメイン駆動設計(DDD)をベースとしたアプリアーキテクチャを採用しており、Elasticsearchのデータ管理においても専用のリポジトリを作ります。そこで、移行元インデックスを取り扱う旧リポジトリと移行先インデックスを取り扱う新リポジトリの2つを実装し、これらを用いて段階的に移行するというものです。
ざっくりと処理手順を説明すると次のような流れになります。
- 移行先インデックスの作成と新リポジトリをリリースし、書き込み処理だけを新旧両方で実施する(読み込み処理は旧リポジトリから行う)
- アクセスされていない非アクティブなものも含む全データを、バッチ処理などで移行先インデックスにすべて同期する
- 読み込み処理も新リポジトリから行うように切り替える
- 旧リポジトリの書き込みを停止させた後、移行元インデックスと旧リポジトリを撤去する
といった具合で、徐々に移行先インデックスと新リポジトリを使うようにマイグレーションするといった感じです。
これであれば無停止で切り替えられますし、Reindex APIでは難しかったフィールド定義の大幅な変更にも対応できます。
ただ、この方法ですと最低でも2回リリースが必要になります(1と3のとき)。また、新旧両方のリポジトリをアプリケーション内で取り扱えるように実装する必要があり、複数のユースケースでリポジトリを利用するため新旧切り替え処理の実装も複数箇所に施す必要があります。毎回のインデックス再構築でこのような実装と複数回リリースを行うのはちょっと手間が多く、もう少しシンプルにできる方法はないかと考えました。
3. 移行先のインデックスのデータを常に最新状態に保つジョブを動かしながら、新しいアプリケーションをリリースする
これが今回採用した方式になります。
契約書の検索に使うインデックスはPostgreSQL上の契約書データを元にして構築されています。
そのため、移行先のインデックスへPostgreSQL上の契約書データをすべて同期した後、契約書データの更新があり次第移行先のインデックスにそれを反映し続けることができれば、移行先のインデックスのデータは常に最新状態を保つことができます。
その状態を保ったまま、移行先のインデックスを利用する新しいアプリケーションをリリースすればインデックスの再構築ができるはずです。
また、これが実現できると次のようなメリットが得られるはずです。
- サービス無停止で作業ができる
- フィールド定義の大幅な変更などにも対応できる
- 移行先のインデックスのデータを常に最新状態に保つジョブを一度作成してしまえば、インデックス再構築に必要な実装を毎回揃える必要がないので作業コストが下げられる
これなら今回定めた要件を満たせそうなため、この方式を実現させることにしました。
採用したインデックス再構築の詳細
今回採用した方法における、移行先のインデックスのデータを常に最新状態に保つジョブ(以下、「インデックス再構築ジョブ」と呼ぶ)を使った大まかな流れは次のようになります。
- 移行先のインデックスを作成
- PostgreSQL上のすべての契約書データを移行先のインデックスに移行する
- すべての契約書データを移行する最中に更新された契約書データを移行先のインデックスに再度同期させる
- 契約書データの更新をリアルタイムで監視しつつ移行先のインデックスに同期させ続ける
- 移行先のインデックスを利用する新しいアプリケーションをリリースする(作業者が行う)
- ジョブを終了する(作業者が終了させる)
各工程についての説明をしていきます。
移行先のインデックスを作成
まずは移行先のインデックスを作成します。この時点での移行先のインデックスではレプリカ数を0にしています。理由としては、インデックスの再構築中にレプリカが存在すると、レプリカへのデータ書き込みも行う必要がある分クラスタに対して余計に負荷がかかるためです。
インデックスの再構築中はアプリケーションが移行先のインデックスを利用することはありません。そのため、再構築中に障害が発生して移行先のインデックスが壊れても再構築作業が失敗するだけで、サービス稼働にはほとんど問題がないためレプリカ数を0にしています。
PostgreSQL上のすべての契約書データを移行先のインデックスに移行する
この時に注意しなければならないのは、契約書データの数は大量にあるため、それらを一気に移行先インデックスに入れようとするとインデキシング処理が大量に走りクラスタにかなりの負荷がかかってしまう点です。同じクラスタ内にある移行元のインデックスは契約書の検索に利用されているため、クラスタが高負荷状態だとサービス稼働に影響が出てしまいます。
そのため、今回の実装ではデータ移行処理の並列数とチャンク数を自由に変更できるようにしています。そしてパフォーマンステストを行った後、サービスの性質的に許容できる負荷量としてCPU使用率50%程度を一旦設定し、その範囲に収まるように調整しています。
これで、現在ですとだいたい2時間くらいで処理が完了しています。今後のサービス成長を加味してもしばらくの間は許容範囲に収まると判断しました。
また、サービス無停止でやっているためこの工程を行っている最中でも契約書データの更新は行われており、それらは移行元インデックスにも同期されていきます。つまり、この工程が完了したとしても移行先のインデックスにある契約書データは一部が古い状態となってしまいます。
ですので、これ以降の工程でその差分を埋めて、移行先のインデックスのデータを常に最新状態に保っていきます。
すべての契約書データを移行する最中に更新された契約書データを移行先のインデックスに再度同期させる
インデックス内にある契約書データのドキュメントにはtimestamp
というフィールドがあり、ドキュメントが更新されるとこのフィールドも現在日時で更新されます。
これを使って、インデックス再構築ジョブが開始された時間以降に更新された契約書データを移行元インデックスから検索します。
そして、それら契約書データの最新状態をPostgreSQLから再度取得して移行先インデックスに同期します。
もう少し具体的に処理内容を説明すると次のようになります。
- 最新のデータを検索できるように移行元インデックスをrefreshする
- 移行元インデックスにてPITを開始する
- search_afterを使いつつ、
timestamp
がインデックス再構築ジョブの開始時間以降になっている契約書データを移行元インデックスからすべて検索して取得する - PITを終了する
- 検索して取得した契約書データの最新状態をPostgreSQLから取得して移行先インデックスに同期する。
また、検索結果のうちtimestamp
が一番新しいものをピックアップし、そのtimestamp
を "最後の更新日時" として保存して後続の工程で利用します。
契約書データの更新をリアルタイムで監視しつつ移行先のインデックスに同期させ続ける
基本的には、前の工程である「すべての契約書データを移行する最中に更新された契約書データを移行先のインデックスに再度同期させる」とやることはほぼ一緒ですが、違いとしてはインデックス再構築ジョブが開始された時間以降で検索するのではなく、前工程で保存した "最後の更新日時" 以降で検索することです。
また、検索結果のうちtimestamp
が一番新しいものをピックアップし、そのtimestamp
を "最後の更新日時" として再度保存して次の検索に利用します。
この処理を適当な間隔を空けながらずっとループさせることにより、更新があった契約書データを逐一監視してピックアップすることができます。
結果的に、移行元インデックスで更新が発生すると移行先インデックスも更新されるため、移行先インデックスのデータを常に最新状態に保つことができます。
移行先のインデックスを利用する新しいアプリケーションをリリースする
これにより移行元のインデックスが完全に使われなくなり、移行先のインデックスだけが使われるようになります。このとき少しの間だけですが、インデックス再構築ジョブと新しいアプリケーションとで同じ契約書を更新しようとして競合する可能性があります。Contract Oneではインデックス内の契約書データの更新時に_primary_termと_seq_noを使った楽観ロックを行ってデータの整合性を担保しています。今回においても、もし競合が発生した場合はインデックス再構築ジョブ側の更新をキャンセルします。
ジョブを終了する
ただ単にインデックス再構築ジョブのプロセスを終了するのではなく、次のような後処理をしてから終了します。
- 作業中に解約した企業がないかチェック
- 作業中に解約した企業の契約書データを移行先インデックスから削除する
- 移行先インデックスのレプリカ数を上げておく
Contract Oneではサービス契約中は契約書データを物理削除することはありませんが、サービス解約時には解約企業の契約書データはすべて物理削除されます。インデックス再構築中に解約処理が発生して移行先インデックスに解約企業の契約書データが残留してしまわないようにここでケアしておきます。
それが終わったら、クラスタの障害に耐えられるよう移行先インデックスのレプリカ数を上げておきます。
そして作業者は、動作確認が終わった後に移行元インデックスを削除します。
実際に利用してみて
ここまでインデックス再構築方法の詳細をいろいろ説明してきましたが、利用する側からしてみると手順はシンプルです。
- 新しいマッピング定義のインデックスを利用するようアプリケーションを改修する
- インデックス再構築ジョブだけを先にリリースして実行する
- 移行先インデックスのデータが最新状態になったら新しいアプリケーションをリリースする
- インデックス再構築ジョブを終了させて移行元インデックスを削除する
Contract OneではPostgreSQL上の契約書データをElasticsearchのインデックスに同期させる実装が共通化されており、インデックス再構築ジョブでもそれを流用しています。
結局、新しいマッピング定義を使うにはその実装も改修する必要があるため、結果的にインデックス再構築ジョブは改修せずそのまま使い回し続けることができます。これで、フィールド定義に大幅な変更がある場合でも作業コストを抑えることができています。
実際の作業時間も2時間ほどで終わり、クラスタへの負荷もCPU使用率が50%以下くらい、メモリもそこまで消費はせず、まだ使い始めたばかりですが今のところ問題は起きていません。
現状のデメリット
インデックスの再構築が終わって新しいアプリケーションもリリースした後、もしアプリケーションに不具合があってロールバックする場合は移行元インデックスを再度利用することになります。ただ、リリースからロールバックするまでの間は移行元インデックスが使われていないため、その間発生したデータ更新は移行元インデックスに反映されていません。
これに関してはReindex APIを使った場合でも同じかとは思いますが、ロールバックした後は移行元インデックスのデータを最新のもので更新し直す必要があります。
Contract Oneでは、もしロールバックが発生した場合には移行元インデックスと同じマッピング定義で移行先インデックスを作成し、そこに対してインデックスを再構築することで対処できます。しかしこれだとちょっと面倒です。
今のところロールバックが発生したことはないですが、もし需要が高くなったらインデックス再構築ジョブにロールバック時のデータ修復機能を追加しようと思います。
まとめ
Contract Oneで行っているインデックス再構築の方法を紹介しました。
今回のインデックス再構築のほかに、Elasticsearchのインデックスに契約書データを同期する実装などにもいろいろ工夫をしており、データストアを複数持つとやはりそれなりに複雑さが増してきますね。
今回の記事が皆さまの参考になれば幸いです。
それと、Contract Oneでは一緒に働く仲間を募集しています!
詳しくはこちらのリンクから採用情報をご確認ください。