Skip to content

Commit

Permalink
Merge branch 'main' into lambda-sink-stateful
Browse files Browse the repository at this point in the history
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
  • Loading branch information
srikanthjg authored Feb 10, 2025
2 parents cb9390a + b3b6c65 commit e10828e
Show file tree
Hide file tree
Showing 157 changed files with 3,589 additions and 834 deletions.
16 changes: 8 additions & 8 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon |
| Dinu John | [dinujoh](https://github.com/dinujoh) | Amazon |
| Krishna Kondaka | [kkondaka](https://github.com/kkondaka) | Amazon |
| Asif Sohail Mohammed | [asifsmohammed](https://github.com/asifsmohammed) | Amazon |
| Karsten Schnitter | [KarstenSchnitter](https://github.com/KarstenSchnitter) | SAP |
| David Venable | [dlvenable](https://github.com/dlvenable) | Amazon |
| Hai Yan | [oeyh](https://github.com/oeyh) | Amazon |


## Emeritus

| Maintainer | GitHub ID | Affiliation |
| -------------------- | ----------------------------------------------------- | ----------- |
| Steven Bayer | [sbayer55](https://github.com/sbayer55) | Amazon |
| Christopher Manning | [cmanning09](https://github.com/cmanning09) | Amazon |
| David Powers | [dapowers87](https://github.com/dapowers87) | Amazon |
| Shivani Shukla | [sshivanii](https://github.com/sshivanii) | Amazon |
| Phill Treddenick | [treddeni-amazon](https://github.com/treddeni-amazon) | Amazon |
| Maintainer | GitHub ID | Affiliation |
| ---------------------- | ----------------------------------------------------- | ----------- |
| Steven Bayer | [sbayer55](https://github.com/sbayer55) | Amazon |
| Christopher Manning | [cmanning09](https://github.com/cmanning09) | Amazon |
| Asif Sohail Mohammed | [asifsmohammed](https://github.com/asifsmohammed) | Amazon |
| David Powers | [dapowers87](https://github.com/dapowers87) | Amazon |
| Shivani Shukla | [sshivanii](https://github.com/sshivanii) | Amazon |
| Phill Treddenick | [treddeni-amazon](https://github.com/treddeni-amazon) | Amazon |
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
public @interface AlsoRequired {
/**
* Array of Required annotations, each representing a required property with its allowed values.
 * @return the array of required property definitions
*/
Required[] values();

Expand All @@ -25,11 +26,13 @@
@interface Required {
/**
* Name of the required property.
* @return returns name
*/
String name();

/**
* Allowed values for the required property. The default value of {} means any non-null value is allowed.
* @return returns array of allowed values
*/
String[] allowedValues() default {};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
public @interface ConditionalRequired {
/**
* Array of if-then-else requirements.
* @return returns array of if and else values
*/
IfThenElse[] value();

Expand All @@ -22,14 +23,17 @@
@interface IfThenElse {
/**
* Array of property schemas involved in if condition.
 * @return the array of property schemas for the if condition
*/
SchemaProperty[] ifFulfilled();
/**
* Array of property schemas involved in then expectation.
 * @return the array of property schemas for the then expectation
*/
SchemaProperty[] thenExpect();
/**
* Array of property schemas involved in else expectation.
 * @return the array of property schemas for the else expectation
*/
SchemaProperty[] elseExpect() default {};
}
Expand All @@ -40,10 +44,12 @@
@interface SchemaProperty {
/**
* Name of the property.
* @return returns schema field
*/
String field();
/**
* Value of the property. Empty string means any non-null value is allowed.
* @return returns schema value
*/
String value() default "";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

/**
* A description of the example value.
* @return returns description
*
* @since 2.11
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public interface Processor<InputRecord extends Record<?>, OutputRecord extends R
* @since 2.11
* Indicates if the processor holds the events or not
* Holding events indicates that the events are not ready to be released.
* @return returns if events are held by the processor or not
*/
default boolean holdsEvents() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugin;

import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -14,18 +15,19 @@
import org.opensearch.dataprepper.core.event.EventFactoryApplicationContextMarker;
import org.opensearch.dataprepper.core.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.core.validation.PluginErrorCollector;
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;
import org.opensearch.dataprepper.plugins.configtest.TestComponentWithConfigInject;
import org.opensearch.dataprepper.plugins.configtest.TestDISourceWithConfig;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.TestObjectPlugin;
import org.opensearch.dataprepper.plugins.configtest.TestComponentWithConfigInject;
import org.opensearch.dataprepper.plugins.configtest.TestDISourceWithConfig;
import org.opensearch.dataprepper.plugins.test.TestComponent;
import org.opensearch.dataprepper.plugins.test.TestDISource;
import org.opensearch.dataprepper.plugins.test.TestPlugin;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.Collections;
Expand Down Expand Up @@ -129,7 +131,7 @@ void loadPlugin_should_return_a_new_plugin_instance_with_DI_context_initialized(
}

@Test
void loadPlugin_should_return_a_new_plugin_instance_with_DI_context_and_config_injected() {
void loadPlugin_should_return_a_new_plugin_instance_with_DI_context_with_config_and_plugin_metrics_injected() {

final String requiredStringValue = UUID.randomUUID().toString();
final String optionalStringValue = UUID.randomUUID().toString();
Expand All @@ -152,6 +154,9 @@ void loadPlugin_should_return_a_new_plugin_instance_with_DI_context_and_config_i
assertThat(pluginConfig.getRequiredString(), equalTo(requiredStringValue));
assertThat(pluginConfig.getOptionalString(), equalTo(optionalStringValue));
assertThat(plugin.getTestComponent().getIdentifier(), equalTo("test-component-with-plugin-config-injected"));
PluginMetrics pluginMetrics = plugin.getTestComponent().getPluginMetrics();
assertInstanceOf(PluginMetrics.class, pluginMetrics);
assertInstanceOf(Counter.class, pluginMetrics.counter("testCounter"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opensearch.dataprepper.core.pipeline;

/**
 * Drives a pipeline's execution: runs every processor over the buffered records and
 * publishes the results to the pipeline's sinks.
 */
public interface PipelineRunner {
    /**
     * Runs all processors against the currently available records and publishes the
     * processed output to the configured sinks.
     */
    void runAllProcessorsAndPublishToSinks();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.dataprepper.core.pipeline;

/**
 * Implemented by components (e.g. buffers) that delegate pipeline execution to an
 * externally supplied {@link PipelineRunner}.
 */
public interface SupportsPipelineRunner {
    /**
     * Returns the {@link PipelineRunner} currently associated with this component.
     *
     * @return the pipeline runner, or {@code null} if none has been set yet
     */
    PipelineRunner getPipelineRunner();

    /**
     * Associates a {@link PipelineRunner} with this component.
     *
     * @param pipelineRunner the runner to invoke when records become available
     */
    void setPipelineRunner(PipelineRunner pipelineRunner);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package org.opensearch.dataprepper.core.pipeline.buffer;

import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.core.pipeline.PipelineRunner;
import org.opensearch.dataprepper.core.pipeline.SupportsPipelineRunner;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * A buffer with "zero" buffering semantics: every write is held in a thread-local
 * store and the rest of the pipeline is driven synchronously on the writing thread
 * via the configured {@link PipelineRunner}. Records never cross threads, so no
 * locking is required.
 */
@DataPrepperPlugin(name = "zero", pluginType = Buffer.class)
public class ZeroBuffer<T extends Record<?>> implements Buffer<T>, SupportsPipelineRunner {
    private static final Logger LOG = LoggerFactory.getLogger(ZeroBuffer.class);
    private static final String PLUGIN_COMPONENT_ID = "ZeroBuffer";
    private final PluginMetrics pluginMetrics;
    // Records are confined to the thread that wrote them; the same thread drains
    // them in read(). withInitial guarantees get() never returns null.
    private final ThreadLocal<Collection<T>> threadLocalStore;
    private PipelineRunner pipelineRunner;
    @VisibleForTesting
    final String pipelineName;
    private final Counter writeRecordsCounter;
    private final Counter readRecordsCounter;

    /**
     * Creates a ZeroBuffer for the given pipeline and registers its read/write counters.
     *
     * @param pipelineDescription description of the owning pipeline; supplies the pipeline name
     */
    @DataPrepperPluginConstructor
    public ZeroBuffer(PipelineDescription pipelineDescription) {
        this.pluginMetrics = PluginMetrics.fromNames(PLUGIN_COMPONENT_ID, pipelineDescription.getPipelineName());
        this.pipelineName = pipelineDescription.getPipelineName();
        this.threadLocalStore = ThreadLocal.withInitial(ArrayList::new);
        this.writeRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_WRITTEN);
        this.readRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_READ);
    }

    /**
     * Stores a single record in the current thread's batch and synchronously runs the
     * downstream processors and sinks.
     *
     * @param record the record to write; must not be null
     * @param timeoutInMillis ignored — writes never block
     * @throws NullPointerException if {@code record} is null
     */
    @Override
    public void write(T record, int timeoutInMillis) throws TimeoutException {
        if (record == null) {
            throw new NullPointerException("The write record cannot be null");
        }

        threadLocalStore.get().add(record);
        writeRecordsCounter.increment();

        // Drive the rest of the pipeline on the caller's thread.
        getPipelineRunner().runAllProcessorsAndPublishToSinks();
    }

    /**
     * Stores a collection of records in the current thread's batch and synchronously
     * runs the downstream processors and sinks.
     *
     * @param records the records to write; must not be null
     * @param timeoutInMillis ignored — writes never block
     * @throws NullPointerException if {@code records} is null
     */
    @Override
    public void writeAll(Collection<T> records, int timeoutInMillis) throws Exception {
        if (records == null) {
            throw new NullPointerException("The write records cannot be null");
        }

        threadLocalStore.get().addAll(records);

        writeRecordsCounter.increment((double) records.size());
        getPipelineRunner().runAllProcessorsAndPublishToSinks();
    }

    /**
     * Drains the current thread's batch. When records are present the thread-local
     * slot is cleared so the next write starts a fresh batch.
     *
     * @param timeoutInMillis ignored — reads never block
     * @return the drained records paired with a checkpoint sized to the batch
     */
    @Override
    public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
        Collection<T> storedRecords = threadLocalStore.get();
        CheckpointState checkpointState = new CheckpointState(0);
        if (!storedRecords.isEmpty()) {
            checkpointState = new CheckpointState(storedRecords.size());
            // Drop the thread-local reference; a subsequent get() re-initializes it.
            threadLocalStore.remove();
            readRecordsCounter.increment((double) storedRecords.size());
        }

        return Map.entry(storedRecords, checkpointState);
    }

    /**
     * No-op: processing is fully synchronous, so there is nothing to checkpoint.
     */
    @Override
    public void checkpoint(CheckpointState checkpointState) {}

    /**
     * @return true if the current thread has no buffered records
     */
    @Override
    public boolean isEmpty() {
        return this.threadLocalStore.get().isEmpty();
    }

    @Override
    public PipelineRunner getPipelineRunner() {
        return pipelineRunner;
    }

    @Override
    public void setPipelineRunner(PipelineRunner pipelineRunner) {
        this.pipelineRunner = pipelineRunner;
    }
}
Loading

0 comments on commit e10828e

Please sign in to comment.