Add download + indexOutput#write implementation to RemoteIndexBuildStrategy

Signed-off-by: Jay Deng <jayd0104@gmail.com>
jed326 authored and Jay Deng committed Feb 24, 2025
1 parent c7ac05c commit 6d950eb
Showing 4 changed files with 143 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
* [Remote Vector Index Build] Implement data download and IndexOutput write functionality [#2554](https://github.com/opensearch-project/k-NN/pull/2554)
### Enhancements
### Bug Fixes
### Infrastructure
src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java
@@ -5,23 +5,28 @@

package org.opensearch.knn.index.codec.nativeindex.remote;

import com.google.common.annotations.VisibleForTesting;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang.NotImplementedException;
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.common.StopWatch;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy;
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
import org.opensearch.knn.index.store.IndexOutputWithBuffer;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

import static org.opensearch.knn.index.KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING;
@@ -37,8 +42,10 @@ public class RemoteIndexBuildStrategy implements NativeIndexBuildStrategy {

private final Supplier<RepositoriesService> repositoriesServiceSupplier;
private final NativeIndexBuildStrategy fallbackStrategy;

private static final String VECTOR_BLOB_FILE_EXTENSION = ".knnvec";
private static final String DOC_ID_FILE_EXTENSION = ".knndid";
private static final String VECTORS_PATH = "_vectors";

/**
* Public constructor
@@ -93,12 +100,12 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
log.debug("Submit vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());

stopWatch = new StopWatch().start();
awaitVectorBuild();
BlobPath downloadPath = awaitVectorBuild();
time_in_millis = stopWatch.stop().totalTime().millis();
log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());

stopWatch = new StopWatch().start();
readFromRepository();
readFromRepository(downloadPath, indexInfo.getIndexOutputWithBuffer());
time_in_millis = stopWatch.stop().totalTime().millis();
log.debug("Repository read took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
} catch (Exception e) {
@@ -155,15 +162,22 @@ private void submitVectorBuild() {

/**
* Wait on remote vector build to complete
* @return BlobPath The path from which we should perform the download
*/
private void awaitVectorBuild() {
private BlobPath awaitVectorBuild() throws NotImplementedException {
throw new NotImplementedException();
}

/**
* Read constructed vector file from remote repository and write to IndexOutput
*/
private void readFromRepository() {
throw new NotImplementedException();
@VisibleForTesting
void readFromRepository(BlobPath downloadPath, IndexOutputWithBuffer indexOutputWithBuffer) throws IOException {
BlobContainer blobContainer = getRepository().blobStore().blobContainer(downloadPath.parent());
// TODO: We are using the sequential download API as multi-part parallel download is difficult for us to implement today and
// requires some changes in core. For more details, see: https://github.com/opensearch-project/k-NN/issues/2464
String fileName = downloadPath.toArray()[downloadPath.toArray().length - 1];
InputStream graphStream = blobContainer.readBlob(fileName);
indexOutputWithBuffer.writeFromStreamWithBuffer(graphStream);
}
}
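
A note on the path handling in readFromRepository above: the parent of the download path is treated as the blob container, and the last path element as the blob name. Below is a minimal sketch (not part of the commit) of that split, reusing the BlobPath import above and the same path elements that appear in the test further down:

// Sketch only: how readFromRepository splits the download path into container path + blob name.
BlobPath downloadPath = new BlobPath().add("testBasePath").add("testDirectory").add("testFile");
// parent() drops the last element ("testBasePath/testDirectory/") and is passed to blobStore().blobContainer(...)
BlobPath containerPath = downloadPath.parent();
// The last element of toArray() ("testFile") is passed to blobContainer.readBlob(...)
String[] parts = downloadPath.toArray();
String blobName = parts[parts.length - 1];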
src/main/java/org/opensearch/knn/index/store/IndexOutputWithBuffer.java
@@ -8,18 +8,25 @@
import org.apache.lucene.store.IndexOutput;

import java.io.IOException;
import java.io.InputStream;

/**
* Wrapper around {@link IndexOutput} to perform writes in a buffered manner. This class is created per flush/merge, and may be used twice if
* {@link org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy} needs to fall back to a different build strategy.
*/
public class IndexOutputWithBuffer {
// Underlying `IndexOutput` obtained from Lucene's Directory.
private IndexOutput indexOutput;
// Write buffer. Native engine will copy bytes into this buffer.
// Allocating 64KB here since it shows better performance in NMSLIB at this size. (FAISS also saw a slight improvement over 4KB.)
// NMSLIB writes an adjacency list size first, followed by the serialized list. Since we usually have many adjacency lists, a 64KB
// buffer accumulates as many bytes as possible and reduces the number of `writeBytes` calls.
private byte[] buffer = new byte[64 * 1024];
static final int CHUNK_SIZE = 64 * 1024;
private final byte[] buffer;

public IndexOutputWithBuffer(IndexOutput indexOutput) {
this.indexOutput = indexOutput;
this.buffer = new byte[CHUNK_SIZE];
}

// This method will be called in JNI layer which precisely knows
@@ -33,6 +40,43 @@ public void writeBytes(int length) {
}
}

/**
* Writes to the {@link IndexOutput} by buffering bytes in the existing buffer in this class.
*
* @param inputStream The stream from which we are reading bytes to write
* @throws IOException
* @see IndexOutputWithBuffer#writeFromStreamWithBuffer(InputStream, byte[])
*/
public void writeFromStreamWithBuffer(InputStream inputStream) throws IOException {
writeFromStreamWithBuffer(inputStream, buffer);
}

/**
* Writes to the {@link IndexOutput} by buffering bytes in the provided outputBuffer. This method allows
* {@link org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy} to provide a separate, larger buffer, since that buffer holds
* bytes downloaded from the repository and a larger buffer may be more performant there.
* We do not change the size of the existing buffer in case a fallback to the existing build strategy is needed.
* TODO: Tune the size of the buffer used by RemoteIndexBuildStrategy based on benchmarking
*
* @param inputStream The stream from which we are reading bytes to write
* @param outputBuffer The buffer used to buffer bytes
* @throws IOException
* @see IndexOutputWithBuffer#writeFromStreamWithBuffer(InputStream)
*/
public void writeFromStreamWithBuffer(InputStream inputStream, byte[] outputBuffer) throws IOException {
int bytesRead = 0;
// InputStream#read returns -1 to indicate there are no more bytes to be read
while (bytesRead != -1) {
// Try to read CHUNK_SIZE bytes into the buffer. The actual amount read may be less.
bytesRead = inputStream.read(outputBuffer, 0, CHUNK_SIZE);
assert bytesRead <= CHUNK_SIZE;
// Write however many bytes we read to the IndexOutput, unless we hit end of stream (-1)
if (bytesRead != -1) {
indexOutput.writeBytes(outputBuffer, 0, bytesRead);
}
}
}

@Override
public String toString() {
return "{indexOutput=" + indexOutput + ", len(buffer)=" + buffer.length + "}";
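
For context, here is a usage sketch of the new stream-based write path (not part of the commit; the wrapper class, temporary directory prefix, and output file name are hypothetical, and the Lucene/JDK classes are the same ones referenced elsewhere in this diff):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Random;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.knn.index.store.IndexOutputWithBuffer;

// Hypothetical sketch class, not part of the k-NN repository.
public class IndexOutputWithBufferSketch {
    public static void main(String[] args) throws IOException {
        // Random payload larger than CHUNK_SIZE so the copy loop runs more than once.
        byte[] data = new byte[256 * 1024];
        new Random().nextBytes(data);

        try (Directory directory = FSDirectory.open(Files.createTempDirectory("index-output-sketch"))) {
            IndexOutput indexOutput = directory.createOutput("sketch-file", IOContext.DEFAULT);
            IndexOutputWithBuffer buffered = new IndexOutputWithBuffer(indexOutput);
            // Copies the stream into the IndexOutput in CHUNK_SIZE (64KB) slices until read() returns -1.
            buffered.writeFromStreamWithBuffer(new ByteArrayInputStream(data));
            indexOutput.close();
        }
    }
}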
src/test/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategyTests.java
@@ -5,7 +5,18 @@

package org.opensearch.knn.index.codec.nativeindex.remote;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.junit.Before;
import org.mockito.Mockito;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.knn.KNNTestCase;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy;
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
@@ -16,17 +27,21 @@
import org.opensearch.knn.index.vectorvalues.TestVectorValues;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Random;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.knn.index.KNNSettings.KNN_REMOTE_VECTOR_REPO_SETTING;

public class RemoteIndexBuildStrategyTests extends OpenSearchTestCase {
public class RemoteIndexBuildStrategyTests extends KNNTestCase {

static int fallbackCounter = 0;

@@ -38,6 +53,16 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
}
}

@Before
@Override
public void setUp() throws Exception {
super.setUp();
ClusterSettings clusterSettings = mock(ClusterSettings.class);
when(clusterSettings.get(KNN_REMOTE_VECTOR_REPO_SETTING)).thenReturn("test-repo-name");
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
KNNSettings.state().setClusterService(clusterService);
}

public void testFallback() throws IOException {
List<float[]> vectorValues = List.of(new float[] { 1, 2 }, new float[] { 2, 3 }, new float[] { 3, 4 });
final TestVectorValues.PreDefinedFloatVectorValues randomVectorValues = new TestVectorValues.PreDefinedFloatVectorValues(
@@ -64,4 +89,55 @@ public void testFallback() throws IOException {
objectUnderTest.buildAndWriteIndex(buildIndexParams);
assertEquals(1, fallbackCounter);
}

/**
* Verify the buffered read method in {@link RemoteIndexBuildStrategy#readFromRepository} produces the correct result
*/
public void testRepositoryRead() throws IOException {
// Create an InputStream with random values
int TEST_ARRAY_SIZE = 64 * 1024 * 10;
byte[] byteArray = new byte[TEST_ARRAY_SIZE];
Random random = new Random();
random.nextBytes(byteArray);
InputStream randomStream = new ByteArrayInputStream(byteArray);

// Create a test segment that we will read/write from
Directory directory = newFSDirectory(createTempDir());
String TEST_SEGMENT_NAME = "test-segment-name";
IndexOutput testIndexOutput = directory.createOutput(TEST_SEGMENT_NAME, IOContext.DEFAULT);
IndexOutputWithBuffer testIndexOutputWithBuffer = new IndexOutputWithBuffer(testIndexOutput);

// Set up RemoteIndexBuildStrategy and write to IndexOutput
RepositoriesService repositoriesService = mock(RepositoriesService.class);
BlobStoreRepository mockRepository = mock(BlobStoreRepository.class);
BlobPath testBasePath = new BlobPath().add("testBasePath");
BlobStore mockBlobStore = mock(BlobStore.class);
AsyncMultiStreamBlobContainer mockBlobContainer = mock(AsyncMultiStreamBlobContainer.class);

when(repositoriesService.repository(any())).thenReturn(mockRepository);
when(mockRepository.basePath()).thenReturn(testBasePath);
when(mockRepository.blobStore()).thenReturn(mockBlobStore);
when(mockBlobStore.blobContainer(any())).thenReturn(mockBlobContainer);
when(mockBlobContainer.readBlob("testFile")).thenReturn(randomStream);

RemoteIndexBuildStrategy objectUnderTest = new RemoteIndexBuildStrategy(
() -> repositoriesService,
mock(NativeIndexBuildStrategy.class)
);
// This should read from randomStream into testIndexOutput
BlobPath testPath = new BlobPath().add("testBasePath").add("testDirectory").add("testFile");
objectUnderTest.readFromRepository(testPath, testIndexOutputWithBuffer);
testIndexOutput.close();

// Now try to read from the IndexOutput
IndexInput testIndexInput = directory.openInput(TEST_SEGMENT_NAME, IOContext.DEFAULT);
byte[] resultByteArray = new byte[TEST_ARRAY_SIZE];
testIndexInput.readBytes(resultByteArray, 0, TEST_ARRAY_SIZE);
assertArrayEquals(byteArray, resultByteArray);

// Test Cleanup
testIndexInput.close();
directory.close();
}
}
