読者です 読者をやめる 読者になる 読者になる

そごうソフトウェア研究所

SOA、開発プロセス、ITアーキテクチャなどについて書いています。Twitterやってます@rsogo

Concurrency Utilities for Java EE(JSR-236)のシンプルなサンプル試してみる

Java

Java EEで非同期処理を行う方法を調べていました。 やりたいことは、iOSAndroidのPushメッセージのリクエストを受け付けて、リクエストの内容はデータベースに保存。非同期で、データベースの情報を元に、APNsやGCMにメッセージを送るという処理です。

選択肢としてはこんな感じかなと思います。

方式 検討したこと
Concurrency Utilities for Java EE Concurrencyという名前が付いているだけあって、単なる非同期処理というよりは、並列処理に関するコントロールがいろいろできそうです。
Message Driven Bean キューにメッセージがエンキューされたタイミングで、イベント・ドリブンで処理が動きます。JMSにメッセージがキューイングされるので、メッセージの永続化が可能。メッセージの永続化が可能なら、この方式かなと思います。今回は、Pushメッセージの内容はDBに永続化されていて、処理依頼のメッセージ自体のの永続性は不要だったので、JMSの環境構築の手間も考えて選択肢から外しました。
EJB Timer Service 単純な非同期処理以外にも、cron的なスケジュールベースの処理起動ができるみたいです。今回の要件では、多重度なくて、並行処理はどっちでもいいので、こちらの方式でもいいかも。

このエントリではConcurrency Utilities for Java EEを使ってみたいと思います。 Executors.newSingleThreadExecutorを使ってみます。これは、アプリケーション・サーバーなどのコンテナからスレッドを借りてこれますが、スレッドの多重度は1つになります。 生成されたスレッドは、コンテナのコンテキストを持っています。

Executors.newSingleThreadExecutorを使ったサンプルコード

Webサービスでリクエストを受け付ける部分

GETメソッドで処理を受け付けて、後述する非同期処理を起動します。 リクエストごとにRequestIdを採番しています。

package sample;

import javax.ws.rs.GET;
import javax.ws.rs.Path;

@Path("/task")
public class TaskService {

    static int index = 0;
    
    @GET
    @Path("/")
    public void start() {
        
        System.out.println("TaskService.start is called. RequestId is " + index);
        
        TaskManager timer = TaskManager.getInstance();
        timer.exec(index++);
    }
}

非同期処理を行う部分

Executors.newSingleThreadExecutorを使って、処理を非同期で行います。 シングルトンで実装しています。

非同期処理部分で2秒間待合せ処理をしています。 パラメタで受け取ったリクエストIDと、スレッドを識別するためのスレッドIDをログに出力しています。

package sample;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskManager {
    
    static private TaskManager instace = null;
    private ExecutorService executor = null;
    
    private TaskManager() {
        executor =  Executors.newSingleThreadExecutor();
    }
    
    synchronized static public TaskManager getInstance() {
        if (instace == null) {
            instace = new TaskManager();
        }
        return instace;
    }
    
    public void exec(int requestId) {
        Runnable runnable = new Runnable() {

            public void run() {
                System.out.println("RequestId " + requestId + ":ThreadId " + Thread.currentThread().getId() + " is running.");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {}
                System.out.println("RequestId " + requestId + ":ThreadId " + Thread.currentThread().getId() + " is finished.");
            }
        };
        executor.execute(runnable);
    }
}

実行時ログ

作ったWebサービスを呼び出して、ログを確認します。 Executors.newSingleThreadExecutorを使っていることで、リクエストが重なった時も1つのスレッドで順次処理されていることがわかります。

# 最初のリクエスト
01:00:46,566 INFO  [stdout] (default task-2) TaskService.start is called. RequestId is 0
01:00:46,568 INFO  [stdout] (pool-5-thread-1) RequestId 0:ThreadId 138 is running.
01:00:48,570 INFO  [stdout] (pool-5-thread-1) RequestId 0:ThreadId 138 is finished.
# リクエストを受け付けて、処理が不通に動きました。スレッドIDは138。

01:00:50,164 INFO  [stdout] (default task-3) TaskService.start is called. RequestId is 1
01:00:50,164 INFO  [stdout] (pool-5-thread-1) RequestId 1:ThreadId 138 is running.
01:00:52,169 INFO  [stdout] (pool-5-thread-1) RequestId 1:ThreadId 138 is finished.

# ここからリクエストを連続で投げました
01:00:53,444 INFO  [stdout] (default task-4) TaskService.start is called. RequestId is 2
01:00:53,444 INFO  [stdout] (pool-5-thread-1) RequestId 2:ThreadId 138 is running.
01:00:53,756 INFO  [stdout] (default task-5) TaskService.start is called. RequestId is 3
01:00:54,028 INFO  [stdout] (default task-6) TaskService.start is called. RequestId is 4
01:00:54,317 INFO  [stdout] (default task-7) TaskService.start is called. RequestId is 5
# リクエストは受付られましたが、処理はキューイングされています

01:00:55,447 INFO  [stdout] (pool-5-thread-1) RequestId 2:ThreadId 138 is finished.
01:00:55,447 INFO  [stdout] (pool-5-thread-1) RequestId 3:ThreadId 138 is running.
01:00:57,448 INFO  [stdout] (pool-5-thread-1) RequestId 3:ThreadId 138 is finished.
01:00:57,449 INFO  [stdout] (pool-5-thread-1) RequestId 4:ThreadId 138 is running.
01:00:59,451 INFO  [stdout] (pool-5-thread-1) RequestId 4:ThreadId 138 is finished.
01:00:59,451 INFO  [stdout] (pool-5-thread-1) RequestId 5:ThreadId 138 is running.
01:01:01,452 INFO  [stdout] (pool-5-thread-1) RequestId 5:ThreadId 138 is finished.
# 同じスレッドIDで順次、処理が実行されていきました

サンプルコード

サンプルコードの全部はGithubで公開しています。

github.com

参考ドキュメント

こちらを参考にさせてもらいました。

Concurrency Utilities for EE 7 | 寺田 佳央 - Yoshio Terada

Concurrency Utilities for Java EEをつかってみる - Qiita