Skip to content

Commit

Permalink
Implemented the code for polling the remote index status, and also adde…
Browse files Browse the repository at this point in the history
…d some settings to run remote-index-build service

Signed-off-by: Navneet Verma <navneev@amazon.com>
  • Loading branch information
navneet1v committed Dec 22, 2024
1 parent e088148 commit f1d0a75
Show file tree
Hide file tree
Showing 8 changed files with 498 additions and 80 deletions.
40 changes: 40 additions & 0 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public class KNNSettings {
public static final String KNN_S3_TOKEN_KEY = "knn.s3.token.key";
public static final String REMOTE_SERVICE_ENDPOINT = "knn.remote.index.build.service.endpoint";
public static final String REMOTE_SERVICE_PORT = "knn.remote.index.build.service.port";
// Setting keys for the remote index build feature; each key is bound to a Setting of the same name with a _SETTING suffix.
// Key of the boolean toggle for remote index build.
public static final String REMOTE_INDEX_BUILD_ENABLED = "knn.remote.index.build.enabled";
// Key of the time-valued wait setting used when checking remote index build status.
public static final String REMOTE_INDEX_BUILD_STATUS_WAIT_TIME = "knn.remote.index.build.status.wait_time";
// Key of the integer document-count setting for remote index build.
public static final String REMOTE_INDEX_BUILD_MAX_DOCS = "knn.remote.index.build.max_docs";

/**
* Default setting values
Expand Down Expand Up @@ -196,6 +199,28 @@ public class KNNSettings {
Setting.Property.NodeScope
);

/**
 * Boolean setting registered under {@link #REMOTE_INDEX_BUILD_ENABLED}.
 * Defaults to {@code false}; declared with the {@code Dynamic} and {@code NodeScope} properties.
 */
public static final Setting<Boolean> REMOTE_INDEX_BUILD_ENABLED_SETTING = Setting.boolSetting(
REMOTE_INDEX_BUILD_ENABLED,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
 * Time-valued setting registered under {@link #REMOTE_INDEX_BUILD_STATUS_WAIT_TIME}.
 * Defaults to one second; declared with the {@code Dynamic} and {@code NodeScope} properties.
 * NOTE(review): presumably the pause between remote build status polls — confirm against the polling code.
 */
public static final Setting<TimeValue> REMOTE_INDEX_BUILD_STATUS_WAIT_TIME_SETTING = Setting.timeSetting(
REMOTE_INDEX_BUILD_STATUS_WAIT_TIME,
TimeValue.timeValueSeconds(1),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
 * Integer setting registered under {@link #REMOTE_INDEX_BUILD_MAX_DOCS}.
 * Defaults to 100000 with a minimum allowed value of 0; declared with the {@code Dynamic} and
 * {@code NodeScope} properties.
 * NOTE(review): despite the "max_docs" name, the merge path in NativeEngines990KnnVectorsWriter selects the
 * remote build when {@code totalLiveDocs >= } this value, so it appears to act as a lower threshold —
 * confirm the intended semantics and naming.
 */
public static final Setting<Integer> REMOTE_INDEX_BUILD_MAX_DOCS_SETTING = Setting.intSetting(
REMOTE_INDEX_BUILD_MAX_DOCS,
100000,
0,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* build_vector_data_structure_threshold - This parameter determines when to build vector data structure for knn fields during indexing
* and merging. Setting -1 (min) will skip building graph, whereas on any other values, the graph will be built if
Expand Down Expand Up @@ -398,6 +423,9 @@ public class KNNSettings {
put(KNN_S3_TOKEN_KEY, KNN_S3_TOKEN_KEY_SETTING);
put(REMOTE_SERVICE_ENDPOINT, REMOTE_SERVICE_ENDPOINT_SETTING);
put(REMOTE_SERVICE_PORT, REMOTE_SERVICE_PORT_SETTING);
put(REMOTE_INDEX_BUILD_ENABLED, REMOTE_INDEX_BUILD_ENABLED_SETTING);
put(REMOTE_INDEX_BUILD_STATUS_WAIT_TIME, REMOTE_INDEX_BUILD_STATUS_WAIT_TIME_SETTING);
put(REMOTE_INDEX_BUILD_MAX_DOCS, REMOTE_INDEX_BUILD_MAX_DOCS_SETTING);
}
};

Expand Down Expand Up @@ -663,6 +691,18 @@ public static Integer getRemoteServicePort() {
return KNNSettings.state().getSettingValue(REMOTE_SERVICE_PORT);
}

/**
 * Reads the current value of the {@code knn.remote.index.build.enabled} setting.
 *
 * @return {@code true} when the remote index build setting is switched on
 */
public static boolean isRemoteIndexBuildEnabled() {
    final Boolean remoteBuildEnabled = KNNSettings.state().getSettingValue(REMOTE_INDEX_BUILD_ENABLED);
    return remoteBuildEnabled;
}

/**
 * Reads the current value of the {@code knn.remote.index.build.status.wait_time} setting.
 *
 * @return the configured wait time, converted to milliseconds
 */
public static long getIndexBuildStatusWaitTime() {
    final TimeValue statusWaitTime = KNNSettings.state().getSettingValue(REMOTE_INDEX_BUILD_STATUS_WAIT_TIME);
    return statusWaitTime.getMillis();
}

/**
 * Reads the current value of the {@code knn.remote.index.build.max_docs} setting.
 *
 * @return the configured document count for remote index build
 */
public static Integer getRemoteIndexBuildMaxDocs() {
    final Integer configuredMaxDocs = KNNSettings.state().getSettingValue(REMOTE_INDEX_BUILD_MAX_DOCS);
    return configuredMaxDocs;
}

public void initialize(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,17 @@
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.StopWatch;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.codec.nativeindex.NativeIndexWriter;
import org.opensearch.knn.index.codec.nativeindex.RemoteIndexBuild;
import org.opensearch.knn.index.quantizationservice.QuantizationService;
import org.opensearch.knn.index.vectorvalues.KNNFloatVectorValues;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.knn.index.vectorvalues.VectorValuesInputStream;
import org.opensearch.knn.plugin.stats.KNNGraphValue;
import org.opensearch.knn.quantization.models.quantizationParams.QuantizationParams;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationState;
import org.opensearch.knn.remote.index.client.IndexBuildServiceClient;
import org.opensearch.knn.remote.index.model.CreateIndexRequest;
import org.opensearch.knn.remote.index.model.CreateIndexResponse;
import org.opensearch.knn.remote.index.s3.S3Client;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
Expand All @@ -61,9 +56,7 @@ public class NativeEngines990KnnVectorsWriter extends KnnVectorsWriter {
private final List<NativeEngineFieldVectorsWriter<?>> fields = new ArrayList<>();
private boolean finished;
private final Integer approximateThreshold;
private final S3Client s3Client;
private final IndexBuildServiceClient indexBuildServiceClient;
private final String indexUUID;
private final RemoteIndexBuild remoteIndexBuild;

public NativeEngines990KnnVectorsWriter(
SegmentWriteState segmentWriteState,
Expand All @@ -82,13 +75,7 @@ public NativeEngines990KnnVectorsWriter(
this.segmentWriteState = segmentWriteState;
this.flatVectorsWriter = flatVectorsWriter;
this.approximateThreshold = approximateThreshold;
this.indexUUID = indexUUID;
try {
s3Client = S3Client.getInstance();
indexBuildServiceClient = IndexBuildServiceClient.getInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
this.remoteIndexBuild = new RemoteIndexBuild(indexUUID, segmentWriteState);
}

/**
Expand Down Expand Up @@ -141,21 +128,11 @@ public void flush(int maxDoc, final Sorter.DocMap sortMap) throws IOException {
);
continue;
}
// this.remoteIndexBuild.buildIndexRemotely(fieldInfo, knnVectorValuesSupplier, totalLiveDocs);

uploadToS3(fieldInfo, knnVectorValuesSupplier);
log.info("Creating the IndexRequest...");
CreateIndexRequest createIndexRequest = buildCreateIndexRequest(fieldInfo, totalLiveDocs);
log.info("Submitting request to remote indexbuildService");
try {
CreateIndexResponse response = indexBuildServiceClient.createIndex(createIndexRequest);
log.info("Request completed with response : {}", response);
} catch (Exception e) {
log.error("Failed to call indexBuildServiceClient.createIndex for input: {}", createIndexRequest, e);
}
final NativeIndexWriter writer = NativeIndexWriter.getWriter(fieldInfo, segmentWriteState, quantizationState);
final KNNVectorValues<?> knnVectorValues = knnVectorValuesSupplier.get();

StopWatch stopWatch = new StopWatch().start();
final KNNVectorValues<?> knnVectorValues = knnVectorValuesSupplier.get();
writer.flushIndex(knnVectorValues, totalLiveDocs);
long time_in_millis = stopWatch.stop().totalTime().millis();
KNNGraphValue.REFRESH_TOTAL_TIME_IN_MILLIS.incrementBy(time_in_millis);
Expand Down Expand Up @@ -192,56 +169,24 @@ public void mergeOneField(final FieldInfo fieldInfo, final MergeState mergeState
);
return;
}
final NativeIndexWriter writer = NativeIndexWriter.getWriter(fieldInfo, segmentWriteState, quantizationState);
final KNNVectorValues<?> knnVectorValues = knnVectorValuesSupplier.get();

StopWatch stopWatch = new StopWatch().start();

writer.mergeIndex(knnVectorValues, totalLiveDocs);

long time_in_millis = stopWatch.stop().totalTime().millis();
KNNGraphValue.MERGE_TOTAL_TIME_IN_MILLIS.incrementBy(time_in_millis);
log.debug("Merge took {} ms for vector field [{}]", time_in_millis, fieldInfo.getName());
}

private void uploadToS3(final FieldInfo fieldInfo, final Supplier<KNNVectorValues<?>> knnVectorValuesSupplier) {
// s3 uploader
String s3Key = createObjectKey(fieldInfo);
try (InputStream vectorInputStream = new VectorValuesInputStream((KNNFloatVectorValues) knnVectorValuesSupplier.get())) {
StopWatch stopWatch = new StopWatch().start();
// Lets upload data to s3.
long totalBytesUploaded = s3Client.uploadWithProgress(vectorInputStream, s3Key);
long time_in_millis = stopWatch.stop().totalTime().millis();
if (KNNSettings.isRemoteIndexBuildEnabled() && totalLiveDocs >= KNNSettings.getRemoteIndexBuildMaxDocs()) {
log.info(
"Time taken to upload vector for segment : {}, field: {}, totalBytes: {}, dimension: {} is : {}ms",
segmentWriteState.segmentInfo.name,
fieldInfo.getName(),
totalBytesUploaded,
fieldInfo.getVectorDimension(),
time_in_millis
"Remote index build is enabled and total live docs {} are greater than equal to the threshold {}",
totalLiveDocs,
KNNSettings.getRemoteIndexBuildMaxDocs()
);
} catch (Exception e) {
// logging here as this is in internal error
log.error("Error while uploading data to s3.", e);
this.remoteIndexBuild.buildIndexRemotely(fieldInfo, knnVectorValuesSupplier, totalLiveDocs);
} else {
log.info("Building index locally, live docs {}, setting value: {}", totalLiveDocs, KNNSettings.getRemoteIndexBuildMaxDocs());
final NativeIndexWriter writer = NativeIndexWriter.getWriter(fieldInfo, segmentWriteState, quantizationState);
final KNNVectorValues<?> knnVectorValues = knnVectorValuesSupplier.get();
writer.mergeIndex(knnVectorValues, totalLiveDocs);
}
}

private CreateIndexRequest buildCreateIndexRequest(final FieldInfo fieldInfo, int totalLiveDocs) {
String s3Key = createObjectKey(fieldInfo);
int dimension = fieldInfo.getVectorDimension();
return CreateIndexRequest.builder()
.bucketName(S3Client.BUCKET_NAME)
.objectLocation(s3Key)
.dimensions(dimension)
.numberOfVectors(totalLiveDocs)
.build();
}

private String createObjectKey(FieldInfo fieldInfo) {
String segmentName = segmentWriteState.segmentInfo.name;
String fieldName = fieldInfo.getName();
// shard information will also be needed to ensure that we can correct paths
return indexUUID + "_" + segmentName + "_" + fieldName + ".s3vec";
long time_in_millis = stopWatch.stop().totalTime().millis();
KNNGraphValue.MERGE_TOTAL_TIME_IN_MILLIS.incrementBy(time_in_millis);
log.debug("Merge took {} ms for vector field [{}]", time_in_millis, fieldInfo.getName());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.nativeindex;

import lombok.Builder;
import lombok.Value;
import org.apache.commons.lang.NotImplementedException;
import org.apache.lucene.store.IndexOutput;

import java.io.IOException;
import java.io.OutputStream;

/**
 * {@link OutputStream} adapter that forwards everything written to it into a Lucene {@link IndexOutput}.
 *
 * <p>All three {@code write} overloads delegate directly to the wrapped {@code IndexOutput}. This class does
 * not override {@code flush()} or {@code close()}, so closing this stream does not close the wrapped
 * {@code IndexOutput}.
 */
@Builder
@Value
public class NativeIndexOutputStream extends OutputStream {

    /** Destination that receives every byte written to this stream. */
    IndexOutput indexOutput;

    /**
     * Writes a single byte to the wrapped {@link IndexOutput}. Per the {@link OutputStream} contract only the
     * eight low-order bits of {@code b} are written; the 24 high-order bits are ignored.
     *
     * @param b the byte to write, carried in the low-order bits
     * @throws IOException if the underlying {@code IndexOutput} fails
     */
    @Override
    public void write(int b) throws IOException {
        // Must be a real implementation: generic stream code is free to write one byte at a time.
        indexOutput.writeByte((byte) b);
    }

    /**
     * Writes all {@code b.length} bytes of the array to the wrapped {@link IndexOutput}.
     *
     * @param b the data
     * @throws IOException if the underlying {@code IndexOutput} fails
     * @see OutputStream#write(byte[], int, int)
     */
    @Override
    public void write(byte[] b) throws IOException {
        indexOutput.writeBytes(b, b.length);
    }

    /**
     * Writes {@code len} bytes of {@code b}, starting at {@code off}, to the wrapped {@link IndexOutput} in a
     * single bulk call.
     *
     * @param b   the data
     * @param off start offset within {@code b}
     * @param len number of bytes to write
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or {@code off + len}
     *                                   exceeds {@code b.length}
     * @throws IOException               if the underlying {@code IndexOutput} fails
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Overridden so ranged writes do not fall back to OutputStream's byte-by-byte default.
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", array length=" + b.length);
        }
        if (len == 0) {
            return;
        }
        indexOutput.writeBytes(b, off, len);
    }
}
Loading

0 comments on commit f1d0a75

Please sign in to comment.