Skip to content

Commit

Permalink
Fixing a flaky test (#5365)
Browse files Browse the repository at this point in the history
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
  • Loading branch information
san81 authored Jan 28, 2025
1 parent 440fb2d commit c37d255
Showing 1 changed file with 83 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,13 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import org.mockito.Mock;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
Expand All @@ -55,9 +38,6 @@
import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType;
import org.opensearch.dataprepper.plugins.lambda.processor.exception.StrictResponseModeNotRespectedException;
import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.createLambdaConfigurationFromYaml;
import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.getSampleEventRecords;
import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.getSampleRecord;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
Expand All @@ -75,6 +55,28 @@
import java.util.function.Consumer;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.createLambdaConfigurationFromYaml;
import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.getSampleEventRecords;
import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.getSampleRecord;


@MockitoSettings(strictness = Strictness.LENIENT)
public class LambdaProcessorTest {
Expand Down Expand Up @@ -129,6 +131,55 @@ public class LambdaProcessorTest {
@Mock
private LambdaAsyncClient lambdaAsyncClient;

private static Stream<Arguments> getLambdaResponseConversionSamplesForStrictAndAggregateMode() {
return Stream.of(
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), null, RuntimeException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), "null", StrictResponseModeNotRespectedException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), "random string", JsonParseException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), StrictResponseModeNotRespectedException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), StrictResponseModeNotRespectedException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}]".getBytes()), null, 1),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), StrictResponseModeNotRespectedException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), null, 2),
//Aggregate mode
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), null, RuntimeException.class, 0),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "null", null, 0),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "random string", JsonParseException.class, 0),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), null, 0),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), null, 0),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}]".getBytes()), null, 1)

);
}

private static Stream<Arguments> getDoExecuteSamplesForStrictAndAggregateMode() {
List<Record<Event>> firstSample = getSampleEventRecords(1);

List<Record<Event>> secondSample = getSampleEventRecords(1);
List<Record<Event>> thirdSample = getSampleEventRecords(1);
List<Record<Event>> fourthSample = getSampleEventRecords(1);
List<Record<Event>> fifthSample = getSampleEventRecords(1);
String fifthSampleJsonString = fifthSample.get(0).getData().toJsonString();
fifthSampleJsonString = "[" + fifthSampleJsonString + "]";


return Stream.of(
arguments("lambda-processor-success-config.yaml", firstSample, null, firstSample, true),
arguments("lambda-processor-success-config.yaml", secondSample, "null", secondSample, true),
arguments("lambda-processor-success-config.yaml", thirdSample, "random string", thirdSample, true),
arguments("lambda-processor-success-config.yaml", fourthSample, SdkBytes.fromByteArray("[]".getBytes()), fourthSample, true),
arguments("lambda-processor-success-config.yaml", fifthSample, SdkBytes.fromByteArray(fifthSampleJsonString.getBytes()), fifthSample, false)/*,
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()),Collections.emptyList()),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), Collections.emptyList()),
//Aggregate mode
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), null, Collections.emptyList()),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "null", Collections.emptyList()),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "random string", Collections.emptyList()),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), Collections.emptyList()),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), Collections.emptyList())
*/
);
}

@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -263,19 +314,27 @@ public void testDoExecute_UnableParseResponse(String configFileName) throws Exce
int recordCount = (int) (Math.random() * 100);
List<Record<Event>> records = getSampleEventRecords(recordCount);
InvokeResponse invokeResponse = mock(InvokeResponse.class);
// Setting up an invalid json that will fail at the parsing step
when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("[{\"invalid_json:\"parsing_fails\"}]"));
when(invokeResponse.statusCode()).thenReturn(200); // Success status code
// Mock the invoke method to return a completed future
CompletableFuture<InvokeResponse> invokeFuture = CompletableFuture.completedFuture(invokeResponse);
when(lambdaAsyncClient.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture);


// Mock Buffer to return empty payload
when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("[{\"key\": \"value\"}]"));
// Processor instant creation and executing
LambdaProcessorConfig lambdaProcessorConfig = createLambdaConfigurationFromYaml(configFileName);
LambdaProcessor lambdaProcessor = new LambdaProcessor(pluginFactory, pluginSetting, lambdaProcessorConfig,
awsCredentialsSupplier, expressionEvaluator);
// Clearing up previous interactions in case if they have any
reset(numberOfRecordsSuccessCounter);
reset(numberOfRecordsFailedCounter);
populatePrivateFields(lambdaProcessor);
// Act
Collection<Record<Event>> result = lambdaProcessor.doExecute(records);

// Assert
assertEquals(recordCount, result.size(), "Result should be empty due to empty Lambda response.");
assertEquals(recordCount, result.size(),
"In the case of parsing failure, original records with failure tags should come out as response");
verify(numberOfRecordsSuccessCounter, times(0)).increment(1.0);
verify(numberOfRecordsFailedCounter, times(1)).increment(recordCount);
}
Expand Down Expand Up @@ -493,28 +552,6 @@ public void testConvertLambdaResponseToEvent_WithUnequalEventCounts_SuccessfulPr
assertEquals(3, resultRecords.size(), "ResultRecords should contain three records.");
}


private static Stream<Arguments> getLambdaResponseConversionSamplesForStrictAndAggregateMode() {
return Stream.of(
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), null, RuntimeException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), "null", StrictResponseModeNotRespectedException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), "random string", JsonParseException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), StrictResponseModeNotRespectedException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), StrictResponseModeNotRespectedException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}]".getBytes()), null, 1),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), StrictResponseModeNotRespectedException.class, 0),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), null, 2),
//Aggregate mode
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), null, RuntimeException.class, 0),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "null", null, 0),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "random string", JsonParseException.class, 0),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), null, 0),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), null, 0),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}]".getBytes()), null, 1)

);
}

@ParameterizedTest
@MethodSource("getLambdaResponseConversionSamplesForStrictAndAggregateMode")
public void testConvertLambdaResponseToEvent_for_strict_and_aggregate_mode(String configFile,
Expand Down Expand Up @@ -545,35 +582,6 @@ public void testConvertLambdaResponseToEvent_for_strict_and_aggregate_mode(Strin
}
}

private static Stream<Arguments> getDoExecuteSamplesForStrictAndAggregateMode() {
List<Record<Event>> firstSample = getSampleEventRecords(1);

List<Record<Event>> secondSample = getSampleEventRecords(1);
List<Record<Event>> thirdSample = getSampleEventRecords(1);
List<Record<Event>> fourthSample = getSampleEventRecords(1);
List<Record<Event>> fifthSample = getSampleEventRecords(1);
String fifthSampleJsonString = fifthSample.get(0).getData().toJsonString();
fifthSampleJsonString = "[" + fifthSampleJsonString + "]";


return Stream.of(
arguments("lambda-processor-success-config.yaml", firstSample, null, firstSample, true),
arguments("lambda-processor-success-config.yaml", secondSample, "null", secondSample, true),
arguments("lambda-processor-success-config.yaml", thirdSample, "random string",thirdSample, true),
arguments("lambda-processor-success-config.yaml", fourthSample, SdkBytes.fromByteArray("[]".getBytes()), fourthSample, true),
arguments("lambda-processor-success-config.yaml", fifthSample, SdkBytes.fromByteArray(fifthSampleJsonString.getBytes()), fifthSample, false)/*,
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()),Collections.emptyList()),
arguments("lambda-processor-success-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), Collections.emptyList()),
//Aggregate mode
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), null, Collections.emptyList()),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "null", Collections.emptyList()),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "random string", Collections.emptyList()),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), Collections.emptyList()),
arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), Collections.emptyList())
*/
);
}

@ParameterizedTest
@MethodSource("getDoExecuteSamplesForStrictAndAggregateMode")
public void testDoExecute_for_strict_and_aggregate_mode(String configFile,
Expand Down

0 comments on commit c37d255

Please sign in to comment.