1. ホーム
  2. java

takeWhile() が flatmap と異なる動作をする件

2023-09-14 23:44:35

質問

takeWhileの可能性を探るために、takeWhileを使ったスニペットを作成しています。flatMapと組み合わせて使用した場合、動作が期待通りになりません。以下のコードスニペットをご覧ください。

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

実際の出力

Sample1
Sample2
Sample3
Sample5

ExpectedOutputです。

Sample1
Sample2
Sample3

期待する理由は、takeWhileは中の条件がtrueになるまで実行される必要があるからです。また、デバッグのために、flatmapの中にprintout文を追加しています。ストリームは2回だけ返され、これは期待に沿ったものです。

しかし、これはチェーンにflatmapがなくても問題なく動作します。

String[] strArraySingle = {"Sample3", "Sample4", "Sample5"};
Arrays.stream(strArraySingle)
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

実際の出力

Sample3

ここでは、実際の出力と期待される出力が一致しています。

免責事項:これらのスニペットはコードの練習用であり、有効なユースケースを提供するものではありません。

更新しました。 バグ JDK-8193856 : 修正は JDK 10 の一部として利用可能になる予定です。 この変更により whileOps Sink::accept

@Override 
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

実装を変更しました。

@Override
public void accept(T t) {
    if (take && (take = predicate.test(t))) {
        downstream.accept(t);
    }
}

どのように解決するのですか?

これは JDK 9 のバグです。 問題番号 8193856 :

<ブロッククオート

takeWhile は、上流のオペレーションがキャンセルをサポートし、それを尊重することを誤って仮定しています。 flatMap .

説明

ストリームが順序付けされている場合。 takeWhile は期待された動作を示すはずです。これはあなたのコードでは完全にそうなっているわけではありません。 forEach を使っているからです。この例ではそうなっていますが、もし気になるようでしたら forEachOrdered を使うべきです。面白いことに、これでは何も変わりません。????

ということは、そもそもストリームは順序付けされていないのでは?(その場合 の動作は大丈夫です。 .) から作成されたストリームに対して一時変数を作成すると strArray という式を実行し、それが順序付けされているかどうかをチェックする。 ((StatefulOp) stream).isOrdered(); をブレークポイントで実行すると、確かに順序付けされていることがわかります。

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Stream<String> stream = Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));

// breakpoint here
System.out.println(stream);

つまり、これは実装上のエラーである可能性が非常に高いということです。

コードの中へ

他の人が疑っているように、私も今、この につながっています。 flatMap が熱心であることと関係があるかもしれません。より正確には、両方の問題が同じ根本原因を持っているかもしれません。

の原因を調べてみると WhileOps のソースを見ると、これらのメソッドを見ることができます。

@Override
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

@Override
public boolean cancellationRequested() {
    return !take || downstream.cancellationRequested();
}

このコードは takeWhile で指定されたストリーム要素をチェックするために t が存在するかどうかを調べます。 predicate が満たされているかどうか。

  • もしそうなら、その要素を downstream 操作、この場合は System.out::println .
  • そうでない場合は take を false に設定し、次回パイプラインをキャンセルするかどうか (つまり終了するかどうか) を尋ねられたときに true .

これは takeWhile の操作になります。他に知っておかなければならないのは forEachOrdered というメソッドを実行する末端のオペレーションを導くことです。 ReferencePipeline::forEachWithCancel :

@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    boolean cancelled;
    do { } while (
            !(cancelled = sink.cancellationRequested())
            && spliterator.tryAdvance(sink));
    return cancelled;
}

これがやっていることは

  1. パイプラインがキャンセルされたかどうかをチェックする
  2. そうでなければ、シンクを1要素進める
  3. 最後の要素であった場合は停止する

期待できそうでしょう?

なし flatMap

良い場合(を使用しない場合)。 flatMap がない場合(2番目の例) forEachWithCancel を直接操作して WhileOp として sink として、どのように再生されるかを見ることができます。

  • ReferencePipeline::forEachWithCancel はループを行う。
    • WhileOps::accept は各ストリーム要素
    • WhileOps::cancellationRequested は各要素の後にクエリされます
  • ある時点で "Sample4" は述語に失敗し、ストリームはキャンセルされます。

やったーーーー

flatMap

悪い場合( flatMap ;あなたの最初の例)。 forEachWithCancelflatMap の操作を行うが、これは単に forEachRemaining の上に ArraySpliterator に対して {"Sample3", "Sample4", "Sample5"} というようにします。

if ((a = array).length >= (hi = fence) &&
    (i = index) >= 0 && i < (index = hi)) {
    do { action.accept((T)a[i]); } while (++i < hi);
}

をすべて無視し hifence のようなもので、並列ストリームのために配列処理を分割する場合にのみ使用されるもので、これは単純な for ループで、各要素を takeWhile 演算に渡す。 に渡しますが、それがキャンセルされたかどうかをチェックすることはありません。 . そのため、停止する前に、その "substream" のすべての要素を熱心に調べ、おそらくは ストリームの残りを通して .