-
Notifications
You must be signed in to change notification settings - Fork 83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimized text embedding processor for single document update #1191
base: optimized-processor
Are you sure you want to change the base?
Optimized text embedding processor for single document update #1191
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## optimized-processor #1191 +/- ##
=========================================================
+ Coverage 81.74% 81.79% +0.04%
+ Complexity 2511 1298 -1213
=========================================================
Files 190 97 -93
Lines 8564 4422 -4142
Branches 1436 746 -690
=========================================================
- Hits 7001 3617 -3384
+ Misses 1006 513 -493
+ Partials 557 292 -265 ☔ View full report in Codecov by Sentry. |
9264760
to
d68dca2
Compare
*/ | ||
|
||
@Log4j2 | ||
public abstract class OptimizedInferenceProcessor extends InferenceProcessor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel the term "optimization" is too generic to use
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for pointing this out. Will wait to see if others have better opinions on the naming
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I agree with @junqiu-lei here. There is a already a InferenceProcessor and we are trying to Optimize what i do already. So better if we name it around what optimization it does upon the existing one.
But if we are not able to come up with something more appropriate then I am good with this one. No strong concerns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will name as ConditionalTextEmbeddingProcessor
to convey that it selectively generates embeddings when conditions are met. Let me know if there are better suggestions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will name it SelectiveTextEmbeddingProcessor
for now
@@ -26,6 +26,8 @@ public final class TextEmbeddingProcessor extends InferenceProcessor { | |||
|
|||
public static final String TYPE = "text_embedding"; | |||
public static final String LIST_TYPE_NESTED_MAP_KEY = "knn"; | |||
public static final String IGNORE_EXISTING = "ignore_existing"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From user's point, they might not clear what we're ignoring. maybe rename the flag to something like forceRegeneration
or alwaysGenerateEmbedding
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ignore
prefix was what we preferred based on discussions I had with ML-commons team because it is consistent with other flags that are in use. But open to more suggestions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like "ignore_existing_embedding" since it's more descriptive. If the default to maintain backwards compatibility is to have optimization disabled, "ignore" makes it clearer that it's a feature they have to turn on to get the benefit (default false). like I wonder if users would be confused if a setting labelled "force" was default true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just saw the above comment about the name of the processor too, maybe ideally the processor name and the option name are similar since the processor exists to support the option?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to make the naming more clear. ignore_existing is a little bit confusing that we actually want to do the optimization when it is true. People can view it as ignore_existing_embedding which means we want to always do the inference work.
I think even we can document how the parameter works it should be self explain and give people a gut feel what does it control. It should convey the info that we will do some optimization when the doc has existing embedding. It's actually kind weird we have to start with ignore. And if starting with ignore can make it confusing we should propose to not do that. I feel even we simply call it as "optimize_embedding_generation" is more descriptive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do we think about skipExisting
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a good point. How about skip_existing
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have any integration test covered for the new processors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will include IT tests in this PR
We might need BWC tests to verify mixed-version cluster scenarios where:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm understanding correctly, when a user adds a text embedding processor to the pipeline with the config option as true, then we create a different processor object than if the config was false.
Are there any scenarios where this is confusing for users that the processor object we create isn't the exact same one that they define? Or is this all abstracted away?
int min = Math.min(((List) sourceList.get()).size(), ((List) existingList.get()).size()); | ||
for (int j = 0; j < min; j++) { | ||
if (((List) sourceList.get()).get(j).equals(((List) existingList.get()).get(j))) { | ||
updatedEmbeddings.add(((List) embeddingList.get()).get(j)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would having a helper method to cast lists would make this more readable? Like private List asList(Object object)
. Since raw casting requires double parentheses making it hard to parse inline, compare ((List) object)
vs asList(object)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will make it more readable. thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need an extra method here. You just need to declare another variable to avoid repetitive casting.
List sList = (List)sourceList.get();
List eList = (List)existsingList.get();
if (sList.get(j).equals(eList.get(j)) {
...
}
the implementation detail should be abstracted away from user, and all user needs to know is the behavior of the flag. |
Map<String, Object> existingSourceAndMetadataMap, | ||
int index | ||
) { | ||
String textKey = ProcessorUtils.findKeyFromFromValue(ProcessorDocumentUtils.unflattenJson(fieldMap), currentPath, level); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I notice we do ProcessorDocumentUtils.unflattenJson(fieldMap) multiple times. Can we only do it once?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed. unflattened in constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Completed the 1st round of review. Will wait for comments to be addressed before doing 2nd round of review.
Thanks
return v1; | ||
} else { | ||
return v2; | ||
if (v1 instanceof List<?> && v2 instanceof List<?>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you Add a comment on what does this peice of code do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a leftover from previous code that lacks comments explaining its functionality. It would be great if you could add a comment to clarify it while making your modifications, helping to address this technical debt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added comments
} | ||
|
||
if (v1 instanceof Map<?, ?> && v2 instanceof Map<?, ?>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you Add a comment on what does this peice of code do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, will add comments for this method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted back to previous after updated implementation
src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java
Outdated
Show resolved
Hide resolved
IngestDocument ingestDocument, | ||
Map<String, Object> processorMap, | ||
List<?> results, | ||
boolean update |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
boolean update | |
boolean update |
Can you come up with a little usecase specific variable name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or adding a javadoc explaining each parameter for this method might be good as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added javadoc for this
} | ||
} | ||
|
||
protected void setVectorFieldsToDocument(IngestDocument ingestDocument, Map<String, Object> processorMap, List<?> results) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When is this method called? I see there are 2 methods with the same name. If we need this method then please change the name of this method to usecase specific.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or better is to make the earlier method except this flag and put if else
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. We can remove this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored to remove this method
@@ -512,17 +560,18 @@ private void processMapEntryValue( | |||
) { | |||
// build nlp output for object in sourceValue which is map type | |||
Iterator<Map<String, Object>> iterator = sourceAndMetadataMapValueInList.iterator(); | |||
IntStream.range(0, sourceAndMetadataMapValueInList.size()).forEach(index -> { | |||
IndexWrapper listIndexWrapper = new IndexWrapper(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't needs this listIndexWrapper object. The argument indexWrapper is same object what you are trying to create here. Please traceback the indexWrapper object and you will find that it is called from buildNLPResult method in this class https://github.com/opensearch-project/neural-search/pull/1191/files#diff-d2b79f65d193c79dd65558833fcf583eb2c29301325e1eb4eb83a963c737f2f8R468. Which has the same declaration. They what is the need of making this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indexWrapper is used to iterate the embedding list for update. listIndexWrapper is used to replace the existingIntStream.range(0, sourceAndMetadataMapValueInList.size())
. Currently, every update is just another ingest so we overwrite the values and its embeddings every time. With the new way, some values and its embeddings will have been copied over, so we need a way to skip overwriting those values
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I am understanding correctly then we need only listIndexMapper not the indexMapper. As the listIndexMapper have the details of what needs to be updated right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this change was done to increment the nestedIndex based on the existence of embedding in document, instead of sequentially incrementing like IntStream.range(0, sourceAndMetadataMapValueInList.size())
. This change was done to handle recently found edge cases such as #1024. Since this change is only needed only for specific cases and seems like a distraction to overall objective of the PR, I will raise a separate PR for this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The IndexWrapper is necessary. We want to increase counter only when we consumed value from inputNestedMapEntryValue
so we need a way to keep the counter value. I see the problem is in the class name which is confusing because index in OpenSearch has its own meaning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated implementation logic to remove the need for this change.
src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java
Show resolved
Hide resolved
@@ -543,7 +591,7 @@ private void putNLPResultToSingleSourceMapInList( | |||
List<?> results, | |||
IndexWrapper indexWrapper, | |||
Map<String, Object> sourceAndMetadataMap, | |||
int nestedElementIndex | |||
IndexWrapper listIndexWrapper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indexWrapper and listIndexWrapper are same. Please reevaluate the need of this object here.
@@ -54,15 +55,32 @@ public abstract class InferenceProcessor extends AbstractBatchingProcessor { | |||
public static final String MODEL_ID_FIELD = "model_id"; | |||
public static final String FIELD_MAP_FIELD = "field_map"; | |||
private static final BiFunction<Object, Object, Object> REMAPPING_FUNCTION = (v1, v2) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this REMAPPING _Function a method? Loading this method in memory by default is not ideal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if making it as method is necessary. In terms of loading into memory, there is no difference between this and method.
return new TextEmbeddingProcessor(tag, description, batchSize, modelId, filedMap, clientAccessor, environment, clusterService); | ||
Map<String, Object> fieldMap = readMap(TYPE, tag, config, FIELD_MAP_FIELD); | ||
boolean ignoreExisting = readBooleanProperty(TYPE, tag, config, IGNORE_EXISTING, DEFAULT_IGNORE_EXISTING); | ||
if (ignoreExisting == true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we say ignore_existing does that means we want to always do inference work? Which is our existing behavior so we should use the existing processor rather than the optimized one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
open to suggestions, we can make the feature name more descriptive
) { | ||
String index = ingestDocument.getSourceAndMetadata().get(INDEX_FIELD).toString(); | ||
String id = ingestDocument.getSourceAndMetadata().get(ID_FIELD).toString(); | ||
openSearchClient.execute(GetAction.INSTANCE, new GetRequest(index, id), ActionListener.wrap(response -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this processor we always do this get call which kind downgrade the performance of the operations we know for sure there is no existing embedding can be reused. We can control this through the ignore_existing flag but just thinking would it better in this optimized processor we also consider if the we are processing a create doc API? I think we can tell this info based on the which API is invoked and attach it to the IngestDocument.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that would require model change in ingestDocument to attach API performed on the ingestDocument. This change should be a separate feature in my opinion, and should be implemented separately if needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no difference between create doc and update doc API.
return new TextEmbeddingProcessor(tag, description, batchSize, modelId, filedMap, clientAccessor, environment, clusterService); | ||
Map<String, Object> fieldMap = readMap(TYPE, tag, config, FIELD_MAP_FIELD); | ||
boolean ignoreExisting = readBooleanProperty(TYPE, tag, config, IGNORE_EXISTING, DEFAULT_IGNORE_EXISTING); | ||
if (ignoreExisting == true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Since ignoreExisting is already a boolean you don't need to check equality to true here, you can use the boolean as the condition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
following the developer guide: https://github.com/opensearch-project/OpenSearch/blob/main/DEVELOPER_GUIDE.md#java-language-formatting-guidelines
explicit true/false check is preferred
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is only applicable for false conditions. If(ignoreExisting) by all means mean a true condition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Therefore, it is recommended to use if(boolvariable) rather than if(boolvariable==true). The end goal is to improve readability. For false conditions it makes sense to write that explicitly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, will change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is standard format for OpenSearch core where we explicitly compare it to either true or false. Even if we don't force it in neural, would be nice to follow it.
} else { | ||
return v2; | ||
if (v1 instanceof List<?> && v2 instanceof List<?>) { | ||
List<Object> v1List = new ArrayList<>((List<?>) v1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid creating new array here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored
List<?> v2List = (List<?>) v2; | ||
|
||
Iterator<?> iterator = v2List.iterator(); | ||
for (int i = 0; i < v1List.size() && iterator.hasNext(); i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- In previous code, we added all item from v2 to v1. Now, we are setting null value in v1 using v2 value. Could you tell how this new logic can cover the previous use case as well?
- The assumption here is that, the number of null value in v1List is same as the size of v2. Can we make it obvious from the code? For example we can do like
for (int i = 0; i , v1List.size(); i++) {
if (v1List.get(i) == null) {
assert iterator.hasNext() == true;
v1List.set(i, iterator.next());
}
}
assert iterator.hasNext() == false;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored with comments
} | ||
|
||
if (v1 instanceof Map<?, ?> && v2 instanceof Map<?, ?>) { | ||
Map<String, Object> v1Map = new LinkedHashMap<>((Map<String, Object>) v1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid creating new map? This might be costly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We checked if v1 is instance of Map<?, ?>
but not we are casting it to Map<String, Object>
. This is not safe.
} | ||
return v1Map; | ||
} | ||
return v2 != null ? v2 : v1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we ever get to this case? What is the logic behind this returning v2 and ignoring v1?
Map<?, ?> v2Map = (Map<?, ?>) v2; | ||
|
||
for (Map.Entry<?, ?> entry : v2Map.entrySet()) { | ||
if (entry.getKey() instanceof String && !v1Map.containsKey(entry.getKey())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a case where v2Map has different types of key? Like one key is string type and other int type?
src/main/java/org/opensearch/neuralsearch/processor/util/ProcessorUtils.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/util/ProcessorUtils.java
Outdated
Show resolved
Hide resolved
Map<String, Object> currentMap = sourceAsMap; | ||
String targetValue = keys[keys.length - 1]; | ||
for (String key : keys) { | ||
if (key.equals(targetValue)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if path is "level1.level2.level1" and level is 3?
The code will break here and lastFoundKey will be null.
* map:
* {
* "level1": {
* "level2" : {
* "first_text": "level1"
* }
* }
* }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed, and added a test case
if (currentMap.containsKey(key)) { | ||
Object value = currentMap.get(key); | ||
if (value instanceof Map) { | ||
currentMap = (Map<String, Object>) value; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is unsafe cast
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
src/main/java/org/opensearch/neuralsearch/processor/util/ProcessorUtils.java
Outdated
Show resolved
Hide resolved
52b6dbc
to
3b7769e
Compare
((Collection) v1).addAll((Collection) v2); | ||
Iterator<?> iterator = ((Collection) v2).iterator(); | ||
for (int i = 0; i < ((Collection) v1).size(); i++) { | ||
if (((List) v1).get(i) == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should not cast Collection to List without type checking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This get(i)
is not efficient for some of the list type.
ListIterator<String> v1Iterator = v1.listIterator();
while (v1Iterator.hasNext()) {
if (v1Iterator.next() == null) {
v1Iterator.set(v2Iterator.next());
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One question is, how is this compatible with previous code logic? The previous and current logic is totally different and I wonder how this new logic can support the previous use case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vibrantvarun can confirm, but previously, maps would merge without remapping function, because the key would be inserted for the first time, not requiring remapping.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean about List case not about Map case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.merge still requires a function to be passed in, regardless of whether it's used or not.
Google's answer on .merge():
The merge() method, available in Java's Map interface, operates as a bifunction, taking a key, a value, and a BiFunction as arguments. It either adds a new key-value pair to the map or updates an existing entry based on the provided BiFunction.
If the specified key is not already present in the map, the merge() method simply inserts the key with the given value. However, if the key exists, the BiFunction is applied to the current value associated with the key and the provided value. The result of this function determines the new value for the key. If the BiFunction returns null, the entry for the key is removed from the map. Otherwise, the key is updated with the new value returned by the BiFunction
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use .put()
instead of .merge()
if there is no key conflict.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@martin-gaievski @vibrantvarun can you verify my assumption is correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, this would be invoked when user directly ingests with vector field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leaving the inferenceProcessor the same
nlpResult.forEach(ingestDocument::setFieldValue); | ||
// if partialUpdate is set to false, full update is required where each vector embedding in nlpResult | ||
// can directly be populated to ingestDocument | ||
if (partialUpdate == false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's return early to eliminate the else
block and reduce nesting?
if (partialUpdate == false) { | |
if (partialUpdate == false) { | |
// .. | |
return; | |
} | |
// ... |
@@ -534,7 +590,6 @@ private void processMapEntryValue( | |||
* @param results | |||
* @param indexWrapper | |||
* @param sourceAndMetadataMap | |||
* @param nestedElementIndex index of the element in the list field of source document |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be removed.
); | ||
} | ||
} else { | ||
if (sourceValue instanceof List && ((List<Object>) sourceValue).get(nestedElementIndex) != null) { | ||
sourceAndMetadataMap.merge(processorKey, results.get(indexWrapper.index++), REMAPPING_FUNCTION); | ||
if (sourceValue instanceof List) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is no change, can we revert it back to original code?
IntStream.range(0, sourceValue.size()) | ||
.forEachOrdered(x -> keyToResult.add(ImmutableMap.of(listTypeNestedMapKey, results.get(indexWrapper.index++)))); | ||
IntStream.range(0, sourceValue.size()).forEachOrdered(x -> { | ||
if (sourceValue.get(x) != null) { // only add to keyToResult when sourceValue.get(x) exists, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see we use get(i)
in the original code which is not safe.
FYI, we should either use iterator or force it to be array list. I think this require changes in the original code as well so I won't push for it in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
simply changing the type fails the integ tests. This will require additional changes
|
||
if (existingValue instanceof List) { | ||
if (index >= 0 && index < ((List) existingValue).size()) { | ||
((List) existingValue).set(index, targetValue); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not efficient when existingValue is not array list.
src/main/java/org/opensearch/neuralsearch/processor/util/ProcessorUtils.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/util/ProcessorUtils.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/util/ProcessorUtils.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/util/ProcessorUtils.java
Outdated
Show resolved
Hide resolved
c1f9d45
to
bf7e35f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did 2nd round of review. Will wait for comments to address before doing 3rd round.
src/main/java/org/opensearch/neuralsearch/processor/factory/TextEmbeddingProcessorFactory.java
Outdated
Show resolved
Hide resolved
this.reversedFieldMap = ProcessorDocumentUtils.flattenAndFlip(fieldMap); | ||
} | ||
|
||
public abstract Object processValue( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add javadoc to all public methods
Map<String, Object> existingSourceAndMetadataMap, | ||
Map<String, Object> sourceAndMetadataMap, | ||
Map<String, Object> processMap, | ||
String prevPath |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you think of a better name here?
Map<String, Object> filteredInnerMap = filterProcessMap( | ||
existingSourceAndMetadataMap, | ||
sourceAndMetadataMap, | ||
(Map<String, Object>) value, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
* @param existingSourceAndMetadataMap SourceAndMetadataMap of existing Document | ||
* @param sourceAndMetadataMap SourceAndMetadataMap of ingestDocument Document | ||
* @param processMap The current processMap | ||
* @param prevPath The dot-notation path of the parent elements |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a small example how does that dot-notation path look like
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
Map<String, Object> existingSourceAndMetadataMap | ||
) { | ||
String textKey = reversedFieldMap.get(embeddingKey); | ||
if (textKey == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use Objects.isNull
} | ||
|
||
@Override | ||
public void doExecute( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add javadoc to all public methods
* -1 is passed in | ||
*/ | ||
|
||
public static void setValueToSource(Map<String, Object> sourceAsMap, String targetKey, Object targetValue, int index) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a lot happening in this method. Please break it down. Validations, function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
simplified the function
src/main/java/org/opensearch/neuralsearch/util/ProcessorDocumentUtils.java
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/util/ProcessorDocumentUtils.java
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java
Outdated
Show resolved
Hide resolved
...ain/java/org/opensearch/neuralsearch/processor/optimization/SelectiveInferenceProcessor.java
Outdated
Show resolved
Hide resolved
5f882f0
to
90e77d1
Compare
@@ -26,18 +32,31 @@ public final class TextEmbeddingProcessor extends InferenceProcessor { | |||
|
|||
public static final String TYPE = "text_embedding"; | |||
public static final String LIST_TYPE_NESTED_MAP_KEY = "knn"; | |||
public static final String SKIP_EXISTING = "skip_existing"; | |||
public static final boolean DEFAULT_SKIP_EXISTING = Boolean.FALSE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static final boolean DEFAULT_SKIP_EXISTING = Boolean.FALSE; | |
public static final boolean DEFAULT_SKIP_EXISTING = false; |
You are using primitive datatype so false should be fine.
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
} | ||
} | ||
}, e -> { handler.accept(null, e); })); | ||
} else { // skip existing flag is turned off. Call model inference without filtering |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment in new line.
src/main/java/org/opensearch/neuralsearch/processor/optimization/InferenceFilter.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/optimization/InferenceFilter.java
Outdated
Show resolved
Hide resolved
...in/java/org/opensearch/neuralsearch/processor/optimization/TextEmbeddingInferenceFilter.java
Outdated
Show resolved
Hide resolved
...in/java/org/opensearch/neuralsearch/processor/optimization/TextEmbeddingInferenceFilter.java
Outdated
Show resolved
Hide resolved
@@ -171,6 +187,48 @@ public static Optional<Object> getValueFromSource(final Map<String, Object> sour | |||
return currentValue; | |||
} | |||
|
|||
public static void setValueToSource(Map<String, Object> sourceAsMap, String targetKey, Object targetValue) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need this method? There is another method with the same name below. Can you combine both of them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/main/java/org/opensearch/neuralsearch/processor/util/ProcessorUtils.java
Outdated
Show resolved
Hide resolved
@@ -139,28 +141,42 @@ public static void removeTargetFieldFromSource(final Map<String, Object> sourceA | |||
} | |||
} | |||
|
|||
public static Optional<Object> getValueFromSource(final Map<String, Object> sourceAsMap, final String targetField) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a method with same name and return type below. Can you combine this and the below one? Why do we need this separate method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is an existing method that assumes nested structure will not contain lists.
src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
// have been copied | ||
String index = ingestDocument.getSourceAndMetadata().get(INDEX_FIELD).toString(); | ||
String id = ingestDocument.getSourceAndMetadata().get(ID_FIELD).toString(); | ||
openSearchClient.execute(GetAction.INSTANCE, new GetRequest(index, id), ActionListener.wrap(response -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a concern with this async operation. This might impact to other search request by consuming all search thread.
There are three options I can think of,
- we might need to have a circuit breaker to limit the total number of concurrent ingestion request.
- ask cx to pass embedding if they want to skip inference call so that we only can access the source document without existing document or cx can omit the text field so that inference can be skipped for update operation.
- Make it as synchronous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed offline, we'll first perform benchmark testing to see the impact on the current change
Map<String, Object> filteredProcessMap = new HashMap<>(); | ||
Map<String, Object> castedProcessMap = ProcessorUtils.castToMap(processMap); | ||
for (Map.Entry<?, ?> entry : castedProcessMap.entrySet()) { | ||
if ((entry.getKey() instanceof String) == false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need this check. castedProcessMap is already Map<String, Object> and the key is String.
} | ||
Map<String, Object> filteredProcessMap = new HashMap<>(); | ||
Map<String, Object> castedProcessMap = ProcessorUtils.castToMap(processMap); | ||
for (Map.Entry<?, ?> entry : castedProcessMap.entrySet()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for (Map.Entry<?, ?> entry : castedProcessMap.entrySet()) { | |
for (Map.Entry<String, Object> entry : castedProcessMap.entrySet()) { |
} else if (value instanceof List) { | ||
List<Object> processedList = filterListValue( | ||
currentPath, | ||
(List<Object>) value, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not safe as well. If you know that value will be always List, let's use
@SuppressWarnings("unchecked")
public static List<Object> castToObjectList(List<?> obj) {
return (List<Object>) obj;
}
// return empty list if processList and existingList are equal and embeddings are copied, return empty list otherwise | ||
return filterInferenceValuesInList( | ||
processList, | ||
(List<Object>) existingList.get(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same for this. Not all List is List. For example, we cannot cast List to List
90e77d1
to
9bf5cce
Compare
b1816f1
to
5e7f19e
Compare
} | ||
} | ||
}, e -> { handler.accept(null, e); })); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit. Better to handle simple case at top and reduce the nested depth?
if (skipExisting == false) {
makeInferenceCall(ingestDocument, processMap, inferenceList, handler);
return;
}
// rest of codes..
// This method should be used only when you are certain the object is a `Map<String, Object>`. | ||
// It is recommended to use this method as a last resort. | ||
@SuppressWarnings("unchecked") | ||
public static Map<String, Object> castToMap(Object obj) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static Map<String, Object> castToMap(Object obj) { | |
public static Map<String, Object> unsafeCastToObjectMap(Object obj) { |
// This method should be used only when you are certain the object is a `List<Object>`. | ||
// It is recommended to use this method as a last resort. | ||
@SuppressWarnings("unchecked") | ||
public static List<Object> castToObjectList(Object obj) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static List<Object> castToObjectList(Object obj) { | |
public static List<Object> unsafeCastToObjectList(Object obj) { |
* | ||
* @param sourceAsMap The Source map (a map of maps) to iterate through | ||
* @param targetField The path to take to get the desired mapping | ||
* @param index the index to use when a list is encountered during traversal; if list processing is not needed, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the case when list processing is not needed when the value is list? In such case, returning empty optional is expected behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
user should always pass in index, if they know sourceAsMap contains list. In any other case, we should return empty optional to indicate value not found.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then the design of this method is not correct imo. If the caller already know that it contains list, they just get the value directly or there should be a separate method for that.
if (listValue instanceof Map) { | ||
Map<String, Object> currentMap = castToMap(listValue); | ||
return Optional.ofNullable(currentMap.get(key)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when listValue is not map, do we return empty?
} | |
} | |
else { | |
return Optional.empty(); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if index is in correct format, listValue should not be anything other than map in the middle of traversal. It can only be different at the last key, which would be returned as currentValue.
} | ||
Map<String, Object> currentMap = (Map<String, Object>) value; | ||
return Optional.ofNullable(currentMap.get(key)); | ||
return Optional.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit. For me better to put this under else block to make sure every block should return itself without missing any condition.
else {
Optional.empty();
}
} | ||
Map<String, Object> currentMap = (Map<String, Object>) value; | ||
return Optional.ofNullable(currentMap.get(key)); | ||
return Optional.empty(); | ||
}); | ||
|
||
if (currentValue.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe break;
could be better?
|
||
for (int i = 0; i < keys.length - 1; i++) { | ||
Object next = current.computeIfAbsent(keys[i], k -> new HashMap<>()); | ||
if (next instanceof ArrayList<?> list) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same, is there a case where next is array list but index is -1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if user assumes map doesn't contain lists, and doesn't specify the index, it will not set the value to map
5e7f19e
to
fc83219
Compare
Signed-off-by: will-hwang <sang7239@gmail.com>
fc83219
to
0f99e96
Compare
* | ||
* @param sourceAsMap The Source map (a map of maps) to iterate through | ||
* @param targetField The path to take to get the desired mapping | ||
* @param index the index to use when a list is encountered during traversal; if list processing is not needed, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then the design of this method is not correct imo. If the caller already know that it contains list, they just get the value directly or there should be a separate method for that.
* Stores the reverse mapping of field names to support efficient lookups for embedding keys. | ||
* This is generated by flattening and flipping the provided field map. | ||
*/ | ||
protected Map<String, String> reversedFieldMap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you comment what is the example key and value here?
existingSourceAndMetadataMap | ||
); | ||
filteredProcessMap.put(key, processedList); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check if value is String and throw exception otherwise.
if (embeddingList.isPresent() == false || embeddingList.get() instanceof List == false) { | ||
return processList; | ||
} | ||
// return empty list if processList and existingList are equal and embeddings are copied, return empty list otherwise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not correct
String newKey = prefix.isEmpty() ? key : prefix + "." + key; | ||
|
||
if (value instanceof Map) { | ||
flattenAndFlip(newKey, (Map<String, Object>) value, flippedMap); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not safe.
String transformedValue = parentPath.isEmpty() ? value.toString() : parentPath + "." + value.toString(); | ||
if (flippedMap.containsKey(transformedValue)) { | ||
int index = 1; | ||
while (flippedMap.containsKey(transformedValue + "_" + index)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not efficient..
private Map<String, Object> filter( | ||
Map<String, Object> existingSourceAndMetadataMap, | ||
Map<String, Object> sourceAndMetadataMap, | ||
Object processMap, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Object processMap, | |
Map<String, Object> processMap, |
Object processMap, | ||
String traversedPath | ||
) { | ||
if (processMap instanceof Map == false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (processMap instanceof Map == false) { |
Object value = entry.getValue(); | ||
String currentPath = traversedPath.isEmpty() ? key : traversedPath + "." + key; | ||
if (value instanceof Map<?, ?>) { | ||
Map<String, Object> filteredInnerMap = filter(existingSourceAndMetadataMap, sourceAndMetadataMap, value, currentPath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Map<String, Object> filteredInnerMap = filter(existingSourceAndMetadataMap, sourceAndMetadataMap, value, currentPath); | |
Map<String, Object> filteredInnerMap = filter(existingSourceAndMetadataMap, sourceAndMetadataMap, unsafeCastToObjectMap(value), currentPath); |
* @return A possible result within an optional | ||
*/ | ||
public static Optional<Object> getValueFromSource(final Map<String, Object> sourceAsMap, final String targetField) { | ||
public static Optional<Object> getValueFromSource(final Map<String, Object> sourceAsMap, final String targetField, int index) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static Optional<Object> getValueFromSource(final Map<String, Object> sourceAsMap, final String targetField, int index) { | |
public static List<Optional<Object>> getValueFromSource(final Map<String, Object> sourceAsMap, final String targetField, int index) { |
Description
This PR implements optimized text embedding processor for single document update scenario.
Proposed State [Single Document Update]
Steps:
User Scenarios
Scenario 1: Document Update with no change to all fields
Explanation: Both inference texts
“this is 1st name”, “this is 2nd name”
mapped with embedding have remained unchanged, so the corresponding embedding values have been copied over, instead of making inference callsScenario 2: Document Update with partial change to fields
Explanation: The inference text at index 0 changed from
“this is 1st name”
to“this is 2nd name”
, while the text at index 1 (“this is 2nd name”
) remained unchanged. An inference call is made only for the modified text at index 0. Even if the existing document already contains embeddings for“this is 2nd name”
, the inference call is still triggered because text changes are evaluated based on their respective indices, not on content duplication.Scenario 3: Document Update with change to only irrelevant fields
Explanation:
“irrelevant_field”
is not mapped with vector embedding, so inference call is not made regardless of the feature. Meanwhile, both inference texts“this is 1st name”, “this is 2nd name”
mapped with embedding have remained unchanged, so the corresponding embedding values have been copied over instead of making inference calls.Scenario 4: Document Update with changes in list values
Explanation: The optimized text embedding processor compares lists as a whole. If all texts in the list match, the existing embeddings are copied over. If any of the texts doesn't match, inference call is made for all values in the list regardless of matching texts
Testing:
Benchmark Test in progress. Will discuss the results before merge to main
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
HLD: #1138
Check List
--signoff
.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.