ushumpei’s blog

生活で気になったことを随時調べて書いていきます。

Node.js の stream.pipe のエラーハンドリング

stream.pipe の代わりにバージョン 10 で追加された stream.pipeline を使うと良さそうです。

stream.pipe では ここ に書かれているように、エラーハンドリングとしてストリームの後始末を書かなければいけないためです。

One important caveat is that if the Readable stream emits an error during processing, the Writable destination is not closed automatically. If an error occurs, it will be necessary to manually close each stream in order to prevent memory leaks.

あと pipe(...).on('error', ...).pipe(...).on('error', ...)... みたいに書かなきゃいけない気がするし。

確認コード

stream.pipeline でどのようなエラーハンドリングが行われるかをコードで確認しました。ファイルを読んで、変換をかまして、ファイルへ書き込むサンプルです。変換部分でエラーを任意に発生させられるようにし、

  1. エラーが発生しない時の挙動
  2. エラーが発生した時の挙動

の二つを確認します。

(動かすには入力用のファイル from.txt が必要になります)

const { Transform, pipeline } = require("stream");
const { createReadStream, createWriteStream } = require("fs");

// Readable ストリーム作成
const readable = createReadStream("from.txt", "utf8");

// Writable ストリーム作成
const writable = createWriteStream("to.txt", "utf8");

// Transform ストリーム作成 (何もしないけどエラー投げられるようにしたやつ)
const getTransform = (fail) =>
  new Transform({
    transform(data, _encoding, callback) {
      if (fail) {
        callback(new Error("fail"));
      }
      this.push(data);
      callback();
    },
  });

// [NOTE] ここでエラーの発生を任意に切り替える
// const transform = getTransform(true);
const transform = getTransform();

// イベントの発生をロギング。`i-th` は各ストリームに対応 (0-th: readable, 1-th: transform, 2-th: writable)
[readable, transform, writable].forEach((s, i) => {
  s.on("end", () => {
    console.log(`${i}-th stream:`, "end");
  });
  s.on("finish", () => {
    console.log(`${i}-th stream:`, "finish");
  });
  s.on("close", () => {
    console.log(`${i}-th stream:`, "close");
  });
});

// 入力 -> 変換 -> 出力
pipeline(readable, transform, writable, (e) => {
  if (e) {
    console.log(e);
    console.log(
      "エラーが起きた時は全てのストリームに close イベントが発行される"
    );
  } else {
    console.log(
      "エラーなく終了した時は全てのストリームにそれぞれの終了イベント、加えて入出力ストリームに close イベントが発行される"
    );
  }
});

1. エラーが発生しない時の挙動

0-th stream: end
1-th stream: finish
1-th stream: end
2-th stream: finish
エラーなく終了した時は全てのストリームにそれぞれの終了イベント、加えて入出力ストリームに close イベントが発行される
0-th stream: close
2-th stream: close

2. エラーが発生した時の挙動 ([NOTE] と書いてあるコメント行近くの transform 変数定義をコメントアウト/インして実行)

1-th stream: close
2-th stream: close
Error: fail
    at Transform.transform [as _transform] (/Users/ushumpei/node-stream/index.js:15:18)
    at Transform._read (_stream_transform.js:189:10)
    at Transform._write (_stream_transform.js:177:12)
    at doWrite (_stream_writable.js:428:12)
    at writeOrBuffer (_stream_writable.js:412:5)
    at Transform.Writable.write (_stream_writable.js:302:11)
    at ReadStream.ondata (_stream_readable.js:722:22)
    at ReadStream.emit (events.js:209:13)
    at addChunk (_stream_readable.js:305:12)
    at readableAddChunk (_stream_readable.js:282:13)
エラーが起きた時は全てのストリームに close イベントが発行される
0-th stream: close

各イベントは、

  • end: Readable ストリームが読まれ終わった時に発火
  • finish: Writable ストリームが書かれ終わった時に発火
  • close: ストリームとその使用しているリソースが閉じた時に発火 (でも発火しないストリームもあるらしい)

と言う感じか?

まとめ

確認できたこととしては以下

  • どちらのケースでも close が呼ばれて使用しているリソースを閉じている。
  • transform は DuplexReadable かつ Writable なので endfinish が呼ばれている。

感想

Transformclose されるのはどう言うことだ?と悩む。

あと、根本的な問題として、ストリーム閉じなくて良いケースを理解できていない。文字列から単なるメモリに乗っている Readable なストリーム作った場合は、閉じなくて良いのか?GC されるか?

公式ドキュメントに promisifypipeline を Promise 化した時の例も書いてあったので、async/await で書きたい時はそれを使う。

Node.js でオブジェクトの配列からストリームを作成する

多分知っておくべきこととして、ストリームはバージョン 12.3.0 でかなり変化があった。この記事は 11.15.0 で書いている。

Node でオブジェクトの配列からストリームを作る方法。

const { Readable } = require("stream");

const readable = Readable.from({ objectMode: true, read() {} })
オブジェクトの配列.map((o) => readable.push(o))
readable.push(null)

12.3.0 以降 は以下で良い。

const { Readable } = require("stream");

const readable = Readable.from(オブジェクトの配列)

ストリーム自体の使い方があまりわかっていない。

とりあえず動かす

エラー時、ストリームを頑張って閉じなければいけないと思う、 destroy() とか使う。

const { Readable, Transform } = require("stream");

// テストデータ作成
const testData = new Array(1000)
  .fill("")
  .map((_, i) => ({ id: i, val: Math.random() }));
testData.push(null)

// Readable ストリーム作成
const readable = new Readable({
  objectMode: true,
  read() {},
});

// Transform ストリーム作成
const stringifyTransform = new Transform({
  transform(data, _encoding, callback) {
    this.push(JSON.stringify(data));
    callback();
  },
  objectMode: true,
});

// この辺微妙
stringifyTransform.on("error", () => readable.destroy());
process.stdout.on("error", () => {
  readable.destroy();
  stringifyTransform.destroy();
});

// 入力 -> 変換 -> 出力
readable.pipe(stringifyTransform).pipe(process.stdout);

// 入力へデータを入れる
testData.forEach((o) => readable.push(o));

標準出力にオブジェクトの文字列が表示される。

ストリームの閉じる閉じないの話

ストリームが閉じているかどうか調べる方法がわからない。イベントを監視していればできるがもっと良い方法はないだろうか? (12.3.0 以降とかだとプロパティがある)

以下はストリームの挙動を確認するための、ストリームを標準出力へパイプするコード。失敗させたりしてイベント拾う。

const { Readable } = require("stream");

// Readable ストリーム作成
const readable = new Readable({
  objectMode: true,
  read() {},
});

console.log(0, readable.readableFlowing);
// => null

const pipe = readable.pipe(process.stdout);

console.log(1, readable.readableFlowing);
// => true

console.log(2, readable.push("ok\n"));
// => true
// 出力あり

console.log(3, readable.push({}));
// => true
// pipe 先の process.stdout がエラー起こして失敗

pipe.on("error", (e) => {
  console.log(4, readable.readableFlowing);
  // => false

  console.log(5, readable.push("ok\n"));
  // => true
  // ただ pipe が切れているので出力はない

  readable.destroy();
  // readable を切る
});

readable.on("close", (e) => {
  console.log(6, "close"); // destroy() で close が発生する。pipe 先のエラーでは切れたりしないようだ
});

出力

0 null
1 true
2 true
3 true
ok
4 false
5 true
6 'close'
  • readableFlowing: パイプされているかどうか。最初 (0) では繋がっていないので null。繋がると (1) true、エラー発生後 (4) は切れてて false
  • pipeerror を拾うようにして、その中で各ストリームを切っていけば良いっぽい。

感想

API ドキュメントのストリーム部分 ちゃんと読んだほうがいいのだろうな、と思いました。英語読むのちょっと気合がいる。

疑問たくさんある

  • エラー時に transform も切る必要があるだろうか?
  • 各種イベントの正しい使い方。error 起きた時に閉じるとかちゃんとしなきゃダメだがあまり整理できてない。finish? end?
  • ストリーム作成時の read() {}pipe だと不要?実装してみる?
  • TransformReadableWritable を継承しているらしいが Writable っぽさが見えてない。
  • gzipTransform の実装どんなだ

fetchbody でストリーム取得できるから画像を canvas に書き込むとか楽しそう。何に使えるかはわからない。加工とかしてみる?RGB から一色なくすとかはできそう。ただそれは何のために?

ストリーム Java でしか使ったことがなかった

GZIPOutputStream と ByteArrayOutputStream と try with resources

Javatry with resources は途中で return してもリソースの close をしてくれるけど、なんか間違えた。

GZIPOutputStreamByteArrayOutputStream を使って以下のような、データを圧縮してバイト列にして返す処理を書いていました。( InputStream の部分は実際は外部のファイルのストリームとかになると思います)

String input = "hogehogehoge";
InputStream in = new ByteArrayInputStream(input.getBytes());

try (
  BufferedReader reader = new BufferedReader(new InputStreamReader(in));
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(out)))
) {

  String line;
  while ((line = reader.readLine()) != null) {
    writer.write(line);
    writer.newLine();
  }

  return out.toByteArray();

} catch (IOException e) {
  e.printStackTrace();
  throw new RuntimeException(e);
}

すると次の例外が発生。

java.io.EOFException: Unexpected end of ZLIB input stream

これは GZIPOutputStream を閉じていないときに出る例外で、「 try with resources だから閉じられるのでは?」と思いましたが、 toByteArray しているタイミングはまだ処理が try を抜けていないので閉じられていなかったようです。

以下のように変更して解決しました。バイト列を取得するのは try を抜けてから行うようにしました。

String input = "hogehogehoge";
InputStream in = new ByteArrayInputStream(input.getBytes());

ByteArrayOutputStream out = new ByteArrayOutputStream();

try (
  BufferedReader reader = new BufferedReader(new InputStreamReader(in));
  BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(out)))
) {

  String line;
  while ((line = reader.readLine()) != null) {
    writer.write(line);
    writer.newLine();
  }

} catch (IOException e) {
  e.printStackTrace();
  throw new RuntimeException(e);
}
return out.toByteArray();

感想

すぐ終わると思って書いたコードが動かなくて夜が明けてしまった。

最近景気が悪いので、仕事がなくなったら各地のお祭りを巡る旅とかしたい。

修了証明書

Certificate of Completion

スライドとかでよかった気がするけど、なんとなく html で修了証明書を書いてみました。css は無限に時間を溶かすと思います。

See the Pen Certificate of Completion by ushumpei (@ushumpei) on CodePen.

MacChrome でしか確認していない。もしかしたら他の環境でフォントとかダメかもしれないです。

COURSE DETAILS (リボンの下の細かい文字) が短すぎると良い感じの形にならないです。三文くらいあると印刷した時に A4 サイズにうまくおさまる。

参考

ありがとうございました。

リボン

saruwakakun.com

リボンのコード、border で三角形を作るやつの応用で白い三角形を使って切り口を表現する。

mrtc.jp

画像使わないと縁取りかっこよくならないかと思ったけど、radial-gradientlinear-gradient を使って丸みを出している。ただコードちゃんと追えていない。

感想

効力のない、お気持ちのやつ。というか正式なフォーマットがどこかにあるんだろうな。人に教えるほど何かを極めているわけでもないのにどこへ向かっているんだろうか私は。

スマホで見るとビールのラベルっぽい。

VSCode で Ruby 書くための設定

PC 変えた時に vim & ruby のいい感じの設定が失われてしまいしばらく書かないから放置していたらまた書くことになり VSCode でなんとなく書いてたらいい加減不便さを感じてきたので本当にとりあえずの設定を調べました。(早口)

この辺ができるように設定。

  • 構文チェック
  • 補完と定義ジャンプ

あとこれは

でやってます。

やること

VSCodeRuby の拡張があるので入れる (名前そのまんま Ruby のやつ)

GitHub - rubyide/vscode-ruby: Provides Ruby language and debugging support for Visual Studio Code

拡張を動かすために rubocop があればいいそうなので入れる

gem install rubocop

VSCode の設定ファイルに以下を追加

    "ruby.lint": { "rubocop": true },
    "ruby.useLanguageServer": true,
    "ruby.intellisense": "rubyLocate",

説明

  1. "ruby.lint": { "rubocop": true }: Linter は rubocop
  2. "ruby.useLanguageServer": true: Language Server 使う (構文チェックしてくれるようになる)
  3. "ruby.intellisense": "rubyLocate": インテリセンスの設定 (ある程度、補完、定義ジャンプができるようになる)

終わり

rubyLocate のオプションには定義検索するパスが設定できるようで、うまくやれば結構便利になるかもです?脳死で初期設定できないと割と放置しがちな癖があり、よく無いですね。