From 1309cda83d6a859c15020c7ce24137b57b745081 Mon Sep 17 00:00:00 2001 From: Maxwell Brown <55033421+Galactus22625@users.noreply.github.com> Date: Thu, 13 Feb 2025 10:55:31 -0800 Subject: [PATCH] migrate Blocking Buffer and StdInSource configurations off of plugin setting (#5317) migrate Blocking Buffer configuration off of plugin setting Migrate StdInSource configuration off of pluginSetting Signed-off-by: Maxwell Brown --- .../blocking-buffer/build.gradle | 2 + .../buffer/blockingbuffer/BlockingBuffer.java | 28 +++---- .../blockingbuffer/BlockingBufferConfig.java | 28 +++++++ .../blockingbuffer/BlockingBufferTests.java | 74 +++++++++++++------ .../plugins/source/StdInSource.java | 21 +++--- .../plugins/source/StdInSourceConfig.java | 19 +++++ .../plugins/source/StdInSourceTests.java | 25 +++++-- .../plugins/source/file/FileSourceTests.java | 17 +++-- .../source/loghttp/HTTPSourceTest.java | 26 ++++--- .../consumer/KafkaCustomConsumerTest.java | 24 ++++-- .../OpenSearchAPISourceTest.java | 15 ++-- 11 files changed, 200 insertions(+), 79 deletions(-) create mode 100644 data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferConfig.java create mode 100644 data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSourceConfig.java diff --git a/data-prepper-plugins/blocking-buffer/build.gradle b/data-prepper-plugins/blocking-buffer/build.gradle index dd4f1ed39a..70e63a8352 100644 --- a/data-prepper-plugins/blocking-buffer/build.gradle +++ b/data-prepper-plugins/blocking-buffer/build.gradle @@ -9,6 +9,8 @@ plugins { dependencies { implementation project(':data-prepper-api') testImplementation 'io.micrometer:micrometer-core' + implementation 'com.fasterxml.jackson.core:jackson-annotations' + implementation 'com.fasterxml.jackson.core:jackson-databind' } jacocoTestCoverageVerification { diff --git 
a/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java b/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java index aac67e6775..a208e59e31 100644 --- a/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java +++ b/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java @@ -9,9 +9,11 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.AbstractBuffer; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.buffer.SizeOverflowException; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; import org.slf4j.Logger; @@ -31,20 +33,20 @@ import static com.google.common.base.Preconditions.checkNotNull; import static java.lang.String.format; +import static org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBufferConfig.DEFAULT_BATCH_SIZE; +import static org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBufferConfig.DEFAULT_BUFFER_CAPACITY; /** * A bounded BlockingBuffer is an implementation of {@link Buffer} using {@link LinkedBlockingQueue}, it is bounded - * to the provided capacity {@link #ATTRIBUTE_BUFFER_CAPACITY} or {@link #DEFAULT_BUFFER_CAPACITY} (if attribute is + * to the provided capacity {@link #ATTRIBUTE_BUFFER_CAPACITY} or {@link BlockingBufferConfig#DEFAULT_BUFFER_CAPACITY} (if attribute is + * not provided); {@link
#write(Record, int)} inserts specified non-null record into this buffer, waiting up to the * specified timeout in milliseconds if necessary for space to become available; and throws an exception if the * record is null. {@link #read(int)} retrieves and removes the batch of records from the head of the queue. The * batch size is defined/determined by the configuration attribute {@link #ATTRIBUTE_BATCH_SIZE} or the timeout parameter */ -@DataPrepperPlugin(name = "bounded_blocking", pluginType = Buffer.class) +@DataPrepperPlugin(name = "bounded_blocking", pluginType = Buffer.class, pluginConfigurationType = BlockingBufferConfig.class) public class BlockingBuffer> extends AbstractBuffer { private static final Logger LOG = LoggerFactory.getLogger(BlockingBuffer.class); - private static final int DEFAULT_BUFFER_CAPACITY = 12_800; - private static final int DEFAULT_BATCH_SIZE = 200; private static final String PLUGIN_NAME = "bounded_blocking"; private static final String ATTRIBUTE_BUFFER_CAPACITY = "buffer_size"; private static final String ATTRIBUTE_BATCH_SIZE = "batch_size"; @@ -81,18 +83,18 @@ public BlockingBuffer(final int bufferCapacity, final int batchSize, final Strin /** * Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper runtime engine to construct an - * instance of {@link BlockingBuffer} using an instance of {@link PluginSetting} which has access to - * pluginSetting metadata from pipeline pluginSetting file. Buffer settings like `buffer-size`, `batch-size`, - * `batch-timeout` are optional and can be passed via {@link PluginSetting}, if not present default values will + * instance of {@link BlockingBuffer} using an instance of {@link BlockingBufferConfig}. Buffer settings like `buffer-size`, `batch-size`, + * `batch-timeout` are optional and can be passed via {@link BlockingBufferConfig}, if not present default values will * be used to create the buffer. 
 * - * @param pluginSetting instance with metadata information from pipeline pluginSetting file. + * @param blockingBufferConfig configuration instance with values from the pipeline YAML + * @param pipelineDescription instance with metadata information about the pipeline */ - public BlockingBuffer(final PluginSetting pluginSetting) { - this(checkNotNull(pluginSetting, "PluginSetting cannot be null") - .getIntegerOrDefault(ATTRIBUTE_BUFFER_CAPACITY, DEFAULT_BUFFER_CAPACITY), - pluginSetting.getIntegerOrDefault(ATTRIBUTE_BATCH_SIZE, DEFAULT_BATCH_SIZE), - pluginSetting.getPipelineName()); + @DataPrepperPluginConstructor + public BlockingBuffer(final BlockingBufferConfig blockingBufferConfig, final PipelineDescription pipelineDescription) { + this(checkNotNull(blockingBufferConfig, "BlockingBufferConfig cannot be null").getBufferSize(), + blockingBufferConfig.getBatchSize(), + pipelineDescription.getPipelineName()); } public BlockingBuffer(final String pipelineName) { diff --git a/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferConfig.java b/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferConfig.java new file mode 100644 index 0000000000..0700b0c68d --- /dev/null +++ b/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferConfig.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.buffer.blockingbuffer; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class BlockingBufferConfig { + public static final int DEFAULT_BUFFER_CAPACITY = 12_800; + public static final int DEFAULT_BATCH_SIZE = 200; + + + @JsonProperty("buffer_size") + private int bufferSize = DEFAULT_BUFFER_CAPACITY; + + public int getBufferSize() { + return bufferSize; + } + + @JsonProperty("batch_size") + 
private int batchSize = DEFAULT_BATCH_SIZE; + + public int getBatchSize() { + return batchSize; + } +} diff --git a/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java b/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java index f3f28db174..a296a4ffef 100644 --- a/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java +++ b/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java @@ -19,8 +19,11 @@ import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.buffer.SizeOverflowException; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.ArrayList; import java.util.Collection; @@ -43,6 +46,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class BlockingBufferTests { private static final String ATTRIBUTE_BATCH_SIZE = "batch_size"; @@ -63,25 +68,32 @@ public void setup() { } @Test - public void testCreationUsingPluginSetting() { - final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer(); - final BlockingBuffer> blockingBuffer = new BlockingBuffer<>(completePluginSetting); + public void testCreationUsingBlockingBufferConfig() throws 
JsonProcessingException { + final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig(); + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + final BlockingBuffer> blockingBuffer = new BlockingBuffer<>(blockingBufferConfig, pipelineDescription); assertThat(blockingBuffer, notNullValue()); } @Test - public void testCreationUsingNullPluginSetting() { + public void testCreationUsingNullBlockingBufferConfig() { + PipelineDescription pipelineDescription = mock(PipelineDescription.class); try { - new BlockingBuffer>((PluginSetting) null); + new BlockingBuffer>(null, pipelineDescription); } catch (NullPointerException ex) { - assertThat(ex.getMessage(), is(equalTo("PluginSetting cannot be null"))); + assertThat(ex.getMessage(), is(equalTo("BlockingBufferConfig cannot be null"))); } } @Test - public void testCreationUsingDefaultPluginSettings() { - final BlockingBuffer> blockingBuffer = new BlockingBuffer<>( - BlockingBuffer.getDefaultPluginSettings()); + public void testCreationUsingDefaultBlockingBufferConfig() throws JsonProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + String json = "{}"; + BlockingBufferConfig config = objectMapper.readValue(json, BlockingBufferConfig.class); + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + final BlockingBuffer> blockingBuffer = new BlockingBuffer<>(config, pipelineDescription); assertThat(blockingBuffer, notNullValue()); } @@ -197,8 +209,10 @@ public void testReadEmptyBuffer() { @ParameterizedTest @ValueSource(ints = {0, TEST_BATCH_READ_TIMEOUT}) public void testBatchRead(final int readTimeout) throws Exception { - final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer(); - final BlockingBuffer> blockingBuffer = new 
BlockingBuffer<>(completePluginSetting); + final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig(); + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + final BlockingBuffer> blockingBuffer = new BlockingBuffer<>(blockingBufferConfig, pipelineDescription); assertThat(blockingBuffer, notNullValue()); final int testSize = 5; for (int i = 0; i < testSize; i++) { @@ -209,7 +223,7 @@ public void testBatchRead(final int readTimeout) throws Exception { final Map.Entry>, CheckpointState> partialReadResult = blockingBuffer.read(readTimeout); final Collection> partialRecords = partialReadResult.getKey(); final CheckpointState partialCheckpointState = partialReadResult.getValue(); - final int expectedBatchSize = (Integer) completePluginSetting.getAttributeFromSettings(ATTRIBUTE_BATCH_SIZE); + final int expectedBatchSize = blockingBufferConfig.getBatchSize(); assertThat(partialRecords.size(), is(expectedBatchSize)); assertEquals(expectedBatchSize, partialCheckpointState.getNumRecordsToBeChecked()); int i = 0; @@ -235,9 +249,11 @@ public void testBatchRead(final int readTimeout) throws Exception { } @Test - public void testBufferIsEmpty() { - final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer(); - final BlockingBuffer> blockingBuffer = new BlockingBuffer<>(completePluginSetting); + public void testBufferIsEmpty() throws JsonProcessingException { + final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig(); + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + final BlockingBuffer> blockingBuffer = new BlockingBuffer<>(blockingBufferConfig, pipelineDescription); assertTrue(blockingBuffer.isEmpty()); verifyBufferUsageMetric(0); @@ -245,8 +261,10 @@ public void testBufferIsEmpty() { @Test public void 
testBufferIsNotEmpty() throws Exception { - final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer(); - final BlockingBuffer> blockingBuffer = new BlockingBuffer<>(completePluginSetting); + final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig(); + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + final BlockingBuffer> blockingBuffer = new BlockingBuffer<>(blockingBufferConfig, pipelineDescription); Record record = new Record<>("TEST"); blockingBuffer.write(record, TEST_WRITE_TIMEOUT); @@ -257,8 +275,10 @@ public void testBufferIsNotEmpty() throws Exception { @Test void testNonZeroBatchDelayReturnsAllRecords() throws Exception { - final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer(); - final BlockingBuffer> buffer = new BlockingBuffer<>(completePluginSetting); + final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig(); + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + final BlockingBuffer> buffer = new BlockingBuffer<>(blockingBufferConfig, pipelineDescription); assertThat(buffer, notNullValue()); final Collection> testRecords = generateBatchRecords(1); @@ -283,8 +303,10 @@ void testNonZeroBatchDelayReturnsAllRecords() throws Exception { @Test void testZeroBatchDelayReturnsAvailableRecords() throws Exception { - final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer(); - final BlockingBuffer> buffer = new BlockingBuffer<>(completePluginSetting); + final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig(); + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + final BlockingBuffer> buffer = new 
BlockingBuffer<>(blockingBufferConfig, pipelineDescription); assertThat(buffer, notNullValue()); final Collection> testRecords = generateBatchRecords(1); @@ -346,6 +368,16 @@ private PluginSetting completePluginSettingForBlockingBuffer() { return testSettings; } + private BlockingBufferConfig completeBlockingBufferConfig() throws JsonProcessingException { + final Map settings = new HashMap<>(); + settings.put(ATTRIBUTE_BUFFER_SIZE, TEST_BUFFER_SIZE); + settings.put(ATTRIBUTE_BATCH_SIZE, TEST_BATCH_SIZE); + ObjectMapper objectMapper = new ObjectMapper(); + String json = objectMapper.writeValueAsString(settings); + BlockingBufferConfig blockingBufferConfig = objectMapper.readValue(json, BlockingBufferConfig.class); + return blockingBufferConfig; + } + private Collection> generateBatchRecords(final int numRecords) { final Collection> results = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSource.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSource.java index bac92a988a..6ec3cfd3ca 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSource.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSource.java @@ -6,8 +6,9 @@ package org.opensearch.dataprepper.plugins.source; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; @@ -25,7 +26,7 @@ * A simple 
source which reads data from console each line at a time. It exits when it reads case insensitive "exit" * from console or if Pipeline notifies to stop. */ -@DataPrepperPlugin(name = "stdin", pluginType = Source.class) +@DataPrepperPlugin(name = "stdin", pluginType = Source.class, pluginConfigurationType = StdInSourceConfig.class) public class StdInSource implements Source> { private static final Logger LOG = LoggerFactory.getLogger(StdInSource.class); private static final String ATTRIBUTE_TIMEOUT = "write_timeout"; @@ -37,16 +38,16 @@ public class StdInSource implements Source> { /** * Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper - * runtime engine to construct an instance of {@link StdInSource} using an instance of {@link PluginSetting} which - * has access to pluginSetting metadata from pipeline - * pluginSetting file. + * runtime engine to construct an instance of {@link StdInSource} using an instance of {@link StdInSourceConfig} * - * @param pluginSetting instance with metadata information from pipeline pluginSetting file. 
+ * @param stdInSourceConfig The configuration instance for {@link StdInSource} + * @param pipelineDescription The pipeline description which has access to pipeline Name */ - public StdInSource(final PluginSetting pluginSetting) { - this(checkNotNull(pluginSetting, "PluginSetting cannot be null") - .getIntegerOrDefault(ATTRIBUTE_TIMEOUT, WRITE_TIMEOUT), - pluginSetting.getPipelineName()); + @DataPrepperPluginConstructor + public StdInSource(final StdInSourceConfig stdInSourceConfig, final PipelineDescription pipelineDescription) { + this(checkNotNull(stdInSourceConfig, "StdInSourceConfig cannot be null") + .getWriteTimeout(), + pipelineDescription.getPipelineName()); } public StdInSource(final int writeTimeout, final String pipelineName) { diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSourceConfig.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSourceConfig.java new file mode 100644 index 0000000000..66ce3a17c0 --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/StdInSourceConfig.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class StdInSourceConfig { + private static final int WRITE_TIMEOUT = 5_000; + + @JsonProperty("write_timeout") + private int writeTimeout = WRITE_TIMEOUT; + + public int getWriteTimeout() { + return writeTimeout; + } +} diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/StdInSourceTests.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/StdInSourceTests.java index d4a42751ad..d442061d8f 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/StdInSourceTests.java +++ 
b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/StdInSourceTests.java @@ -5,11 +5,13 @@ package org.opensearch.dataprepper.plugins.source; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.CheckpointState; -import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.buffer.TestBuffer; @@ -18,6 +20,7 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Queue; @@ -27,6 +30,8 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class StdInSourceTests { private static final String SOURCE_CONTENT = "THIS IS A TEST\nexit"; @@ -56,10 +61,15 @@ void testStdInSourceCreationUsingParameters() { } @Test - void testStdInSourceCreationUsingPluginSetting() { - final PluginSetting pluginSetting = new PluginSetting("stdin", null); - pluginSetting.setPipelineName(TEST_PIPELINE_NAME); - final StdInSource stdInSource = new StdInSource(pluginSetting); + void testStdInSourceCreationUsingStdInSourceConfig() throws JsonProcessingException { + final HashMap configMap = new HashMap<>(); + configMap.put("write_timeout", 5_000); + ObjectMapper objectMapper = new ObjectMapper(); + String json = objectMapper.writeValueAsString(configMap); + StdInSourceConfig stdInSourceConfig = 
objectMapper.readValue(json, StdInSourceConfig.class); + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + final StdInSource stdInSource = new StdInSource(stdInSourceConfig, pipelineDescription); assertThat(stdInSource, notNullValue()); } @@ -74,10 +84,11 @@ void testStdInSourceCreationWithNullPipelineName() { @Test void testStdInSourceCreationWithNullPluginSetting() { + PipelineDescription pipelineDescription = mock(PipelineDescription.class); try { - new StdInSource(null); + new StdInSource(null, pipelineDescription); } catch (NullPointerException ex) { - assertThat(ex.getMessage(), is(equalTo("PluginSetting cannot be null"))); + assertThat(ex.getMessage(), is(equalTo("StdInSourceConfig cannot be null"))); } } diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java index aedacdcbb2..346548e4c8 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java @@ -19,12 +19,14 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import 
org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBufferConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +74,9 @@ public class FileSourceTests { @Mock private PluginFactory pluginFactory; + @Mock + private PipelineDescription pipelineDescription; + @BeforeEach void setUp() { pluginSettings = new HashMap<>(); @@ -102,7 +107,7 @@ class WithRecord { @BeforeEach - public void setup() { + public void setup() throws JsonProcessingException { expectedEventsPlain = new ArrayList<>(); expectedEventsJson = new ArrayList<>(); expectedEventsInvalidJson = new ArrayList<>(); @@ -139,13 +144,15 @@ public void setup() { buffer = getBuffer(); } - private BlockingBuffer> getBuffer() { + private BlockingBuffer> getBuffer() throws JsonProcessingException { final HashMap integerHashMap = new HashMap<>(); integerHashMap.put("buffer_size", 2); integerHashMap.put("batch_size", 2); - final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap); - pluginSetting.setPipelineName(TEST_PIPELINE_NAME); - return new BlockingBuffer<>(pluginSetting); + ObjectMapper objectMapper = new ObjectMapper(); + String json = objectMapper.writeValueAsString(integerHashMap); + BlockingBufferConfig blockingBufferConfig = objectMapper.readValue(json, BlockingBufferConfig.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + return new BlockingBuffer<>(blockingBufferConfig, pipelineDescription); } @Test diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java index f51666db59..ef0c22c771 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java +++ 
b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.source.loghttp; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.linecorp.armeria.client.ClientFactory; import com.linecorp.armeria.client.ResponseTimeoutException; import com.linecorp.armeria.client.WebClient; @@ -50,6 +52,7 @@ import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.HttpBasicArmeriaHttpAuthenticationProvider; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBufferConfig; import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.codec.CompressionOption; @@ -131,6 +134,9 @@ class HTTPSourceTest { @Mock private Certificate certificate; + @Mock + private PipelineDescription pipelineDescription; + private BlockingBuffer> testBuffer; private HTTPSource HTTPSourceUnderTest; private List requestsReceivedMeasurements; @@ -145,15 +151,16 @@ class HTTPSourceTest { private HTTPSourceConfig sourceConfig; private PluginMetrics pluginMetrics; private PluginFactory pluginFactory; - private PipelineDescription pipelineDescription; - private BlockingBuffer> getBuffer() { + private BlockingBuffer> getBuffer() throws JsonProcessingException { final HashMap integerHashMap = new HashMap<>(); integerHashMap.put("buffer_size", 1); integerHashMap.put("batch_size", 1); - final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap); - pluginSetting.setPipelineName(TEST_PIPELINE_NAME); - return new BlockingBuffer<>(pluginSetting); + ObjectMapper objectMapper = new ObjectMapper(); + String json = 
objectMapper.writeValueAsString(integerHashMap); + BlockingBufferConfig blockingBufferConfig = objectMapper.readValue(json, BlockingBufferConfig.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + return new BlockingBuffer<>(blockingBufferConfig, pipelineDescription); } /** @@ -201,7 +208,7 @@ private byte[] createGZipCompressedPayload(final String payload) throws IOExcept } @BeforeEach - public void setUp() { + public void setUp() throws JsonProcessingException { lenient().when(serverBuilder.annotatedService(any())).thenReturn(serverBuilder); lenient().when(serverBuilder.http(anyInt())).thenReturn(serverBuilder); lenient().when(serverBuilder.https(anyInt())).thenReturn(serverBuilder); @@ -227,7 +234,6 @@ public void setUp() { .thenReturn(authenticationProvider); testBuffer = getBuffer(); - pipelineDescription = mock(PipelineDescription.class); when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); } @@ -691,7 +697,7 @@ void testServerStartACMCertNull() throws NoSuchFieldException, IllegalAccessExce @Test - void testHTTPSJsonResponse() { + void testHTTPSJsonResponse() throws JsonProcessingException { reset(sourceConfig); when(sourceConfig.getPort()).thenReturn(2021); when(sourceConfig.getPath()).thenReturn(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI); @@ -721,7 +727,7 @@ void testHTTPSJsonResponse() { @Test - void testHTTPRequestWhenSSLRequiredNoResponse() { + void testHTTPRequestWhenSSLRequiredNoResponse() throws JsonProcessingException { reset(sourceConfig); when(sourceConfig.getPort()).thenReturn(2021); when(sourceConfig.getPath()).thenReturn(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI); @@ -757,7 +763,7 @@ void testHTTPRequestWhenSSLRequiredNoResponse() { } @Test - void testHTTPSJsonResponse_with_custom_path_along_with_placeholder() { + void testHTTPSJsonResponse_with_custom_path_along_with_placeholder() throws 
JsonProcessingException { reset(sourceConfig); when(sourceConfig.getPort()).thenReturn(2021); when(sourceConfig.getPath()).thenReturn("/${pipelineName}/test"); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java index 208682e2c0..1bbc60ecdb 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java @@ -7,6 +7,7 @@ import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Counter; @@ -29,13 +30,14 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.buffer.SizeOverflowException; -import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBufferConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; +import 
org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; @@ -47,8 +49,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import static org.awaitility.Awaitility.await; @@ -89,6 +91,9 @@ public class KafkaCustomConsumerTest { @Mock private KafkaTopicConsumerMetrics topicMetrics; + @Mock + private PipelineDescription pipelineDescription; + @Mock private PauseConsumePredicate pauseConsumePredicate; @@ -123,7 +128,7 @@ public class KafkaCustomConsumerTest { private boolean resumed; @BeforeEach - public void setUp() { + public void setUp() throws JsonProcessingException { delayTime = Duration.ofMillis(10); paused = false; resumed = false; @@ -190,13 +195,15 @@ public KafkaCustomConsumer createObjectUnderTest(String schemaType, boolean ackn acknowledgementSetManager, null, topicMetrics, pauseConsumePredicate); } - private BlockingBuffer> getBuffer() { + private BlockingBuffer> getBuffer() throws JsonProcessingException { final HashMap integerHashMap = new HashMap<>(); integerHashMap.put("buffer_size", 10); integerHashMap.put("batch_size", 10); - final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap); - pluginSetting.setPipelineName(TEST_PIPELINE_NAME); - return new BlockingBuffer<>(pluginSetting); + ObjectMapper objectMapper = new ObjectMapper(); + String json = objectMapper.writeValueAsString(integerHashMap); + BlockingBufferConfig blockingBufferConfig = objectMapper.readValue(json, BlockingBufferConfig.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + return new BlockingBuffer<>(blockingBufferConfig, 
pipelineDescription); } @Test @@ -272,6 +279,7 @@ public void testBufferOverflowPauseResume() throws InterruptedException, Excepti Assertions.assertNotNull(event.getEventHandle().getExternalOriginationTime()); } } + @Test public void testPlainTextConsumeRecords() throws InterruptedException { String topic = topicConfig.getName(); diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java index d02b6f3932..c9d77b77fa 100644 --- a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java +++ b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java @@ -50,6 +50,7 @@ import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.HttpBasicArmeriaHttpAuthenticationProvider; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBufferConfig; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIEventMetadataKeyAttributes; @@ -108,6 +109,9 @@ class OpenSearchAPISourceTest { @Mock private CompletableFuture completableFuture; + @Mock + private PipelineDescription pipelineDescription; + private BlockingBuffer> testBuffer; private OpenSearchAPISource openSearchAPISource; private List requestsReceivedMeasurements; @@ -122,15 +126,16 @@ class OpenSearchAPISourceTest { private OpenSearchAPISourceConfig sourceConfig; private PluginMetrics pluginMetrics; private PluginFactory pluginFactory; - private PipelineDescription pipelineDescription; - private 
BlockingBuffer> getBuffer() { + private BlockingBuffer> getBuffer() throws JsonProcessingException { final HashMap integerHashMap = new HashMap<>(); integerHashMap.put("buffer_size", 1); integerHashMap.put("batch_size", 1); - final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap); - pluginSetting.setPipelineName(testPipelineName); - return new BlockingBuffer<>(pluginSetting); + ObjectMapper objectMapper = new ObjectMapper(); + String json = objectMapper.writeValueAsString(integerHashMap); + BlockingBufferConfig blockingBufferConfig = objectMapper.readValue(json, BlockingBufferConfig.class); + when(pipelineDescription.getPipelineName()).thenReturn(testPipelineName); + return new BlockingBuffer<>(blockingBufferConfig, pipelineDescription); } /**