やり直しJava

マルチスレッド

Jump to Section

Javaはスレッドを言語仕様として規定していて 登場当時は従来の言語に比べて並列プログラミングが簡単に実装できることを一つの売りとしていました。スレッドの枠組みは Javaの進化と共に強化されてきて 次のような枠組みが登場してきました。

  • プレーンなThreadクラス
  • Executorフレームワーク・Futureクラス(J2SE 5.0~)
  • Fork/Joinフレームワーク(Java SE 7~)
  • CompletableFuture (Java SE 8~)
  • 並列ストリーム(Java SE 8~)

Executorフレームワークの登場以降、プレーンなThreadクラスを直接扱う必要は無くなりました。また Java SE 8でCompletableFutureやParallelStreamが導入されたことにより、他の関数型言語と同じように非同期処理をスマートに実装できるようになりました。

この章ではフレームワークに依らない スレッドに関する共通的な事項を見てから、各種フレームワークについて見ていきます。

スレッドに関する基本事項

スレッドのフレームワークに依らない基本的な事項をまとめます。

スレッドの種類(デーモンスレッドとユーザスレッド)

Javaのスレッドにはデーモンスレッドユーザスレッドがあります。2 つの違いは次の通りです。

  • ユーザスレッドが実行している間は Java仮想マシンは終了しません。
  • 実行中のスレッドがデーモンスレッドだけになるとJava仮想マシンは終了します。

Java仮想マシンが起動すると、指定されたクラスのmain() クラスメソッドをユーザスレッドで実行します。main()メソッドを抜けると 実行中のユーザスレッドが無くなるので Java仮想マシンは終了します。

main()メソッドの中で 別のユーザスレッドを起動した場合、main()メソッドを抜けても 起動した別スレッドの処理が終了するまで Java仮想マシンは終了しません。一方でmain()メソッドの中で別のデーモンスレッドを起動した場合、main()メソッドを抜けると 起動した別スレッドが処理を実行中でも Java仮想マシンは終了します。

ユーザスレッドから別スレッドを作成するとユーザスレッドになり、デーモンスレッドから別スレッドを作成するとデーモンスレッドとなります。isDaemon()メソッドでデーモンスレッドかどうか確認でき、setDaemon()メソッドで切替えることもできます。

スレッドプールの種類によっては ワーカスレッドがデーモンスレッドであるため、ワーカスレッドの終了を待ち合わせないと ワーカスレッド実行中に Java仮想マシンが終了してしまうケースが出てきますので注意が必要です。

スレッドのライフサイクル

スレッドの状態と状態遷移を次にまとめます。

スレッドの状態と状態遷移
状態概要遷移アクション遷移先
NEWスレッドのインスタンスが生成された状態。まだスレッドは開始されていない。start()RUNNING
RUNNINGスレッド実行中の状態。run()メソッドの処理が終了すればTERMINATEDに遷移。wait()でWAITINGに遷移したり、synchronizedで モニタのロック解放待ちでBLOCKEDに遷移したりする。run()終了TERMIATED
wait()WAITING
wait(timeout)/sleep()TIMED_WAITING
モニタのロック獲得待ちBLOCKED
TERMINATEDスレッドの処理終了。
WAITING実行条件が整って他のスレッドから再開されるのを待っている状態。notify()/notifyAll()RUNNING
TIMED_WAITING期限付きのWAITINGの状態。notify()/notifyAll()/
タイムアウト
RUNNING
BLOCKEDモニタのロック獲得待ち、またはI/O処理待ちの状態。モニタロック獲得/
I/O処理終了
RUNNING

スレッドの現在の状態は ThreadクラスのgetState()メソッドで確認することができます。

スレッド間通信と同期

スレッド間通信

スレッド間通信と言うと 大袈裟な感じもしてしまいますが、スレッド間でデータをやり取りしたり 待ち合わせを行うことを指します。スレッド間通信は共有メモリを介して行います。共有メモリとして利用できるのは ヒープ領域に置かれたインスタンスやクラスフィールド等です。具体的には次のような方法でスレッド間通信を行うことができます。

  • クラスフィールドを介してスレッド間通信を行う。
  • クラスのシングルトンオブジェクトを介してスレッド間通信を行う。
  • 共有するオブジェクトの参照を それぞれのスレッドで持ち合い、共有オブジェクトを通してスレッド間通信を行う。

並列処理を行う場合、スレッド間通信が必要なければ シングルスレッドの場合と変わらないので 難しいことは何もありません。しかし 大抵の場合はスレッド間通信が必要なため 排他制御同期が必要になり、並列処理の複雑性を上げる要因となります。

並列処理における要件

マルチスレッドによる並列処理を行う場合は シングルスレッドの場合と比べて求められる要件が異なってきます。並列処理を行う場合 次のような要件が求められます。

  • 安全性:複数のスレッドが同じ共有メモリにアクセスする場合、適切に同期を取らないと実行のたびに異なる結果になってしまいます(このような動作を非決定的な動作と言います)。非決定的な動作にならないようにする必要があります。
  • 活性(liveness):必要な処理はいつか必ず行われなくてはいけません。具体的にはデッドロック無限ループに陥いらないようにする必要があります。
  • 処理性能:同じ処理を高速に、または同じ時間で大容量の処理を行えることが望ましいです。

安全性と活性を守ることは必須条件で、その上で処理性能を上げることが肝要です。

安全性

並列処理で安全性の要件を満たすためには いくつかの方法が挙げられます。方法によって安全性のレベル及び難易度が異なり、またシステムによって適合できない物もあります。システムの要件に応じて適切な方法を選択することが肝要です。

並列処理での安全性要件を満たすには 次のような方法が挙げられます。

  • 複数のスレッド間でデータを共有しない。また、複数のスレッド間でタイミングの同期を取らない。
  • 複数のスレッド間で共有するデータを不変オブジェクトにする。
  • 複数のスレッド間で共有するデータにアクセスする際にロックを掛けて排他制御を行う。
  • 複数のスレッド間で共有するデータにアクセスする際にロックフリーな仕組みで安全性を確保する。

それぞれの方法について説明します。

複数のスレッド間でデータを共有しない

言うまでもなく 複数スレッド間でデータを共有しないようにできれば それが一番安全です。しかし、システムの要件が適合しなければこの方法は選択できず、大抵はスレッド間通信を必要とします。

他のスレッドからアクセスできないようなデータには 次のようなものがあります。

  • ローカル変数
  • privateなインスタンスフィールドで、インスタンスフィールドにアクセスできるメソッドを提供しない
  • ThreadLocal

ThreadLocalについては、この章の「ThreadLocalクラス」で説明します。

複数のスレッド間で共有するデータを不変オブジェクトにする

定数や不変(immutableな)オブジェクトであれば いつどのスレッドからでも同じデータを見ることができるので 複数スレッド間で 同期することなく安全に共有することができます。全ての共有データを不変オブジェクトにできない場合でも 可能な部分を不変オブジェクトとすることによって 部分的にロックフリーな安全性を確保することができます。

複数のスレッド間で共有するデータにアクセスする際に ロックを掛けて排他制御を行う

複数のスレッド間で共有するデータにアクセスする際に、synchronizedやjava.util.concurrent.locks.Lockインタフェースの実装クラスなどを使って ロックを掛けて排他制御を行います。ロックを掛ける排他制御では 後述するアトミック性可視性の両方を確保することができます。特にsynchronizedはネストが可能で ロックの解放も自動で行ってくれるため便利で安全です。適切に使えば synchronizedだけで並列処理の安全性を確保することができます

ただし、ロックを使用する場合は 少なからずもロック処理に要するコストやロック獲得の待ち時間が必要になるため、性能要件が求められるような場合は 次に挙げるロックフリーな(またはロックの使用を最小限に抑えた)仕組みで安全性を確保する方法を検討することになります。

複数のスレッド間で共有するデータにアクセスする際に ロックフリーな仕組みで安全性を確保する

ロックフリーな(またはロックの使用を最小限に抑えた)仕組みで安全性を確保するには 更にいくつかの方法が挙げられます。

  • 元々アトミックな操作に対して 可視性を確保する。
  • 変更操作についてはロックを掛けて排他制御を行い、読み取り操作については可視性だけを確保する。(ロックを減らす)
  • アトミックな操作を提供するクラス(AtomicInteger等)を利用する。
  • 共有データを定数や不変オブジェクトにする。
  • ロックフリーな操作を提供するクラス(ConcurrentHashMap等)を利用する。

いずれの方法も可視性を確保する必要があるため、可視性の説明をしてから 再度この話題に触れます。

アトミック性と可視性が適切に確保されていないと、性能向上を図るどころか安全性を損ねてシステムとして破綻しかねないため、ロックを使った排他制御に比べて 高度な知識と十分な検証が必要になります

活性(liveness)

並列処理で活性を損なう典型例は デッドロックです。デッドロックは複数のスレッドが複数のロックを必要とする場合に お互いが別のロックが解放されるのを待つような場合に発生します。例えばロックA・ロックBがあり、スレッド1がロックAを獲得を スレッド2がロックBを獲得している状態で、スレッド1がロックBを獲得しようとして解放を待ち スレッド2がロックAを獲得しようとして解放を待つとデッドロックに陥ります。

複数のスレッドが複数のロックを取り合うと 簡単にデッドロックに陥ってしまうと思うかも知れませんが、デッドロックに陥るには次の4つの条件が揃っている必要があり、逆に言うといずれかの条件を1つでも崩すことができればデッドロックの発生を防ぐことができます。デッドロックが発生するための4条件は次の通りです。

  1. ロックが複数ある。
  2. スレッドが あるロックを獲得したまま 別のロックを獲得しようとする。
  3. ロックを獲得しようとした際に 他のスレッドがロックを占有している場合は ロックが解放されるのを永遠に待つ。
  4. 複数のロックを獲得する際に ロックを獲得する順番が決まっていない。

synchronizedを使う場合は 3番目の条件を崩すことはできません。java.util.concurrent.locks.Lockインタフェースの実装クラスを使うと 3番目の条件を崩すことができます。すなわち ロックが獲得できるか試して、獲得できない場合は即時に復帰して 占有済みのロックを解放することによって デッドロックを防ぐことができます。

同期プリミティブ

マルチスレッドによる並列処理を行う際に スレッド間で安全にデータを共有したりタイミングを同期させるのに必要なのが同期プリミティブです。同期プリミティブには次の2つの役割があります。

  1. 複数のスレッドから共有メモリにアクセスする際に 排他制御や順序管理を行う。(排他制御)
  2. あるスレッドが 実行条件が整わない場合に一時的に休眠し、実行条件が整ったら別のスレッドから起こしてもらい処理を再開する。(待ち合わせ)

同期プリミティブとしては セマフォモニタなどが使われますが、Javaでは言語レベルの同期プリミティブとして モニタ(synchronizedがモニタの実装になります)が採用されています。モニタによって保護されたブロックは1度に1つのスレッドだけが実行できます(排他制御の実現)。モニタによって保護されたブロックを実行するには ロックを獲得する必要があり、JavaのObject 1つにつき 1つのロックが存在します。どのObjectもロックとして利用することができ、言わばObjectをロックその物として扱うことができます。

また、モニタによって保護されたブロックに入ったけれども 実行条件が整っていない場合は、休眠状態に入り 実行条件が整った時に 別のスレッドから起こしてもらうことができます(待ち合わせの実現)。Objectは1つのロックの他に ウェイトセットという待機スレッドの集合を持っています。あるスレッドがあるObjectのwait()メソッドを呼び出すと、そのObjectのウェイトセットに自身を格納して休眠状態に入ります。別のスレッドが同じObjectのnotify()メソッドを呼び出すと ウェイトセットのうちの1つのスレッドが再開されます。(ウェイトセットのうちどのスレッドが再開されるかは順序に依らず不定です。)notify()メソッドの代わりにnotifyAll()メソッドを呼び出すと ウェイトセットの全てのスレッドが再開されます。

Javaでは 言語仕様としてモニタを採用している他に、java.util.concurrentパッケージに セマフォや待ち合わせなどの同期化支援のためのクラス群が用意されています。

同期プリミティブ」(自作コンパイラの部屋)

アトミック性・可視性

Javaにおいて 複数のスレッドから共有メモリにアクセスする際には アトミック性可視性という2つの側面を考慮する必要があります。アトミック性は Javaに限らず一般的に並列処理を行う際に必要になります。可視性はJavaのメモリモデルに起因するものになります。

Javaの言語仕様では 同期プリミティブとしてモニタを提供していて synchronizedでスレッド間の同期を取ることができます。synchronizedによる同期は アトミック性と可視性の両方を確保するため、synchronizedだけを使っている場合は可視性を意識することはないかも知れません。しかし、synchronizedによる同期は ロックを使用するため、少なからずもロック処理に要するコストやロック獲得の待ち時間が必要になります。そのため、性能要件が求められるような場合は ロックフリーな(またはロックの使用を最小限に抑えた)スレッドセーフの仕組みを検討することになります。ロックフリーでスレッドセーフな仕組みを実現しようとする場合には 可視性の問題と向き合う必要があります。

アトミック性

明示的にスレッド間の同期を取らない場合は 各スレッドは各々独立して処理を行うことになり スレッド間の実行順番は不定となります。その状態で複数のスレッドが ほぼ同時に共有メモリにアクセスすると競合状態(race condition)が発生する可能性があります。競合状態になると システム全体の実行結果が 各スレッドの処理の実行順序に依存する形になり、同じ入力を与えても プログラム実行のたびに結果が変わってしまう非決定的な動作になってしまいます。非決定的な動作では カウンタが正しくカウントされなかったり、銀行の預金システムで残高がないにも関わらず送金ができてしまったりという事象が発生してしまい、システムとして破綻してしまいます。

競合状態が発生する例を次に挙げます。10個のスレッドからそれぞれ1万回 共通のカウンタをカウントアップします。最終的なカウンタは10万回であることが期待されます。この例では可視性を確保するために共有変数counterにはvolatile修飾子を指定しています。(可視性とvolatileについては後述します。)

private static volatile int counter = 0;

public static void main(String[] args) {
    ExecutorService es = Executors.newCachedThreadPool();

    for(int i = 0; i < 10; i ++) {
        es.execute(() -> {
            for(int j = 0; j < 100000; j ++) {
                counter ++;
            }
        });
    }

    es.shutdown();
    try {
        es.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(counter);
}

結果は環境により異なり実行するたびにも変わりますが、ある時の実行結果は58637となり期待値の10万回には遠く届きませんでした。細かく見てみると次のようになります。

counterのインクリメント「counter++;」の行はJavaのソースコードでは1行ですが、実際には 読み出し・変更・書き込み(RMW:Read-Modify-Write操作)という複数の処理で構成されていて 複数スレッドの実行が入り乱れる可能性があります。複数スレッドが入り乱れてカウンタをインクリメントする部分を詳細に見てみます。

まず、正しくカウントアップされる例は次のようになります。

正しくカウントアップされる例
実行順序スレッド1counterスレッド2
1counterの値を読み取る:0
2読み取った値に1を加える:1
3counterに値を設定:11
41counterの値を読み取る:1
52読み取った値に1を加える:2
62counterに値を設定:2

あるスレッドが「count++;」の処理を実施する間、別のスレッドの実行が割り込まなければ正しくカウントアップされます。

続いて、正しくカウントアップされない例を次に挙げます。

正しくカウントアップされない例
実行順序スレッド1counterスレッド2
1counterの値を読み取る:0
2counterの値を読み取る:0
3読み取った値に1を加える:1
4counterに値を設定:11
51読み取った値に1を加える:1
61counterに値を設定:1

あるスレッドが「counter++;」の処理を実施している間に 別のスレッドが割り込むと正しくカウントアップされない場合が出てきます。counterの値はスレッドの実行順序に依存してしまい、非決定的な動作となってしまいます。これを修正するには「counter++;」の箇所を1度に1つのスレッドだけが実行できるように排他制御する必要があります。1度に1つのスレッドだけしか実行できないようにする範囲をクリティカルセクションと呼びます。

排他制御をしてクリティカルセクションを保護することによって 非決定的な動作を防げるようになっていることを アトミックであるとかアトミック性が確保されているなどと言われます。アトミック性はJavaに限らず一般的に並列処理を行う際に考慮すべき事項です。アトミック性を確保するにはsynchronized、java.util.concurrent.atomic.AtomicXXXクラス、java.util.concurrent.lock.Lockの実装クラスなどを用います。各同期機能におけるアトミック性の有無については 可視性の説明の後にまとめます。

可視性

Javaで並列処理を行う場合は 可視性を考慮する必要があります。可視性は Javaのメモリモデルが関係しています

メモリモデルと言った場合 Java VMのメモリ構造を表す場合もありますが ここでのメモリモデルはJSR-133のメモリ一貫性モデルのことを指します。メモリモデルはプロセッサのアーキテクチャごとに異なりますが、Javaでは その違いを吸収するために独自のメモリモデルを提供しています。C++もこの流れに追随して C++11からメモリモデルが導入されました。メモリモデルでは 複数スレッドから共有メモリにアクセスする際の順序等のルールを規定しています。

Javaのソースコードは Javaバイトコードにコンパイルされ、更に実行時にJITコンパイラ等により機械語に変換されて各プロセッサで実行されます。この際に JITコンパイラ等による最適化や各プロセッサのアウトオブオーダー実行等により、Javaのソースコードとは異なる順番で命令実行やメモリアクセスが行われることが多々あります(リオーダー。また、マルチプロセッサでは大抵 各プロセッサがメインメモリに直結するのではなく キャッシュメモリを経由することになります。マルチスレッド環境においては 変数に対する読み書きがメインメモリに対してではなく プロセッサごとのキャッシュメモリやレジスタに対して行われることも多々あります(キャッシュ

このリオーダーとキャッシュによって マルチスレッド環境では適切な同期を行わないと あるスレッドが変更した内容を別のスレッドから見ることができない状況が発生します。これが可視性の欠如です。リオーダーは特にプログラマの直感からずれてきますので少し詳しく見て行きます。

リオーダー

リオーダーでは 何でもかんでも順序を変更できるわけではなく Java言語仕様では「Javaのプログラムコードの順番にシングルスレッドで実行した場合と結果が変わらないこと」(イントラスレッド・セマンティクスが変わらないこと)を要求しています。逆に言うとイントラスレッド・セマンティクスが変わらない限りは どのようなリオーダーをしても良いことになります。マルチスレッド環境でも 明示的に同期を行わない限りはこのルールは変わりません。そのため、共有変数にアクセスを行う あるスレッドの実行がリオーダーされていると、別のスレッドからその共有変数を見るとプログラムの実行順からは想定できないような値が見えてしまう場合が出てきます。このようなリオーダーの例を次に挙げます。

public class ReorderExample1 {
    static int a1 = 0;
    static int a2 = 0;
    static int b1 = 0;
    static int b2 = 0;

    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        int resultList[] = new int[4];
        
        for(int i = 0; i < 100000; i ++) {  // 10万回試行
            a1 = 0;
            a2 = 0;
            b1 = 0;
            b2 = 0;

            Future<?> f1 = es.submit(() -> {  // スレッド1
                a1 = 1;
                a2 = 1;
            });
            
            Future<?> f2 = es.submit(() -> {  // スレッド2
                b1 = a2;
                b2 = a1;
            });
            
            try {
                f1.get();    // 待ち合わせ
                f2.get();    // 待ち合わせ
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            if(b1 == 0 && b2 == 0) resultList[0]++;
            if(b1 == 0 && b2 == 1) resultList[1]++;
            if(b1 == 1 && b2 == 0) resultList[2]++;
            if(b1 == 1 && b2 == 1) resultList[3]++;
        }
        System.out.println("b1 == 0, b2 == 0: " + resultList[0]);
        System.out.println("b1 == 0, b2 == 1: " + resultList[1]);
        System.out.println("b1 == 1, b2 == 0: " + resultList[2]);
        System.out.println("b1 == 1, b2 == 1: " + resultList[3]);
        
        es.shutdown();
        try {
            es.awaitTermination(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

a1、a2、b1、b2が共有変数で、始めに全て0で初期化します。スレッド1でa1、a2に1を代入し、スレッド2でb1にa2を、b2にa1をそれぞれ代入します(交差しているのがポイントです)。スレッドの実行タイミングと結果は次のようになります。

  • スレッド1の実行が完了した後にスレッド2が実行されれば b1=b2=1となりresultList[3]となります。
  • スレッド2の実行が完了した後にスレッド1が実行されれば b1=b2=0となりresultList[0]となります。
  • スレッド1のa1=1の次にスレッド2のb1=a2、b2=a1が実行され、その後にスレッド1のa2=1が実行されれば b1=0、b2=1となりresultList[1]となります。
  • スレッド2のb1=a2の次にスレッド1のa1=1、a2=1が実行され、その後にスレッド2のb2=a1が実行されれば b1=0、b2=1となり、これもresultList[1]となります。
  • プログラムコード通りの順番で実行されれば b1=1、b2=0となるresultList[2]は発生しない筈です。

結果は環境により異なり実行するたびにも変わりますが、ある時の実行結果は次のようになりました。

b1 == 0, b2 == 0: 4048
b1 == 0, b2 == 1: 5
b1 == 1, b2 == 0: 7
b1 == 1, b2 == 1: 95940

プログラムがコード通りの順番であれば発生しない筈の b1=1、b2=0となるケースが発生しています。これは(後述のキャッシュの可能性も否定できませんが、おそらく)リオーダーが発生しているためと考えられます。コンパイラによるリオーダーは通常 意味のある最適化に伴うもので、スレッド1やスレッド2の処理の順番を変更することは最適化の意味はないと考えられます。そのため このリオーダーは おそらくプロセッサによるアウトオブオーダー実行によるものと考えられます。

ちなみに a2にvolatile修飾子を指定すると このリオーダーを禁止することができ、resultList[2]は0になります。a1とa2の両方にvolatile修飾子を指定しても 同様にリオーダーを禁止することはできますが、a1だけにvolatile修飾子を指定した場合はリオーダーを禁止することはできません。JSR-133の中でどのような場合にリオーダーを禁止するのかが決められています。

リオーダーの別の例として、プログラマは他のスレッドから共有変数を変更してもらうことを期待しているけれども、イントラスレッド・セマンティクス(シングルスレッドでその処理を実行することを仮定する)では、その共有変数は不変のため コンパイラが最適化してしまうような場合が考えられます。このような最適化の例を次に挙げます。

public class ReorderExample2 {

    private static boolean isRunning = false;
    private static int count = 0;

    public static void main(String[] args) {

        ExecutorService es = Executors.newCachedThreadPool();

        es.submit(() -> {  // スレッド1
            isRunning = true;
            while(isRunning) {    // isRunning が false に変更されれば終了。
                count ++;
            }                

            System.out.println("thread done:" + count);
        });

        // 1秒後に isRunning を false に設定する。
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        isRunning = false;

        System.out.println("main: isRunning = false");
        es.shutdown();
        try {
            es.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main done.");
    }
}

isRunningがループを抜けるためのフラグで mainスレッドが1秒後にフラグをfalseに設定することで スレッド1のループを終了させようとしますが、上のプログラムは無限ループになり終了しません。プログラムを見るとスレッド1のrun()メソッドの中でisRunningフラグを参照していますが、スレッド1はrun()メソッドの中でisRunningの値を変更しません。そのためコンパイラは次のように最適化してしまう可能性があります。

if(isRunning) {
    while(true) {
        count ++;
    }
}

上のように変更しても シングルスレッドで実行した場合の実行結果(イントラスレッド・セマンティクス)が変わらないので、このような最適化はなんら問題ありません。しかし、別のスレッドから変更されることを期待するのであれば、プログラマはコンパイラにその旨を伝えるために適切な同期を行う必要があります。この例ではisRunningにvolatileを指定すると コンパイラの最適化に伴うリオーダーを禁止して無限ループに陥いるのを防ぐことができます。

尚、この例で無限ループが発生する原因は 上のようにリオーダーされた可能性と 次に挙げるキャッシュの可能性の両方が考えられます。

キャッシュ

マルチプロセッサでは大抵 各プロセッサがメインメモリに直結するのではなく プロセッサのキャッシュメモリを経由することになります。そのため あるスレッドが共有変数を操作しても 操作結果がプロセッサのキャッシュやレジスタだけに反映されてメインメモリには反映されず他のスレッドから操作結果をすぐに見られないような場合が発生します。同じ共有変数にも関わらずプロセッサごとに異なる値に見えてしまい、スレッド間でデータの不整合が発生してしまいます。

前述のリオーダーの2番目の例では mainスレッドがisRunningの値をfalseに変更してもプロセッサのキャッシュなどにしか反映されずメインメモリに反映されていないことや、メインメモリに反映されたとしても スレッド1がキャッシュの値を読み続けるため変更を検出できていないことにより無限ループに陥ってしまっている可能性も考えられます。

この例では isRunningにvolatile修飾子を指定すると キャッシュではなくメインメモリの値を読み書きさせることによって 無限ループに陥らず意図した動作にすることができます。

可視性のみの確保

これまでの説明でもたびたび登場しましたが、複数スレッドで共有するフィールドにvolatile修飾子を付けることにより可視性を確保することができます。

システムの要件によっては、アトミック性を確保しなくても volatileを使って可視性のみを適切に確保することによって ロックフリーな(または最小限のロック使用に絞った)仕組みで 並列処理の安全性を確保することもできますロックフリーな仕組みが実現できると ロック操作のコストが不要で スレッドのロック獲得待ち時間も不要なため、一般的には全体の性能向上が見込めます

しかし、リオーダーの1番目の例で示したように 最小限のvolatile指定で可視性のみを確保しようとすると JSR-133のメモリモデルを詳細に把握している必要があり 高度な知識と十分な検証が求められます。volatileを使って可視性のみを確保して並列処理の安全性を確保するのは かなり高度なテクニックと言えます。ただ、可視性のみを確保すれば良い明確なパターンがいくつか存在するので、そのパターンに適合する場合は volatileを有効に利用することができます。該当パターンについては この章の「volatileの使いどころ」で説明します。

各同期機能におけるアトミック性と可視性の有無

synchronizedやLockの実装クラス等を使わないと 全くアトミック性が確保できないかというと、そういうわけでもありません。

32bit以下のプリミティブ型やオブジェクト参照のフィールドに対する単独の代入や読み出しは アトミックに行われます。ただし可視性がないので スレッド間で共有するにはフィールドにvolatile修飾子を付ける必要があります。

64bitのプリミティブ型のlongとdoubleのフィールドに対する単独の代入や読み出しは アトミックに行われず可視性もありません。しかし longやdoubleのフィールドにvolatile修飾子を付けると 単独の代入や読み出しはアトミックに行われるようになり、可視性も確保されます

プリミティブ型のフィールドに対するインクリメント(読み出し・変更・書き込み(RMW:Read-Modify-Write操作))のような複合操作は アトミックに行われません。複合操作をアトミックに行うには synchronizedやLockの実装クラスを使って 一連の操作に対して排他制御を行う必要があります

AtomicXXXクラスが提供する incrementAndGet()、compareAndSet()などのメソッドは アトミックに行われ 可視性も確保されます。ただし、AtomicXXXクラスのメソッドを複数呼び出す場合は 一連の呼び出しはアトミックに行われません。一連の呼び出しをアトミックに行うにはsynchronizedやLockの実装クラスを使って 一連の呼び出しに対して排他制御を行う必要があります。

synchronizedメソッドやsynchronizedブロック中の操作は アトミックに行われ可視性も確保されます。

Lockの実装クラスのlock()~unlock()の間の操作も アトミックに行われ可視性も確保されます。

volatileは可視性を提供し、プリミティブ型やオブジェクト参照のフィールドに対する単独の代入や読み出しをアトミックに行います(前述の通り、longとdouble以外のフィールドに対する単独の代入や読み出しは volatileを付けなくても元々アトミックに行われます)。しかし 複合操作はアトミックに行われないため、複合操作をアトミックに行うにはsynchronizedやLockの実装クラスを使って 一連の操作に対してロックを掛ける必要があります。

ダブルチェックロッキング(double-checked-locking)で起こりうる驚くべき事象

並列処理では アトミック性や可視性を適切に確保して 複数スレッド間の同期を取らないと プログラマの直感からずれた驚くべき事象が発生する可能性があります。前述のリオーダーの例がその1つですが、もう1つ有名な例があります。シングルトンパターンを実現する際に利用される ダブルチェックロッキングdouble-checked-locking)です。マルチスレッド環境でシングルトンパターンを実現するには次のような実装になります。

public class Singleton {
    
    // インスタンスフィールド
    private int number;

    // このクラスの唯一のインスタンス(Singleton)
    private static Singleton instance;

    // コンストラクタをprivateにして他のクラスからのインスタンス生成を防ぐ
    private Singleton(int number) {
        this.number = number;
    }

    // Singletonインスタンスを返す。
    // まだ生成されていない場合はインスタンスを生成して返す。
    public static synchronized Singleton getInstance() {
        if(instance == null) {
            instance = new Singleton(123);
        }
        return instance;
    }
}

これは問題なく機能します。しかし、getInstance()メソッドを呼び出すたびに ロックを掛けていますが、本来ロックが必要なのは 初回のインスタンス生成の時だけです。そこで 性能改善の目的で 次のようなダブルチェックロッキングが考案されました。

public class DCLSingleton {
    
    // インスタンスフィールド
    private int number;

    // このクラスの唯一のインスタンス(Singleton)
    private static DCLSingleton instance;

    // コンストラクタをprivateにして他のクラスからのインスタンス生成を防ぐ
    private DCLSingleton(int number) {
        this.number = number;  // ①
    }

    // Singletonインスタンスを返す。
    // まだ生成されていない場合はインスタンスを生成して返す。
    public static DCLSingleton getInstance() {
        if(instance == null) {  // 1回目のチェック ②
            synchronized (DCLSingleton.class) {
                if(instance == null) {  // クリティカルセクションで再度チェック
                    instance = new DCLSingleton(123);  // ③
                }
            }
        }
        return instance;  // ④
    }
}

2つのスレッドの実行タイミングが重なり、1回目のチェックで両方が instance==null を検出した場合の動作をステップ・バイ・ステップで見てみます。

2つのスレッドが ほぼ同時にgetInstance()を呼び出した場合
実行順序スレッド1instanceスレッド2
1instanceを読み取る:nullnullinstanceを読み取る:null
2synchronizedブロックに入るnullロック獲得待ち
3instanceを読み取る:nullnull
4instanceを生成するDCLSingleton
5synchronizedブロックを出るDCLSingleton
6instanceを返す:DCLSingletonDCLSingletonsynchronizedブロックに入る
7instanceを読み取る:DCLSingleton
8synchronizedブロックを出る
9instanceを返す:DCLSingleton

ロジック的には正しく動作するように思えますが、JVMの実装やプロセッサによっては 次の2つの事象が発生する可能性があります。

  1. getInstance()の結果、numberフィールドの値が0(デフォルト値)であるDCLSingletonインスタンスが見えてしまう可能性があります。
    これは、①のnumberフィールドへの値の設定と ③のinstanceクラスフィールドへのオブジェクト参照の設定がリオーダーされた場合に発生する可能性があります。
  2. getInstance()の結果、nullが返る可能性があります。
    これは、②のinstanceフィールドの読み出しと ④のinstanceフィールドの読み出しがリオーダーされた場合に発生する可能性があります。

いずれも かなり絶妙なタイミングでないと発生しない事象ではあり JVMの実装やプロセッサによっては発生しない事象ではありますが、仕様上 発生する可能性がある以上は対策を取らなければいけません。また、再現性が低いが故に 発生してしまったときに原因の究明が困難になりがちです

instanceフィールドをvolatileにすることで この問題に対処することができます。volatileによって①と③のリオーダーが禁止されます。これはすなわちオブジェクト参照フィールドをvolatileにすると コンストラクタが完了しているインスタンスが見えることが保証されることになります。逆に言うと オブジェクト参照フィールドが非volatileでsynchronized等で同期も取らない場合は コンストラクタが完了していない状態のインスタンスが見えてしまう可能性があると言うことです(不変オブジェクトは除きます)。また、volatileによって ②と④のリオーダーが禁止されますが、より良い解法は instanceを一旦ローカル変数にコピーして ②と④をローカル変数の評価に置き換えることです。ローカル変数はスレッド間で共有しないため 常に最新の値を見ることができ リオーダーもされず、volatileフィールドの読み出しに比べてもコストが低いです。

instanceフィールドをvolatileにするよりも良い解法として initialize-on-demand holder手法が提唱されています。ただ、initialize-on-demand holderはクラスフィールドにしか適用できないため インスタンスフィールドに適用するためにFinalWrapperを使う方法が提唱されています。これらの手法はWikipedia(※1)で詳しく解説されています。JPCERTのサイト(※2)でも解説されていますが、JPCERTのページは翻訳したもので、翻訳後に原文のサンプルコードが修正されています。それに伴い それまで適合とされていたコードが違反コードになっているため、必ず原文を確認してください

ThreadLocalクラス

他のスレッドからアクセスできないようなデータが必要な場合には、ThreadLocalクラスを使うことができます。他のスレッドからアクセスできないようにするには 次のようなものを利用することができます。

  • ローカル変数
  • privateなインスタンスフィールドで、インスタンスフィールドにアクセスできるメソッドを提供しない
  • ThreadLocalクラス

ローカル変数はスコープが限られるため、メソッドを跨ぐような場合には使えません。メソッドを跨ぐような場合には privateなインスタンスフィールドを使うことができます。ThreadLocalも インスタンスフィールドと同じような使い方をすることもできますが、インスタンスフィールドで間に合うのであれば わざわざThreadLocalを使う必要性はありません。ThreadLocalは 次のような場合に活用することが出来ます

  • クラスメソッドにおいて スレッド毎に異なる値や状態を管理したい場合。
  • シングルトンオブジェクトのインスタンスメソッドにおいて スレッド毎に異なる値や状態を管理したい場合

つまり、複数のスレッドから呼び出されるメソッドにおいて、スレッド毎の値や状態を管理したい場合に向いているということになります。

ThreadLocalの使用例は ThreadLocalのJavaDocが参考になります。

public class ThreadId {
    // 現在のスレッドID(連番)
    private static final AtomicInteger nextId = new AtomicInteger(0);

    // スレッドIDを管理する ThreadLocal
    private static final ThreadLocal<Integer> threadId =
        new ThreadLocal<Integer>() {
            // 現在のスレッドIDを割り当てたらインクリメントして次に備える
            @Override protected Integer initialValue() {
                return nextId.getAndIncrement();
        }
    };

    // 呼び出し元スレッドのスレッドIDを返す。未割当であれば割り当てる。
    // 未割当の場合に ThreadLocal.initialValue()が呼び出される。
    public static int get() {
        return threadId.get();
    }
}

get()はクラスメソッドであり、複数のスレッドから呼び出されます。get()を呼び出すと、呼び出し元のスレッドに応じたスレッドIDの値が返ります。ThreadLocalは スレッドの識別子がキーでスレッド毎の値が値であるMapのような物と見なすと 動作がイメージし易いと思います。(ただし、実際の実装は全く違います。後述します。)

ThreadLocalのオブジェクトは インスタンスごとに持つ必要がないため、クラスフィールドまたは シングルトンオブジェクトのインスタンスフィールドとするのが通例です

ThreadLocalの有効な活用方法(初回かどうかの判定)

ThreadLocalを使うと、前述のダブルチェックロッキングを上手く実装することができます。この章の「ダブルチェックロッキング(double-checked-locking)で起こりうる驚くべき事象」の始めの方で説明しましたが、getInstance()メソッド自体にロックを掛ければ動作的に問題ないのですが、本来ロックを掛けたいのはシングルトンオブジェクトのインスタンス生成時だけであるため、性能改善のためにダブルチェックロッキング手法が考案されました。この「シングルトンオブジェクトのインスタンス生成時だけロックを掛けたい」という要件を 少し変えて、「各スレッドが 初回getInstance()を呼び出したときだけロックを掛けて、それ以降はロック不要にする 」ことによって、ロックに掛かるコストをダブルチェックロッキングと同程度に減らすことができます。そして、ThreadLocalを利用すると それを実現することができます。ThreadLocalを使うと ダブルチェックロッキングは次のように実装することができます。

public class DCLSingletonTL {
    
    // インスタンスフィールド
    private int number;
    
    private static ThreadLocal<Object> visited = new ThreadLocal<>();

    // このクラスの唯一のインスタンス(Singleton)
    private static DCLSingletonTL instance = null;

    // コンストラクタをprivateにして他のクラスからのインスタンス生成を防ぐ
    private DCLSingletonTL(int number) {
        this.number = number;
    }

    public static DCLSingletonTL getInstance() {
        // visited がnullということは、
        // このスレッドが初めて getInstance()を呼び出したということ。
        if(visited.get() == null) {
            createInstance();
        }
        return instance;
    }
    
    // Singletonインスタンスを生成する。
    // スレッドごとに、初回 getInstance()呼び出し時に このメソッドが呼ばれる。
    // synchronizedであるため、可視性とアトミック性を保証している。
    public static synchronized void createInstance() {
        if(instance == null) { 
            instance = new DCLSingletonTL(123);
        }
        // 次回 createInstance()が呼ばれないように、 visited に何らかの値をセット。
        // 値は nullでなければ 何でも良い。
        visited.set(new Object());
    }
}

ここでのポイントは、クラスメソッドやシングルトンオブジェクトのインスタンスメソッドを呼び出す際に、そのスレッドからの初回呼び出しかどうかを判定するために ThreadLocalを活用することができるということです。

スレッドプールで使用する場合の注意点

スレッドプールでは スレッドを再利用することによって スレッド生成のオーバーヘッドを軽減したり、スレッドの生成数に制限をかけることによって 信頼性の低下を防ぐことができます。Executorフレームワークの導入により、スレッドプールを手軽に利用できるようになりました。

スレッドプールを使用する場合は スレッドを再利用することになるため、スレッドの処理が終了する際に ThreadLocalのスレッド毎の値を削除する必要があります。そうしないと、次回スレッドを再利用する際に 前の値がそのまま残ってしまい、想定外の動作を引き起こす可能性があるためです。また、前の値が残っていると ThreadLocalのinitialValue()は呼び出されません

そのため、スレッドプールで ThreadLocalを使用する場合には、スレッドの処理が終了する際に ThreadLocalのremove()メソッドを呼び出して スレッド毎の値を削除するようにします。

ThreadLocalの実装

ThreadLocalは スレッドの識別子をキーとしたMapであるかのような動作になりますが、実際の実装は全く異なります。ThreadとThreadLocalの関係は次のようになります。

Threadクラスは ThreadLocalMapというクラスのインスタンスフィールド(パッケージprivate)を持っています。ThreadLocalMap はMapという名前の通り java.util.Mapのような キー・値の組のコンテナです(ただし java.util.Mapは実装していません)。ThreadLocalMapのキーは ThreadLocalで、値はスレッド毎の値です。つまり、ThreadLocalは Mapのような物ではなく、実体はThreadLocalMapのキーなのです。そして ThreadLocalMapのキー・値の組(Entry)は WeakReferenceのサブクラスであり、ThreadLocalがどこからも参照されなくなると、該当ThreadLocalをキーとするEntryが削除されます。これは、例えば ThreadLocalが finalでないクラスフィールドで、別のThreadLocalに参照先を変えた場合に、古いThreadLocalをキーとするEntryが自動で削除されることを意味します。(ただし、即座に削除されるわけではなく、ThreadLocalのget()やset()の呼び出しとガベージコレクタの動作タイミングに依存します。即座に削除するには、明示的にThreadLocalのremove()メソッドを呼び出します。)

通常 ThreadLocalを利用する場合は 実装を意識する必要はありませんが、ThreadLocalを使った場合の スレッド毎の値のライフサイクル等を確認する際には 実装を見てみるのも役に立ちます。

ロックを使った同期(排他制御)

Javaではロックを使った同期(排他制御)の方法として下記のような手段を用意しています。

  • synchronizedによる排他制御
  • ReentrantLockなどの java.util.concurrent.locks.Lockインタフェースの実装クラスによる排他制御

synchronizedとLockの実装クラスでは 同じようなことを実現できますが、Lockの実装クラスの方が一般的に激しい競合下での性能が優れているようです)。また Lockの実装クラスでは、ロック取得時にタイムアウトを設定したり 割り込みをできるようにしたりと細かい制御を行うことができます

一方で Lockの実装クラスでは ロックの解放を明示的に行う必要があるため 解放漏れの心配が付きまといます。また、Lockの実装クラスの場合は スレッドダンプにロック情報が含まれないため、synchronizedに比べるとデバグがやや難しくなる場合があるというデメリットがあります。

synchronizedによる排他制御

synchronizedによって アトミック性と可視性の両方を保証することができるようになります。

synchronizedの使い方として、メソッド定義にsynchronizedを付けてメソッド全体をアトミックにする方法と、クリティカルセクションをsynchronizedブロックで囲んで 局所的にアトミックにする方法の二通りがあります。

synchronizedブロック(メソッド含む)開始時にロックを取得し、ブロック終了時にロックを解放します。このロックにより排他制御を実現していて、ロックのことをロックオブジェクトと呼びます。Objectクラスのインスタンスがロックオブジェクトとなります。(全てのクラスはObjectクラスのサブクラスなので どのクラスインスタンスでもロックオブジェクトにすることができます。)

synchronizedブロックでは 任意のロックオブジェクトを指定することができます。synchronizedメソッドでは メソッド呼び出しの対象となるインスタンス(レシーバ)そのものがロックオブジェクトになります。

あるスレッドT1が あるロックオブジェクトL1にロックを掛けて synchronizedブロック内を実行している間は、別のスレッドT2は 同じロックオブジェクトL1のロックを取得することができず、T1がL1を解放するまで待たされます(アトミック性を保証)。また スレッドT1が synchronizedブロックの中で 共有データの変更を行うと、その後にスレッドT2が そのsynchronizedブロックに入ったときに、T1が行った変更結果が確実に見えることが保証されます(可視性を保証)。

同じロックオブジェクトの獲得に対して排他制御が行われるため、異なるsynchronizedブロックでも同じロックオブジェクトを獲得する場合は 排他制御されます。逆に ソースコード上では同じsynchronizedブロックでも、ロックオブジェクトが変数になっていて 異なるロックオブジェクトを獲得するような場合は 排他制御されないことに注意が必要です。

メソッド全体をsynchronized

public class MethodSynchronizedExample {
    public synchronized void doSomething() {  // インスタンスメソッド
        処理;
    }

    public synchronized void doOtherThing() {  // 別のインスタンスメソッド
        処理;
    }

    public static synchronized void doStaticSomething() {  // クラスメソッド
        処理;
    }

    public static synchronized void doStaticOtherThing() {  // 別のクラスメソッド
        処理;
    }
}

インスタンスメソッドの場合、メソッドを呼び出されるオブジェクト(レシーバ)がロックオブジェクトになります。そのため、同じインスタンスのdoSomething()メソッドを複数のスレッドから呼び出した場合、後に呼び出した方は先に呼び出した方の処理終了を待つことになります(後に呼び出した方はブロックされます)。オブジェクトがロックオブジェクトとなりますので、異なるオブジェクトのdoSomething()メソッドを呼び出しても当然ですが排他制御は行われません。また、複数のスレッドから 同じオブジェクトのdoSomething()メソッドとdoOtherThing()メソッドを同時に呼び出した場合は排他制御が行われます。

クラスメソッドの場合、Classクラスのオブジェクト(MethodSynchronizedExample.class)がロックオブジェクトになります。複数のスレッドから doStaticSomething()クラスメソッドを呼び出すと 排他制御が行われます。また、複数のスレッドから doStaticSomething()クラスメソッドとdoStaticOtherThing()クラスメソッドを同時に呼び出した場合も排他制御が行われます。

synchronizedブロック

private final Object lock = new Object();
public void someMethod() {
    排他制御が不要な処理;
    …
    synchronized(lock) {
        排他制御が必要な処理;
    }
    排他制御が不要な処理;
}

インスタンスフィールドやクラスフィールドにロックオブジェクトを用意して ロックオブジェクトをsynchronizedブロックに渡すことにより、ブロック内の処理に排他制御を掛けることができます。メソッド全体をアトミックにする場合と比べて クリティカルセクションの局所化を図ることができます。但し、あまりクリティカルセクションを細かく分け過ぎると ロックを掛ける回数が増えて 逆に性能劣化につながる可能性があったり、デッドロックを引き起こす原因にもなりかねないため 注意が必要です。

時間のかかる処理やファイル・DB・ネットワークI/Oのような ブロックされる可能性のある処理は、synchronizedブロックの外に出すことができれば 大きな性能改善を見込めるため 一考の価値があります。

synchronizedメソッド  vs  synchronizedブロック

synchronizedメソッドとsynchronizedブロックでは機能的には同じことを実現できますが、synchronizedブロックでは 任意のロックオブジェクトを指定することができる点が大きく違います。そのため、synchronizedブロックの方が柔軟性が高いです。

また、通常のシステム開発では関係ないかも知れませんが、オブジェクトを不特定多数に公開するような場合は、synchronizedメソッドの場合は脆弱性を伴います。詳しい内容とコードのサンプルおよび対策はJPCERTのサイトで紹介されています()。参照先の内容を要約すると、信頼できないコードにオブジェクトを公開する場合は、オブジェクトのロックを無期限に獲得されてしまうおそれがあるため、private finalなロックオブジェクトを使うようにしてロックを露出しないようにするべきということです。また、クラスフィールドやクラスメソッドを保護する場合は private static finalなロックオブジェクトを使用するべきということです。

継承時の注意点

synchronized修飾子がついているメソッドをサブクラスでオーバーライドした場合、synchronizedは継承されません。そのため、サブクラスでも同様に排他制御を行いたい場合は、サブクラスで明示的にsynchronized修飾子を指定する必要があります

ロックの獲得待ちと獲得の順番

複数のスレッドがロックを獲得できずに獲得待ちをしている場合、ロックの獲得待ちに並んだ順番にロックを獲得できるわけではなく、次にどのスレッドがロックを獲得できるかは不定になります。つまり、キューのような順序管理の機能を期待することはできないため、順序管理を行いたい場合はキューを利用します

次のコードを実行すると、ロックオブジェクトの獲得待ちに並んだ順番では ロックが獲得できないことを確認することができます。

ExecutorService es = Executors.newCachedThreadPool();
Object lock = new Object();	// ロックオブジェクト

// 10個のタスクを登録
for (int i = 0; i < 10; i++) {
    final int index = i;
    es.submit(() -> {
        __logger.debug("ID=" + index + ", try aquire lock.");
        synchronized (lock) {
            __logger.debug("ID=" + index + ", aquire lock.");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    // 10個のタスクが順番通りExecutorServiceに登録されるように
    // 0.1秒間隔を空ける。
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

es.shutdown();

再入可能(リエントラント)

synchronizedによるロックは再入可能で デッドロックは発生しません。あるスレッドがあるロックオブジェクトのロックを獲得している状態で、再度同じロックオブジェクトのロックを獲得しようとすると デッドロックせずに成功します。これを再入可能と言います。

これは synchronizedのネストの形などで表れます。あるロックオブジェクトに対するsynchronizedブロックの中で、同じロックオブジェクトのsynchronizedブロックを記述することができます。

Object lock = new Object();
synchronized(lock) {
    // 何らかの処理
    synchronized(lock) {  // 再入可能
        // 何らかの処理
    }
}

また、あるオブジェクトのsynchronizedメソッドの中から 同じオブジェクトの別のsynchronizedメソッドを呼び出しても、デッドロックすることなく呼び出すことができます。これも再入可能のお陰です。

再入可能なのは、同じスレッドが同じロックオブジェクトのロックを獲得する場合の話になります。別々のスレッドから同じロックオブジェクトのロックを獲得しようとすると 当然競合が発生しますし、同じスレッドが複数の異なるロックオブジェクトのロックを獲得しようとすると それぞれのロックの状況により獲得できたり待たされたりすることになります。

synchronizedブロックは極力短く

synchronizedのブロックが長いと、他のスレッドを待たせる時間が長くなります。そのため、synchronizedブロックの処理は 必要最低限のものとし、極力短くした方が良いです。特にサブクラスでオーバーライド可能なメソッドや 関数オブジェクトのメソッドを呼び出したいような場合は注意が必要です。それらのメソッドでは時間の掛かる処理を行う可能性があったり、思い思いに別スレッドを起動したり ロックを獲得しようとして安全性や活性を損なうおそれがあり、それを防ぐことができません。このような 制御が及ばず どんな処理が行われるかわからないようなメソッドは エイリアンメソッドと呼ばれます。ここでは synchronizedブロックでエイリアンメソッドを呼び出した場合に起こりうる安全性や活性を損なう具体例を見ていき、対策について説明します。

まずはエイリアンメソッドによるデッドロックの例を見てみます。登録されているイベントリスナにイベント発生を知らせるような枠組みを例に挙げます。ListenerRegistryのイベント通知メソッドfireEvent()では、通知中にlistフィールドの構造が変更されないようにlistフィールドにロックを掛けるようにしています。

// イベントリスナ
interface SomeListener {
    void handleData(int data);
}

// イベント通知元
class ListenerRegistry {
    static ListenerRegistry instance = new ListenerRegistry();
    public static ListenerRegistry getInstance() {
        return instance;
    }
    // イベントリスナリスト
    List<SomeListener> list = new ArrayList<>();
    // イベントリスナ登録
    public void addListener(SomeListener l) {
        synchronized (list) {
            list.add(l);
        }
    }
    // イベントリスナ削除
    public void removeListener(SomeListener l) {
        synchronized (list) {
            list.remove(l);
        }
    }
    // イベント通知
    public void fireEvent(int data) {
        synchronized (list) { // 通知中にlistフィールドの構造が変更されないようにロック
            for(Iterator<SomeListener> it = list.iterator(); it.hasNext(); ) {
                it.next().handleData(data);
            }
        }
    }
}

イベントリスナのhandleDate()メソッド内で、別スレッドを起動してListenerRegistryのフィールドlistのロックを獲得するようなメソッドを呼び出すとデッドロックに陥ります。

ListenerRegistry registry = ListenerRegistry.getInstance();
registry.addListener(new SomeListener() {
    @Override
    public void handleData(int data) {
        Thread th = new Thread(() -> {
            // ラムダ内でのthisは匿名クラスのインスタンス
            ListenerRegistry.getInstance().removeListener(this);
        });
        th.start();
        try {
            th.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

registry.fireEvent(123);    // デッドロック

ここでは 別スレッドを起動する必要がないため わざとらしい例になってしまいましたが、GUIのフレームワークでは コールバックを登録してバックグラウンドスレッドによる描画更新を多用するため、設計を誤ると このようなデッドロックを引き起こす可能性があります。

続いて、安全性エラーの例を見てみます。今度はイベントリスナのhandleDate()メソッドで 別スレッドを起動せずにremoveListener()を呼び出してみます。ListenerRegistryのfireEvent()メソッドでは、listフィールドにロックを掛けていますが、イベントリスナのhandleDate()メソッドは 同じスレッドで実行されるため、listフィールドに対するロックを獲得できてしまいます(再入可能なため)。その結果、この例ではConcurrentModificationExceptionが発生します。つまり、fireEvent()では イベント通知中はlistフィールドの構造が変更されないようにロックを掛けたつもりが、エイリアンメソッドからはアクセスできてしまい 想定外の変更をされてしまう可能性があるということです。

ListenerRegistry registry = ListenerRegistry.getInstance();
registry.addListener(new SomeListener() {
    
    @Override
    public void handleData(int data) {
        // データ処理
        System.out.println("handleDate:" + data);
        
        // 最後に自身を削除
        ListenerRegistry.getInstance().removeListener(this);
    }
});

registry.fireEvent(456);    // ConcurrentModificationException

このような問題を防ぐために、synchronizedブロックからはエイリアンメソッドを呼び出さないようにするべきです。それでは、fireEvent()内でイベント通知中にlistフィールドの構造が変更されないようにするにはどうしたら良いかと言うと、listフィールドの構造が変更されないようにするのではなく、listフィールドの構造を変更されても問題ないようにした方がうまく行きます。

具体的には「制御構文」の章の「CopyOnWriteArrayList」で説明した通り、listの要素を走査する前に複製を作成して 複製に対して走査を行います。これであれば、走査中にlistの構造を変更されても問題はありません。

class ListenerRegistry {
 
    public void fireEvent(int data) {
        // イベントリスナリストの複製を作成。
        List<SomeListener> tmp = new ArrayList<>(list)
        // 複製したリストを走査する。
        for(Iterator<SomeListener> it = tmp.iterator(); it.hasNext(); ) {
            it.next().handleData(data)
        }
    }
 
    // ・・・ 他は同じ。
}

または、ListenerRegistryのlistフィールドの実装クラスを ArrayListではなく、java.util.concurrent.CopyOnWriteArrayListに置き換えることによっても同じ効果を得ることができます。

ロックオブジェクトを誤り易い場合

synchronizedによる排他制御では、どのオブジェクトがロックオブジェクトかを意識することが重要です。異なるロックオブジェクトに対してロックを掛けても排他制御は行われません。逆に 意図せず同じロックオブジェクトに対してロックを掛けてしまうと、無駄な待ちを発生させてしまうことになります。

ロックオブジェクトを誤り易いのは 次のような場合です。

  • プリミティブ型のラッパークラスオブジェクトをロックオブジェクトにする場合
  • クラスオブジェクトをロックオブジェクトにして サブクラスを作成する場合
  • コレクションビューの基になるコレクションにアクセス可能な場合
  • クラスフィールドを保護する場合

それぞれのケースについて見ていきます。

プリミティブ型のラッパークラスオブジェクトをロックオブジェクトにする場合

プリミティブ型のラッパークラスオブジェクトをロックオブジェクトとする場合には 少し注意が必要です。「変数とデータ型」の章の「特定の型の特定の値範囲についてはキャッシュ機能が働く」で説明した通り、特定の型の特定の値範囲をボクシングした場合、同一のオブジェクトを共有することになります。共有されるラッパーオブジェクトをロックオブジェクトにしてしまうと、想定外の排他制御が行われる可能性があります。

Integer lock1 = 100;
Integer lock2 = 100;

lock1とlock2は同じオブジェクトになるため、lock1に対するロックの獲得と lock2に対するロックの獲得は競合します。

クラスオブジェクトをロックオブジェクトにして サブクラスを作成する場合

クラスオブジェクトをロックオブジェクトにする際には 注意が必要です。Object.getClass()でクラスオブジェクトを取得することができますが、実行時のクラスが返されます。従って スーパークラスのインスタンスメソッドの中で getClass()を呼び出せば スーパークラスのクラスオブジェクトが返りますし、サブクラスのインスタンスメソッドの中で getClass()を呼び出せば サブクラスのクラスオブジェクトが返ります。

これは 誤解を招く可能性があり、例えば サブクラスの実装者が スーパークラスのメソッドのコードを読んで、getClass()で取得したロックオブジェクトを スーパークラスのクラスオブジェクトと勘違いしてしまう可能性があります。その結果、本来 サブクラスのクラスオブジェクトをロックオブジェクトにしないといけない場面で、スーパークラスのクラスオブジェクトをロックオブジェクトにしてしまい、想定通り 排他制御が掛からない不具合を引き起こしかねません。具体的な例は 以下のJPCERTのサイト()を参照してください。

これに対する対策は、クラスオブジェクトをロックオブジェクトにする場合は、クラスオブジェクトを getClass()で取得するのではなく、SuperClass.class のようなクラスリテラルを明記して 誤解を招かないようにすることです。

コレクションビューの基になるコレクションにアクセス可能な場合

コレクションフレームワークには、Mapの keySet()、values()、entrySet()や java.util.Collections の synchronizedList()、checkedList()のように、様々なコレクションビューを返す機能が提供されています。コレクションビューには 基になるコレクション本体があります。コレクションビューは、本体とは違った見方を提供したり、本体に同期や実行時型チェックなどの追加の機能を提供したりします。ビューを変更すると 本体の方にも反映されます(中には変更不可なビューもあります)。

コレクションビューの基になるコレクション本体が 複数スレッドからアクセス可能な場合は注意が必要です。コレクションビューへのアクセスに対して排他制御を行っても、本体の変更を防げません。基になるコレクション本体が 複数スレッドからアクセス可能な場合は、基になるコレクション本体へのアクセスに対して排他制御を行う必要があります

また、コレクションビューと 基になるコレクション本体の両方が 複数スレッドからアクセス可能な場合は、両方へのアクセスに対して排他制御を行わないと 他のスレッドからの変更を防ぐことができません。

クラスフィールドを保護する場合

クラスフィールドは そのクラスのオブジェクトが共通してアクセス可能なフィールドです。そのため、クラスフィールドへのアクセスの排他制御を行う場合には、ロックオブジェクトが staticでないと 排他制御が行えないことに注意が必要です。これは 初心者の人がやってしまいがちな初歩的なミスです。

public class SomeClass {
    private static volatile int counter;

    private final Object lock = new Object();
    public SomeClass() { }

    public void increment() {
        // SomeClassのインスタンスごとにlockフィールドを持っているため、
        // インスタンスごとにロックオブジェクトが異なる。
        // counter のようなクラスフィールドへの排他制御を行うには不適切。
        synchronized(lock) {
            counter ++;
        }
    }
}

staticなロックオブジェクトは クラスオブジェクト(例:SomeClass.class)とクラスフィールドになります。ただし、この章の「synchronizedメソッド vs synchronizedブロック」で説明した通り、悪意のある第三者がクラスオブジェクトにアクセスできる場合は、ロックを無期限に獲得されるおそれがあるため、クラスオブジェクトではなく、private finalなクラスフィールドをロックオブジェクトにするのが望ましいです。

管理外のクラスのオブジェクトに対する排他制御

これまでは クラスを設計・実装する観点での排他制御について見てきました。ここでは 自分が設計・実装していない 管理外のクラスのオブジェクトに対する排他制御について見ていきます。管理外のクラスは コードの変更ができない場合が多く、ソースコードを見ることができない場合もあります。複数スレッドから そのようなクラスのオブジェクトにアクセスされる場合に 排他制御するにはどのようにしたら良いでしょうか?

管理外クラスのオブジェクトをロックオブジェクトにする案を思いつくかも知れませんが、それはうまく行く場合もあれば うまく行かない場合もあります。例えば 次のようなインタフェースを実装した管理外のクラスがあるとします。

interface Foreign {
    public int get();
    public void set(int val);
}

このクラスのオブジェクトに対して get()した値をインクリメントした値をset()する increment()メソッドを実装することを考えます。get()とset()の間には他のスレッドが割り込めないようにしないといけません。increment()メソッドを次のように実装したとします。

class SomeClass {
    private Foreign foreignObj;
    public SomeClass(Foreign foreignObj) {
        this.foreignObj = foreignObj;
    }
    public void increment() {
        synchronized(foreignObj) {
            int tmp = foreignObj.get();
            foreignObj.set(tmp + 1);
        }
    }
}

Foreignクラスのオブジェクトをロックオブジェクトにした場合、うまく行く場合もあれば うまく行かない場合もあり、Foreignの実装クラス次第になります。Foreginの実装クラスがどのような場合にうまく行き、どのような場合にうまく行かないかは次のようになります。

  1. get()もset()も何も同期をしていない。
    → うまく行かない。increment()を呼び出している間にも 任意のタイミングでget()・set()が呼び出されてしまうため
  2. get()とset()の両方が synchronizedで同期を取っている(または、thisをロックオブジェクトとして synchronizedブロックで内部フィールドへのアクセスの排他制御を行っている)。
    → うまく行く。increment()とget()・set()で ロックオブジェクトが同じため
  3. private finalなロックオブジェクトを用意して 内部フィールドへのアクセスの排他制御を行っている。
    → うまく行かない。increment()とget()・set()で ロックオブジェクトが異なるため

1~3の具体的なコードは次のようになります。

// 1. get()もset()も何も同期をとっていない
class NoSync implements Foreign {
    private int val = 0;
    
    public int get() {
        return val;
    }

    public void set(int val) {
        this.val = val;
    }
}

// 2. get()とset()の両方がsynchronized
class IntrinsicLock implements Foreign {
    private int val = 0;
    
    public synchronized int get() {
        return val;
    }
    
    public synchronized void set(int val) {
        this.val = val;
    }
}

// 3. private final ロックオブジェクト
class PrivateFinalLock implements Foreign {
    private int val = 0;
    private final Object lock = new Object();
    
    public int get() {
        synchronized (lock) {
            return val;
        }
    }
    public void set(int val) {
        synchronized (lock) {
            this.val = val;
        }
    }
}

このように、管理外のクラスのオブジェクトへのアクセスに排他制御を行おうとする場合そのクラスがどのようなロックポリシーに従って排他制御を行っているのかを把握していないと、適切に保護することができません。2のように 管理外のクラスのオブジェクトを保護する場合に、そのクラスが使用しているロックを使ってガードを行うことを クライアントサイドロックと呼びます。1の場合は ロックを掛けていないため そもそもクライアントサイドロックを行うことはできません。また、3のように 外部からロックにアクセスできない場合は、クライアントサイドロックを行うことができません。クライアントサイドロックは 適用シーンが限られます

また、管理外のクラスであるが故に ロックポリシーを変更されてしまう可能性もあります。ロックポリシーが変更されてしまうと(例えば2から3に変更)、それまで適切に保護できていたものが 保護できなくなってしまいます。

このように、クライアントサイドロックは 適用シーンが限られ、ロックポリシーの変更に対して脆弱なため基本的には利用しない方が良いです。例外的に、ドキュメントでクライアントサイドロックをすべきと明記されている場合は この限りではありません。例えば java.util.Collectionsの synchronizedList()のJavaDocには、返されたリストを走査する場合はクライアントサイドロックをすべき旨が明記されているため、それに従います。

話の最初に戻って、クライアントサイドロックが適切でないとすると、複数スレッドから管理外のクラスのオブジェクトにアクセスされる場合に 排他制御するにはどのようにしたら良いでしょうか?そのような場合は そのクラスのオブジェクトをラッピングしてクライアントサイドのロックポリシーに依存しない形を取ります。具体的には 次のようなラッパークラスを用意します。

class ForeignWrapper implements Foreign {
    
    private Foreign foreignObj;
    private final Object lock = new Object();
    
    public ForeignWrapper(Foreign foreignObj) {
        this.foreignObj = foreignObj;
    }

    @Override
    public int get() {
        synchronized (lock) {
            return foreignObj.get();
        }
    }

    @Override
    public void set(int val) {
        synchronized (lock) {
            foreignObj.set(val);
        }
    }
    
    public void increment() {
        synchronized (lock) {
            int tmp = foreignObj.get();
            foreignObj.set(tmp + 1);
        }
    }
}

このような形であれば、管理外のクラスのロックポリシーがどのようであれ、また 今後どのように変更されても 影響を受けることはありません。

Lockインタフェースの実装クラスによる排他制御

java.util.concurrent.locks.Lockインタフェースの実装クラスでは synchronizedと同じような排他制御を実現でき synchronizedより細かい制御を行うことができます。また、激しい競合下の性能も上回るようです()。一方で ロックの解除は自動では行われず ユーザが明示的に行う必要があります。また、synchronizedの場合はスタックトレースにロック情報が含まれますが、Lockの実装クラスの場合はその情報が含まれず、デッドロックなどの原因究明が困難になることがあります。

通常はsynchronizedでの排他制御を検討し、激しい競合下での性能が問題になる場合や synchronizedでは実現できない機能を利用したい場合に Lockの実装クラスを使うのが良いと思います。synchronizedでは実現できない機能には 次のような物があります。

  • ロックの取得待ちを割り込み可能にする。
  • ロックの取得待ちにタイムアウトを設定する。
  • ロックの取得待ちに入る前に ロックの状態を確認する(失敗すれば即時に復帰)。

ReentrantReadWriteLock

ReentrantReadWriteLockクラス自体が Lockインタフェースを実装しているわけではなく、staticなメンバクラスのReentrantReadWriteLock.ReadLock(読み出し用ロック)とReentrantReadWriteLock.WriteLock(書き込み用ロック)が それぞれLockインタフェースを実装しています。読み出し時には読み出し用ロックを取得し、書き込み時には書き込み用ロックを取得します。それぞれのロック取得のルールは次のようになっています。

  • 読み出し用ロックは複数のスレッドが取得できる。
  • 書き込み用ロックは1つのスレッドのみが取得できる。
  • 読み出し用ロックが取得されている間は、他のスレッドは書き込み用ロックを取得できず待たされる。
  • 書き込み用ロックが取得されている間は、他のスレッドは読み出し用ロックを取得できず待たされる。

ReentrantReadWriteLockのコンストラクタでは ロック取得順序に関わる 公平不公平のポリシーを選択することができます。公平な場合はlock()メソッドを実行した順番でロックを取得することが保証されます。一方で 不公平な場合は ロックを取得できる順番は 必ずしもlock()メソッドを呼び出した順番になるとは限りません。一般的には 順序保証をするためにはパフォーマンスの犠牲が必要なため、不公平にした場合に比べスループットが劣るとされています。デフォルトは不公平なポリシーで 引数なしのコンストラクタを呼び出すと不公平なポリシーに設定されます。

ロックの解放漏れを防ぐ(Execute-Around)

Lockの実装クラスを利用する場合、try-with-resource文によるロックの自動解放を行うことができないため、忘れずに finallyブロックでロックの解放を行わなければなりません。ロックの解放が漏れてしまうと、別のスレッドがロックを獲得できず 永遠に処理が進まなくなってしまいます。

ロックの解放のような 必ず実施しなければならない後処理があるような場合は、Execute-Aroundイディオムを使うと 解放漏れの心配をしなくて済みます。Execute-Aroundイディオムでは、ロックの獲得と解放を行うための一般的な枠組みを提供し、利用者は本質的な機能の実装に集中することができます。Exeture-Aroundイディオムの例を次に示します。

// LockActionクラスでは本質的な機能を実装する。
interface LockAction {
    void doAction();
}

// 一般的なロックの獲得、解放の仕組みを提供する。
public class ReentrantLockAction {
    
    private static final Lock lock = new ReentrantLock();
    
    public static void doActionWithLock(LockAction action) {
        lock.lock();
        try {
            action.doAction();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {

        ReentrantLockAction.doActionWithLock(() -> {
            // 本質的な機能の実装
            System.out.println("do something.");
        });
    }
}

LockActionクラスのdoAction()メソッドでは、ロックの獲得や解放を気にせずに 本質的な機能の実装に集中することができます。

ロックフリーな(または最小限のロック使用に絞った)同期

ロックフリーな(またはロックの使用を最小限に抑えた)仕組みで 並列処理の安全性を確保するにはいくつかの方法が挙げられます。

  • 元々アトミックな操作に対して 可視性を確保する。
  • アトミックな操作を提供するクラス(AtomicInteger等)を利用する。
  • 共有データを定数や不変オブジェクトにする。
  • ロックフリーな操作を提供するクラス(ConcurrentHashMap等)を利用する。
  • 変更操作をロックを掛けて排他制御を行い、読み取り操作を可視性だけを確保する(ロックを減らす)。

いずれの方法も 可視性を確保する必要があります

アトミック性と可視性が適切に確保されていないと、性能向上を図るどころか安全性を損ねてシステムとして破綻しかねたいため、ロックを使った排他制御に比べて高度な知識と十分な検証が必要になります

元々アトミックな操作に対して 可視性を確保する

この章の「各同期機能におけるアトミック性と可視性の有無」で触れた通り、元々アトミックな操作については 追加で可視性を確保することによって 複数スレッド間で安全に共有することができます。

32bit以下のプリミティブ型やオブジェクト参照のフィールドに対する 単独の代入や読み出しはアトミックに行われます。そのため フィールドにvolatile修飾子を付けて可視性を確保することによって 複数のスレッドから実行しても安全になります。

64bitのプリミティブ型のlongとdoubleのフィールドに対する 単独の代入や読み出しはアトミックに行われず可視性もありません。しかしフィールドにvolatile修飾子を付けると アトミック性と可視性が確保され 複数のスレッドから実行しても安全になります。

複数スレッド間で単純にプリミティブ型のフィールドを共有する場合や、処理を持たない構造体のようなインスタンスを共有する場合に この方法を採用することができます。

大事なことなので繰り返しますが、あくまでもフィールドの単独の代入や読み出しだけを行う場合です。複合操作を行う場合は volatileではアトミックに行えないことに注意が必要です。

変更操作をロックを掛けて排他制御を行い、読み取り操作を可視性だけを確保する。(ロックを減らす)

上の「元々アトミックな操作に対して 可視性を確保する」では 単純な読み書きを行う場合にしか適用できないため、適用できるケースがかなり限られてしまいます。それに対して もう少し適用範囲を広げたのがこの方法になります。

データを保持するだけの単純なクラスでも、往々にして 値の取得はそのまま返すだけで良くても 変更操作を行う場合は 引数チェックなど他の処理を必要とするケースが多いものです。この方法は そのようなケースに適用することができます。タイトルの通り 複合操作を伴う変更操作はsynchronized等を使ってロックを掛けて排他制御を行い単独のフィールドの読み取りだけの取得操作はフィールドにvolatileを付与して ロックを掛けずに可視性だけを確保することで ロックの利用を減らしつつスレッドセーフを実現します

特に 変更よりも読み取りが多い場合は ロック取得待ちがなくなるため 読み取りにロックを掛ける場合と比べると全体の性能向上が見込めます

こちらも大事なことなので強調しますが、このケースが適用できるのは 取得操作ではフィールドの単独の読み出しだけを行う場合です。取得操作で複合操作を行う場合には volatileではアトミックに行えないことに注意が必要です。

アトミックな複合操作を提供するクラスを利用する

最近の多くのプロセッサは、他のプロセッサからの並行アクセスを検出または防止するような方法で、共有メモリを更新する命令を持っています。その代表的なものがCompare and swapCAS命令です。このような命令を利用すると インクリメントのような定型の複合操作を ロックフリーでアトミックに行うことができます。J2SE 5.0で java.util.concurrent.atomicパッケージとして これを実現するクラス群が追加されました。

CAS命令

CAS命令では 操作対象のメモリ位置(V)、期待される古い値(A)、新しい値(B)の3つパラメータを取り、「Vの現在の値CがAと一致すれば Bに更新する。一致しなければ何もしない。いずれの場合も Cの値を返す。」という処理をアトミックに行います。これだけ読むと 通常の変数に値を代入する場合と何ら変わらないように感じてしまいますが、CAS命令は 決まった使い方をすることによって インクリメントのような定型の複合操作を ロックフリーでアトミックに実行することができます。決まった使い方というのは次のようになります。

private SimulatedCAS<Integer> value;  // int値を保持するCAS命令を実装したクラス
                                      // compareAndSwap()メソッドでCAS命令を実行。
                                      // getValue()が返す値は可視性が確保されている。

/** valueの現在値をインクリメントしてインクリメントした値を返す。 */
public int incrementAndGet() {
    int oldValue, newValue;
    do {
        oldValue = value.getValue();  // ①現在値を取得
        newValue = oldValue + 1;      // ②現在値をインクリメント値を導出
    } while(value.compareAndSwap(oldValue, newValue) != oldValue);  // ③CAS命令
    return newValue;
}

仮想的にCAS命令を実装するクラスをSimulatedCASとし、CAS命令を実行する次のメソッドを持っているとします。戻り値は現在値です。

int  compareAndSwap(oldValue, newValue);

また、現在値を返す次のメソッドを持っているとします。このメソッドが返す値は可視性が確保されています。

int  getValue();

①から③の間に他のスレッドの割り込みがなければ、①と③のoldValueの値が一致するためcompareAndSwap()で valueの内部のint値をnewValueに更新します。また、oldValueの値が一致するのでループを終了してnewValueを返します。

一方で ①から③の間に他のスレッドに割り込まれた場合は、①と③のoldValueの値が一致しないためcompareAndSwap()では valueの内部のint値を更新しません。また、oldValueの値が一致しないので ループの先頭に戻って①からやり直します。2回目のループで 他のスレッドの割り込みがなければ、更新が成功してループを終了しますが、また他のスレッドから割り込まれた場合は ループの先頭に戻って①からやり直します。

①から③の間に他のスレッドから割り込まれ続けると 永遠にループを抜けることができなくなってしまいますが、①~③の時間が十分に短く そのような状況に陥ることはまずありえないという前提の元に成り立っています

ここで特筆すべきは、CAS命令を利用することにより、ロックを使わずに アトミックなインクリメント操作を実現しているということです。

アトミックで行く」(IBM developerWorks)
コンペア・アンド・スワップ」「Load-Link/Store-Conditional」(Wiki)

AtomicXXXクラス

前述のCAS命令などを利用して インクリメントのような定型な複合操作をアトミックに行うことができるクラスが java.util.concurrent.atomicパッケージのAtomicXXXクラスになります。データ型ごとにクラスが用意されていて XXXにはInteger、Long、Boolean、IntegerArray、Referenceなどが入ります。

ここでは 一例としてAtomicIntegerクラスを見てみます。AtomicIntegerクラスでは incrementAndGet()のような定型の複合操作(主に加減算)をアトミックに行うためのメソッドが用意されています。加減算以外の演算を行いたい場合は accumulateAndGet()メソッドで演算内容をIntBinaryOperatorで指定することができます。また、もっと汎用的な処理を行いたい場合は compareAndSet()メソッドでCAS命令そのものを利用することもできます。compareAndSet()を利用する場合の注意点として、現在値の取得からcompareAndSet()の間に 時間の掛かる処理を行わないことです。時間の掛かる処理を行うと 他のスレッドに割り込まれる確率が高くなり ループ回数が増えて性能劣化につながりかねません。

定型な複合操作を行うメソッドとしては 次のような物が用意されています。尚、一連のメソッドはアトミック性に加えて可視性も保証するため、別途volatile修飾子を指定する必要はありません。

AtomicIntegerクラスの主な定型操作メソッド
メソッド引数戻り値概要
addAndGetint deltaint引数のdeltaを現在の値に加えて、加算した後の値を返します。一連の処理はアトミックです。
getAndAddint deltaint引数のdeltaを現在の値に加え、加算する前の値を返します。一連の処理はアトミックです。
incrementAndGetvoidint現在の値をインクリメントして、インクリメントした値を返します。一連の処理はアトミックです。
getAndIncrementvoidint現在の値をインクリメントして、インクリメントする前の値を返します。一連の処理はアトミックです。
decrementAndGetvoidint現在の値をデクリメントして、デクリメントした値を返します。一連の処理はアトミックです。
getAndDecrementvoidint現在の値をデクリメントして、デクリメントする前の値を返します。一連の処理はアトミックです。
setint newValuevoid値をnewValueに設定します。一連の処理はアトミックです。
getAndSetint newValuevoid値をnewValueに設定し、変更前の値を返します。一連の処理はアトミックです。

共有データを定数や不変オブジェクトにする

不変オブジェクトとメリット

不変オブジェクトとは、インスタンス化された後 外部から見た状態が変わらないオブジェクトのことを指します。また、インスタンス化すると不変オブジェクトになるクラスを 不変クラスと呼びます。Javaの標準クラスの中でもString、Integer等のラッパークラス、BigDecimal、OffsetDateTimeなどは不変オブジェクトになります。(ただし、BigDecimalクラスはfinalでなく 不変性を崩すようなサブクラスを作成することができてしまうため、不変クラスとしては不完全です。)

不変オブジェクトには次のようなメリットがあります。

  • 不変オブジェクトはシンプルです。生成されたときから状態が変わらないため、不変オブジェクトの利用者は状態変化に気を配る必要がありません。知らない間に状態が変わってしまったことに起因するバグの混入を防ぐことができます。
  • メソッドの引数として不変オブジェクトを渡す場合、呼び出したメソッドの中でオブジェクトの状態が変更されてしまうことを危惧する必要はありません
  • 不変オブジェクトは 状態が変化することがないため、自由にキャッシュすることができます
  • 不変オブジェクトは インスタンス生成後から状態が変わらないため、複数スレッドの間で同期を取ることなく安全に共有することができます
  • 自由に共有できることから、ディフェンシブコピーを行う必要がありません。clone()等の複製手段やコピーコンストラクタを提供する必要がありません。
  • HashSetやHashMapのような ハッシュバケットに基づいたコレクションのキーに適しています。可変オブジェクトは 状態変更に伴ってhashCode()の戻り値が変わってしまうため、ハッシュバケットに基づいたコレクションのキーとして利用することができません。

また、遅延評価をするような場合に 可変オブジェクトを使うと思わぬ結果を招くことがあります。例えばDateを引数に取り 指定した日付に起動するタスクを登録するような場合を考えます。

public class Scheduler {

    public void schedule(Date date, String message) {
        // 指定時刻に起動するタイマスレッドを起動してすぐに復帰する。
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 100ミリ秒ごとに監視して、起動時刻を過ぎたらループを終了してタスクを実行。
                while(true) {
                    if(System.currentTimeMillis() > date.getTime()) {
                        break;
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(date + ":" + message);
            }
        }).start();
    }
    
    public static void main(String[] args) {

        Date date = new Date();
        long now = date.getTime();  // 現在時刻
        System.out.println(date + ":now");
        
        date.setTime(now + 1 * 1000);             // 今から1秒後に設定
        new Scheduler().schedule(date, "task1");  // 1秒後に実行したい -> 実際は2秒後に実行される。
        
        date.setTime(now + 2 * 1000);             // 今から2秒後に設定
        new Scheduler().schedule(date, "task2");  // 2秒後に実行したい -> 2秒後に実行される。
        System.out.println("timer set.");
    }
}

上のコードでは2つのSchedulerが 同じDateインスタンスを参照してしまっているため、両方のタスクが2秒後に起動されてしまいます。schedule()メソッドの呼び出しに 同じDateインスタンスを渡していることも問題ですが、そのような誤用を防ぐことはできません。もしDateクラスが不変クラスであれば、setTime()メソッドは指定した時刻をセットした結果を返すことになり、その結果をSchedulerのschedule()メソッドに渡すことによって、このような誤用を防ぐことができます。

Dateのような可変クラスでこのような問題に対応するには、Scheduler.schedule()で 受け取ったDateインスタンスの複製を作成する(ディフェンシブコピー)か、date.getTime()でlong値を取得して それを元にスケジューリングする必要があります。

手続き的ではなく関数的なメソッド

不変オブジェクトに対して操作や演算を行いたい場合があります。例えばStringで部分置換をしたり、BigDecimalで加算を行うような場合です。そのような場合は 不変オブジェクトの状態を変更するのではなく操作や演算を行った結果を表す別の不変オブジェクトを返すことになります。

メソッド呼び出しによってオブジェクトの状態を変更するようなメソッドは 手続き的(procedural)または命令的(imperative)方法と呼ばれ、操作や演算を行った結果を表す別のオブジェクトを返すメソッドは 関数的(functional)方法と呼ばれます。

不変クラスの条件

不変クラスの条件は次の通りです。若干緩められる部分もあり、それについては後ほど補足します。

  1. 全てのフィールドがfinalである。(緩和可能)
  2. クラスがfinalとして宣言されている。または サブクラス化が不可能
  3. フィールドが配列や可変オブジェクトへの参照の場合は finalを付けてもオブジェクトや配列の中身の変更を防ぐことはできないため、次のような対策を施す。
    1. クラス外から配列や可変オブジェクトを変更できないようにする
      • フィールドをprivateにしてクラス外からのアクセスを防ぐ。
      • 配列や可変オブジェクトを変更するようなメソッドを提供しない。
    2. 自クラス内で インスタンス化後に配列や可変オブジェクトの内容を変更しないようにする
      • ドキュメントを残すなどの対策を行う。
    3. クラス外と配列や可変オブジェクトへの参照を共有しないようにする
      • privateでないgetXXXメソッドを通して 配列や可変オブジェクトへの参照をクラス外に返さない。
      • コンストラクタで配列や可変オブジェクトを受け取る場合は、ディフェンシブコピーを作成する。

3のcは見落としてしまいがちな点です。コード例を交えて説明します。

privateでないgetXXXメソッドを通して 配列や可変オブジェクトへの参照をクラス外に返さない

フィールドが配列や可変オブジェクトへの参照の場合、参照をそのまま返してしまうと 呼び出し側で参照先のオブジェクトの状態を変更することができてしまいます。そうならないようにするためには 次の2つの方法が挙げられます。

  1. 参照その物を返すメソッドを提供するのではなく、必要なだけプリミティブ型のプロパティを返すメソッドを提供する
  2. 参照先の配列や可変オブジェクトのディープコピーを作成して返す
class ImmutableArrayHolder {
  private final int[] theArray;
  // ×:参照を返す。呼び出し側で参照を通して配列の内容を変更できてしまう。
  public int[] getArray()       { return theArray }

  // ○:1. 参照そのものではなく、必要なだけプリミティブ型のプロパティを返すメソッドを提供する。
  public int getArrayLength() { return theArray.length }
  public int getArray(int n)  { return theArray[n]; }

  // ○:2. ディープコピーを作成して返す。
  public int[] getArray()       { return (int[]) theArray.clone(); }
}
コンストラクタで配列や可変オブジェクトを受け取る場合は、ディフェンシブコピーを作成する

コンストラクタで渡された 配列や可変オブジェクトへの参照をそのままコピーしてしまうと、後々呼び出し側で参照先の内容を変更することができてしまいます。そうならないようにするには コンストラクタで配列や可変オブジェクトへの参照を受け取ったらディープコピーを作成して保持するようにします。

class ImmutableArrayHolder {
    private final int[] theArray;
    // ×:参照をコピーしてしまう。
    // 呼び出し側がコンストラクタの後で配列の内容を変更することが可能。
    public ImmutableArrayHolder(int[] anArray) {
        this.theArray = anArray;
    }
    // ○:ディープコピーを作成する。
    public ImmutableArrayHolder(int[] anArray) {
        this.theArray = (int[]) anArray.clone();
    }
}

可変性か、不変性か?」(IBM developerWorks)

不変クラスの条件が緩和できるケース

不変クラスとするには、基本的には前述の条件を満たしているのがベストではありますが、緩和できる部分もあります。1つめの「全てのフィールドがfinalであること」を緩和することができ外部から見えない限りはフィールドの値を変更しても構いません遅延初期化(lazy initialization)が必要なときに この条件の緩和を適用することができます。ただし、該当フィールドのゲッターメソッドを用意していなくても、hashCode()を通して間接的に変更が外部に見えてしまうような場合は この緩和は許容されないので注意が必要です。

不変クラスのデメリット

不変クラスはメリットだらけのように見えてしまいますが、デメリットもあります。それは、前述の関数的な方法に起因するものです。不変クラスではオブジェクトに対して操作や演算を行いたい場合は、操作や演算を行った結果を表す別のオブジェクトを生成して返します。そのため、オブジェクトの生成にコストがかかるような場合に パフォーマンスに影響が出てきます。特に 大きなオブジェクトのうちの ほんの一部だけ変更したオブジェクトを生成するような場合は 可変クラスに比べて性能が大きく劣ります。

このような 不変クラスにおける性能の問題に直面した場合は、コンパニオンクラスを用意することにより対処することができます。コンパニオンクラスとは 不変クラスと同等の情報を表す可変なクラスです。コンパニオンクラスをprivateな内部クラスにすれば クラス外に非公開にできますし、パッケージプライベートにすればパッケージ外に非公開にできます。実際にBigIntegerでは MutableBigIntegerというパッケージプライベートなコンパニオンクラスを用意することによって 不変クラスであることに伴う性能劣化の問題に対処しています。

また、コンパニオンクラスをpublicとして公開している例もあります。不変クラスStringに対して、StringBuilderが可変なコンパニオンクラスであり、StringBuilderはpublicなクラスとして公開されています。

初期化安全(initialization safety)

不変オブジェクトの初期化安全

finalで修飾されたインスタンスフィールドは、宣言時・コンストラクタ・インスタンスイニシャライザのいずれかで値を設定する必要があります。宣言時に値を指定しない場合は 一旦デフォルト値(0やfalseやnull)が設定され その後にコンストラクタやインスタンスイニシャライザの中で値を設定することになります。この最終的に設定される値を 便宜上ここでは最終値と呼ぶことにします。プログラムの実行をステップ・バイ・ステップで追っていくと finalフィールドの値がデフォルト値を示すときと最終値を示すときがあることになりますが、コンストラクタが適切に完了していれば全てのスレッドからインスタンスのfinalフィールドの最終値が見えることが保証されています(初期のJavaメモリモデルではこれが保証されていませんでした)。そのため 不変オブジェクトにアクセスする場合は 別途同期をしなくても常に最終値を見ることができます。これを初期化安全と言い、これが不変オブジェクトがスレッドセーフと言われる理由です。

ただし、注意点が2つあります。

  1. 不変オブジェクトと不変オブジェクトを参照するフィールドは別物です。そのため、不変オブジェクトを参照するフィールドを共有する場合は、可視性を確保する必要があります。不変オブジェクト自体は不変でも、不変オブジェクトを参照するフィールドに可視性がないと、他のスレッドが参照先を設定したり変更した結果が見えない可能性があります。これについてはJPCERTのサイト(※1)で具体例と対策がまとめられています。簡単にまとめると volatileやsynchronized、AtomicReferenceなどを使って可視性を確保することになります。
  2. 初期化安全の条件は コンストラクタが適切に完了していることです。コンストラクタが適切に完了しないケースというのは コンストラクタの中でそのオブジェクトの参照であるthisを 他のスレッドからアクセスできるように公開してしまう場合です(thisの逸出と呼びます)。thisの逸出にはいくつかのパターンがありますが 見落とされがちなケースもあるので、どのようなパターンがあるのかを知っておくことで防ぐことができるようになります。これについてもJPCERTのサイト(※2)で具体例と対策がまとめられています。特に コンストラクタの中で例外ハンドラやイベントハンドラとして自身を登録したり、コンストラクタの中でスレッドを開始するパターンは やってしまいがちなので注意が必要です。
可変オブジェクトの初期化安全

この章の「ダブルチェックロッキング(double-checked-locking)で起こりうる驚くべき事象」でも説明しましたが、可変オブジェクトへのオブジェクト参照フィールドの可視性が確保されていないと コンストラクタが完了していないインスタンスが見えてしまう可能性があります。オブジェクト参照フィールドをvolatileを付けて 可視性を確保することによって コンストラクタが完了していないインスタンスが見えてしまうことを防ぐことができます

ただし、可変オブジェクトへの参照と 可変オブジェクトのフィールドは別物であるため、可変オブジェクトへの参照に対して可視性を確保しても、可変オブジェクトのフィールドには可視性が付与されるわけではありません。そのため、コンストラクタで設定した可変オブジェクトのフィールド値を その後に変更した場合は、他のスレッドから見えるようにするためには 可変オブジェクトのフィールドに対して可視性を確保する必要があります。具体的には 可変オブジェクトのフィールド値をvolatileにしたり、変更操作をsynchronizedブロックで行ったりする必要があります。

ロックフリーでスレッドセーフな操作を提供するクラスを利用する

マルチスレッド環境で スレッドセーフに並行性を高めたい要望に答えるべく ロックフリーまたはロックの粒度を細かくしたりロックの使用を最小限に抑えて、並行性を高めながらもスレッドセーフな操作を提供するクラスが登場してきました。それらはjava.util.concurrentパッケージに収められていて、代表的なものはConcurrentHashMapクラスです。

これらのクラスを利用することによって、並列処理を行う際に 性能劣化を抑えつつ 安全性を確保することができます。ただし、使い方を誤るとスレッドセーフでなくなってしまう場合もありますので、どのような点に注意が必要なのかを見て行きます。

ConcurrentHashMapクラス

ArrayListやHashMapなど、Collectionの基本的な実装クラスはスレッドセーフではなく、例えば同じArrayListに対して2つのスレッドから同時に100個ずつ要素を追加すると ところどころ要素が抜け落ちて 合計は200個未満になってしまいます。これに対してVectorやHashtableは要素の追加・削除・取得などを行うメソッドがsynchronizedメソッドであるため 同じことをすると 抜けることなく追加され 合計は200個ちょうどになります。

しかし、VectorやHashtableを用いても あるスレッドがコレクションを走査している最中に 別のスレッドがコレクションの構造を変更するとConcurrentModificationExceptionが発生することがあり、これを防ぐには走査中に構造が変更されないように VectorやHashtableに対してロックを掛ける必要があります(スレッドセーフが部分的)。また、個々のメソッドを呼び出すたびにVectorやHashtableの全体にロックを掛けるため 効率が悪くなります(非効率)。

マルチスレッド環境で スレッドセーフに並行性を高めたい要望に答えるべく J2SE 5.0でConcurrentHashMapなどのスレッドセーフなコレクション実装が追加されました。ConcurrentHashMapでは コレクション全体にロックを掛けるのではなく、ロックの粒度を細かくすることにより 並行性を高めています。また、コレクションを走査する際に Iteratorを構築した時点のコレクション要素を走査対象とするので、コレクション走査中に別のスレッドがコレクションの構造を変更しても ConcurrentModificationExceptionが発生することはありません。また、保証はされませんが、Iteratorを構築した後に行われた更新が Iteratorに反映されることもあります。

ロックを最小限に抑えて並行性を高めているがゆえに 注意しなくてはいけないこともあります。それは size()・isEmpty()・contansValue()といった 集約状態を返すメソッドを呼び出す場合メソッドを呼び出している間にも他のスレッドが構造を変更している可能性があるということです。そのため、それらの集約状態は 他のスレッドが構造変更をしていない場合に限り有効な値になります。例えば 要素の数に上限を設定したいとします。要素を追加する際に size()で上限に達したかどうか判定してから要素を追加する間に size()の値が変わっている可能性があるため、要素の数が上限を越えるような場合が出てきてしまいます。そのため、集約状態を返すメソッドの戻り値は 正確な制御に用いることはできません。

また、個々のメソッドはスレッドセーフでも 複数のメソッドを組み合わせるとスレッドセーフではなくなることにも注意が必要です。例えば get()でキーの存在確認して 不在ならばput()をするというような複合操作を行う場合は、他のスレッドが割り込む余地を与えてしまいスレッドセーフではありません。このような状況に対応できるように ConcurrentHashMapでは 状態に依存した変更操作をアトミックに行うための各種メソッドを用意しています。例えば 先ほどの キーの存在確認をして不在ならばputするという複合操作は putIfAbsent()メソッドでアトミックに実行することができます。その他に 存在確認+削除をアトミックに行う remove()メソッド、存在確認+置換をアトミックに行う replace()メソッドなどが用意されています。また、汎用的な処理をアトミックに行いたい場合は BiFunctionを引数に取るcompute()メソッドや computeIfPresent()メソッドで実現することができます。

Java 8 時代の ConcurrentHashMap で遊んでみた。」(ゆっちのBlog)

volatileの使いどころ

これまでにも 何度か登場してきましたが volatileの使いどころについてまとめます。

volatileは 複数スレッド間で可視性を確保するために利用します。しかし複合操作におけるアトミック性を保証するものではないため 複合操作を排他的に行いたい場合には使うことができません

volatileは ロックフリーで並列処理の安全性を確保するための道具にはなりますが、正確に使いこなすには Javaメモリモデルを把握している必要があり、十分な検証も必要になります。一歩間違うと ロックフリーによる性能の恩恵を受けるどころか 安全性を損ねてしまう可能性もあり 高度な技術と言えます。

とは言え、volatileを適用できる明確なパターンがいくつかありますので、そのパターンを紹介します。volatileでスレッドセーフを実現するには、次の2つの条件を満たす必要があります。

  • 共有変数への書き込みが、現在の共有変数の値に依存しない。
  • 共有変数が、他の変数との比較等に使われていない。

この2つの条件を満たすケースはかなり限られるため 適用範囲が広くないことは容易に想像できると思います。この条件を満たす実用的な5つのパターンは次の通りになります。

  1. ステータスフラグ
  2. 可変オブジェクトを一度だけ安全に公開
  3. 可変オブジェクトを定期的に公開
  4. 構造体(volatile bean)を共有
  5. 読み書きロック

これらは IBM developerWorksの記事()で紹介されているパターンになります。それぞれのパターンについて見ていきます。

volatile を扱う」(IBM developerWorks)

ステータスフラグ

あるスレッドが繰り返し処理を行っていて、別のスレッドがその繰り返し処理を停止する場合に、ステータスフラグを用いる手法が良く使われます(Thread.stop()が非推奨のため)。このステータスフラグの変更が 他のスレッドから見えるようにするために、ステータスフラグをvolatileにします。

class StoppableThred extends Thread {
    
    private volatile boolean isRunning = true;
    
    @Override
    public void run() {

        while(isRunning) {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void stopSafely() {
        isRunning = false;
    }
}

isRunningに対する書き込みは stopSafely()メソッドの中で行われますが、現在の状態に依らず定数falseを設定するため 1つ目の条件を満たします。また、isRunningは 定数trueとの比較にだけにしか使われないので2つ目の条件も満たします。

ループの終了条件の他にも、初期化終了のフラグなどにも適用することができます。

class ConfigInitializationExample {

    // 初期化完了フラグ
    static volatile boolean initialized = false;
    // 設定値
    static Map<String, Integer> config = new HashMap<>();

    public static void main(String[] args) {

        ExecutorService es = Executors.newCachedThreadPool();
        
        Future<?> f1 = es.submit(() -> {  // スレッド1
            readConfig(config);    // ファイルから設定値を読み出す。
            initialized = true;    // 初期化完了フラグを立てる。
        });
        
        Future<?> f2 = es.submit(() -> {  // スレッド2
            // 初期化が完了するのを待つ。
            while(initialized == false) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
            // 設定値を利用。
            int timeout = config.get("TIMEOUT");
            int retryCount = config.get("RETRY_COUNT");
        });
        
        // 待ち合わせなど。
    }
}

スレッド1がファイルから設定を読み出して、スレッド2は スレッド1の設定の読み出しを待ってから設定値を利用しますが、初期化完了を伝えるためにステータスフラグinitializedを使います。initializedはvolatileを指定しているので スレッド2ではスレッド1が書き換えた値を読み取ることができます。

ここで特筆するべきは configフィールドにはvolatileをつけなくても意図通りに機能するということです。スレッド1がconfigの値を設定し スレッド2がその値を読み取るには可視性を確保しないといけないように思えてしまいますが、configにvolatileをつけなくても スレッド2ではスレッド1が書き込んだ値を読み取ることができます。込み入った話になってしまいますが、Javaメモリモデルに従うと 次の3つの順序付けが行われます。

  1. スレッド1のconfigへの書き込みと initializedへの書き込みの間に StoreStoreバリアが挿入されます。これによってスレッド1がinitializedへの書き込みを行う前に configの値がメインメモリに書き込まれ 他のスレッドからconfigの最新の値が見えることが保証されます。
  2. 一方で スレッド2ではinitializedの読み出しと configの読み出しの間に LoadLoadバリアが挿入されます。これによってconfigの読み出しが行われる前に initializedの読み出しが完了していることが保証されます。(つまりリオーダーされてconfigの読み出しが先に行われることはありません)
  3. そして initializedにはvolatileがついているので、スレッド1が書き込んだ値は 即座に他のスレッドから見えるようになることが保証されます。

この3つの組み合わせによって configフィールドにvolatileをつけなくても スレッド2では最新の値を読み取ることができます。このようにvolatileは volatileを付けたフィールドだけでなく、その前後の非volatileフィールドの可視性を確保する働きをすることもあります。ただし、読み出しや書き込みの組み合わせや順番によっては 非volatileフィールドの可視性が確保されない場合もあるため、JSR-133を詳細に理解する必要があります。理解を誤ると安全性を損なう恐れがあるため、複数スレッドで共有される全てのフィールドにはvolatileを付けた方が無難です。

可変オブジェクトを一度だけ安全に公開

初期化安全(initialization safety)」で説明した通り、可変オブジェクトへの参照フィールドにvolatileを付けることによって コンストラクタの処理が完了していないインスタンスが見えてしまう可能性を防ぎます。

繰り返しになりますが、コンストラクタの後に可変オブジェクトのフィールドの値を変更する場合は synchronized等で同期を取ったり 該当フィールドをvolatileにするなどして 別途可視性を確保する必要があります。

可変オブジェクトを定期的に公開

一つ上のパターンの拡張で、一度だけ公開するのではなく、繰り返し公開するような場合にも volatileを適用することができます。例えば環境センサから温度を読み出して 定期的に現在の温度を公開するスレッド(公表スレッド)と、定期的に現在温度を読み取って 画面表示を行うようなスレッド(読み出しスレッド)の間で 情報を受け渡すような場合に適用することができます。受け渡す情報がプリミティブ型であれば 公表スレッドが書き込んだ値が即座に反映されるので 読み出しスレッドは最新の値を取得することができます。受け渡す情報が可変オブジェクトであれば 公表スレッドが生成したインスタンスの初期化安全が確保されるので 読み出しスレッドはコンストラクタが完了したインスタンスを読み取ることができます。理屈は一つ上のパターンと一緒です。

構造体(volatile bean)を共有

この章の「元々アトミックな操作に対して 可視性を確保する」で触れましたが、構造体のようなデータクラスを複数スレッド間のデータの受け渡しに使う場合に volatileを適用することができます

JavaBeansはまさに構造体のようなクラスで 次のような条件を満たしている必要があります。

  • publicクラスである。
  • インスタンスフィールドはprivateであり、ゲッター・セッターが用意されている。
  • 引数なしのpublicなコンストラクタが用意されている。

これに 次の条件を加えて複数スレッド間で安全に共有できるようにします。

  • インスタンスフィールドはvolatileである。
  • ゲッター・セッターでは 単独の値の読み出しと書き込み以外の処理は行わない(volatileでは複合操作のアトミック性が確保できないので 複合操作は行わない)。

もし セッターで複合操作を行いたい場合は、次に紹介する 読み書きロックのパターンを適用することもできます。

読み書きロック

この章の「変更操作をロックを掛けて排他制御を行い、読み取り操作を可視性だけを確保する。(ロックを減らす)」で説明した通り、データを保持するだけの単純なクラスの場合でも、往々にして 値の取得はそのまま返すだけで良くても 変更操作を行う場合は 引数チェックなど他の処理を必要とするケースが多いものです。そのような場合に、複合操作を伴う変更操作はsynchronized等を使ってロックを掛けて排他制御を行い、単独のフィールドの読み取りだけの取得操作は フィールドにvolatileを付与して ロックを掛けずに可視性だけを確保することでスレッドセーフを実現することができます。

待ち合わせ、同期

Javaのマルチスレッドにおいて 待ち合わせやタイミングの同期を取る方法として、次のような手段を用意しています。

  • wait()/notify()による同期
  • CountDownLatch等による同期
  • BlockingQueueインタフェースの実装クラス等による同期

以前は 同期を取るためには wait()/notify()が主に使われていましたが、CountDownLatchやBlockingQueueなど同期の用途に応じたクラスが標準ライブラリに追加されてきましたので、用途に適合したクラスがあればそちらを使う方が効率的です

また 単純に別スレッドの終了を待ち合わせるような場合は、それぞれのスレッドフレームワークごとに待ち合わせの手段があります。そららについては 各スレッドフレームワークの章で言及します(例えばプレーンなThreadクラスの場合は join()メソッドで別スレッドの終了を待ち合わせることができます)。

wait()/notify()による同期

あるスレッドがモニタ(synchronized)によって保護されたブロックに入ったけれども 実行条件が整っていない場合は、休眠状態に入り 別のスレッドから実行条件が整った時に起こしてもらうことができます。

Objectは 1つのロックの他にウェイトセットという待機スレッドの集合を持っています。あるスレッドが あるObjectのロックを獲得して wait()メソッドを呼び出すと、そのObjectのウェイトセットに自身を格納して ロックを解放して休眠状態に入ります。休眠状態に入ったスレッドは 次のいずれかが発生するまで待ち続けます。

  • 別スレッドが 同じObjectのロックを獲得してnotify()メソッドを呼び出す。
  • 別スレッドが 同じObjectのロックを獲得してnotifyAll()メソッドを呼び出す。
  • 別スレッドが 休眠しているスレッドのinterrupt()メソッドを呼び出す。
  • wait()がタイムアウトする(引数でタイムアウト時間を指定した場合のみ)。

notify()が呼ばれた場合は ウェイトセットのうちの1つのスレッドが再開されます(ウェイトセットのうち どのスレッドが再開されるかは 順序に依らず不定です)。notifyAll()メソッドが呼ばれた場合は ウェイトセットの全てのスレッドが再開されます。

wait()/notify()による同期の例を次に挙げます。

ExecutorService es = Executors.newCachedThreadPool();
Object lock = new Object();
// 実行条件が整っているかどうかのフラグ
AtomicBoolean isDone = new AtomicBoolean(false);

// waitするスレッド
es.submit(() -> {
    __logger.debug("wait thread start.");
    synchronized (lock) {
        // 基本的にはループで条件チェック
        while (isDone.get() == false) {
            __logger.debug("try wait()");
            try {
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            __logger.debug("end wait()");
        }
    }
    __logger.debug("wait thread end.");
});

// notifyするスレッド
es.submit(() -> {
    __logger.debug("notify thread start.");
    // 何らかの処理。sleepで代用。
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // notifyを呼び出してwaitしているスレッドを起こす。
    synchronized (lock) {
        isDone.set(true);
        __logger.debug("call notify()");
        lock.notify();
    }
    __logger.debug("notify thread end.");
});

es.shutdown();

wait()やnotify()は ロックを獲得した状態で呼び出します。wait()を呼び出すとロックは解放されるため、他のスレッドがロックを獲得することができるようになります。

また、wait()は ループの中で 実行条件が整っていなことを確認した上で呼び出すようにすることが重要です。ここでは便宜上 チェックinループ イディオムと呼びます。

Object lock = ...  // ロックオブジェクト
synchronized(lock) {
    while(実行条件が整っていない) {
        lock.wait()
    }
}

このチェックinループイディオムが推奨されるのは、次のような理由によります。

  • wait()を呼び出す前に 既に実行条件が整っていて 別スレッドからnotify()やnotifyAll()が呼ばれてしまっている場合wait()は永久に待機することになってしまいます。そのため、wait()を呼び出す前に 実行条件が整っているかの確認を行います。
  • 待機スレッドを起こすために notify()ではなくnotifyAll()を使うことができます。notify()で待機スレッドを起こす場合は、スレッドとロックオブジェクトの対応を管理する必要があり、誤ったロックオブジェクトの notify()を呼び出してしまうと、起こすべきスレッドが永久に待機してしまう 活性エラーに陥る可能性があります。チェックinループイディオムとnotifyAll()を使えば、notifyAll()で起こされた全てのスレッドが 各々実行条件を確認するため、スレッドとロックオブジェクトの対応を管理する必要がありません。起こされた各スレッドは、実行条件が整っていなければ 再度wait()して、次のnotifyAll()が呼ばれるのを待つことになります。
  • wait()で待機していたスレッドは notify()やnotifyAll()で起こされた後、ロックを獲得しようとします。しかし、wait()以外の別のロック獲得待ちのスレッドがロックを獲得する可能性もありそのスレッドが 整っていた実行条件を崩してしまう可能性があります
  • notify()を呼び出すスレッドが、実行条件が整っていないにも関わらず 誤って待機スレッドを起こしてしまったような場合にも対処できます。チェックinループイディオムであれば、間違って起こされたスレッドは 実行条件が整っているか確認し、整っていなければ再度wait()することになります。
  • ごく稀に 待機スレッドは notify()やnotifyAll()が呼ばれなくても 起きてしまうことがあります。これは偽りの目覚め(spurious wakeup)と呼ばれます。チェックinループイディオムは このような場合にも対処できます。
  • wait()でタイムアウト時間を指定した場合タイムアウトが発生したのかnofity()やnotifyAll()が呼ばれたのかを区別する手段がありません。そのため、wait()でタイムアウト時間を指定する場合は、実行条件が整っているかどうかの確認が必須です。

notify()は どの待機スレッドを起こすかは不定です。そのため、待機スレッドごとに実行条件が異なり、どれかは分からないけれども いずれかのスレッドの実行条件が整ったことを知らせるような場合には、notifyAll()で全ての待機スレッドを起こして、各スレッドに実行条件が整ったかどうかの確認をさせる必要があります。

全ての待機スレッドの実行条件が同じで 任意の1つの待機スレッドを起こしたいような場合は、前述のチェックinループイディオムを使っていれば notify()でもnotifyAll()でも違いは出ません。しかし、スレッドとロックオブジェクトの対応を管理する必要があり、誤ったロックオブジェクトの notify()を呼び出してしまうと、起こすべきスレッドが永久に待機してしまう 活性エラーに陥る可能性があります。そのため、特別な意図が無い限りは notify()よりもnotifyAll()を使う方が無難です

Lock実装クラスによる同期

java.util.concurrent.locks.Lock インタフェースの実装クラスでは、待ち合わせを行うのに Object.wait()/notify()の代わりに Conditionを利用します。Objectの場合はウェイトセットが1つですが、Lockの場合は 複数のConditionを持つことができます。そのため、複数のウェイトセットを保持できる効果があります。Conditionオブジェクトは Lockの実装クラスの newCondition()メソッドにより生成します。

Conditionの場合は メソッド名が異なり、wait()/notify()/notifyAll()ではなく、await()/signal()/signalAll() になります。そして、Condition.await()も Object.wait() と同じ理由で、チェックinループイディオムの中で呼び出すべきです

CountDownLatch等による同期

複数のスレッドが各々処理を行い、別のスレッドがそれらの処理完了を待機するような場合にCountDownLatchを利用することができます。CountDownLatchのように スレッド同士の待ち合わせを可能にするクラスはシンクロナイザと呼ばれます。

CountDownLatchを生成する際にカウンタを指定します。他のスレッドの完了を待つスレッドはCountDownLatchオブジェクトのawait()を呼び出して待機します。それ以外の非同期処理を行うスレッドは 各々処理を行い 終了したらCountDownLatchオブジェクトのcountDown()メソッドを呼び出し カウンタの値を減らします。カウンタの値が0になると await()を呼び出して待機していたスレッドが再開されます。CountDownLatchの典型的な使用例を次に挙げます。

public static void main(String[] args) {
    
    int parallelism = 5;

    CountDownLatch ready = new CountDownLatch(parallelism);
    CountDownLatch start = new CountDownLatch(1);
    CountDownLatch goal = new CountDownLatch(parallelism);
    
    ExecutorService es = Executors.newCachedThreadPool();
    for(int i = 0; i < parallelism; i ++) {
        final String name = "thread-" + i;
        es.submit(() -> {
            try {
                // 準備完了を通知。
                ready.countDown();
                // スタートの合図を待つ。
                start.await();
                __logger.debug(name + ": start");
                int time = ThreadLocalRandom.current().nextInt(100, 1000);
                try {
                    TimeUnit.MILLISECONDS.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                __logger.debug(name + ": end");
            } catch (InterruptedException e) {
                __logger.error(name + ": interrupted.", e);
            } finally {
                // タスク完了を通知。
                goal.countDown();
            }
        });
    }

    try {
        // 全てのタスクの準備完了を待つ。
        ready.await();
        // スタートの合図。
        __logger.debug("start !!");
        start.countDown();
        // 全てのタスクの終了を待つ。
        goal.await();
        __logger.debug("goal !!");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }   
}

CountDownLatchは カウンタをリセットすることができないので一度だけの使い切りになります。カウンタをリセットして再利用したい場合は 少しクラスの役割が違いますが CyclicBarrierクラスを使うこともできます。

BlockingQueueインタフェースの実装クラス等による同期

BlockingQueueインタフェースは 並列処理のデザインパターンの1つである プロデューサ・コンシューマのキューとして利用することができます。コンシューマ(消費者)がキューから要素を取得する際に キューが空の場合は キューに要素が入れられるまで待機することができます。逆に、プロデューサ(生産者)がキューに要素を入れる際に キューが満杯の場合は キューに空きが出るまで待機することができます

BlockingQueueを使った例を次に挙げます。プロデューサは一定間隔でキューに要素を追加していき、コンシューマは取り出す時間間隔をだんだん短くしていきます。始めのうちはプロデューサの方が活発に活動するため キューが一杯になります。そのうちコンシューマの活動頻度が プロデューサのそれを追い越すようになり、キューが空になります。その後はコンシューマは プロデューサがキューに要素を追加するのを待つようになります。

ExecutorService es = Executors.newCachedThreadPool();

// BlockingQueue
BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

// プロデューサ(生産者)、キューに要素を入れるスレッド
// 一定の時間間隔でキューに入れる。
es.submit(() -> {

    for(int i = 0; i < 20; i ++) {
        try {
            String element = "E-" + i;
            __logger.debug("put " + element);
            queue.put(element);

            // 常に0.1秒間隔。
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

// コンシューマ(消費者)、キューから要素を取り出すスレッド
// キューから取り出す時間間隔を短くしていく。
es.submit(() -> {
    
    long timeSpan = 5000;
    for(int i = 0; i < 20; i ++) {
        try {
            // 取り出す時間間隔を半分にしていく。
            TimeUnit.MILLISECONDS.sleep(timeSpan);
            timeSpan /= 2;
            
            String element = queue.take();
            __logger.debug("take " + element);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

es.shutdown();

QueueやDequeインタフェースについては 「コレクションフレームワーク」の章で詳しく見ていきます。

スレッドフレームワーク

プレーンなThreadクラス

前述のように 古いコードをメンテナンスする場合などを除くと、プレーンなThreadクラスを直接扱う必要は ほとんど無くなってきました。マルチスレッドで並列処理を行いたい場合は プレーンなThreadクラスを直接扱うのではなく、後述のExecutorService等を利用します。

ただ、プレーンなThreadクラスを知ることは、スレッド全般に関する基本的な部分を知ることでもありますので、知っていて損はありません。

スレッドの生成、開始、処理の実装

Threadクラスを使って 別スレッドを起動して処理を実行するには、次の2通りの方法があります。

  1. Threadクラスのサブクラスを作成し、run()をオーバーライドして 別スレッドで実行させる処理を記述する。
  2. Runnableインタフェースの実装クラスを作成し、run()を実装して 別スレッドで実行させる処理を記述する。作成したRunnableインタフェースの実装クラスを Threadのコンストラクタに渡す。

別スレッドを開始するには Threadインスタンスのstart()を呼び出します。別スレッドの終了を待ち合わせるには Threadオブジェクトのjoin()を呼び出します。

1のThread のサブクラスの作成例と、利用例を次に示します。

// Thread のサブクラス定義
class SomeThread extends Thread {

    @Override
    public void run() {
        // 別スレッドで実行させる処理。
        // run() メソッドを抜けるとスレッド終了。
    }
}

// 利用例
SomeThread thread = new SomeThread();
thread.start();  // 別スレッド処理開始
try {
    thread.join();  // 別スレッド終了待ち合わせ
} catch(InterruptedException e) {
    e.printStackTrace();
}

2のRunnableインタフェースの実装クラスの作成例と、利用例を次に示します。

// Runnable インタフェース実装クラス定義
class SomeRunnable implements Runnable {

    @Override
    public void run() {
        // 別スレッドで実行させる処理。
        // run() メソッドを抜けるとスレッド終了。         
    }
}

// 利用例
Thread thread = new Thread(new SomeRunnable());
thread.start();  // 別スレッド処理開始
try {
    thread.join();  // 別スレッド終了待ち合わせ
} catch(InterruptedException e) {
    e.printStackTrace();
}

別スレッドで実行する処理が簡易な場合などは、匿名クラスとして実現されることが良くあります。また、Runnableは関数型インタフェース(run()が 引数 void、戻り値 voidの抽象メソッド)なので、ラムダ式で記述することもできます。

スレッドの停止

Threadクラスには stop()メソッドが用意されていますが 非推奨となっています。stop()メソッドを呼び出すと スレッドがどのような状態であっても停止してしまうため、該当スレッドが操作中のデータを破壊してしまうおそれがあり 事実上使用禁止となっています。そのため、スレッドを停止させたい場合は 別の手段を用意する必要があります。スレッド停止手段として、次のような方法が良く使われます。

  1. ThreadのサブクラスまたはRunnableの実装クラスで スレッドが実行中かどうかのフラグを用意します。フラグは「実行中」で初期化するか、スレッド開始用のメソッドを用意してその中で「実行中」に設定します。
  2. run()メソッド内で定期的にフラグの値を確認し、フラグが「実行中」でなければrun()メソッドを終了するようにします。
  3. 外部から停止を行うためのメソッドを用意し、呼び出された場合は上述のフラグを「終了」に変更します。

実装例を次に示します。

class StoppableThred extends Thread {
    
    private AtomicBoolean running = new AtomicBoolean(true);
    
    @Override
    public void run() {

        while(running.get() == true) {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void stopSafely() {
        running.set(false);
    }
}

実行中かどうかを表すフラグrunningフィールドは 複数のスレッドからアクセスされるため、可視性を確保する必要があります。ここではAtomicBooleanを使いましたが、このケースでは フラグを設定する際に現在の値に依存しておらず、かつ他の変数との比較を行っていないため volatileの適用条件に合致します。したがってこのケースでは フラグrunningフィールドにvolatile修飾子を指定する方法を採ることもできます。

割り込みによる停止

run()メソッドの処理が 割り込みを受け付ける処理を実行している場合Thread.interrupt()をスレッド停止の手段として利用することもできます。run()メソッドでは、InterruptedExceptionをキャッチした場合は run()メソッドを終了するようにしておきます。割り込みを受け付ける処理としては Thread.sleep()クラスメソッド、TimeUnit.sleep()メソッド、Thread.join()メソッド、Object.wait()メソッドなどが挙げられます。run()メソッドで割り込みを受け付ける処理以外を実行している場合interrupt()メソッドを呼び出してもスレッドは中断しないので注意が必要です(スレッドが内部で保持している状態がinterruptedに変わるだけです)。

注意すべき 割り込み不可な処理としては、ソケットからの読み出しやDBアクセスなどが挙げられます。Socketクラスをソースにしたストリームやリーダーからの読み出し中は、Thread.interrupt()を呼び出しても中断されません。また、JDBCのStatementによる DB操作の実行中も同様に、Thread.interrupt()を呼び出しても中断されません。そのような場合は Thread.interrupt()の代わりの中断手段を用意します。Sokcetから読み出す場合は Socket.close()により読み出しを中断することができます。また、Socketの代わりに java.nio.channels.SocketChannelを使うとThread.interrupt()で中断することができるようになります。DB操作の場合は Statement.cancel()で処理を中断することができます。具体的な例は JPCERTのサイト()を参照してください。

また、後で出てくるExecutorフレームワーク等のフレームワークでスレッドを利用する場合は、InterruptedExceptionをキャッチした際には フレームワークが割り込みを適切に処理できるように、Thread.currentThread.interrupt()を呼び出すようにします。

Executorフレームワーク・Futureクラス(J2SE 5.0~)

Executorフレームワークでは ExecutorServiceがスレッドプールを保持し スレッドの実行管理を行います。ユーザ側ではスレッドを生成し(new Thread())、開始する(Thread.sart())代わりに、処理をタスクとしてExecuterServiceに送信して ExecutorServiceに実行管理を任せます

プレーンなThreadでは タスクとしてRunnableを指定しますが、Runnableでは値を返せず 例外を投げても別スレッドで受け取る手段がありませんでした。また、スレッド間通信とスレッド間の同期を独自に実装する必要がありました。そこでCallableFutureインタフェースが導入され、別スレッドで実行したタスクの戻り値や例外をハンドリングできるようになりました。Callableは Runnableに似ていますが、任意の型の戻り値を返し 任意の例外を投げられるようにしたインタフェースです。Futureは 並列処理におけるデザインパターンの一つで、非同期処理を呼び出した際に、結果を取得するためのオブジェクト(Future)を即座に返し、呼び出し側では任意のタイミングでFutureから結果を取得することができるようになります(ただし、非同期処理が終わっていなければ、非同期処理が終わるまで待たされます)。

ExecutorServiceにタスクを送信するには submit()メソッドにRunnableかCallableを渡します。Runnableで戻り値を返すこともでき、その場合は Runnableと戻り値の型を指定します。submit()メソッドでは Futureを返し、get()メソッドで戻り値を取得することができます。Tは戻り値の型で、非同期処理が終了していない間はget()メソッドはブロックして終了を待ちます。タスクが例外を投げた場合は get()メソッドを呼び出すとExecutionExceptionが発生します。ExecutionExceptionオブジェクトのgetCause()メソッドで タスクが投げた例外を識別することができます。

get()で別スレッド終了まで待ち合わせるのは都合が悪い場合もありますので、isDone()メソッドで非同期処理が完了しているかどうか確認することができます。また、get()での最大待ち時間を指定するメソッドもオーバーロードされています。タスクがRunnableで戻り値を返さない場合、submit()は Future<?>を返し、get()メソッドを呼び出すと正常完了時にnullを返します。

ExecutorServiceを使って非同期処理を行う例をいくつか見てみます。始めは非同期処理を行い、待ち合わせを行わない例です。

ExecutorService es = Executors.newWorkStealingPool();

// 非同期処理を実施
es.submit(() -> {
    // 非同期処理
    __logger.debug("do something by work thread."); // __logger は ofg.slf4j.Logger
});

__logger.debug("main thread done.");
es.shutdown();

続いて、非同期処理を行い、非同期処理の結果を取得する例です。非同期処理が終了するまでの間は Future.get()の呼び出しはブロックされます。

ExecutorService es = Executors.newWorkStealingPool();

// 非同期処理の結果を取得
Future<Integer> f = es.submit(() -> {
    // 非同期処理で結果を返す。
    __logger.debug("do something by work thread.");
    return 123;
});
try {
    __logger.debug("result:" + f.get());  // f.get()は非同期処理が終わるまでブロックされる。
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
es.shutdown();

最後に 非同期処理が例外を投げる場合の例を見てみます。非同期処理で例外が投げられると Future.get()の呼び出しでExecutionExceptionが発生します。ExecutionExceptionのgetCause()を呼び出すと 非同期処理で投げられた例外を取得することができます。

ExecutorService es = Executors.newWorkStealingPool();

// 非同期処理が例外を投げる。
Future<Integer> f = es.submit(() -> {
    // 非同期処理が例外を投げる。
    __logger.debug("do something by work thread.");
    throw new IOException("throw exception intentionally");
});
try {
    __logger.debug("result:" + f.get());  // ExecutionExceptionが投げられる。
} catch (InterruptedException | ExecutionException e) {
    __logger.error("catch exception.", e);  // e.getCause()にIOExceptionが設定されている。
}
es.shutdown();

スレッドプールを実装したExecutorServiceが 何種類か用意されています。Executorsクラスの次のようなファクトリメソッドで 目的に応じたExecutorServiceを作成することができます。

Executorsクラスの主なファクトリメソッド
メソッド概要
newFixedThreadPool指定した固定数のスレッドを再利用するスレッドプールを作成します。newWorkStealingPoolも同様ですが、スレッドの上限数を指定する場合は 必要な同時スレッド数よりも小さな数を指定しまうと スレッド枯渇に伴うデッドロックを引き起こす可能性があるため 注意が必要です
newCachedThreadPool必要に応じて新規スレッドを作成するスレッドプールを作成します。利用可能な場合には既存のスレッドを再利用します。一定時間利用されなかったスレッドはプールから削除されます。
同時に大量のタスクを送信すると その分だけスレッドを起動してしまいます。スレッドを大量に起動すると 性能劣化につながるため、同時に大量のタスクを登録する場合には向きません
newWorkStealingPool指定した数(指定がない場合はCPUのプロセッサ数)の並列性を保持するスレッドプールを作成します。各スレッドにタスクキューが割り当てられ、キューが空になったスレッドは 別のタスクキューからタスクを奪ってきて処理を行います。そのため、送信されたタスクの実行順序に関しては何の保証もありません。
newSingleThreadExecutor単一のワーカスレッドを使用するExecutorを作成します。
バックグラウンドスレッドを1つだけ動かしたい場合に適しています。

Executorフレームワークでは、別スレッドを起動して非同期処理を実行し 結果を取得する以外にも 次のように沢山のことができます。

  • invokeAny()やinvokeAll()メソッドを使って、タスク集合のどれか1つまたは全てが終了するのを待ち合わせることができます。
  • 後で出てくるCompletableFutureを使って、非同期処理の結果を 別の非同期処理に渡していくことができます。
  • ScheduledThreadPoolExecutorを使って、一回もしくは周期的なタイマとして利用することができます。

Fork/Joinフレームワーク(Java SE 7~)

「Fork」という名前が付いていますが、Unix系OSのforkのようにマルチプロセスになるわけではなく、マルチスレッドのフレームワークになります。Executorフレームワークでは タスクをExecutorServiceに送信して実行を任せる形が基本的な使い方でした。Fork/Joinフレームワークでは 同じような使い方もできますが、タスク自身が処理を分割して スレッドを枝分かれさせて実行していくことができます(スレッドがフォークのように枝分かれしていきます)。Fork/Joinフレームワークは 再帰処理による分割統治のような処理を行うのに向いていて、代表的な例としてフィボナッチ数列の導出が挙げられます。

フィボナッチ数列は 0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, … と続いていく数列で、最初の二項は 0, 1 で、それに続く項は 直前の2つの項の和になっています。n番目のフィボナッチ数を Fn で表すと、Fn は再帰的に次のように定義されます。

F0 = 0,  F1 = 1,  Fn+2 = Fn + Fn+1 (n >= 0)

これを再帰処理で記述すると、次のようになります。

public static void main(String[] args) {

    List<Integer> list = new ArrayList<>();
    for(int i = 0; i < 20; i ++) {
        list.add(fibonacci(i));
    }
    System.out.println(list); // [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
}

private static int fibonacci(int n) {
    if(n <= 1) {
        return n;
    }
    return fibonacci(n - 2) + fibonacci(n - 1);
}

これをFork/Joinを使って書くと 次のようになります。

class SomeClass {
    public static void main(String[] args) {

        List<Integer> list = new ArrayList<>();
        for(int i = 0; i < 10; i ++) {
            list.add((new CalcFibonacci(i)).invoke());
        }
        System.out.println(list);  // [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
    }
}

class CalcFibonacci extends RecursiveTask<Integer> {
    private final int n;

    CalcFibonacci(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {

        if(n <= 1) {
            return n;
        }
        CalcFibonacci task1 = new CalcFibonacci(n - 2);
        CalcFibonacci task2 = new CalcFibonacci(n - 1);
        task2.fork();  // task2 は別スレッドで実行される。
        return task1.compute() + task2.join();
    }
}

上の例ではExecutorServiceが登場しませんが、RecursiveTaskクラス(ForkJoinTaskのサブクラス)のinvoke()メソッドやfork()メソッドを呼び出すと 内部的にExecutorServiceを実装したForkJoinPoolにタスクを送信し実行管理を任せています。スレッドの実行管理にはスレッドプールを使っていますが、それを意識させずにタスクを中心とした処理の記述ができる点が特徴です

ForkJoinPoolがExecutorServiceになり ForkJoinTaskがFutureを実装するタスク(抽象クラス)となりますが、実際の処理はForkJoinTaskのサブクラスでexec()メソッドをオーバーライドするか、タスクの処理を実装したRunnableかCallableをadapt()メソッドに渡すことになります(勿論ラムダ式で記述することもできます)。上の例で出てきた ForkJoinTaskのサブクラスであるRecursiveTaskは exec()メソッドをオーバーライドしていて、その中で 抽象メソッドcompute()を呼び出しています。そのため、RecursiveTaskのサブクラスでは compute()を実装するだけで完結することが出来ます。

ForkJoinTaskのfork()を呼び出すと、別スレッドで処理を開始します。join()メソッドで終了を待ち合わせ 結果を取得します。invoke()メソッドは fork()+join()の働きをします。

CompletableFuture(Java SE 8~)

J2SE 5.0で登場したFutureでは get()メソッドで非同期処理の結果(または例外)を取得することが主な役割でした。他のプログラミング言語のFuture/Promiseでは 処理完了時のコールバックの設定メソッドチェーンによる処理の連鎖などが行え、JavaのFutureは 他の言語のFuture/Promiseに比べると 機能的に不十分でした。Java SE 8でCompletableFutureクラスが登場し、処理完了時のコールバックの設定や メソッドチェーンによる処理の連鎖などが行えるようになりました。

CompletableFutureクラスは FutureインタフェースとCompletionStateインタフェースを実装していて、CompletionStateインタフェースで 処理完了時のコールバックメソッドや 連鎖的に処理を行うメソッドなどを定義しています。Futureインタフェースの実装では、生成時にRunnableかCallableのタスクを与えて タスクの中でFutureの結果を設定するのが一般的です。それに対してCompletableFutureクラスでは 外部(他のスレッド)から結果を設定するための publicなcomplete()メソッドやcompleteExceptionally() メソッドが用意されています。

CompletableFutureの使用例

CompletableFutureの使用例を見てみます。始めに1つの非同期処理を行う例です。CompletableFutureのファクトリメソッドsupplyAsync()やrunAsync()にタスクを渡してCompletableFutureを生成します。また、非同期処理終了後のコールバックをwhenComplete()メソッドで設定します。

// 引数で受け取ったミリ秒sleepして、受け取った値を返す。
public static int someTask(int millisec) {
    try {
        TimeUnit.MILLISECONDS.sleep(millisec);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    __logger.debug("done someTask(" + millisec + ")");
    return millisec;
}

public static void main(String[] args) {

    ExecutorService es = Executors.newCachedThreadPool();
    CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> someTask(100), es);

    cf.whenComplete((ret, ex) -> {
        __logger.debug("whenComplete called");
        if (ex == null) {
            // 正常終了
            __logger.debug("result=" + ret);
        } else {
            // 例外発生
            __logger.debug("err=" + ex);
        }
    });
}

whenComplete()メソッドの引数には BiConsumerを渡しますが、BiConsumerの2番目の引数exがnullかどうかで 非同期処理someTask()が 正常終了したか例外を投げたかを判別します。

また、whenComplete()を呼び出す前に 非同期処理someTask()が終了してしまうような場合でも、whenComplete()は呼び出されます。

続いて、複数の非同期処理の待ち合わせを行う例を挙げます。CompletableFutureのallOf()クラスメソッドで 複数のCompletableFutureを1つにまとめます。そして、1つにまとめたCompletableFutureに対してwhenComplete()メソッドでコールバックを設定します。1つにまとめたCompletableFutureは 戻り値がVoidで 戻り値を取得することができないため、元のCompletableFutureのListの各々に対して結果を取得することになります。

List<CompletableFuture<Integer>> cfs = List.of(CompletableFuture.supplyAsync(() -> someTask(100), es),
        CompletableFuture.supplyAsync(() -> someTask(200), es),
        CompletableFuture.supplyAsync(() -> someTask(300), es));
CompletableFuture<Void> cf = CompletableFuture.allOf(cfs.toArray(new CompletableFuture[cfs.size()]));

cf.whenComplete((ret, ex) -> {
    if (ex == null) {
        List<Integer> msg = cfs.stream().map(future -> {
            try {
                return future.get();
            } catch (InterruptedException | ExecutionException e) {
                __logger.error("future.get() failed.", e);
                return -1;
            }
        }).collect(Collectors.toList());
        __logger.debug(msg.toString());
    } else {
        __logger.debug("err=" + ex);
    }
});

続いて、非同期処理を順番に行う例を挙げます。非同期処理を順番に行うと 1番目の結果を2番目に渡すようなことができます。1番目のCompletableFutureのthenCompose()メソッドに 2番目のCompletableFutureを渡します。

CompletableFuture<Integer> work1 = CompletableFuture.supplyAsync(() -> someTask(100), es);
CompletableFuture<Integer> work2 = work1
        .thenCompose(result1 -> CompletableFuture.supplyAsync(() -> someTask(result1 * 2), es));

work2.thenAcceptAsync(result -> {
    __logger.debug(result.toString());
});

thenCompose()はCompletionStateインタフェースのメソッドで、thenCompose()の他にも 沢山のメソッドが用意されています。

自前でCompletableFutureを作成

CompletableFutureは CompletableFutureの各種クラスメソッドを通じて取得することができますが、自前で作成することもできます。これによって Runnable、Callable、Future、FutureTaskなどの既存の非同期タスクを CompletableFutureでラッピングすることができます。自分でCompletableFutureを作成するポイントは次の通りです。

  • CompletableFutureのコンストラクタでインスタンスを生成します。
    コンストラクタで生成すると 完了状態が未完了になります。
  • 非同期処理が正常に終了した場合は、complete()を呼び出して CompletableFutureオブジェクトに結果を設定して、完了状態を完了に遷移させます。
  • 非同期処理が例外終了した場合は、completeExceptionally()を呼び出して CompletableFutureオブジェクトに例外を設定して、完了状態を完了に遷移させます。

Callableを受け取って非同期処理で実行し CompletableFutureを返すメソッドの例と そのメソッドの使用例を見てみます。

public static <T> CompletableFuture<T> doAsync(Callable<T> callable, Executor executor) {
    // コンストラクタでCompletableFutureを生成すると未完了状態になる。
    CompletableFuture<T> cf = new CompletableFuture<>();
    executor.execute(() -> {
        try {
            T result = callable.call();
            // 成功したら結果を設定して完了状態に遷移。
            cf.complete(result);
        } catch (Exception e) {
            // 失敗したら例外を設定して完了状態に遷移。
            cf.completeExceptionally(e);
        }
    });
    return cf;
}

public static void main(String[] args) {
    CompletableFuture<Integer> cf = doAsync(() -> someTask(100), es);
    
    cf.whenComplete((ret, ex) -> {
        if (ex == null) {
            __logger.debug("result=" + ret);
        } else {
            __logger.debug("err=" + ex);
        }
    });
}

CompletableFutureのキャンセル

CompletableFutureでは 注意すべき点が一つあります。CompletableFutureにはcancel()メソッドが用意されていて、cancel()を呼び出すとCompletableFutureの状態は完了となりますが、非同期タスク自体は中断されません(非同期タスクを実行しているスレッドのinterrupt()を呼び出しません)。非同期タスク自体を中断したい場合には FutureやFutureTaskと言った 非同期タスクを中断できるクラスを使います。

下の参考サイトでは FutureTaskをCompletableFutureでラッピングして、CompletableFutureのcancel()が呼び出された場合に、それを検出して FutureTaskのcancel()を呼び出す仕組みが紹介されています。

CompletableFutureの使い方の基本形とFutureTaskとの連携例」(seraphyの日記)

並列ストリーム

Java SE 8で導入されたストリームで並列処理を実現することができます。詳細は「ストリーム」の章で説明します。