ushumpei’s blog

生活で気になったことを随時調べて書いていきます。

Node.js の stream.pipe のエラーハンドリング

stream.pipe の代わりにバージョン 10 で追加された stream.pipeline を使うと良さそうです。

stream.pipe では ここ に書かれているように、エラーハンドリングとしてストリームの後始末を書かなければいけないためです。

One important caveat is that if the Readable stream emits an error during processing, the Writable destination is not closed automatically. If an error occurs, it will be necessary to manually close each stream in order to prevent memory leaks.

あと pipe(...).on('error', ...).pipe(...).on('error', ...)... みたいに書かなきゃいけない気がするし。

確認コード

stream.pipeline でどのようなエラーハンドリングが行われるかをコードで確認しました。ファイルを読んで、変換をかまして、ファイルへ書き込むサンプルです。変換部分でエラーを任意に発生させられるようにし、

  1. エラーが発生しない時の挙動
  2. エラーが発生した時の挙動

の二つを確認します。

(動かすには入力用のファイル from.txt が必要になります)

const { Transform, pipeline } = require("stream");
const { createReadStream, createWriteStream } = require("fs");

// Readable ストリーム作成
const readable = createReadStream("from.txt", "utf8");

// Writable ストリーム作成
const writable = createWriteStream("to.txt", "utf8");

// Transform ストリーム作成 (何もしないけどエラー投げられるようにしたやつ)
const getTransform = (fail) =>
  new Transform({
    transform(data, _encoding, callback) {
      if (fail) {
        callback(new Error("fail"));
      }
      this.push(data);
      callback();
    },
  });

// [NOTE] ここでエラーの発生を任意に切り替える
// const transform = getTransform(true);
const transform = getTransform();

// イベントの発生をロギング。`i-th` は各ストリームに対応 (0-th: readable, 1-th: transform, 2-th: writable)
[readable, transform, writable].forEach((s, i) => {
  s.on("end", () => {
    console.log(`${i}-th stream:`, "end");
  });
  s.on("finish", () => {
    console.log(`${i}-th stream:`, "finish");
  });
  s.on("close", () => {
    console.log(`${i}-th stream:`, "close");
  });
});

// 入力 -> 変換 -> 出力
pipeline(readable, transform, writable, (e) => {
  if (e) {
    console.log(e);
    console.log(
      "エラーが起きた時は全てのストリームに close イベントが発行される"
    );
  } else {
    console.log(
      "エラーなく終了した時は全てのストリームにそれぞれの終了イベント、加えて入出力ストリームに close イベントが発行される"
    );
  }
});

1. エラーが発生しない時の挙動

0-th stream: end
1-th stream: finish
1-th stream: end
2-th stream: finish
エラーなく終了した時は全てのストリームにそれぞれの終了イベント、加えて入出力ストリームに close イベントが発行される
0-th stream: close
2-th stream: close

2. エラーが発生した時の挙動 ([NOTE] と書いてあるコメント行近くの transform 変数定義をコメントアウト/インして実行)

1-th stream: close
2-th stream: close
Error: fail
    at Transform.transform [as _transform] (/Users/ushumpei/node-stream/index.js:15:18)
    at Transform._read (_stream_transform.js:189:10)
    at Transform._write (_stream_transform.js:177:12)
    at doWrite (_stream_writable.js:428:12)
    at writeOrBuffer (_stream_writable.js:412:5)
    at Transform.Writable.write (_stream_writable.js:302:11)
    at ReadStream.ondata (_stream_readable.js:722:22)
    at ReadStream.emit (events.js:209:13)
    at addChunk (_stream_readable.js:305:12)
    at readableAddChunk (_stream_readable.js:282:13)
エラーが起きた時は全てのストリームに close イベントが発行される
0-th stream: close

各イベントは、

  • end: Readable ストリームが読まれ終わった時に発火
  • finish: Writable ストリームが書かれ終わった時に発火
  • close: ストリームとその使用しているリソースが閉じた時に発火 (でも発火しないストリームもあるらしい)

と言う感じか?

まとめ

確認できたこととしては以下

  • どちらのケースでも close が呼ばれて使用しているリソースを閉じている。
  • transform は DuplexReadable かつ Writable なので endfinish が呼ばれている。

感想

Transformclose されるのはどう言うことだ?と悩む。

あと、根本的な問題として、ストリーム閉じなくて良いケースを理解できていない。文字列から単なるメモリに乗っている Readable なストリーム作った場合は、閉じなくて良いのか?GC されるか?

公式ドキュメントに promisifypipeline を Promise 化した時の例も書いてあったので、async/await で書きたい時はそれを使う。