Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce RemoteIndexClient #2548

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
* [Remote Vector Index Build] Introduce Client Skeleton + Build Request implementation [#2548](https://github.com/opensearch-project/k-NN/pull/2548)
### Enhancements
### Bug Fixes
### Infrastructure
Expand Down
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,11 @@ dependencies {
api "net.java.dev.jna:jna-platform:5.13.0"
// OpenSearch core is using slf4j 1.7.36. Therefore, we cannot change the version here.
implementation 'org.slf4j:slf4j-api:1.7.36'

api "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
api "org.apache.httpcomponents.core5:httpcore5:${versions.httpcore5}"
api "org.apache.httpcomponents.core5:httpcore5-h2:${versions.httpcore5}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-security', version: "${opensearch_build}"
}

Expand Down
78 changes: 77 additions & 1 deletion src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Booleans;
import org.opensearch.common.settings.SecureSetting;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.settings.SecureString;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -96,6 +98,11 @@ public class KNNSettings {
public static final String KNN_DERIVED_SOURCE_ENABLED = "index.knn.derived_source.enabled";
public static final String KNN_INDEX_REMOTE_VECTOR_BUILD = "index.knn.remote_index_build.enabled";
public static final String KNN_REMOTE_VECTOR_REPO = "knn.remote_index_build.vector_repo";
// Keys for the remote build service client settings: service endpoint, status poll interval,
// overall timeout, and the username/password pair registered as secure settings.
public static final String KNN_REMOTE_BUILD_SERVICE_ENDPOINT = "knn.remote_build_service.endpoint";
public static final String KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL = "knn.remote_build_service.poll_interval";
public static final String KNN_REMOTE_BUILD_SERVICE_TIMEOUT = "knn.remote_build_service.timeout";
public static final String KNN_REMOTE_BUILD_SERVICE_USERNAME = "knn.remote_build_service.username";
public static final String KNN_REMOTE_BUILD_SERVICE_PASSWORD = "knn.remote_build_service.password";

/**
* Default setting values
Expand Down Expand Up @@ -127,6 +134,9 @@ public class KNNSettings {
public static final Integer KNN_DEFAULT_QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES = 60;
public static final boolean KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_VALUE = false;

// Defaults for the remote build service client; the unit of each value is the suffix of its name
// (the timeout is expressed in minutes, the poll interval in seconds).
public static final Integer KNN_DEFAULT_REMOTE_BUILD_SERVICE_TIMEOUT_MINUTES = 60;
public static final Integer KNN_DEFAULT_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SECONDS = 30;

/**
* Settings Definition
*/
Expand Down Expand Up @@ -388,6 +398,47 @@ public class KNNSettings {
*/
public static final Setting<String> KNN_REMOTE_VECTOR_REPO_SETTING = Setting.simpleString(KNN_REMOTE_VECTOR_REPO, Dynamic, NodeScope);

/**
 * Remote build service endpoint to be used for remote index build.
 * Registered as a plain string with the {@code NodeScope} and {@code Dynamic} properties; no validator is
 * attached, so any string is accepted.
 * TODO: add String validators on these endpoint settings.
 */
public static final Setting<String> KNN_REMOTE_BUILD_SERVICE_ENDPOINT_SETTING = Setting.simpleString(
KNN_REMOTE_BUILD_SERVICE_ENDPOINT,
NodeScope,
Dynamic
);

/**
 * Time the remote build service client will wait before falling back to CPU index build.
 * Defaults to {@code KNN_DEFAULT_REMOTE_BUILD_SERVICE_TIMEOUT_MINUTES} minutes.
 * NOTE(review): no minimum value is passed here; confirm how a zero or negative timeout should be treated.
 */
public static final Setting<TimeValue> KNN_REMOTE_BUILD_SERVICE_TIMEOUT_SETTING = Setting.timeSetting(
KNN_REMOTE_BUILD_SERVICE_TIMEOUT,
TimeValue.timeValueMinutes(KNN_DEFAULT_REMOTE_BUILD_SERVICE_TIMEOUT_MINUTES),
NodeScope,
Dynamic
);

/**
 * Setting to control how often the remote build service client polls the build service for the status of the job.
 * Defaults to {@code KNN_DEFAULT_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SECONDS} seconds.
 * NOTE(review): no minimum value is passed here; confirm the poller tolerates a zero or negative interval.
 */
public static final Setting<TimeValue> KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SETTING = Setting.timeSetting(
KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL,
TimeValue.timeValueSeconds(KNN_DEFAULT_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SECONDS),
NodeScope,
Dynamic
);

/**
 * Keystore setting holding the username for build service HTTP authorization.
 * Declared as a secure string with {@code null} passed as the second argument.
 */
public static final Setting<SecureString> KNN_REMOTE_BUILD_SERVICE_USERNAME_SETTING = SecureSetting.secureString(
KNN_REMOTE_BUILD_SERVICE_USERNAME,
null
);
/**
 * Keystore setting holding the password for build service HTTP authorization.
 * Declared as a secure string with {@code null} passed as the second argument.
 */
public static final Setting<SecureString> KNN_REMOTE_BUILD_SERVICE_PASSWORD_SETTING = SecureSetting.secureString(
KNN_REMOTE_BUILD_SERVICE_PASSWORD,
null
);

/**
* Dynamic settings
*/
Expand Down Expand Up @@ -550,6 +601,26 @@ private Setting<?> getSetting(String key) {
return KNN_REMOTE_VECTOR_REPO_SETTING;
}

if (KNN_REMOTE_BUILD_SERVICE_ENDPOINT.equals(key)) {
return KNN_REMOTE_BUILD_SERVICE_ENDPOINT_SETTING;
}

if (KNN_REMOTE_BUILD_SERVICE_TIMEOUT.equals(key)) {
return KNN_REMOTE_BUILD_SERVICE_TIMEOUT_SETTING;
}

if (KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL.equals(key)) {
return KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SETTING;
}

if (KNN_REMOTE_BUILD_SERVICE_USERNAME.equals(key)) {
return KNN_REMOTE_BUILD_SERVICE_USERNAME_SETTING;
}

if (KNN_REMOTE_BUILD_SERVICE_PASSWORD.equals(key)) {
return KNN_REMOTE_BUILD_SERVICE_PASSWORD_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

Expand Down Expand Up @@ -577,7 +648,12 @@ public List<Setting<?>> getSettings() {
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING,
KNN_DERIVED_SOURCE_ENABLED_SETTING,
KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING,
KNN_REMOTE_VECTOR_REPO_SETTING
KNN_REMOTE_VECTOR_REPO_SETTING,
KNN_REMOTE_BUILD_SERVICE_ENDPOINT_SETTING,
KNN_REMOTE_BUILD_SERVICE_TIMEOUT_SETTING,
KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SETTING,
KNN_REMOTE_BUILD_SERVICE_USERNAME_SETTING,
KNN_REMOTE_BUILD_SERVICE_PASSWORD_SETTING
);
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,27 @@
import org.opensearch.common.StopWatch;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy;
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
import org.opensearch.knn.index.remote.RemoteBuildRequest;
import org.opensearch.knn.index.remote.RemoteIndexClient;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

import static org.opensearch.knn.index.KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING;
import static org.opensearch.knn.index.KNNSettings.KNN_REMOTE_VECTOR_REPO_SETTING;
import static org.opensearch.knn.index.KNNSettings.state;

/**
* This class orchestrates building vector indices. It handles uploading data to a repository, submitting a remote
Expand Down Expand Up @@ -54,7 +60,7 @@ public RemoteIndexBuildStrategy(Supplier<RepositoriesService> repositoriesServic
* @return whether to use the remote build feature
*/
public static boolean shouldBuildIndexRemotely(IndexSettings indexSettings) {
String vectorRepo = KNNSettings.state().getSettingValue(KNN_REMOTE_VECTOR_REPO_SETTING.getKey());
String vectorRepo = state().getSettingValue(KNN_REMOTE_VECTOR_REPO_SETTING.getKey());
return KNNFeatureFlags.isKNNRemoteVectorBuildEnabled()
&& indexSettings.getValue(KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING)
&& vectorRepo != null
Expand Down Expand Up @@ -88,17 +94,18 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
log.debug("Repository write took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());

stopWatch = new StopWatch().start();
submitVectorBuild();
RemoteBuildRequest buildRequest = constructBuildRequest(indexInfo);
String jobId = RemoteIndexClient.getInstance().submitVectorBuild(buildRequest);
time_in_millis = stopWatch.stop().totalTime().millis();
log.debug("Submit vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());

stopWatch = new StopWatch().start();
awaitVectorBuild();
String indexPath = awaitVectorBuild(jobId);
time_in_millis = stopWatch.stop().totalTime().millis();
log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());

stopWatch = new StopWatch().start();
readFromRepository();
readFromRepository(indexPath);
time_in_millis = stopWatch.stop().totalTime().millis();
log.debug("Repository read took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
} catch (Exception e) {
Expand Down Expand Up @@ -156,14 +163,57 @@ private void submitVectorBuild() {
/**
* Wait on remote vector build to complete
*/
private void awaitVectorBuild() {
private String awaitVectorBuild(String jobId) {
throw new NotImplementedException();
}

/**
* Read constructed vector file from remote repository and write to IndexOutput
*/
private void readFromRepository() {
private void readFromRepository(String indexPath) {
throw new NotImplementedException();
}

/**
 * Builds the {@link RemoteBuildRequest} that describes the index build to submit to the remote build service.
 * <p>
 * The repository type and container name are read from the vector repository's metadata; the document count,
 * data type and engine come from {@code indexInfo}. The vector path, doc id path, tenant id and dimension are
 * still placeholders, and the algorithm parameters are hard-coded HNSW values.
 *
 * @param indexInfo parameters of the index being built
 * @return the populated build request
 * @throws IllegalStateException if the repository type is neither {@code s3} nor {@code fs}
 * @throws IOException kept in the signature for existing callers
 */
public RemoteBuildRequest constructBuildRequest(BuildIndexParams indexInfo) throws IOException {
    // Resolve the repository metadata once so the type and the container name are guaranteed to
    // describe the same repository, rather than re-fetching it for each lookup.
    var repositoryMetadata = getRepository().getMetadata();
    String repositoryType = repositoryMetadata.type();
    String containerName = switch (repositoryType) {
        case "s3" -> repositoryMetadata.settings().get("bucket");
        case "fs" -> repositoryMetadata.settings().get("location");
        default -> throw new IllegalStateException("Unexpected value: " + repositoryType);
    };

    // Placeholders until blob naming, tenant resolution and dimension lookup are wired in.
    String vectorPath = null; // blobName + VECTOR_BLOB_FILE_EXTENSION
    String docIdPath = null; // blobName + DOC_ID_FILE_EXTENSION
    String tenantId = null; // indexSettings.getSettings().get(ClusterName.CLUSTER_NAME_SETTING.getKey());
    int dimension = 0; // TODO

    int docCount = indexInfo.getTotalLiveDocs();
    String dataType = indexInfo.getVectorDataType().getValue(); // TODO need to fetch encoder param to get fp16 vs fp32
    String engine = indexInfo.getKnnEngine().getName();

    // NOTE(review): the space type was previously looked up here via
    // indexInfo.getParameters().get(KNNConstants.SPACE_TYPE).toString() but never added to the request,
    // and the lookup threw a NullPointerException when the key was absent. The dead lookup is removed;
    // add the space type to the request explicitly once the build service contract requires it.

    // TODO: derive these from the field's method parameters instead of fixed HNSW values.
    Map<String, Object> algorithmParams = new HashMap<>();
    algorithmParams.put("ef_construction", 100);
    algorithmParams.put("m", 16);

    Map<String, Object> indexParameters = new HashMap<>();
    indexParameters.put("algorithm", "hnsw");
    indexParameters.put("algorithm_parameters", algorithmParams);

    return RemoteBuildRequest.builder()
        .repositoryType(repositoryType)
        .containerName(containerName)
        .vectorPath(vectorPath)
        .docIdPath(docIdPath)
        .tenantId(tenantId)
        .dimension(dimension)
        .docCount(docCount)
        .dataType(dataType)
        .engine(engine)
        .indexParameters(indexParameters)
        .build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.remote;

import org.opensearch.common.xcontent.json.JsonXContent;
import lombok.Builder;
import lombok.Getter;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Value holder for the parameters of a remote index build, created through the Lombok-generated builder
 * and serializable to a JSON object via {@link #toJson()}.
 */
@Getter
@Builder
public class RemoteBuildRequest {
    private final String repositoryType;
    private final String containerName;
    private final String vectorPath;
    private final String docIdPath;
    private final String tenantId;
    private final int dimension;
    private final int docCount;
    private final String dataType;
    private final String engine;
    // Falls back to an empty map when the builder is not given index parameters.
    @Builder.Default
    private final Map<String, Object> indexParameters = new HashMap<>();

    /**
     * Serializes this request into a single JSON object with snake_case field names.
     *
     * @return the JSON text of the request
     * @throws IOException if the content builder fails while writing
     */
    public String toJson() throws IOException {
        try (XContentBuilder json = JsonXContent.contentBuilder()) {
            json.startObject()
                .field("repository_type", repositoryType)
                .field("container_name", containerName)
                .field("vector_path", vectorPath)
                .field("doc_id_path", docIdPath)
                .field("tenant_id", tenantId)
                .field("dimension", dimension)
                .field("doc_count", docCount)
                .field("data_type", dataType)
                .field("engine", engine)
                .field("index_parameters", indexParameters)
                .endObject();
            return json.toString();
        }
    }

}
Loading
Loading