Concurrency Utilities for Java EE(JSR-236)のシンプルなサンプル試してみる
Java EEで非同期処理を行う方法を調べていました。 やりたいことは、iOSやAndroidのPushメッセージのリクエストを受け付けて、リクエストの内容はデータベースに保存。非同期で、データベースの情報を元に、APNsやGCMにメッセージを送るという処理です。
選択肢としてはこんな感じかなと思います。
方式 | 検討したこと |
---|---|
Concurrency Utilities for Java EE | Concurrencyという名前が付いているだけあって、単なる非同期処理というよりは、並列処理に関するコントロールがいろいろできそうです。 |
Message Driven Bean | キューにメッセージがエンキューされたタイミングで、イベント・ドリブンで処理が動きます。JMSにメッセージがキューイングされるので、メッセージの永続化が可能。メッセージの永続化が可能なら、この方式かなと思います。今回は、Pushメッセージの内容はDBに永続化されていて、処理依頼のメッセージ自体のの永続性は不要だったので、JMSの環境構築の手間も考えて選択肢から外しました。 |
EJB Timer Service | 単純な非同期処理以外にも、cron的なスケジュールベースの処理起動ができるみたいです。今回の要件では、多重度なくて、並行処理はどっちでもいいので、こちらの方式でもいいかも。 |
このエントリではConcurrency Utilities for Java EEを使ってみたいと思います。 Executors.newSingleThreadExecutorを使ってみます。これは、アプリケーション・サーバーなどのコンテナからスレッドを借りてこれますが、スレッドの多重度は1つになります。 生成されたスレッドは、コンテナのコンテキストを持っています。
Executors.newSingleThreadExecutorを使ったサンプルコード
Webサービスでリクエストを受け付ける部分
GETメソッドで処理を受け付けて、後述する非同期処理を起動します。 リクエストごとにRequestIdを採番しています。
package sample; import javax.ws.rs.GET; import javax.ws.rs.Path; @Path("/task") public class TaskService { static int index = 0; @GET @Path("/") public void start() { System.out.println("TaskService.start is called. RequestId is " + index); TaskManager timer = TaskManager.getInstance(); timer.exec(index++); } }
非同期処理を行う部分
Executors.newSingleThreadExecutorを使って、処理を非同期で行います。 シングルトンで実装しています。
非同期処理部分で2秒間待合せ処理をしています。 パラメタで受け取ったリクエストIDと、スレッドを識別するためのスレッドIDをログに出力しています。
package sample; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TaskManager { static private TaskManager instace = null; private ExecutorService executor = null; private TaskManager() { executor = Executors.newSingleThreadExecutor(); } synchronized static public TaskManager getInstance() { if (instace == null) { instace = new TaskManager(); } return instace; } public void exec(int requestId) { Runnable runnable = new Runnable() { public void run() { System.out.println("RequestId " + requestId + ":ThreadId " + Thread.currentThread().getId() + " is running."); try { Thread.sleep(2000); } catch (InterruptedException e) {} System.out.println("RequestId " + requestId + ":ThreadId " + Thread.currentThread().getId() + " is finished."); } }; executor.execute(runnable); } }
実行時ログ
作ったWebサービスを呼び出して、ログを確認します。 Executors.newSingleThreadExecutorを使っていることで、リクエストが重なった時も1つのスレッドで順次処理されていることがわかります。
# 最初のリクエスト 01:00:46,566 INFO [stdout] (default task-2) TaskService.start is called. RequestId is 0 01:00:46,568 INFO [stdout] (pool-5-thread-1) RequestId 0:ThreadId 138 is running. 01:00:48,570 INFO [stdout] (pool-5-thread-1) RequestId 0:ThreadId 138 is finished. # リクエストを受け付けて、処理が不通に動きました。スレッドIDは138。 01:00:50,164 INFO [stdout] (default task-3) TaskService.start is called. RequestId is 1 01:00:50,164 INFO [stdout] (pool-5-thread-1) RequestId 1:ThreadId 138 is running. 01:00:52,169 INFO [stdout] (pool-5-thread-1) RequestId 1:ThreadId 138 is finished. # ここからリクエストを連続で投げました 01:00:53,444 INFO [stdout] (default task-4) TaskService.start is called. RequestId is 2 01:00:53,444 INFO [stdout] (pool-5-thread-1) RequestId 2:ThreadId 138 is running. 01:00:53,756 INFO [stdout] (default task-5) TaskService.start is called. RequestId is 3 01:00:54,028 INFO [stdout] (default task-6) TaskService.start is called. RequestId is 4 01:00:54,317 INFO [stdout] (default task-7) TaskService.start is called. RequestId is 5 # リクエストは受付られましたが、処理はキューイングされています 01:00:55,447 INFO [stdout] (pool-5-thread-1) RequestId 2:ThreadId 138 is finished. 01:00:55,447 INFO [stdout] (pool-5-thread-1) RequestId 3:ThreadId 138 is running. 01:00:57,448 INFO [stdout] (pool-5-thread-1) RequestId 3:ThreadId 138 is finished. 01:00:57,449 INFO [stdout] (pool-5-thread-1) RequestId 4:ThreadId 138 is running. 01:00:59,451 INFO [stdout] (pool-5-thread-1) RequestId 4:ThreadId 138 is finished. 01:00:59,451 INFO [stdout] (pool-5-thread-1) RequestId 5:ThreadId 138 is running. 01:01:01,452 INFO [stdout] (pool-5-thread-1) RequestId 5:ThreadId 138 is finished. # 同じスレッドIDで順次、処理が実行されていきました
サンプルコード
サンプルコードの全部はGithubで公開しています。
参考ドキュメント
こちらを参考にさせてもらいました。