Java 8 の Stream
Java 8 の Stream
を試しててアレ?と思った話。
実験
Stream
は Scala の並列コレクションに例えられるので、Scala で
0.until(5).toStream .map({i => println("o:\t" + i); i}) .par .map({i => println("a:\t" + i); i}) .seq .map({i => println("b:\t" + i); i})
みたいなコードを書いてみた。これは実行すると、
o: 0 o: 1 o: 2 o: 3 o: 4 a: 0 a: 3 a: 4 a: 2 a: 1 b: 0 b: 1 b: 2 b: 3 b: 4
のように、.par
と .seq
の間だけが並列に走る。
いい感じである。
さてこれを、Java 8 で直訳したコードは、
IntStream.range(0, 5) .peek(i -> System.out.println("o:\t" + i)) .parallel() .peek(i -> System.out.println("a:\t" + i)) .sequential() .peek(i -> System.out.println("b:\t" + i));
のようになる。これを実行すると、
アレ?何も出ない。
実は、Stream#forEach
などの terminal operation
と呼ばれる種類のメソッドを呼ばないとパイプラインをつなぐだけで何も実行されないのである。
なので、コードを変更して、
final IntStream is = IntStream.range(0, R) .peek(i -> System.out.println("o:\t" + i)) .parallel() .peek(i -> System.out.println("a:\t" + i)) .sequential() .peek(i -> System.out.println("b:\t" + i)); System.out.println("hoge"); is.forEach(i -> System.out.println("c:\t" + i));
のようにして、実行してみよう。
hoge o: 0 a: 0 b: 0 c: 0 o: 1 a: 1 b: 1 c: 1 o: 2 : (略) : c: 4
なんかストリーミング処理のようになっているけれど、順番が変になっているところがなくて、 全然並列化されていない模様。期待とは違う感じである。
悲しい。 使用方法ぜんぜん違う。
そもそも何がしたかったのか
fold
の中間結果を保存するリストを生成するような scan
みたいに
stateful なリスト・ストリーム処理があるが、
これを split と combine で表現するのは自明では上にできないと考えられている場合もある。
そういったときには、逐次的に処理する必要がある。
なんだけど、あらかじめ用意されている terminal operation で、
並列ストリームでも逐次的に実行してくれるのは Stream#forEachOrdered
以外に見当たらない。
なので、直前に逐次に戻そうと思ったのにうまくいかない。
なぜなんだーという話である。
結局のところ、parallel とか sequential とか (unordered とか) といった性質が
同じストリームにつなぎ続ける限り、全体に伝搬してしまうのでストリームを作りなおすと良い。
それは Stream#reduce
で書ける。
例えば、IntStream
なら以下のようにするとよい。
.mapToObj(i -> IntStream.of(i)) .reduce((stream1, stream2) -> IntStream.concat(stream1, stream2)) .orElse(IntStream.empty()) // 空のときの場合分けで返り値が Optional なので単位元を渡す
*1
これだと List
とかに変換してしまう場合と違って、
終わったところから順次とかいうのにも対応できる。
感想
一応何とかなったけど、リスト処理のかゆいところに手が届かない感というか。 バルク処理の必要に駆られて入れたけど、付け焼刃という感じがする。 びみょい。
あと stateful な forEachOrdered ほしい。つまり BiConsumer をとるやつ。
*1:このコード、微妙な変更で Stream が既に閉じられているとかいう例外が頻発してつらい。。中途半端に mutable にするな!