1. ホーム
  2. javascript

[解決済み] Node.js ストリームとオブザーバブルの比較

2023-05-20 13:01:41

質問

について学んだ後 観測可能な に非常に似ていると思います。 Node.jsのストリーム . どちらも新しいデータが到着した時、エラーが発生した時、データがなくなった時(EOF)にコンシューマに通知するメカニズムを持っています。

この2つの概念的/機能的な違いについて教えていただきたいです。ありがとうございます!

どのように解決するのですか?

どちらも 観測可能 とnode.jsの ストリーム は、一連の値を非同期で処理するという同じ根本的な問題を解決することができます。この2つの主な違いは、その出現の動機となったコンテキストに関連していると思います。そのコンテキストは、用語と API に反映されています。

において オブザーバブル 側には、リアクティブプログラミングモデルを導入した EcmaScript の拡張機能があります。これは値の生成と非同期性の間のギャップを埋めるために、最小限の概念で構成された ObserverObservable .

node.jsと ストリーム 側では、ネットワークストリームとローカルファイルの非同期で高性能な処理のためのインターフェイスを作りたかったのでしょう。用語はその最初のコンテキストから派生して、次のようになります。 pipe , chunk , encoding , flush , Duplex , Buffer などです。特定のユースケースを明示的にサポートする実用的なアプローチをとることで、統一性がなくなるため、物事を構成する能力が失われます。例えば push の上に Readable ストリームと write の上に Writable の上に置くことができますが、概念的には同じこと、つまり値を公開していることになります。

ですから、実際には、もしあなたが概念に注目し、そしてオプションの { objectMode: true } を使えば ObservableReadable のストリームと Observer を持つ Writable のストリームを使うことができます。2つのモデルの間にいくつかの簡単なアダプタを作成することもできます。

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

var Observable = function(subscriber) {
    this.subscribe = subscriber;
}

var Subscription = function(unsubscribe) {
    this.unsubscribe = unsubscribe;
}

Observable.fromReadable = function(readable) {
    return new Observable(function(observer) {
        function nop() {};

        var nextFn = observer.next ? observer.next.bind(observer) : nop;
        var returnFn = observer.return ? observer.return.bind(observer) : nop;
        var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

        readable.on('data', nextFn);
        readable.on('end', returnFn);
        readable.on('error', throwFn);

        return new Subscription(function() {
            readable.removeListener('data', nextFn);
            readable.removeListener('end', returnFn);
            readable.removeListener('error', throwFn);
        });
    });
}

var Observer = function(handlers) {
    function nop() {};

    this.next = handlers.next || nop;
    this.return = handlers.return || nop;
    this.throw = handlers.throw || nop;
}

Observer.fromWritable = function(writable, shouldEnd, throwFn) {
    return new Observer({
        next: writable.write.bind(writable), 
        return: shouldEnd ? writable.end.bind(writable) : function() {}, 
        throw: throwFn
    });
}

いくつかの名前を変えて、よりシンプルなコンセプトの ObserverSubscription によって行われる責任の過負荷を避けるために、ここに導入されました。 オブザーバブル Generator . 基本的には Subscription から退会することができます。 Observable . ともかく、上記のコードで pipe .

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));

と比較すると process.stdin.pipe(process.stdout) と比較すると、ストリームを組み合わせたり、フィルタリングしたり、変換したりする方法であり、他のどのようなデータのシーケンスに対しても機能します。これを実現するには Readable , Transform そして Writable のストリームがありますが、APIでは、チェーンではなくサブクラス化された Readable を連結し、関数を適用する代わりにサブクラス化することを推奨しています。そのため Observable モデルでは,例えば,値を変換することは,ストリームに変換関数を適用することに相当する。これは、新しいサブタイプの Transform .

Observable.just = function(/*... arguments*/) {
    var values = arguments;
    return new Observable(function(observer) {
        [].forEach.call(values, function(value) {
            observer.next(value);
        });
        observer.return();
        return new Subscription(function() {});
    });
};

Observable.prototype.transform = function(transformer) {
    var source = this;
    return new Observable(function(observer) {
        return source.subscribe({
            next: function(v) {
                observer.next(transformer(v));
            },
            return: observer.return.bind(observer),
            throw: observer.throw.bind(observer)
        });
    });
};

Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
  .subscribe(Observer.fromWritable(process.stdout))

結論は?リアクティブモデルを導入するのは簡単で Observable の概念をどこにでも導入することは簡単です。しかし、その概念を中心にライブラリ全体を実装するのは難しい。すべての小さな関数が一貫して動作する必要があります。結局のところ ReactiveX プロジェクトはまだ進行中です。しかし、本当にファイルのコンテンツをクライアントに送信し、エンコードを処理し、zip 圧縮する必要がある場合は、NodeJS でサポートされており、かなりうまく機能します。