Skip to content

Commit

Permalink
migrate Blocking Buffer and StdInSource configurations off of plugin …
Browse files Browse the repository at this point in the history
…setting (#5317)

migrate Blocking Buffer configuration off of plugin setting
Migrate StdInSource configuration off of pluginSetting

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>
  • Loading branch information
Galactus22625 authored Feb 13, 2025
1 parent 1a41705 commit 1309cda
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 79 deletions.
2 changes: 2 additions & 0 deletions data-prepper-plugins/blocking-buffer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
testImplementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'com.fasterxml.jackson.core:jackson-databind'
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.AbstractBuffer;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
Expand All @@ -31,20 +33,20 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBufferConfig.DEFAULT_BATCH_SIZE;
import static org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBufferConfig.DEFAULT_BUFFER_CAPACITY;

/**
* A bounded BlockingBuffer is an implementation of {@link Buffer} using {@link LinkedBlockingQueue}, it is bounded
* to the provided capacity {@link #ATTRIBUTE_BUFFER_CAPACITY} or {@link #DEFAULT_BUFFER_CAPACITY} (if attribute is
* to the provided capacity {@link #ATTRIBUTE_BUFFER_CAPACITY} or {@link BlockingBufferConfig#DEFAULT_BUFFER_CAPACITY} (if attribute is
* not provided); {@link #write(Record, int)} inserts specified non-null record into this buffer, waiting up to the
* specified timeout in milliseconds if necessary for space to become available; and throws an exception if the
* record is null. {@link #read(int)} retrieves and removes the batch of records from the head of the queue. The
* batch size is defined/determined by the configuration attribute {@link #ATTRIBUTE_BATCH_SIZE} or the timeout parameter
*/
@DataPrepperPlugin(name = "bounded_blocking", pluginType = Buffer.class)
@DataPrepperPlugin(name = "bounded_blocking", pluginType = Buffer.class, pluginConfigurationType = BlockingBufferConfig.class)
public class BlockingBuffer<T extends Record<?>> extends AbstractBuffer<T> {
private static final Logger LOG = LoggerFactory.getLogger(BlockingBuffer.class);
private static final int DEFAULT_BUFFER_CAPACITY = 12_800;
private static final int DEFAULT_BATCH_SIZE = 200;
private static final String PLUGIN_NAME = "bounded_blocking";
private static final String ATTRIBUTE_BUFFER_CAPACITY = "buffer_size";
private static final String ATTRIBUTE_BATCH_SIZE = "batch_size";
Expand Down Expand Up @@ -81,18 +83,18 @@ public BlockingBuffer(final int bufferCapacity, final int batchSize, final Strin

/**
* Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper runtime engine to construct an
* instance of {@link BlockingBuffer} using an instance of {@link PluginSetting} which has access to
* pluginSetting metadata from pipeline pluginSetting file. Buffer settings like `buffer-size`, `batch-size`,
* `batch-timeout` are optional and can be passed via {@link PluginSetting}, if not present default values will
* instance of {@link BlockingBuffer} using an instance of {@link BlockingBufferConfig}. Buffer settings like `buffer-size`, `batch-size`,
* `batch-timeout` are optional and can be passed via {@link BlockingBufferConfig}, if not present default values will
* be used to create the buffer.
*
* @param pluginSetting instance with metadata information from pipeline pluginSetting file.
* @param blockingBufferConfig buffer configuration deserialized from the pipeline YAML
* @param pipelineDescription instance with metadata information about the pipeline
*/
public BlockingBuffer(final PluginSetting pluginSetting) {
this(checkNotNull(pluginSetting, "PluginSetting cannot be null")
.getIntegerOrDefault(ATTRIBUTE_BUFFER_CAPACITY, DEFAULT_BUFFER_CAPACITY),
pluginSetting.getIntegerOrDefault(ATTRIBUTE_BATCH_SIZE, DEFAULT_BATCH_SIZE),
pluginSetting.getPipelineName());
@DataPrepperPluginConstructor
public BlockingBuffer(final BlockingBufferConfig blockingBufferConfig, final PipelineDescription pipelineDescription) {
// Delegates to the (bufferCapacity, batchSize, pipelineName) constructor. The config object
// carries the values deserialized from the pipeline YAML; BlockingBufferConfig supplies
// defaults when `buffer_size` / `batch_size` are absent.
this(checkNotNull(blockingBufferConfig, "BlockingBufferConfig cannot be null").getBufferSize(),
blockingBufferConfig.getBatchSize(),
pipelineDescription.getPipelineName());
}

public BlockingBuffer(final String pipelineName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.buffer.blockingbuffer;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Configuration for the {@code bounded_blocking} buffer plugin. Instances are
 * deserialized by Jackson from the pipeline configuration; any setting absent
 * from the YAML falls back to the declared default.
 */
public class BlockingBufferConfig {
    /** Default maximum number of records the buffer may hold. */
    public static final int DEFAULT_BUFFER_CAPACITY = 12_800;
    /** Default maximum number of records returned by a single read. */
    public static final int DEFAULT_BATCH_SIZE = 200;

    @JsonProperty("buffer_size")
    private int bufferSize = DEFAULT_BUFFER_CAPACITY;

    @JsonProperty("batch_size")
    private int batchSize = DEFAULT_BATCH_SIZE;

    /**
     * @return the configured buffer capacity, or {@link #DEFAULT_BUFFER_CAPACITY} when unset
     */
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * @return the configured batch size, or {@link #DEFAULT_BATCH_SIZE} when unset
     */
    public int getBatchSize() {
        return batchSize;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -43,6 +46,8 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class BlockingBufferTests {
private static final String ATTRIBUTE_BATCH_SIZE = "batch_size";
Expand All @@ -63,25 +68,32 @@ public void setup() {
}

@Test
public void testCreationUsingPluginSetting() {
final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer();
final BlockingBuffer<Record<String>> blockingBuffer = new BlockingBuffer<>(completePluginSetting);
// A fully-populated BlockingBufferConfig plus a mocked PipelineDescription
// should be sufficient to construct a BlockingBuffer.
public void testCreationUsingBlockingBufferConfig() throws JsonProcessingException {
final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig();
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
final BlockingBuffer<Record<String>> blockingBuffer = new BlockingBuffer<>(blockingBufferConfig, pipelineDescription);
assertThat(blockingBuffer, notNullValue());
}

@Test
public void testCreationUsingNullPluginSetting() {
public void testCreationUsingNullBlockingBufferConfig() {
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
try {
new BlockingBuffer<Record<String>>((PluginSetting) null);
new BlockingBuffer<Record<String>>(null, pipelineDescription);
} catch (NullPointerException ex) {
assertThat(ex.getMessage(), is(equalTo("PluginSetting cannot be null")));
assertThat(ex.getMessage(), is(equalTo("BlockingBufferConfig cannot be null")));
}
}

@Test
public void testCreationUsingDefaultPluginSettings() {
final BlockingBuffer<Record<String>> blockingBuffer = new BlockingBuffer<>(
BlockingBuffer.getDefaultPluginSettings());
// Deserializing an empty JSON object ("{}") exercises every default declared
// on BlockingBufferConfig; construction must still succeed.
public void testCreationUsingDefaultBlockingBufferConfig() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
String json = "{}";
BlockingBufferConfig config = objectMapper.readValue(json, BlockingBufferConfig.class);
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
final BlockingBuffer<Record<String>> blockingBuffer = new BlockingBuffer<>(config, pipelineDescription);
assertThat(blockingBuffer, notNullValue());
}

Expand Down Expand Up @@ -197,8 +209,10 @@ public void testReadEmptyBuffer() {
@ParameterizedTest
@ValueSource(ints = {0, TEST_BATCH_READ_TIMEOUT})
public void testBatchRead(final int readTimeout) throws Exception {
final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer();
final BlockingBuffer<Record<String>> blockingBuffer = new BlockingBuffer<>(completePluginSetting);
final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig();
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
final BlockingBuffer<Record<String>> blockingBuffer = new BlockingBuffer<>(blockingBufferConfig, pipelineDescription);
assertThat(blockingBuffer, notNullValue());
final int testSize = 5;
for (int i = 0; i < testSize; i++) {
Expand All @@ -209,7 +223,7 @@ public void testBatchRead(final int readTimeout) throws Exception {
final Map.Entry<Collection<Record<String>>, CheckpointState> partialReadResult = blockingBuffer.read(readTimeout);
final Collection<Record<String>> partialRecords = partialReadResult.getKey();
final CheckpointState partialCheckpointState = partialReadResult.getValue();
final int expectedBatchSize = (Integer) completePluginSetting.getAttributeFromSettings(ATTRIBUTE_BATCH_SIZE);
final int expectedBatchSize = blockingBufferConfig.getBatchSize();
assertThat(partialRecords.size(), is(expectedBatchSize));
assertEquals(expectedBatchSize, partialCheckpointState.getNumRecordsToBeChecked());
int i = 0;
Expand All @@ -235,18 +249,22 @@ public void testBatchRead(final int readTimeout) throws Exception {
}

@Test
public void testBufferIsEmpty() {
final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer();
final BlockingBuffer<Record<String>> blockingBuffer = new BlockingBuffer<>(completePluginSetting);
public void testBufferIsEmpty() throws JsonProcessingException {
final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig();
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
final BlockingBuffer<Record<String>> blockingBuffer = new BlockingBuffer<>(blockingBufferConfig, pipelineDescription);

assertTrue(blockingBuffer.isEmpty());
verifyBufferUsageMetric(0);
}

@Test
public void testBufferIsNotEmpty() throws Exception {
final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer();
final BlockingBuffer<Record<String>> blockingBuffer = new BlockingBuffer<>(completePluginSetting);
final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig();
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
final BlockingBuffer<Record<String>> blockingBuffer = new BlockingBuffer<>(blockingBufferConfig, pipelineDescription);

Record<String> record = new Record<>("TEST");
blockingBuffer.write(record, TEST_WRITE_TIMEOUT);
Expand All @@ -257,8 +275,10 @@ public void testBufferIsNotEmpty() throws Exception {

@Test
void testNonZeroBatchDelayReturnsAllRecords() throws Exception {
final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer();
final BlockingBuffer<Record<String>> buffer = new BlockingBuffer<>(completePluginSetting);
final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig();
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
final BlockingBuffer<Record<String>> buffer = new BlockingBuffer<>(blockingBufferConfig, pipelineDescription);
assertThat(buffer, notNullValue());

final Collection<Record<String>> testRecords = generateBatchRecords(1);
Expand All @@ -283,8 +303,10 @@ void testNonZeroBatchDelayReturnsAllRecords() throws Exception {

@Test
void testZeroBatchDelayReturnsAvailableRecords() throws Exception {
final PluginSetting completePluginSetting = completePluginSettingForBlockingBuffer();
final BlockingBuffer<Record<String>> buffer = new BlockingBuffer<>(completePluginSetting);
final BlockingBufferConfig blockingBufferConfig = completeBlockingBufferConfig();
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
final BlockingBuffer<Record<String>> buffer = new BlockingBuffer<>(blockingBufferConfig, pipelineDescription);
assertThat(buffer, notNullValue());

final Collection<Record<String>> testRecords = generateBatchRecords(1);
Expand Down Expand Up @@ -346,6 +368,16 @@ private PluginSetting completePluginSettingForBlockingBuffer() {
return testSettings;
}

// Builds a BlockingBufferConfig with every supported setting populated by
// round-tripping a settings map through Jackson JSON serialization, mirroring
// how the Data Prepper runtime deserializes pipeline configuration.
private BlockingBufferConfig completeBlockingBufferConfig() throws JsonProcessingException {
final Map<String, Object> settings = new HashMap<>();
settings.put(ATTRIBUTE_BUFFER_SIZE, TEST_BUFFER_SIZE);
settings.put(ATTRIBUTE_BATCH_SIZE, TEST_BATCH_SIZE);
final ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(settings), BlockingBufferConfig.class);
}

private Collection<Record<String>> generateBatchRecords(final int numRecords) {
final Collection<Record<String>> results = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -25,7 +26,7 @@
* A simple source which reads data from console each line at a time. It exits when it reads case insensitive "exit"
* from console or if Pipeline notifies to stop.
*/
@DataPrepperPlugin(name = "stdin", pluginType = Source.class)
@DataPrepperPlugin(name = "stdin", pluginType = Source.class, pluginConfigurationType = StdInSourceConfig.class)
public class StdInSource implements Source<Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(StdInSource.class);
private static final String ATTRIBUTE_TIMEOUT = "write_timeout";
Expand All @@ -37,16 +38,16 @@ public class StdInSource implements Source<Record<Event>> {

/**
* Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper
* runtime engine to construct an instance of {@link StdInSource} using an instance of {@link PluginSetting} which
* has access to pluginSetting metadata from pipeline
* pluginSetting file.
* runtime engine to construct an instance of {@link StdInSource} using an instance of {@link StdInSourceConfig}
*
* @param pluginSetting instance with metadata information from pipeline pluginSetting file.
* @param stdInSourceConfig The configuration instance for {@link StdInSource}
* @param pipelineDescription The pipeline description which has access to pipeline Name
*/
public StdInSource(final PluginSetting pluginSetting) {
this(checkNotNull(pluginSetting, "PluginSetting cannot be null")
.getIntegerOrDefault(ATTRIBUTE_TIMEOUT, WRITE_TIMEOUT),
pluginSetting.getPipelineName());
@DataPrepperPluginConstructor
public StdInSource(final StdInSourceConfig stdInSourceConfig, final PipelineDescription pipelineDescription) {
// Delegates to the (writeTimeout, pipelineName) constructor; StdInSourceConfig
// supplies the default write timeout when `write_timeout` is absent from the YAML.
this(checkNotNull(stdInSourceConfig, "StdInSourceConfig cannot be null")
.getWriteTimeout(),
pipelineDescription.getPipelineName());
}

public StdInSource(final int writeTimeout, final String pipelineName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Configuration for the {@code stdin} source plugin. Instances are deserialized
 * by Jackson from the pipeline configuration; {@code write_timeout} falls back
 * to the declared default when absent.
 */
public class StdInSourceConfig {
    // Default write timeout (5_000 — presumably milliseconds; confirm against StdInSource usage).
    private static final int DEFAULT_WRITE_TIMEOUT = 5_000;

    @JsonProperty("write_timeout")
    private int writeTimeout = DEFAULT_WRITE_TIMEOUT;

    /**
     * @return the configured write timeout, or the default when unset
     */
    public int getWriteTimeout() {
        return writeTimeout;
    }
}
Loading

0 comments on commit 1309cda

Please sign in to comment.