ストリーム

ストリームはデータ(プリミティブ型やクラスのオブジェクト)の流れを作り出し、それぞれのデータに対する処理(変換や集計など)を行うための枠組みで、大量操作に向いています。メソッドチェーンやラムダ式を用いて流れるようにコードを記述することができ、容易に処理の順序を入れ替えたり マルチスレッドで並列化することができます。

一見すると for文等を使ったループ処理と同じようなことができる機能に見えるかもしれませんが、ストリームは関数型言語のパラダイムであるため ループ処理の置き換えではなく ストリームに向く場合、向かない場合があります。

java.util.stream.Streamはコレクションフレームワークと密接に関連してはいますが、コレクションフレームワークの一部ではありません。また、java.ioパッケージのXXXStreamという名前のインタフェースやクラスとは関係ありません。

この章で登場する総称型クラスのメソッドの多くの型引数が境界ワイルドカード型として定義されていますが、簡易的に境界ワイルドカードの記述を省略しています(例:<? extends T>を単に<T>と記述)。正確な定義はJava APIドキュメントを参照してください。

 

ストリームの基本

ストリームの構成要素

ストリームは次の3つの要素で構成されます。

  1. ソース
    Collection、配列、ジェネレータ関数、入出力チャネルなど ストリームを流れる要素の元となるデータ集合です。ソースからストリームを生成する手段として、Collectionのstream()メソッド、Arraysのstream()クラスメソッド、Streamクラスの各種クラスメソッドなどがあります。
  2. 中間操作
    ストリームを流れるデータの加工を行います。各データに変換関数を適用したり、条件を満たさないデータを取り除いたりして、1つのストリームを別のストリームに変換します。filter()、map()、flatMap()、limit()、skip()などがあります。
  3. 終端操作
    ストリームを終端させます。ストリームを流れてきたデータの集計をしたり、Collectionや配列にまとめたり、結果を全て表示したりできます。collect()、reduce()、forEach()、count()、findAny()などがあります。

「操作」は おおまかに分けると、Streamを返すメソッドが中間操作、それ以外は終端操作になります。1つのソースと、0個以上の中間操作、1つの終端操作を組み合わせて1つのストリームパイプラインが構成されます。1つのStreamに対して終端操作は1回だけしか実行できず、終端操作を行ったStreamを再利用すると例外が発生します。

ストリームパイプラインは遅延評価され、終端操作が呼び出されるまで評価は行われません。そのため 終端操作が欠けていると何も行われないことに注意が必要です。

 

ストリームの種類

汎用的なストリームとプリミティブ型ストリーム

ストリームには オブジェクト参照をデータとするStream<T>と、プリミティブ型をデータとする IntStream、LongStream、DoubleStreamが用意されています。Stream<Integer>とIntStreamは同じint型のデータを扱えますが、IntStreamには数値に特化した集計メソッド(例えばsum()やaverage())が用意されていたり、同じメソッドでも引数が異なっていたりします。(例えばStream<T>の map()メソッドは引数が Functonで 入出力のクラスが異なっても良いのに対して、IntStreamのmap()メソッドは引数がIntUnaryOperatorで 入出力共にint型である必要があります。)

 

順次ストリームと並列ストリーム

ストリームには順次ストリーム並列ストリームがあり、並列ストリームの場合 マルチスレッドで並列に処理されます。Streamのメソッドによりどちらを使うか選択することができ、容易にマルチスレッドによる並列処理を実現することができます。しかし、処理の特性や処理対象の件数によっては 並列処理のメリットを享受できないどころか、性能劣化や安全性・活性を損なう場合もありますので、並列処理に適しているかどうかの見極めが必要です

Stream<T>やIntStream等のスーパーインタフェースであるBaseStreamのsequential()メソッドで順次ストリームを、parallel()メソッドで並列ストリームを取得することができます。また、Collectionインタフェースのように順次ストリームを取得するstream()メソッドと並列ストリームを取得するparallelStream()メソッドを用意しているものもあります。

順次ストリームか並列ストリームかはストリームパイプライン全体に適用され、中間操作ごとに順次ストリームを並列ストリームを切替えるようなことはできません。

 

有限ストリームと無限ストリーム

要素の個数が有限なストリームを有限ストリームと言い、要素の個数が無限なストリームを無限ストリームと言います。Collectionや配列などの有限個のデータ集合を元に作られるストリームは有限ストリームになります。一方で一定の計算などによって無限に要素を作り出せる場合は 無限ストリームにすることができます。

 

ストリームの向き不向き

ストリームは汎用性があるため、for文等によるループ処理の大半をストリームに置き換えることができます。しかし、ストリームは関数型言語のパラダイムであるため for文等によるループの置き換えではなく、ストリームが向く場合、向かない場合があります。そういったストリームの特性について見ていきます。

ストリームでできないこと、困難なこと

始めに、ストリームではできないことを挙げます。

  • for文等によるループ処理では スコープ内のローカル変数の読み書きが自由に行えますが、ストリームのラムダ式等からは finalか実質finalな(変更されない)ローカル変数の読み出ししかできず、変更することができません
  • for文等によるループ処理では return、break、continue、例外スローによってループの流れを制御することができますが、ストリームのラムダ式等からはループの流れを制御することはできません

また ストリームにおいて、できなくはないけれども 困難なことがあります。中間操作や終端操作は前段の操作の出力データを受け取ることができますが、前段の操作の入力データが必要になっても 入手する方法がありません。これに対する解決策は次の2つが挙げられますが、どちらも満足の行くものではありません。

  • 中間操作で 入力データと変換等したデータの両方を出力データとして後段に渡します。これによって 後段の操作では 前段の操作が変換等を行う前のデータにもアクセスすることができます。
    しかし、この方法では中間操作が何段にも及ぶ場合 ストリームを流れるデータ量が増えますし、何よりも イレギュラーな方法であるため分かりづらいコードになってしまいます。また、中間操作が変換を行うような場合には対応できても、フィルタリングやソートを行うような場合には対応できません。
  • 別の解としては、後段の操作で 前段の操作の逆変換を行うことによって 前段の操作が変換等する前のデータを導出する方法が考えられます。こちらの方法は 1つめの方法に比べるとスマートです。しかし、前段の操作が不可逆な変換を行う場合やフィルタリング、ソートを行うような場合には 同様に対応することができません。

ストリームが適している場面

ストリームは次のようなことを行うのに適しています。

  • ストリームの各データを同じように変換する場合。
  • ストリームのデータから条件に合ったものを抽出する場合。
  • ストリームのデータの合計、平均、最大、最小といった統計を取ったり、1つのコレクションにまとめる場合。
  • ストリームのデータを何らかの要素でグルーピングする場合。
  • ストリームのデータから条件に合ったデータを検索する場合。

また、ストリームパイプラインの各操作では 基本的に副作用が無いようにします。「副作用がない」とは 操作の結果が入力だけに依存していることで 入力以外の可変な状態に依存していたり、操作において他のオブジェクトの状態を変更したりしないことを表します。関数型言語ではこのような副作用のない関数は純粋関数(pure function)と呼ばれます。(Javaの場合は関数というものはなく 操作は関数オブジェクトになります。)関数型プログラミングの経験がある人にとっては 当たり前のことなのですが、Javaで初めてストリームに触れる場合には この基本事項を覚えておいてください。

ストリームの可読性向上

ストリームを乱用すると 読み手が理解しづらく保守が難しくなる場合もあります。そのような場合は次のような観点で改善ができないか検討してみます。

  • ラムダ式の引数は慣習的に1文字の名前にすることが多いのですが、型が明示されないため情報量が少なくなります。そのため引数に分かり易い名前をつけることによって 可読性の向上につながることがあります。ただし あまり長い名前だと 1行に収まらなくなってしまい、かえって読みづらくなることもあるため バランスが大事です。
  • ラムダ式で 作業用に一時的な変数が必要な場合、一時的な変数に名前が付けられず型も明示されないため 分かりづらいコードになりがちです。例えばcollect()でStringBuilderを生成して個々の文字を連結するような場合、次のような引数を渡します。
    ①StringBuilder::new  ②(sb, c) -> sb.append((char) c)  ③StringBuilder::append
    ①で生成したオブジェクトを使って②、③の操作を行っているのですが、名前が付けられず型も明示されないため 同じオブジェクトを扱っていることが分かりづらくなります。
    そのような場合は 該当処理をヘルパメソッドに移動して メソッドに適切な名前をつけることによって 可読性を上げられることがあります

 

メソッドの戻り値としてのストリーム

メソッドでデータの集合を返す場合、従来は 配列やCollection等のIterableが使われてきましたが、ストリームの登場により ストリームも選択肢に追加されることになりました。ストリームの基礎となるインタフェースであるjava.util.stream.BaseStreamにはiterator()メソッドが定義されていますが、これはIterableインタフェースのiterator()メソッドではありません。BaseStreamはItrerableを拡張しているわけではないので、拡張for文のターゲットにすることはできません。つまり、配列やIterableを返すメソッドとストリームを返すメソッドには互換性がありません。そのため、ループ処理を行いたい場面でストリームを返すメソッドを使いたい場合や、逆にストリームパイプラインに組み込みたい場面で配列やIterableを返すメソッドを使いたい場合には 変換を行う必要があります。それぞれの変換方法を見ていきます。

ストリームからIterableへの変換

ループ処理を行いたい場面で メソッドがストリームを返すような場合は 次のようなアダプタで変換することができます。

 

配列やIterableからストリームへの変換

ストリームパイプラインに組み込みたい場面で メソッドがIterableを返すような場合は 次のようなアダプタで変換することができます。

Iterableの中には Collectionのようにストリームへの変換メソッド(Collectionの場合はのstream()メソッド)をサポートしている物もあります。その場合はそれらを利用することができます。(Collection.stream()は上の変換アダプタと同じ実装になっています。)

また、配列をIterableに変換する場合は Arraysのstream()クラスメソッドや Streamのof()クラスメソッドを利用することができます。

 

理想的な戻り値

CollectionはIterableインタフェースを拡張していて、更にストリームを返すメソッドを実装しています。そのため、拡張for文のターゲットにすることもできますし、stream()メソッドでストリームとして扱うこともできます。データの数が多過ぎず 性能要件が厳しくない場合は データの集合を返す時はCollectionの実装クラスを返すのが理想的です。

 

ストリームの生成(ソース)

Collectionや配列などのソースからストリームを生成する主なメソッドを次にまとめます。

データソースからストリームを生成する主なメソッド
ソース メソッド 概要
Collection stream Collectionから順次ストリームを生成して返します。
parallelStream Collectionから並列ストリームを生成して返します。
配列 Arrays.stream 配列から順次ストリームを生成して返します。
Reader BufferedReader.lines BufferedReaderの1行を1要素としたストリームを生成して返します。
ファイル Files.lines ファイルの1行を 1要素としたストリームを生成して返します。
Files.list 指定したディレクトリ直下のPath一覧のストリームを生成して返します。
Files.walk 指定したディレクトリ配下のサブディレクトリも含めたPath一覧のストリームを生成して返します。

続いて Streamやプリミティブ型のStreamクラスに用意されている ストリームを生成する主なクラスメソッドを次にまとめます。

Streamに用意されているストリームを生成する主なクラスメソッド
有限/無限 クラス メソッド 概要
有限ストリーム Stream of 引数で指定されたデータのストリームを生成して返します。
empty 空(要素数0)のストリームを生成して返します。
concat 引数で指定された 2つのストリームを連結します。
Stream.Builder add、
build
add()メソッドでデータを追加し、最後にbuild()メソッドでストリームを生成して返します。
IntStream
LongStream
range 指定した範囲のストリームを生成して返します。範囲は開始と終了の1つ後を指定します。
rangeClosed 指定した範囲のストリームを生成して返します。範囲は開始と終了を指定します。
iterate 指定された初期値、計算式、終了条件から有限ストリームを生成して返します。
無限ストリーム generate 固定値や乱数値などの無限ストリームを生成して返します。
iterate 指定された初期値、計算式から無限ストリームを生成して返します。

of()、generate()、iterate()の使用例は次の通りです。

 

中間操作

主な中間操作

主な中間操作を次にまとめます。

主な中間操作
メソッド 引数 引数の概要 メソッドの概要
filter Predicate<T> predicate Tを受け取りbooleanを返す predicateで指定された条件に一致する要素を抽出した Streamを生成して返します。
map Function<T, R> mapper Tを受け取りRを返す mapperにしたがって各要素を変換したStreamを生成して返します。
flatMap Function<T, Stream<R>> mapper Tを受け取り Stream<R>を返す mapperにしたがって各要素を1対多に変換したStreamを生成して返します。なお、1対多の多は0や1でも構いません。
distinct なし 重複した要素を取り除いたStreamを生成して返します。
sorted なし 要素を自然順序付けに従って並べ替えたStreamを生成して返します。要素がComparableインタフェースを実装している必要があります。
sorted Comparator<T> comparator 2つTを受け取り比較結果(-1, 0, 1)を返す 要素をcomparatorにしたがって並べ替えたStreamを生成して返します。
peek Consumer<T> action Tを受け取り何も返さない 要素をactionにしたがって処理します。
limit long maxSize 要素の個数 maxSizeで指定された要素の個数までに限定します。
skip long n スキップする個数 nで指定された個数をスキップします。
takeWhile Predicate<T> predicate T を受け取り boolean を返す predicateで指定された条件に一致する間、要素を出力ストリームに渡します。
dropWhile Predicate<T> predicate Tを受け取りbooleanを返す predicateで指定された条件に一致する間、スキップします。

filter()、map()、flatMap()の使用例を挙げます。

filter()は Predicate<T> predicateを引数に取り、predicateがtrueとなる要素のみを抽出します。for文の中のif 文と同じような役割を実現できます。

 

map()は 大雑把に言うとFunction<T, R> mapperを引数に取り、T型の要素をR型の要素に変換します。TとRは同じ型でもOKです。

map()は ストリームから要素を取り出し、要素に関数を適用し、再度ストリームに要素を詰め込む という役割を果たします。違う見方をすると、ストリームという入れ物から要素を取り出さずに 要素に関数を適用することができることになります。

 

flatMap()は 大雑把に言うとFunction<T, Stream<R>> mapperを引数に取り、T型の要素をR型のStreamに変換します。個々のT型の要素からR型のStreamを作成し、1つのStreamに結合します。

flatMap()は ストリームから要素を取り出し、要素に関数を適用するところまではmap()と同じですが、再度ストリームに要素を詰め込まないところが異なります。これにより、複数のストリームを1つに結合する 集約の機能を実現することができます。

また、flatMap()は 関数の適用結果をストリームに詰め込む部分で独自の処理を行いたい場合にも使えます。例えば要素の数を2倍に増やすような 増幅の機能を実現するのに利用することができます。

 

ステートレス操作とステートフル操作

中間操作はステートレス操作ステートフル操作に分類できます。操作の対象が1要素だけなのか、それとも他の要素を参照する必要があるかどうかの違いです。ステートレス操作は1要素だけで操作を完結できます。filter()やmap()などはステートレスです。ステートフル操作は1要素だけでは操作が完結できず、他の要素を参照する必要があります。distinct()やsorted()などはステートフルです。distinct()は重複を排除するため 他の要素と一致するかどうかの比較が必要になりますし、sorted()は並べ替えるために 全要素との比較が必要になります。

ステートレス操作は他の要素とは無関係に処理を行えるため、並列処理と相性が良くなります。逆に ステートフル操作は他の要素との関連があるので、処理を並列にしてもメリットを享受できない場合があります。

 

短絡操作

短絡操作とは ストリームの全要素を対象としない中間操作または終端操作のことを指します。limit()やfindFirst()のようにストリームの途中の要素までを操作対象とするものが多いのですが、中には skip()のように途中からを操作対象とするものもあります。特に無限ストリームを扱う際に重要で、無限ストリームの操作を終わらせるためには短絡操作が必要になります。また、有限ストリームでも短絡操作によって不要な操作を実施せずに効率化を図ることができます

 

順次ストリームの場合の操作の実行順番

ストリームが順次ストリームの場合、基本的にはある一つの要素に対して中間操作から終端操作まで実行し、次の要素に対して中間操作から終端操作まで実施して・・・という順番で実行されます。

例えば上のようなストリーム・パイプラインの場合、要素1に対して filter()、map()、forEach()が適用され、続いて要素2に対してfilter()が適用され(falseになるのでmap()には進まない)、続いて要素3に対してfilter()、map()、forEach()が適用され… という具合に処理されていきます。実行結果は次の通りです。(// 以降は注釈です)

distinct()のようなステートフルな中間操作が含まれる場合も基本的には同じです。

要素1に対してfilter()、distinct()、map()、forEach()が適用され、続く要素1に対してfilter()、distinct()が適用され(重複しているのでmap()には進まない)、続いて要素2に対してfilter()が適用され(falseになるのでmap() には進まない)、続いて要素3に対してfilter()、distinct()、map()、forEach()が適用され…という具合に処理されます。(// 以降は注釈です)

sorted()の場合は特殊で、要素が全てが出揃ってからでないとソートができないため、これまでのものと順番が異なってきます。

sorted()で全要素の比較が必要になるため、全ての要素のmap()までを実施してからsorted()で並べ替え、続いて forEach()に進むという順場になります。実行結果は次のようになります。

全要素が揃わないと並べ替えができないことから、無限ストリームにsorted()を適用すると無限ループに陥ります。次の例は無限ループに陥ります。

sorted()の前にlimit()などを適用して有限ストリームにする必要があります。

また、並列処理のところで詳しく説明しますが、並列ストリームで並列処理を行う場合 sorted()で一旦全要素を待ち合わせることになるので並列処理にするメリットが薄れます。

 

終端操作

主な終端操作

終端操作は大きく次のように分類できます。

  • 副作用:何も返さず何らかの副作用を行う操作です。(forEach()など)
  • 判定:存在確認や一致確認などの判定を行い、booleanを返す操作です。(anyMatch()、allMatch()など)
  • 検索:特定の1要素をOptionalで返す操作です。(findAny()、findFirst()など)
  • リダクション:集計や畳み込み演算を行い、単一の結果(プリミティブ型やサマリ統計のようなオブジェクト)を返す操作です。(count()、max()、min()、reduce()など)
  • 可変リダクション:リダクション同様集計や畳み込み演算を行い、Collectionや配列等の集合(結果コンテナ)を返す操作です。(collect()、toArray()など)

リダクションや可変リダクションで集約を行う場合は、順序性の保証が無いので結合的な関数を指定する必要があります。関数opが結合的となるのは、次の条件が成り立つ場合です。

(a op b) op c == a op (b op c)

つまり、結合的であればどのような順番で関数を適用していっても全体の結果は同じになります。

 

主な終端操作を次にまとめます。

主な終端操作
分類 メソッド 引数 戻り値 メソッドの概要
副作用 forEach Consumer<T> action void Tを受け取り何らかの副作用を行います。しかし基本的にストリームの各操作は副作用がないのが望ましいため、結果を報告する用途以外に使うべきではありません。並列ストリームの場合順序性は確保されません。
forEachOrdered Consumer<T> action void forEach()と同様ですが、並列ストリームの場合に順序性が確保されます。
判定 anyMatch Predicate<T> predicate boolean 条件に一致する要素があればtrueを返します。条件に一致する要素が見つかった時点でtrueを返して処理を終了する短絡終端操作です。
allMatch Predicate<T> predicate boolean 全ての要素が条件に一致すればtrueを返します。条件に一致しない要素が見つかった時点でfalseを返して処理を終了する短絡終端操作です。
noneMatch Predicate<T> predicate boolean 条件に一致する要素が1つもなければtrueを返します。条件に一致する要素が見つかった時点でfalseを返して処理を終了する短絡操作です。
検索 findFirst void Optional<T> ストリーム内の先頭の要素を返します。ストリームが空の場合はOptional.emptyを返します。
findAny void Optional<T> ストリーム内の任意の要素を返します。どの要素が返るかは不定です。ストリームが空の場合はOptional.emptyを返します。
リダクション count void long ストリーム内の要素数を返します。
max Comparator<T, T> comparator Optional<T> comparatorに従って比較を行い、最大となる要素を返します。ストリームが空の場合はOptional.emptyを返します。
min Comparator<T, T> comparator Optional<T> comparatorに従って比較を行い、最小となる要素を返します。ストリームが空の場合はOptional.emptyを返します。
sum void プリミティブ型 プリミティブストリームに用意されています。合計値を返します。
average void OptionalDouble プリミティブストリームに用意されています。平均値を返します。
summary
Statictics
void XXSummary
Statistics
プリミティブストリームに用意されています。要素数・合計値・最小値・最大値・平均値をまとめたXXXSummaryStatisticsクラスのオブジェクトを返します。
reduce BinaryOperator<T, T> accumulator Optional<T> 累積関数accumulatorで集約した値を返します。accumulatorは結合的である必要があります。ストリームが空の場合はOptional.emptyを返します。
T identity, BinaryOperator<T, T> accumulator T 上のreduce()の単位元identityを指定する版です。ストリームが空の場合は単位元が返るので、戻り値はOptionalではなくTとなります。
U identity, BiFunction<U, T> accumulator, BinaryOperator<U, U> combiner U 上のreduce()の単位元identityに加えて結合関数combinerを指定する版です。combinerは並列ストリームの場合にaccumulatorによる部分的な結果を結合する関数で、結合的である必要があります。また、このメソッドではaccumulatorで型変換を行うことができます。
可変リダクション collect Collector<T, A, R> collector R Collectorを指定して結果コンテナを返します。CollectorはCollectorsのクラスメソッドで生成することができます。代表的なのはListを生成するためのCollectorを返すCollections.toList()メソッドです。
Supplier<R> supplier, BiConsumer<R, T> accumulator, BiConsumer<R, R> combiner R reduce()の3つ引数を取る版と似ていますが、第一引数は単位元ではなく 集合的な結果を格納する結果コンテナを生成して返すSupplierになります。2番目の引数は累積関数、3番目の引数は結合関数です。
toArray void Object[] Object 配列を返します。
IntFunction<A[]> generator A[] A 型の配列を返します。generatorは引数がint、戻り値がA[]の関数ですが、generatorの引数には生成する配列の要素数が渡されます。

副作用、判定、検索、リダクションのcount()、max()、min()等は直感的に理解し易いと思いますが、reduce()とcollect()については次で補足します。reduce()とcollect()の違いは 集約した結果を返すか、集合的な結果コンテナを返すかです

 

reduce:リダクション操作

reduce()は集約した結果を返します。reduce()は3つの形でオーバーロードされています。

  1. Optional<T>  reduce(BinaryOperator<T> accumulator);
  2. T  reduce(T identity,  BinaryOperator<T> accumulator);
  3. <U> U  reduce(U identity,  BiFunction<U, T, U> accumulator,  BinaryOperator<U> combiner);

それぞれ詳しく見て行きます。

1番目のreduce()

Optional<T>  reduce(BinaryOperator<T> accumulator);

1番目は累積関数accumulatorを指定します。累積関数はそれまでの部分的な結果と要素を引数にして 部分的な結果を返します。単純に加算するだけの例を挙げて動作を確認します。

動作確認用メソッドdebug()は次の通りです。

実行結果は次の通りです。(// 以降はコメントです)

順次ストリームを並列ストリームに変更した場合、実行スレッドや順番は不定となりますが 例えば次のようになりました。

 

2番目のreduce()

T  reduce(T identity,  BinaryOperator<T> accumulator);

続いて2番目のreduce()を見てみます。2番目のreduce()は単位元identityと累積関数accumulatorを指定します。順次ストリームの場合 identityは初期値のように思えてしまいますが、初期値ではなく単位元である必要があります単位元とは要素 tとすると 全ての要素について accumulator.apply(identity, t) がtと等しくならなければなりません。例えばaccumulatorが単純な足し算だとすると、全ての要素について identity + t = t が成立するにはidentityは0である必要があります。また、accumulatorが単純な掛け算だとすると、全ての要素について identity * t = t が成立するにはidentityは1である必要があります。つまり、identityは単純な初期値として任意な値を指定できるわけではないということです。

ストリームが空の場合identityが戻り値となりますので、戻り値はOptionalではなくなります。

1番目と同様に単純な例で動作を確認します。debug()メソッドは前述と同様です。

実行結果は次の通りです。

並列ストリームに変更した場合、実行スレッドや順番は不定となりますが 例えば次のようになりました。

identityは何度も引数として利用されていることがわかります。identityを初期値と誤解して 単位元以外の値を指定しまうと、並列ストリームの場合に意図しない結果となってしまいます。この例では全てのaccumulatorの引数がidentityと各要素になっていますが、これはワーカスレッド数に対してストリームの要素が少ないためで 要素の数が増えると部分的な結果と要素が渡されるようになります。

 

3番目のreduce()

<U> U  reduce(U identity,  BiFunction<U, T, U> accumulator,  BinaryOperator<U> combiner);

最後に3番目のreduce()を見てみます。3番目のreduce()は単位元identityと累積関数accumulatorおよび 結合関数combinerを指定します。また、accumulatorは1番目、2番目のreduce()のBinaryOperator<T>とは異なり、BiFunction<U, T, U>になります。つまり3番目のreduce()は型変換を伴うことができます。(T型とU型は同じ型でも構いません。)accumulatorの引数Uはidentityまたは部分的な結果、引数Tはストリームの要素となります。combinerは並列ストリームの場合に登場し、2つの引数Uは部分的な結果となります。各スレッドが accumulatorで算出した結果をまとめる役割を果たします。

1番目、2番目と同様に単純な例で動作を確認してみます。

実行結果は次の通りです。2番目のreduce()と同じ動作となりました。

並列ストリームに置き換えてみます。実行スレッドや順番は不定となりますが 例えば次のようになりました。

並列ストリームにするとcombinerが登場してきて、部分的な結果の結合の役割を果たしているのが確認できます。

 

リダクション操作count()、max()、min()、sum()、average()、summaryStatistics()は、良く使われる用途にreduce()を特化したものになります。

 

collect:可変リダクション操作

collect()は 集合的な結果コンテナを返します。collect()は2つの形でオーバーロードされています。

  1. <R, A> R  collect(Collector<T, A, R> collector);
  2. <R> R  collect(Supplier<R> supplier,  BiConsumer<R, T> accumulator,  BiConsumer<R, R> combiner);

1番目のcollect()

<R, A> R  collect(Collector<T, A, R> collector);

1番目collect()の引数はCollectorで Collectorインタフェースを実装したクラスを渡すことになります。Collector インタフェースでは次の抽象メソッドが定義されています。

  • Supplier<A>  supplier();
  • BiConsumer<A, T>  accumulator();
  • BinaryOperator<A>  combiner();
  • Function<A, R>  finisher();
  • Set<Collector.Characteristics>  characteristics();

java.util.stream.Collectorsに用意されているクラスメソッドで 汎用的なCollectorを利用することができます。

Collectorsが提供するCollector

java.util.stream.Collectorsには良く使われるCollectorを返すクラスメソッドが用意されています。主な物を次にまとめました。尚、ストリームパイプラインのコードが読み易くなるように 慣習的にCollectorsの全てのメンバをstaticインポートしてメソッド名だけで指定できるようにします。

Collectorを返すCollectorsの主なクラスメソッド
メソッド 引数 概要
toList void ストリームの要素を格納したListを返します。
toMap Function<T, K> keyMapper,
Function<T, U> valueMapper
引数で指定された要素をキーに変換する関数keyMapperと値に変換する関数valueMapperにしたがって生成したMapを返します。keyMapperによって変換されたキーが重複する場合は IllegalStateExceptionが発生します
toMap Function<T, K> keyMapper,
Function<T, U> valueMapper,
BinaryOperator<U> mergeFunction
keyMapperとvalueMapperに加えて、キーが重複した場合のマージ関数mergeFunctionを指定することができます。keyMapperによって変換されたキーが重複する場合はmergeFunctionによって値がマージされます
toMap Function<T, K> keyMapper,
Function<T, U> valueMapper,
BinaryOperator<U> mergeFunction,
Supplier<M> mapFactory
上のメソッドの3つの関数に加えて、任意のMapを生成できるように Mapを生成するSupplier mapFactoryを指定することができます。
joining void Charsequence(StringやStringBuilderなど)のストリーム用。文字列を連結して返します。
joining CharSequence delimiter 文字列連結の際の区切り文字delimiterを指定できます。
joining CharSequence delimiter, CharSequence prefix, CharSequence suffix delimiterに加え接頭語prefix、接尾語suffixを指定することができます。
summingXXX ToXXXFunction<T> mapper XXXはプリミティブ型(Int、Long、Double)が入ります。要素をプリミティブ型に変換する関数mapperを指定して変換後の値の合計を返します。reduce()でも同じことを実現できますが、型変換を伴うためidentity、accumulator、combinerの3つを引数にとるreduce()を使う必要があり、こちらの方が簡単です。
averagingXXX ToXXXFunction<T> mapper XXXはプリミティブ型(Int、Long、Double)が入ります。要素をプリミティブ型に変換する関数mapperを指定して変換後の値の平均値を返します。同じことをreduce()でも実現できますが、こちらの方が簡単です。
summarizingXXX ToXXXFunction<T> mapper XXXはプリミティブ型(Int、Long、Double)が入ります。要素をプリミティブ型に変換する関数mapperを指定して変換後の値の統計サマリ(カウント、合計、平均、最大、最小)を返します。
partitioningBy Predicate<T> predicate ストリームの要素を指定した条件predicateがtrueを返すグループとfalseを返すグループに分けます。戻り値はMapの形で、キーはboolean、値は該当するグループの要素のListです。
partitioningBy Predicate<T> predicate,
Collector<T, A, D> downstream
ストリームの要素をpredicateがtrueを返すグループとfalseを返すグループに分け、各グループの要素をdownstreamに従って集約します。戻り値はMapの形で、キーはboolean、値は該当するグループの要素をdownstreamで集約した値です。
groupingBy Function<T, K> classifier ストリームの要素を指定した関数classifierに従ってグループ分けします。戻り値はMapの形で、キーはclassifierが返す値、値は該当するグループの要素のListです。
groupingBy Function<T, K> classifier,
Collector<T, A, D> downstream
ストリームの要素をclassifierに従ってグループ分けして、各グループの要素をdownstreamに従って集約します。戻り値はMapの形で、キーはclassifierが返す値、値は該当するグループの要素をdownstreamで集約した値です。
groupingBy Function<T, K> classifier,
Supplier<M> mapFactory,
Collector<T, A, D> downstream
上のメソッドに加えて、任意のMapを生成できるようにMap生成の Supplier mapFactoryを指定することができます。

Collectorsが返すCollectorを使えば、汎用的な集約やグルーピングを実現することができます。

Collectorsが提供する汎用的な集約以外を実現したい場合は 独自のCollectorを実装するか、次のcollect()メソッドを使うことになります。

2番目のcollect()

<R> R  collect(Supplier<R> supplier,  BiConsumer<R, T> accumulator,  BiConsumer<R, R> combiner);

2番目のcollect()の引数にはCollectorインタフェースの各抽象メソッドと同じ関数オブジェクトを当てはめることができることから、Collectorインタフェースを理解できれば 2番目のcollect()の引数の意味を理解することができます。そのため、ここではCollectorインタフェースの各抽象メソッドについて説明します。

Collectorインタフェースの各抽象メソッドの役割は次の通りです。

  • supplierは collect()の戻り値となる 結果コンテナのインスタンス(Listや配列など)を生成します。
  • accumulatorは 結果コンテナにストリームの要素を累積します。
  • combinerは 並列ストリームで登場し、部分的な結果コンテナ同士を結合します。
  • finisherは 結果オブジェクトの変換を行います。

Collectors.toList()が返すCollectorの動作を模擬したCollectorを実装して動作を見てみます。まずはCollectorを実装したクラスの定義です。

続いて上で定義したCollectorImplを使った例です。

実行結果は次の通りです。

順次ストリームの場合はaccumulatorに部分的な結果コンテナと要素が渡されていきます。並列ストリームに変更すると次のようになりました。

並列ストリームにするとcombinerが登場してきます。accumulatorにはsupplierが生成した結果コンテナと要素が渡されます。ワーカスレッドに対して要素の数が大きければaccumulatorには部分的な結果コンテナと要素が渡されていきます。combinerには部分的な結果コンテナが渡されます。

 

並列処理

ストリームでは 順次処理と並列処理を容易に切替えることができますが、並列化によって必ずしも処理性能の向上がもたらされるわけではありません。ある程度の条件が揃わないと並列化のメリットを享受することができず、場合によっては 性能劣化につながってしまうこともあります。

始めに 並列化に向く場合と 向かない場合を見ていきます。続いて、ステートフルな操作において順序性が性能に与える影響を見ていきます。

 

並列化によって性能向上が見込める場合

並列化による性能向上が見込めるのは 次のような条件に合致する場合になります。

  • ストリームのソースが 正確に低いコストで分割可能であること
    ストリームを並列処理する場合は ストリームを複数に分割して処理を行います。そのため 分割が正確に低いコストで行えることが一つのポイントになります。分割が正確に低いコストで行えるソースは ArrayList、HashMap、HashSet、ConcurrentHashMap、配列、intの範囲(IntStream.range()等)、longの範囲(LongStream.range()等)が該当します。逆に 例えばStream.iterateの場合は 分割する際にストリームの要素を走査しないといけないため分割コストが高くつきます。
  • 終端操作が並列化可能でコストが低いこと
    どんなに中間操作で並列化の効果があっても 終端操作が並列に行えず かつ終端操作の割合がストリームパイプライン全体の多くを占めるような場合は 並列化のメリットはありません。
    並列化可能でコストが低い終端操作としてはリダクション(min()、max()、count()、sum()、reduce()等)や 短絡操作である判定・検索(anyMatch()、allMatch()、noneMatch()等)が挙げられます。可変リダクション(collect())はコレクションをまとめるオーバーヘッドが大きいため 並列化にはあまり向きません。
  • ステートレスな操作で順序性の確保が不要であること
    ステートレスな操作であれば 並列に処理を行っているスレッド間での同期が必要ないため 並列化に適しています。ステートフルな操作が全て並列化に向かないというわけではないのですが、sorted()のように他のスレッドのデータの待ち合わせを行わないといけない操作は並列化には向きません。
    また、順序性の確保が必要な場合、ステートフルな操作は他のスレッドのデータの待ち合わせが必要になるため並列化には向きません。
  • ストリームパイプラインで処理するデータの量と処理量がある程度大きいこと
    ストリームを分割して処理するには ある程度のコストが掛かります。そのため、データの数が少なかったり 各要素に対する処理が少ないと 並列化にかかるコストに見合う性能向上が見込めません。Effective Javaの中では『大雑把な見積もりとして ストリーム中の要素数と、一つの要素ごとに実行されるコードのステップ数(ループの場合は繰り返した数を乗算)の積が、少なくとも10万であるべき』という指針が示されています。

 

 

 

ステートフル操作による実行順番

ストリームのソースが順序性を持つ物(Listや配列、ファイルなど)であれば、順次ストリーム・並列ストリームのどちらで実行しても要素の並び順が変わらないように、内部的に順番を管理しています。HashSetのようにストリームのソースが順序性を持たない物は、そもそも順番が不定なため内部で順番を気にする必要はありません。

ストリームのソースが順序性を持つ場合、ストリームを並列化しても結果の順序が入れ替わっていないことを確認してみます。

peek()でスレッドID等を出力することでparallel()によりマルチスレッドで実行されていることが確認できます。続いて、作成されたリストの要素の順番が変更されていないことが確認できます。

もう少し詳しく見てみます。各操作の間にデバグ用のpeek()メソッドを追加してみます。

map()はステートレスな操作なので順序性を気にせず実行されます。マルチスレッドでpeek1(mapの前)、peek2(map~skip) のデバグメッセージが順序入り乱れて出力されますが、peek3(skip~limit)と peek4(limit~collect)のメッセージはそれぞれまとめて実行されていることが分かります。これは、skip()やlimit()等のステートフルな操作は 順序性を確保するために全要素を待ち合わせてから操作を行っているためです。順次ストリームの場合のsorted()と同じようになってしまい、このような操作を挟むと並列処理のメリットがなくなってしまいます。

そうなると、無限ストリームに対してskip()やlimit()を適用すると 無限ループになって処理が終わらなくなってしまいそうですが、skip()の場合はその通りで処理は終わりません。一方でlimit()の場合は余分に入力を読むことにはなりますが、処理は終わります。skip()とlimit()では無限ストリームに対する処理の仕方に違いがあるようです。

それではステートフルな操作は並列処理で意味を成さないのかと言うと そうでもなく、順序性を確保するという制約を除くことができれば、並列処理のメリットを享受することができます。順序性確保の制約を外すには、unordered()を呼び出します。

これにより、peek1(mapの前)、peek2(map~skip)、peek3(skip~limit)、peek4(limit~collect) のメッセージが入り乱れていてマルチスレッドで並列に処理されている様子が確認できます。ただし、順序性を確保しないことにより メソッドの機能が変わってしまうことに注意が必要です。例えばlimit()の場合、順序性を確保する場合は「最初のn個」の要素を返しますが、順序性を確保しない場合は「任意のn個」の要素を返すことになり、実行するたびに結果が変わる 非決定的な動作になってしまいます。

 

尚、 終端操作で良く登場するforEach()は並列ストリームでは順序性を考慮しない仕様のため、並列ストリームで使用すると順不同に実行されます。並列ストリームでforEach()の実行順序を確保したい場合は forEach()の代わりにforEachOrdered()を使います。

 

 

Stream」(エンタープライズギークス)
タイトルとURLをコピーしました