diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessageDuplicator.java b/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessageDuplicator.java index 1e07659d434..2f4734c45de 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessageDuplicator.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessageDuplicator.java @@ -340,7 +340,7 @@ void doRequestDemand(long cumulativeDemand) { } final long delta = cumulativeDemand - requestedDemand; - requestedDemand += delta; + requestedDemand = cumulativeDemand; upstreamSubscription.request(delta); } @@ -658,18 +658,12 @@ public void request(long n) { return; } - accumulateDemand(n); + cumulativeDemand = LongMath.saturatedAdd(cumulativeDemand, n); processor.requestDemand(cumulativeDemand); for (;;) { final long oldDemand = demand; - final long newDemand; - if (oldDemand >= Long.MAX_VALUE - n) { - newDemand = Long.MAX_VALUE; - } else { - newDemand = oldDemand + n; - } - + final long newDemand = LongMath.saturatedAdd(oldDemand, n); if (demandUpdater.compareAndSet(this, oldDemand, newDemand)) { if (oldDemand == 0) { signal(); @@ -679,14 +673,6 @@ public void request(long n) { } } - private void accumulateDemand(long n) { - if (n == Long.MAX_VALUE || Long.MAX_VALUE - n >= cumulativeDemand) { - cumulativeDemand = Long.MAX_VALUE; - } else { - cumulativeDemand += n; - } - } - void signal() { if (downstreamExecutor.inEventLoop()) { doSignal(); diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java index 350608f2531..880eccf5857 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java @@ -31,6 +31,8 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import com.google.common.math.LongMath; + import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.common.util.CompletionActions; @@ -273,7 +275,7 @@ private void doRequest(long n) { if (upstreamSubscription != null) { upstreamSubscription.request(n); } else { - pendingDemand += n; + pendingDemand = LongMath.saturatedAdd(pendingDemand, n); } } diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/SplitHttpMessageSubscriber.java b/core/src/main/java/com/linecorp/armeria/internal/common/SplitHttpMessageSubscriber.java index 865f39523df..153d90debfa 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/SplitHttpMessageSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/SplitHttpMessageSubscriber.java @@ -234,6 +234,7 @@ private void onNext0(HttpData httpData) { if (!usePooledObject) { httpData = PooledObjects.copyAndClose(httpData); } + final Subscriber<? super HttpData> downstream = this.downstream; assert downstream != null; downstream.onNext(httpData); } diff --git a/core/src/test/java/com/linecorp/armeria/common/stream/DuplicatorCumulativeDemandTest.java b/core/src/test/java/com/linecorp/armeria/common/stream/DuplicatorCumulativeDemandTest.java new file mode 100644 index 00000000000..a73de079754 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/common/stream/DuplicatorCumulativeDemandTest.java @@ -0,0 +1,62 @@ +/* + * Copyright 2025 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.common.stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +final class DuplicatorCumulativeDemandTest { + + @Test + void cumulativeDemand() throws InterruptedException { + final AtomicLong demand = new AtomicLong(); + final PublisherBasedStreamMessage<Integer> streamMessage = new PublisherBasedStreamMessage<>( + s -> s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + demand.addAndGet(n); + } + + @Override + public void cancel() {} + })); + streamMessage.toDuplicator().duplicate().subscribe(new Subscriber<Integer>() { + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(Integer integer) {} + + @Override + public void onError(Throwable t) {} + + @Override + public void onComplete() {} + }); + await().untilAsserted(() -> assertThat(demand.get()).isEqualTo(1)); + Thread.sleep(100); + // The demand is still 1. + assertThat(demand.get()).isEqualTo(1); + } +} diff --git a/core/src/test/java/com/linecorp/armeria/GracefulShutdownBuilderTest.java b/core/src/test/java/com/linecorp/armeria/server/GracefulShutdownBuilderTest.java similarity index 97% rename from core/src/test/java/com/linecorp/armeria/GracefulShutdownBuilderTest.java rename to core/src/test/java/com/linecorp/armeria/server/GracefulShutdownBuilderTest.java index bb866e1f5ab..724e5a27e1f 100644 --- a/core/src/test/java/com/linecorp/armeria/GracefulShutdownBuilderTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/GracefulShutdownBuilderTest.java @@ -14,7 +14,7 @@ * under the License. */ -package com.linecorp.armeria; +package com.linecorp.armeria.server; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -25,7 +25,6 @@ import com.linecorp.armeria.common.ShuttingDownException; import com.linecorp.armeria.internal.testing.AnticipatedException; -import com.linecorp.armeria.server.GracefulShutdown; class GracefulShutdownBuilderTest {