Node.js の stream.pipe のエラーハンドリング
stream.pipe
の代わりにバージョン 10 で追加された stream.pipeline
を使うと良さそうです。
stream.pipe
では ここ に書かれているように、エラーハンドリングとしてストリームの後始末を書かなければいけないためです。
One important caveat is that if the Readable stream emits an error during processing, the Writable destination is not closed automatically. If an error occurs, it will be necessary to manually close each stream in order to prevent memory leaks.
あと pipe(...).on('error', ...).pipe(...).on('error', ...)...
みたいに書かなきゃいけない気がするし。
確認コード
stream.pipeline
でどのようなエラーハンドリングが行われるかをコードで確認しました。ファイルを読んで、変換をかまして、ファイルへ書き込むサンプルです。変換部分でエラーを任意に発生させられるようにし、
- エラーが発生しない時の挙動
- エラーが発生した時の挙動
の二つを確認します。
(動かすには入力用のファイル from.txt
が必要になります)
const { Transform, pipeline } = require("stream"); const { createReadStream, createWriteStream } = require("fs"); // Readable ストリーム作成 const readable = createReadStream("from.txt", "utf8"); // Writable ストリーム作成 const writable = createWriteStream("to.txt", "utf8"); // Transform ストリーム作成 (何もしないけどエラー投げられるようにしたやつ) const getTransform = (fail) => new Transform({ transform(data, _encoding, callback) { if (fail) { callback(new Error("fail")); } this.push(data); callback(); }, }); // [NOTE] ここでエラーの発生を任意に切り替える // const transform = getTransform(true); const transform = getTransform(); // イベントの発生をロギング。`i-th` は各ストリームに対応 (0-th: readable, 1-th: transform, 2-th: writable) [readable, transform, writable].forEach((s, i) => { s.on("end", () => { console.log(`${i}-th stream:`, "end"); }); s.on("finish", () => { console.log(`${i}-th stream:`, "finish"); }); s.on("close", () => { console.log(`${i}-th stream:`, "close"); }); }); // 入力 -> 変換 -> 出力 pipeline(readable, transform, writable, (e) => { if (e) { console.log(e); console.log( "エラーが起きた時は全てのストリームに close イベントが発行される" ); } else { console.log( "エラーなく終了した時は全てのストリームにそれぞれの終了イベント、加えて入出力ストリームに close イベントが発行される" ); } });
1. エラーが発生しない時の挙動
0-th stream: end 1-th stream: finish 1-th stream: end 2-th stream: finish エラーなく終了した時は全てのストリームにそれぞれの終了イベント、加えて入出力ストリームに close イベントが発行される 0-th stream: close 2-th stream: close
2. エラーが発生した時の挙動 ([NOTE]
と書いてあるコメント行近くの transform
変数定義をコメントアウト/インして実行)
1-th stream: close 2-th stream: close Error: fail at Transform.transform [as _transform] (/Users/ushumpei/node-stream/index.js:15:18) at Transform._read (_stream_transform.js:189:10) at Transform._write (_stream_transform.js:177:12) at doWrite (_stream_writable.js:428:12) at writeOrBuffer (_stream_writable.js:412:5) at Transform.Writable.write (_stream_writable.js:302:11) at ReadStream.ondata (_stream_readable.js:722:22) at ReadStream.emit (events.js:209:13) at addChunk (_stream_readable.js:305:12) at readableAddChunk (_stream_readable.js:282:13) エラーが起きた時は全てのストリームに close イベントが発行される 0-th stream: close
各イベントは、
end
: Readable ストリームが読まれ終わった時に発火finish
: Writable ストリームが書かれ終わった時に発火close
: ストリームとその使用しているリソースが閉じた時に発火 (でも発火しないストリームもあるらしい)
と言う感じか?
まとめ
確認できたこととしては以下
- どちらのケースでも
close
が呼ばれて使用しているリソースを閉じている。 - transform は
Duplex
でReadable
かつWritable
なのでend
とfinish
が呼ばれている。
感想
Transform
が close
されるのはどう言うことだ?と悩む。
あと、根本的な問題として、ストリーム閉じなくて良いケースを理解できていない。文字列から単なるメモリに乗っている Readable なストリーム作った場合は、閉じなくて良いのか?GC されるか?
公式ドキュメントに promisify
で pipeline
を Promise 化した時の例も書いてあったので、async/await
で書きたい時はそれを使う。