Skip to content

Commit beb166a

Browse files
authored
Add show buffer command (#1299)
### What problem does this PR solve? Support command: SHOW BUFFER; It will list all the buffer object managed by buffer manager. Object path, type, size, status and buffered type will be showed. ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Signed-off-by: Jin Hai <haijin.chn@gmail.com>
1 parent 0f5e6b6 commit beb166a

26 files changed

+1410
-1087
lines changed

src/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ add_library(infinity_core
178178
${expression_cpp}
179179
${executor_cpp}
180180
${common_cpp}
181-
)
181+
storage/buffer/file_worker/file_worker_type.cppm)
182182

183183
target_sources(infinity_core
184184
PUBLIC

src/executor/explain_physical_plan.cpp

+14
Original file line numberDiff line numberDiff line change
@@ -1242,6 +1242,20 @@ void ExplainPhysicalPlan::Explain(const PhysicalShow *show_node, SharedPtr<Vecto
12421242
result->emplace_back(MakeShared<String>(output_columns_str));
12431243
break;
12441244
}
1245+
case ShowType::kShowBuffer: {
1246+
String show_str;
1247+
if (intent_size != 0) {
1248+
show_str = String(intent_size - 2, ' ') + "-> SHOW BUFFER ";
1249+
} else {
1250+
show_str = "SHOW BUFFER ";
1251+
}
1252+
show_str += "(" + std::to_string(show_node->node_id()) + ")";
1253+
result->emplace_back(MakeShared<String>(show_str));
1254+
1255+
String output_columns_str = String(intent_size, ' ') + " - output columns: [path, status, size, buffered_type, type]";
1256+
result->emplace_back(MakeShared<String>(output_columns_str));
1257+
break;
1258+
}
12451259
case ShowType::kShowViews: {
12461260
String show_str;
12471261
if (intent_size != 0) {

src/executor/operator/physical_show.cpp

+105-1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ import chunk_index_entry;
6868
import background_process;
6969
import compaction_process;
7070
import bg_task;
71+
import buffer_obj;
72+
import file_worker_type;
7173

7274
namespace infinity {
7375

@@ -377,6 +379,21 @@ void PhysicalShow::Init() {
377379
output_types_->emplace_back(varchar_type);
378380
break;
379381
}
382+
case ShowType::kShowBuffer: {
383+
output_names_->reserve(5);
384+
output_types_->reserve(5);
385+
output_names_->emplace_back("path");
386+
output_names_->emplace_back("status");
387+
output_names_->emplace_back("size");
388+
output_names_->emplace_back("buffered_type");
389+
output_names_->emplace_back("type");
390+
output_types_->emplace_back(varchar_type);
391+
output_types_->emplace_back(varchar_type);
392+
output_types_->emplace_back(bigint_type);
393+
output_types_->emplace_back(varchar_type);
394+
output_types_->emplace_back(varchar_type);
395+
break;
396+
}
380397
default: {
381398
Status status = Status::NotSupport("Not implemented show type");
382399
LOG_ERROR(status.message());
@@ -478,6 +495,10 @@ bool PhysicalShow::Execute(QueryContext *query_context, OperatorState *operator_
478495
ExecuteShowConfig(query_context, show_operator_state);
479496
break;
480497
}
498+
case ShowType::kShowBuffer: {
499+
ExecuteShowBuffer(query_context, show_operator_state);
500+
break;
501+
}
481502
default: {
482503
String error_message = "Invalid chunk scan type";
483504
LOG_CRITICAL(error_message);
@@ -2789,7 +2810,7 @@ void PhysicalShow::ExecuteShowIndexes(QueryContext *query_context, ShowOperatorS
27892810
{
27902811
auto map_guard = table_entry->IndexMetaMap();
27912812
for (const auto &[index_name, index_meta] : *map_guard) {
2792-
if (!output_block_ptr) {
2813+
if (output_block_ptr.get() == nullptr) {
27932814
output_block_ptr = DataBlock::MakeUniquePtr();
27942815
output_block_ptr->Init(column_types);
27952816
}
@@ -4080,4 +4101,87 @@ void PhysicalShow::ExecuteShowConfig(QueryContext *query_context, ShowOperatorSt
40804101
operator_state->output_.emplace_back(std::move(output_block_ptr));
40814102
}
40824103

4104+
void PhysicalShow::ExecuteShowBuffer(QueryContext *query_context, ShowOperatorState *operator_state) {
4105+
auto varchar_type = MakeShared<DataType>(LogicalType::kVarchar);
4106+
auto bigint_type = MakeShared<DataType>(LogicalType::kBigInt);
4107+
4108+
Vector<SharedPtr<ColumnDef>> column_defs = {
4109+
MakeShared<ColumnDef>(0, varchar_type, "path", std::set<ConstraintType>()),
4110+
MakeShared<ColumnDef>(1, varchar_type, "status", std::set<ConstraintType>()),
4111+
MakeShared<ColumnDef>(2, bigint_type, "size", std::set<ConstraintType>()),
4112+
MakeShared<ColumnDef>(3, varchar_type, "buffered_type", std::set<ConstraintType>()),
4113+
MakeShared<ColumnDef>(4, varchar_type, "type", std::set<ConstraintType>()),
4114+
};
4115+
4116+
SharedPtr<TableDef> table_def = TableDef::Make(MakeShared<String>("default_db"), MakeShared<String>("show_buffer"), column_defs);
4117+
4118+
// create data block for output state
4119+
Vector<SharedPtr<DataType>> column_types{
4120+
varchar_type,
4121+
varchar_type,
4122+
bigint_type,
4123+
varchar_type,
4124+
varchar_type,
4125+
};
4126+
4127+
UniquePtr<DataBlock> output_block_ptr = DataBlock::MakeUniquePtr();
4128+
output_block_ptr->Init(column_types);
4129+
SizeT row_count = 0;
4130+
4131+
BufferManager *buffer_manager = query_context->storage()->buffer_manager();
4132+
Vector<BufferObjectInfo> buffer_object_info_array = buffer_manager->GetBufferObjectsInfo();
4133+
for(const auto& buffer_object_info: buffer_object_info_array) {
4134+
4135+
if (output_block_ptr.get() == nullptr) {
4136+
output_block_ptr = DataBlock::MakeUniquePtr();
4137+
output_block_ptr->Init(column_types);
4138+
}
4139+
4140+
{
4141+
// path
4142+
Value value = Value::MakeVarchar(buffer_object_info.object_path_);
4143+
ValueExpression value_expr(value);
4144+
value_expr.AppendToChunk(output_block_ptr->column_vectors[0]);
4145+
}
4146+
{
4147+
// status
4148+
Value value = Value::MakeVarchar(BufferStatusToString(buffer_object_info.buffered_status_));
4149+
ValueExpression value_expr(value);
4150+
value_expr.AppendToChunk(output_block_ptr->column_vectors[1]);
4151+
}
4152+
{
4153+
// size
4154+
i64 buffer_object_size = static_cast<i64>(buffer_object_info.object_size_);
4155+
Value value = Value::MakeBigInt(buffer_object_size);
4156+
ValueExpression value_expr(value);
4157+
value_expr.AppendToChunk(output_block_ptr->column_vectors[2]);
4158+
}
4159+
{
4160+
// buffered type
4161+
Value value = Value::MakeVarchar(BufferTypeToString(buffer_object_info.buffered_type_));
4162+
ValueExpression value_expr(value);
4163+
value_expr.AppendToChunk(output_block_ptr->column_vectors[3]);
4164+
}
4165+
{
4166+
// type
4167+
Value value = Value::MakeVarchar(FileWorkerType2Str(buffer_object_info.file_type_));
4168+
ValueExpression value_expr(value);
4169+
value_expr.AppendToChunk(output_block_ptr->column_vectors[4]);
4170+
}
4171+
4172+
++ row_count;
4173+
if (row_count == output_block_ptr->capacity()) {
4174+
output_block_ptr->Finalize();
4175+
operator_state->output_.emplace_back(std::move(output_block_ptr));
4176+
output_block_ptr = nullptr;
4177+
row_count = 0;
4178+
}
4179+
4180+
}
4181+
4182+
output_block_ptr->Finalize();
4183+
operator_state->output_.emplace_back(std::move(output_block_ptr));
4184+
return ;
4185+
}
4186+
40834187
} // namespace infinity

src/executor/operator/physical_show.cppm

+2
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ private:
124124

125125
void ExecuteShowConfig(QueryContext *query_context, ShowOperatorState *operator_state);
126126

127+
void ExecuteShowBuffer(QueryContext *query_context, ShowOperatorState *operator_state);
128+
127129
private:
128130
ShowType scan_type_{ShowType::kInvalid};
129131
String db_name_{};

0 commit comments

Comments
 (0)