Skip to content

Commit 8150425

Browse files
authored
Fix Incorrect Calculation of Cumulative Demand in Duplicator (#6150)
Motivation: The cumulative demand in a duplicator is mistakenly calculated, causing it to reach the maximum value incorrectly. Modifications: - Corrected the calculation logic for cumulative demand in the duplicator. Result: - The cumulative demand in a duplicator is correctly calculated.
1 parent c020c0d commit 8150425

File tree

5 files changed

+70
-20
lines changed

5 files changed

+70
-20
lines changed

core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessageDuplicator.java

+3-17
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ void doRequestDemand(long cumulativeDemand) {
340340
}
341341

342342
final long delta = cumulativeDemand - requestedDemand;
343-
requestedDemand += delta;
343+
requestedDemand = cumulativeDemand;
344344
upstreamSubscription.request(delta);
345345
}
346346

@@ -658,18 +658,12 @@ public void request(long n) {
658658
return;
659659
}
660660

661-
accumulateDemand(n);
661+
cumulativeDemand = LongMath.saturatedAdd(cumulativeDemand, n);
662662
processor.requestDemand(cumulativeDemand);
663663

664664
for (;;) {
665665
final long oldDemand = demand;
666-
final long newDemand;
667-
if (oldDemand >= Long.MAX_VALUE - n) {
668-
newDemand = Long.MAX_VALUE;
669-
} else {
670-
newDemand = oldDemand + n;
671-
}
672-
666+
final long newDemand = LongMath.saturatedAdd(oldDemand, n);
673667
if (demandUpdater.compareAndSet(this, oldDemand, newDemand)) {
674668
if (oldDemand == 0) {
675669
signal();
@@ -679,14 +673,6 @@ public void request(long n) {
679673
}
680674
}
681675

682-
private void accumulateDemand(long n) {
683-
if (n == Long.MAX_VALUE || Long.MAX_VALUE - n >= cumulativeDemand) {
684-
cumulativeDemand = Long.MAX_VALUE;
685-
} else {
686-
cumulativeDemand += n;
687-
}
688-
}
689-
690676
void signal() {
691677
if (downstreamExecutor.inEventLoop()) {
692678
doSignal();

core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.reactivestreams.Subscriber;
3232
import org.reactivestreams.Subscription;
3333

34+
import com.google.common.math.LongMath;
35+
3436
import com.linecorp.armeria.common.annotation.Nullable;
3537
import com.linecorp.armeria.common.annotation.UnstableApi;
3638
import com.linecorp.armeria.common.util.CompletionActions;
@@ -273,7 +275,7 @@ private void doRequest(long n) {
273275
if (upstreamSubscription != null) {
274276
upstreamSubscription.request(n);
275277
} else {
276-
pendingDemand += n;
278+
pendingDemand = LongMath.saturatedAdd(pendingDemand, n);
277279
}
278280
}
279281

core/src/main/java/com/linecorp/armeria/internal/common/SplitHttpMessageSubscriber.java

+1
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ private void onNext0(HttpData httpData) {
234234
if (!usePooledObject) {
235235
httpData = PooledObjects.copyAndClose(httpData);
236236
}
237+
final Subscriber<? super HttpData> downstream = this.downstream;
237238
assert downstream != null;
238239
downstream.onNext(httpData);
239240
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2025 LINE Corporation
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.common.stream;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.awaitility.Awaitility.await;
20+
21+
import java.util.concurrent.atomic.AtomicLong;
22+
23+
import org.junit.jupiter.api.Test;
24+
import org.reactivestreams.Subscriber;
25+
import org.reactivestreams.Subscription;
26+
27+
final class DuplicatorCumulativeDemandTest {
28+
29+
@Test
30+
void cumulativeDemand() throws InterruptedException {
31+
final AtomicLong demand = new AtomicLong();
32+
final PublisherBasedStreamMessage<Integer> streamMessage = new PublisherBasedStreamMessage<>(
33+
s -> s.onSubscribe(new Subscription() {
34+
@Override
35+
public void request(long n) {
36+
demand.addAndGet(n);
37+
}
38+
39+
@Override
40+
public void cancel() {}
41+
}));
42+
streamMessage.toDuplicator().duplicate().subscribe(new Subscriber<Integer>() {
43+
@Override
44+
public void onSubscribe(Subscription s) {
45+
s.request(1);
46+
}
47+
48+
@Override
49+
public void onNext(Integer integer) {}
50+
51+
@Override
52+
public void onError(Throwable t) {}
53+
54+
@Override
55+
public void onComplete() {}
56+
});
57+
await().untilAsserted(() -> assertThat(demand.get()).isEqualTo(1));
58+
Thread.sleep(100);
59+
// The demand is still 1.
60+
assertThat(demand.get()).isEqualTo(1);
61+
}
62+
}

core/src/test/java/com/linecorp/armeria/GracefulShutdownBuilderTest.java core/src/test/java/com/linecorp/armeria/server/GracefulShutdownBuilderTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* under the License.
1515
*/
1616

17-
package com.linecorp.armeria;
17+
package com.linecorp.armeria.server;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -25,7 +25,6 @@
2525

2626
import com.linecorp.armeria.common.ShuttingDownException;
2727
import com.linecorp.armeria.internal.testing.AnticipatedException;
28-
import com.linecorp.armeria.server.GracefulShutdown;
2928

3029
class GracefulShutdownBuilderTest {
3130

0 commit comments

Comments
 (0)