diff --git a/src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java b/src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java
index 72ab3aa1b..441f70c46 100644
--- a/src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java
+++ b/src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java
@@ -14,6 +14,7 @@
 import org.opensearch.knn.index.KNNSettings;
 import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy;
 import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
+import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.repositories.Repository;
 import org.opensearch.repositories.RepositoryMissingException;
@@ -25,6 +26,19 @@
 import static org.opensearch.knn.index.KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING;
 import static org.opensearch.knn.index.KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING;
 import static org.opensearch.knn.index.KNNSettings.KNN_REMOTE_VECTOR_REPO_SETTING;
+import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.BUILD_REQUEST_FAILURE_COUNT;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.BUILD_REQUEST_SUCCESS_COUNT;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.READ_FAILURE_COUNT;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.READ_SUCCESS_COUNT;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.READ_TIME;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_OPERATIONS;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_SIZE;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_TIME;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WAITING_TIME;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WRITE_FAILURE_COUNT;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WRITE_SUCCESS_COUNT;
+import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WRITE_TIME;
 
 /**
  * This class orchestrates building vector indices. It handles uploading data to a repository, submitting a remote
@@ -108,11 +122,19 @@ public static boolean shouldBuildIndexRemotely(IndexSettings indexSettings, long
      */
     @Override
     public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
+        StopWatch remoteBuildTimeStopwatch = new StopWatch();
+        KNNVectorValues<?> knnVectorValues = indexInfo.getKnnVectorValuesSupplier().get();
+        initializeVectorValues(knnVectorValues);
+        startRemoteIndexBuildStats((long) indexInfo.getTotalLiveDocs() * knnVectorValues.bytesPerVector(), remoteBuildTimeStopwatch);
         StopWatch stopWatch;
         long time_in_millis;
+        VectorRepositoryAccessor vectorRepositoryAccessor;
+
+        // 1. Write required data to repository
+        stopWatch = new StopWatch().start();
         try {
-            VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor(getRepository(), indexSettings);
-            stopWatch = new StopWatch().start();
+            // Resolve the repository inside the try so a failure here falls back and releases the in-flight stats
+            vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor(getRepository(), indexSettings);
             // We create a new time based UUID per file in order to avoid conflicts across shards. It is also very difficult to get the
             // shard id in this context.
             String blobName = UUIDs.base64UUID() + "_" + indexInfo.getFieldName() + "_" + indexInfo.getSegmentWriteState().segmentInfo.name;
@@ -123,27 +145,63 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
                 indexInfo.getKnnVectorValuesSupplier()
             );
             time_in_millis = stopWatch.stop().totalTime().millis();
+            WRITE_SUCCESS_COUNT.increment();
+            WRITE_TIME.incrementBy(time_in_millis);
             log.debug("Repository write took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
+        } catch (Exception e) {
+            time_in_millis = stopWatch.stop().totalTime().millis();
+            WRITE_FAILURE_COUNT.increment();
+            log.error("Repository write failed after {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName(), e);
+            handleFailure(indexInfo, knnVectorValues.bytesPerVector(), remoteBuildTimeStopwatch);
+            return;
+        }
 
-            stopWatch = new StopWatch().start();
+        // 2. Triggers index build
+        stopWatch = new StopWatch().start();
+        try {
             submitVectorBuild();
             time_in_millis = stopWatch.stop().totalTime().millis();
+            BUILD_REQUEST_SUCCESS_COUNT.increment();
             log.debug("Submit vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
+        } catch (Exception e) {
+            time_in_millis = stopWatch.stop().totalTime().millis();
+            BUILD_REQUEST_FAILURE_COUNT.increment();
+            log.error("Submit vector failed after {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName(), e);
+            handleFailure(indexInfo, knnVectorValues.bytesPerVector(), remoteBuildTimeStopwatch);
+            return;
+        }
 
-            stopWatch = new StopWatch().start();
+        // 3. Awaits on vector build to complete
+        stopWatch = new StopWatch().start();
+        try {
             awaitVectorBuild();
             time_in_millis = stopWatch.stop().totalTime().millis();
+            WAITING_TIME.incrementBy(time_in_millis);
             log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
+        } catch (Exception e) {
+            time_in_millis = stopWatch.stop().totalTime().millis();
+            log.error("Await vector build failed after {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName(), e);
+            handleFailure(indexInfo, knnVectorValues.bytesPerVector(), remoteBuildTimeStopwatch);
+            return;
+        }
 
-            stopWatch = new StopWatch().start();
+        // 4. Downloads index file and writes to indexOutput
+        stopWatch = new StopWatch().start();
+        try {
             vectorRepositoryAccessor.readFromRepository();
             time_in_millis = stopWatch.stop().totalTime().millis();
+            READ_SUCCESS_COUNT.increment();
+            READ_TIME.incrementBy(time_in_millis);
             log.debug("Repository read took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
         } catch (Exception e) {
-            // TODO: This needs more robust failure handling
-            log.warn("Failed to build index remotely", e);
-            fallbackStrategy.buildAndWriteIndex(indexInfo);
+            time_in_millis = stopWatch.stop().totalTime().millis();
+            READ_FAILURE_COUNT.increment();
+            log.error("Repository read failed after {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName(), e);
+            handleFailure(indexInfo, knnVectorValues.bytesPerVector(), remoteBuildTimeStopwatch);
+            return;
         }
+
+        endRemoteIndexBuildStats((long) indexInfo.getTotalLiveDocs() * knnVectorValues.bytesPerVector(), remoteBuildTimeStopwatch);
     }
 
     /**
@@ -178,4 +236,43 @@ private void submitVectorBuild() {
     private void awaitVectorBuild() {
         throw new NotImplementedException();
     }
+
+    /**
+     * Starts the overall remote build timer and registers the build in the in-flight gauges.
+     *
+     * @param size      size in bytes of the vectors being built
+     * @param stopWatch stopwatch measuring the whole remote build; it is started here
+     */
+    private void startRemoteIndexBuildStats(long size, StopWatch stopWatch) {
+        stopWatch.start();
+        REMOTE_INDEX_BUILD_CURRENT_OPERATIONS.increment();
+        REMOTE_INDEX_BUILD_CURRENT_SIZE.incrementBy(size);
+    }
+
+    /**
+     * Stops the overall remote build timer and removes the build from the in-flight gauges.
+     *
+     * @param size      the same size in bytes that was passed to {@link #startRemoteIndexBuildStats(long, StopWatch)}
+     * @param stopWatch the stopwatch that was started by {@link #startRemoteIndexBuildStats(long, StopWatch)}
+     */
+    private void endRemoteIndexBuildStats(long size, StopWatch stopWatch) {
+        long time_in_millis = stopWatch.stop().totalTime().millis();
+        REMOTE_INDEX_BUILD_CURRENT_OPERATIONS.decrement();
+        // The size was added when the build started, so it must be subtracted (not added again) here
+        REMOTE_INDEX_BUILD_CURRENT_SIZE.decrementBy(size);
+        REMOTE_INDEX_BUILD_TIME.incrementBy(time_in_millis);
+    }
+
+    /**
+     * Helper method to collect remote index build metrics on failure and invoke fallback strategy
+     *
+     * @param indexParams    parameters of the index build that failed remotely
+     * @param bytesPerVector size in bytes of a single vector, used to compute the in-flight size to release
+     * @param stopWatch      the stopwatch measuring the whole remote build
+     * @throws IOException if the fallback strategy throws while building or writing the index
+     */
+    private void handleFailure(BuildIndexParams indexParams, long bytesPerVector, StopWatch stopWatch) throws IOException {
+        endRemoteIndexBuildStats(indexParams.getTotalLiveDocs() * bytesPerVector, stopWatch);
+        fallbackStrategy.buildAndWriteIndex(indexParams);
+    }
 }
diff --git a/src/main/java/org/opensearch/knn/plugin/stats/KNNRemoteIndexBuildValue.java b/src/main/java/org/opensearch/knn/plugin/stats/KNNRemoteIndexBuildValue.java
new file mode 100644
index 000000000..2ae39991f
--- /dev/null
+++ b/src/main/java/org/opensearch/knn/plugin/stats/KNNRemoteIndexBuildValue.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.knn.plugin.stats;
+
+import lombok.Getter;
+
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Stats recorded for remote vector index builds. Each constant is backed by its own {@link LongAdder}.
+ */
+public enum KNNRemoteIndexBuildValue {
+
+    // Repository Accumulating Stats
+    WRITE_SUCCESS_COUNT("write_success_count"),
+    WRITE_FAILURE_COUNT("write_failure_count"),
+    WRITE_TIME("write_time_in_millis"),
+    READ_SUCCESS_COUNT("read_success_count"),
+    READ_FAILURE_COUNT("read_failure_count"),
+    READ_TIME("read_time_in_millis"),
+
+    // Remote Index Build PIT Stats
+    REMOTE_INDEX_BUILD_CURRENT_OPERATIONS("remote_index_build_current_operations"),
+    REMOTE_INDEX_BUILD_CURRENT_SIZE("remote_index_build_current_size"),
+    REMOTE_INDEX_BUILD_TIME("remote_index_build_time_in_millis"),
+
+    // Client Stats
+    BUILD_REQUEST_SUCCESS_COUNT("build_request_success_count"),
+    BUILD_REQUEST_FAILURE_COUNT("build_request_failure_count"),
+    STATUS_REQUEST_SUCCESS_COUNT("status_request_success_count"),
+    STATUS_REQUEST_FAILURE_COUNT("status_request_failure_count"),
+    INDEX_BUILD_SUCCESS_COUNT("index_build_success_count"),
+    INDEX_BUILD_FAILURE_COUNT("index_build_failure_count"),
+    WAITING_TIME("waiting_time_in_ms");
+
+    @Getter
+    private final String name;
+    private final LongAdder value;
+
+    /**
+     * Constructor
+     *
+     * @param name name under which the remote index build stat is reported
+     */
+    KNNRemoteIndexBuildValue(String name) {
+        this.name = name;
+        this.value = new LongAdder();
+    }
+
+    /**
+     * Get the current value of the remote index build stat
+     *
+     * @return value
+     */
+    public Long getValue() {
+        return value.longValue();
+    }
+
+    /**
+     * Increment the remote index build stat by one
+     */
+    public void increment() {
+        value.increment();
+    }
+
+    /**
+     * Decrement the remote index build stat by one
+     */
+    public void decrement() {
+        value.decrement();
+    }
+
+    /**
+     * Increment the remote index build stat by a specified amount
+     *
+     * @param delta The amount to increment
+     */
+    public void incrementBy(long delta) {
+        value.add(delta);
+    }
+
+    /**
+     * Decrement the remote index build stat by a specified amount
+     *
+     * @param delta The amount to decrement
+     */
+    public void decrementBy(long delta) {
+        value.add(-delta);
+    }
+}
diff --git a/src/main/java/org/opensearch/knn/plugin/stats/KNNStats.java b/src/main/java/org/opensearch/knn/plugin/stats/KNNStats.java
index bcd419ea6..71850a4fe 100644
--- a/src/main/java/org/opensearch/knn/plugin/stats/KNNStats.java
+++ b/src/main/java/org/opensearch/knn/plugin/stats/KNNStats.java
@@ -8,8 +8,9 @@
 import com.google.common.cache.CacheStats;
 import com.google.common.collect.ImmutableMap;
 import org.opensearch.knn.common.KNNConstants;
-import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
+import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
 import org.opensearch.knn.index.engine.KNNEngine;
+import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
 import org.opensearch.knn.indices.ModelCache;
 import org.opensearch.knn.indices.ModelDao;
 import org.opensearch.knn.plugin.stats.suppliers.EventOccurredWithinThresholdSupplier;
@@ -80,12 +81,15 @@ private Map<String, KNNStat<?>> getClusterOrNodeStats(Boolean getClusterStats) {
 
     private Map<String, KNNStat<?>> buildStatsMap() {
         ImmutableMap.Builder<String, KNNStat<?>> builder = ImmutableMap.<String, KNNStat<?>>builder();
         addQueryStats(builder);
         addNativeMemoryStats(builder);
         addEngineStats(builder);
         addScriptStats(builder);
         addModelStats(builder);
         addGraphStats(builder);
+        if (KNNFeatureFlags.isKNNRemoteVectorBuildEnabled()) {
+            addRemoteIndexBuildStats(builder);
+        }
         return builder.build();
     }
 
@@ -218,4 +222,75 @@ private Map<String, Map<String, Object>> createGraphStatsMap() {
         graphStatsMap.put(StatNames.REFRESH.getName(), refreshMap);
         return graphStatsMap;
     }
+
+    /**
+     * Registers the remote index build stats under {@link StatNames#REMOTE_VECTOR_INDEX_BUILD_STATS}.
+     *
+     * @param builder builder of the stats map that the stat is added to
+     */
+    private void addRemoteIndexBuildStats(ImmutableMap.Builder<String, KNNStat<?>> builder) {
+        builder.put(StatNames.REMOTE_VECTOR_INDEX_BUILD_STATS.getName(), new KNNStat<>(false, this::createRemoteIndexStatsMap));
+    }
+
+    /**
+     * Reads the current value of every {@link KNNRemoteIndexBuildValue}, grouped into build, client and repository stats.
+     *
+     * @return map from group name to a map of stat name to current value
+     */
+    private Map<String, Map<String, Object>> createRemoteIndexStatsMap() {
+        Map<String, Object> clientStatsMap = new HashMap<>();
+        clientStatsMap.put(
+            KNNRemoteIndexBuildValue.BUILD_REQUEST_SUCCESS_COUNT.getName(),
+            KNNRemoteIndexBuildValue.BUILD_REQUEST_SUCCESS_COUNT.getValue()
+        );
+        clientStatsMap.put(
+            KNNRemoteIndexBuildValue.BUILD_REQUEST_FAILURE_COUNT.getName(),
+            KNNRemoteIndexBuildValue.BUILD_REQUEST_FAILURE_COUNT.getValue()
+        );
+        clientStatsMap.put(
+            KNNRemoteIndexBuildValue.STATUS_REQUEST_SUCCESS_COUNT.getName(),
+            KNNRemoteIndexBuildValue.STATUS_REQUEST_SUCCESS_COUNT.getValue()
+        );
+        clientStatsMap.put(
+            KNNRemoteIndexBuildValue.STATUS_REQUEST_FAILURE_COUNT.getName(),
+            KNNRemoteIndexBuildValue.STATUS_REQUEST_FAILURE_COUNT.getValue()
+        );
+        clientStatsMap.put(
+            KNNRemoteIndexBuildValue.INDEX_BUILD_SUCCESS_COUNT.getName(),
+            KNNRemoteIndexBuildValue.INDEX_BUILD_SUCCESS_COUNT.getValue()
+        );
+        clientStatsMap.put(
+            KNNRemoteIndexBuildValue.INDEX_BUILD_FAILURE_COUNT.getName(),
+            KNNRemoteIndexBuildValue.INDEX_BUILD_FAILURE_COUNT.getValue()
+        );
+        clientStatsMap.put(KNNRemoteIndexBuildValue.WAITING_TIME.getName(), KNNRemoteIndexBuildValue.WAITING_TIME.getValue());
+
+        Map<String, Object> repoStatsMap = new HashMap<>();
+        repoStatsMap.put(KNNRemoteIndexBuildValue.WRITE_SUCCESS_COUNT.getName(), KNNRemoteIndexBuildValue.WRITE_SUCCESS_COUNT.getValue());
+        repoStatsMap.put(KNNRemoteIndexBuildValue.WRITE_FAILURE_COUNT.getName(), KNNRemoteIndexBuildValue.WRITE_FAILURE_COUNT.getValue());
+        repoStatsMap.put(KNNRemoteIndexBuildValue.WRITE_TIME.getName(), KNNRemoteIndexBuildValue.WRITE_TIME.getValue());
+        repoStatsMap.put(KNNRemoteIndexBuildValue.READ_SUCCESS_COUNT.getName(), KNNRemoteIndexBuildValue.READ_SUCCESS_COUNT.getValue());
+        repoStatsMap.put(KNNRemoteIndexBuildValue.READ_FAILURE_COUNT.getName(), KNNRemoteIndexBuildValue.READ_FAILURE_COUNT.getValue());
+        repoStatsMap.put(KNNRemoteIndexBuildValue.READ_TIME.getName(), KNNRemoteIndexBuildValue.READ_TIME.getValue());
+
+        Map<String, Object> buildStatsMap = new HashMap<>();
+        buildStatsMap.put(
+            KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_OPERATIONS.getName(),
+            KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_OPERATIONS.getValue()
+        );
+        buildStatsMap.put(
+            KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_SIZE.getName(),
+            KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_SIZE.getValue()
+        );
+        buildStatsMap.put(
+            KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_TIME.getName(),
+            KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_TIME.getValue()
+        );
+
+        Map<String, Map<String, Object>> remoteIndexBuildStatsMap = new HashMap<>();
+        remoteIndexBuildStatsMap.put(StatNames.BUILD_STATS.getName(), buildStatsMap);
+        remoteIndexBuildStatsMap.put(StatNames.CLIENT_STATS.getName(), clientStatsMap);
+        remoteIndexBuildStatsMap.put(StatNames.REPOSITORY_STATS.getName(), repoStatsMap);
+        return remoteIndexBuildStatsMap;
+    }
 }
diff --git a/src/main/java/org/opensearch/knn/plugin/stats/StatNames.java b/src/main/java/org/opensearch/knn/plugin/stats/StatNames.java
index e7f4fd4a2..1ed338589 100644
--- a/src/main/java/org/opensearch/knn/plugin/stats/StatNames.java
+++ b/src/main/java/org/opensearch/knn/plugin/stats/StatNames.java
@@ -45,6 +45,10 @@ public enum StatNames {
     GRAPH_STATS("graph_stats"),
     REFRESH("refresh"),
     MERGE("merge"),
+    REMOTE_VECTOR_INDEX_BUILD_STATS("remote_vector_index_build_stats"),
+    CLIENT_STATS("client_stats"),
+    REPOSITORY_STATS("repository_stats"),
+    BUILD_STATS("build_stats"),
     MIN_SCORE_QUERY_REQUESTS(KNNCounter.MIN_SCORE_QUERY_REQUESTS.getName()),
     MIN_SCORE_QUERY_WITH_FILTER_REQUESTS(KNNCounter.MIN_SCORE_QUERY_WITH_FILTER_REQUESTS.getName()),
     MAX_DISTANCE_QUERY_REQUESTS(KNNCounter.MAX_DISTANCE_QUERY_REQUESTS.getName()),