ExecutorService

JDK1.5 の java.util.concurrent のスレッド管理機能が強力なので、なんか便利なものを作ってみたいのだが、なかなか難しい。上手に抽象化されすぎてて、非同期管理機能を提供するのはあんまり意味がないのだ。


「別スレッドで実行して、結果を受け取る。タイムアウト機能付き。」


なんてのは 1.4 以前だったらネタになった気がするが。


とはいえ、業務の中で使うのであれば concurrent パッケージに含まれるものを util として一ヶ所にまとめて軽くラップするのはいいアイデアかもしれない。業務処理の中でやるようなことで無いのは確かなので。

public class ExecutorUtils {

  /**
   * 別スレッドで実行して、結果を受け取る。タイムアウト機能付き。<br>
   * 
   * @param <T> 返却されるオブジェクトの型 T
   * @param callable Callable<T> の実装クラスを渡してください。
   * @return T
   * @throws TimeoutException 指定した timeout 秒(TimeUnit.SECOND の場合)
   *     経過しても処理が終了しない場合 
   * @throws ExecutionException callable 実行中に例外が発生した場合。(getCause() で原因取得)
   * @throws InterruptedException 別のスレッドから割り込みがかかった。(catch ブロックに
   *     割り込みがかかった場合の処理を記述してください。通常は単に処理が中断されたので、
   *     失敗を返すだけでOKです。Callable の実装クラスで sleep でもしてない限り発生しないと思います。
   */
  public static <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit unit) 
  throws InterruptedException, ExecutionException, TimeoutException {
    ExecutorService ex = Executors.newSingleThreadExecutor();
    Future<T> future = ex.submit(callable);
    T ret = future.get(timeout, unit);
    ex.shutdown();
    return ret;
  }
}

上記のような感じ。使い勝手は直接 ExecutorService を呼ぶのとあんまり変わらないが。呼ぶ側は以下のような感じで Callable を実装することになる。

    try {
      Boolean ret = ExecutorUtils.runWithTimeout(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          TimeUnit.MILLISECONDS.sleep(3001);
          return true;
        }
        
      }, 3, TimeUnit.SECONDS);
      System.out.println("結果:" + ret);
    } catch (InterruptedException e) {
      // 処理が中断されたので失敗。起きないはずなんで、
      // 逆に対処は・・・運用通知メールでも投げるんだろうか。
    } catch (ExecutionException e) {
      // これは e.getCause() で中身を確認して、原因に応じた処理。
    } catch (TimeoutException e) {
      // タイムアウトになっちゃいました例外とか、
      // リトライキューへの挿入とかを行う。
    }


ちなみにタイムアウトはビジーループに対しても有効。上記は sleep しているが、そこを、

for(;;) {if(1==2) break;}  // ビジーループ

と置き換えても同じように機能する。


ま、やっぱりあんまりラップする意味は無い。が、まあ、高負荷になるサービスの中でよく意味を理解しないで各自が勝手に CachedThreadPool を作ったりするのは問題を起こす。簡単にできちゃうだけに、ラップして一元管理すること自体に意味はあるんじゃねーのって気がする。

セマフォ付きにしてみる。

何らかの外部APIの呼び出し時なんかに、呼び出し側に問題ないからといって、数千スレッド同時リクエスト投げちゃっていいかというと相手側のサービスに迷惑がかかる場合がある。もちろん、そういう状態になってしまう呼び出し側の方にも問題ありそうだが、まあそれはおいといて。*1。そういう場合はセマフォとか、Mutex を使いたくなる。セマフォも上手く抽象化されていると思う。runWithTimeout を内部で使って同時実行数制限をかけるメソッドを追加すると、こういう感じか。

/**
 * 別スレッドで実行して、結果を受け取る。タイムアウトと、セマフォ付き。<br>
 * 
 * @param <T> 返却されるオブジェクトの型 T
 * @param callable Callable<T> の実装クラスを渡してください。
 * @param sem 同時実行可能数を定義。シングルトンな Semaphore オブジェクトを渡してください。
 * @param timeout タイムアウト時間
 * @param tryAcquire リソースのブロック待ち時間
 * @param unit タイムアウトとブロック待ち時間の単位
 * @return T
 * @throws TimeoutException 指定した timeout 秒(TimeUnit.SECOND の場合)
 *     経過しても処理が終了しない場合 
 * @throws ExecutionException callable 実行中に例外が発生した場合。(getCause() で原因取得)
 * @throws InterruptedException 別のスレッドから割り込みがかかった。(catch ブロックに
 *     割り込みがかかった場合の処理を記述してください。通常は単に処理が中断されたので、
 *     失敗を返すだけでOKです。Callable の実装クラスで sleep でもしてない限り発生しないと思います。
 */
public static <T> T runWithTimeout(Callable<T> callable, Semaphore sem, long timeout, long tryAcquire, TimeUnit unit) 
throws InterruptedException, ExecutionException, TimeoutException {
  T ret;
  try {
    if (sem.tryAcquire(tryAcquire, unit)) {
      ret = runWithTimeout(callable, timeout, unit);
    } else {
      throw new TimeoutException(
          String.format("The resource was not able to be acquired for %d %s.", tryAcquire, unit.toString()));
    }
  } finally {
    sem.release();
  }
  return ret;
}


これを使う場合、何らかのクライアントクラスを作ってやって、そこに、シングルトンのセマフォオブジェクトを一個用意してやる。凝ったことしなくても、static オブジェクトとして作ってやればいいと思う。

  /**
   * 同時リクエスト数
   * クライアント毎に一個セマフォを用意しておく。同時実行可能数をここで定義する
   */
  private static Semaphore sem = new Semaphore(10);
  
  /**
   * 処理本体(戻り値voidとかであんまり現実的ではないが)
   * 処理終了を 180 秒まで待ち、リソース開放を 3 秒待つ例。
   */
  public static void hogehoge(String[] args) {
    try {
      Boolean ret = ExecutorUtils.runWithTimeout(new Callable<Boolean>() {
        public Boolean call() throws Exception {
          // 外部APIを呼び出しする処理
          return true;
        }
      }, sem, 180, 3, TimeUnit.SECONDS);
      System.out.println("結果:" + ret);
    } catch (InterruptedException e) {
      // 処理が中断されたので失敗。起きないはずなんで、
      // 逆に対処は・・・運用通知メールでも投げるんだろうか。
    } catch (ExecutionException e) {
      // これは e.getCause() で中身を確認して、原因に応じた処理。
    } catch (TimeoutException e) {
      // タイムアウトになっちゃいました例外とか、
      // リトライキューへの挿入とかを行う。
    }
  }

クライアントクラスにセマフォオブジェクトを生で持たせるんじゃなくて、ラップしたほうがいいのか?とか若干考えたがあんまりそういうことを考えるのは好きじゃない(面倒くさい)ので、生のまま持たせる形でとりあえず書いてみた。static な new Semaphore(10) が同時実行数10なんだな、っていうのが直感的に分かりにくいとも思えないので、これでいいんじゃないか。


フレームワークとしてこの辺をラップする場合、アノテーションでやることも出来る気はする。

public class HogeServiceClientImpl {
  @concurrency(10)
  public boolean call(int serviceId, String value) {
    // ...
  }
}

みたいな。
でもまーどうだろう。せっかく concurrent、上手に抽象化されてるし、そのまま使ったほうがわかりやすい気がする。アノテーションで、とか、確かにすっきりはするけど、頑張りすぎな気がする。上の例だと単純だけど、実際には例外処理とか呼び出し側には必要になってくるわけだし。この辺はバランス感覚。

*1:そういう状態になっちゃうと呼び出し側の効率もかなり下がってると考えるのが自然