Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

3つのシンプルな改善策で実現!elasticsearch-modelを使ったインポート処理を2倍高速化した話

こんにちは、技術本部 Sansan Engineering Unit Nayoseグループでエンジニアをしている冨田です。

私たちのグループでは、当社のほぼすべてのプロダクトが利用する共通基盤である「組織データの名寄せサービス」の開発に取り組んでいます。名寄せサービスにより、組織名の表記揺れなどで別々に存在するデータが名寄せ(グルーピング)され、組織データの活用価値が向上します。

名寄せサービスのバックエンドでは、WebフレームワークにRuby on Rails(以下「Rails」とする)、DBにAurora、検索基盤の1つにElasticsearchを利用しています。

本記事では、elasticsearch-modelで実現されている AuroraからElasticsearchへのインポート処理特別な技術を使わない3つのシンプルな改善策2倍に高速化しユーザー体験を向上させた実践手法 をご紹介します。

前提:バージョン

  • Rails(7.0.8.7)
  • elasticsearch-model(7.2.1)
  • Aurora(8.0.mysql_aurora.3.04.3)
  • Elasticsearch(7.10)※Amazon OpenSearch Service

課題:インポート処理の遅延

私たちのチームでは、社内向け管理画面に「組織データの曖昧検索機能」を提供しています。この機能は、「組織名の一部分」や「表記揺れがある組織名」を検索して、関連する組織情報を特定するために使用されています。

オリジナルの組織データはAuroraに存在し、曖昧検索はElasticsearchで実現するため、毎月Auroraの該当テーブルの全データ(数億件)をElasticsearchにインポートしています。

しかし、このインポート処理に時間がかかっており、完了までに1カ月ほどかかっていました。社内のユーザーからは「組織データが古い」という問い合わせが寄せられていました。

原因分析:なぜ遅いのか?

原因を探るために、インポート処理の実装を確認してみます。

上述した通り、AuroraからElasticsearchへのインポート処理は、elasticsearch-modelにより実現されています。ActiveRecordモデルにElasticsearch::Modelをincludeすることで、Elasticsearchの操作機能(インデックス作成や検索のメソッドなど)を追加できます。今回利用されていた機能は、Elasticsearchへのインポート機能です。

以下に実装イメージを載せます。インポート対象はorganizationsテーブルになります。

class Organization < ApplicationRecord
  include Elasticsearch::Model
    
    class << self
      def import_into_elasticsearch_index
        __elasticsearch__.import
      end
  end
end

# Elasticsearchの該当インデックス(事前に作成しておく)に、対応するAuroraのテーブルデータがインポートされる
Organization.import_into_elasticsearch_index

※ 実際には、Elasticsearchへの接続設定インデックス設定データ変換(シリアライズ)処理の定義などが必要になりますが、見やすさのために省略しています。

次に、 __elasticsearch__.import の実装を確認してみます。

gem内の該当メソッド を見るとわかる通り、find_in_batchesでバッチ(デフォルト1,000件)を取得し、ElasticsearchのBulk APIを呼び出す直列処理になっていました。

以下に該当箇所のコードを抜粋します。

def import(options={}, &block)
  ... # 省略

  __find_in_batches(options) do |batch| # <== [find_in_batches](https://api.rubyonrails.org/classes/ActiveRecord/Batches.html#method-i-find_in_batches)を使っている
    params = {
      index: target_index,
      type:  target_type,
      body:  __batch_to_bulk(batch, transform)
    }

    params[:pipeline] = pipeline if pipeline

    response = client.bulk params # <== [ElasticsearchのBulk API](https://www.elastic.co/guide/en/elasticsearch/reference/7.13/docs-bulk.html)を呼び出している

    yield response if block_given?

    errors +=  response['items'].select { |k, v| k.values.first['error'] }
  end

  ... # 省略
end

ここまでの情報を踏まえてわかるように、数億件のデータを単純に直列処理していたため、パフォーマンスが悪くなっていたのです。当初はデータ量が少なく問題なかった実装も、データ量の増加に伴ってパフォーマンス問題が顕在化している状況でした。

他にも、「Elasticsearchのインデックス設計」や「データ変換(シリアライズ)処理の最適化」も検討しました。ただ、比較的少ない工数で上記問題を改善でき、大きな効果が見込めると判断したため、まずはこの改善に取り組むことにしました。

改善策:シンプルで効果的な3つのアプローチ

パフォーマンス改善のために、以下3つの施策を実施しました。

  • インポート不要なデータの除外
  • インポート処理の優先順位付け
  • インポート処理の並列化

1. インポート不要なデータの除外

インポート対象を再精査したところ、すでに論理削除されているデータが全体の約10%含まれていることがわかりました。これらは検索対象として不要なので、インポート対象から除外することにしました。organizationsテーブルはstatusカラムを持ち、このカラム値によってインポートの必要有無を判定できます。

当初は __elasticsearch__.importqueryオプションを使おうとしましたが、この方法では実現できませんでした。

以下に実装イメージを載せます。

# import_target_statusは「statusカラム値を限定するscope」を指す
Organization.import_target_status.find_in_batches do |records|
  __elasticsearch__.import(query: -> { where(id: records.pluck(:id)) })
end

# 上記を実行すると、以下クエリ実行が繰り返される
SELECT organizations . * FROM organizations WHERE organizations . status = ? AND organizations . id > ? ORDER BY organizations . id ASC LIMIT ?

organizationsテーブルには「statusカラムとidカラムの複合インデックス」が設定されていませんでした。そのため、パフォーマンスの悪いクエリが発行され、DBに負荷をかけていました。数億件のデータを持つorganizationsテーブルにインデックス追加するマイグレーションは、長時間のテーブルロックが発生するリスクがあり、サービス運用への影響を考慮すると簡単に実行できませんでした。なので、別のアプローチを選択することにしました。

具体的には、Athenaを使ってインポート対象のidをCSV出力し、このCSVを使ってインポートする方法を採用しました。このCSVにはインポート不要なデータは含まれません。今回Athenaを利用したのは、既存の仕組みでAuroraの該当テーブルがAthena検索できるようになっており、追加工数が少なく済んだためです。

以下に実装イメージを載せます。

# Athenaを使ってインポート対象のidをCSV出力する(出力先はfile_path)

# IMPORT_CSV_READING_BATCH_COUNTは「メモリ使用して問題ない値」を設定する
CSV.foreach(file_path, headers: true).each_slice(IMPORT_CSV_READING_BATCH_COUNT) do |batch|
  ids = batch.map {|row| row['id'].to_i }

  __elasticsearch__.import(query: -> { where(id: ids) })
end

2. インポート処理の優先順位付け

曖昧検索機能のユースケースを再確認した結果、検索利用パターンに重要な発見がありました。検索頻度の高いデータは、「特定のstatus値を持つデータ(全体の10%未満)」であることが判明しました。

しかし、 __elasticsearch__.import をそのまま利用すると、インポート順は Gem 依存となってしまいます(find_in_batchesのorder順になります)。つまり、全体のインポート処理が完了しないと、これら「特定のstatus値を持つデータ」のインポートも完了したかわかりません。

この洞察を活かし、「すべてを平等にインポートする方針」から、「ユーザーが本当に必要とするデータを優先してインポートする方針」へと転換しました。具体的には、インポート対象をstatus値ごとに分類し、検索頻度の高いデータから順番にインポートします。

以下に実装イメージを載せます。

# 上述したインポート処理をメソッド化
def import(file_path)
    CSV.foreach(file_path, headers: true).each_slice(IMPORT_CSV_READING_BATCH_COUNT) do |batch|
      ids = batch.map {|row| row['id'].to_i }
    
      __elasticsearch__.import(query: -> { where(id: ids) })
    end
end

# Athenaを使ってインポート対象(優先度:高)のidをCSV出力する(出力先はfirst_import_file_path)
import(first_import_file_path)

# Athenaを使ってインポート対象(優先度:低)のidをCSV出力する(出力先はsecond_import_file_path)
import(second_import_file_path)

インポート全体のスループットは変わりませんが、曖昧検索機能のユーザーにより早く価値提供できます。

3. インポート処理の並列化

最後に、直列で実行していた処理を並列化しました。具体的には、Rubyのparallelライブラリを利用し、複数スレッドで同時にインポート処理を実行します。このライブラリを選んだ理由は、対応工数が小さく、かつ利用実績があったためです。

以下に実装イメージを載せます。

def import(file_path)
  CSV.foreach(file_path, headers: true).each_slice(IMPORT_CSV_READING_BATCH_COUNT) do |batch|
    ids = batch.map {|row| row['id'].to_i }

    # PARALLEL_IMPORT_THREADSは「並列数で割り切れる値(同バッチサイズでインポートできる)」を設定する
    Parallel.each(ids.in_groups(PARALLEL_IMPORT_THREADS, false), in_threads: PARALLEL_IMPORT_THREADS) do |grouped_ids|
        ActiveRecord::Base.connection_pool.with_connection do
        __elasticsearch__.import(query: -> { where(id: ids) })
      end
    end
  end
end

並列数(PARALLEL_IMPORT_THREADS)は、以下のステップで決定しました。

3-1. Elasticsearchのスレッドプールサイズを確認する

Elasticsearchは内部的にスレッドプールを使って並列処理しています。Bulk APIリクエストを処理するwriteスレッドプールサイズは「ノードに割り当てられたCPUコア数」を指します。クラスター全体の総CPUコア数は、 ノードに割り当てられたCPUコア数 * ノード数 で計算できます。例えば、OpenSearch Serviceのm6g.xlargeインスタンス(vCPU x4)3台構成では、12スレッドがプールされています。

3-2. Elasticsearchのリソース状況を確認しながら、並列数を調整する

今回対象となるElasticsearchドメインはインポート処理以外でwriteスレッドプールを使っていませんでした。そのため、「1で定めた並列数」から始めて、Elasticsearchのリソース状況を監視しながら調整しました。具体的には、実際にインポート処理を実行して、Elasticsearch側のCPUリソース上昇が許容範囲内か、429エラーが起きていないか※を確認しました。

※ すべてのスレッドプールが使用中の場合、新規リクエストはキューに保持されます(キューサイズは10,000件)。このキューサイズを超えるリクエストを送信した場合、429エラー(Too Many Requests)が発生します。

補足:スループットの計測

上記施策の改善効果を確認するため、インポート処理のスループットを把握しながら取り組みました。

__elasticsearch__.import にブロックを渡すとbatch_size毎のElasticsearchレスポンスを処理できます。

以下に実装イメージを載せます。

# importメソッド部分のみ抜粋
__elasticsearch__.import(query: -> { where(id: ids) }) {|response|
  error_size = response['items'].select {|i| i['index']['error'] }.size
  success_size = response['items'].size - error_size
  
  puts "success_size: #{success_size}, error_size: #{error_size}"
}

改善結果:劇的な高速化を実現

これらの施策により、インポート処理全体が2倍ほど速くなりました。

インポート処理全体:約30日 → 約15日(50%削減

さらに重要なのは、ユーザーが最も頻繁に検索するデータのインポートが劇的に高速化されたことです。

優先データ処理:約30日 → わずか2日(93%削減

今後の展望:Amazon OpenSearch Ingestionによる高速データ同期

今回は「既存のelasticsearch-modelを使った処理」を最適化しましたが、今後はAWSが提供する新しいサービス「Amazon OpenSearch Ingestion」の活用を検討しています。

このサービスは、Aurora MySQL/PostgreSQLからOpenSearch Serviceへシームレスなデータインポートを実現します。また、データベースで更新されたデータは数秒以内にOpenSearch Serviceへ同期されます。この機能を利用すれば、インポート実装を自前で用意しなくても、ニアリアルタイムでのデータ同期が可能になります。

今後はこの新しい技術を検証し、より効率的なデータ同期の実現を目指していきます。

まとめ:シンプルな施策で大きな改善

今回のパフォーマンス改善では、Elasticsearch特有のチューニングや大規模な改修ではなく、基本的なアプローチ(処理対象の精査・優先順位付け・並列化)を採用しました。少ない工数で大きな改善を実現できたと振り返っています。

この事例から学んだ教訓は、当初問題なく動作していた実装も、データ量の増加に伴いパフォーマンス問題が顕在化するという点です。データの成長を見据えた設計の重要性を改めて実感しました。

本記事が__elasticsearch__.importを利用している方のパフォーマンス改善の参考になれば幸いです。

採用情報

Nayoseグループが所属するデータ戦略部門では、パフォーマンス改善を含めサービス改善が好きなエンジニアを募集しています。少しでもご興味を持っていただけましたら、以下も併せてご確認ください!

最後まで読んでいただきありがとうございました。

© Sansan, Inc.