From fecd517d482207780779745c9c22e91e1be0c7fa Mon Sep 17 00:00:00 2001 From: He-Pin Date: Mon, 18 Dec 2023 12:17:03 +0800 Subject: [PATCH] =str Fold InHandler and outHandler for UniqueBidiKillSwitchStage. --- .../org/apache/pekko/stream/KillSwitch.scala | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala b/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala index ca169e6309f..2d7f8a19c3d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala @@ -118,29 +118,23 @@ object KillSwitches { val logic = new KillableGraphStageLogic(promise.future, shape) { - setHandler(shape.in1, - new InHandler { + setHandlers(shape.in1, shape.out1, + new InHandler with OutHandler { override def onPush(): Unit = push(shape.out1, grab(shape.in1)) override def onUpstreamFinish(): Unit = complete(shape.out1) override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out1, ex) + override def onPull(): Unit = pull(shape.in1) + override def onDownstreamFinish(cause: Throwable): Unit = cancel(shape.in1, cause) }) - setHandler(shape.in2, - new InHandler { + + setHandlers(shape.in2, shape.out2, + new InHandler with OutHandler { override def onPush(): Unit = push(shape.out2, grab(shape.in2)) override def onUpstreamFinish(): Unit = complete(shape.out2) override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out2, ex) - }) - setHandler(shape.out1, - new OutHandler { - override def onPull(): Unit = pull(shape.in1) - override def onDownstreamFinish(cause: Throwable): Unit = cancel(shape.in1, cause) - }) - setHandler(shape.out2, - new OutHandler { override def onPull(): Unit = pull(shape.in2) override def onDownstreamFinish(cause: Throwable): Unit = cancel(shape.in2, cause) }) - } (logic, switch)