Skip to content

Commit

Permalink
Fix explain analyze 1
Browse files Browse the repository at this point in the history
Signed-off-by: Jin Hai <haijin.chn@gmail.com>
  • Loading branch information
JinHai-CN committed Mar 6, 2025
1 parent a8082ca commit c163b94
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 54 deletions.
36 changes: 15 additions & 21 deletions src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,19 @@ import explain_statement;

namespace infinity {

SharedPtr<PlanFragment> FragmentBuilder::BuildFragment(const Vector<PhysicalOperator *> &phys_ops) {
SharedPtr<PlanFragment> FragmentBuilder::BuildFragment(PhysicalOperator *phys_op) {
SharedPtr<PlanFragment> result = nullptr;
for (auto *phys_op : phys_ops) {
auto plan_fragment = MakeUnique<PlanFragment>(GetFragmentId());
plan_fragment->SetSinkNode(query_context_ptr_, SinkType::kResult, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
BuildFragments(phys_op, plan_fragment.get());
if (plan_fragment->GetSourceNode() == nullptr) {
plan_fragment->SetSourceNode(query_context_ptr_, SourceType::kEmpty, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
}
if (result.get() == nullptr) {
result = std::move(plan_fragment);
} else {
PlanFragment::AddNext(result, plan_fragment.get());
result = std::move(plan_fragment);
}
auto plan_fragment = MakeUnique<PlanFragment>(GetFragmentId());
plan_fragment->SetSinkNode(query_context_ptr_, SinkType::kResult, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
BuildFragments(phys_op, plan_fragment.get());
if (plan_fragment->GetSourceNode() == nullptr) {
plan_fragment->SetSourceNode(query_context_ptr_, SourceType::kEmpty, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
}
if (result.get() == nullptr) {
result = std::move(plan_fragment);
} else {
PlanFragment::AddNext(result, plan_fragment.get());
result = std::move(plan_fragment);
}
return result;
}
Expand All @@ -62,23 +60,19 @@ void FragmentBuilder::BuildExplain(PhysicalOperator *phys_op, PlanFragment *curr
PhysicalExplain *explain_op = (PhysicalExplain *)phys_op;
switch (explain_op->explain_type()) {

case ExplainType::kAnalyze: {
Status status = Status::NotSupport("Not implement: Query analyze");
RecoverableError(status);
}
case ExplainType::kAst:
case ExplainType::kUnOpt:
case ExplainType::kOpt:
case ExplainType::kPhysical: {
current_fragment_ptr->AddOperator(phys_op);
break;
}
case ExplainType::kAnalyze:
case ExplainType::kFragment:
case ExplainType::kPipeline: {
// Build explain pipeline fragment
SharedPtr<Vector<SharedPtr<String>>> texts_ptr = MakeShared<Vector<SharedPtr<String>>>();
Vector<PhysicalOperator *> phys_ops{phys_op->left()};
auto explain_child_fragment = this->BuildFragment(phys_ops);
auto explain_child_fragment = this->BuildFragment(phys_op->left());

// Generate explain context of the child fragment
ExplainFragment::Explain(explain_child_fragment.get(), texts_ptr);
Expand All @@ -88,7 +82,7 @@ void FragmentBuilder::BuildExplain(PhysicalOperator *phys_op, PlanFragment *curr
// Set texts to explain physical operator
current_fragment_ptr->AddOperator(phys_op);

if (explain_op->explain_type() == ExplainType::kPipeline) {
if (explain_op->explain_type() == ExplainType::kPipeline or explain_op->explain_type() == ExplainType::kAnalyze) {
current_fragment_ptr->AddChild(std::move(explain_child_fragment));
}
break;
Expand Down
2 changes: 1 addition & 1 deletion src/executor/fragment_builder.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public:
#endif
}

SharedPtr<PlanFragment> BuildFragment(const Vector<PhysicalOperator *> &physical_plans);
SharedPtr<PlanFragment> BuildFragment(PhysicalOperator *physical_plan);

private:
void BuildFragments(PhysicalOperator *phys_op, PlanFragment *current_fragment_ptr);
Expand Down
18 changes: 9 additions & 9 deletions src/executor/operator/physical_explain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ void PhysicalExplain::AlignParagraphs(Vector<SharedPtr<String>> &array1, Vector<
}
}

void PhysicalExplain::Init(QueryContext* query_context) {
void PhysicalExplain::Init(QueryContext *query_context) {
auto varchar_type = MakeShared<DataType>(LogicalType::kVarchar);

output_names_ = MakeShared<Vector<String>>();
output_types_ = MakeShared<Vector<SharedPtr<DataType>>>();

switch (explain_type_) {
case ExplainType::kAnalyze: {
output_names_->emplace_back("Query Analyze");
Status status = Status::NotSupport("Not implement: Query analyze");
RecoverableError(status);
output_names_->emplace_back("Pipeline");
output_names_->emplace_back("Task cost");
break;
}
case ExplainType::kAst: {
output_names_->emplace_back("Abstract Syntax Tree");
Expand Down Expand Up @@ -90,7 +90,7 @@ void PhysicalExplain::Init(QueryContext* query_context) {
}
output_types_->emplace_back(varchar_type);

if (explain_type_ == ExplainType::kPipeline) {
if (explain_type_ == ExplainType::kPipeline or explain_type_ == ExplainType::kAnalyze) {
output_types_->emplace_back(varchar_type);
}
}
Expand All @@ -105,8 +105,8 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {

switch (explain_type_) {
case ExplainType::kAnalyze: {
Status status = Status::NotSupport("Not implement: Query analyze");
RecoverableError(status);
title = "Analyze";
break;
}
case ExplainType::kAst: {
title = "Abstract Syntax Tree";
Expand Down Expand Up @@ -143,7 +143,7 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {
column_vector_ptr->Initialize(ColumnVectorType::kFlat, capacity);
task_vector_ptr->Initialize(ColumnVectorType::kFlat, capacity);

if (explain_type_ == ExplainType::kPipeline) {
if (explain_type_ == ExplainType::kPipeline or explain_type_ == ExplainType::kAnalyze) {
AlignParagraphs(*this->texts_, *this->task_texts_);
for (SizeT idx = 0; idx < this->texts_->size(); ++idx) {
column_vector_ptr->AppendValue(Value::MakeVarchar(*(*this->texts_)[idx]));
Expand All @@ -161,7 +161,7 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {
column_vectors.reserve(2);

column_vectors.push_back(column_vector_ptr);
if (explain_type_ == ExplainType::kPipeline) {
if (explain_type_ == ExplainType::kPipeline or explain_type_ == ExplainType::kAnalyze) {
column_vectors.push_back(task_vector_ptr);
}
output_data_block->Init(column_vectors);
Expand Down
6 changes: 1 addition & 5 deletions src/executor/physical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1260,11 +1260,6 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildExplain(const SharedPtr<Logica

UniquePtr<PhysicalExplain> explain_node{nullptr};
switch (logical_explain->explain_type()) {
case ExplainType::kAnalyze: {
Status status = Status::NotSupport("Not implement: Explain analyze");
RecoverableError(status);
break;
}
case ExplainType::kAst:
case ExplainType::kUnOpt:
case ExplainType::kOpt: {
Expand All @@ -1285,6 +1280,7 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildExplain(const SharedPtr<Logica
logical_operator->load_metas());
break;
}
case ExplainType::kAnalyze:
case ExplainType::kFragment:
case ExplainType::kPipeline: {
explain_node = MakeUnique<PhysicalExplain>(logical_explain->node_id(),
Expand Down
19 changes: 3 additions & 16 deletions src/main/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,7 @@ QueryResult QueryContext::QueryStatement(const BaseStatement *base_statement) {
StopProfile(QueryPhase::kPhysicalPlan);
// LOG_WARN(fmt::format("Before pipeline cost: {}", profiler.ElapsedToString()));
StartProfile(QueryPhase::kPipelineBuild);
// Fragment Builder, only for test now.
{
Vector<PhysicalOperator *> physical_plan_ptrs;
for (auto &physical_plan : physical_plans) {
physical_plan_ptrs.push_back(physical_plan.get());
}
plan_fragment = fragment_builder_->BuildFragment(physical_plan_ptrs);
}
plan_fragment = fragment_builder_->BuildFragment(physical_plans[0].get());
StopProfile(QueryPhase::kPipelineBuild);

StartProfile(QueryPhase::kTaskBuild);
Expand Down Expand Up @@ -370,13 +363,7 @@ bool QueryContext::ExecuteBGStatement(BaseStatement *base_statement, BGQueryStat
state.physical_plans.push_back(std::move(physical_plan));
}

{
Vector<PhysicalOperator *> physical_plan_ptrs;
for (auto &physical_plan : state.physical_plans) {
physical_plan_ptrs.push_back(physical_plan.get());
}
state.plan_fragment = fragment_builder_->BuildFragment(physical_plan_ptrs);
}
state.plan_fragment = fragment_builder_->BuildFragment(state.physical_plans[0].get());

state.notifier = MakeUnique<Notifier>();
FragmentContext::BuildTask(this, nullptr, state.plan_fragment.get(), state.notifier.get());
Expand Down Expand Up @@ -423,7 +410,7 @@ QueryResult QueryContext::HandleAdminStatement(const AdminStatement *admin_state
void QueryContext::BeginTxn(const BaseStatement *base_statement) {
if (session_ptr_->GetTxn() == nullptr) {
Txn *new_txn = nullptr;
if(base_statement == nullptr) {
if (base_statement == nullptr) {
new_txn = storage_->txn_manager()->BeginTxn(MakeUnique<String>(""), TransactionType::kNormal);
} else {
// TODO: more type check and setting
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler/fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ void FragmentContext::BuildTask(QueryContext *query_context, FragmentContext *pa
Vector<SharedPtr<String>> result;
PhysicalExplain *explain_op = (PhysicalExplain *)fragment_operators[0];

if (explain_op->explain_type() == ExplainType::kPipeline) {
if (explain_op->explain_type() == ExplainType::kPipeline or explain_op->explain_type() == ExplainType::kAnalyze) {
CollectTasks(result, plan_fragment_ptr->Children()[0].get());
explain_op->SetExplainTaskText(MakeShared<Vector<SharedPtr<String>>>(result));
break;
Expand Down
2 changes: 1 addition & 1 deletion src/unit_test/test_helper/sql_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ SharedPtr<DataTable> SQLRunner::Run(const String &sql_text, bool print) {

// Create execution pipeline
// Fragment Builder, only for test now. plan fragment is same as pipeline.
auto plan_fragment = query_context_ptr->fragment_builder()->BuildFragment({physical_plan.get()});
auto plan_fragment = query_context_ptr->fragment_builder()->BuildFragment(physical_plan.get());

auto notifier = MakeUnique<Notifier>();

Expand Down

0 comments on commit c163b94

Please sign in to comment.