Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

失敗からの学び:Node.JSのStreamにおけるアウトオブメモリとRxJS, IxJS

はじめに

データ戦略部門の松本です。 1年の各月の季節を漢字で表すと「冬冬春春夏夏夏夏夏夏秋冬」と感じるくらい最近暑いですね。5月なのに真夏日も出ており、秋が好きな私としてはとても残念な気持ちを持っています。今年も暑くなりそうなので、体調に気をつけて過ごしていきたいです。

今回はRxJSやstream処理について失敗から学びを得ましたので、その知見を共有します。

サマリ

  • 読み込みよりも書き込みが遅い、バックプレッシャー が必要な領域ではRxJSを使うべきではない
  • ReactiveXがオーナーのライブラリであるIxJSを使うことで、I/O周りの処理においてRxJSと同等の記載内容でバックプレッシャー可能
  • streamのpipeline APIは非同期ジェネレータをサポートしているので、シンプルな用途であれば必要十分

失敗

私たちのプロダクトではデータを多く扱うため、大量のデータに対してバッチ処理を行うプログラムを書くことがあります。 複雑なデータフローを効率よく実行する目的で、ファイルに対してのバッチ処理を実行するためにRxJSを利用していました。*1

私たちのユースケースの多くは単純に示すと、次のように読み込んだファイルに対して何かしら外部APIを叩いて結果を取得して、その後データベースに行を挿入するというものです。

import { from, concatMap } from "rxjs";
import fs from "fs";

const fileReadStream = fs.createReadStream("large_file.csv");

const observable = from(fileReadStream).pipe(
  // APIの呼び出しに時間がかかる
  concatMap((x) => someApiClient.get(x)),
  concatMap((x) => db.streamingInsert(x))
);

ファイルをチャンクに分割して処理を書くためにNode.jsのstreamの機能を利用していました。それにもかかわらず、プログラムはアウトオブメモリエラーにより異常終了してしまいました。その原因を調査した結果と内容・対策をご紹介します。

Node.jsのstream

Node.jsのstreamにはバックプレッシャーという概念があります。 リアクティブ宣言 によるとバックプレッシャーとは次のような意味を持つ単語です。

処理が追いつかなくなっていて、かつクラッシュすることも許されないならば、コンポーネントは上流のコンポーネント群に自身が過負荷状態であることを伝えて負荷を減らしてもらうべきだ。

今回の事象を例にとると、APIの呼び出しはファイルからの読み出しよりも時間がかかる処理です。バックプレシャーの仕組みが存在しない場合、ファイルの読み込みが先行して行われ、読み込まれたデータはAPI呼び出しを待つ間「メモリ上」に展開されてしまいます。これによりアウトオブメモリエラーが発生してしまいます。

一方バックプレッシャーの仕組みが存在する場合、API呼び出し用に確保されたメモリ(バッファ)の上限に達した際に、バッファがいっぱいである旨をファイルの読み込み処理に対して通知します。ファイルの読み込み処理は、API呼び出しのバッファーに余裕ができるまでファイルの読み込みを中断することでメモリの使用量を削減します。

streamにはバックプレッシャーの仕組みが存在しているにもかかわらずなぜアウトオブメモリになったのでしょうか。もう少し見ていきます。

nodesource.com のこの記事から、次のことがわかります。

  • streamを利用することで、ファイルをチャンク(chunk)に分割して読み込むため、メモリ上にすべての内容を保持する必要がない

これは、私たちがstreamを選定した理由に合致しています。 ただし、同記事をさらに読み進めていくと次のような記述があります。

  • Readable streamには2つのモードがある
    • flowing モード : データを自動的に読み取って、EventEmitterのインターフェイスでイベントを素早く送信する
    • paused モード : streamからデータを読み出すために stream.read() メソッドを明示的に読み出す必要がある

この辺りが怪しそうです。深掘りしていきましょう。

streamの2種類のモード

Node.jsのドキュメント によると、 stream#readableFlowing を確認することで現在streamがどのモードで起動しているか確認できます。各種操作を行った状態でのstreamのモードを次のプログラムで検証しました。(Node.js 20.11.0で実行しています)

import fs from "fs";
import { pipeline } from "stream/promises";

const nothing = () => {};
const rs = () => fs.createReadStream("test.ts");
const ws = () => fs.createWriteStream("/dev/null");

// 0. default
const s0 = rs();
console.log("s0: ", s0.readableFlowing);

// 1. use data event handler via on method
const s1 = rs();
s1.on("data", nothing);

console.log("s1: ", s1.readableFlowing);

// 2. use data event handler via addListener method
const s2 = rs();
s2.addListener("data", nothing);

console.log("s2: ", s2.readableFlowing);

// 2a. attatch other than data event handler
const s2a = rs();
s2a.addListener("readable", nothing);

console.log("s2a: ", s2a.readableFlowing);

// 3. resume stream
const s3 = rs();
s3.resume();

console.log("s3: ", s3.readableFlowing);

// 4. pipe stream
const s4 = rs();
s4.pipe(ws());

console.log("s4: ", s4.readableFlowing);

// 5. pipeline stream
const s5 = rs();
pipeline(s5, ws());

console.log("s5: ", s5.readableFlowing);

// 6. pause stream
const s6 = rs();
s6.pause();

console.log("s6: ", s6.readableFlowing);

// 7. async iterator
const s7 = rs();
const it = s7[Symbol.asyncIterator]();
it.next();

console.log("s7: ", s7.readableFlowing);

これらの結果をまとめたものが次の表です。

番号 処理 結果
s0 stream生成直後 null
s1 onメソッドでdataイベントのイベントハンドラを登録 flowing
s2 addEventListenerでdataイベントのイベントハンドラを登録 flowing
s2a dataイベント以外のイベントのイベントハンドラを登録 paused
s3 resumeを呼び出し flowing
s4 pipeでstreamを結合 flowing ⭐️
s5 pipelineでstreamを結合 flowing ⭐️
s6 pauseを呼び出し paused
s7 非同期イテレータでデータを読み出す paused

⭐️の部分に関しては、起動時はflowingモードだが内部実装上メモリの使用量を監視してpausedモードとflowingモードとを切り替えるようになっています。詳細はヤフー様のテックブログに詳しく解説されておりますのでご参照ください。 techblog.yahoo.co.jp

これらの調査から次のことがわかりました。

  • data イベントのイベントハンドラを登録すると flowing モードになること
  • 非同期イテレータでデータを読み出した場合は paused モードになること
  • pipeメソッドや pipeline APIでは起動時は flowing モードであるが、メモリの使用量を監視してflowingとpausedモードを切り替えバックプレッシャーを実現していること

RxJSでの実装

では、改めてRxJSにおけるReadable streamからObservableを作成する部分の実装を追ってみます。

Readable streamはEventEmitterでもあるため、次の2通りの実装が考えられます。

// EventEmitterとして扱う
const observable = fromEvent(readableStream, "data");

// readable streamとして扱う
const observable = from(readableStream);

fromEvent

実装は次のリンク先に示す箇所です。 https://github.com/ReactiveX/rxjs/blob/9c8fa65ea161f97cc90ea9b4679b0ae104a29ffb/packages/rxjs/src/internal/observable/fromEvent.ts#L262-L292

プログラムの要点を抜き出すと次のようになります。

export function fromEvent(target, eventName) {
  return new Observable((subscriber) => {
    // 単純にsubscriber.nextを呼ぶのみ
    const handler = (...args: any[]) => subscriber.next(args);

    target.addEventListener(eventName, handler);
  })
}

これは上で見た通り、イベントハンドラをそのままつなぐ実装であり、flowingモードで起動されるためアウトオブメモリを発生させそうな実装であることが分かります。

from

こちらの実装は次の部分です。 https://github.com/ReactiveX/rxjs/blob/9c8fa65ea161f97cc90ea9b4679b0ae104a29ffb/packages/observable/src/observable.ts#L1183-L1203

こちらもプログラムの要点を抜き出すと次のようになります。

export function from(readableStream) {
  return new Observable((subscriber) => {
    for await(const value of readableStream) {
      subscriber.next(value);
    }
  })
  subscriber.complete();
}

上で見た内容に準ずると、非同期イテレータを利用して内容を読み出しているため、streamはpausedモードで起動していそうです。ただし、subscriber.next(value) を同期的に呼び出しているため、せっかくのpausedモードですが値を無限にpushする構成になっています。

パフォーマンス検証

実際に検証用のプログラムを用意し、パフォーマンスを測定しました。

▼ fromEventのパフォーマンス測定用のプログラム:

import { fromEvent, concatMap, takeUntil } from "rxjs";
import fs from "fs";

const readableStream = fs.createReadStream("large_file.csv");

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const obs = fromEvent(readableStream, "data").pipe(
  concatMap(async (data) => {
    await sleep(10);
    console.log(JSON.stringify(process.memoryUsage()));
    return data.toString();
  }),
  takeUntil(fromEvent(readableStream, "end"))
);

await new Promise((resolve) => {
  obs.subscribe({
    complete: resolve,
  });
});

fromEventのメモリ使用量の測定結果

RxJS fromEventのメモリ使用量

▼ fromのパフォーマンス測定用のプログラム:

import { from, concatMap } from "rxjs";
import fs from "fs";

const readableStream = fs.createReadStream("large_file.csv");

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const obs = from(readableStream).pipe(
  concatMap(async (data) => {
    await sleep(10);
    console.log(JSON.stringify(process.memoryUsage()));
    return data.toString();
  })
);

await new Promise((resolve) => {
  obs.subscribe({
    complete: resolve,
  });
});

fromのメモリ使用量の測定結果

RxJS fromのメモリ使用量

検証に利用したファイルは約150MBのcsvでしたが、arrayBuffersに約150MB割り当てられており、ファイルの中身がすべてメモリ上に読み込まれていることが分かります。

IxJS

今まで見てきた通り、RxJSではファイルをstreamで取得したとしても内部の実装ですべてメモリにため込んでしまうため、アウトオブメモリが発生する状態です。 そこで代替のライブラリを調査したところ、同じくReactiveXから IxJSというライブラリが公開されていました。

GitHub - ReactiveX/IxJS: The Interactive Extensions for JavaScript

READMEに記載されているIxJSについての内容を引用します。

The Interactive Extensions for JavaScript (IxJS) brings the Array#extras combinators to iterables, generators, async iterables and async generators. With the introduction of the Symbol.iterator and generators in ES2015, and subsequent introduction of Symbol.asyncIterator and async generators, it became obvious we need an abstraction over these data structures for composition, querying and more.

IxJS unifies both synchronous and asynchronous pull-based collections, just as RxJS unified the world of push-based collections. RxJS is great for event-based workflows where the data can be pushed at the rate of the producer, however, IxJS is great at I/O operations where you as the consumer can pull the data when you are ready.

まとめると次のような説明です。

  • IxJSはpull basedのコレクションである(一方RxJSはpush basedのコレクション)
  • IxJSは I/O操作に適しており、コンシューマが準備できた時にデータをプルして操作する

以下、公式のサンプルコードからの引用ですが、用意されているAPIはRxJSとほとんど同じであるため、RxJSに慣れた方であればIxJSの切り替えもほとんどコストがかからないと思われます。

// ES
import { from } from 'ix/asynciterable';
import { filter, map } from 'ix/asynciterable/operators';

// CommonJS
const from = require('ix/asynciterable').from;
const { filter, map } = require('ix/asynciterable/operators');

const source = async function* () {
  yield 1;
  yield 2;
  yield 3;
  yield 4;
};

const results = from(source()).pipe(
  filter(async x => x % 2 === 0),
  map(async x => x * x)
);

for await (let item of results) {
  console.log(`Next: ${item}`);
}

// Next 4
// Next 16

パフォーマンス

IxJSにもRxJS同様、 from というAPIがあります。このAPIもRxJSと同様、引数に非同期イテレータを指定でき、その結果からIxJSのイテレータを作成します。

実装の要点は次に示す通りです。

github.com

export class FromAsyncIterable {
  constructor(source: AsyncIterable) {
    this._source = source
  }

  async *[Symbol.asyncIterator]() {
    let i = 0;
    for await (const value of this._source) {
      yield value
    }
  }
}

ということでそのままですね。 RxJSバージョンと異なり、このクラス自体が非同期イテラブルであり、イテレータ関数で値をyieldする点が大きく異なっています。

このAPIを利用したパフォーマンスを見ていきます。

▼ パフォーマンス測定用のプログラム:

import { from } from "ix/asynciterable/index.js";
import { concatMap } from "ix/asynciterable/operators/index.js";

import fs from "fs";

const readableStream = fs.createReadStream("large_file.csv");

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const obs = from(readableStream).pipe(
  concatMap(async (data) => {
    await sleep(10);
    console.log(JSON.stringify(process.memoryUsage()));
    return data.toString();
  })
);

for await (const _ of obs) {
  // noop
}

プログラムを動作させて取得したメモリ使用量をプロットしたものが次のグラフです。

IxJS fromのメモリ使用量

このグラフを見て分かる通り、最大メモリ使用量が約62MBと、RxJSのメモリ使用量と比較しても圧倒的に少ないメモリ使用量です。 arrayBuffersの使用量が特に顕著であり、RxJS版と比較するとIxJS版は最大2.5MBと適切にコントロールされています。

まとめると次のようになります。

種類 最大メモリ使用量
(150MBのファイルを読み込んだとき)
RxJS fromEvent 202MB
RxJS from 202MB
IxJS from 63MB

改めて、IxJSのpullベースのアーキテクチャの効果によりメモリの使用量が削減されること、が実験からも示されました。 *2

外部ライブラリに依存しない方法

ここまでIxJSというライブラリを紹介しました。複雑に絡み合ったストリーミング処理を記載する場合はこのライブラリが有用です。しかし、より簡単にstreamをバックプレッシャー付きで扱う方法として次の2種類が見つかったため、合わせて紹介させてください。

pipeline関数に非同期ジェネレータ関数を適用する

Node.jsのドキュメント によると、pipeline関数には非同期ジェネレータ関数をpipelineの構成要素として適用できます。 forkJoinのような複雑な処理はこの構文だけで書くのは難しく、さらなる実装が必要でメリットは少ないと感じました。一方APIを叩いて結果を保存するだけ、のようなシンプルな処理であれば、ライブラリも不要で扱いやすと思われます。 さらには、pipeline関数を用いたstreamの処理方法であるため、適宜バックプレッシャーを管理可能です。

パフォーマンス測定用にサンプルコードを作成しました。以下に示す通り、適切にメモリ使用量がハンドリングされていることが分かります。

import fs from "fs";
import { pipeline } from "stream/promises";

const readableStream = fs.createReadStream("large_file.csv");

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const obs = pipeline(
  readableStream,
  async function* (source) {
    for await (const chunk of source) {
      await sleep(10);
      console.log(JSON.stringify(process.memoryUsage()));
      yield chunk.toString();
    }
  },
  async function* (source) {
    for await (const chunk of source) {
      // noop
    }
  }
);

await obs;

pipelineに非同期ジェネレータ関数を適用した際のメモリ使用量

Async Iterator helpers

また、現在TC39として Async Iterator Helperが提案されています。

github.com

Node.jsにおけるArrayのように、メソッドチェーンの形で非同期イテレータを扱う構文でありとても分かりやすいと感じます。

現在提案中であるためNode.jsにはまだ実装されておらず、polyfillを用いて同じようにメモリの使用量を測定しました。結果、適切にチャンクに分割されて処理がされていました。

import fs from "fs";
import { aiter } from "iterator-helper";

const readableStream = fs.createReadStream("large_file.csv");

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const fileIterator = aiter(readableStream);

const obs = fileIterator.map(async (data) => {
  await sleep(10);
  console.log(JSON.stringify(process.memoryUsage()));
  return data.toString();
});

for await (const _ of obs) {
  // noop
}

Async Iterator Helper使用時のメモリ使用量

まとめ

  • stream APIには2つのモードがある
  • RxJSは push型の思想であり、streamから作成したObservableでもバックプレッシャーを無視する
  • IxJSは pull型であり、RxJSと共通の文法に加えてバックプレッシャーをケアした記載が可能
  • 簡単な処理であれば pipelineに非同期ジェネレータ関数を適用するのが一番お手軽
種類 最大メモリ使用量
(150MBのファイルを読み込んだとき)
RxJS fromEvent 202MB
RxJS from 202MB
IxJS from 62MB
pipeline + 非同期ジェネレータ 56MB
TC39 Async iterator helper 58MB

私たちのチームでは Web アプリ開発エンジニアを募集しています。Web アプリ開発だけではなくデータエンジニアリングなど、幅広い業務を経験できます!

open.talentio.com

*1:昔のRxJSはバックプレッシャーをサポートしていた、という記載 があり、それを信じていた点は反省です。

*2:IxJSにも fromEvent APIがあり、そちらを利用すると(ご想像の通り) 213MBのメモリ使用量になりました。

© Sansan, Inc.