Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Added changes to make ServerImpl.internalClose() thread-safe #11924

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractServerStream.java
Original file line number Diff line number Diff line change
@@ -74,6 +74,7 @@ protected interface Sink {
private final StatsTraceContext statsTraceCtx;
private boolean outboundClosed;
private boolean headersSent;
private boolean closeCalled;

protected AbstractServerStream(
WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) {
@@ -120,6 +121,7 @@ public final void deliverFrame(

@Override
public final void close(Status status, Metadata trailers) {
Preconditions.checkState(!closeCalled, "call already closed");
Preconditions.checkNotNull(status, "status");
Preconditions.checkNotNull(trailers, "trailers");
if (!outboundClosed) {
@@ -130,6 +132,7 @@ public final void close(Status status, Metadata trailers) {
// closedStatus is only set from here, and is read from a place that has happen-after
// guarantees with respect to here.
transportState().setClosedStatus(status);
closeCalled = true;
abstractServerStreamSink().writeTrailers(trailers, headersSent, status);
}
}
13 changes: 8 additions & 5 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
@@ -779,8 +779,8 @@ static final class JumpToApplicationThreadServerStreamListener implements Server
// Only accessed from callExecutor.
private ServerStreamListener listener;

public JumpToApplicationThreadServerStreamListener(Executor executor,
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
public JumpToApplicationThreadServerStreamListener(Executor executor, Executor cancelExecutor,
ServerStream stream, Context.CancellableContext context, Tag tag) {
this.callExecutor = executor;
this.cancelExecutor = cancelExecutor;
this.stream = stream;
@@ -808,10 +808,13 @@ void setListener(ServerStreamListener listener) {
/**
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
*/
private void internalClose(Throwable t) {
// TODO(ejona86): this is not thread-safe :)
void internalClose(Throwable t) {
String description = "Application error processing RPC";
stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata());
Metadata metadata = Status.trailersFromThrowable(t);
if (metadata == null) {
metadata = new Metadata();
}
stream.close(Status.UNKNOWN.withDescription(description).withCause(t), metadata);
}

@Override
20 changes: 19 additions & 1 deletion core/src/test/java/io/grpc/internal/ServerImplTest.java
Original file line number Diff line number Diff line change
@@ -78,7 +78,6 @@
import io.grpc.StringMarshaller;
import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener;
import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder;
import io.grpc.internal.SingleMessageProducer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.util.MutableHandlerRegistry;
import io.perfmark.PerfMark;
@@ -1535,6 +1534,25 @@ public void channelz_transport_membershp() throws Exception {
assertTrue(after.end);
}

@Test
public void testInternalClose_withNullMetadata() {
JumpToApplicationThreadServerStreamListener listener
= new JumpToApplicationThreadServerStreamListener(
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation(),
PerfMark.createTag());
Throwable throwableMock = mock(Throwable.class);
// Stub Status.trailersFromThrowable to return null, simulating the case where metadata is null
when(Status.trailersFromThrowable(throwableMock)).thenReturn(null);
listener.internalClose(throwableMock);
// Capture the arguments passed to stream.close()
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(stream).close(statusCaptor.capture(), metadataCaptor.capture());
}

private void createAndStartServer() throws IOException {
createServer();
server.start();
Original file line number Diff line number Diff line change
@@ -1573,6 +1573,7 @@ public void messageProducerOnlyProducesRequestedMessages() throws Exception {
verifyMessageCountAndClose(serverStreamCreation.listener.messageQueue, 1);
}

@SuppressWarnings("MissingFail")
@Test
public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
server.start(serverListener);
@@ -1597,10 +1598,14 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));

// Ensure that for a closed ServerStream, interactions are noops
server.stream.writeHeaders(new Metadata(), true);
server.stream.writeMessage(methodDescriptor.streamResponse("response"));
server.stream.close(Status.INTERNAL, new Metadata());
try {
// Ensure that for a closed ServerStream, interactions are noops
server.stream.writeHeaders(new Metadata(), true);
server.stream.writeMessage(methodDescriptor.streamResponse("response"));
server.stream.close(Status.INTERNAL, new Metadata());
} catch (Exception exception) {
assertTrue(exception.getMessage().contains("call already closed"));
}

// Make sure new streams still work properly
doPingPong(serverListener);