[解決済み] List<CompletableFuture>からCompletableFuture<List>に変換する。
質問
私は
List<CompletableFuture<X>>
を
CompletableFuture<List<T>>
. これは、多くの非同期タスクがあり、すべてのタスクの結果を取得する必要がある場合に非常に便利です。
もし、どれかが失敗したら、最終的な未来は失敗します。このように実装しています。
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
実行するには
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
どれか一つでも失敗したら、失敗です。100万個の先物があっても、期待通りの出力が得られます。 問題は 例えば、5000枚以上の先物があり、そのうちの1枚が失敗した場合、私は
StackOverflowError
:
スレッドで例外発生 "pool-1-thread-2611" java.lang.StackOverflowError において java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)
何が間違っているのでしょうか?
注:上記の返された未来は、未来のどれかが失敗したときに正しく失敗します。受理された回答はこの点も考慮する必要があります。
どのように解決するのですか?
使用方法
CompletableFuture.allOf(...)
:
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
あなたの実装について少しコメントします。
あなたの使用する
.thenComposeAsync
,
.thenApplyAsync
と
.thenCombineAsync
は期待通りの働きをしていない可能性が高いです。 これらは
...Async
メソッドは提供された関数を別のスレッドで実行します。 つまり、あなたの場合、リストへの新しいアイテムの追加は、提供された実行ファイルで実行されるようになります。 軽量な操作をキャッシュされたスレッドエクゼキュータに詰め込む必要はないのです。 また
thenXXXXAsync
メソッドを使わないでください。
さらに
reduce
は、ミュータブルコンテナに蓄積するために使ってはいけません。 ストリームがシーケンシャルな場合は正しく動作しても、ストリームがパラレルになった場合は失敗します。 ミュータブルリダクションを行うには
.collect
を使ってください。
最初の失敗の直後に例外的に全計算を完了させたい場合は、次のように
sequence
メソッドで次のようにします。
CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
com.forEach(f -> f.whenComplete((t, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
}
}));
return result;
さらに、最初の失敗時に残りの処理をキャンセルしたい場合は、次のように
exec.shutdownNow();
のすぐ後に
result.completeExceptionally(ex);
. もちろん、これは
exec
がこの1つの計算のためだけに存在することを前提としています。 そうでない場合は、ループオーバーして残りの各
Future
を個別にキャンセルする必要があります。
関連
-
Web Project JavaでPropertiesファイルを読み込むと、「指定されたファイルがシステムで見つかりません」というソリューションが表示されます。
-
[解決済み] JavaでInputStreamを読み込んでStringに変換するにはどうすればよいですか?
-
[解決済み] 配列からArrayListを作成する
-
[解決済み] JavaでStringをintに変換するにはどうしたらいいですか?
-
[解決済み] Java で、あるコンストラクタを別のコンストラクタから呼び出すにはどうすればよいですか?
-
[解決済み] Javaで文字列値からenum値を取得する方法
-
[解決済み] スタックトレースを文字列に変換するにはどうすればよいですか?
-
[解決済み] Java 8 List<V> を Map<K, V> に変換する。
-
[解決済み] Java 8 StreamをArrayに変換する方法は?
-
[解決済み] Java 8でリストのリストをリストにするにはどうしたらいいですか?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
Eclipse問題 アクセス制限。タイプ 'SunJCE' が API でないことを解決し、/jdk ディレクトリにある /jre と jre の違いについて理解を深める。
-
アクセス制限です。タイプ 'Application' は API ではない(必要なライブラリに制限がある)。
-
java マイクロソフト払い戻し予期せぬサーバーからのファイルの終了
-
CertificateException: XXXに一致するサブジェクトの代替DNS名が見つかりません 解決策
-
JDK8 の Optional.of と Optional.ofNullable メソッドの違いと使い方を説明する。
-
Java基礎編 - オブジェクト指向
-
コンストラクタDate()が未定義である問題
-
ecplise プロンプトが表示されます。"選択したものは起動できません。" "最近の起動はありません。"
-
Java Runtime Environmentを継続するためのメモリが不足しています。
-
java 365*1000*60*60*24 計算問題