From c5059fd77ca0981af20a204f597e74e9424fd048 Mon Sep 17 00:00:00 2001 From: Jonas Konrad Date: Thu, 6 Feb 2025 12:42:21 +0100 Subject: [PATCH] Fix streaming responses blocking forever in netty http client (#1070) This bug was hidden before https://github.com/micronaut-projects/micronaut-core/pull/11440 fixed core backpressure behavior. --- gradle/libs.versions.toml | 2 +- .../netty/LimitedBufferingSubscriber.java | 2 + .../oraclecloud/httpclient/NettyTest.java | 47 +++++++++++++++++++ 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 8c49b0f28..4aaee946f 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,5 +1,5 @@ [versions] -micronaut = "4.7.4" +micronaut = "4.7.13" micronaut-platform = "4.5.1" micronaut-docs = "2.0.0" fn = '1.0.196' diff --git a/oraclecloud-httpclient-netty/src/main/java/io/micronaut/oraclecloud/httpclient/netty/LimitedBufferingSubscriber.java b/oraclecloud-httpclient-netty/src/main/java/io/micronaut/oraclecloud/httpclient/netty/LimitedBufferingSubscriber.java index 467fc34e9..ddb579afc 100644 --- a/oraclecloud-httpclient-netty/src/main/java/io/micronaut/oraclecloud/httpclient/netty/LimitedBufferingSubscriber.java +++ b/oraclecloud-httpclient-netty/src/main/java/io/micronaut/oraclecloud/httpclient/netty/LimitedBufferingSubscriber.java @@ -57,6 +57,8 @@ public void onSubscribe(Subscription s) { } if (closed) { s.cancel(); + } else { + s.request(1); } } diff --git a/test-suite-http-client/src/main/java/io/micronaut/oraclecloud/httpclient/NettyTest.java b/test-suite-http-client/src/main/java/io/micronaut/oraclecloud/httpclient/NettyTest.java index 65162840e..57a86d0cc 100644 --- a/test-suite-http-client/src/main/java/io/micronaut/oraclecloud/httpclient/NettyTest.java +++ b/test-suite-http-client/src/main/java/io/micronaut/oraclecloud/httpclient/NettyTest.java @@ -21,13 +21,18 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.ssl.util.SelfSignedCertificate; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -172,6 +177,48 @@ public void streamingRequestBufferedKnownSize() throws Exception { } } + @Test + @Timeout(10) + public void streamingResponse() throws Exception { + int chunkSize = 8192; + int numberOfChunks = 10; + + netty.aggregate = false; + netty.handleOneRequest((ctx, request) -> { + Assertions.assertEquals(HttpMethod.GET, request.method()); + Assertions.assertEquals("/foo", request.uri()); + + DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().add("content-length", chunkSize * numberOfChunks); + ctx.writeAndFlush(response).addListener(new ChannelFutureListener() { + int n = numberOfChunks; + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (n-- == 0) { + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, ctx.voidPromise()); + } else { + ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[chunkSize]))).addListener(this); + } + } + }); + }); + + try ( + HttpClient client = newBuilder() + .property(StandardClientProperties.BUFFER_REQUEST, false) + .build(); + HttpResponse response = client.createRequest(Method.GET) + .appendPathPart("foo") + .execute().toCompletableFuture() + .get(); + InputStream stream = response.streamBody().toCompletableFuture().get()) { + + Assertions.assertEquals(200, response.status()); + Assertions.assertEquals(chunkSize * numberOfChunks, stream.readAllBytes().length); + } + } + @Test public void onlyUploadIfPositiveResponse() throws Exception { netty.handleContinue = true;