[解決済み] Java 8 の並列ストリームにおけるカスタムスレッドプール
質問内容
Java 8 のカスタムスレッドプールを指定することは可能ですか? 並列ストリーム ? どこにも見当たらないのですが。
サーバー・アプリケーションがあり、並列ストリームを使いたいとします。しかし、アプリケーションは大規模でマルチスレッドであるため、それを区分けしたいと思います。アプリケーションのあるモジュールで実行速度の遅いタスクが、他のモジュールのタスクをブロックするようなことはしたくありません。
モジュールごとに異なるスレッドプールを使用できないのであれば、現実世界のほとんどの状況で並列ストリームを安全に使用できないことを意味します。
次のような例で試してみてください。CPUに負荷のかかるタスクがいくつかあり、別々のスレッドで実行されています。 このタスクは並列ストリームを利用しています。最初のタスクは壊れているので、各ステップに 1 秒かかります (スレッドスリープでシミュレート)。問題は、他のスレッドが、壊れたタスクの終了を待って、立ち往生してしまうことです。これは意図的な例ですが、サーブレットアプリで、誰かが長時間実行するタスクを共有フォークジョインプールにサブミットした場合を想像してみてください。
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
解決方法は?
実は、並列処理を特定のフォークジョインプールで実行する方法があるんです。フォークジョインプール内のタスクとして実行すると、そこに留まって共通のものを使わないのです。
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
この仕掛けは
ForkJoinTask.fork
は次のように指定します: "現在のタスクが実行されているプールでこのタスクを非同期に実行するように手配します (該当する場合)。
ForkJoinPool.commonPool()
そうでない場合は
inForkJoinPool()
"。
関連
-
Eclipseでプロジェクトエクスプローラービューとパッケージエクスプローラービューを使う
-
Spring BootのテストメソッドFailed to load ApplicationContextの問題を解決する
-
[解決済み] JavaでInputStreamを読み込んでStringに変換するにはどうすればよいですか?
-
[解決済み] JavaでNullPointerExceptionを回避する方法
-
[解決済み] JavaにおけるHashMapとHashtableの違いは何ですか?
-
[解決済み] Javaでメモリーリークを発生させるにはどうしたらいいですか?
-
[解決済み] JavaでArrayListではなくLinkedListを使用するのはいつですか?
-
[解決済み] Javaにおける "implements Runnable "と "extends Thread "の違いについて
-
[解決済み] Java 8 StreamをArrayに変換する方法は?
-
[解決済み] 可能な限り常にパラレルストリームを使用した方がいいのでしょうか?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
Java のエラーです。未解決のコンパイル問題 解決方法
-
Enumとの組み合わせでswitchの使い方を一度覚えるために必要な定数式
-
プロローグでのコンテンツは禁止されています
-
JDKの設定時にjava.dllが見つからない、java SE Runtime Environmentが見つからない問題が発生しました。
-
node js npm gruntインストール、elasticsearch-head 5.Xインストール
-
マスキング このリソースにアクセスするには、完全な認証が必要です。
-
自動配線された依存性のインジェクションに失敗しました。
-
[オリジナル】java学習ノート【II】よくあるエラー クラスパス上のクラスファイルが見つからない、またはアクセスできない場合
-
あるコードに出会いましたが、何に使うのか理解できません。 List<String> list = new ArrayList<String>() { { a
-
[解決済み] Java 8 のストリームと RxJava の observable の違い