1. ホーム

[解決済み】CompletableFutureとFutureとRxJavaのObservableの違いについて

2022-04-20 18:09:32

質問

との違いを教えてください。 CompletableFuture , FutureObservable RxJava .

私が知っているのは、すべてが非同期であるということです。

Future.get() スレッドをブロックする

CompletableFuture コールバックメソッドを与える

RxJava Observable --- と同様です。 CompletableFuture その他の特典あり(不明)

例えば、クライアントが複数のサービスを呼び出す必要があり、その際に Futures (Java) Future.get() が順次実行されるのですが、RxJavaではどうなのか知りたいのですが。

そして、ドキュメント http://reactivex.io/intro.html は言う。

Futuresを使って条件付き非同期実行フローを最適に構成するのは難しい(というか不可能)。もちろんできるのですが、すぐに複雑になり(つまりエラーが起こりやすくなり)、Future.get()で早々にブロックしてしまい、非同期実行のメリットがなくなってしまいます。

本当に知りたいのは、どのように RxJava はこの問題を解決してくれます。ドキュメントを読んでも理解しにくいと思いました。

どのように解決するのですか?

先物

先物 は、Java 5 (2004)で導入された。基本的には、まだ終了していない操作の結果のプレースホルダーである。操作が終了すると Future にはその結果が含まれます。例えば、ある操作は 実行可能 または 呼び出し可能 インスタンスに送信されます。 ExecutorService . 操作のサブミッターが Future オブジェクトを使用して、その操作が isDone() を使うか、ブロッキングを使用して終了を待ちます。 ゲット() メソッドを使用します。

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

完成予想図

完成予想図 は、Java 8(2014年)で導入されました。実は、通常のFuturesを進化させたもので、Googleの リスナブルフューチャー の一部である グアバ ライブラリです。これらはFutureで、タスクを連鎖的に繋げることもできます。ワーカースレッドに "タスク X をやって、終わったら X の結果を使って別のことをするように指示するのに使えます。CompletableFuturesを使えば、実際に結果を待つためにスレッドをブロックすることなく、操作の結果を使って何かをすることができます。以下は簡単な例です。

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava のための全体ライブラリです。 リアクティブ・プログラミング Netflixで作成されました。一見すると、次のようなものに似ています。 Java 8のストリーム . しかし、もっと強力です。

RxJavaはFuturesと同様に、同期または非同期のアクションを束ねて処理パイプラインを作成するために使用できます。Futuresが一回しか使えないのに対して、RxJavaは ストリーム 0個以上のアイテムで構成される。無限のアイテムを持つ終わりのないストリームも含まれます。また、信じられないほど豊富な 演算子セット .

Java 8のストリームとは異なり、RxJavaには 背圧 このメカニズムにより、処理パイプラインの異なる部分が異なるスレッドで動作するケースを処理することができます。 異なる速度で .

RxJavaの欠点は、ドキュメントがしっかりしているにもかかわらず、パラダイムの転換が必要なため、習得が難しいライブラリであることです。Rxのコードは、特に複数のスレッドが関与している場合、またさらに悪いことに、背圧が必要な場合、デバッグするのが悪夢になる可能性もあります。

もし、あなたがそのことを知りたいのであれば、このページで紹介されているように ページ 公式サイトの様々なチュートリアルや、公式の ドキュメント ジャバドック . また、以下のようなビデオもご覧いただけます。 これ Rxの簡単な紹介と、RxとFuturesの違いについて話しています。

ボーナス: Java 9 Reactive Streams

Java 9のリアクティブストリーム 別名 フローAPI は、様々な リアクティブストリーム などのライブラリは RxJava 2 , Akka Streams そして Vertx . これらのリアクティブライブラリは、重要な背圧を維持したまま、相互接続することができます。