Skip to content

Commit

Permalink
Introduce RemoteIndexClient
Browse files Browse the repository at this point in the history
Add RemoteIndexClient initial implementation, its accompanying dependencies, and Build Request, Retry Strategy, and test files
  • Loading branch information
owenhalpert committed Feb 20, 2025
1 parent edcbe31 commit 57290c1
Show file tree
Hide file tree
Showing 7 changed files with 557 additions and 4 deletions.
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,11 @@ dependencies {
api "net.java.dev.jna:jna-platform:5.13.0"
// OpenSearch core is using slf4j 1.7.36. Therefore, we cannot change the version here.
implementation 'org.slf4j:slf4j-api:1.7.36'

api 'org.apache.httpcomponents.client5:httpclient5:5.4.1'
api 'org.apache.httpcomponents.core5:httpcore5:5.3.2'
api 'org.apache.httpcomponents.core5:httpcore5-h2:5.3.2'
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-security', version: "${opensearch_build}"
}

Expand Down
81 changes: 80 additions & 1 deletion src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Booleans;
import org.opensearch.common.settings.SecureSetting;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.settings.SecureString;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.IndexModule;
Expand All @@ -31,6 +33,7 @@

import java.security.InvalidParameterException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -96,6 +99,11 @@ public class KNNSettings {
public static final String KNN_DERIVED_SOURCE_ENABLED = "index.knn.derived_source.enabled";
public static final String KNN_INDEX_REMOTE_VECTOR_BUILD = "index.knn.remote_index_build.enabled";
public static final String KNN_REMOTE_VECTOR_REPO = "knn.remote_index_build.vector_repo";
public static final String KNN_REMOTE_BUILD_SERVICE_ENDPOINT = "knn.remote_build_service.endpoint";
public static final String KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL = "knn.remote_build_service.poll_interval";
public static final String KNN_REMOTE_BUILD_SERVICE_TIMEOUT = "knn.remote_build_service.timeout";
public static final String KNN_REMOTE_BUILD_SERVICE_USERNAME = "knn.remote_build_service.username";
public static final String KNN_REMOTE_BUILD_SERVICE_PASSWORD = "knn.remote_build_service.password";

/**
* Default setting values
Expand Down Expand Up @@ -127,6 +135,9 @@ public class KNNSettings {
public static final Integer KNN_DEFAULT_QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES = 60;
public static final boolean KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_VALUE = false;

public static final Integer KNN_DEFAULT_REMOTE_BUILD_SERVICE_TIMEOUT_MINUTES = 60;
public static final Integer KNN_DEFAULT_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SECONDS = 30;

/**
* Settings Definition
*/
Expand Down Expand Up @@ -388,6 +399,49 @@ public class KNNSettings {
*/
public static final Setting<String> KNN_REMOTE_VECTOR_REPO_SETTING = Setting.simpleString(KNN_REMOTE_VECTOR_REPO, Dynamic, NodeScope);

/**
* List of remote build service endpoints to be used by remote build service. If greater than one, the client uses round-robin task assignment when workers are busy.
*/
public static final Setting<List<String>> KNN_REMOTE_BUILD_SERVICE_ENDPOINT_SETTING = Setting.listSetting(
KNN_REMOTE_BUILD_SERVICE_ENDPOINT,
Collections.emptyList(),
Function.identity(),
NodeScope,
Dynamic
);

/**
* Time the remote build service client will wait before falling back to CPU index build
*/
public static final Setting<TimeValue> KNN_REMOTE_BUILD_SERVICE_TIMEOUT_SETTING = Setting.timeSetting(
KNN_REMOTE_BUILD_SERVICE_TIMEOUT,
TimeValue.timeValueMinutes(KNN_DEFAULT_REMOTE_BUILD_SERVICE_TIMEOUT_MINUTES),
NodeScope,
Dynamic
);

/**
* Setting to control how often the remote build service client polls the build service for the status of the job
*/
public static final Setting<TimeValue> KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SETTING = Setting.timeSetting(
KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL,
TimeValue.timeValueSeconds(KNN_DEFAULT_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SECONDS),
NodeScope,
Dynamic
);

/**
* Keystore settings for build service HTTP authorization
*/
public static final Setting<SecureString> KNN_REMOTE_BUILD_SERVICE_USERNAME_SETTING = SecureSetting.secureString(
KNN_REMOTE_BUILD_SERVICE_USERNAME,
null
);
public static final Setting<SecureString> KNN_REMOTE_BUILD_SERVICE_PASSWORD_SETTING = SecureSetting.secureString(
KNN_REMOTE_BUILD_SERVICE_PASSWORD,
null
);

/**
* Dynamic settings
*/
Expand Down Expand Up @@ -550,6 +604,26 @@ private Setting<?> getSetting(String key) {
return KNN_REMOTE_VECTOR_REPO_SETTING;
}

if (KNN_REMOTE_BUILD_SERVICE_ENDPOINT.equals(key)) {
return KNN_REMOTE_BUILD_SERVICE_ENDPOINT_SETTING;
}

if (KNN_REMOTE_BUILD_SERVICE_TIMEOUT.equals(key)) {
return KNN_REMOTE_BUILD_SERVICE_TIMEOUT_SETTING;
}

if (KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL.equals(key)) {
return KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SETTING;
}

if (KNN_REMOTE_BUILD_SERVICE_USERNAME.equals(key)) {
return KNN_REMOTE_BUILD_SERVICE_USERNAME_SETTING;
}

if (KNN_REMOTE_BUILD_SERVICE_PASSWORD.equals(key)) {
return KNN_REMOTE_BUILD_SERVICE_PASSWORD_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

Expand Down Expand Up @@ -577,7 +651,12 @@ public List<Setting<?>> getSettings() {
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING,
KNN_DERIVED_SOURCE_ENABLED_SETTING,
KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING,
KNN_REMOTE_VECTOR_REPO_SETTING
KNN_REMOTE_VECTOR_REPO_SETTING,
KNN_REMOTE_BUILD_SERVICE_ENDPOINT_SETTING,
KNN_REMOTE_BUILD_SERVICE_TIMEOUT_SETTING,
KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SETTING,
KNN_REMOTE_BUILD_SERVICE_USERNAME_SETTING,
KNN_REMOTE_BUILD_SERVICE_PASSWORD_SETTING
);
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy;
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
import org.opensearch.knn.index.remote.RemoteIndexClient;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -88,12 +89,12 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
log.debug("Repository write took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());

stopWatch = new StopWatch().start();
submitVectorBuild();
String jobId = RemoteIndexClient.getInstance().submitVectorBuild();
time_in_millis = stopWatch.stop().totalTime().millis();
log.debug("Submit vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());

stopWatch = new StopWatch().start();
awaitVectorBuild();
String indexPath = RemoteIndexClient.getInstance().awaitVectorBuild(jobId);
time_in_millis = stopWatch.stop().totalTime().millis();
log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.remote;

import org.opensearch.common.xcontent.json.JsonXContent;
import lombok.Builder;
import lombok.Getter;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Builder
@Getter
public class RemoteBuildRequest {
private final String repositoryType;
private final String containerName;
private final String objectPath;
private final String tenantId;
private final int dimension;
private final int docCount;
private final String dataType;
private final String engine;
private final String algorithm;
@Builder.Default
private final Map<String, Object> indexParameters = new HashMap<>();

// TODO: Add type checking to all parameters, add individual methods (e.g. setEfConstruction) to check index params

public String toJson() throws IOException {
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
builder.field("repository_type", repositoryType);
builder.field("container_name", containerName);
builder.field("object_path", objectPath);
builder.field("tenant_id", tenantId);
builder.field("dimension", dimension);
builder.field("doc_count", docCount);
builder.field("data_type", dataType);
builder.field("engine", engine);
builder.field("index_parameters", indexParameters);
builder.endObject();
return builder.toString();
}
}

}
Loading

0 comments on commit 57290c1

Please sign in to comment.