Skip to content

Commit e08527d

Browse files
morominminwoox
andauthored
Support 100-continue header on client side (#5646)
Motivation: Armeria client should support the [`Expect: 100-continue`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/100) header to optimize the sending of request bodies and ensure compliance with HTTP standards. Modifications: - Modified the client to wait for a `100 Continue` response status before sending the request body. - If the client receives `100 Continue`, it proceeds to send the request body. - If the client receives a different status, the request body is discarded. Result: - The Armeria client now properly supports the [`Expect: 100-continue`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/100) header. --------- Co-authored-by: minwoox <songmw725@gmail.com> Co-authored-by: minux <minu.song@linecorp.com>
1 parent f0ec1ba commit e08527d

16 files changed

+736
-41
lines changed

core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java

+56-14
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333
import com.linecorp.armeria.common.HttpHeaderNames;
3434
import com.linecorp.armeria.common.HttpHeaders;
3535
import com.linecorp.armeria.common.HttpObject;
36+
import com.linecorp.armeria.common.HttpStatus;
3637
import com.linecorp.armeria.common.RequestHeaders;
3738
import com.linecorp.armeria.common.ResponseCompleteException;
39+
import com.linecorp.armeria.common.ResponseHeaders;
3840
import com.linecorp.armeria.common.SessionProtocol;
3941
import com.linecorp.armeria.common.annotation.Nullable;
4042
import com.linecorp.armeria.common.logging.RequestLogBuilder;
@@ -50,6 +52,7 @@
5052
import io.netty.channel.ChannelFuture;
5153
import io.netty.channel.ChannelFutureListener;
5254
import io.netty.channel.ChannelPromise;
55+
import io.netty.handler.codec.http.HttpHeaderValues;
5356
import io.netty.handler.codec.http2.Http2Error;
5457
import io.netty.handler.proxy.ProxyConnectException;
5558

@@ -61,6 +64,7 @@ enum State {
6164
NEEDS_TO_WRITE_FIRST_HEADER,
6265
NEEDS_DATA,
6366
NEEDS_DATA_OR_TRAILERS,
67+
NEEDS_100_CONTINUE,
6468
DONE
6569
}
6670

@@ -143,6 +147,11 @@ public final void operationComplete(ChannelFuture future) throws Exception {
143147
responseWrapper.initTimeout();
144148
}
145149

150+
if (state == State.NEEDS_100_CONTINUE) {
151+
assert responseWrapper != null;
152+
responseWrapper.initTimeout();
153+
}
154+
146155
onWriteSuccess();
147156
return;
148157
}
@@ -176,7 +185,7 @@ final boolean tryInitialize() {
176185
}
177186

178187
this.session = session;
179-
responseWrapper = responseDecoder.addResponse(id, originalRes, ctx, ch.eventLoop());
188+
responseWrapper = responseDecoder.addResponse(this, id, originalRes, ctx, ch.eventLoop());
180189

181190
if (timeoutMillis > 0) {
182191
// The timer would be executed if the first message has not been sent out within the timeout.
@@ -187,34 +196,39 @@ final boolean tryInitialize() {
187196
return true;
188197
}
189198

199+
RequestHeaders mergedRequestHeaders(RequestHeaders headers) {
200+
final HttpHeaders internalHeaders;
201+
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
202+
if (ctxExtension == null) {
203+
internalHeaders = HttpHeaders.of();
204+
} else {
205+
internalHeaders = ctxExtension.internalRequestHeaders();
206+
}
207+
return mergeRequestHeaders(
208+
headers, ctx.defaultRequestHeaders(), ctx.additionalRequestHeaders(), internalHeaders);
209+
}
210+
190211
/**
191212
* Writes the {@link RequestHeaders} to the {@link Channel}.
192213
* The {@link RequestHeaders} is merged with {@link ClientRequestContext#additionalRequestHeaders()}
193214
* before being written.
194215
* Note that the written data is not flushed by this method. The caller should explicitly call
195216
* {@link Channel#flush()} when each write unit is done.
196217
*/
197-
final void writeHeaders(RequestHeaders headers) {
218+
final void writeHeaders(RequestHeaders headers, boolean needs100Continue) {
198219
final SessionProtocol protocol = session.protocol();
199220
assert protocol != null;
200-
if (headersOnly) {
221+
if (needs100Continue) {
222+
state = State.NEEDS_100_CONTINUE;
223+
} else if (headersOnly) {
201224
state = State.DONE;
202225
} else if (allowTrailers) {
203226
state = State.NEEDS_DATA_OR_TRAILERS;
204227
} else {
205228
state = State.NEEDS_DATA;
206229
}
207230

208-
final HttpHeaders internalHeaders;
209-
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
210-
if (ctxExtension == null) {
211-
internalHeaders = HttpHeaders.of();
212-
} else {
213-
internalHeaders = ctxExtension.internalRequestHeaders();
214-
}
215-
final RequestHeaders merged = mergeRequestHeaders(
216-
headers, ctx.defaultRequestHeaders(), ctx.additionalRequestHeaders(), internalHeaders);
217-
logBuilder.requestHeaders(merged);
231+
logBuilder.requestHeaders(headers);
218232

219233
final String connectionOption = headers.get(HttpHeaderNames.CONNECTION);
220234
if (CLOSE_STRING.equalsIgnoreCase(connectionOption) || !keepAlive) {
@@ -230,9 +244,37 @@ final void writeHeaders(RequestHeaders headers) {
230244
// Attach a listener first to make the listener early handle a cause raised while writing headers
231245
// before any other callbacks like `onStreamClosed()` are invoked.
232246
promise.addListener(this);
233-
encoder.writeHeaders(id, streamId(), merged, headersOnly, promise);
247+
encoder.writeHeaders(id, streamId(), headers, headersOnly, promise);
234248
}
235249

250+
static boolean needs100Continue(RequestHeaders headers) {
251+
return headers.contains(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE.toString());
252+
}
253+
254+
void handle100Continue(ResponseHeaders responseHeaders) {
255+
if (state != State.NEEDS_100_CONTINUE) {
256+
return;
257+
}
258+
259+
if (responseHeaders.status() == HttpStatus.CONTINUE) {
260+
state = State.NEEDS_DATA_OR_TRAILERS;
261+
resume();
262+
// TODO(minwoox): reset the timeout
263+
} else {
264+
// We do not retry the request when HttpStatus.EXPECTATION_FAILED is received
265+
// because:
266+
// - Most servers support 100-continue.
267+
// - It's much simpler to just fail the request and let the user retry.
268+
state = State.DONE;
269+
logBuilder.endRequest();
270+
discardRequestBody();
271+
}
272+
}
273+
274+
abstract void resume();
275+
276+
abstract void discardRequestBody();
277+
236278
/**
237279
* Writes the {@link HttpData} to the {@link Channel}.
238280
* Note that the written data is not flushed by this method. The caller should explicitly call

core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestSubscriber.java

+29-3
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ static AbstractHttpRequestSubscriber of(Channel channel, ClientHttpObjectEncoder
5353
}
5454

5555
private final HttpRequest request;
56+
private final boolean http1WebSocket;
5657

5758
@Nullable
5859
private Subscription subscription;
@@ -62,10 +63,11 @@ static AbstractHttpRequestSubscriber of(Channel channel, ClientHttpObjectEncoder
6263
HttpResponseDecoder responseDecoder,
6364
HttpRequest request, DecodedHttpResponse originalRes,
6465
ClientRequestContext ctx, long timeoutMillis, boolean allowTrailers,
65-
boolean keepAlive) {
66+
boolean keepAlive, boolean http1WebSocket) {
6667
super(ch, encoder, responseDecoder, originalRes, ctx, timeoutMillis, request.isEmpty(), allowTrailers,
6768
keepAlive);
6869
this.request = request;
70+
this.http1WebSocket = http1WebSocket;
6971
}
7072

7173
@Override
@@ -77,15 +79,22 @@ public void onSubscribe(Subscription subscription) {
7779
return;
7880
}
7981

82+
final RequestHeaders headers = mergedRequestHeaders(mapHeaders(request.headers()));
83+
final boolean needs100Continue = needs100Continue(headers);
84+
if (needs100Continue && http1WebSocket) {
85+
failRequest(new IllegalArgumentException(
86+
"a WebSocket request is not allowed to have Expect: 100-continue header"));
87+
return;
88+
}
89+
8090
if (!tryInitialize()) {
8191
return;
8292
}
8393

8494
// NB: This must be invoked at the end of this method because otherwise the callback methods in this
8595
// class can be called before the member fields (subscription, id, responseWrapper and
8696
// timeoutFuture) are initialized.
87-
// It is because the successful write of the first headers will trigger subscription.request(1).
88-
writeHeaders(mapHeaders(request.headers()));
97+
writeHeaders(headers, needs100Continue(headers));
8998
channel().flush();
9099
}
91100

@@ -111,6 +120,13 @@ public void onComplete() {
111120

112121
@Override
113122
void onWriteSuccess() {
123+
if (state() == State.NEEDS_100_CONTINUE) {
124+
return;
125+
}
126+
request();
127+
}
128+
129+
private void request() {
114130
// Request more messages regardless whether the state is DONE. It makes the producer have
115131
// a chance to produce the last call such as 'onComplete' and 'onError' when there are
116132
// no more messages it can produce.
@@ -126,4 +142,14 @@ void cancel() {
126142
assert subscription != null;
127143
subscription.cancel();
128144
}
145+
146+
@Override
147+
final void resume() {
148+
request();
149+
}
150+
151+
@Override
152+
void discardRequestBody() {
153+
cancel();
154+
}
129155
}

core/src/main/java/com/linecorp/armeria/client/AbstractHttpResponseDecoder.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,11 @@ public InboundTrafficController inboundTrafficController() {
5858
}
5959

6060
@Override
61-
public HttpResponseWrapper addResponse(
62-
int id, DecodedHttpResponse res, ClientRequestContext ctx, EventLoop eventLoop) {
61+
public HttpResponseWrapper addResponse(@Nullable AbstractHttpRequestHandler requestHandler,
62+
int id, DecodedHttpResponse res,
63+
ClientRequestContext ctx, EventLoop eventLoop) {
6364
final HttpResponseWrapper newRes =
64-
new HttpResponseWrapper(res, eventLoop, ctx,
65+
new HttpResponseWrapper(requestHandler, res, eventLoop, ctx,
6566
ctx.responseTimeoutMillis(), ctx.maxResponseLength());
6667
final HttpResponseWrapper oldRes = responses.put(id, newRes);
6768
keepAliveHandler().increaseNumRequests();

core/src/main/java/com/linecorp/armeria/client/AggregatedHttpRequestHandler.java

+42-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.linecorp.armeria.common.HttpData;
2323
import com.linecorp.armeria.common.HttpHeaders;
2424
import com.linecorp.armeria.common.HttpRequest;
25+
import com.linecorp.armeria.common.RequestHeaders;
2526
import com.linecorp.armeria.common.annotation.Nullable;
2627
import com.linecorp.armeria.internal.client.DecodedHttpResponse;
2728

@@ -31,6 +32,8 @@
3132
final class AggregatedHttpRequestHandler extends AbstractHttpRequestHandler
3233
implements BiFunction<AggregatedHttpRequest, Throwable, Void> {
3334

35+
@Nullable
36+
private AggregatedHttpRequest request;
3437
private boolean cancelled;
3538

3639
AggregatedHttpRequestHandler(Channel ch, ClientHttpObjectEncoder encoder,
@@ -58,32 +61,51 @@ private void apply0(@Nullable AggregatedHttpRequest request, @Nullable Throwable
5861
}
5962

6063
assert request != null;
64+
final RequestHeaders merged = mergedRequestHeaders(request.headers());
65+
final boolean needs100Continue = needs100Continue(merged);
66+
final HttpData content = request.content();
67+
if (needs100Continue && content.isEmpty()) {
68+
content.close();
69+
failRequest(new IllegalArgumentException(
70+
"an empty content is not allowed with Expect: 100-continue header"));
71+
return;
72+
}
73+
6174
if (!tryInitialize()) {
62-
request.content().close();
75+
content.close();
6376
return;
6477
}
6578

66-
writeHeaders(request.headers());
79+
writeHeaders(merged, needs100Continue);
6780
if (cancelled) {
68-
request.content().close();
81+
content.close();
6982
// If the headers size exceeds the limit, the headers write fails immediately.
7083
return;
7184
}
7285

73-
HttpData content = request.content();
86+
if (!needs100Continue) {
87+
writeDataAndTrailers(request);
88+
} else {
89+
this.request = request;
90+
}
91+
channel().flush();
92+
}
93+
94+
private void writeDataAndTrailers(AggregatedHttpRequest request) {
95+
final HttpData content = request.content();
7496
final boolean contentEmpty = content.isEmpty();
7597
final HttpHeaders trailers = request.trailers();
7698
final boolean trailersEmpty = trailers.isEmpty();
7799
if (!contentEmpty) {
78100
if (trailersEmpty) {
79-
content = content.withEndOfStream();
101+
writeData(content.withEndOfStream());
102+
} else {
103+
writeData(content);
80104
}
81-
writeData(content);
82105
}
83106
if (!trailersEmpty) {
84107
writeTrailers(trailers);
85108
}
86-
channel().flush();
87109
}
88110

89111
@Override
@@ -95,4 +117,17 @@ void onWriteSuccess() {
95117
void cancel() {
96118
cancelled = true;
97119
}
120+
121+
@Override
122+
void resume() {
123+
assert request != null;
124+
writeDataAndTrailers(request);
125+
channel().flush();
126+
}
127+
128+
@Override
129+
void discardRequestBody() {
130+
assert request != null;
131+
request.content().close();
132+
}
98133
}

core/src/main/java/com/linecorp/armeria/client/Http1ResponseDecoder.java

+3
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
196196

197197
res.startResponse();
198198
final ResponseHeaders responseHeaders = ArmeriaHttpUtil.toArmeria(nettyRes);
199+
200+
res.handle100Continue(responseHeaders);
201+
199202
final boolean written;
200203
if (responseHeaders.status().codeClass() == HttpStatusClass.INFORMATIONAL) {
201204
state = State.NEED_INFORMATIONAL_DATA;

core/src/main/java/com/linecorp/armeria/client/Http2ResponseDecoder.java

+1
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
206206
if (converted instanceof ResponseHeaders) {
207207
res.startResponse();
208208
final ResponseHeaders responseHeaders = (ResponseHeaders) converted;
209+
res.handle100Continue(responseHeaders);
209210
if (responseHeaders.status().codeClass() == HttpStatusClass.INFORMATIONAL) {
210211
written = res.tryWrite(converted);
211212
} else {

core/src/main/java/com/linecorp/armeria/client/HttpClientPipelineConfigurator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ public void onComplete() {}
570570
System.nanoTime(), SystemInfo.currentTimeMicros());
571571

572572
// NB: No need to set the response timeout because we have session creation timeout.
573-
responseDecoder.addResponse(0, res, reqCtx, ctx.channel().eventLoop());
573+
responseDecoder.addResponse(null, 0, res, reqCtx, ctx.channel().eventLoop());
574574
ctx.fireChannelActive();
575575
}
576576

core/src/main/java/com/linecorp/armeria/client/HttpRequestSubscriber.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class HttpRequestSubscriber extends AbstractHttpRequestSubscriber {
3131
HttpRequestSubscriber(Channel ch, ClientHttpObjectEncoder encoder, HttpResponseDecoder responseDecoder,
3232
HttpRequest request, DecodedHttpResponse originalRes,
3333
ClientRequestContext ctx, long timeoutMillis) {
34-
super(ch, encoder, responseDecoder, request, originalRes, ctx, timeoutMillis, true, true);
34+
super(ch, encoder, responseDecoder, request, originalRes, ctx, timeoutMillis, true, true, false);
3535
}
3636

3737
@Override

core/src/main/java/com/linecorp/armeria/client/HttpResponseDecoder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ interface HttpResponseDecoder {
3131

3232
InboundTrafficController inboundTrafficController();
3333

34-
HttpResponseWrapper addResponse(
35-
int id, DecodedHttpResponse res, ClientRequestContext ctx, EventLoop eventLoop);
34+
HttpResponseWrapper addResponse(@Nullable AbstractHttpRequestHandler requestHandler, int id,
35+
DecodedHttpResponse res, ClientRequestContext ctx, EventLoop eventLoop);
3636

3737
@Nullable
3838
HttpResponseWrapper getResponse(int id);

0 commit comments

Comments
 (0)