ushumpei’s blog

生活で気になったことを随時調べて書いていきます。

Node.js でオブジェクトの配列からストリームを作成する

多分知っておくべきこととして、ストリームはバージョン 12.3.0 でかなり変化があった。この記事は 11.15.0 で書いている。

Node でオブジェクトの配列からストリームを作る方法。

const { Readable } = require("stream");

const readable = Readable.from({ objectMode: true, read() {} })
オブジェクトの配列.map((o) => readable.push(o))
readable.push(null)

12.3.0 以降 は以下で良い。

const { Readable } = require("stream");

const readable = Readable.from(オブジェクトの配列)

ストリーム自体の使い方があまりわかっていない。

とりあえず動かす

エラー時、ストリームを頑張って閉じなければいけないと思う、 destroy() とか使う。

const { Readable, Transform } = require("stream");

// テストデータ作成
const testData = new Array(1000)
  .fill("")
  .map((_, i) => ({ id: i, val: Math.random() }));
testData.push(null)

// Readable ストリーム作成
const readable = new Readable({
  objectMode: true,
  read() {},
});

// Transform ストリーム作成
const stringifyTransform = new Transform({
  transform(data, _encoding, callback) {
    this.push(JSON.stringify(data));
    callback();
  },
  objectMode: true,
});

// この辺微妙
stringifyTransform.on("error", () => readable.destroy());
process.stdout.on("error", () => {
  readable.destroy();
  stringifyTransform.destroy();
});

// 入力 -> 変換 -> 出力
readable.pipe(stringifyTransform).pipe(process.stdout);

// 入力へデータを入れる
testData.forEach((o) => readable.push(o));

標準出力にオブジェクトの文字列が表示される。

ストリームの閉じる閉じないの話

ストリームが閉じているかどうか調べる方法がわからない。イベントを監視していればできるがもっと良い方法はないだろうか? (12.3.0 以降とかだとプロパティがある)

以下はストリームの挙動を確認するための、ストリームを標準出力へパイプするコード。失敗させたりしてイベント拾う。

const { Readable } = require("stream");

// Readable ストリーム作成
const readable = new Readable({
  objectMode: true,
  read() {},
});

console.log(0, readable.readableFlowing);
// => null

const pipe = readable.pipe(process.stdout);

console.log(1, readable.readableFlowing);
// => true

console.log(2, readable.push("ok\n"));
// => true
// 出力あり

console.log(3, readable.push({}));
// => true
// pipe 先の process.stdout がエラー起こして失敗

pipe.on("error", (e) => {
  console.log(4, readable.readableFlowing);
  // => false

  console.log(5, readable.push("ok\n"));
  // => true
  // ただ pipe が切れているので出力はない

  readable.destroy();
  // readable を切る
});

readable.on("close", (e) => {
  console.log(6, "close"); // destroy() で close が発生する。pipe 先のエラーでは切れたりしないようだ
});

出力

0 null
1 true
2 true
3 true
ok
4 false
5 true
6 'close'
  • readableFlowing: パイプされているかどうか。最初 (0) では繋がっていないので null。繋がると (1) true、エラー発生後 (4) は切れてて false
  • pipeerror を拾うようにして、その中で各ストリームを切っていけば良いっぽい。

感想

API ドキュメントのストリーム部分 ちゃんと読んだほうがいいのだろうな、と思いました。英語読むのちょっと気合がいる。

疑問たくさんある

  • エラー時に transform も切る必要があるだろうか?
  • 各種イベントの正しい使い方。error 起きた時に閉じるとかちゃんとしなきゃダメだがあまり整理できてない。finish? end?
  • ストリーム作成時の read() {}pipe だと不要?実装してみる?
  • TransformReadableWritable を継承しているらしいが Writable っぽさが見えてない。
  • gzipTransform の実装どんなだ

fetchbody でストリーム取得できるから画像を canvas に書き込むとか楽しそう。何に使えるかはわからない。加工とかしてみる?RGB から一色なくすとかはできそう。ただそれは何のために?

ストリーム Java でしか使ったことがなかった