Skip to content

Commit d387e3b

Browse files
authored
Merge branch 'main' into fix_gson_grpc_json_marshaller_builder
2 parents 889e09c + e0f4682 commit d387e3b

File tree

25 files changed

+974
-425
lines changed

25 files changed

+974
-425
lines changed

build.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ allprojects {
112112
maxParallelForks = gradle.startParameter.maxWorkerCount
113113
}
114114

115+
// Allow re-running the tests if the DOC_SERVICE_DEMO property changes.
116+
inputs.property('DOC_SERVICE_DEMO', 'true'.equals(project.findProperty('DOC_SERVICE_DEMO')))
117+
115118
// Fail the build at the end if there was any leak.
116119
// We do not fail immediately so that we collect as many leaks as possible.
117120
doFirst {

core/src/main/java/com/linecorp/armeria/client/Http2ResponseDecoder.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,22 @@ private void onWrapperCompleted(HttpResponseWrapper resWrapper, int id, @Nullabl
9292
resWrapper.onSubscriptionCancelled(cause);
9393

9494
if (cause != null) {
95+
if (cause instanceof UnprocessedRequestException ||
96+
cause instanceof ClosedStreamException) {
97+
return;
98+
}
99+
final int streamId = idToStreamId(id);
100+
final Http2Stream stream = conn.stream(streamId);
101+
if (stream == null || !stream.isHeadersSent()) {
102+
return;
103+
}
95104
// Removing the response and decrementing `unfinishedResponses` isn't done immediately
96105
// here. Instead, we rely on `Http2ResponseDecoder#onStreamClosed` to decrement
97106
// `unfinishedResponses` after Netty decrements `numActiveStreams` in `DefaultHttp2Connection`
98107
// so that `unfinishedResponses` is never greater than `numActiveStreams`.
99108

100109
// Reset the stream.
101-
final int streamId = idToStreamId(id);
110+
102111
final int lastStreamId = conn.local().lastStreamKnownByPeer();
103112
if (lastStreamId < 0 || // Did not receive a GOAWAY yet or
104113
streamId <= lastStreamId) { // received a GOAWAY and the request's streamId <= lastStreamId

core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java

-2
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,6 @@ private void destroyOldContexts(HealthCheckContextGroup contextGroup) {
292292

293293
private void updateHealth(Endpoint endpoint, boolean health) {
294294
if (isClosing()) {
295-
logger.debug("HealthCheckedEndpointGroup is closed. Ignoring health update for: {}. (healthy: {})",
296-
endpoint, health);
297295
return;
298296
}
299297

core/src/main/java/com/linecorp/armeria/common/CommonPools.java

+17-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616

1717
package com.linecorp.armeria.common;
1818

19+
import java.lang.reflect.Method;
20+
import java.util.function.Predicate;
21+
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
1925
import com.linecorp.armeria.client.ClientFactoryBuilder;
2026
import com.linecorp.armeria.common.metric.MeterIdPrefix;
2127
import com.linecorp.armeria.common.metric.MoreMeterBinders;
@@ -30,6 +36,8 @@
3036
*/
3137
public final class CommonPools {
3238

39+
private static final Logger logger = LoggerFactory.getLogger(CommonPools.class);
40+
3341
// Threads spawned as needed and reused, with a 60s timeout and unbounded work queue.
3442
private static final BlockingTaskExecutor BLOCKING_TASK_EXECUTOR =
3543
BlockingTaskExecutor.builder().threadNamePrefix("armeria-common-blocking-tasks").build();
@@ -43,11 +51,17 @@ public final class CommonPools {
4351
.bindTo(Flags.meterRegistry());
4452

4553
try {
46-
Class.forName("reactor.core.scheduler.Schedulers",
47-
true, CommonPools.class.getClassLoader());
54+
final Class<?> aClass = Class.forName("reactor.core.scheduler.Schedulers",
55+
true, CommonPools.class.getClassLoader());
56+
final Method ignored = aClass.getDeclaredMethod(
57+
"registerNonBlockingThreadPredicate", Predicate.class);
58+
// Call only when the method exists.
4859
ReactorNonBlockingUtil.registerEventLoopAsNonBlocking();
4960
} catch (ClassNotFoundException e) {
50-
// Do nothing.
61+
// Do nothing because reactor is not available.
62+
} catch (NoSuchMethodException e) {
63+
logger.warn("Failed to register the common worker group as non-blocking for Reactor. " +
64+
"Please consider upgrading Reactor to 3.7.0 or newer.");
5165
}
5266
}
5367

core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessageDuplicator.java

+3-17
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ void doRequestDemand(long cumulativeDemand) {
340340
}
341341

342342
final long delta = cumulativeDemand - requestedDemand;
343-
requestedDemand += delta;
343+
requestedDemand = cumulativeDemand;
344344
upstreamSubscription.request(delta);
345345
}
346346

@@ -658,18 +658,12 @@ public void request(long n) {
658658
return;
659659
}
660660

661-
accumulateDemand(n);
661+
cumulativeDemand = LongMath.saturatedAdd(cumulativeDemand, n);
662662
processor.requestDemand(cumulativeDemand);
663663

664664
for (;;) {
665665
final long oldDemand = demand;
666-
final long newDemand;
667-
if (oldDemand >= Long.MAX_VALUE - n) {
668-
newDemand = Long.MAX_VALUE;
669-
} else {
670-
newDemand = oldDemand + n;
671-
}
672-
666+
final long newDemand = LongMath.saturatedAdd(oldDemand, n);
673667
if (demandUpdater.compareAndSet(this, oldDemand, newDemand)) {
674668
if (oldDemand == 0) {
675669
signal();
@@ -679,14 +673,6 @@ public void request(long n) {
679673
}
680674
}
681675

682-
private void accumulateDemand(long n) {
683-
if (n == Long.MAX_VALUE || Long.MAX_VALUE - n >= cumulativeDemand) {
684-
cumulativeDemand = Long.MAX_VALUE;
685-
} else {
686-
cumulativeDemand += n;
687-
}
688-
}
689-
690676
void signal() {
691677
if (downstreamExecutor.inEventLoop()) {
692678
doSignal();

core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.reactivestreams.Subscriber;
3232
import org.reactivestreams.Subscription;
3333

34+
import com.google.common.math.LongMath;
35+
3436
import com.linecorp.armeria.common.annotation.Nullable;
3537
import com.linecorp.armeria.common.annotation.UnstableApi;
3638
import com.linecorp.armeria.common.util.CompletionActions;
@@ -273,7 +275,7 @@ private void doRequest(long n) {
273275
if (upstreamSubscription != null) {
274276
upstreamSubscription.request(n);
275277
} else {
276-
pendingDemand += n;
278+
pendingDemand = LongMath.saturatedAdd(pendingDemand, n);
277279
}
278280
}
279281

core/src/main/java/com/linecorp/armeria/internal/common/SplitHttpMessageSubscriber.java

+1
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ private void onNext0(HttpData httpData) {
234234
if (!usePooledObject) {
235235
httpData = PooledObjects.copyAndClose(httpData);
236236
}
237+
final Subscriber<? super HttpData> downstream = this.downstream;
237238
assert downstream != null;
238239
downstream.onNext(httpData);
239240
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2025 LINE Corporation
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.client;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
21+
import java.time.Duration;
22+
import java.util.concurrent.CompletableFuture;
23+
24+
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.api.extension.RegisterExtension;
26+
27+
import com.google.common.base.Strings;
28+
29+
import com.linecorp.armeria.common.AggregatedHttpResponse;
30+
import com.linecorp.armeria.common.HttpMethod;
31+
import com.linecorp.armeria.common.HttpResponse;
32+
import com.linecorp.armeria.common.HttpStatus;
33+
import com.linecorp.armeria.common.RequestHeaders;
34+
import com.linecorp.armeria.server.ServerBuilder;
35+
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
36+
37+
import io.netty.handler.codec.http2.Http2Exception.HeaderListSizeException;
38+
39+
class HeaderListSizeExceptionTest {
40+
41+
@RegisterExtension
42+
static ServerExtension server = new ServerExtension() {
43+
@Override
44+
protected void configure(ServerBuilder sb) {
45+
sb.service("/", (ctx, req) -> HttpResponse.delayed(
46+
() -> HttpResponse.of("OK"), Duration.ofMillis(100)));
47+
}
48+
};
49+
50+
@Test
51+
void doNotSendRstStreamWhenHeaderListSizeExceptionIsRaised() throws InterruptedException {
52+
final CompletableFuture<AggregatedHttpResponse> future = server.webClient().get("/").aggregate();
53+
final String a = Strings.repeat("aa", 10000);
54+
final RequestHeaders headers = RequestHeaders.of(HttpMethod.GET, "/", "foo", "bar",
55+
"baz", a);
56+
assertThatThrownBy(() -> server.webClient().execute(headers).aggregate().join())
57+
.hasCauseInstanceOf(UnprocessedRequestException.class)
58+
.cause()
59+
.hasCauseInstanceOf(HeaderListSizeException.class);
60+
// If the client sends RST_STREAM with invalid stream ID, the server will send GOAWAY back thus
61+
// the first request will be failed with ClosedSessionException.
62+
assertThat(future.join().status()).isSameAs(HttpStatus.OK);
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2025 LINE Corporation
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.common.stream;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.awaitility.Awaitility.await;
20+
21+
import java.util.concurrent.atomic.AtomicLong;
22+
23+
import org.junit.jupiter.api.Test;
24+
import org.reactivestreams.Subscriber;
25+
import org.reactivestreams.Subscription;
26+
27+
final class DuplicatorCumulativeDemandTest {
28+
29+
@Test
30+
void cumulativeDemand() throws InterruptedException {
31+
final AtomicLong demand = new AtomicLong();
32+
final PublisherBasedStreamMessage<Integer> streamMessage = new PublisherBasedStreamMessage<>(
33+
s -> s.onSubscribe(new Subscription() {
34+
@Override
35+
public void request(long n) {
36+
demand.addAndGet(n);
37+
}
38+
39+
@Override
40+
public void cancel() {}
41+
}));
42+
streamMessage.toDuplicator().duplicate().subscribe(new Subscriber<Integer>() {
43+
@Override
44+
public void onSubscribe(Subscription s) {
45+
s.request(1);
46+
}
47+
48+
@Override
49+
public void onNext(Integer integer) {}
50+
51+
@Override
52+
public void onError(Throwable t) {}
53+
54+
@Override
55+
public void onComplete() {}
56+
});
57+
await().untilAsserted(() -> assertThat(demand.get()).isEqualTo(1));
58+
Thread.sleep(100);
59+
// The demand is still 1.
60+
assertThat(demand.get()).isEqualTo(1);
61+
}
62+
}

core/src/test/java/com/linecorp/armeria/internal/server/annotation/AnnotatedDocServiceTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.stream.Stream;
4242

4343
import org.junit.jupiter.api.Test;
44+
import org.junit.jupiter.api.extension.ExtendWith;
4445
import org.junit.jupiter.api.extension.RegisterExtension;
4546
import org.junit.jupiter.params.ParameterizedTest;
4647
import org.junit.jupiter.params.provider.CsvSource;
@@ -67,6 +68,7 @@
6768
import com.linecorp.armeria.common.MediaTypeNames;
6869
import com.linecorp.armeria.common.util.UnmodifiableFuture;
6970
import com.linecorp.armeria.internal.server.annotation.AnnotatedDocServicePluginTest.CompositeBean;
71+
import com.linecorp.armeria.internal.testing.DocServiceExtension;
7072
import com.linecorp.armeria.internal.testing.TestUtil;
7173
import com.linecorp.armeria.server.HttpService;
7274
import com.linecorp.armeria.server.Route;
@@ -103,6 +105,7 @@
103105
import com.linecorp.armeria.server.docs.TypeSignature;
104106
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
105107

108+
@ExtendWith(DocServiceExtension.class)
106109
class AnnotatedDocServiceTest {
107110

108111
private static final ObjectMapper mapper = new ObjectMapper();

core/src/test/java/com/linecorp/armeria/GracefulShutdownBuilderTest.java core/src/test/java/com/linecorp/armeria/server/GracefulShutdownBuilderTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* under the License.
1515
*/
1616

17-
package com.linecorp.armeria;
17+
package com.linecorp.armeria.server;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -25,7 +25,6 @@
2525

2626
import com.linecorp.armeria.common.ShuttingDownException;
2727
import com.linecorp.armeria.internal.testing.AnticipatedException;
28-
import com.linecorp.armeria.server.GracefulShutdown;
2928

3029
class GracefulShutdownBuilderTest {
3130

0 commit comments

Comments
 (0)