From 53302c5f1f3bac112896365abcfec89aa7dc6abf Mon Sep 17 00:00:00 2001 From: Timon Borter Date: Thu, 30 Nov 2023 13:32:25 +0100 Subject: [PATCH] feat: scenario completion tracking Completion of scenario executions happens asynchronously, because scenarios with intermediate messaging must not block the main threads. Trying to achieve a sync outside the execution service is laborious at best. This implementation solves this problem. --- .../endpoint/SimulatorEndpointAdapter.java | 3 +- .../listener/SimulatorStatusListener.java | 8 +-- .../impl/ScenarioExecutionServiceImpl.java | 56 ++++++++++++++- .../ScenarioExecutionServiceImplTest.java | 72 ++++++++++++++++++- 4 files changed, 130 insertions(+), 9 deletions(-) diff --git a/simulator-spring-boot/src/main/java/org/citrusframework/simulator/endpoint/SimulatorEndpointAdapter.java b/simulator-spring-boot/src/main/java/org/citrusframework/simulator/endpoint/SimulatorEndpointAdapter.java index d1e215b2a..6e1f8b8a9 100644 --- a/simulator-spring-boot/src/main/java/org/citrusframework/simulator/endpoint/SimulatorEndpointAdapter.java +++ b/simulator-spring-boot/src/main/java/org/citrusframework/simulator/endpoint/SimulatorEndpointAdapter.java @@ -99,8 +99,7 @@ public Message dispatchMessage(Message request, String mappingName) { scenario = applicationContext.getBean(scenarioName, SimulatorScenario.class); } else { scenarioName = configuration.getDefaultScenario(); - logger.info("Unable to find scenario for mapping '{}' - " + - "using default scenario '{}'", mappingName, scenarioName); + logger.info("Unable to find scenario for mapping '{}' - using default scenario '{}'", mappingName, scenarioName); scenario = applicationContext.getBean(scenarioName, SimulatorScenario.class); } diff --git a/simulator-spring-boot/src/main/java/org/citrusframework/simulator/listener/SimulatorStatusListener.java b/simulator-spring-boot/src/main/java/org/citrusframework/simulator/listener/SimulatorStatusListener.java index 10ec6c28e..1cf663e9a 100644 --- a/simulator-spring-boot/src/main/java/org/citrusframework/simulator/listener/SimulatorStatusListener.java +++ b/simulator-spring-boot/src/main/java/org/citrusframework/simulator/listener/SimulatorStatusListener.java @@ -83,8 +83,8 @@ public void onTestFinish(TestCase test) { @Override public void onTestSuccess(TestCase test) { TestResult result; - if (test instanceof DefaultTestCase) { - result = TestResult.success(test.getName(), test.getTestClass().getSimpleName(), ((DefaultTestCase)test).getParameters()); + if (test instanceof DefaultTestCase defaultTestCase) { + result = TestResult.success(test.getName(), test.getTestClass().getSimpleName(), defaultTestCase.getParameters()); } else { result = TestResult.success(test.getName(), test.getTestClass().getSimpleName()); } @@ -98,8 +98,8 @@ public void onTestSuccess(TestCase test) { @Override public void onTestFailure(TestCase test, Throwable cause) { TestResult result; - if (test instanceof DefaultTestCase) { - result = TestResult.failed(test.getName(), test.getTestClass().getSimpleName(), cause, ((DefaultTestCase)test).getParameters()); + if (test instanceof DefaultTestCase defaultTestCase) { + result = TestResult.failed(test.getName(), test.getTestClass().getSimpleName(), cause, defaultTestCase.getParameters()); } else { result = TestResult.failed(test.getName(), test.getTestClass().getSimpleName(), cause); } diff --git a/simulator-spring-boot/src/main/java/org/citrusframework/simulator/service/impl/ScenarioExecutionServiceImpl.java b/simulator-spring-boot/src/main/java/org/citrusframework/simulator/service/impl/ScenarioExecutionServiceImpl.java index b884bb084..8f0288722 100644 --- a/simulator-spring-boot/src/main/java/org/citrusframework/simulator/service/impl/ScenarioExecutionServiceImpl.java +++ b/simulator-spring-boot/src/main/java/org/citrusframework/simulator/service/impl/ScenarioExecutionServiceImpl.java @@ -17,6 +17,9 @@ package org.citrusframework.simulator.service.impl; import jakarta.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import org.citrusframework.TestCase; import org.citrusframework.exceptions.CitrusRuntimeException; import org.citrusframework.simulator.model.ScenarioExecution; @@ -37,6 +40,8 @@ import java.util.List; import java.util.Optional; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.citrusframework.simulator.service.impl.TestCaseUtil.getScenarioExecutionId; /** @@ -50,6 +55,8 @@ public class ScenarioExecutionServiceImpl implements ScenarioExecutionService { private final TimeProvider timeProvider = new TimeProvider(); + private final Map scenarioExecutionLock = new HashMap<>(); + private final ScenarioExecutionRepository scenarioExecutionRepository; public ScenarioExecutionServiceImpl(ScenarioExecutionRepository scenarioExecutionRepository) { @@ -98,7 +105,11 @@ public ScenarioExecution createAndSaveExecutionScenario(String scenarioName, @Nu } } - return save(scenarioExecution); + scenarioExecution = save(scenarioExecution); + + scenarioExecutionLock.put(scenarioExecution.getExecutionId(), new CountDownLatch(1)); + + return scenarioExecution; } @Override @@ -126,7 +137,48 @@ private ScenarioExecution completeScenarioExecution(ScenarioExecution.Status sta return scenarioExecution; }) .map(scenarioExecutionRepository::save) - .orElseThrow(() -> new CitrusRuntimeException(String.format("Error while completing ScenarioExecution for test %s", testCase.getName()))); + .map(scenarioExecution -> { + scenarioExecutionRepository.flush(); + scenarioExecutionLock.computeIfAbsent(scenarioExecution.getExecutionId(), k -> new CountDownLatch(1)).countDown(); + return scenarioExecution; + }) + .orElseThrow(()->new CitrusRuntimeException(format("Error while completing ScenarioExecution for test %s", testCase.getName()))); + } + + /** + * Completion of {@link ScenarioExecution} entity happens asynchronously, because scenarios with + * intermediate messaging must not block the main threads. It can therefore happen, under + * some circumstances, that something tries accessing a {@link ScenarioExecution} that has been + * started, but not completed yet. Trying to achieve a sync outside this execution service is + * laborious at best. + *

+ * This method waits at most {@code timeoutMilliseconds} until the execution of the scenario + * should have been completed. It returns immediately if either: + *

+ *

+ * It is guaranteed, that the updating transaction has been flushed beforehand. + *

+ * If the lock has not been released within the given timeframe (and the corresponding + * {@link ScenarioExecution} has not been completed to this time) a + * {@link CitrusRuntimeException} will be thrown. + * + * @param executionId the scenario execution id + * @param timeoutMillis timeout waiting for lock + * @throws InterruptedException if the thread has been interrupted while waiting for the lock + */ + public void awaitScenarioCompletion(Long executionId, Long timeoutMillis) throws InterruptedException { + if (!scenarioExecutionLock.containsKey(executionId)) { + return; + } + + if (scenarioExecutionLock.get(executionId).await(timeoutMillis, MILLISECONDS)) { + scenarioExecutionLock.remove(executionId); + } else { + throw new CitrusRuntimeException(format("Failed to complete scenario %s within %s milliseconds!", executionId, timeoutMillis)); + } } private static void writeCauseToErrorMessage(Throwable cause, ScenarioExecution scenarioExecution) { diff --git a/simulator-spring-boot/src/test/java/org/citrusframework/simulator/service/impl/ScenarioExecutionServiceImplTest.java b/simulator-spring-boot/src/test/java/org/citrusframework/simulator/service/impl/ScenarioExecutionServiceImplTest.java index fd980887d..cb1fd3120 100644 --- a/simulator-spring-boot/src/test/java/org/citrusframework/simulator/service/impl/ScenarioExecutionServiceImplTest.java +++ b/simulator-spring-boot/src/test/java/org/citrusframework/simulator/service/impl/ScenarioExecutionServiceImplTest.java @@ -1,5 +1,6 @@ package org.citrusframework.simulator.service.impl; +import java.util.concurrent.CountDownLatch; import org.citrusframework.TestCase; import org.citrusframework.exceptions.CitrusRuntimeException; import org.citrusframework.simulator.model.ScenarioExecution; @@ -22,7 +23,9 @@ import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -30,6 +33,8 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; +import static org.springframework.test.util.ReflectionTestUtils.getField; +import static org.springframework.test.util.ReflectionTestUtils.setField; @ExtendWith(MockitoExtension.class) class ScenarioExecutionServiceImplTest { @@ -103,19 +108,32 @@ void testCreateAndSaveExecutionScenario() { Instant now = Instant.now(); doReturn(now).when(timeProviderMock).getTimeNow(); - doAnswer(invocationOnMock -> invocationOnMock.getArgument(0, ScenarioExecution.class)).when(scenarioExecutionRepositoryMock).save(any(ScenarioExecution.class)); + long executionId = 1234L; + doAnswer(invocationOnMock -> { + ScenarioExecution scenarioExecution = invocationOnMock.getArgument(0, ScenarioExecution.class); + setField(scenarioExecution, "executionId", executionId, Long.class); + return scenarioExecution; + }).when(scenarioExecutionRepositoryMock).save(any(ScenarioExecution.class)); ScenarioParameter scenarioParameter = new ScenarioParameter(); List scenarioParameters = List.of(scenarioParameter); ScenarioExecution result = fixture.createAndSaveExecutionScenario(scenarioName, scenarioParameters); + assertEquals(executionId, result.getExecutionId()); assertEquals(scenarioName, result.getScenarioName()); assertEquals(now, result.getStartDate()); assertEquals(ScenarioExecution.Status.RUNNING, result.getStatus()); assertThat(result.getScenarioParameters()) .hasSize(1) .containsExactly(scenarioParameter); + + Map scenarioExecutionLock = getScenarioExecutionLock(); + assertNotNull(scenarioExecutionLock); + assertTrue( + scenarioExecutionLock.containsKey(result.getExecutionId()), + "Countdown Latch should have been registered!" + ); } @Nested @@ -129,6 +147,8 @@ class CompleteScenarioExecution { @BeforeEach void beforeEachSetup() { + setField(sampleScenarioExecution, "executionId", scenarioExecutionId); + Map variableDefinitions = Map.of(ScenarioExecution.EXECUTION_ID, String.valueOf(scenarioExecutionId)); doReturn(variableDefinitions).when(testCaseMock).getVariableDefinitions(); } @@ -137,11 +157,19 @@ void beforeEachSetup() { void successful() { preparePersistenceLayerMocks(); + Map scenarioExecutionLock = getScenarioExecutionLock(); + scenarioExecutionLock.put(scenarioExecutionId, new CountDownLatch(1)); + ScenarioExecution result = fixture.completeScenarioExecutionSuccess(testCaseMock); assertEquals(ScenarioExecution.Status.SUCCESS, result.getStatus()); assertEquals(now, result.getEndDate()); assertNull(result.getErrorMessage()); + + verify(scenarioExecutionRepositoryMock).save(result); + verify(scenarioExecutionRepositoryMock).flush(); + + assertEquals(0, scenarioExecutionLock.get(scenarioExecutionId).getCount()); } @Test @@ -159,6 +187,9 @@ void successfulWithNoScenarioExecutionFound() { void failed() { preparePersistenceLayerMocks(); + Map scenarioExecutionLock = getScenarioExecutionLock(); + scenarioExecutionLock.put(scenarioExecutionId, new CountDownLatch(1)); + Throwable cause = new RuntimeException("Failure cause"); ScenarioExecution result = fixture.completeScenarioExecutionFailure(testCaseMock, cause); @@ -167,6 +198,9 @@ void failed() { assertTrue(result.getErrorMessage().startsWith("java.lang.RuntimeException: Failure cause"), "Error message must contain cause!"); verify(scenarioExecutionRepositoryMock).save(result); + verify(scenarioExecutionRepositoryMock).flush(); + + assertEquals(0, scenarioExecutionLock.get(scenarioExecutionId).getCount()); } @Test @@ -186,4 +220,40 @@ private void preparePersistenceLayerMocks() { doReturn(now).when(timeProviderMock).getTimeNow(); } } + + @Nested + class AwaitScenarioCompletion { + + @Test + void returnsResolvedLock() { + long executionId = 1234L; + + Map scenarioExecutionLock = getScenarioExecutionLock(); + scenarioExecutionLock.put(executionId, new CountDownLatch(0)); + + assertDoesNotThrow(() -> fixture.awaitScenarioCompletion(executionId, 1000L)); + } + + @Test + void returnsWithoutPresentLock() { + assertDoesNotThrow(() -> fixture.awaitScenarioCompletion(1234L, 1000L)); + } + + @Test + void timesOutOnNonResolvedLock() { + long executionId = 1234L; + + Map scenarioExecutionLock = getScenarioExecutionLock(); + scenarioExecutionLock.put(executionId, new CountDownLatch(1)); + + CitrusRuntimeException exception = assertThrows(CitrusRuntimeException.class, () -> fixture.awaitScenarioCompletion(executionId, 1000L)); + assertEquals("Failed to complete scenario 1234 within 1000 milliseconds!", exception.getMessage()); + } + } + + private Map getScenarioExecutionLock() { + Map scenarioExecutionLock = (Map) getField(fixture, "scenarioExecutionLock"); + assertNotNull(scenarioExecutionLock); + return scenarioExecutionLock; + } }