diff --git a/CHANGELOG.md b/CHANGELOG.md index 1db516ec5e..f4dd832764 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,7 +33,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Remove DocsWithFieldSet reference from NativeEngineFieldVectorsWriter (#2408)[https://github.com/opensearch-project/k-NN/pull/2408] - Remove skip building graph check for quantization use case (#2430)[https://github.com/opensearch-project/k-NN/2430] - Removing redundant type conversions for script scoring for hamming space with binary vectors (#2351)[https://github.com/opensearch-project/k-NN/pull/2351] -- Update default to 0 to always build graph as default behavior (#52)[https://github.com/opensearch-project/k-NN/pull/2452] +- Update default to 0 to always build graph as default behavior (#2452)[https://github.com/opensearch-project/k-NN/pull/2452] +- Enabled concurrent graph creation for Lucene engine with index thread qty settings (#2480)[https://github.com/opensearch-project/k-NN/pull/2480] ### Bug Fixes * Fixing the bug when a segment has no vector field present for disk based vector search (#2282)[https://github.com/opensearch-project/k-NN/pull/2282] * Fixing the bug where search fails with "fields" parameter for an index with a knn_vector field (#2314)[https://github.com/opensearch-project/k-NN/pull/2314] diff --git a/src/main/java/org/opensearch/knn/index/KNNSettings.java b/src/main/java/org/opensearch/knn/index/KNNSettings.java index 53cfdf6870..78e4528109 100644 --- a/src/main/java/org/opensearch/knn/index/KNNSettings.java +++ b/src/main/java/org/opensearch/knn/index/KNNSettings.java @@ -737,6 +737,14 @@ public void onIndexModule(IndexModule module) { }); } + /** + * Get the index thread quantity setting value from cluster setting. 
+ * @return int + */ + public static int getIndexThreadQty() { + return KNNSettings.state().getSettingValue(KNN_ALGO_PARAM_INDEX_THREAD_QTY); + } + private static String percentageAsString(Integer percentage) { return percentage + "%"; } diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN9120Codec/KNN9120PerFieldKnnVectorsFormat.java b/src/main/java/org/opensearch/knn/index/codec/KNN9120Codec/KNN9120PerFieldKnnVectorsFormat.java index 6e8fc767ec..3a133ddc76 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN9120Codec/KNN9120PerFieldKnnVectorsFormat.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN9120Codec/KNN9120PerFieldKnnVectorsFormat.java @@ -8,17 +8,19 @@ import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat; import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat; import org.opensearch.index.mapper.MapperService; +import org.opensearch.knn.index.KNNSettings; import org.opensearch.knn.index.SpaceType; import org.opensearch.knn.index.codec.BasePerFieldKnnVectorsFormat; import org.opensearch.knn.index.engine.KNNEngine; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Class provides per field format implementation for Lucene Knn vector type */ public class KNN9120PerFieldKnnVectorsFormat extends BasePerFieldKnnVectorsFormat { - private static final int NUM_MERGE_WORKERS = 1; public KNN9120PerFieldKnnVectorsFormat(final Optional mapperService) { super( @@ -41,23 +43,31 @@ public KNN9120PerFieldKnnVectorsFormat(final Optional mapperServi knnScalarQuantizedVectorsFormatParams -> new Lucene99HnswScalarQuantizedVectorsFormat( knnScalarQuantizedVectorsFormatParams.getMaxConnections(), knnScalarQuantizedVectorsFormatParams.getBeamWidth(), - NUM_MERGE_WORKERS, + getMergeThreadCount(), knnScalarQuantizedVectorsFormatParams.getBits(), knnScalarQuantizedVectorsFormatParams.isCompressFlag(), 
knnScalarQuantizedVectorsFormatParams.getConfidenceInterval(), - null + getMergeExecutorService() ) ); } - @Override /** * This method returns the maximum dimension allowed from KNNEngine for Lucene codec * * @param fieldName Name of the field, ignored * @return Maximum constant dimension set by KNNEngine */ + @Override public int getMaxDimensions(String fieldName) { return KNNEngine.getMaxDimensionByEngine(KNNEngine.LUCENE); } + + private static int getMergeThreadCount() { + return KNNSettings.getIndexThreadQty(); + } + + private static ExecutorService getMergeExecutorService() { + return getMergeThreadCount() > 1 ? Executors.newFixedThreadPool(getMergeThreadCount()) : null; /* Lucene throws IllegalArgumentException when an executor is supplied while numMergeWorkers == 1, so a pool is only built for real concurrency */ + } } diff --git a/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java b/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java index 7078645e54..de535c39e8 100644 --- a/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java +++ b/src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java @@ -226,7 +226,7 @@ private Map getParameters(FieldInfo fieldInfo, VectorDataType ve maybeAddBinaryPrefixForFaissBWC(knnEngine, parameters, fieldAttributes); // Used to determine how many threads to use when indexing - parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY)); + parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty()); return parameters; } @@ -258,7 +258,7 @@ private void maybeAddBinaryPrefixForFaissBWC(KNNEngine knnEngine, Map getTemplateParameters(FieldInfo fieldInfo, Model model) throws IOException { Map parameters = new HashMap<>(); - parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY)); + parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty()); parameters.put(KNNConstants.MODEL_ID, fieldInfo.attributes().get(MODEL_ID)); 
parameters.put(KNNConstants.MODEL_BLOB_PARAMETER, model.getModelBlob()); if (FieldInfoExtractor.extractQuantizationConfig(fieldInfo) != QuantizationConfig.EMPTY) { diff --git a/src/main/java/org/opensearch/knn/training/TrainingJob.java b/src/main/java/org/opensearch/knn/training/TrainingJob.java index 275aa2f47c..5c2e79c132 100644 --- a/src/main/java/org/opensearch/knn/training/TrainingJob.java +++ b/src/main/java/org/opensearch/knn/training/TrainingJob.java @@ -181,7 +181,7 @@ public void run() { Map trainParameters = libraryIndexingContext.getLibraryParameters(); trainParameters.put( KNNConstants.INDEX_THREAD_QTY, - KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY) + KNNSettings.getIndexThreadQty() ); if (libraryIndexingContext.getQuantizationConfig() != QuantizationConfig.EMPTY) { diff --git a/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java b/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java index cfb7ad6704..12d3475dc9 100644 --- a/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java +++ b/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java @@ -209,6 +209,17 @@ public void testGetFaissAVX2DisabledSettingValueFromConfig_enableSetting_thenVal assertEquals(expectedKNNFaissAVX2Disabled, actualKNNFaissAVX2Disabled); } + @SneakyThrows + public void testGetIndexThreadQty_WithDifferentValues_thenSuccess() { + Node mockNode = createMockNode(Map.of(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY, 3)); + mockNode.start(); + ClusterService clusterService = mockNode.injector().getInstance(ClusterService.class); + KNNSettings.state().setClusterService(clusterService); + int threadQty = KNNSettings.getIndexThreadQty(); + mockNode.close(); + assertEquals(3, threadQty); + } + private Node createMockNode(Map configSettings) throws IOException { Path configDir = createTempDir(); File configFile = configDir.resolve("opensearch.yml").toFile();