1. ホーム
  2. java

[解決済み] Javaエグゼキュータ:タスクの完了をブロックせずに通知する方法とは?

2022-04-21 21:35:59

質問

私はエグゼキュータサービスに提出する必要があるタスクでいっぱいのキューを持っているとします。 私はそれらを一度に一つずつ処理したい。 私が考える最も簡単な方法は、次のとおりです。

  1. キューからタスクを取り出す
  2. エグゼキュータに提出する
  3. 返されたFutureに対して.getを呼び、結果が得られるまでブロックする。
  4. キューから別のタスクを取る...

しかし、私はブロッキングを完全に避けようとしています。 このようなキューが1万個もあって、それらが一度に1つずつタスクを処理する必要がある場合、そのほとんどがブロックされたスレッドを保持することになるので、スタックスペースが足りなくなるのです。

私が望むのは、タスクを送信して、タスクが完了したときに呼び出されるコールバックを提供することです。 そのコールバックの通知を、次のタスクを送るためのフラグとして使うのです。(functionaljavaとjetlangはそのようなノンブロッキングアルゴリズムを使用しているようですが、私は彼らのコードを理解することができません)

JDKのjava.util.concurrentを使ってそれを実現するには、独自のエグゼキューターサービスを書く以外に方法はないのでしょうか?

(これらのタスクを送り出すキュー自体がブロックされる可能性がありますが、それは後で取り組むべき問題です)

解決方法は?

完了通知で渡したいパラメータを受け取るコールバックインターフェースを定義する。そして、タスクの終了時にそれを呼び出します。

Runnableタスクの一般的なラッパーを書き、それらを ExecutorService . または、Java 8に組み込まれたメカニズムについては、以下を参照してください。

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}


CompletableFuture Java 8 では、非同期かつ条件付きで処理を完了させることができるパイプラインを構成するための、より精巧な手段が提供されています。以下は、通知のための巧妙な、しかし完全な例です。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}