Skip to content

Commit

Permalink
Move PipelineSource into PipelineContext and explicitly pass to create
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Jun 29, 2023
1 parent edc9f3b commit e01d8a1
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public FilterQueryRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
Map<String, Object> query = ConfigurationUtils.readOptionalMap(TYPE, tag, config, QUERY_KEY);
if (query == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public RenameFieldResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
String oldField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
String newField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,22 +141,13 @@ public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

/**
* Creates a new instance of {@link ScriptRequestProcessor}.
*
* @param registry The registry of processor factories.
* @param processorTag The processor's tag.
* @param description The processor's description.
* @param config The configuration options for the processor.
* @return The created {@link ScriptRequestProcessor} instance.
* @throws Exception if an error occurs during the creation process.
*/
@Override
public ScriptRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> registry,
String processorTag,
String description,
Map<String, Object> config
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
Map<String, Object> scriptConfig = new HashMap<>();
for (String key : SCRIPT_CONFIG_KEYS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public SearchPipelineCommonModulePlugin() {}
* @return A map of processor factories, where the keys are the processor types and the values are the corresponding factory instances.
*/
@Override
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Parameters parameters) {
return Map.of(
FilterQueryRequestProcessor.TYPE,
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
Expand All @@ -43,7 +43,7 @@ public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcesso
}

@Override
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Parameters parameters) {
return Map.of(RenameFieldResponseProcessor.TYPE, new RenameFieldResponseProcessor.Factory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ public void testFilterQuery() throws Exception {
public void testFactory() throws Exception {
FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry());
Map<String, Object> configMap = new HashMap<>(Map.of("query", Map.of("term", Map.of("field", "value"))));
FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap);
FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap, null);
assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery);

// Missing "query" parameter:
expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap()));
expectThrows(
IllegalArgumentException.class,
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,15 @@ public void testFactory() throws Exception {
config.put("target_field", newField);

RenameFieldResponseProcessor.Factory factory = new RenameFieldResponseProcessor.Factory();
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config);
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config, null);
assertEquals(processor.getType(), "rename_field");
assertEquals(processor.getOldField(), oldField);
assertEquals(processor.getNewField(), newField);
assertFalse(processor.isIgnoreMissing());

expectThrows(OpenSearchParseException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap()));
expectThrows(
OpenSearchParseException.class,
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,23 @@

package org.opensearch.plugins;

import org.opensearch.client.Client;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.script.ScriptService;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.threadpool.Scheduler;

import java.util.Collections;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
* An extension point for {@link Plugin} implementation to add custom search pipeline processors.
Expand All @@ -28,7 +39,7 @@ public interface SearchPipelinePlugin {
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Parameters parameters) {
return Collections.emptyMap();
}

Expand All @@ -39,7 +50,78 @@ default Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcess
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Parameters parameters) {
return Collections.emptyMap();
}

/**
* Infrastructure class that holds services that can be used by processor factories to create processor instances
* and that gets passed around to all {@link SearchPipelinePlugin}s.
*/
class Parameters {

/**
* Useful to provide access to the node's environment like config directory to processor factories.
*/
public final Environment env;

/**
* Provides processors script support.
*/
public final ScriptService scriptService;

/**
* Provide analyzer support
*/
public final AnalysisRegistry analysisRegistry;

/**
* Allows processors to read headers set by {@link org.opensearch.action.support.ActionFilter}
* instances that have run while handling the current search.
*/
public final ThreadContext threadContext;

public final LongSupplier relativeTimeSupplier;

public final SearchPipelineService searchPipelineService;

public final Consumer<Runnable> genericExecutor;

public final NamedXContentRegistry namedXContentRegistry;

/**
* Provides scheduler support
*/
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;

/**
* Provides access to the node's cluster client
*/
public final Client client;

public Parameters(
Environment env,
ScriptService scriptService,
AnalysisRegistry analysisRegistry,
ThreadContext threadContext,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
SearchPipelineService searchPipelineService,
Client client,
Consumer<Runnable> genericExecutor,
NamedXContentRegistry namedXContentRegistry
) {
this.env = env;
this.scriptService = scriptService;
this.threadContext = threadContext;
this.analysisRegistry = analysisRegistry;
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
this.searchPipelineService = searchPipelineService;
this.client = client;
this.genericExecutor = genericExecutor;
this.namedXContentRegistry = namedXContentRegistry;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,28 @@ static PipelineWithMetrics create(
Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessorFactories,
NamedWriteableRegistry namedWriteableRegistry,
OperationMetrics totalRequestProcessingMetrics,
OperationMetrics totalResponseProcessingMetrics
OperationMetrics totalResponseProcessingMetrics,
Processor.PipelineContext pipelineContext
) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
List<Map<String, Object>> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY);
List<SearchRequestProcessor> requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs);
List<SearchRequestProcessor> requestProcessors = readProcessors(
requestProcessorFactories,
requestProcessorConfigs,
pipelineContext
);
List<Map<String, Object>> responseProcessorConfigs = ConfigurationUtils.readOptionalList(
null,
null,
config,
RESPONSE_PROCESSORS_KEY
);
List<SearchResponseProcessor> responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs);
List<SearchResponseProcessor> responseProcessors = readProcessors(
responseProcessorFactories,
responseProcessorConfigs,
pipelineContext
);
if (config.isEmpty() == false) {
throw new OpenSearchParseException(
"pipeline ["
Expand All @@ -103,7 +112,8 @@ static PipelineWithMetrics create(

private static <T extends Processor> List<T> readProcessors(
Map<String, Processor.Factory<T>> processorFactories,
List<Map<String, Object>> requestProcessorConfigs
List<Map<String, Object>> requestProcessorConfigs,
Processor.PipelineContext pipelineContext
) throws Exception {
List<T> processors = new ArrayList<>();
if (requestProcessorConfigs == null) {
Expand All @@ -118,7 +128,19 @@ private static <T extends Processor> List<T> readProcessors(
Map<String, Object> config = (Map<String, Object>) entry.getValue();
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY);
processors.add(processorFactories.get(type).create(processorFactories, tag, description, config));
processors.add(processorFactories.get(type).create(processorFactories, tag, description, config, pipelineContext));
if (config.isEmpty() == false) {
String processorName = type;
if (tag != null) {
processorName = processorName + ":" + tag;
}
throw new OpenSearchParseException(
"processor ["
+ processorName
+ "] doesn't support one or more provided configuration parameters: "
+ Arrays.toString(config.keySet().toArray())
);
}
}
}
return Collections.unmodifiableList(processors);
Expand Down
97 changes: 17 additions & 80 deletions server/src/main/java/org/opensearch/search/pipeline/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,7 @@

package org.opensearch.search.pipeline;

import org.opensearch.client.Client;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.Scheduler;

import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
* A processor implementation may modify the request or response from a search call.
Expand Down Expand Up @@ -67,87 +55,36 @@ interface Factory<T extends Processor> {
* @param tag The tag for the processor
* @param description A short description of what this processor does
* @param config The configuration for the processor
*
* <b>Note:</b> Implementations are responsible for removing the used configuration
* keys, so that after creation the config map should be empty.
* @param pipelineContext Contextual information about the enclosing pipeline.
*/
T create(Map<String, Factory<T>> processorFactories, String tag, String description, Map<String, Object> config) throws Exception;
T create(
Map<String, Factory<T>> processorFactories,
String tag,
String description,
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception;
}

/**
* Infrastructure class that holds services that can be used by processor factories to create processor instances
* and that gets passed around to all {@link SearchPipelinePlugin}s.
* Contextual information about the enclosing pipeline. A processor factory may change processor initialization behavior or
* pass this information to the created processor instance.
*/
class Parameters {

/**
* Useful to provide access to the node's environment like config directory to processor factories.
*/
public final Environment env;

/**
* Provides processors script support.
*/
public final ScriptService scriptService;

/**
* Provide analyzer support
*/
public final AnalysisRegistry analysisRegistry;

/**
* Allows processors to read headers set by {@link org.opensearch.action.support.ActionFilter}
* instances that have run while handling the current search.
*/
public final ThreadContext threadContext;

public final LongSupplier relativeTimeSupplier;

public final SearchPipelineService searchPipelineService;

public final Consumer<Runnable> genericExecutor;
class PipelineContext {
private final PipelineSource pipelineSource;

public final NamedXContentRegistry namedXContentRegistry;

/**
* Provides scheduler support
*/
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;

/**
* Provides access to the node's cluster client
*/
public final Client client;

public Parameters(
Environment env,
ScriptService scriptService,
AnalysisRegistry analysisRegistry,
ThreadContext threadContext,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
SearchPipelineService searchPipelineService,
Client client,
Consumer<Runnable> genericExecutor,
NamedXContentRegistry namedXContentRegistry
) {
this.env = env;
this.scriptService = scriptService;
this.threadContext = threadContext;
this.analysisRegistry = analysisRegistry;
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
this.searchPipelineService = searchPipelineService;
this.client = client;
this.genericExecutor = genericExecutor;
this.namedXContentRegistry = namedXContentRegistry;
public PipelineContext(PipelineSource pipelineSource) {
this.pipelineSource = pipelineSource;
}

public PipelineSource getPipelineSource() {
return pipelineSource;
}
}

/**
* Passed via the "pipeline_source" configuration to a processor factory to convey the context for pipeline creation.
* <p>
* A processor factory may change the processor initialization behavior based on the creation context (e.g. avoiding
* creating expensive resources during validation or in a request-scoped pipeline.)
*/
Expand Down
Loading

0 comments on commit e01d8a1

Please sign in to comment.