Skip to content

Commit

Permalink
Fixed the object store key when more than 1 shard is present for an index
Browse files Browse the repository at this point in the history

Signed-off-by: Navneet Verma <navneev@amazon.com>
  • Loading branch information
navneet1v committed Dec 29, 2024
1 parent f1d0a75 commit b0db75c
Showing 1 changed file with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.knn.remote.index.s3.S3Client;

import java.io.InputStream;
import java.util.UUID;
import java.util.function.Supplier;

import static org.opensearch.knn.index.codec.util.KNNCodecUtil.buildEngineFileName;
Expand Down Expand Up @@ -51,9 +52,9 @@ public RemoteIndexBuild(final String indexUUID, final SegmentWriteState segmentW
public void buildIndexRemotely(FieldInfo fieldInfo, Supplier<KNNVectorValues<?>> knnVectorValuesSupplier, int totalLiveDocs) {
try {
// First upload all the vectors to S3
uploadToS3(fieldInfo, knnVectorValuesSupplier);
String objectKey = uploadToS3(fieldInfo, knnVectorValuesSupplier);
log.info("Creating the IndexRequest...");
CreateIndexRequest createIndexRequest = buildCreateIndexRequest(fieldInfo, totalLiveDocs);
CreateIndexRequest createIndexRequest = buildCreateIndexRequest(fieldInfo, totalLiveDocs, objectKey);
log.info("Submitting request to remote indexbuildService");
// call the CreateIndex api to kick off the index creation
CreateIndexResponse response = indexBuildServiceClient.createIndex(createIndexRequest);
Expand Down Expand Up @@ -87,7 +88,7 @@ private GetJobResponse isIndexBuildCompletedWithoutErrors(final CreateIndexRespo
return GetJobResponse.builder().status("errored").build();
}

private void uploadToS3(final FieldInfo fieldInfo, final Supplier<KNNVectorValues<?>> knnVectorValuesSupplier) {
private String uploadToS3(final FieldInfo fieldInfo, final Supplier<KNNVectorValues<?>> knnVectorValuesSupplier) {
// s3 uploader
String s3Key = createObjectKey(fieldInfo);
try (InputStream vectorInputStream = new VectorValuesInputStream((KNNFloatVectorValues) knnVectorValuesSupplier.get())) {
Expand All @@ -107,6 +108,7 @@ private void uploadToS3(final FieldInfo fieldInfo, final Supplier<KNNVectorValue
// logging here as this is in internal error
log.error("Error while uploading data to s3.", e);
}
return s3Key;
}

private void downloadGraphFileFromS3(GetJobResponse getJobResponse, FieldInfo fieldInfo) {
Expand All @@ -131,20 +133,20 @@ private void downloadGraphFileFromS3(GetJobResponse getJobResponse, FieldInfo fi
}
}

private CreateIndexRequest buildCreateIndexRequest(final FieldInfo fieldInfo, int totalLiveDocs) {
String s3Key = createObjectKey(fieldInfo);
private CreateIndexRequest buildCreateIndexRequest(final FieldInfo fieldInfo, int totalLiveDocs, String objectKey) {
int dimension = fieldInfo.getVectorDimension();
return CreateIndexRequest.builder()
.bucketName(S3Client.BUCKET_NAME)
.objectLocation(s3Key)
.objectLocation(objectKey)
.dimensions(dimension)
.numberOfVectors(totalLiveDocs)
.build();
}

/**
 * Builds a new S3 object key for the vectors of one field.
 *
 * <p>The key has the form
 * {@code <indexUUID>_<randomUUID>_<segmentName>_<fieldName>.s3vec}. A random UUID is part of
 * the key, so every call returns a different value: compute the key once per upload and
 * pass it along rather than calling this method again.
 *
 * @param fieldInfo field whose vectors are being uploaded; its name becomes part of the key
 * @return a newly generated object key ending in {@code .s3vec}
 */
private String createObjectKey(FieldInfo fieldInfo) {
    final String fieldName = fieldInfo.getName();
    final String segmentName = segmentWriteState.segmentInfo.name;
    // Shard information is not available here, so a random UUID stands in for it to keep
    // keys distinct when an index has more than one shard.
    // TODO: replace the random UUID with real shard information.
    return indexUUID + "_" + UUID.randomUUID() + "_" + segmentName + "_" + fieldName + ".s3vec";
}
}

0 comments on commit b0db75c

Please sign in to comment.