Node.js でオブジェクトの配列からストリームを作成する
多分知っておくべきこととして、ストリームはバージョン 12.3.0 でかなり変化があった。この記事は 11.15.0 で書いている。
Node でオブジェクトの配列からストリームを作る方法。
const { Readable } = require("stream"); const readable = Readable.from({ objectMode: true, read() {} }) オブジェクトの配列.map((o) => readable.push(o)) readable.push(null)
12.3.0 以降 は以下で良い。
const { Readable } = require("stream"); const readable = Readable.from(オブジェクトの配列)
ストリーム自体の使い方があまりわかっていない。
とりあえず動かす
エラー時、ストリームを頑張って閉じなければいけないと思う、 destroy()
とか使う。
const { Readable, Transform } = require("stream"); // テストデータ作成 const testData = new Array(1000) .fill("") .map((_, i) => ({ id: i, val: Math.random() })); testData.push(null) // Readable ストリーム作成 const readable = new Readable({ objectMode: true, read() {}, }); // Transform ストリーム作成 const stringifyTransform = new Transform({ transform(data, _encoding, callback) { this.push(JSON.stringify(data)); callback(); }, objectMode: true, }); // この辺微妙 stringifyTransform.on("error", () => readable.destroy()); process.stdout.on("error", () => { readable.destroy(); stringifyTransform.destroy(); }); // 入力 -> 変換 -> 出力 readable.pipe(stringifyTransform).pipe(process.stdout); // 入力へデータを入れる testData.forEach((o) => readable.push(o));
標準出力にオブジェクトの文字列が表示される。
ストリームの閉じる閉じないの話
ストリームが閉じているかどうか調べる方法がわからない。イベントを監視していればできるがもっと良い方法はないだろうか? (12.3.0 以降とかだとプロパティがある)
以下はストリームの挙動を確認するための、ストリームを標準出力へパイプするコード。失敗させたりしてイベント拾う。
const { Readable } = require("stream"); // Readable ストリーム作成 const readable = new Readable({ objectMode: true, read() {}, }); console.log(0, readable.readableFlowing); // => null const pipe = readable.pipe(process.stdout); console.log(1, readable.readableFlowing); // => true console.log(2, readable.push("ok\n")); // => true // 出力あり console.log(3, readable.push({})); // => true // pipe 先の process.stdout がエラー起こして失敗 pipe.on("error", (e) => { console.log(4, readable.readableFlowing); // => false console.log(5, readable.push("ok\n")); // => true // ただ pipe が切れているので出力はない readable.destroy(); // readable を切る }); readable.on("close", (e) => { console.log(6, "close"); // destroy() で close が発生する。pipe 先のエラーでは切れたりしないようだ });
出力
0 null 1 true 2 true 3 true ok 4 false 5 true 6 'close'
readableFlowing
: パイプされているかどうか。最初 (0) では繋がっていないのでnull
。繋がると (1)true
、エラー発生後 (4) は切れててfalse
。pipe
のerror
を拾うようにして、その中で各ストリームを切っていけば良いっぽい。
感想
API ドキュメントのストリーム部分 ちゃんと読んだほうがいいのだろうな、と思いました。英語読むのちょっと気合がいる。
疑問たくさんある
- エラー時に
transform
も切る必要があるだろうか? - 各種イベントの正しい使い方。
error
起きた時に閉じるとかちゃんとしなきゃダメだがあまり整理できてない。finish
?end
? - ストリーム作成時の
read() {}
。pipe
だと不要?実装してみる? Transform
はReadable
とWritable
を継承しているらしいがWritable
っぽさが見えてない。gzip
のTransform
の実装どんなだ
fetch
の body
でストリーム取得できるから画像を canvas に書き込むとか楽しそう。何に使えるかはわからない。加工とかしてみる?RGB から一色なくすとかはできそう。ただそれは何のために?
ストリーム Java でしか使ったことがなかった