Improve readers by parallelizing I/O and compute operations #5401

Open · wants to merge 52 commits into base: main

Commits (52)
93f16c0  Threadpool changes (ypatia, Nov 29, 2024)
277272b  Add threadpool helper function (ypatia, Nov 29, 2024)
e000852  Clang format changes (ypatia, Nov 29, 2024)
2d0a155  Cleanup not needed changes (ypatia, Nov 29, 2024)
77b52a2  Add some more documentation to new classes (ypatia, Nov 29, 2024)
bdd4b48  Default initialize and check for null threadpool in task classes (ypatia, Dec 2, 2024)
c92a83c  Address review comments (ypatia, Dec 3, 2024)
1066214  Address review comments round 2 (ypatia, Dec 4, 2024)
85dee35  Fix unit test (ypatia, Dec 4, 2024)
abcb944  Move I/O wait to datablock instead of reader. (Shelnutt2, Oct 29, 2024)
02d85d2  Switch to SharedTask in order to allow multi-threaded access to futur… (Shelnutt2, Oct 29, 2024)
f1eac3a  Fix unit test compilation (Shelnutt2, Oct 29, 2024)
deabecf  WIP: parallelize filter pipeline and interleave comparisons (Shelnutt2, Oct 30, 2024)
aaed67a  Switch to ThreadPool::SharedTask instead of shared_ptr (Shelnutt2, Oct 30, 2024)
ed6ca13  Add recursive_mutex for thread-safety of tile ThreadPool::SharedTask … (Shelnutt2, Oct 31, 2024)
d81dd29  WIP: try to store reference to FilterData on result tile, need to fix… (Shelnutt2, Oct 31, 2024)
e79d531  Adjust lambdas and avoid task components going out of scope. (Shelnutt2, Oct 31, 2024)
f1c24bf  Add new data_unsafe to Tile accessorsa. (Shelnutt2, Oct 31, 2024)
c3891fd  Add stats tracking to new tasks for reading and unfiltering tiles (Shelnutt2, Oct 31, 2024)
b30b3f9  Fix unit test compilation (Shelnutt2, Oct 31, 2024)
fe65979  Add new zip_coordinates_unsafe (Shelnutt2, Nov 1, 2024)
04ecccc  Wait until tasks are done before freeing tiles (ypatia, Nov 7, 2024)
338aa12  Remove redundant shared future get (ypatia, Nov 11, 2024)
ceedb1f  Fix null counts, check tasks are valid and other fixes (ypatia, Nov 15, 2024)
c8c6b17  Fix RLE and dict decompression (ypatia, Nov 15, 2024)
0bb8cdd  Fix budget tests, g.o.r. result tile is now 3496 bytes in size (ypatia, Nov 18, 2024)
766a8bb  Merge branch 'dev' into yt/sc-59606/threadpool_with_tasks (Shelnutt2, Dec 6, 2024)
f5c0003  Adaptations to new threadpool (ypatia, Nov 25, 2024)
15e7a4d  Fix compute task outliving dense reader (ypatia, Nov 28, 2024)
0ae71fb  Remove mutex that causes problems (TBD) (ypatia, Nov 29, 2024)
9cb9064  Adapt some tests after threadpool changes (ypatia, Dec 4, 2024)
bc79307  Fix deadlock in merge_result_cell_slabs (ypatia, Dec 4, 2024)
dfdc581  Fix linux compilation issue (ypatia, Dec 6, 2024)
accc681  Fix gcc future exception (Shelnutt2, Dec 6, 2024)
2158495  Fix missing unit test threadpool linkage (Shelnutt2, Dec 6, 2024)
68a8235  Merge branch 'yt/sc-59606/threadpool_with_tasks' into yt/sc-59605/don… (ypatia, Dec 7, 2024)
3782594  Merge branch 'dev' into yt/sc-59605/dont_block_io (ypatia, Dec 9, 2024)
c51ba44  Fix tile missing threadpool linkage (ypatia, Dec 12, 2024)
af4857e  Remove duplicate library in cmake (ypatia, Dec 12, 2024)
3de9216  Disable temporarily flaky test (ypatia, Dec 13, 2024)
9647fff  Attempt to fix asan error (ypatia, Dec 13, 2024)
888652a  Fix segfault in legacy reader (ypatia, Dec 16, 2024)
81f3166  Revert "Attempt to fix asan error" (ypatia, Dec 17, 2024)
88c0ecb  Fix some windows tests (ypatia, Dec 18, 2024)
d391375  Fix lifetime issues and some namings (ypatia, Dec 18, 2024)
0a97612  Fix ASAN: Destructors of base classes must be virtual (ypatia, Dec 19, 2024)
446700a  Some more PR cleanup (ypatia, Dec 19, 2024)
eacdc79  Merge branch 'dev' into yt/sc-59605/dont_block_io (ypatia, Jan 3, 2025)
a2fb4e1  Constrain non blocking tasks to I/O (ypatia, Jan 22, 2025)
c8b6de6  Fixes to tests (ypatia, Jan 23, 2025)
c9aa97c  Merge branch 'main' into yt/sc-59605/dont_block_io (ypatia, Jan 24, 2025)
4105c1a  Merge branch 'main' into yt/sc-59605/dont_block_io (ypatia, Jan 30, 2025)
Files changed:
5 changes: 4 additions & 1 deletion test/src/unit-ReadCellSlabIter.cc
@@ -183,7 +183,10 @@ void set_result_tile_dim(
       std::nullopt,
       std::nullopt,
       std::nullopt);
-  ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
+  ResultTile::TileData tile_data{
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()}};
   result_tile.init_coord_tile(
       constants::format_version,
       array_schema,
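This pattern repeats across the test diffs below: ResultTile::TileData now pairs each of its three buffers (fixed-size data, var-size data, validity) with a ThreadPool::SharedTask, the handle for the I/O task that fills the buffer. A minimal sketch of the idea, using std::shared_future as a stand-in for ThreadPool::SharedTask (layout and names here are illustrative, not TileDB's exact definition):

#include <future>
#include <utility>

// Illustrative stand-in: each tile buffer travels with the task filling it.
struct TileDataSketch {
  std::pair<void*, std::shared_future<void>> fixed;     // fixed-size data
  std::pair<void*, std::shared_future<void>> var;       // var-size data
  std::pair<void*, std::shared_future<void>> validity;  // validity vector
};

// A default-constructed future is not valid(), meaning "no pending I/O";
// that is what the tests express with an empty ThreadPool::SharedTask().
inline void* wait_and_get(std::pair<void*, std::shared_future<void>>& slot) {
  if (slot.second.valid()) {
    slot.second.wait();  // block until the filling task has completed
  }
  return slot.first;
}

Pairing buffer and task lets each accessor decide whether it must block, instead of the reader blocking on all outstanding I/O up front.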
4 changes: 2 additions & 2 deletions test/src/unit-cppapi-consolidation-with-timestamps.cc
@@ -636,7 +636,7 @@ TEST_CASE_METHOD(

   // Will only allow to load two tiles out of 3.
   Config cfg;
-  cfg.set("sm.mem.total_budget", "30000");
+  cfg.set("sm.mem.total_budget", "50000");
   cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
   ctx_ = Context(cfg);

@@ -685,7 +685,7 @@ TEST_CASE_METHOD(

   // Will only allow to load two tiles out of 3.
   Config cfg;
-  cfg.set("sm.mem.total_budget", "30000");
+  cfg.set("sm.mem.total_budget", "50000");
   cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
   ctx_ = Context(cfg);
20 changes: 16 additions & 4 deletions test/src/unit-result-tile.cc
@@ -213,7 +213,10 @@ TEST_CASE_METHOD(
       0,
       std::nullopt,
       std::nullopt);
-  ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
+  ResultTile::TileData tile_data{
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()}};
   rt.init_coord_tile(
       constants::format_version,
       array_schema,
@@ -230,7 +233,10 @@ TEST_CASE_METHOD(
       0,
       std::nullopt,
       std::nullopt);
-  ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
+  ResultTile::TileData tile_data{
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()}};
   rt.init_coord_tile(
       constants::format_version,
       array_schema,
@@ -326,7 +332,10 @@ TEST_CASE_METHOD(
       0,
       std::nullopt,
       std::nullopt);
-  ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
+  ResultTile::TileData tile_data{
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()}};
   rt.init_coord_tile(
       constants::format_version,
       array_schema,
@@ -343,7 +352,10 @@ TEST_CASE_METHOD(
       0,
       std::nullopt,
       std::nullopt);
-  ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
+  ResultTile::TileData tile_data{
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()}};
   rt.init_coord_tile(
       constants::format_version,
       array_schema,
8 changes: 4 additions & 4 deletions test/src/unit-sparse-global-order-reader.cc
@@ -789,9 +789,9 @@ TEST_CASE_METHOD(
     write_1d_fragment(coords, &coords_size, data, &data_size);
   }

-  // Two result tile (2 * (~3000 + 8) will be bigger than the per fragment
-  // budget (1000).
-  total_budget_ = "35000";
+  // Two result tiles (2 * (~4500 + 8) will be bigger than the per fragment
+  // budget (3905).
+  total_budget_ = "60000";
   ratio_coords_ = "0.11";
   update_config();

@@ -1312,7 +1312,7 @@ TEST_CASE_METHOD(
     write_1d_fragment(coords, &coords_size, data, &data_size);
   }

-  // Two result tile (2 * (~4000 + 8) will be bigger than the per fragment
+  // Two result tile (2 * (~4500 + 8) will be bigger than the per fragment
   // budget (1000).
   total_budget_ = "40000";
   ratio_coords_ = "0.22";

Review comment (Contributor), on the updated budget comment:
  Above this is 3000 => 4500, here it is 4000 => 4500.
  Does the 3000/4000 value come from the tile size? Or is it out of date?
  If these are both 4500 now then is the size padded for alignment?
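For what it's worth, the arithmetic the updated comment states does hold with its own numbers (taking ~4500 as approximate). A one-line check, using only values copied from the test comments above; whether the 4500 figure is padded for alignment is exactly the reviewer's open question:

// Two result tiles of ~4508 bytes each exceed the stated per-fragment
// budget of 3905 bytes, so only one tile per fragment can be resident.
static_assert(2 * (4500 + 8) > 3905, "two tiles exceed the per-fragment budget");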
8 changes: 6 additions & 2 deletions test/src/unit-sparse-unordered-with-dups-reader.cc
@@ -1064,9 +1064,13 @@ TEST_CASE_METHOD(

   if (one_frag) {
     CHECK(1 == loop_num->second);
-  } else {
-    CHECK(9 == loop_num->second);
   }
+  /**
+   * FIXME: The loop_num appears to be different on different
+   * architectures/build modes. SC-61065 to investigate why. } else { CHECK(20
+   * == loop_num->second);
+   * }
+   */

   // Try to read multiple frags without partial tile offset reading. Should
   // fail

Review comment (Contributor), on the FIXME block:
  Based on previous discussion, I imagine there's a pretty good conjecture -
  If the memory budget is tight, and the new fields you've added to
  ResultCoords balloon the size of each tile, AND the size of those fields is
  architecture/compiler dependent, then it is architecture dependent how many
  tiles fit in the memory budget. And thus also architecture dependent as to
  how many internal loops we have.
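The reviewer's conjecture is easy to reproduce in isolation: the size of a future/task handle is implementation-defined, so any tile structure embedding one grows by a platform-dependent amount. A self-contained illustration, with std::shared_future standing in for ThreadPool::SharedTask and a hypothetical struct:

#include <future>
#include <iostream>

struct TileSlotSketch {
  void* data;
  std::shared_future<void> io_task;  // size varies across standard libraries
};

int main() {
  // libstdc++, libc++ and the MSVC STL report different sizes here, which in
  // turn changes how many tiles fit in a fixed memory budget, and so how many
  // internal loop iterations a budget-constrained read needs.
  std::cout << "sizeof(std::shared_future<void>) = "
            << sizeof(std::shared_future<void>) << '\n'
            << "sizeof(TileSlotSketch) = " << sizeof(TileSlotSketch) << '\n';
}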
2 changes: 1 addition & 1 deletion tiledb/sm/filter/CMakeLists.txt
@@ -32,7 +32,7 @@ include(object_library)
 #
 commence(object_library filter)
 this_target_sources(filter.cc filter_buffer.cc filter_storage.cc)
-this_target_object_libraries(baseline buffer tiledb_crypto)
+this_target_object_libraries(baseline buffer tiledb_crypto thread_pool)
 conclude(object_library)

 #

Review comment (Contributor), on the new thread_pool dependency:
  Is this still needed? The unfiltering will be in part 2, right? Should this
  also be?
2 changes: 1 addition & 1 deletion tiledb/sm/filter/compression_filter.cc
@@ -636,7 +636,7 @@ Status CompressionFilter::decompress_var_string_coords(
   auto output_view = span<std::byte>(
       reinterpret_cast<std::byte*>(output_buffer->data()), uncompressed_size);
   auto offsets_view = span<uint64_t>(
-      offsets_tile->data_as<offsets_t>(), uncompressed_offsets_size);
+      offsets_tile->data_as_unsafe<offsets_t>(), uncompressed_offsets_size);

   if (compressor_ == Compressor::RLE) {
     uint8_t rle_len_bytesize, string_len_bytesize;
4 changes: 2 additions & 2 deletions tiledb/sm/filter/filter_pipeline.cc
@@ -461,7 +461,7 @@ Status FilterPipeline::run_reverse(
     // If the pipeline is empty, just copy input to output.
     if (filters_.empty()) {
       void* output_chunk_buffer =
-          tile->data_as<char>() + chunk_data.chunk_offsets_[i];
+          tile->data_as_unsafe<char>() + chunk_data.chunk_offsets_[i];
       RETURN_NOT_OK(input_data.copy_to(output_chunk_buffer));
       continue;
     }
@@ -484,7 +484,7 @@ Status FilterPipeline::run_reverse(
     bool last_filter = filter_idx == 0;
     if (last_filter) {
       void* output_chunk_buffer =
-          tile->data_as<char>() + chunk_data.chunk_offsets_[i];
+          tile->data_as_unsafe<char>() + chunk_data.chunk_offsets_[i];
       RETURN_NOT_OK(output_data.set_fixed_allocation(
           output_chunk_buffer, chunk.unfiltered_data_size_));
       reader_stats->add_counter(
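Both this file and compression_filter.cc above swap data_as<T>() for data_as_unsafe<T>(). The diff does not spell out the distinction, so the following is a hedged reading consistent with the commit that adds data_unsafe accessors to Tile: the checked accessor waits on the tile's pending I/O/unfilter task, while the unsafe variant skips the wait for callers that already run inside that task, where waiting on yourself would deadlock. A sketch, again with std::shared_future standing in for ThreadPool::SharedTask:

#include <future>

class TileSketch {
 public:
  template <class T>
  T* data_as() {  // checked: wait for the task that fills the buffer
    if (io_task_.valid()) {
      io_task_.wait();
    }
    return static_cast<T*>(data_);
  }

  template <class T>
  T* data_as_unsafe() {  // unchecked: for code running inside that task
    return static_cast<T*>(data_);
  }

 private:
  void* data_ = nullptr;
  std::shared_future<void> io_task_;
};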
10 changes: 7 additions & 3 deletions tiledb/sm/filter/test/filter_test_support.cc
@@ -159,7 +159,8 @@ shared_ptr<WriterTile> make_increasing_tile(
       Datatype::UINT64,
       cell_size,
       tile_size,
-      tracker);
+      tracker,
+      true);
   for (uint64_t i = 0; i < nelts; i++) {
     CHECK_NOTHROW(tile->write(&i, i * sizeof(uint64_t), sizeof(uint64_t)));
   }
@@ -178,7 +179,8 @@ shared_ptr<WriterTile> make_offsets_tile(
       Datatype::UINT64,
       constants::cell_var_offset_size,
       offsets_tile_size,
-      tracker);
+      tracker,
+      true);

   // Set up test data
   for (uint64_t i = 0; i < offsets.size(); i++) {
@@ -203,7 +205,9 @@ Tile create_tile_for_unfiltering(
       tile->cell_size() * nelts,
       tile->filtered_buffer().data(),
       tile->filtered_buffer().size(),
-      tracker};
+      tracker,
+      ThreadPool::SharedTask(),
+      true};
 }

 void run_reverse(
10 changes: 7 additions & 3 deletions tiledb/sm/filter/test/tile_data_generator.h
@@ -68,7 +68,8 @@ class TileDataGenerator {
         datatype(),
         cell_size(),
         original_tile_size(),
-        memory_tracker);
+        memory_tracker,
+        true);
   }

   /**
@@ -99,7 +100,9 @@ class TileDataGenerator {
         original_tile_size(),
         filtered_buffer.data(),
         filtered_buffer.size(),
-        memory_tracker);
+        memory_tracker,
+        ThreadPool::SharedTask(),
+        true);
   }

   /** Returns the size of the original unfiltered data. */
@@ -187,7 +190,8 @@ class IncrementTileDataGenerator : public TileDataGenerator {
         Datatype::UINT64,
         constants::cell_var_offset_size,
         offsets.size() * constants::cell_var_offset_size,
-        memory_tracker);
+        memory_tracker,
+        true);
     for (uint64_t index = 0; index < offsets.size(); ++index) {
       CHECK_NOTHROW(offsets_tile->write(
           &offsets[index],
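Across these test helpers the new constructor arguments follow one pattern: a default-constructed ThreadPool::SharedTask() (no pending I/O to wait for, since the test fills the buffer synchronously) plus a trailing bool that the diff does not explain, assumed here to mark the tile's data as immediately usable. The empty-task convention, with std::shared_future as the stand-in again:

#include <cassert>
#include <future>

int main() {
  // Default-constructed, like ThreadPool::SharedTask() in the tests above:
  std::shared_future<void> no_pending_io;
  assert(!no_pending_io.valid());  // accessors can skip waiting entirely
}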
9 changes: 6 additions & 3 deletions tiledb/sm/metadata/test/unit_metadata.cc
@@ -123,7 +123,8 @@ TEST_CASE(
       tile1->size(),
       tile1->filtered_buffer().data(),
       tile1->filtered_buffer().size(),
-      tracker);
+      tracker,
+      ThreadPool::SharedTask());
   memcpy(metadata_tiles[0]->data(), tile1->data(), tile1->size());

   metadata_tiles[1] = tdb::make_shared<Tile>(
@@ -135,7 +136,8 @@ TEST_CASE(
       tile2->size(),
       tile2->filtered_buffer().data(),
       tile2->filtered_buffer().size(),
-      tracker);
+      tracker,
+      ThreadPool::SharedTask());
   memcpy(metadata_tiles[1]->data(), tile2->data(), tile2->size());

   metadata_tiles[2] = tdb::make_shared<Tile>(
@@ -147,7 +149,8 @@ TEST_CASE(
       tile3->size(),
       tile3->filtered_buffer().data(),
       tile3->filtered_buffer().size(),
-      tracker);
+      tracker,
+      ThreadPool::SharedTask());
   memcpy(metadata_tiles[2]->data(), tile3->data(), tile3->size());

   meta = Metadata::deserialize(metadata_tiles);
59 changes: 32 additions & 27 deletions tiledb/sm/query/readers/dense_reader.cc
@@ -453,6 +453,9 @@ Status DenseReader::dense_read() {
     // processing.
     if (qc_coords_mode_) {
       t_start = t_end;
+      if (compute_task.valid()) {
+        throw_if_not_ok(compute_task.wait());
+      }
       continue;
     }

@@ -769,8 +772,8 @@ DenseReader::compute_result_space_tiles(
   const auto fragment_num = (unsigned)frag_tile_domains.size();
   const auto& tile_coords = subarray.tile_coords();

-  // Keep track of the required memory to load the result space tiles. Split up
-  // filtered versus unfiltered. The memory budget is combined for all
+  // Keep track of the required memory to load the result space tiles. Split
+  // up filtered versus unfiltered. The memory budget is combined for all
   // query condition attributes.
   uint64_t required_memory_query_condition_unfiltered = 0;
   std::vector<uint64_t> required_memory_unfiltered(
@@ -786,28 +789,28 @@ DenseReader::compute_result_space_tiles(
     aggregate_only_field[n - condition_names.size()] = aggregate_only(name);
   }

-  // Here we estimate the size of the tile structures. First, we have to account
-  // the size of the space tile structure. We could go deeper in the class to
-  // account for other things but for now we keep it simpler. Second, we try to
-  // account for the tile subarray (DenseTileSubarray). This class will have a
-  // vector of ranges per dimensions, so 1 + dim_num * sizeof(vector). Here we
-  // choose 32 for the size of the vector to anticipate the conversion to a PMR
-  // vector. We also add dim_num * 2 * sizeof(DimType) to account for at least
-  // one range per dimension (this should be improved by accounting for the
-  // exact number of ranges). Finally for the original range index member, we
-  // have to add 1 + dim_num * sizeof(vector) as well and one uint64_t per
-  // dimension (this can also be improved by accounting for the
-  // exact number of ranges).
+  // Here we estimate the size of the tile structures. First, we have to
+  // account the size of the space tile structure. We could go deeper in the
+  // class to account for other things but for now we keep it simpler. Second,
+  // we try to account for the tile subarray (DenseTileSubarray). This class
+  // will have a vector of ranges per dimensions, so 1 + dim_num *
+  // sizeof(vector). Here we choose 32 for the size of the vector to
+  // anticipate the conversion to a PMR vector. We also add dim_num * 2 *
+  // sizeof(DimType) to account for at least one range per dimension (this
+  // should be improved by accounting for the exact number of ranges). Finally
+  // for the original range index member, we have to add 1 + dim_num *
+  // sizeof(vector) as well and one uint64_t per dimension (this can also be
+  // improved by accounting for the exact number of ranges).
   uint64_t est_tile_structs_size =
       sizeof(ResultSpaceTile<DimType>) + (1 + dim_num) * 2 * 32 +
       dim_num * (2 * sizeof(DimType) + sizeof(uint64_t));

   // Create the vector of result tiles to operate on. We stop once we reach
-  // the end or the memory budget. We either reach the tile upper memory limit,
-  // which is only for unfiltered data, or the limit of the available budget,
-  // which is for filtered data, unfiltered data and the tile structs. We try to
-  // process two tile batches at a time so the available memory is half of what
-  // we have available.
+  // the end or the memory budget. We either reach the tile upper memory
+  // limit, which is only for unfiltered data, or the limit of the available
+  // budget, which is for filtered data, unfiltered data and the tile structs.
+  // We try to process two tile batches at a time so the available memory is
+  // half of what we have available.
   uint64_t t_end = t_start;
   bool wait_compute_task_before_read = false;
   bool done = false;
@@ -895,8 +898,8 @@ DenseReader::compute_result_space_tiles(
       uint64_t tile_memory_filtered = 0;
       uint64_t r_idx = n - condition_names.size();

-      // We might not need to load this tile into memory at all for aggregation
-      // only.
+      // We might not need to load this tile into memory at all for
+      // aggregation only.
       if (aggregate_only_field[r_idx] &&
           can_aggregate_tile_with_frag_md(
               names[n], result_space_tile, tiles_cell_num[t_end])) {
@@ -953,13 +956,14 @@ DenseReader::compute_result_space_tiles(
           required_memory_unfiltered[r_idx] +
           est_tile_structs_size;

-      // Disable the multiple iterations if the tiles don't fit in the iteration
-      // budget.
+      // Disable the multiple iterations if the tiles don't fit in the
+      // iteration budget.
       if (total_memory > available_memory_iteration) {
         wait_compute_task_before_read = true;
       }

-      // If a single tile doesn't fit in the available memory, we can't proceed.
+      // If a single tile doesn't fit in the available memory, we can't
+      // proceed.
       if (total_memory > available_memory) {
         throw DenseReaderException(
             "Cannot process a single tile requiring " +
@@ -1003,7 +1007,8 @@ std::vector<ResultTile*> DenseReader::result_tiles_to_load(
   const auto& tile_coords = subarray.tile_coords();
   const bool agg_only = name.has_value() && aggregate_only(name.value());

-  // If the result is already loaded in query condition, return the empty list;
+  // If the result is already loaded in query condition, return the empty
+  // list;
   std::vector<ResultTile*> ret;
   if (name.has_value() && condition_names.count(name.value()) != 0) {
     return ret;
@@ -1033,8 +1038,8 @@ std::vector<ResultTile*> DenseReader::result_tiles_to_load(

 /**
  * Apply the query condition. The computation will be pushed on the compute
- * thread pool in `compute_task`. Callers should wait on this task before using
- * the results of the query condition.
+ * thread pool in `compute_task`. Callers should wait on this task before
+ * using the results of the query condition.
  */
 template <class DimType, class OffType>
 Status DenseReader::apply_query_condition(
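Taken together, the dense_reader.cc changes implement the pattern the PR title describes: tile I/O for the next batch is issued while compute_task is still processing the previous one, and the loop waits on the outstanding compute task before skipping ahead or reusing shared state. A simplified, self-contained sketch of that pipelining, with std::async and std::shared_future as stand-ins for TileDB's thread pool and SharedTask, and read_batch/process_batch as hypothetical helpers:

#include <cstddef>
#include <future>

void read_batch(size_t /*batch*/) { /* I/O: read and filter tiles */ }
void process_batch(size_t /*batch*/) { /* compute: unfilter, qc, copy */ }

void dense_read_sketch(size_t num_batches) {
  std::shared_future<void> compute_task;  // empty on the first iteration
  for (size_t b = 0; b < num_batches; ++b) {
    // Issue I/O for this batch while the previous compute task still runs.
    auto io = std::async(std::launch::async, read_batch, b);
    if (compute_task.valid()) {
      compute_task.wait();  // previous batch fully processed
    }
    io.wait();  // this batch's data is now in memory
    compute_task = std::async(std::launch::async, process_batch, b).share();
  }
  if (compute_task.valid()) {
    compute_task.wait();  // drain before the reader goes out of scope
  }
}

The extra wait added in the qc_coords_mode_ branch is the same discipline: never continue a loop iteration while a compute task that borrows the reader's state is still in flight (cf. commit "Fix compute task outliving dense reader").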