Skip to content

Commit

Permalink
Merge pull request #15 from AxonFramework/graceful-shutdown
Browse files Browse the repository at this point in the history
Graceful shutdown
  • Loading branch information
m1l4n54v1c authored Mar 2, 2020
2 parents 677612c + 7bdc821 commit 5badd75
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.DirectExecutor;
import org.axonframework.common.Registration;
import org.axonframework.lifecycle.Phase;
import org.axonframework.lifecycle.ShutdownLatch;
import org.axonframework.lifecycle.StartHandler;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.serialization.Serializer;
Expand Down Expand Up @@ -73,6 +76,7 @@ public class SpringHttpCommandBusConnector implements CommandBusConnector {
private final RestOperations restOperations;
private final Serializer serializer;
private final Executor executor;
private final ShutdownLatch shutdownLatch = new ShutdownLatch();

/**
* Instantiate a {@link SpringHttpCommandBusConnector} based on the fields contained in the {@link Builder}.
Expand All @@ -91,6 +95,14 @@ protected SpringHttpCommandBusConnector(Builder builder) {
this.executor = builder.executor;
}

/**
* Start the Connector.
*/
@StartHandler(phase = Phase.EXTERNAL_CONNECTIONS)
public void start() {
shutdownLatch.initialize();
}

/**
* Instantiate a Builder to be able to create a {@link SpringHttpCommandBusConnector}.
* <p>
Expand All @@ -106,6 +118,7 @@ public static Builder builder() {

@Override
public <C> void send(Member destination, CommandMessage<? extends C> commandMessage) {
shutdownLatch.ifShuttingDown("JGroupsConnector is shutting down, no new commands will be sent.");
if (destination.local()) {
localCommandBus.dispatch(commandMessage);
} else {
Expand All @@ -117,19 +130,31 @@ public <C> void send(Member destination, CommandMessage<? extends C> commandMess
public <C, R> void send(Member destination,
CommandMessage<C> commandMessage,
CommandCallback<? super C, R> callback) {
shutdownLatch.ifShuttingDown("JGroupsConnector is shutting down, no new commands will be sent.");
ShutdownLatch.ActivityHandle activityHandle = shutdownLatch.registerActivity();
if (destination.local()) {
localCommandBus.dispatch(commandMessage, callback);
CommandCallback<C, R> wrapper = (cm, crm) -> {
callback.onResult(cm, crm);
activityHandle.end();
};
localCommandBus.dispatch(commandMessage, wrapper);
} else {
executor.execute(() -> {
SpringHttpReplyMessage<R> replyMessage =
this.<C, R>sendRemotely(destination, commandMessage, EXPECT_REPLY).getBody();
activityHandle.end();
if (replyMessage != null) {
callback.onResult(commandMessage, replyMessage.getCommandResultMessage(serializer));
}
});
}
}

@Override
public CompletableFuture<Void> initiateShutdown() {
return shutdownLatch.initiateShutdown();
}

/**
* Send the command message to a remote member
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.axonframework.commandhandling.distributed.Member;
import org.axonframework.commandhandling.distributed.SimpleMember;
import org.axonframework.common.DirectExecutor;
import org.axonframework.lifecycle.ShutdownInProgressException;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.RemoteExceptionDescription;
import org.axonframework.messaging.RemoteHandlingException;
Expand All @@ -45,8 +46,12 @@
import org.springframework.web.client.RestTemplate;

import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.util.Collections.singletonMap;
import static org.axonframework.commandhandling.GenericCommandResultMessage.asCommandResultMessage;
Expand Down Expand Up @@ -102,6 +107,7 @@ public void setUp() throws Exception {
.serializer(serializer)
.executor(executor)
.build();
testSubject.start();
}

@Test
Expand All @@ -125,6 +131,45 @@ public void testSendWithoutCallbackThrowsExceptionForMissingDestinationURI() {
verify(executor).execute(any());
}

@Test
public void testStopSendingCommands() throws InterruptedException, ExecutionException, TimeoutException {
SpringHttpReplyMessage<String> testReplyMessage =
new SpringHttpReplyMessage<>(COMMAND_MESSAGE.getIdentifier(), COMMAND_RESULT, serializer);
ResponseEntity<SpringHttpReplyMessage<String>> testResponseEntity =
new ResponseEntity<>(testReplyMessage, HttpStatus.OK);
when(restTemplate.exchange(eq(expectedUri),
eq(HttpMethod.POST),
any(),
argThat(new ParameterizedTypeReferenceMatcher<String>()))
).thenAnswer(invocationOnMock -> {
Thread.sleep(200);
return testResponseEntity;
});
testSubject.send(DESTINATION, COMMAND_MESSAGE, commandCallback);

testSubject.initiateShutdown().get(400, TimeUnit.MILLISECONDS);

try {
testSubject.send(DESTINATION, COMMAND_MESSAGE, commandCallback);
fail("Expected sending new commands to fail once connector is stopped.");
} catch (ShutdownInProgressException e) {
// expected
}

//noinspection unchecked
ArgumentCaptor<CommandMessage<? extends String>> commandMessageArgumentCaptor =
ArgumentCaptor.forClass(CommandMessage.class);
//noinspection unchecked
ArgumentCaptor<CommandResultMessage<? extends String>> commandResultMessageArgumentCaptor =
ArgumentCaptor.forClass(CommandResultMessage.class);
verify(commandCallback).onResult(commandMessageArgumentCaptor.capture(),
commandResultMessageArgumentCaptor.capture());
assertEquals(COMMAND_MESSAGE.getMetaData(), commandMessageArgumentCaptor.getValue().getMetaData());
assertEquals(COMMAND_MESSAGE.getPayload(), commandMessageArgumentCaptor.getValue().getPayload());
assertEquals(COMMAND_RESULT.getMetaData(), commandResultMessageArgumentCaptor.getValue().getMetaData());
assertEquals(COMMAND_RESULT.getPayload(), commandResultMessageArgumentCaptor.getValue().getPayload());
}

@Test
public void testSendWithCallbackSucceedsAndReturnsSucceeded() {
SpringHttpReplyMessage<String> testReplyMessage =
Expand Down

0 comments on commit 5badd75

Please sign in to comment.