Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ypatia committed Jan 7, 2025
1 parent eacdc79 commit c5899b7
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 31 deletions.
15 changes: 9 additions & 6 deletions tiledb/sm/query/readers/filtered_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,14 @@ class FilteredData {
/* ********************************* */

/**
* Get the fixed filtered data for the result tile.
* Get a pointer to the fixed filtered data for the result tile and a future
* which signals when the data is valid.
*
* @param fragment Fragment metadata for the tile.
* @param rt Result tile.
* @return Fixed filtered data pointer.
*/
inline std::tuple<void*, ThreadPool::SharedTask> fixed_filtered_data(
inline std::pair<void*, ThreadPool::SharedTask> fixed_filtered_data(
const FragmentMetadata* fragment, const ResultTile* rt) {
auto offset{
fragment->loaded_metadata()->file_offset(name_, rt->tile_idx())};
Expand All @@ -346,13 +347,14 @@ class FilteredData {
}

/**
* Get the var filtered data for the result tile.
* Get a pointer to the var filtered data for the result tile and a future
* which signals when the data is valid.
*
* @param fragment Fragment metadata for the tile.
* @param rt Result tile.
* @return Var filtered data pointer.
*/
inline std::tuple<void*, ThreadPool::SharedTask> var_filtered_data(
inline std::pair<void*, ThreadPool::SharedTask> var_filtered_data(
const FragmentMetadata* fragment, const ResultTile* rt) {
if (!var_sized_) {
return {nullptr, ThreadPool::SharedTask()};
Expand All @@ -367,13 +369,14 @@ class FilteredData {
}

/**
* Get the nullable filtered data for the result tile.
* Get a pointer to the nullable filtered data for the result tile and a
* future which signals when the data is valid.
*
* @param fragment Fragment metadata for the tile.
* @param rt Result tile.
* @return Nullable filtered data pointer.
*/
inline std::tuple<void*, ThreadPool::SharedTask> nullable_filtered_data(
inline std::pair<void*, ThreadPool::SharedTask> nullable_filtered_data(
const FragmentMetadata* fragment, const ResultTile* rt) {
if (!nullable_) {
return {nullptr, ThreadPool::SharedTask()};
Expand Down
5 changes: 1 addition & 4 deletions tiledb/sm/query/readers/reader_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ void ReaderBase::read_tiles(
// 'TileData' objects should be returned by this function and passed into
// 'unfilter_tiles' so that the filter pipeline can stop using the
// 'ResultTile' object to get access to the filtered data.
std::tuple<void*, ThreadPool::SharedTask> n = {
std::pair<void*, ThreadPool::SharedTask> n = {
nullptr, ThreadPool::SharedTask()};
ResultTile::TileData tile_data{
val_only ? n :
Expand Down Expand Up @@ -929,9 +929,6 @@ Status ReaderBase::unfilter_tiles(

for (size_t i = 0; i < num_tiles; i++) {
auto result_tile = result_tiles[i];
// if (skip_field(result_tile->frag_idx(), name)) {
// continue;
// }
ThreadPool::SharedTask task =
resources_.compute_tp().execute([name,
validity_only,
Expand Down
3 changes: 0 additions & 3 deletions tiledb/sm/query/readers/reader_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,6 @@ class ReaderBase : public StrategyBase {
* @param names The attribute names.
* @param result_tiles The retrieved tiles will be stored inside the
* `ResultTile` instances in this vector.
* @return Filtered data blocks.
*/
void read_attribute_tiles(
const std::vector<NameToLoad>& names,
Expand All @@ -567,7 +566,6 @@ class ReaderBase : public StrategyBase {
* @param names The coordinate/dimension names.
* @param result_tiles The retrieved tiles will be stored inside the
* `ResultTile` instances in this vector.
* @return Filtered data blocks.
*/
void read_coordinate_tiles(
const std::vector<std::string>& names,
Expand All @@ -584,7 +582,6 @@ class ReaderBase : public StrategyBase {
* @param result_tiles The retrieved tiles will be stored inside the
* `ResultTile` instances in this vector.
* @param validity_only Is the field read for validity only.
* @return Filtered data blocks.
*/
void read_tiles(
const std::vector<NameToLoad>& names,
Expand Down
6 changes: 5 additions & 1 deletion tiledb/sm/query/readers/result_tile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ ResultTile::~ResultTile() {
try {
// Wait for all tasks to be done
wait_all_attrs();
} catch (...) {
}

try {
// Wait for all tasks to be done
wait_all_coords();
} catch (...) {
return;
}
}

Expand Down
27 changes: 16 additions & 11 deletions tiledb/sm/query/readers/result_tile.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,16 @@ class ResultTile {
/* CONSTRUCTORS & DESTRUCTORS */
/* ********************************* */
TileData(
std::tuple<void*, ThreadPool::SharedTask> fixed_filtered_data,
std::tuple<void*, ThreadPool::SharedTask> var_filtered_data,
std::tuple<void*, ThreadPool::SharedTask> validity_filtered_data,
std::pair<void*, ThreadPool::SharedTask> fixed_filtered_data,
std::pair<void*, ThreadPool::SharedTask> var_filtered_data,
std::pair<void*, ThreadPool::SharedTask> validity_filtered_data,
shared_ptr<FilteredData> filtered_data)
: fixed_filtered_data_(std::get<0>(fixed_filtered_data))
, var_filtered_data_(std::get<0>(var_filtered_data))
, validity_filtered_data_(std::get<0>(validity_filtered_data))
, fixed_filtered_data_task_(std::get<1>(fixed_filtered_data))
, var_filtered_data_task_(std::get<1>(var_filtered_data))
, validity_filtered_data_task_(std::get<1>(validity_filtered_data))
: fixed_filtered_data_(fixed_filtered_data.first)
, var_filtered_data_(var_filtered_data.first)
, validity_filtered_data_(validity_filtered_data.first)
, fixed_filtered_data_task_(fixed_filtered_data.second)
, var_filtered_data_task_(var_filtered_data.second)
, validity_filtered_data_task_(validity_filtered_data.second)
, filtered_data_(std::move(filtered_data)) {
}

Expand All @@ -232,16 +232,21 @@ class ResultTile {
if (fixed_filtered_data_task_.valid()) {
auto st = fixed_filtered_data_task_.wait();
}
} catch (...) {
}

try {
if (var_filtered_data_task_.valid()) {
auto st = var_filtered_data_task_.wait();
}
} catch (...) {
}

try {
if (validity_filtered_data_task_.valid()) {
auto st = validity_filtered_data_task_.wait();
}
} catch (...) {
return;
}
}

Expand Down Expand Up @@ -285,7 +290,7 @@ class ResultTile {
}

/** Clear the held filtered data. */
inline void clear_filtered_data() {
inline void release_filtered_data() {
filtered_data_ = nullptr;
}

Expand Down
19 changes: 13 additions & 6 deletions tiledb/sm/query/readers/sparse_global_order_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,12 @@ bool SparseGlobalOrderReader<BitmapType>::add_next_cell_to_queue(
// Try to find a new tile.
if (result_tiles_it[frag_idx] != result_tiles[frag_idx].end()) {
// Find a cell in the current result tile.

// This enforces all the coords unfiltering results to be available before
// taking the lock on tile_queue_mutex_. This is to avoid a deadlock where
// a lock is held forever while waiting for a result to be available,
// while the next scheduled task is deadlocking on that lock
rc.tile_->wait_all_coords();
rc = GlobalOrderResultCoords(&*result_tiles_it[frag_idx], 0);

// All tiles should at least have one cell available.
Expand Down Expand Up @@ -815,12 +821,6 @@ bool SparseGlobalOrderReader<BitmapType>::add_next_cell_to_queue(
return true;
}

// This enforces all the coords unfiltering results to be available before
// taking the lock on tile_queue_mutex_. This is to avoid a deadlock where a
// lock is held forever while waiting for a result to be available, while
// the next scheduled task is deadlocking on that lock
rc.tile_->wait_all_coords();

std::unique_lock<std::mutex> ul(tile_queue_mutex_);

// Add all the cells in this tile with the same coordinates as this cell
Expand Down Expand Up @@ -945,6 +945,13 @@ SparseGlobalOrderReader<BitmapType>::merge_result_cell_slabs(
read_state_.frag_idx()[f].cell_idx_ :
0;
GlobalOrderResultCoords rc(&*(rt_it[f]), cell_idx);
// This enforces all the coords unfiltering results to be available
// before taking the lock on tile_queue_mutex_. This is to avoid a
// deadlock where a lock is held forever while waiting for a result to
// be available, while the next scheduled task is deadlocking on that
// lock
rc.tile_->wait_all_coords();

bool res = add_next_cell_to_queue(
rc, rt_it, result_tiles, tile_queue, to_delete);
{
Expand Down

0 comments on commit c5899b7

Please sign in to comment.