From b3b6c650c1b564bec253ea5fa7bc1b34b16b903b Mon Sep 17 00:00:00 2001
From: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>
Date: Fri, 7 Feb 2025 17:16:27 -0800
Subject: [PATCH] Zero Buffer Implementation and Tests (#5416)

* Zero Buffer Implementation and Tests

Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>

* Moved ZeroBuffer Implementation into data-prepper-core and addressed comments

Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>

* Modified ZeroBufferTests to use MockitoExtension and addressed comments

Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>

---------

Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>
---
 .../core/pipeline/PipelineRunner.java         |   5 +
 .../core/pipeline/SupportsPipelineRunner.java |   7 +
 .../core/pipeline/buffer/ZeroBuffer.java      | 110 +++++++
 .../core/pipeline/buffer/ZeroBufferTests.java | 276 ++++++++++++++++++
 4 files changed, 398 insertions(+)
 create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunner.java
 create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/SupportsPipelineRunner.java
 create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBuffer.java
 create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBufferTests.java

diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunner.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunner.java
new file mode 100644
index 0000000000..be7a4595cc
--- /dev/null
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunner.java
@@ -0,0 +1,5 @@
+package org.opensearch.dataprepper.core.pipeline;
+
+public interface PipelineRunner {
+    void runAllProcessorsAndPublishToSinks();
+}
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/SupportsPipelineRunner.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/SupportsPipelineRunner.java
new file mode 100644
index 0000000000..2496293f7c
--- /dev/null
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/SupportsPipelineRunner.java
@@ -0,0 +1,7 @@
+package org.opensearch.dataprepper.core.pipeline;
+
+public interface SupportsPipelineRunner {
+    PipelineRunner getPipelineRunner();
+
+    void setPipelineRunner(PipelineRunner pipelineRunner);
+}
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBuffer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBuffer.java
new file mode 100644
index 0000000000..cebed1b6a3
--- /dev/null
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBuffer.java
@@ -0,0 +1,110 @@
+package org.opensearch.dataprepper.core.pipeline.buffer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.opensearch.dataprepper.core.pipeline.PipelineRunner;
+import org.opensearch.dataprepper.core.pipeline.SupportsPipelineRunner;
+import org.opensearch.dataprepper.metrics.MetricNames;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.CheckpointState;
+import org.opensearch.dataprepper.model.configuration.PipelineDescription;
+import org.opensearch.dataprepper.model.record.Record;
+import io.micrometer.core.instrument.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+@DataPrepperPlugin(name = "zero", pluginType = Buffer.class)
+public class ZeroBuffer<T extends Record<?>> implements Buffer<T>, SupportsPipelineRunner {
+    private static final Logger LOG = LoggerFactory.getLogger(ZeroBuffer.class);
+    private static final String PLUGIN_COMPONENT_ID = "ZeroBuffer";
+    private final PluginMetrics pluginMetrics;
+    private final ThreadLocal<Collection<T>> threadLocalStore;
+    private PipelineRunner pipelineRunner;
+    @VisibleForTesting
+    final String pipelineName;
+    private final Counter writeRecordsCounter;
+    private final Counter readRecordsCounter;
+
+    @DataPrepperPluginConstructor
+    public ZeroBuffer(PipelineDescription pipelineDescription) {
+        this.pluginMetrics = PluginMetrics.fromNames(PLUGIN_COMPONENT_ID, pipelineDescription.getPipelineName());
+        this.pipelineName = pipelineDescription.getPipelineName();
+        this.threadLocalStore = new ThreadLocal<>();
+        this.writeRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_WRITTEN);
+        this.readRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_READ);
+    }
+
+    @Override
+    public void write(T record, int timeoutInMillis) throws TimeoutException {
+        if (record == null) {
+            throw new NullPointerException("The write record cannot be null");
+        }
+
+        if (threadLocalStore.get() == null) {
+            threadLocalStore.set(new ArrayList<>());
+        }
+
+        threadLocalStore.get().add(record);
+        writeRecordsCounter.increment();
+
+        getPipelineRunner().runAllProcessorsAndPublishToSinks();
+    }
+
+    @Override
+    public void writeAll(Collection<T> records, int timeoutInMillis) throws Exception {
+        if (records == null) {
+            throw new NullPointerException("The write records cannot be null");
+        }
+
+        if (threadLocalStore.get() == null) {
+            threadLocalStore.set(new ArrayList<>(records));
+        } else {
+            // Add the new records to the existing records
+            threadLocalStore.get().addAll(records);
+        }
+
+        writeRecordsCounter.increment((double) records.size());
+        getPipelineRunner().runAllProcessorsAndPublishToSinks();
+    }
+
+    @Override
+    public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
+        if (threadLocalStore.get() == null) {
+            threadLocalStore.set(new ArrayList<>());
+        }
+
+        Collection<T> storedRecords = threadLocalStore.get();
+        CheckpointState checkpointState = new CheckpointState(0);
+        if (storedRecords!= null && !storedRecords.isEmpty()) {
+            checkpointState = new CheckpointState(storedRecords.size());
+            threadLocalStore.remove();
+            readRecordsCounter.increment((double) storedRecords.size());
+        }
+
+        return Map.entry(storedRecords, checkpointState);
+    }
+
+    @Override
+    public void checkpoint(CheckpointState checkpointState) {}
+
+    @Override
+    public boolean isEmpty() {
+        return (this.threadLocalStore.get() == null || this.threadLocalStore.get().isEmpty());
+    }
+
+    @Override
+    public PipelineRunner getPipelineRunner() {
+        return pipelineRunner;
+    }
+
+    @Override
+    public void setPipelineRunner(PipelineRunner pipelineRunner) {
+        this.pipelineRunner = pipelineRunner;
+    }
+}
diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBufferTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBufferTests.java
new file mode 100644
index 0000000000..653b2b4f8c
--- /dev/null
+++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBufferTests.java
@@ -0,0 +1,276 @@
+package org.opensearch.dataprepper.core.pipeline.buffer;
+
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.core.pipeline.PipelineRunner;
+import org.opensearch.dataprepper.model.CheckpointState;
+import org.opensearch.dataprepper.model.configuration.PipelineDescription;
+import org.opensearch.dataprepper.model.record.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeoutException;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+@ExtendWith(MockitoExtension.class)
+public class ZeroBufferTests {
+    private static final Logger LOG = LoggerFactory.getLogger(ZeroBufferTests.class);
+    private static final String MOCK_PIPELINE_NAME = "mock-pipeline";
+    private static final int WRITE_TIMEOUT = 100;
+    private static final int READ_TIMEOUT = 500;
+    private static final String SINGLE_RECORD_DATA_FORMAT = "{\"message\":\"test\"}";
+    private static final String BATCH_RECORDS_DATA_FORMAT = "{\"message\":\"test-%d\"}";
+    @Mock
+    PipelineDescription pipelineDescription;
+    @Mock
+    PipelineRunner pipelineRunner;
+
+
+    @BeforeEach
+    public void setup() {
+        new ArrayList<>(Metrics.globalRegistry.getRegistries())
+                .forEach(Metrics.globalRegistry::remove);
+        new ArrayList<>(Metrics.globalRegistry.getMeters())
+                .forEach(Metrics.globalRegistry::remove);
+        Metrics.addRegistry(new SimpleMeterRegistry());
+    }
+
+    @Nested
+    class WriteTests {
+        @Test
+        public void testSingleWriteAndReadReturnsCorrectRecord() throws Exception {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
+            doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks();
+
+            zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
+
+            Collection<Record<String>> readRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
+            assertEquals(1, readRecords.size());
+
+            assertEquals(SINGLE_RECORD_DATA_FORMAT, readRecords.iterator().next().getData());
+
+            zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
+            readRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
+            assertEquals(1, readRecords.size());
+            verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks();
+        }
+
+        @Test
+        public void testMultipleWriteAndReadReturnsCorrectRecord() throws Exception {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
+            doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks();
+
+            zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
+            zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
+
+            Collection<Record<String>> readRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
+            assertEquals(2, readRecords.size());
+            assertEquals(SINGLE_RECORD_DATA_FORMAT, readRecords.iterator().next().getData());
+            assertEquals(SINGLE_RECORD_DATA_FORMAT, readRecords.iterator().next().getData());
+            verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks();
+        }
+
+        @Test
+        public void testWriteAllAndReadReturnsAllRecords() throws Exception {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
+            doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks();
+
+            Collection<Record<String>> writeRecords = generateRecords(IntStream.range(0, 10)
+                    .mapToObj(i -> String.format(BATCH_RECORDS_DATA_FORMAT, i))
+                    .collect(Collectors.toList()));
+            zeroBuffer.writeAll(writeRecords, WRITE_TIMEOUT);
+
+            Map.Entry<Collection<Record<String>>, CheckpointState> readRecordsMap = zeroBuffer.read(READ_TIMEOUT);
+            Collection<Record<String>> readRecords = readRecordsMap.getKey();
+            for (Record<String> record : readRecords) {
+                LOG.debug(record.getData());
+            }
+
+            // Ensure that the write records are the same as the read records
+            assertEquals(writeRecords.size(), readRecords.size());
+            verify(pipelineRunner).runAllProcessorsAndPublishToSinks();
+        }
+
+        @Test
+        public void testWriteNullRecordThrowsException() {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
+
+            Exception writeException = assertThrows(NullPointerException.class, () -> {
+                zeroBuffer.write(null, WRITE_TIMEOUT);
+            });
+
+            Exception writeAllException = assertThrows(NullPointerException.class, () -> {
+                zeroBuffer.writeAll(null, WRITE_TIMEOUT);
+            });
+
+            assertEquals("The write record cannot be null", writeException.getMessage());
+            assertEquals("The write records cannot be null", writeAllException.getMessage());
+            verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks();
+        }
+
+        @Test
+        public void testWriteEmptyRecordDoesNotThrowException() {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
+            doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks();
+
+            Record<String> emptyRecord = generateRecord(null);
+            Collection<Record<String>> emptyRecordCollection = generateRecords(new ArrayList<>());
+
+            assertDoesNotThrow(() -> zeroBuffer.write(emptyRecord, WRITE_TIMEOUT));
+            assertDoesNotThrow(() -> zeroBuffer.writeAll(emptyRecordCollection, WRITE_TIMEOUT));
+            verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks();
+        }
+
+        @Test
+        public void testThreadReadAndWriteIsolation() throws Exception {
+            final ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTestWithPipelineName();
+
+            Thread workerThread = new Thread(() -> {
+                try {
+                    PipelineRunner pipelineRunnerMock = mock(PipelineRunner.class);
+                    zeroBuffer.setPipelineRunner(pipelineRunnerMock);
+                    doNothing().when(pipelineRunnerMock).runAllProcessorsAndPublishToSinks();
+                    zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
+                    verify(pipelineRunnerMock).runAllProcessorsAndPublishToSinks();
+                } catch (TimeoutException e) {
+                    fail("Timeout exception occurred");
+                }
+            });
+            workerThread.start();
+            workerThread.join();
+
+            // Ensure that main thread does not share the same records store as the worker thread
+            assertEquals(0, zeroBuffer.read(READ_TIMEOUT).getKey().size());
+            assertTrue(zeroBuffer.isEmpty());
+            verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks();
+        }
+
+        @Test
+        public void testWriteAndWriteAllReturnsCorrectRecords() throws Exception {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
+            doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks();
+
+            zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
+            zeroBuffer.writeAll(generateRecords(IntStream.range(0, 10)
+                    .mapToObj(i -> String.format(BATCH_RECORDS_DATA_FORMAT, i))
+                    .collect(Collectors.toList())), WRITE_TIMEOUT);
+
+            Collection<Record<String>> readRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
+            for (Record<String> record : readRecords) {
+                LOG.debug(record.getData());
+            }
+            assertEquals(11, readRecords.size());
+            verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks();
+        }
+    }
+
+    @Nested
+    class ReadTests {
+        @Test
+        public void testReadFromNonEmptyBufferReturnsCorrectRecords() throws Exception {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
+
+            zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
+
+            Collection<Record<String>> initialReadRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
+            Collection<Record<String>> secondAttemptToReadRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
+
+            assertEquals(1, initialReadRecords.size());
+            assertEquals(SINGLE_RECORD_DATA_FORMAT, initialReadRecords.iterator().next().getData());
+
+            assertEquals(0, secondAttemptToReadRecords.size());
+            verify(pipelineRunner).runAllProcessorsAndPublishToSinks();
+        }
+
+        @Test
+        public void testReadFromEmptyBufferReturnsNoRecords() {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
+
+            Map.Entry<Collection<Record<String>>, CheckpointState> readRecordsMap = zeroBuffer.read(READ_TIMEOUT);
+            assertTrue(readRecordsMap.getKey().isEmpty());
+            verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks();
+        }
+    }
+
+    @Nested
+    class EmptyBufferTests {
+        @Test
+        public void testIsEmptyReturnsTrueWhenBufferIsEmpty() {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTestWithPipelineName();
+            assertTrue(zeroBuffer.isEmpty());
+            verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks();
+        }
+
+        @Test
+        public void testIsEmptyReturnsFalseWhenBufferIsNotEmpty() throws Exception {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
+
+            zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
+
+            assertFalse(zeroBuffer.isEmpty());
+            verify(pipelineRunner).runAllProcessorsAndPublishToSinks();
+        }
+    }
+
+    @Nested
+    class CommonTests {
+        @Test
+        public void testCreateZeroBufferWithPipelineName() {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTestWithPipelineName();
+            assertEquals(MOCK_PIPELINE_NAME, zeroBuffer.pipelineName);
+        }
+
+        @Test
+        public void testCheckpointDoesNotThrowException() {
+            ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
+            assertDoesNotThrow(() -> zeroBuffer.checkpoint(null));
+            assertDoesNotThrow(() -> zeroBuffer.checkpoint(new CheckpointState(0)));
+        }
+    }
+
+    /*-------------------------Private Helper Methods---------------------------*/
+    private <T> Record<T> generateRecord(final T data) {
+        return new Record<>(data);
+    }
+
+    private <T> Collection<Record<T>> generateRecords(Collection<T> data) {
+        Collection<Record<T>> records = new ArrayList<>();
+        for (T recordData : data) {
+            Record<T> record = new Record<>(recordData);
+            records.add(record);
+        }
+        return records;
+    }
+
+    private ZeroBuffer<Record<String>> createObjectUnderTest() {
+        ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTestWithPipelineName();
+        zeroBuffer.setPipelineRunner(pipelineRunner);
+        return zeroBuffer;
+    }
+
+    private ZeroBuffer<Record<String>> createObjectUnderTestWithPipelineName() {
+        when(pipelineDescription.getPipelineName()).thenReturn(MOCK_PIPELINE_NAME);
+        return new ZeroBuffer<>(pipelineDescription);
+    }
+}