From d5231e014a58c98b28c88cd9e1c2dd7cb11306b3 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sun, 7 Jan 2024 18:16:25 +0800 Subject: [PATCH] feat:Add onErrorComplete stream operator. --- .../Source-or-Flow/onErrorComplete.md | 29 ++++++ .../main/paradox/stream/operators/index.md | 2 + .../apache/pekko/stream/javadsl/FlowTest.java | 73 ++++++++++++++ .../scaladsl/FlowOnErrorCompleteSpec.scala | 98 +++++++++++++++++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../apache/pekko/stream/javadsl/Flow.scala | 52 ++++++++++ .../apache/pekko/stream/javadsl/Source.scala | 52 ++++++++++ .../apache/pekko/stream/javadsl/SubFlow.scala | 51 ++++++++++ .../pekko/stream/javadsl/SubSource.scala | 51 ++++++++++ .../apache/pekko/stream/scaladsl/Flow.scala | 40 ++++++++ 10 files changed, 449 insertions(+) create mode 100644 docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md create mode 100644 stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorCompleteSpec.scala diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md new file mode 100644 index 00000000000..e783e857265 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md @@ -0,0 +1,29 @@ +# onErrorComplete + +Allows completing the stream when a upstream error occur. + +@ref[Error handling](../index.md#error-handling) + +## Signature + +@apidoc[Source.onErrorComplete](Source) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.util.function.Predicate)" } +@apidoc[Source.onErrorComplete](Source) { scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.lang.Class)" } +@apidoc[Flow.onErrorComplete](Flow) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.util.function.Predicate)" } +@apidoc[Flow.onErrorComplete](Flow) { scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.lang.Class)" } + +## Description + +Allows to complete the stream when an upstream error occur. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** element is available from the upstream + +**backpressures** downstream backpressures + +**completes** upstream completes or upstream failed with exception this operator can handle + +**Cancels when** downstream cancels +@@@ \ No newline at end of file diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 28be441d189..e7d4748d44b 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -363,6 +363,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) | |Operator|Description| |--|--|--| |Source/Flow|@ref[mapError](Source-or-Flow/mapError.md)|While similar to `recover` this operators can be used to transform an error signal to a different one *without* logging it as an error in the process.| +|Source/Flow|@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows completing the stream when a upstream error occur.| |RestartSource|@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.| |RestartFlow|@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Flow] will not restart on completion of the wrapped flow.| |Source/Flow|@ref[recover](Source-or-Flow/recover.md)|Allow sending of one last element downstream when a failure has happened upstream.| @@ -532,6 +533,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [never](Source/never.md) * [never](Sink/never.md) * [onComplete](Sink/onComplete.md) +* [onErrorComplete](Source-or-Flow/onErrorComplete.md) * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) * [orElse](Source-or-Flow/orElse.md) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index f9a8e2306ea..f886cfbd0d0 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -1165,6 +1165,79 @@ public void mustBeAbleToRecoverWithRetriesClass() throws Exception { future.toCompletableFuture().get(3, TimeUnit.SECONDS); } + @Test + public void mustBeAbleToOnErrorComplete() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new RuntimeException("ex"); + } else { + return elem; + } + }) + .onErrorComplete() + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + + @Test + public void mustBeAbleToOnErrorCompleteWithDedicatedException() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("ex"); + } else { + return elem; + } + }) + .onErrorComplete(IllegalArgumentException.class) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + + @Test + public void mustBeAbleToFailWhenExceptionTypeNotMatch() { + final IllegalArgumentException ex = new IllegalArgumentException("ex"); + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw ex; + } else { + return elem; + } + }) + .onErrorComplete(TimeoutException.class) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectError(ex); + } + + @Test + public void mustBeAbleToOnErrorCompleteWithPredicate() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("Boom"); + } else { + return elem; + } + }) + .onErrorComplete(ex -> ex.getMessage().contains("Boom")) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToMapErrorClass() { final String head = "foo"; diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorCompleteSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorCompleteSpec.scala new file mode 100644 index 00000000000..338669f2700 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorCompleteSpec.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko.stream.testkit.StreamSpec +import org.apache.pekko.stream.testkit.scaladsl.TestSink + +import scala.concurrent.TimeoutException +import scala.util.control.NoStackTrace + +class FlowOnErrorCompleteSpec extends StreamSpec { + val ex = new RuntimeException("ex") with NoStackTrace + + "A CompleteOn" must { + "can complete with all exceptions" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw ex else a + } + .onErrorComplete[Throwable]() + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1) + .expectComplete() + } + + "can complete with dedicated exception type" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw new IllegalArgumentException() else a + } + .onErrorComplete[IllegalArgumentException]() + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1) + .expectComplete() + } + + "can fail if an unexpected exception occur" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw new IllegalArgumentException() else a + } + .onErrorComplete[TimeoutException]() + .runWith(TestSink[Int]()) + .request(1) + .expectNext(1) + .request(1) + .expectError() + } + + "can complete if the pf is applied" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw new TimeoutException() else a + } + .onErrorComplete { + case _: IllegalArgumentException => false + case _: TimeoutException => true + } + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1) + .expectComplete() + } + + "can fail if the pf is not applied" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw ex else a + } + .onErrorComplete { + case _: IllegalArgumentException => false + case _: TimeoutException => true + } + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1) + .expectError() + } + + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index c367fdba695..8ec7409dc30 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -92,6 +92,7 @@ import pekko.stream.Attributes._ val mergePrioritized = name("mergePrioritized") val flattenMerge = name("flattenMerge") val recoverWith = name("recoverWith") + val onErrorComplete = name("onErrorComplete") val broadcast = name("broadcast") val wireTap = name("wireTap") val balance = name("balance") diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 8aaf3a45eee..5c2784e21a8 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1907,6 +1907,58 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr case elem if clazz.isInstance(elem) => supplier.get() }) + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with exception is an instance of the provided type + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(): javadsl.Flow[In, Out, Mat] = onErrorComplete(classOf[Throwable]) + + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with exception is an instance of the provided type + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(clazz: Class[_ <: Throwable]): javadsl.Flow[In, Out, Mat] = + onErrorComplete(ex => clazz.isInstance(ex)) + + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with predicate return ture + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.onErrorComplete { + case ex: Throwable if predicate.test(ex) => true + }) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 2e6db545893..c70ff1c6c3b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2382,6 +2382,58 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ case elem if clazz.isInstance(elem) => supplier.get() }: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]) + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with exception is an instance of the provided type + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(): javadsl.Source[Out, Mat] = onErrorComplete(classOf[Throwable]) + + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with exception is an instance of the provided type + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(clazz: Class[_ <: Throwable]): javadsl.Source[Out, Mat] = + onErrorComplete(ex => clazz.isInstance(ex)) + + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with predicate return ture + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): javadsl.Source[Out, Mat] = + new Source(delegate.onErrorComplete { + case ex: Throwable if predicate.test(ex) => true + }) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 93942830b94..553b413c556 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1166,6 +1166,57 @@ class SubFlow[In, Out, Mat]( pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubFlow[In, Out, Mat] = new SubFlow(delegate.recoverWithRetries(attempts, pf)) + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with exception is an instance of the provided type + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(): SubFlow[In, Out, Mat] = onErrorComplete(classOf[Throwable]) + + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with exception is an instance of the provided type + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(clazz: Class[_ <: Throwable]): SubFlow[In, Out, Mat] = onErrorComplete(ex => clazz.isInstance(ex)) + + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with predicate return ture + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.onErrorComplete { + case ex: Throwable if predicate.test(ex) => true + }) + /** * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 14c441f3e49..75877459428 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1146,6 +1146,57 @@ class SubSource[Out, Mat]( pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubSource[Out, Mat] = new SubSource(delegate.recoverWithRetries(attempts, pf)) + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with exception is an instance of the provided type + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(): SubSource[Out, Mat] = onErrorComplete(classOf[Throwable]) + + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with exception is an instance of the provided type + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(clazz: Class[_ <: Throwable]): SubSource[Out, Mat] = onErrorComplete(ex => clazz.isInstance(ex)) + + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with predicate return ture + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): SubSource[Out, Mat] = + new SubSource(delegate.onErrorComplete { + case ex: Throwable if predicate.test(ex) => true + }) + /** * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index e26f48837e9..7ef64b52b11 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -929,6 +929,46 @@ trait FlowOps[+Out, +Mat] { pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] = via(new RecoverWith(attempts, pf)) + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with exception is an instance of the provided type + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete[T <: Throwable]()(implicit tag: ClassTag[T]): Repr[Out] = onErrorComplete { + case ex if tag.runtimeClass.isInstance(ex) => true + } + + /** + * onErrorComplete allows to complete the stream when an upstream error occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + */ + def onErrorComplete(pf: PartialFunction[Throwable, Boolean]): Repr[Out] = + via( + Flow[Out] + .recoverWith(pf.andThen({ + case true => Source.empty[Out] + }: PartialFunction[Boolean, Graph[SourceShape[Out], NotUsed]])) + .withAttributes(DefaultAttributes.onErrorComplete and SourceLocation.forLambda(pf))) + /** * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover