1. ホーム
  2. java

[解決済み] List<CompletableFuture>からCompletableFuture<List>に変換する。

2023-07-12 06:13:43

質問

私は List<CompletableFuture<X>>CompletableFuture<List<T>> . これは、多くの非同期タスクがあり、すべてのタスクの結果を取得する必要がある場合に非常に便利です。

もし、どれかが失敗したら、最終的な未来は失敗します。このように実装しています。

public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
    if(com.isEmpty()){
        throw new IllegalArgumentException();
    }
    Stream<? extends CompletableFuture<T>> stream = com.stream();
    CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
    return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
        x.add(y);
        return x;
    },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
        ls1.addAll(ls2);
        return ls1;
    },exec));
}

実行するには

ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep((long) (Math.random() * 10));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

どれか一つでも失敗したら、失敗です。100万個の先物があっても、期待通りの出力が得られます。 問題は 例えば、5000枚以上の先物があり、そのうちの1枚が失敗した場合、私は StackOverflowError :

スレッドで例外発生 "pool-1-thread-2611" java.lang.StackOverflowError において java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)

何が間違っているのでしょうか?

注:上記の返された未来は、未来のどれかが失敗したときに正しく失敗します。受理された回答はこの点も考慮する必要があります。

どのように解決するのですか?

使用方法 CompletableFuture.allOf(...) :

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}

あなたの実装について少しコメントします。

あなたの使用する .thenComposeAsync , .thenApplyAsync.thenCombineAsync は期待通りの働きをしていない可能性が高いです。 これらは ...Async メソッドは提供された関数を別のスレッドで実行します。 つまり、あなたの場合、リストへの新しいアイテムの追加は、提供された実行ファイルで実行されるようになります。 軽量な操作をキャッシュされたスレッドエクゼキュータに詰め込む必要はないのです。 また thenXXXXAsync メソッドを使わないでください。

さらに reduce は、ミュータブルコンテナに蓄積するために使ってはいけません。 ストリームがシーケンシャルな場合は正しく動作しても、ストリームがパラレルになった場合は失敗します。 ミュータブルリダクションを行うには .collect を使ってください。

最初の失敗の直後に例外的に全計算を完了させたい場合は、次のように sequence メソッドで次のようにします。

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;

さらに、最初の失敗時に残りの処理をキャンセルしたい場合は、次のように exec.shutdownNow(); のすぐ後に result.completeExceptionally(ex); . もちろん、これは exec がこの1つの計算のためだけに存在することを前提としています。 そうでない場合は、ループオーバーして残りの各 Future を個別にキャンセルする必要があります。