Have consolidate_updates() reuse existing centroids by default with an option to re-compute them #177

What

When a user provides options for how to create the centroids in ingest() and then later calls consolidate_updates(), we do not remember any of the options they provided and instead revert to the ingest() defaults for creating centroids.

Here we add a reuse_centroids parameter to consolidate_updates(), which defaults to True. If True, we reuse the already-computed centroids, even after new updates to the data. If False, we recompute the centroids from scratch, using either the defaults or the user-provided options (i.e. we make all of the options for computing centroids in ingest() also arguments to consolidate_updates()).
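
As a usage sketch (the URIs here are placeholder variables; reuse_centroids and training_sample_size are the actual parameters exercised by the test below):

# By default consolidate_updates() reuses the existing centroids; passing
# reuse_centroids=False recomputes them with ingest()-style options.
index = ingest(
    index_type="IVF_FLAT",
    index_uri=index_uri,
    source_uri=source_uri,
    training_source_uri=training_source_uri,
)
index = index.consolidate_updates()  # reuses the centroids computed at ingest time
index = index.consolidate_updates(reuse_centroids=False, training_sample_size=3)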

We also make partitions required alongside copy_centroids_uri. If we have updated the array so that it is larger, the partitions computed from the new size will also be larger, and we will try to read from invalid regions of copy_centroids_uri, ending up with NaNs. Example (a usage sketch of the explicit approach follows it):

First round
[ingestion@ingest_vectors] partitions calculated from size (5), partitions:  2
[ingest@copy_centroids] src_centroids
 [[1.  5. ]
 [1.1 5.1]
 [1.2 5.2]
 [1.3 5.3]]

Then we add more data
[ingestion@ingest_vectors] partitions calculated from size (10), partitions:  3
[ingest@copy_centroids] src_centroids
 [[1.  5.  nan]
 [1.1 5.1 nan]
 [1.2 5.2 nan]
 [1.3 5.3 nan]]
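
With these changes, callers passing copy_centroids_uri must pass partitions explicitly so that we only read the valid region. A minimal sketch (placeholder URIs; both parameters exist on ingest()):

# Hedged sketch: partitions must now match the number of centroids actually
# stored at copy_centroids_uri, so we never read past the valid region.
index = ingest(
    index_type="IVF_FLAT",
    index_uri=index_uri,
    source_uri=source_uri,
    copy_centroids_uri=centroids_uri,
    partitions=2,  # the number of centroids stored at centroids_uri
)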

Isaiah also suggested that, instead of requiring partitions alongside copy_centroids_uri, we could, under the hood, inspect the non-empty domain of copy_centroids_uri and compute partitions from that; a sketch of that alternative is below. This PR goes with the explicit approach, since it does not seem too cumbersome and is simpler, but I'm happy to switch over if we hear that it is awkward to use.
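
For reference, a minimal sketch of that implicit alternative, assuming the centroids array is a dense 2D array laid out as (dimensions, partitions), as in the logs below:

import tiledb

# Hedged sketch of the suggestion: derive partitions from the non-empty domain
# of the centroids array instead of requiring the caller to pass it.
with tiledb.open(copy_centroids_uri) as centroids_array:
    # nonempty_domain() returns one (min, max) pair per dimension; the extent
    # of the second (column) dimension is the number of stored centroids.
    (_, _), (col_min, col_max) = centroids_array.nonempty_domain()
    partitions = int(col_max) - int(col_min) + 1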

Example

Here is the unit test:

# Note: create_array and check_equals are helper functions defined elsewhere in
# this test module; the import paths below are assumed from this repo's layout.
import os

import numpy as np
import pytest

from tiledb.vector_search.ingestion import ingest
from tiledb.vector_search.ivf_flat_index import IVFFlatIndex


def test_ingest_with_training_source_uri_tdb(tmp_path):
    ################################################################################################
    # First set up the data.
    ################################################################################################
    dataset_dir = os.path.join(tmp_path, "dataset")
    os.mkdir(dataset_dir)
    # data.shape should give you (cols, rows). So we transpose this before using it.
    data = np.array([
        [1.0, 1.1, 1.2, 1.3], 
        [2.0, 2.1, 2.2, 2.3], 
        [3.0, 3.1, 3.2, 3.3], 
        [4.0, 4.1, 4.2, 4.3], 
        [5.0, 5.1, 5.2, 5.3]], dtype=np.float32).transpose()
    create_array(path=os.path.join(dataset_dir, "data.tdb"), data=data)

    training_data = np.array([
        [1.0, 1.1, 1.2, 1.3], 
        [5.0, 5.1, 5.2, 5.3]], dtype=np.float32).transpose()
    create_array(path=os.path.join(dataset_dir, "training_data.tdb"), data=training_data)

    # Run a quick test that if we set up training_data incorrectly, we will raise an exception.
    with pytest.raises(ValueError) as error:
        training_data_invalid = np.array([
            [1.0, 1.1, 1.2], 
            [5.0, 5.1, 5.2]], dtype=np.float32).transpose()
        create_array(path=os.path.join(dataset_dir, "training_data_invalid.tdb"), data=training_data_invalid)
        index = ingest(
            index_type="IVF_FLAT", 
            index_uri=os.path.join(tmp_path, "array_invalid"), 
            source_uri=os.path.join(dataset_dir, "data.tdb"),
            training_source_uri=os.path.join(dataset_dir, "training_data_invalid.tdb")
        )
    assert "training data dimensions" in str(error.value)

    ################################################################################################
    # Test we can ingest, query, update, and consolidate with a training_source_uri.
    ################################################################################################
    print('[test_ingestion@test_ingest_with_training_source_uri_tdb] ingest() ======================================')
    index_uri = os.path.join(tmp_path, "array")
    index = ingest(
        index_type="IVF_FLAT", 
        index_uri=index_uri, 
        source_uri=os.path.join(dataset_dir, "data.tdb"),
        training_source_uri=os.path.join(dataset_dir, "training_data.tdb")
    )

    print('[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================')
    query_vector_index = 1
    query_vectors = np.array([data.transpose()[query_vector_index]], dtype=np.float32)
    result_d, result_i = index.query(query_vectors, k=1)
    check_equals(result_d=result_d, result_i=result_i, expected_result_d=[[0]], expected_result_i=[[query_vector_index]])

    update_vectors = np.empty([3], dtype=object)
    update_vectors[0] = np.array([6.0, 6.1, 6.2, 6.3], dtype=np.dtype(np.float32))
    update_vectors[1] = np.array([7.0, 7.1, 7.2, 7.3], dtype=np.dtype(np.float32))
    update_vectors[2] = np.array([8.0, 8.1, 8.2, 8.3], dtype=np.dtype(np.float32))
    index.update_batch(vectors=update_vectors, external_ids=np.array([1000, 1001, 1002]))
    
    print('[test_ingestion@test_ingest_with_training_source_uri_tdb] index.consolidate_updates() ======================================')
    index = index.consolidate_updates()

    print('[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================')
    query_vectors = np.array([update_vectors[2]], dtype=np.float32)
    result_d, result_i = index.query(query_vectors, k=1)
    check_equals(result_d=result_d, result_i=result_i, expected_result_d=[[0]], expected_result_i=[[1002]])

    ################################################################################################
    # Test we can load the index again and query, update, and consolidate.
    ################################################################################################
    print('[test_ingestion@test_ingest_with_training_source_uri_tdb] index_ram = IVFFlatIndex(uri=index_uri) ======================================')
    index_ram = IVFFlatIndex(uri=index_uri)

    print('[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================')
    result_d, result_i = index_ram.query(query_vectors, k=1)
    check_equals(result_d=result_d, result_i=result_i, expected_result_d=[[0]], expected_result_i=[[1002]])
    
    print('[test_ingestion@test_ingest_with_training_source_uri_tdb] index.consolidate_updates() ======================================')
    update_vectors = np.empty([2], dtype=object)
    update_vectors[0] = np.array([9.0, 9.1, 9.2, 9.3], dtype=np.dtype(np.float32))
    update_vectors[1] = np.array([10.0, 10.1, 10.2, 10.3], dtype=np.dtype(np.float32))
    index.update_batch(vectors=update_vectors, external_ids=np.array([1003, 1004]))
    index_ram = index_ram.consolidate_updates()
    
    print('[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================')
    query_vectors = np.array([update_vectors[0]], dtype=np.float32)
    result_d, result_i = index_ram.query(query_vectors, k=1)
    check_equals(result_d=result_d, result_i=result_i, expected_result_d=[[0]], expected_result_i=[[1003]])

    print('[test_ingestion@test_ingest_with_training_source_uri_tdb] consolidate_updates(reuse_centroids=False) ======================================')
    update_vectors = np.empty([2], dtype=object)
    update_vectors[0] = np.array([11.0, 11.1, 11.2, 11.3], dtype=np.dtype(np.float32))
    update_vectors[1] = np.array([12.0, 12.1, 12.2, 12.3], dtype=np.dtype(np.float32))
    index.update_batch(vectors=update_vectors, external_ids=np.array([1003, 1004]))
    index_ram = index_ram.consolidate_updates(reuse_centroids=False, training_sample_size=3)

Before these changes, you can see that the centroids (shape (4, 2)) are different each time we call consolidate_updates():

(TileDB-Vector-Search-2) ~/repo/TileDB-Vector-Search-2/apis/python pytest test/test_ingestion.py -s          ✹main 
=============================================== test session starts ================================================
platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0
rootdir: /Users/parismorgan/repo/TileDB-Vector-Search-2/apis/python
collected 1 item                                                                                                   

test/test_ingestion.py [ingestion@ingest] copy_centroids_uri None training_sample_size -1 training_input_vectors None training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1245/test_ingest_with_training_sour0/dataset/training_data_invalid.tdb training_source_type None
[ivf_flat_index.py@create] dimensions 4 vector_type float32
[ingest@centralised_kmeans] training_sample_size 5 training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1245/test_ingest_with_training_sour0/dataset/training_data_invalid.tdb training_source_type None
[ingest@centralised_kmeans] reading from training_source_uri: training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1245/test_ingest_with_training_sour0/dataset/training_data_invalid.tdb training_source_type TILEDB_ARRAY training_in_size 2 training_dimensions 3 training_vector_type float32
[test_ingestion@test_ingest_with_training_source_uri_tdb] ingest() ======================================
[ingestion@ingest] copy_centroids_uri None training_sample_size -1 training_input_vectors None training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1245/test_ingest_with_training_sour0/dataset/training_data.tdb training_source_type None
[ivf_flat_index.py@create] dimensions 4 vector_type float32
[ingest@centralised_kmeans] training_sample_size 5 training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1245/test_ingest_with_training_sour0/dataset/training_data.tdb training_source_type None
[ingest@centralised_kmeans] reading from training_source_uri: training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1245/test_ingest_with_training_sour0/dataset/training_data.tdb training_source_type TILEDB_ARRAY training_in_size 2 training_dimensions 4 training_vector_type float32
[ingestion@centralized_kmeans] sample_vectors (2, 4) 
 [[1.  1.1 1.2 1.3]
 [5.  5.1 5.2 5.3]] 
centroids (4, 2) 
 [[1.  5. ]
 [1.1 5.1]
 [1.2 5.2]
 [1.3 5.3]]
[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================
[test_ingestion@test_ingest_with_training_source_uri_tdb] index.consolidate_updates() ======================================
[ingestion@ingest] copy_centroids_uri None training_sample_size -1 training_input_vectors None training_source_uri None training_source_type None
[ingest@centralised_kmeans] training_sample_size 5 training_source_uri None training_source_type None
[ingest@centralised_kmeans] reading from source_uri: source_uri file:///private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1245/test_ingest_with_training_sour0/array/shuffled_vectors source_type TILEDB_ARRAY vector_type float32 dimensions 4 training_sample_size 5
[ingestion@centralized_kmeans] sample_vectors (5, 4) 
 [[1.  1.1 1.2 1.3]
 [2.  2.1 2.2 2.3]
 [3.  3.1 3.2 3.3]
 [4.  4.1 4.2 4.3]
 [5.  5.1 5.2 5.3]] 
centroids (4, 2) 
 [[1.5       4.       ]
 [1.5999999 4.1      ]
 [1.7       4.2      ]
 [1.8       4.3      ]]
[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================
[test_ingestion@test_ingest_with_training_source_uri_tdb] index_ram = IVFFlatIndex(uri=index_uri) ======================================
[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================
[test_ingestion@test_ingest_with_training_source_uri_tdb] index.consolidate_updates() ======================================
[ingestion@ingest] copy_centroids_uri None training_sample_size -1 training_input_vectors None training_source_uri None training_source_type None
[ingest@centralised_kmeans] training_sample_size 8 training_source_uri None training_source_type None
[ingest@centralised_kmeans] reading from source_uri: source_uri file:///private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1245/test_ingest_with_training_sour0/array/shuffled_vectors source_type TILEDB_ARRAY vector_type float32 dimensions 4 training_sample_size 8
[ingestion@centralized_kmeans] sample_vectors (8, 4) 
 [[1.  1.1 1.2 1.3]
 [2.  2.1 2.2 2.3]
 [3.  3.1 3.2 3.3]
 [4.  4.1 4.2 4.3]
 [5.  5.1 5.2 5.3]
 [6.  6.1 6.2 6.3]
 [7.  7.1 7.2 7.3]
 [8.  8.1 8.2 8.3]] 
centroids (4, 2) 
 [[3.        7.       ]
 [3.1       7.1      ]
 [3.2       7.1999993]
 [3.3       7.3000007]]
[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================
[test_ingestion@test_ingest_with_training_source_uri_tdb] consolidate_updates(reuse_centroids=False) ======================================
[ingestion@ingest] copy_centroids_uri None training_sample_size -1 training_input_vectors None training_source_uri None training_source_type None
[ingest@centralised_kmeans] training_sample_size 10 training_source_uri None training_source_type None
[ingest@centralised_kmeans] reading from source_uri: source_uri file:///private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1245/test_ingest_with_training_sour0/array/shuffled_vectors source_type TILEDB_ARRAY vector_type float32 dimensions 4 training_sample_size 10
[ingestion@centralized_kmeans] sample_vectors (10, 4) 
 [[ 1.   1.1  1.2  1.3]
 [ 2.   2.1  2.2  2.3]
 [ 3.   3.1  3.2  3.3]
 [ 4.   4.1  4.2  4.3]
 [ 5.   5.1  5.2  5.3]
 [ 6.   6.1  6.2  6.3]
 [ 7.   7.1  7.2  7.3]
 [ 8.   8.1  8.2  8.3]
 [ 9.   9.1  9.2  9.3]
 [10.  10.1 10.2 10.3]] 
centroids (4, 3) 
 [[4.5       8.5       1.5      ]
 [4.6       8.6       1.5999999]
 [4.7       8.7       1.7      ]
 [4.8       8.8       1.8      ]]
.

After these changes, you can see that the centroids (shape (4, 2)) are the same each time we call consolidate_updates(), except when we call consolidate_updates(reuse_centroids=False):

(TileDB-Vector-Search-2) ~/repo/TileDB-Vector-Search-2/apis/python pytest test/test_ingestion.py -s          ✹main 
=============================================== test session starts ================================================
platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0
rootdir: /Users/parismorgan/repo/TileDB-Vector-Search-2/apis/python
collected 1 item                                                                                                   

test/test_ingestion.py [ingestion@ingest] copy_centroids_uri None training_sample_size -1 training_input_vectors None training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/dataset/training_data_invalid.tdb training_source_type None
[ivf_flat_index.py@create] dimensions 4 vector_type float32
[ingest@centralised_kmeans] training_sample_size 5 training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/dataset/training_data_invalid.tdb training_source_type None
[ingest@centralised_kmeans] reading from training_source_uri: training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/dataset/training_data_invalid.tdb training_source_type TILEDB_ARRAY training_in_size 2 training_dimensions 3 training_vector_type float32
[test_ingestion@test_ingest_with_training_source_uri_tdb] ingest() ======================================
[ingestion@ingest] copy_centroids_uri None training_sample_size -1 training_input_vectors None training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/dataset/training_data.tdb training_source_type None
[ivf_flat_index.py@create] dimensions 4 vector_type float32
[ingest@centralised_kmeans] training_sample_size 5 training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/dataset/training_data.tdb training_source_type None
[ingest@centralised_kmeans] reading from training_source_uri: training_source_uri /private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/dataset/training_data.tdb training_source_type TILEDB_ARRAY training_in_size 2 training_dimensions 4 training_vector_type float32
[ingestion@centralized_kmeans] sample_vectors (2, 4) 
 [[1.  1.1 1.2 1.3]
 [5.  5.1 5.2 5.3]] 
centroids (4, 2) 
 [[1.  5. ]
 [1.1 5.1]
 [1.2 5.2]
 [1.3 5.3]]
[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================
[test_ingestion@test_ingest_with_training_source_uri_tdb] index.consolidate_updates() ======================================
[ingestion@ingest] copy_centroids_uri file:///private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/array/partition_centroids training_sample_size -1 training_input_vectors None training_source_uri None training_source_type None
[ingestion@copy_centroids] partitions 2 dimensions 4
[ingest@copy_centroids] centroids_uri file:///private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/array/partition_centroids
[ingest@copy_centroids] src_centroids OrderedDict([('centroids', array([[1. , 5. ],
       [1.1, 5.1],
       [1.2, 5.2],
       [1.3, 5.3]], dtype=float32))])
[ingest@copy_centroids] dst_centroids before write OrderedDict([('centroids', array([[1. , 5. ],
       [1.1, 5.1],
       [1.2, 5.2],
       [1.3, 5.3]], dtype=float32))])
[ingest@copy_centroids] dest DenseArray(uri='file:///private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/array/partition_centroids', mode=w, ndim=2)
[ingest@copy_centroids] dest after OrderedDict([('centroids', array([[1. , 5. ],
       [1.1, 5.1],
       [1.2, 5.2],
       [1.3, 5.3]], dtype=float32))])
[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================
[test_ingestion@test_ingest_with_training_source_uri_tdb] index_ram = IVFFlatIndex(uri=index_uri) ======================================
[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================
[test_ingestion@test_ingest_with_training_source_uri_tdb] index.consolidate_updates() ======================================
[ingestion@ingest] copy_centroids_uri file:///private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/array/partition_centroids training_sample_size -1 training_input_vectors None training_source_uri None training_source_type None
[ingestion@copy_centroids] partitions 2 dimensions 4
[ingest@copy_centroids] centroids_uri file:///private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/array/partition_centroids
[ingest@copy_centroids] src_centroids OrderedDict([('centroids', array([[1. , 5. ],
       [1.1, 5.1],
       [1.2, 5.2],
       [1.3, 5.3]], dtype=float32))])
[ingest@copy_centroids] dst_centroids before write OrderedDict([('centroids', array([[1. , 5. ],
       [1.1, 5.1],
       [1.2, 5.2],
       [1.3, 5.3]], dtype=float32))])
[ingest@copy_centroids] dest DenseArray(uri='file:///private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/array/partition_centroids', mode=w, ndim=2)
[ingest@copy_centroids] dest after OrderedDict([('centroids', array([[1. , 5. ],
       [1.1, 5.1],
       [1.2, 5.2],
       [1.3, 5.3]], dtype=float32))])
[test_ingestion@test_ingest_with_training_source_uri_tdb] query() ======================================
[test_ingestion@test_ingest_with_training_source_uri_tdb] consolidate_updates(reuse_centroids=False) ======================================
[ingestion@ingest] copy_centroids_uri None training_sample_size 3 training_input_vectors None training_source_uri None training_source_type None
[ingest@centralised_kmeans] training_sample_size 3 training_source_uri None training_source_type None
[ingest@centralised_kmeans] reading from source_uri: source_uri file:///private/var/folders/jb/5gq49wh97wn0j7hj6zfn9pzh0000gn/T/pytest-of-parismorgan/pytest-1249/test_ingest_with_training_sour0/array/shuffled_vectors source_type TILEDB_ARRAY vector_type float32 dimensions 4 training_sample_size 3
[ingestion@centralized_kmeans] sample_vectors (3, 4) 
 [[1.  1.1 1.2 1.3]
 [2.  2.1 2.2 2.3]
 [3.  3.1 3.2 3.3]] 
centroids (4, 3) 
 [[2.  0.  0. ]
 [2.1 0.  0. ]
 [2.2 0.  0. ]
 [2.3 0.  0. ]]
.