Skip to content

Commit

Permalink
Implement explain analyze (#2541)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Implements `EXPLAIN ANALYZE`, which reports the detailed time cost of each task.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
  • Loading branch information
JinHai-CN authored Mar 10, 2025
1 parent 88a5a56 commit 51f41bb
Show file tree
Hide file tree
Showing 18 changed files with 158 additions and 56 deletions.
5 changes: 2 additions & 3 deletions python/test_pysdk/test_explain.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ def test_explain(self, suffix):
res = table.output(["*"]).explain(ExplainType.Fragment)
print(res)

with pytest.raises(Exception, match=r".*Not implement*"):
res = table.output(["*"]).explain(ExplainType.Analyze)
print(res)
res = table.output(["*"]).explain(ExplainType.Analyze)
print(res)

db_obj.drop_table("test_explain_default"+suffix, ConflictType.Error)
2 changes: 2 additions & 0 deletions src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ Status Status::AlreadyLocked(const String &detail) { return Status(ErrorCode::kA

Status Status::NotLocked(const String &detail) { return Status(ErrorCode::kNotLocked, MakeUnique<String>(detail)); }

// Factory for an ErrorCode::kInvalidParameter status; `detail` is copied into the status message.
Status Status::InvalidParameter(const String &detail) {
    return Status(ErrorCode::kInvalidParameter, MakeUnique<String>(detail));
}

// 4. TXN fail
Status Status::TxnRollback(u64 txn_id, const String &rollback_reason) {
return Status(ErrorCode::kTxnRollback, MakeUnique<String>(fmt::format("Transaction: {} is rollback. {}", txn_id, rollback_reason)));
Expand Down
2 changes: 2 additions & 0 deletions src/common/status.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ export enum class ErrorCode : long {
kFailToStartTxn = 3093,
kAlreadyLocked = 3094,
kNotLocked = 3095,
kInvalidParameter = 3096,

// 4. Txn fail
kTxnRollback = 4001,
Expand Down Expand Up @@ -321,6 +322,7 @@ public:
static Status FailToStartTxn(const String &detail);
static Status AlreadyLocked(const String &detail);
static Status NotLocked(const String &detail);
static Status InvalidParameter(const String &detail);

// 4. TXN fail
static Status TxnRollback(u64 txn_id, const String &rollback_reason = "no reanson gived");
Expand Down
12 changes: 6 additions & 6 deletions src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@ void FragmentBuilder::BuildExplain(PhysicalOperator *phys_op, PlanFragment *curr
PhysicalExplain *explain_op = (PhysicalExplain *)phys_op;
switch (explain_op->explain_type()) {

case ExplainType::kAnalyze: {
Status status = Status::NotSupport("Not implement: Query analyze");
RecoverableError(status);
}
case ExplainType::kAst:
case ExplainType::kUnOpt:
case ExplainType::kOpt:
case ExplainType::kPhysical: {
current_fragment_ptr->AddOperator(phys_op);
break;
}
case ExplainType::kFragment:
case ExplainType::kAnalyze:
case ExplainType::kPipeline: {
query_context_ptr_->set_explain_analyze();
query_context_ptr_->CreateQueryProfiler();
}
case ExplainType::kFragment: {
// Build explain pipeline fragment
SharedPtr<Vector<SharedPtr<String>>> texts_ptr = MakeShared<Vector<SharedPtr<String>>>();
Vector<PhysicalOperator *> phys_ops{phys_op->left()};
Expand All @@ -88,7 +88,7 @@ void FragmentBuilder::BuildExplain(PhysicalOperator *phys_op, PlanFragment *curr
// Set texts to explain physical operator
current_fragment_ptr->AddOperator(phys_op);

if (explain_op->explain_type() == ExplainType::kPipeline) {
if (explain_op->explain_type() == ExplainType::kPipeline or explain_op->explain_type() == ExplainType::kAnalyze) {
current_fragment_ptr->AddChild(std::move(explain_child_fragment));
}
break;
Expand Down
99 changes: 87 additions & 12 deletions src/executor/operator/physical_explain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

module;

#include <vector>

module physical_explain;

import stl;
Expand All @@ -32,6 +34,9 @@ import status;
import infinity_exception;
import logical_type;
import logger;
import plan_fragment;
import fragment_task;
import third_party;

namespace infinity {

Expand All @@ -46,17 +51,17 @@ void PhysicalExplain::AlignParagraphs(Vector<SharedPtr<String>> &array1, Vector<
}
}

void PhysicalExplain::Init(QueryContext* query_context) {
void PhysicalExplain::Init(QueryContext *query_context) {
auto varchar_type = MakeShared<DataType>(LogicalType::kVarchar);

output_names_ = MakeShared<Vector<String>>();
output_types_ = MakeShared<Vector<SharedPtr<DataType>>>();

switch (explain_type_) {
case ExplainType::kAnalyze: {
output_names_->emplace_back("Query Analyze");
Status status = Status::NotSupport("Not implement: Query analyze");
RecoverableError(status);
output_names_->emplace_back("Pipeline");
output_names_->emplace_back("Task cost");
break;
}
case ExplainType::kAst: {
output_names_->emplace_back("Abstract Syntax Tree");
Expand Down Expand Up @@ -90,12 +95,12 @@ void PhysicalExplain::Init(QueryContext* query_context) {
}
output_types_->emplace_back(varchar_type);

if (explain_type_ == ExplainType::kPipeline) {
if (explain_type_ == ExplainType::kPipeline or explain_type_ == ExplainType::kAnalyze) {
output_types_->emplace_back(varchar_type);
}
}

bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {
bool PhysicalExplain::Execute(QueryContext *query_context, OperatorState *operator_state) {
String title;

auto column_vector_ptr = ColumnVector::Make(MakeShared<DataType>(LogicalType::kVarchar));
Expand All @@ -105,8 +110,8 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {

switch (explain_type_) {
case ExplainType::kAnalyze: {
Status status = Status::NotSupport("Not implement: Query analyze");
RecoverableError(status);
title = "Analyze";
break;
}
case ExplainType::kAst: {
title = "Abstract Syntax Tree";
Expand Down Expand Up @@ -144,13 +149,28 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {
task_vector_ptr->Initialize(ColumnVectorType::kFlat, capacity);

if (explain_type_ == ExplainType::kPipeline) {
AlignParagraphs(*this->texts_, *this->task_texts_);
Vector<SharedPtr<String>> task_texts;
ExplainPipeline(task_texts, plan_fragment_ptr_, query_context->query_profiler());

AlignParagraphs(*this->texts_, task_texts);
for (SizeT idx = 0; idx < this->texts_->size(); ++idx) {
column_vector_ptr->AppendValue(Value::MakeVarchar(*(*this->texts_)[idx]));
}
for (SizeT idx = 0; idx < task_texts.size(); ++idx) {
task_vector_ptr->AppendValue(Value::MakeVarchar(*task_texts[idx]));
}
} else if (explain_type_ == ExplainType::kAnalyze) {
Vector<SharedPtr<String>> task_texts;
ExplainAnalyze(task_texts, plan_fragment_ptr_, query_context->query_profiler());

AlignParagraphs(*this->texts_, task_texts);
for (SizeT idx = 0; idx < this->texts_->size(); ++idx) {
column_vector_ptr->AppendValue(Value::MakeVarchar(*(*this->texts_)[idx]));
}
for (SizeT idx = 0; idx < this->task_texts_->size(); ++idx) {
task_vector_ptr->AppendValue(Value::MakeVarchar(*(*this->task_texts_)[idx]));
for (SizeT idx = 0; idx < task_texts.size(); ++idx) {
task_vector_ptr->AppendValue(Value::MakeVarchar(*task_texts[idx]));
}

} else {
for (SizeT idx = 0; idx < this->texts_->size(); ++idx) {
column_vector_ptr->AppendValue(Value::MakeVarchar(*(*this->texts_)[idx]));
Expand All @@ -161,7 +181,7 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {
column_vectors.reserve(2);

column_vectors.push_back(column_vector_ptr);
if (explain_type_ == ExplainType::kPipeline) {
if (explain_type_ == ExplainType::kPipeline or explain_type_ == ExplainType::kAnalyze) {
column_vectors.push_back(task_vector_ptr);
}
output_data_block->Init(column_vectors);
Expand All @@ -172,4 +192,59 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {
return true;
}

// Stores the fragment that Execute() later hands to ExplainPipeline()/ExplainAnalyze().
// NOTE(review): only the raw pointer is kept; presumably the caller keeps the fragment alive
// for as long as this operator may execute — confirm against the caller.
void PhysicalExplain::SetPlanFragment(PlanFragment *plan_fragment_ptr) {
    this->plan_fragment_ptr_ = plan_fragment_ptr;
}

// Appends a timing report for `plan_fragment_ptr` and, recursively, each of its child fragments to `result`.
//
// Lines appended per fragment, in order:
//   - a header "Fragment #<id> * <task count> Tasks"
//   - for every task, one "-> Task <task_id>, Seq: <k>" line per profile returned by
//     QueryProfiler::GetTaskProfile(fragment_id, task_id), where <k> counts that task's profiles from 0,
//     followed by one line per operator timing in the profile (name, elapsed time, output rows)
//   - one empty string, used by the caller to keep the two explain columns aligned
//
// result             receives the report lines; existing content is kept
// plan_fragment_ptr  fragment to report on; dereferenced without a null check
// query_profiler     source of the recorded task profiles; dereferenced without a null check
void PhysicalExplain::ExplainAnalyze(Vector<SharedPtr<String>> &result, PlanFragment *plan_fragment_ptr, QueryProfiler *query_profiler) {
    Vector<UniquePtr<FragmentTask>> &tasks = plan_fragment_ptr->GetContext()->Tasks();
    u64 fragment_id = plan_fragment_ptr->FragmentID();
    {
        String fragment_header = fmt::format("Fragment #{} * {} Tasks", fragment_id, tasks.size());
        result.emplace_back(MakeShared<String>(fragment_header));
    }
    for (const auto &task : tasks) {
        i64 task_id = task->TaskID();

        Vector<TaskProfiler> &task_profiles = query_profiler->GetTaskProfile(fragment_id, task_id);

        // Sequence number of the profile within this task. It has to be declared outside the loop below:
        // when it was re-initialised on every iteration, each profile was reported as "Seq: 0".
        i64 times = 0;
        for (const auto &task_profile : task_profiles) {
            result.emplace_back(MakeShared<String>(fmt::format("-> Task {}, Seq: {}", task_id, times)));
            for (const auto &operator_info : task_profile.timings_) {
                String operator_info_str = fmt::format(" -> {} : ElapsedTime: {}, Output: {}",
                                                       operator_info.name_,
                                                       BaseProfiler::ElapsedToString(static_cast<infinity::NanoSeconds>(operator_info.elapsed_)),
                                                       operator_info.output_rows_);
                result.emplace_back(MakeShared<String>(operator_info_str));
            }
            ++times;
        }
    }
    // NOTE: Insert blank elements after each Fragment for alignment
    result.emplace_back(MakeShared<String>());

    if (plan_fragment_ptr->HasChild()) {
        // The current fragment has children: report them depth-first, in order.
        for (const auto &child_fragment : plan_fragment_ptr->Children()) {
            ExplainAnalyze(result, child_fragment.get(), query_profiler);
        }
    }
}

// Appends a task-count summary for `plan_fragment_ptr` and, recursively, each of its child fragments to `result`.
//
// Each fragment contributes two entries: a header "Fragment #<id> * <task count> Tasks" and an
// empty string that keeps this column aligned with the pipeline text column.
//
// result             receives the summary lines; existing content is kept
// plan_fragment_ptr  fragment to summarise; dereferenced without a null check
// query_profiler     not read here, only forwarded to the recursive calls
void PhysicalExplain::ExplainPipeline(Vector<SharedPtr<String>> &result, PlanFragment *plan_fragment_ptr, QueryProfiler *query_profiler) {
    Vector<UniquePtr<FragmentTask>> &fragment_tasks = plan_fragment_ptr->GetContext()->Tasks();
    u64 id = plan_fragment_ptr->FragmentID();

    result.emplace_back(MakeShared<String>(fmt::format("Fragment #{} * {} Tasks", id, fragment_tasks.size())));
    // NOTE: Insert blank elements after each Fragment for alignment
    result.emplace_back(MakeShared<String>());

    // Leaf fragment: nothing further to describe.
    if (!plan_fragment_ptr->HasChild()) {
        return;
    }

    for (const auto &child : plan_fragment_ptr->Children()) {
        ExplainPipeline(result, child.get(), query_profiler);
    }
}

} // namespace infinity
12 changes: 11 additions & 1 deletion src/executor/operator/physical_explain.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import internal_types;
import explain_statement;
import data_type;
import logger;
import plan_fragment;
import profiler;

namespace infinity {

Expand All @@ -43,14 +45,16 @@ public:

~PhysicalExplain() override = default;

void Init(QueryContext* query_context) override;
void Init(QueryContext *query_context) override;

bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

void SetExplainText(SharedPtr<Vector<SharedPtr<String>>> text) { texts_ = std::move(text); }

void SetExplainTaskText(SharedPtr<Vector<SharedPtr<String>>> text) { task_texts_ = std::move(text); }

void SetPlanFragment(PlanFragment *plan_fragment_ptr);

inline SharedPtr<Vector<String>> GetOutputNames() const final { return output_names_; }

inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return output_types_; }
Expand All @@ -59,13 +63,19 @@ public:

static void AlignParagraphs(Vector<SharedPtr<String>> &array1, Vector<SharedPtr<String>> &array2);

private:
void ExplainAnalyze(Vector<SharedPtr<String>> &result, PlanFragment *plan_fragment_ptr, QueryProfiler *query_profiler);
void ExplainPipeline(Vector<SharedPtr<String>> &result, PlanFragment *plan_fragment_ptr, QueryProfiler *query_profiler);

private:
ExplainType explain_type_{ExplainType::kPhysical};
SharedPtr<Vector<SharedPtr<String>>> texts_{nullptr};
SharedPtr<Vector<SharedPtr<String>>> task_texts_{nullptr};

SharedPtr<Vector<String>> output_names_{};
SharedPtr<Vector<SharedPtr<DataType>>> output_types_{};

PlanFragment *plan_fragment_ptr_;
};

} // namespace infinity
6 changes: 1 addition & 5 deletions src/executor/physical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1260,11 +1260,6 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildExplain(const SharedPtr<Logica

UniquePtr<PhysicalExplain> explain_node{nullptr};
switch (logical_explain->explain_type()) {
case ExplainType::kAnalyze: {
Status status = Status::NotSupport("Not implement: Explain analyze");
RecoverableError(status);
break;
}
case ExplainType::kAst:
case ExplainType::kUnOpt:
case ExplainType::kOpt: {
Expand All @@ -1285,6 +1280,7 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildExplain(const SharedPtr<Logica
logical_operator->load_metas());
break;
}
case ExplainType::kAnalyze:
case ExplainType::kFragment:
case ExplainType::kPipeline: {
explain_node = MakeUnique<PhysicalExplain>(logical_explain->node_id(),
Expand Down
12 changes: 6 additions & 6 deletions src/main/infinity_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,16 +506,16 @@ void InfinityContext::UnInit() {

void InfinityContext::SetIndexThreadPool() {
LOG_TRACE("Set index thread pool.");
inverting_thread_pool_.resize(config_->DenseIndexBuildingWorker());
commiting_thread_pool_.resize(config_->SparseIndexBuildingWorker());
hnsw_build_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
inverting_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
commiting_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
hnsw_build_thread_pool_.resize(config_->DenseIndexBuildingWorker());
}

void InfinityContext::RestoreIndexThreadPoolToDefault() {
LOG_TRACE("Restore index thread pool size to default.");
inverting_thread_pool_.resize(config_->DenseIndexBuildingWorker());
commiting_thread_pool_.resize(config_->SparseIndexBuildingWorker());
hnsw_build_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
inverting_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
commiting_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
hnsw_build_thread_pool_.resize(config_->DenseIndexBuildingWorker());
}

void InfinityContext::AddThriftServerFn(std::function<void()> start_func, std::function<void()> stop_func) {
Expand Down
16 changes: 12 additions & 4 deletions src/main/profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ void TaskProfiler::StopOperator(const OperatorState *operator_state) {
output_rows += output_data_block->Finalized() ? output_data_block->row_count() : 0;
}

i64 elapsed_time = profiler_.Elapsed();

OperatorInformation
info(active_operator_->GetName(), profiler_.GetBegin(), profiler_.GetEnd(), profiler_.Elapsed(), input_rows, output_data_size, output_rows);
info(active_operator_->GetName(), profiler_.GetBegin(), profiler_.GetEnd(), elapsed_time, input_rows, output_data_size, output_rows);

timings_.push_back(std::move(info));
active_operator_ = nullptr;
Expand Down Expand Up @@ -248,6 +250,10 @@ void QueryProfiler::StopPhase(QueryPhase phase) {
}

void QueryProfiler::Stop() {
if (!enable_) {
return;
}

if (current_phase_ == QueryPhase::kInvalid) {
return;
}
Expand All @@ -256,9 +262,9 @@ void QueryProfiler::Stop() {
}

void QueryProfiler::Flush(TaskProfiler &&profiler) {
if (!enable_) {
return;
}
// if (!enable_) {
// return;
// }

std::unique_lock<std::mutex> lk(flush_lock_);
records_[profiler.binding_.fragment_id_][profiler.binding_.task_id_].push_back(profiler);
Expand Down Expand Up @@ -374,4 +380,6 @@ nlohmann::json QueryProfiler::Serialize(const QueryProfiler *profiler) {
return json;
}

// Returns a mutable reference to the task profiles stored in records_ under (fragment_id, task_id).
// NOTE(review): both levels are indexed with operator[]; if records_ is a standard map-like container,
// a missing key is default-inserted rather than reported — confirm.
// NOTE(review): unlike Flush(), this accessor does not take flush_lock_; presumably it is only called
// once no task is flushing any more — confirm.
Vector<TaskProfiler> &QueryProfiler::GetTaskProfile(u64 fragment_id, i64 task_id) {
    auto &fragment_records = records_[fragment_id];
    return fragment_records[task_id];
}

} // namespace infinity
7 changes: 4 additions & 3 deletions src/main/profiler.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ public:

// Return the elapsed time from begin, if the profiler is ended, it will return total elapsed time.
[[nodiscard]] inline i64 Elapsed() const {
if (name_.empty()) {
return 0;
}
return ElapsedInternal().count();
}

Expand Down Expand Up @@ -166,6 +163,8 @@ public:
}
}

bool Enable() const { return enable_; }

void StartOperator(const PhysicalOperator *op);

void StopOperator(const OperatorState *output_state);
Expand Down Expand Up @@ -205,6 +204,8 @@ public:

static nlohmann::json Serialize(const QueryProfiler *profiler);

Vector<TaskProfiler> &GetTaskProfile(u64 fragment_id, i64 task_id);

private:
bool enable_{};

Expand Down
Loading

0 comments on commit 51f41bb

Please sign in to comment.