Skip to content

Commit

Permalink
Constrain non blocking tasks to I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
ypatia committed Jan 22, 2025
1 parent eacdc79 commit a81fa30
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 263 deletions.
3 changes: 1 addition & 2 deletions test/src/unit-ReadCellSlabIter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@ void set_result_tile_dim(
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
nullptr};
{nullptr, ThreadPool::SharedTask()}};
result_tile.init_coord_tile(
constants::format_version,
array_schema,
Expand Down
2 changes: 1 addition & 1 deletion test/src/unit-cppapi-consolidation-with-timestamps.cc
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ TEST_CASE_METHOD(

// Will only allow to load two tiles out of 3.
Config cfg;
cfg.set("sm.mem.total_budget", "65000");
cfg.set("sm.mem.total_budget", "50000");
cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
ctx_ = Context(cfg);

Expand Down
12 changes: 4 additions & 8 deletions test/src/unit-result-tile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ TEST_CASE_METHOD(
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
nullptr};
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
Expand All @@ -237,8 +236,7 @@ TEST_CASE_METHOD(
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
nullptr};
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
Expand Down Expand Up @@ -337,8 +335,7 @@ TEST_CASE_METHOD(
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
nullptr};
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
Expand All @@ -358,8 +355,7 @@ TEST_CASE_METHOD(
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
nullptr};
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
Expand Down
17 changes: 15 additions & 2 deletions tiledb/sm/query/readers/dense_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "tiledb/sm/misc/parallel_functions.h"
#include "tiledb/sm/query/legacy/cell_slab_iter.h"
#include "tiledb/sm/query/query_macros.h"
#include "tiledb/sm/query/readers/filtered_data.h"
#include "tiledb/sm/query/readers/result_tile.h"
#include "tiledb/sm/stats/global_stats.h"
#include "tiledb/sm/subarray/subarray.h"
Expand Down Expand Up @@ -462,6 +463,7 @@ Status DenseReader::dense_read() {
// clear the memory. Also, a name in names might not be in the user buffers
// so we might skip the copy but still clear the memory.
for (auto& name : names) {
shared_ptr<std::list<FilteredData>> filtered_data;
std::vector<ResultTile*> result_tiles;
bool validity_only = null_count_aggregate_only(name);
bool dense_dim = name == constants::coords || array_schema_.is_dim(name);
Expand All @@ -485,7 +487,8 @@ Status DenseReader::dense_read() {
// Read and unfilter tiles.
std::vector<ReaderBase::NameToLoad> to_load;
to_load.emplace_back(name, validity_only);
read_attribute_tiles(to_load, result_tiles);
filtered_data = make_shared<std::list<FilteredData>>(
read_attribute_tiles(to_load, result_tiles));
}

if (compute_task.valid()) {
Expand All @@ -497,6 +500,7 @@ Status DenseReader::dense_read() {

compute_task = resources_.compute_tp().execute([&,
iteration_tile_data,
filtered_data,
dense_dim,
name,
validity_only,
Expand All @@ -508,6 +512,9 @@ Status DenseReader::dense_read() {
// Unfilter tiles.
RETURN_NOT_OK(unfilter_tiles(name, validity_only, result_tiles));

// The filtered data is no longer required, release it.
filtered_data.reset();

// Only copy names that are present in the user buffers.
if (buffers_.count(name) != 0) {
// Copy attribute data to users buffers.
Expand Down Expand Up @@ -1068,13 +1075,16 @@ Status DenseReader::apply_query_condition(
tiles_cell_num);

// Read and unfilter query condition attributes.
read_attribute_tiles(NameToLoad::from_string_vec(qc_names), result_tiles);
shared_ptr<std::list<FilteredData>> filtered_data =
make_shared<std::list<FilteredData>>(read_attribute_tiles(
NameToLoad::from_string_vec(qc_names), result_tiles));

if (compute_task.valid()) {
throw_if_not_ok(compute_task.wait());
}

compute_task = resources_.compute_tp().execute([&,
filtered_data,
iteration_tile_data,
qc_names,
num_range_threads,
Expand All @@ -1091,6 +1101,9 @@ Status DenseReader::apply_query_condition(
RETURN_NOT_OK(unfilter_tiles(name, false, result_tiles));
}

// The filtered data is no longer required, release it.
filtered_data.reset();

if (stride == UINT64_MAX) {
stride = 1;
}
Expand Down
16 changes: 9 additions & 7 deletions tiledb/sm/query/readers/filtered_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,14 @@ class FilteredData {
/* ********************************* */

/**
* Get the fixed filtered data for the result tile.
* Get a pointer to the fixed filtered data for the result tile and a future
* which signals when the data is valid.
*
* @param fragment Fragment metadata for the tile.
* @param rt Result tile.
* @return Fixed filtered data pointer.
*/
inline std::tuple<void*, ThreadPool::SharedTask> fixed_filtered_data(
inline std::pair<void*, ThreadPool::SharedTask> fixed_filtered_data(
const FragmentMetadata* fragment, const ResultTile* rt) {
auto offset{
fragment->loaded_metadata()->file_offset(name_, rt->tile_idx())};
Expand All @@ -346,13 +347,13 @@ class FilteredData {
}

/**
* Get the var filtered data for the result tile.
*
* Get a pointer to the var filtered data for the result tile and a future
* which signals when the data is valid. *
* @param fragment Fragment metadata for the tile.
* @param rt Result tile.
* @return Var filtered data pointer.
*/
inline std::tuple<void*, ThreadPool::SharedTask> var_filtered_data(
inline std::pair<void*, ThreadPool::SharedTask> var_filtered_data(
const FragmentMetadata* fragment, const ResultTile* rt) {
if (!var_sized_) {
return {nullptr, ThreadPool::SharedTask()};
Expand All @@ -367,13 +368,14 @@ class FilteredData {
}

/**
* Get the nullable filtered data for the result tile.
* Get a pointer to the nullable filtered data for the result tile and a
* future which signals when the data is valid.
*
* @param fragment Fragment metadata for the tile.
* @param rt Result tile.
* @return Nullable filtered data pointer.
*/
inline std::tuple<void*, ThreadPool::SharedTask> nullable_filtered_data(
inline std::pair<void*, ThreadPool::SharedTask> nullable_filtered_data(
const FragmentMetadata* fragment, const ResultTile* rt) {
if (!nullable_) {
return {nullptr, ThreadPool::SharedTask()};
Expand Down
Loading

0 comments on commit a81fa30

Please sign in to comment.