diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java index 744105d46d..7d3a73d7a5 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java @@ -68,7 +68,7 @@ void simple_pipeline_with_single_record() { final int numRecords = 1; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -84,7 +84,7 @@ void simple_pipeline_with_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -99,7 +99,7 @@ void two_pipelines_with_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -114,7 +114,7 @@ void three_pipelines_with_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -129,7 +129,7 @@ void three_pipelines_with_all_unrouted_records() { final int numRecords = 2; inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { assertTrue(inMemorySourceAccessor != null); assertTrue(inMemorySourceAccessor.getAckReceived() != null); @@ -145,7 +145,7 @@ void three_pipelines_with_route_and_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -161,7 +161,7 @@ void three_pipelines_with_default_route_and_multiple_records() { inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -176,7 +176,7 @@ void two_parallel_pipelines_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -191,7 +191,7 @@ void three_pipelines_multi_sink_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -206,7 +206,7 @@ void one_pipeline_three_sinks_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -221,7 +221,7 @@ void one_pipeline_ack_expiry_multiple_records() { final int numRecords = 100; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -237,7 +237,7 @@ void one_pipeline_three_sinks_negative_ack_multiple_records() { inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); inMemorySinkAccessor.setResult(false); - await().atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty()));