Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement explain analyze #2541

Merged
merged 9 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions python/test_pysdk/test_explain.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ def test_explain(self, suffix):
res = table.output(["*"]).explain(ExplainType.Fragment)
print(res)

with pytest.raises(Exception, match=r".*Not implement*"):
res = table.output(["*"]).explain(ExplainType.Analyze)
print(res)
res = table.output(["*"]).explain(ExplainType.Analyze)
print(res)

db_obj.drop_table("test_explain_default"+suffix, ConflictType.Error)
2 changes: 2 additions & 0 deletions src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ Status Status::AlreadyLocked(const String &detail) { return Status(ErrorCode::kA

Status Status::NotLocked(const String &detail) { return Status(ErrorCode::kNotLocked, MakeUnique<String>(detail)); }

Status Status::InvalidParameter(const String &detail) { return Status(ErrorCode::kInvalidParameter, MakeUnique<String>(detail)); }

// 4. TXN fail
Status Status::TxnRollback(u64 txn_id, const String &rollback_reason) {
return Status(ErrorCode::kTxnRollback, MakeUnique<String>(fmt::format("Transaction: {} is rollback. {}", txn_id, rollback_reason)));
Expand Down
2 changes: 2 additions & 0 deletions src/common/status.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ export enum class ErrorCode : long {
kFailToStartTxn = 3093,
kAlreadyLocked = 3094,
kNotLocked = 3095,
kInvalidParameter = 3096,

// 4. Txn fail
kTxnRollback = 4001,
Expand Down Expand Up @@ -321,6 +322,7 @@ public:
static Status FailToStartTxn(const String &detail);
static Status AlreadyLocked(const String &detail);
static Status NotLocked(const String &detail);
static Status InvalidParameter(const String &detail);

// 4. TXN fail
static Status TxnRollback(u64 txn_id, const String &rollback_reason = "no reanson gived");
Expand Down
12 changes: 6 additions & 6 deletions src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@ void FragmentBuilder::BuildExplain(PhysicalOperator *phys_op, PlanFragment *curr
PhysicalExplain *explain_op = (PhysicalExplain *)phys_op;
switch (explain_op->explain_type()) {

case ExplainType::kAnalyze: {
Status status = Status::NotSupport("Not implement: Query analyze");
RecoverableError(status);
}
case ExplainType::kAst:
case ExplainType::kUnOpt:
case ExplainType::kOpt:
case ExplainType::kPhysical: {
current_fragment_ptr->AddOperator(phys_op);
break;
}
case ExplainType::kFragment:
case ExplainType::kAnalyze:
case ExplainType::kPipeline: {
query_context_ptr_->set_explain_analyze();
query_context_ptr_->CreateQueryProfiler();
}
case ExplainType::kFragment: {
// Build explain pipeline fragment
SharedPtr<Vector<SharedPtr<String>>> texts_ptr = MakeShared<Vector<SharedPtr<String>>>();
Vector<PhysicalOperator *> phys_ops{phys_op->left()};
Expand All @@ -88,7 +88,7 @@ void FragmentBuilder::BuildExplain(PhysicalOperator *phys_op, PlanFragment *curr
// Set texts to explain physical operator
current_fragment_ptr->AddOperator(phys_op);

if (explain_op->explain_type() == ExplainType::kPipeline) {
if (explain_op->explain_type() == ExplainType::kPipeline or explain_op->explain_type() == ExplainType::kAnalyze) {
current_fragment_ptr->AddChild(std::move(explain_child_fragment));
}
break;
Expand Down
99 changes: 87 additions & 12 deletions src/executor/operator/physical_explain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

module;

#include <vector>

module physical_explain;

import stl;
Expand All @@ -32,6 +34,9 @@ import status;
import infinity_exception;
import logical_type;
import logger;
import plan_fragment;
import fragment_task;
import third_party;

namespace infinity {

Expand All @@ -46,17 +51,17 @@ void PhysicalExplain::AlignParagraphs(Vector<SharedPtr<String>> &array1, Vector<
}
}

void PhysicalExplain::Init(QueryContext* query_context) {
void PhysicalExplain::Init(QueryContext *query_context) {
auto varchar_type = MakeShared<DataType>(LogicalType::kVarchar);

output_names_ = MakeShared<Vector<String>>();
output_types_ = MakeShared<Vector<SharedPtr<DataType>>>();

switch (explain_type_) {
case ExplainType::kAnalyze: {
output_names_->emplace_back("Query Analyze");
Status status = Status::NotSupport("Not implement: Query analyze");
RecoverableError(status);
output_names_->emplace_back("Pipeline");
output_names_->emplace_back("Task cost");
break;
}
case ExplainType::kAst: {
output_names_->emplace_back("Abstract Syntax Tree");
Expand Down Expand Up @@ -90,12 +95,12 @@ void PhysicalExplain::Init(QueryContext* query_context) {
}
output_types_->emplace_back(varchar_type);

if (explain_type_ == ExplainType::kPipeline) {
if (explain_type_ == ExplainType::kPipeline or explain_type_ == ExplainType::kAnalyze) {
output_types_->emplace_back(varchar_type);
}
}

bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {
bool PhysicalExplain::Execute(QueryContext *query_context, OperatorState *operator_state) {
String title;

auto column_vector_ptr = ColumnVector::Make(MakeShared<DataType>(LogicalType::kVarchar));
Expand All @@ -105,8 +110,8 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {

switch (explain_type_) {
case ExplainType::kAnalyze: {
Status status = Status::NotSupport("Not implement: Query analyze");
RecoverableError(status);
title = "Analyze";
break;
}
case ExplainType::kAst: {
title = "Abstract Syntax Tree";
Expand Down Expand Up @@ -144,13 +149,28 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {
task_vector_ptr->Initialize(ColumnVectorType::kFlat, capacity);

if (explain_type_ == ExplainType::kPipeline) {
AlignParagraphs(*this->texts_, *this->task_texts_);
Vector<SharedPtr<String>> task_texts;
ExplainPipeline(task_texts, plan_fragment_ptr_, query_context->query_profiler());

AlignParagraphs(*this->texts_, task_texts);
for (SizeT idx = 0; idx < this->texts_->size(); ++idx) {
column_vector_ptr->AppendValue(Value::MakeVarchar(*(*this->texts_)[idx]));
}
for (SizeT idx = 0; idx < task_texts.size(); ++idx) {
task_vector_ptr->AppendValue(Value::MakeVarchar(*task_texts[idx]));
}
} else if (explain_type_ == ExplainType::kAnalyze) {
Vector<SharedPtr<String>> task_texts;
ExplainAnalyze(task_texts, plan_fragment_ptr_, query_context->query_profiler());

AlignParagraphs(*this->texts_, task_texts);
for (SizeT idx = 0; idx < this->texts_->size(); ++idx) {
column_vector_ptr->AppendValue(Value::MakeVarchar(*(*this->texts_)[idx]));
}
for (SizeT idx = 0; idx < this->task_texts_->size(); ++idx) {
task_vector_ptr->AppendValue(Value::MakeVarchar(*(*this->task_texts_)[idx]));
for (SizeT idx = 0; idx < task_texts.size(); ++idx) {
task_vector_ptr->AppendValue(Value::MakeVarchar(*task_texts[idx]));
}

} else {
for (SizeT idx = 0; idx < this->texts_->size(); ++idx) {
column_vector_ptr->AppendValue(Value::MakeVarchar(*(*this->texts_)[idx]));
Expand All @@ -161,7 +181,7 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {
column_vectors.reserve(2);

column_vectors.push_back(column_vector_ptr);
if (explain_type_ == ExplainType::kPipeline) {
if (explain_type_ == ExplainType::kPipeline or explain_type_ == ExplainType::kAnalyze) {
column_vectors.push_back(task_vector_ptr);
}
output_data_block->Init(column_vectors);
Expand All @@ -172,4 +192,59 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {
return true;
}

void PhysicalExplain::SetPlanFragment(PlanFragment *plan_fragment_ptr) { plan_fragment_ptr_ = plan_fragment_ptr; }

void PhysicalExplain::ExplainAnalyze(Vector<SharedPtr<String>> &result, PlanFragment *plan_fragment_ptr, QueryProfiler *query_profiler) {
Vector<UniquePtr<FragmentTask>> &tasks = plan_fragment_ptr->GetContext()->Tasks();
u64 fragment_id = plan_fragment_ptr->FragmentID();
{
String fragment_header = fmt::format("Fragment #{} * {} Tasks", fragment_id, tasks.size());
result.emplace_back(MakeShared<String>(fragment_header));
}
for (const auto &task : tasks) {
i64 task_id = task->TaskID();

Vector<TaskProfiler> &task_profiles = query_profiler->GetTaskProfile(fragment_id, task_id);
for (const auto &task_profile : task_profiles) {
i64 times = 0;
result.emplace_back(MakeShared<String>(fmt::format("-> Task {}, Seq: {}", task_id, times)));
for (const auto &operator_info : task_profile.timings_) {
String operator_info_str = fmt::format(" -> {} : ElapsedTime: {}, Output: {}",
operator_info.name_,
BaseProfiler::ElapsedToString(static_cast<infinity::NanoSeconds>(operator_info.elapsed_)),
operator_info.output_rows_);
result.emplace_back(MakeShared<String>(operator_info_str));
}
++times;
}
}
// NOTE: Insert blank elements after each Fragment for alignment
result.emplace_back(MakeShared<String>());

if (plan_fragment_ptr->HasChild()) {
// current fragment have children
for (const auto &child_fragment : plan_fragment_ptr->Children()) {
ExplainAnalyze(result, child_fragment.get(), query_profiler);
}
}
}

void PhysicalExplain::ExplainPipeline(Vector<SharedPtr<String>> &result, PlanFragment *plan_fragment_ptr, QueryProfiler *query_profiler) {
Vector<UniquePtr<FragmentTask>> &tasks = plan_fragment_ptr->GetContext()->Tasks();
u64 fragment_id = plan_fragment_ptr->FragmentID();
{
String fragment_header = fmt::format("Fragment #{} * {} Tasks", fragment_id, tasks.size());
result.emplace_back(MakeShared<String>(fragment_header));
}
// NOTE: Insert blank elements after each Fragment for alignment
result.emplace_back(MakeShared<String>());

if (plan_fragment_ptr->HasChild()) {
// current fragment have children
for (const auto &child_fragment : plan_fragment_ptr->Children()) {
ExplainPipeline(result, child_fragment.get(), query_profiler);
}
}
}

} // namespace infinity
12 changes: 11 additions & 1 deletion src/executor/operator/physical_explain.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import internal_types;
import explain_statement;
import data_type;
import logger;
import plan_fragment;
import profiler;

namespace infinity {

Expand All @@ -43,14 +45,16 @@ public:

~PhysicalExplain() override = default;

void Init(QueryContext* query_context) override;
void Init(QueryContext *query_context) override;

bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

void SetExplainText(SharedPtr<Vector<SharedPtr<String>>> text) { texts_ = std::move(text); }

void SetExplainTaskText(SharedPtr<Vector<SharedPtr<String>>> text) { task_texts_ = std::move(text); }

void SetPlanFragment(PlanFragment *plan_fragment_ptr);

inline SharedPtr<Vector<String>> GetOutputNames() const final { return output_names_; }

inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return output_types_; }
Expand All @@ -59,13 +63,19 @@ public:

static void AlignParagraphs(Vector<SharedPtr<String>> &array1, Vector<SharedPtr<String>> &array2);

private:
void ExplainAnalyze(Vector<SharedPtr<String>> &result, PlanFragment *plan_fragment_ptr, QueryProfiler *query_profiler);
void ExplainPipeline(Vector<SharedPtr<String>> &result, PlanFragment *plan_fragment_ptr, QueryProfiler *query_profiler);

private:
ExplainType explain_type_{ExplainType::kPhysical};
SharedPtr<Vector<SharedPtr<String>>> texts_{nullptr};
SharedPtr<Vector<SharedPtr<String>>> task_texts_{nullptr};

SharedPtr<Vector<String>> output_names_{};
SharedPtr<Vector<SharedPtr<DataType>>> output_types_{};

PlanFragment *plan_fragment_ptr_;
};

} // namespace infinity
6 changes: 1 addition & 5 deletions src/executor/physical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1260,11 +1260,6 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildExplain(const SharedPtr<Logica

UniquePtr<PhysicalExplain> explain_node{nullptr};
switch (logical_explain->explain_type()) {
case ExplainType::kAnalyze: {
Status status = Status::NotSupport("Not implement: Explain analyze");
RecoverableError(status);
break;
}
case ExplainType::kAst:
case ExplainType::kUnOpt:
case ExplainType::kOpt: {
Expand All @@ -1285,6 +1280,7 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildExplain(const SharedPtr<Logica
logical_operator->load_metas());
break;
}
case ExplainType::kAnalyze:
case ExplainType::kFragment:
case ExplainType::kPipeline: {
explain_node = MakeUnique<PhysicalExplain>(logical_explain->node_id(),
Expand Down
12 changes: 6 additions & 6 deletions src/main/infinity_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,16 +506,16 @@ void InfinityContext::UnInit() {

void InfinityContext::SetIndexThreadPool() {
LOG_TRACE("Set index thread pool.");
inverting_thread_pool_.resize(config_->DenseIndexBuildingWorker());
commiting_thread_pool_.resize(config_->SparseIndexBuildingWorker());
hnsw_build_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
inverting_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
commiting_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
hnsw_build_thread_pool_.resize(config_->DenseIndexBuildingWorker());
}

void InfinityContext::RestoreIndexThreadPoolToDefault() {
LOG_TRACE("Restore index thread pool size to default.");
inverting_thread_pool_.resize(config_->DenseIndexBuildingWorker());
commiting_thread_pool_.resize(config_->SparseIndexBuildingWorker());
hnsw_build_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
inverting_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
commiting_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
hnsw_build_thread_pool_.resize(config_->DenseIndexBuildingWorker());
}

void InfinityContext::AddThriftServerFn(std::function<void()> start_func, std::function<void()> stop_func) {
Expand Down
16 changes: 12 additions & 4 deletions src/main/profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ void TaskProfiler::StopOperator(const OperatorState *operator_state) {
output_rows += output_data_block->Finalized() ? output_data_block->row_count() : 0;
}

i64 elapsed_time = profiler_.Elapsed();

OperatorInformation
info(active_operator_->GetName(), profiler_.GetBegin(), profiler_.GetEnd(), profiler_.Elapsed(), input_rows, output_data_size, output_rows);
info(active_operator_->GetName(), profiler_.GetBegin(), profiler_.GetEnd(), elapsed_time, input_rows, output_data_size, output_rows);

timings_.push_back(std::move(info));
active_operator_ = nullptr;
Expand Down Expand Up @@ -248,6 +250,10 @@ void QueryProfiler::StopPhase(QueryPhase phase) {
}

void QueryProfiler::Stop() {
if (!enable_) {
return;
}

if (current_phase_ == QueryPhase::kInvalid) {
return;
}
Expand All @@ -256,9 +262,9 @@ void QueryProfiler::Stop() {
}

void QueryProfiler::Flush(TaskProfiler &&profiler) {
if (!enable_) {
return;
}
// if (!enable_) {
// return;
// }

std::unique_lock<std::mutex> lk(flush_lock_);
records_[profiler.binding_.fragment_id_][profiler.binding_.task_id_].push_back(profiler);
Expand Down Expand Up @@ -374,4 +380,6 @@ nlohmann::json QueryProfiler::Serialize(const QueryProfiler *profiler) {
return json;
}

Vector<TaskProfiler> &QueryProfiler::GetTaskProfile(u64 fragment_id, i64 task_id) { return records_[fragment_id][task_id]; }

} // namespace infinity
7 changes: 4 additions & 3 deletions src/main/profiler.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ public:

// Return the elapsed time from begin, if the profiler is ended, it will return total elapsed time.
[[nodiscard]] inline i64 Elapsed() const {
if (name_.empty()) {
return 0;
}
return ElapsedInternal().count();
}

Expand Down Expand Up @@ -166,6 +163,8 @@ public:
}
}

bool Enable() const { return enable_; }

void StartOperator(const PhysicalOperator *op);

void StopOperator(const OperatorState *output_state);
Expand Down Expand Up @@ -205,6 +204,8 @@ public:

static nlohmann::json Serialize(const QueryProfiler *profiler);

Vector<TaskProfiler> &GetTaskProfile(u64 fragment_id, i64 task_id);

private:
bool enable_{};

Expand Down
Loading