1. ホーム
  2. java

JavaストリームでflatMap()の後のfilter()が「完全には」遅延しない理由とは?

2023-08-08 12:57:34

質問

以下のようなサンプルコードがあります。

System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);
System.out.println("-----------");
System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);

出力は以下のようになります。

1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1

ここから、最初のケースである stream は本当に怠惰な振る舞いをします。 findFirst() を使っているので、最初の要素があればフィルタリングラムダは呼び出されません。 しかし、2番目のケースは flatMap を使う2番目のケースでは、フィルタリング条件を満たす最初の要素が見つかったにもかかわらず(ラムダは常に真を返すので、最初の要素は何でもよい)、ストリームのさらなるコンテンツがフィルタリング関数を通して供給され続けていることがわかります。

私は、最初のケースのように最初の要素が計算された後にあきらめるのではなく、なぜこのような動作をするのかを理解しようとしています。 役立つ情報があれば、ありがたく存じます。

どのように解決するのですか?

TL;DR では、この問題に対処するために JDK-8075939 で対処され、Java 10 で修正されました (そして、バックポートされた Java 8 JDK-8225328 ).

実装を調べてみると、( ReferencePipeline.java ) を見ると、メソッド [ リンク ]

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}

のために呼び出されます。 findFirst の操作のために呼び出されます。特別に注意しなければならないのは sink.cancellationRequested() で、これは最初のマッチでループを終了させることができます。比較対象は [ リンク ]

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}

1つの項目を進めるためのメソッドは、最終的に forEach を呼び出すことになり、それ以前に終了する可能性はありません。 flatMap メソッドの冒頭のコメントは、この機能がないことまで示しています。

これは単なる最適化の問題ではなく、サブストリームが無限になったときにコードが単に壊れてしまうことを意味するので、開発者が早く「これよりも良いことができる」ことを証明してくれることを望みます...。


この意味を説明するために Stream.iterate(0, i->i+1).findFirst() は期待通りに動作します。 Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst() は無限ループに陥ります。

仕様については、そのほとんどが

パッケージ仕様の「ストリーム操作とパイプライン」章にあります。 :

...

中間演算は新しいストリームを返します。これらは常に 怠け者 ;

...

1000文字より長い最初の文字列を見つける、といった操作では、ソースから利用可能なすべての文字列を調べることなく、必要な特性を持つものを見つけるために十分な文字列を調べるだけでよいのです。(この動作は、入力ストリームが単に大きいだけでなく、無限である場合にさらに重要になります)。

...

さらに、いくつかの操作は、みなし 短絡的 演算とみなされます。中間演算は、無限の入力を与えられたとき、結果として有限のストリームを生成することができる場合、短絡的である。終端演算は、無限の入力を与えられたときに有限時間で終了する可能性がある場合、短絡的であると言えます。パイプラインに短絡演算があることは、無限ストリームの処理が有限時間で正常に終了するための必要条件であり、十分条件ではありません。

たとえば、フィルターがどの項目にもマッチしない場合、処理は完了できません。しかし、操作の短絡的な性質を無視するだけで、有限時間でのいかなる終了もサポートしない実装は、仕様から大きく外れています。