Skip to content

Commit

Permalink
Fix streaming responses blocking forever in netty http client (#1070)
Browse files Browse the repository at this point in the history
This bug was hidden before micronaut-projects/micronaut-core#11440 fixed core backpressure behavior.
  • Loading branch information
yawkat authored Feb 6, 2025
1 parent 1f28e1f commit c5059fd
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 1 deletion.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[versions]
micronaut = "4.7.4"
micronaut = "4.7.13"
micronaut-platform = "4.5.1"
micronaut-docs = "2.0.0"
fn = '1.0.196'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public void onSubscribe(Subscription s) {
}
if (closed) {
s.cancel();
} else {
s.request(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -172,6 +177,48 @@ public void streamingRequestBufferedKnownSize() throws Exception {
}
}

@Test
@Timeout(10)
public void streamingResponse() throws Exception {
int chunkSize = 8192;
int numberOfChunks = 10;

netty.aggregate = false;
netty.handleOneRequest((ctx, request) -> {
Assertions.assertEquals(HttpMethod.GET, request.method());
Assertions.assertEquals("/foo", request.uri());

DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers().add("content-length", chunkSize * numberOfChunks);
ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
int n = numberOfChunks;

@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (n-- == 0) {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, ctx.voidPromise());
} else {
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[chunkSize]))).addListener(this);
}
}
});
});

try (
HttpClient client = newBuilder()
.property(StandardClientProperties.BUFFER_REQUEST, false)
.build();
HttpResponse response = client.createRequest(Method.GET)
.appendPathPart("foo")
.execute().toCompletableFuture()
.get();
InputStream stream = response.streamBody().toCompletableFuture().get()) {

Assertions.assertEquals(200, response.status());
Assertions.assertEquals(chunkSize * numberOfChunks, stream.readAllBytes().length);
}
}

@Test
public void onlyUploadIfPositiveResponse() throws Exception {
netty.handleContinue = true;
Expand Down

0 comments on commit c5059fd

Please sign in to comment.