Skip to content

Commit

Permalink
Pass pipeline creation source as enum
Browse files Browse the repository at this point in the history
Thanks to @dblock for the suggestion to pass the pipeline creation
source in a way that accounts for possible future pipeline sources (and
lets us distinguish between actual named pipeline creation and the
validation create() that executes before we write a pipeline definition
to cluster state).

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Jul 2, 2023
1 parent cecea2f commit 71cfc46
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import org.opensearch.search.pipeline.common.helpers.SearchRequestMap;

import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;
Expand Down Expand Up @@ -127,6 +128,8 @@ SearchScript getPrecompiledSearchScript() {
* Factory class for creating {@link ScriptRequestProcessor}.
*/
public static final class Factory implements Processor.Factory<SearchRequestProcessor> {
private static final List<String> SCRIPT_CONFIG_KEYS = List.of("id", "source", "inline", "lang", "params", "options");

private final ScriptService scriptService;

/**
Expand Down Expand Up @@ -155,16 +158,21 @@ public ScriptRequestProcessor create(
String description,
Map<String, Object> config
) throws Exception {
Map<String, Object> scriptConfig = new HashMap<>();
for (String key : SCRIPT_CONFIG_KEYS) {
Object val = config.remove(key);
if (val != null) {
scriptConfig.put(key, val);
}
}
try (
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config);
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(scriptConfig);
InputStream stream = BytesReference.bytes(builder).streamInput();
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
) {
Script script = Script.parse(parser);

Arrays.asList("id", "source", "inline", "lang", "params", "options").forEach(config::remove);

// verify script is able to be compiled before successfully creating processor.
SearchScript searchScript = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ teardown:
"request_processors": [
{
"script" : {
"tag": "empty_script",
"lang": "painless",
"source" : ""
}
Expand All @@ -38,6 +39,7 @@ teardown:
"request_processors": [
{
"script" : {
"tag": "working",
"lang" : "painless",
"source" : "ctx._source['size'] += 10; ctx._source['from'] = ctx._source['from'] <= 0 ? ctx._source['from'] : ctx._source['from'] - 1 ; ctx._source['explain'] = !ctx._source['explain']; ctx._source['version'] = !ctx._source['version']; ctx._source['seq_no_primary_term'] = !ctx._source['seq_no_primary_term']; ctx._source['track_scores'] = !ctx._source['track_scores']; ctx._source['track_total_hits'] = 1; ctx._source['min_score'] -= 0.9; ctx._source['terminate_after'] += 2; ctx._source['profile'] = !ctx._source['profile'];"
}
Expand Down
23 changes: 19 additions & 4 deletions server/src/main/java/org/opensearch/search/pipeline/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
*/
public interface Processor {
/**
* Processor configuration key to let the factory know that the pipeline is defined in a search request.
* For processors whose creation is expensive (e.g. creates a connection pool), the factory can reject
* the request or create a more lightweight (but possibly less efficient) version of the processor.
* Processor configuration key to let the factory know the context for pipeline creation.
* <p>
* See {@link PipelineSource}.
*/
String AD_HOC_PIPELINE = "ad_hoc_pipeline";
String PIPELINE_SOURCE = "pipeline_source";

/**
* Gets the type of processor
Expand Down Expand Up @@ -144,4 +144,19 @@ public Parameters(
}

}

/**
* Passed via the "pipeline_source" configuration to a processor factory to convey the context for pipeline creation.
* <p>
* A processor factory may change the processor initialization behavior based on the creation context (e.g. avoiding
* creating expensive resources during validation or in a request-scoped pipeline.)
*/
enum PipelineSource {
// A named pipeline is being created or updated
UPDATE_PIPELINE,
// Pipeline is defined within a search request
SEARCH_REQUEST,
// A named pipeline is being validated before being written to cluster state
VALIDATE_PIPELINE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
Expand Down Expand Up @@ -1093,7 +1092,7 @@ private static void assertPipelineStats(OperationStats stats, long count, long f
public void testAdHocRejectingProcessor() {
String processorType = "ad_hoc_rejecting";
Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories = Map.of(processorType, (pf, t, d, c) -> {
if (ConfigurationUtils.readBooleanProperty(processorType, t, c, Processor.AD_HOC_PIPELINE, false)) {
if (c.get(Processor.PIPELINE_SOURCE) == Processor.PipelineSource.SEARCH_REQUEST) {
throw new IllegalArgumentException(processorType + " cannot be created as part of a pipeline defined in a search request");
}
return new FakeRequestProcessor(processorType, t, d, r -> {});
Expand Down

0 comments on commit 71cfc46

Please sign in to comment.