Skip to content

Commit

Permalink
Zero Buffer Implementation and Tests (#5416)
Browse files Browse the repository at this point in the history
* Zero Buffer Implementation and Tests

Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>

* Moved ZeroBuffer Implementation into data-prepper-core and addressed comments

Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>

* Modified ZeroBufferTests to use MockitoExtension and addressed comments

Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>

---------

Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>
  • Loading branch information
MohammedAghil authored Feb 8, 2025
1 parent 815ddc0 commit b3b6c65
Show file tree
Hide file tree
Showing 4 changed files with 398 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opensearch.dataprepper.core.pipeline;

/**
 * Executes one pass of a pipeline's processors and publishes the resulting
 * records to the pipeline's sinks.
 *
 * <p>Used by {@code ZeroBuffer} to drive processing synchronously on the
 * writing thread immediately after records are written, instead of having a
 * separate reader thread poll the buffer.
 */
public interface PipelineRunner {
void runAllProcessorsAndPublishToSinks();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.dataprepper.core.pipeline;

/**
 * Marks a component (typically a {@code Buffer}) that delegates processing to a
 * {@link PipelineRunner} rather than being drained by a separate reader thread.
 *
 * <p>The runner is injected after construction via {@link #setPipelineRunner},
 * so implementations must tolerate a brief window where
 * {@link #getPipelineRunner()} returns {@code null}.
 */
public interface SupportsPipelineRunner {
PipelineRunner getPipelineRunner();

void setPipelineRunner(PipelineRunner pipelineRunner);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package org.opensearch.dataprepper.core.pipeline.buffer;

import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.core.pipeline.PipelineRunner;
import org.opensearch.dataprepper.core.pipeline.SupportsPipelineRunner;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

/**
 * A buffer with no cross-thread queueing: records written on a thread are held
 * in a {@link ThreadLocal} store and processed synchronously on that same
 * thread. Every {@code write}/{@code writeAll} immediately invokes the
 * configured {@link PipelineRunner}, which is expected to {@code read} the
 * records back on the writing thread and push them through the processors and
 * sinks.
 *
 * <p>Because storage is thread-local, records are only visible to the thread
 * that wrote them; {@link #isEmpty()} likewise reports only the calling
 * thread's view.
 *
 * <p>Not safe to use before {@link #setPipelineRunner(PipelineRunner)} has been
 * called: a write would dereference a {@code null} runner.
 */
@DataPrepperPlugin(name = "zero", pluginType = Buffer.class)
public class ZeroBuffer<T extends Record<?>> implements Buffer<T>, SupportsPipelineRunner {
    private static final Logger LOG = LoggerFactory.getLogger(ZeroBuffer.class);
    private static final String PLUGIN_COMPONENT_ID = "ZeroBuffer";
    private final PluginMetrics pluginMetrics;
    // Per-thread record store; lazily initialized on first write/read per thread.
    private final ThreadLocal<Collection<T>> threadLocalStore;
    private PipelineRunner pipelineRunner;
    @VisibleForTesting
    final String pipelineName;
    private final Counter writeRecordsCounter;
    private final Counter readRecordsCounter;

    /**
     * @param pipelineDescription supplies the pipeline name used to scope this
     *                            buffer's metrics
     */
    @DataPrepperPluginConstructor
    public ZeroBuffer(PipelineDescription pipelineDescription) {
        this.pluginMetrics = PluginMetrics.fromNames(PLUGIN_COMPONENT_ID, pipelineDescription.getPipelineName());
        this.pipelineName = pipelineDescription.getPipelineName();
        this.threadLocalStore = new ThreadLocal<>();
        this.writeRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_WRITTEN);
        this.readRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_READ);
    }

    /**
     * Stores {@code record} in the calling thread's store and synchronously runs
     * the pipeline's processors and sinks on this thread.
     *
     * @param record          the record to write; must not be {@code null}
     * @param timeoutInMillis ignored — writes never block
     * @throws NullPointerException if {@code record} is {@code null}
     */
    @Override
    public void write(T record, int timeoutInMillis) throws TimeoutException {
        Objects.requireNonNull(record, "The write record cannot be null");

        if (threadLocalStore.get() == null) {
            threadLocalStore.set(new ArrayList<>());
        }

        threadLocalStore.get().add(record);
        writeRecordsCounter.increment();

        getPipelineRunner().runAllProcessorsAndPublishToSinks();
    }

    /**
     * Stores all of {@code records} in the calling thread's store and
     * synchronously runs the pipeline's processors and sinks on this thread.
     *
     * @param records         the records to write; must not be {@code null}
     * @param timeoutInMillis ignored — writes never block
     * @throws NullPointerException if {@code records} is {@code null}
     */
    @Override
    public void writeAll(Collection<T> records, int timeoutInMillis) throws Exception {
        Objects.requireNonNull(records, "The write records cannot be null");

        if (threadLocalStore.get() == null) {
            threadLocalStore.set(new ArrayList<>(records));
        } else {
            // Add the new records to the existing records
            threadLocalStore.get().addAll(records);
        }

        writeRecordsCounter.increment((double) records.size());
        getPipelineRunner().runAllProcessorsAndPublishToSinks();
    }

    /**
     * Drains and returns the calling thread's stored records. When records are
     * present the thread-local store is cleared, so a subsequent read on the
     * same thread returns an empty collection.
     *
     * @param timeoutInMillis ignored — reads never block
     * @return the drained records paired with a {@link CheckpointState} sized to
     *         the number of records returned
     */
    @Override
    public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
        if (threadLocalStore.get() == null) {
            threadLocalStore.set(new ArrayList<>());
        }

        final Collection<T> storedRecords = threadLocalStore.get();
        CheckpointState checkpointState = new CheckpointState(0);
        // storedRecords is guaranteed non-null here; only the empty check is needed.
        if (!storedRecords.isEmpty()) {
            checkpointState = new CheckpointState(storedRecords.size());
            threadLocalStore.remove();
            readRecordsCounter.increment((double) storedRecords.size());
        }

        return Map.entry(storedRecords, checkpointState);
    }

    /** Intentional no-op: records are fully handed off at read time. */
    @Override
    public void checkpoint(CheckpointState checkpointState) {}

    /** @return whether the CALLING thread's store holds no records. */
    @Override
    public boolean isEmpty() {
        return (this.threadLocalStore.get() == null || this.threadLocalStore.get().isEmpty());
    }

    @Override
    public PipelineRunner getPipelineRunner() {
        return pipelineRunner;
    }

    @Override
    public void setPipelineRunner(PipelineRunner pipelineRunner) {
        this.pipelineRunner = pipelineRunner;
    }
}
Loading

0 comments on commit b3b6c65

Please sign in to comment.