From dc3dbd09b73e11a90571fe59e98a7187ff28d458 Mon Sep 17 00:00:00 2001 From: Navneet Verma Date: Fri, 31 Jan 2025 15:04:23 -0800 Subject: [PATCH] Enabled concurrent graph creation for Lucene engine with index thread qty settings Signed-off-by: Navneet Verma --- CHANGELOG.md | 3 +- .../org/opensearch/knn/index/KNNSettings.java | 8 +++ .../KNN9120PerFieldKnnVectorsFormat.java | 52 ++++++++++++++----- .../codec/nativeindex/NativeIndexWriter.java | 4 +- .../opensearch/knn/training/TrainingJob.java | 5 +- .../knn/index/KNNSettingsTests.java | 11 ++++ 6 files changed, 63 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1db516ec5e..f4dd832764 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,7 +33,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Remove DocsWithFieldSet reference from NativeEngineFieldVectorsWriter (#2408)[https://github.com/opensearch-project/k-NN/pull/2408] - Remove skip building graph check for quantization use case (#2430)[https://github.com/opensearch-project/k-NN/2430] - Removing redundant type conversions for script scoring for hamming space with binary vectors (#2351)[https://github.com/opensearch-project/k-NN/pull/2351] -- Update default to 0 to always build graph as default behavior (#52)[https://github.com/opensearch-project/k-NN/pull/2452] +- Update default to 0 to always build graph as default behavior (#2452)[https://github.com/opensearch-project/k-NN/pull/2452] +- Enabled concurrent graph creation for Lucene engine with index thread qty settings(#2480)[https://github.com/opensearch-project/k-NN/pull/2480] ### Bug Fixes * Fixing the bug when a segment has no vector field present for disk based vector search (#2282)[https://github.com/opensearch-project/k-NN/pull/2282] * Fixing the bug where search fails with "fields" parameter for an index with a knn_vector field (#2314)[https://github.com/opensearch-project/k-NN/pull/2314] diff --git a/src/main/java/org/opensearch/knn/index/KNNSettings.java b/src/main/java/org/opensearch/knn/index/KNNSettings.java index 53cfdf6870..78e4528109 100644 --- a/src/main/java/org/opensearch/knn/index/KNNSettings.java +++ b/src/main/java/org/opensearch/knn/index/KNNSettings.java @@ -737,6 +737,14 @@ public void onIndexModule(IndexModule module) { }); } + /** + * Get the index thread quantity setting value from cluster setting. + * @return int + */ + public static int getIndexThreadQty() { + return KNNSettings.state().getSettingValue(KNN_ALGO_PARAM_INDEX_THREAD_QTY); + } + private static String percentageAsString(Integer percentage) { return percentage + "%"; } diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN9120Codec/KNN9120PerFieldKnnVectorsFormat.java b/src/main/java/org/opensearch/knn/index/codec/KNN9120Codec/KNN9120PerFieldKnnVectorsFormat.java index 6e8fc767ec..3e11af0fa9 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN9120Codec/KNN9120PerFieldKnnVectorsFormat.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN9120Codec/KNN9120PerFieldKnnVectorsFormat.java @@ -7,18 +7,22 @@ import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat; import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat; +import org.opensearch.common.collect.Tuple; import org.opensearch.index.mapper.MapperService; +import org.opensearch.knn.index.KNNSettings; import org.opensearch.knn.index.SpaceType; import org.opensearch.knn.index.codec.BasePerFieldKnnVectorsFormat; import org.opensearch.knn.index.engine.KNNEngine; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Class provides per field format implementation for Lucene Knn vector type */ public class KNN9120PerFieldKnnVectorsFormat extends BasePerFieldKnnVectorsFormat { - private static final int NUM_MERGE_WORKERS = 1; + private static final Tuple DEFAULT_MERGE_THREAD_COUNT_AND_EXECUTOR_SERVICE = Tuple.tuple(1, null); public KNN9120PerFieldKnnVectorsFormat(final Optional mapperService) { super( @@ -27,37 +31,59 @@ public KNN9120PerFieldKnnVectorsFormat(final Optional mapperServi Lucene99HnswVectorsFormat.DEFAULT_BEAM_WIDTH, Lucene99HnswVectorsFormat::new, knnVectorsFormatParams -> { + Tuple mergeThreadCountAndExecutorService = getMergeThreadCountAndExecutorService(); // There is an assumption here that hamming space will only be used for binary vectors. This will need to be fixed if that // changes in the future. if (knnVectorsFormatParams.getSpaceType() == SpaceType.HAMMING) { return new KNN9120HnswBinaryVectorsFormat( knnVectorsFormatParams.getMaxConnections(), - knnVectorsFormatParams.getBeamWidth() + knnVectorsFormatParams.getBeamWidth(), + mergeThreadCountAndExecutorService.v1(), + mergeThreadCountAndExecutorService.v2() ); } else { - return new Lucene99HnswVectorsFormat(knnVectorsFormatParams.getMaxConnections(), knnVectorsFormatParams.getBeamWidth()); + return new Lucene99HnswVectorsFormat( + knnVectorsFormatParams.getMaxConnections(), + knnVectorsFormatParams.getBeamWidth(), + mergeThreadCountAndExecutorService.v1(), + mergeThreadCountAndExecutorService.v2() + ); } }, - knnScalarQuantizedVectorsFormatParams -> new Lucene99HnswScalarQuantizedVectorsFormat( - knnScalarQuantizedVectorsFormatParams.getMaxConnections(), - knnScalarQuantizedVectorsFormatParams.getBeamWidth(), - NUM_MERGE_WORKERS, - knnScalarQuantizedVectorsFormatParams.getBits(), - knnScalarQuantizedVectorsFormatParams.isCompressFlag(), - knnScalarQuantizedVectorsFormatParams.getConfidenceInterval(), - null - ) + knnScalarQuantizedVectorsFormatParams -> { + Tuple mergeThreadCountAndExecutorService = getMergeThreadCountAndExecutorService(); + return new Lucene99HnswScalarQuantizedVectorsFormat( + knnScalarQuantizedVectorsFormatParams.getMaxConnections(), + knnScalarQuantizedVectorsFormatParams.getBeamWidth(), + mergeThreadCountAndExecutorService.v1(), + knnScalarQuantizedVectorsFormatParams.getBits(), + knnScalarQuantizedVectorsFormatParams.isCompressFlag(), + knnScalarQuantizedVectorsFormatParams.getConfidenceInterval(), + mergeThreadCountAndExecutorService.v2() + ); + } ); } - @Override /** * This method returns the maximum dimension allowed from KNNEngine for Lucene codec * * @param fieldName Name of the field, ignored * @return Maximum constant dimension set by KNNEngine */ + @Override public int getMaxDimensions(String fieldName) { return KNNEngine.getMaxDimensionByEngine(KNNEngine.LUCENE); } + + private static Tuple getMergeThreadCountAndExecutorService() { + int mergeThreadCount = KNNSettings.getIndexThreadQty(); + // We need to return null whenever the merge threads are 0, as lucene assumes that if number of threads are 1 + // then we should be giving a null value of the executor + if (mergeThreadCount <= 1) { + return DEFAULT_MERGE_THREAD_COUNT_AND_EXECUTOR_SERVICE; + } else { + return Tuple.tuple(mergeThreadCount, Executors.newFixedThreadPool(mergeThreadCount)); + } + } } diff --git a/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java b/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java index 7078645e54..de535c39e8 100644 --- a/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java +++ b/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java @@ -226,7 +226,7 @@ private Map getParameters(FieldInfo fieldInfo, VectorDataType ve maybeAddBinaryPrefixForFaissBWC(knnEngine, parameters, fieldAttributes); // Used to determine how many threads to use when indexing - parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY)); + parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty()); return parameters; } @@ -258,7 +258,7 @@ private void maybeAddBinaryPrefixForFaissBWC(KNNEngine knnEngine, Map getTemplateParameters(FieldInfo fieldInfo, Model model) throws IOException { Map parameters = new HashMap<>(); - parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY)); + parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty()); parameters.put(KNNConstants.MODEL_ID, fieldInfo.attributes().get(MODEL_ID)); parameters.put(KNNConstants.MODEL_BLOB_PARAMETER, model.getModelBlob()); if (FieldInfoExtractor.extractQuantizationConfig(fieldInfo) != QuantizationConfig.EMPTY) { diff --git a/src/main/java/org/opensearch/knn/training/TrainingJob.java b/src/main/java/org/opensearch/knn/training/TrainingJob.java index 275aa2f47c..d3731e5aa9 100644 --- a/src/main/java/org/opensearch/knn/training/TrainingJob.java +++ b/src/main/java/org/opensearch/knn/training/TrainingJob.java @@ -179,10 +179,7 @@ public void run() { .getKNNLibraryIndexingContext(knnMethodContext, knnMethodConfigContext); Map trainParameters = libraryIndexingContext.getLibraryParameters(); - trainParameters.put( - KNNConstants.INDEX_THREAD_QTY, - KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY) - ); + trainParameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty()); if (libraryIndexingContext.getQuantizationConfig() != QuantizationConfig.EMPTY) { trainParameters.put(KNNConstants.VECTOR_DATA_TYPE_FIELD, VectorDataType.BINARY.getValue()); diff --git a/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java b/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java index cfb7ad6704..12d3475dc9 100644 --- a/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java +++ b/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java @@ -209,6 +209,17 @@ public void testGetFaissAVX2DisabledSettingValueFromConfig_enableSetting_thenVal assertEquals(expectedKNNFaissAVX2Disabled, actualKNNFaissAVX2Disabled); } + @SneakyThrows + public void testGetIndexThreadQty_WithDifferentValues_thenSuccess() { + Node mockNode = createMockNode(Map.of(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY, 3)); + mockNode.start(); + ClusterService clusterService = mockNode.injector().getInstance(ClusterService.class); + KNNSettings.state().setClusterService(clusterService); + int threadQty = KNNSettings.getIndexThreadQty(); + mockNode.close(); + assertEquals(3, threadQty); + } + private Node createMockNode(Map configSettings) throws IOException { Path configDir = createTempDir(); File configFile = configDir.resolve("opensearch.yml").toFile();