Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zero Buffer Implementation and Tests #5416

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opensearch.dataprepper.core.pipeline;

public interface PipelineRunner {
void runAllProcessorsAndPublishToSinks();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.opensearch.dataprepper.core.pipeline.buffer;

import org.opensearch.dataprepper.core.pipeline.PipelineRunner;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;

/**
* Represents the base class for zero buffer implementation and implements {@link Buffer} interface.
* It provides a common functionality to run all processors and sinks within the same thread context.
*/
public abstract class AbstractZeroBuffer <T extends Record<?>> implements Buffer<T> {
private PipelineRunner pipelineRunner;

protected void runAllProcessorsAndPublishToSinks() {
// TODO : Implement functionality to call the processors and sinks within the same context
getPipelineRunner().runAllProcessorsAndPublishToSinks();
}

public PipelineRunner getPipelineRunner() {
return pipelineRunner;
}

public void setPipelineRunner(PipelineRunner pipelineRunner) {
this.pipelineRunner = pipelineRunner;
}
}
27 changes: 27 additions & 0 deletions data-prepper-plugins/zero-buffer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-core')
implementation 'io.micrometer:micrometer-core'
implementation 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.90
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.opensearch.dataprepper.plugins.buffer.zerobuffer;

import org.opensearch.dataprepper.core.pipeline.buffer.AbstractZeroBuffer;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.Getter;
import lombok.AccessLevel;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@DataPrepperPlugin(name = "zero_buffer", pluginType = Buffer.class)
public class ZeroBuffer<T extends Record<?>> extends AbstractZeroBuffer<T> {
private static final Logger LOG = LoggerFactory.getLogger(ZeroBuffer.class);
private static final String PLUGIN_COMPONENT_ID = "ZeroBuffer";
private final PluginMetrics pluginMetrics;
private final ThreadLocal<Collection<T>> threadLocalStore;
@Getter(value = AccessLevel.PACKAGE)
private final String pipelineName;
@Getter(value = AccessLevel.PACKAGE)
private final Counter writeRecordsCounter;
@Getter(value = AccessLevel.PACKAGE)
private final Counter readRecordsCounter;

@DataPrepperPluginConstructor
public ZeroBuffer(PipelineDescription pipelineDescription) {
this.pluginMetrics = PluginMetrics.fromNames(PLUGIN_COMPONENT_ID, pipelineDescription.getPipelineName());
this.pipelineName = pipelineDescription.getPipelineName();
this.threadLocalStore = new ThreadLocal<>();
this.writeRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_WRITTEN);
this.readRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_READ);
}

@Override
public void write(T record, int timeoutInMillis) throws TimeoutException {
if (record == null) {
throw new NullPointerException("The write record cannot be null");
}

if (threadLocalStore.get() == null) {
threadLocalStore.set(new ArrayList<>());
}

threadLocalStore.get().add(record);
writeRecordsCounter.increment();

runAllProcessorsAndPublishToSinks();
}

@Override
public void writeAll(Collection<T> records, int timeoutInMillis) throws Exception {
if (records == null) {
throw new NullPointerException("The write records cannot be null");
}

if (threadLocalStore.get() == null) {
threadLocalStore.set(records);
} else {
// Add the new records to the existing records
threadLocalStore.get().addAll(records);
}

writeRecordsCounter.increment(records.size() * 1.0);
runAllProcessorsAndPublishToSinks();
}

@Override
public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
if (threadLocalStore.get() == null) {
threadLocalStore.set(new ArrayList<>());
}

Collection<T> storedRecords = threadLocalStore.get();
CheckpointState checkpointState = new CheckpointState(0);
if (storedRecords!= null && !storedRecords.isEmpty()) {
checkpointState = new CheckpointState(storedRecords.size());
threadLocalStore.remove();
readRecordsCounter.increment(storedRecords.size() * 1.0);
}

return new AbstractMap.SimpleEntry<>(storedRecords, checkpointState);
}

@Override
public void checkpoint(CheckpointState checkpointState) {}

@Override
public boolean isEmpty() {
return (this.threadLocalStore.get() == null || this.threadLocalStore.get().isEmpty());
}
}
Loading
Loading