Skip to content

Commit

Permalink
feat: scenario completion tracking
Browse files Browse the repository at this point in the history
Completion of scenario executions happens asynchronously, because
scenarios with intermediate messaging must not block the main threads.
Trying to achieve a sync outside the execution service is laborious at
best.
This implementation solves this problem.
  • Loading branch information
bbortt committed Nov 30, 2023
1 parent 6f5e857 commit 53302c5
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ public Message dispatchMessage(Message request, String mappingName) {
scenario = applicationContext.getBean(scenarioName, SimulatorScenario.class);
} else {
scenarioName = configuration.getDefaultScenario();
logger.info("Unable to find scenario for mapping '{}' - " +
"using default scenario '{}'", mappingName, scenarioName);
logger.info("Unable to find scenario for mapping '{}' - using default scenario '{}'", mappingName, scenarioName);
scenario = applicationContext.getBean(scenarioName, SimulatorScenario.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public void onTestFinish(TestCase test) {
@Override
public void onTestSuccess(TestCase test) {
TestResult result;
if (test instanceof DefaultTestCase) {
result = TestResult.success(test.getName(), test.getTestClass().getSimpleName(), ((DefaultTestCase)test).getParameters());
if (test instanceof DefaultTestCase defaultTestCase) {
result = TestResult.success(test.getName(), test.getTestClass().getSimpleName(), defaultTestCase.getParameters());
} else {
result = TestResult.success(test.getName(), test.getTestClass().getSimpleName());
}
Expand All @@ -98,8 +98,8 @@ public void onTestSuccess(TestCase test) {
@Override
public void onTestFailure(TestCase test, Throwable cause) {
TestResult result;
if (test instanceof DefaultTestCase) {
result = TestResult.failed(test.getName(), test.getTestClass().getSimpleName(), cause, ((DefaultTestCase)test).getParameters());
if (test instanceof DefaultTestCase defaultTestCase) {
result = TestResult.failed(test.getName(), test.getTestClass().getSimpleName(), cause, defaultTestCase.getParameters());
} else {
result = TestResult.failed(test.getName(), test.getTestClass().getSimpleName(), cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.citrusframework.simulator.service.impl;

import jakarta.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.citrusframework.TestCase;
import org.citrusframework.exceptions.CitrusRuntimeException;
import org.citrusframework.simulator.model.ScenarioExecution;
Expand All @@ -37,6 +40,8 @@
import java.util.List;
import java.util.Optional;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.citrusframework.simulator.service.impl.TestCaseUtil.getScenarioExecutionId;

/**
Expand All @@ -50,6 +55,8 @@ public class ScenarioExecutionServiceImpl implements ScenarioExecutionService {

private final TimeProvider timeProvider = new TimeProvider();

private final Map<Long, CountDownLatch> scenarioExecutionLock = new HashMap<>();

private final ScenarioExecutionRepository scenarioExecutionRepository;

public ScenarioExecutionServiceImpl(ScenarioExecutionRepository scenarioExecutionRepository) {
Expand Down Expand Up @@ -98,7 +105,11 @@ public ScenarioExecution createAndSaveExecutionScenario(String scenarioName, @Nu
}
}

return save(scenarioExecution);
scenarioExecution = save(scenarioExecution);

scenarioExecutionLock.put(scenarioExecution.getExecutionId(), new CountDownLatch(1));

return scenarioExecution;
}

@Override
Expand Down Expand Up @@ -126,7 +137,48 @@ private ScenarioExecution completeScenarioExecution(ScenarioExecution.Status sta
return scenarioExecution;
})
.map(scenarioExecutionRepository::save)
.orElseThrow(() -> new CitrusRuntimeException(String.format("Error while completing ScenarioExecution for test %s", testCase.getName())));
.map(scenarioExecution -> {
scenarioExecutionRepository.flush();
scenarioExecutionLock.computeIfAbsent(scenarioExecution.getExecutionId(), k -> new CountDownLatch(1)).countDown();
return scenarioExecution;
})
.orElseThrow(()->new CitrusRuntimeException(format("Error while completing ScenarioExecution for test %s", testCase.getName())));
}

/**
* Completion of {@link ScenarioExecution} entity happens asynchronously, because scenarios with
* intermediate messaging <b>must</b> not block the main threads. It can therefore happen, under
* some circumstances, that something tries accessing a {@link ScenarioExecution} that has been
* started, but not completed yet. Trying to achieve a sync outside this execution service is
* laborious at best.
* <p>
* This method waits at most {@code timeoutMilliseconds} until the execution of the scenario
* should have been completed. It returns immediately if either:
* <ul>
* <li>no lock exists for this execution id, or</li>
* <li>the lock is no longer held by the simulator.</li>
* </ul>
* <p>
* It is guaranteed, that the updating transaction has been flushed beforehand.
* <p>
* If the lock has not been released within the given timeframe (and the corresponding
* {@link ScenarioExecution} has not been completed to this time) a
* {@link CitrusRuntimeException} will be thrown.
*
* @param executionId the scenario execution id
* @param timeoutMillis timeout waiting for lock
* @throws InterruptedException if the thread has been interrupted while waiting for the lock
*/
public void awaitScenarioCompletion(Long executionId, Long timeoutMillis) throws InterruptedException {
if (!scenarioExecutionLock.containsKey(executionId)) {
return;
}

if (scenarioExecutionLock.get(executionId).await(timeoutMillis, MILLISECONDS)) {
scenarioExecutionLock.remove(executionId);
} else {
throw new CitrusRuntimeException(format("Failed to complete scenario %s within %s milliseconds!", executionId, timeoutMillis));
}
}

private static void writeCauseToErrorMessage(Throwable cause, ScenarioExecution scenarioExecution) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.citrusframework.simulator.service.impl;

import java.util.concurrent.CountDownLatch;
import org.citrusframework.TestCase;
import org.citrusframework.exceptions.CitrusRuntimeException;
import org.citrusframework.simulator.model.ScenarioExecution;
Expand All @@ -22,14 +23,18 @@
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.springframework.test.util.ReflectionTestUtils.getField;
import static org.springframework.test.util.ReflectionTestUtils.setField;

@ExtendWith(MockitoExtension.class)
class ScenarioExecutionServiceImplTest {
Expand Down Expand Up @@ -103,19 +108,32 @@ void testCreateAndSaveExecutionScenario() {
Instant now = Instant.now();
doReturn(now).when(timeProviderMock).getTimeNow();

doAnswer(invocationOnMock -> invocationOnMock.getArgument(0, ScenarioExecution.class)).when(scenarioExecutionRepositoryMock).save(any(ScenarioExecution.class));
long executionId = 1234L;
doAnswer(invocationOnMock -> {
ScenarioExecution scenarioExecution = invocationOnMock.getArgument(0, ScenarioExecution.class);
setField(scenarioExecution, "executionId", executionId, Long.class);
return scenarioExecution;
}).when(scenarioExecutionRepositoryMock).save(any(ScenarioExecution.class));

ScenarioParameter scenarioParameter = new ScenarioParameter();
List<ScenarioParameter> scenarioParameters = List.of(scenarioParameter);

ScenarioExecution result = fixture.createAndSaveExecutionScenario(scenarioName, scenarioParameters);

assertEquals(executionId, result.getExecutionId());
assertEquals(scenarioName, result.getScenarioName());
assertEquals(now, result.getStartDate());
assertEquals(ScenarioExecution.Status.RUNNING, result.getStatus());
assertThat(result.getScenarioParameters())
.hasSize(1)
.containsExactly(scenarioParameter);

Map<Long, CountDownLatch> scenarioExecutionLock = getScenarioExecutionLock();
assertNotNull(scenarioExecutionLock);
assertTrue(
scenarioExecutionLock.containsKey(result.getExecutionId()),
"Countdown Latch should have been registered!"
);
}

@Nested
Expand All @@ -129,6 +147,8 @@ class CompleteScenarioExecution {

@BeforeEach
void beforeEachSetup() {
setField(sampleScenarioExecution, "executionId", scenarioExecutionId);

Map<String, String> variableDefinitions = Map.of(ScenarioExecution.EXECUTION_ID, String.valueOf(scenarioExecutionId));
doReturn(variableDefinitions).when(testCaseMock).getVariableDefinitions();
}
Expand All @@ -137,11 +157,19 @@ void beforeEachSetup() {
void successful() {
preparePersistenceLayerMocks();

Map<Long, CountDownLatch> scenarioExecutionLock = getScenarioExecutionLock();
scenarioExecutionLock.put(scenarioExecutionId, new CountDownLatch(1));

ScenarioExecution result = fixture.completeScenarioExecutionSuccess(testCaseMock);

assertEquals(ScenarioExecution.Status.SUCCESS, result.getStatus());
assertEquals(now, result.getEndDate());
assertNull(result.getErrorMessage());

verify(scenarioExecutionRepositoryMock).save(result);
verify(scenarioExecutionRepositoryMock).flush();

assertEquals(0, scenarioExecutionLock.get(scenarioExecutionId).getCount());
}

@Test
Expand All @@ -159,6 +187,9 @@ void successfulWithNoScenarioExecutionFound() {
void failed() {
preparePersistenceLayerMocks();

Map<Long, CountDownLatch> scenarioExecutionLock = getScenarioExecutionLock();
scenarioExecutionLock.put(scenarioExecutionId, new CountDownLatch(1));

Throwable cause = new RuntimeException("Failure cause");
ScenarioExecution result = fixture.completeScenarioExecutionFailure(testCaseMock, cause);

Expand All @@ -167,6 +198,9 @@ void failed() {
assertTrue(result.getErrorMessage().startsWith("java.lang.RuntimeException: Failure cause"), "Error message must contain cause!");

verify(scenarioExecutionRepositoryMock).save(result);
verify(scenarioExecutionRepositoryMock).flush();

assertEquals(0, scenarioExecutionLock.get(scenarioExecutionId).getCount());
}

@Test
Expand All @@ -186,4 +220,40 @@ private void preparePersistenceLayerMocks() {
doReturn(now).when(timeProviderMock).getTimeNow();
}
}

@Nested
class AwaitScenarioCompletion {

@Test
void returnsResolvedLock() {
long executionId = 1234L;

Map<Long, CountDownLatch> scenarioExecutionLock = getScenarioExecutionLock();
scenarioExecutionLock.put(executionId, new CountDownLatch(0));

assertDoesNotThrow(() -> fixture.awaitScenarioCompletion(executionId, 1000L));
}

@Test
void returnsWithoutPresentLock() {
assertDoesNotThrow(() -> fixture.awaitScenarioCompletion(1234L, 1000L));
}

@Test
void timesOutOnNonResolvedLock() {
long executionId = 1234L;

Map<Long, CountDownLatch> scenarioExecutionLock = getScenarioExecutionLock();
scenarioExecutionLock.put(executionId, new CountDownLatch(1));

CitrusRuntimeException exception = assertThrows(CitrusRuntimeException.class, () -> fixture.awaitScenarioCompletion(executionId, 1000L));
assertEquals("Failed to complete scenario 1234 within 1000 milliseconds!", exception.getMessage());
}
}

private Map<Long, CountDownLatch> getScenarioExecutionLock() {
Map<Long, CountDownLatch> scenarioExecutionLock = (Map<Long, CountDownLatch>) getField(fixture, "scenarioExecutionLock");
assertNotNull(scenarioExecutionLock);
return scenarioExecutionLock;
}
}

0 comments on commit 53302c5

Please sign in to comment.