Skip to content

Commit

Permalink
Refactor set command (#1206)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

As title.

Issue link:#1180

### Type of change

- [x] Refactoring

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
  • Loading branch information
JinHai-CN authored May 14, 2024
1 parent 8d0f8c8 commit c6060d2
Show file tree
Hide file tree
Showing 18 changed files with 1,455 additions and 1,293 deletions.
8 changes: 5 additions & 3 deletions src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ Status Status::InvalidLogLevel(const String &log_level) {
return Status(ErrorCode::kInvalidLogLevel, MakeUnique<String>(fmt::format("Invalid log level: {}.", log_level)));
}

Status Status::InvalidConfig(const String &detailed_info) {
return Status(ErrorCode::kInvalidConfig, MakeUnique<String>(detailed_info));
}
Status Status::InvalidConfig(const String &detailed_info) { return Status(ErrorCode::kInvalidConfig, MakeUnique<String>(detailed_info)); }

// 2. Auth error
Status Status::WrongPasswd(const String &user_name) {
Expand Down Expand Up @@ -365,6 +363,10 @@ Status Status::AggregateFunctionWithEmptyArgs() {
return Status(ErrorCode::kAggregateFunctionWithEmptyArgs, MakeUnique<String>("Aggregate function with empty arguments"));
}

Status Status::InvalidCommand(const String &detailed_error) {
return Status(ErrorCode::kInvalidCommand, MakeUnique<String>(fmt::format("Invalid command: {}", detailed_error)));
}

// 4. TXN fail
Status Status::TxnRollback(u64 txn_id) {
return Status(ErrorCode::kTxnRollback, MakeUnique<String>(fmt::format("Transaction: {} is rollback", txn_id)));
Expand Down
2 changes: 2 additions & 0 deletions src/common/status.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export enum class ErrorCode : long {
kInvalidTopKType = 3073,
kInvalidCreateOption = 3074,
kInvalidDropOption = 3075,
kInvalidCommand = 3076,

// 4. Txn fail
kTxnRollback = 4001,
Expand Down Expand Up @@ -249,6 +250,7 @@ public:
static Status SegmentNotExist(const SegmentID &segment_id);
static Status BlockNotExist(const BlockID &block_id);
static Status AggregateFunctionWithEmptyArgs();
static Status InvalidCommand(const String& detailed_error);

// 4. TXN fail
static Status TxnRollback(u64 txn_id);
Expand Down
156 changes: 89 additions & 67 deletions src/executor/operator/physical_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import config;
import status;
import infinity_exception;
import compact_segments_task;
import variables;

namespace infinity {

Expand All @@ -50,84 +51,105 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
}
case CommandType::kSet: {
SetCmd *set_command = (SetCmd *)(command_info_.get());
if (set_command->var_name() == enable_profiling_name) {
if (set_command->value_type() != SetVarType::kBool) {
RecoverableError(Status::DataTypeMismatch("Boolean", set_command->value_type_str()));
switch(set_command->scope()) {
case SetScope::kSession: {
SessionVariable session_var = VarUtil::GetSessionVarByName(set_command->var_name());
switch(session_var) {
case SessionVariable::kEnableProfile: {
if (set_command->value_type() != SetVarType::kBool) {
RecoverableError(Status::DataTypeMismatch("Boolean", set_command->value_type_str()));
}
query_context->current_session()->SessionVariables()->enable_profile_ = set_command->value_bool();
return true;
}
case SessionVariable::kInvalid: {
RecoverableError(Status::InvalidCommand(fmt::format("Unknown session variable: {}", set_command->var_name())));
}
default: {
RecoverableError(Status::InvalidCommand(fmt::format("Session variable: {} is read-only", set_command->var_name())));
}
}
break;
}
query_context->current_session()->SessionVariables()->enable_profile_ = set_command->value_bool();
return true;
}

if (set_command->var_name() == profile_history_capacity_name) {
if (set_command->value_type() != SetVarType::kInteger) {
RecoverableError(Status::DataTypeMismatch("Integer", set_command->value_type_str()));
}
query_context->current_session()->SessionVariables()->profile_record_capacity_ = set_command->value_int();
return true;
}

if (set_command->var_name() == log_level) {
if (set_command->value_type() != SetVarType::kString) {
RecoverableError(Status::DataTypeMismatch("String", set_command->value_type_str()));
}

if (set_command->scope() != SetScope::kGlobal) {
RecoverableError(Status::SyntaxError(fmt::format("log_level is a global config parameter.", set_command->var_name())));
}

if (set_command->value_str() == "trace") {
SetLogLevel(LogLevel::kTrace);
return true;
}

if (set_command->value_str() == "debug") {
SetLogLevel(LogLevel::kDebug);
return true;
}

if (set_command->value_str() == "info") {
SetLogLevel(LogLevel::kInfo);
return true;
}

if (set_command->value_str() == "warning") {
SetLogLevel(LogLevel::kWarning);
return true;
}

if (set_command->value_str() == "error") {
SetLogLevel(LogLevel::kError);
return true;
}

if (set_command->value_str() == "critical") {
SetLogLevel(LogLevel::kCritical);
return true;
case SetScope::kGlobal: {
GlobalVariable global_var = VarUtil::GetGlobalVarByName(set_command->var_name());
switch(global_var) {
case GlobalVariable::kProfileRecordCapacity: {
if (set_command->value_type() != SetVarType::kInteger) {
RecoverableError(Status::DataTypeMismatch("Integer", set_command->value_type_str()));
}
query_context->storage()->catalog()->ResizeProfileHistory(set_command->value_int());
return true;
}
case GlobalVariable::kInvalid: {
RecoverableError(Status::InvalidCommand(fmt::format("Unknown global variable: {}", set_command->var_name())));
}
default: {
RecoverableError(Status::InvalidCommand(fmt::format("Global variable: {} is read-only", set_command->var_name())));
}
}
break;
}

RecoverableError(Status::SetInvalidVarValue("log level", "trace, debug, info, warning, error, critical"));
return true;
}

if (set_command->var_name() == worker_cpu_limit) {
if (set_command->value_type() != SetVarType::kInteger) {
RecoverableError(Status::DataTypeMismatch("Integer", set_command->value_type_str()));
case SetScope::kConfig: {
Config* config = query_context->global_config();
GlobalOptionIndex config_index = config->GetOptionIndex(set_command->var_name());
switch(config_index) {
case GlobalOptionIndex::kLogLevel: {
if (set_command->value_str() == "trace") {
SetLogLevel(LogLevel::kTrace);
return true;
}

if (set_command->value_str() == "debug") {
SetLogLevel(LogLevel::kDebug);
return true;
}

if (set_command->value_str() == "info") {
SetLogLevel(LogLevel::kInfo);
return true;
}

if (set_command->value_str() == "warning") {
SetLogLevel(LogLevel::kWarning);
return true;
}

if (set_command->value_str() == "error") {
SetLogLevel(LogLevel::kError);
return true;
}

if (set_command->value_str() == "critical") {
SetLogLevel(LogLevel::kCritical);
return true;
}

RecoverableError(Status::SetInvalidVarValue("log level", "trace, debug, info, warning, error, critical"));
break;
}
case GlobalOptionIndex::kInvalid: {
RecoverableError(Status::InvalidCommand(fmt::format("Unknown config: {}", set_command->var_name())));
break;
}
default: {
RecoverableError(Status::InvalidCommand(fmt::format("Config {} is read-only", set_command->var_name())));
break;
}
}
break;
}

if (set_command->scope() != SetScope::kGlobal) {
RecoverableError(Status::SyntaxError(fmt::format("cpu_count is a global config parameter.", set_command->var_name())));
default: {
RecoverableError(Status::InvalidCommand("Invalid set command scope, neither session nor global"));
}

RecoverableError(Status::ReadOnlySysVar(set_command->var_name()));
return true;
}

{ UnrecoverableError(fmt::format("Unknown command: {}", set_command->var_name())); }
break;
}
case CommandType::kExport: {
ExportCmd *export_command = (ExportCmd *)(command_info_.get());
auto profiler_record = query_context->current_session()->GetProfilerRecord(export_command->file_no());
auto profiler_record = query_context->current_session()->GetProfileRecord(export_command->file_no());
if (profiler_record == nullptr) {
RecoverableError(Status::DataNotExist(fmt::format("The record does not exist: {}", export_command->file_no())));
}
Expand Down
89 changes: 45 additions & 44 deletions src/executor/operator/physical_show.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ void PhysicalShow::ExecuteShowProfiles(QueryContext *query_context, ShowOperator
SizeT row_count = 0;
output_block_ptr->Init(column_types);

auto records = catalog->GetProfilerRecords();
auto records = catalog->GetProfileRecords();
for (SizeT i = 0; i < records.size(); ++i) {
if (!output_block_ptr) {
output_block_ptr = DataBlock::MakeUniquePtr();
Expand Down Expand Up @@ -2677,25 +2677,6 @@ void PhysicalShow::ExecuteShowSessionVariable(QueryContext *query_context, ShowO
value_expr.AppendToChunk(output_block_ptr->column_vectors[0]);
break;
}
case SessionVariable::kProfileRecordCapacity: {
Vector<SharedPtr<ColumnDef>> output_column_defs = {
MakeShared<ColumnDef>(0, integer_type, "value", HashSet<ConstraintType>()),
};

SharedPtr<TableDef> table_def = TableDef::Make(MakeShared<String>("default_db"), MakeShared<String>("variables"), output_column_defs);
output_ = MakeShared<DataTable>(table_def, TableType::kResult);

Vector<SharedPtr<DataType>> output_column_types{
integer_type,
};

output_block_ptr->Init(output_column_types);

Value value = Value::MakeBigInt(session_ptr->SessionVariables()->profile_record_capacity_);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[0]);
break;
}
default: {
operator_state->status_ = Status::NoSysVar(object_name_);
RecoverableError(operator_state->status_);
Expand Down Expand Up @@ -2841,28 +2822,6 @@ void PhysicalShow::ExecuteShowSessionVariables(QueryContext *query_context, Show
}
break;
}
case SessionVariable::kProfileRecordCapacity: {
{
// option name
Value value = Value::MakeVarchar(var_name);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[0]);
}
{
// option value
u64 profile_record_capacity = session_ptr->SessionVariables()->profile_record_capacity_;
Value value = Value::MakeVarchar(std::to_string(profile_record_capacity));
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[1]);
}
{
// option description
Value value = Value::MakeVarchar("Profile record history capacity");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[2]);
}
break;
}
default: {
operator_state->status_ = Status::NoSysVar(var_name);
RecoverableError(operator_state->status_);
Expand Down Expand Up @@ -3143,6 +3102,26 @@ void PhysicalShow::ExecuteShowGlobalVariable(QueryContext *query_context, ShowOp
value_expr.AppendToChunk(output_block_ptr->column_vectors[0]);
break;
}
case GlobalVariable::kProfileRecordCapacity: {
Vector<SharedPtr<ColumnDef>> output_column_defs = {
MakeShared<ColumnDef>(0, integer_type, "value", HashSet<ConstraintType>()),
};

SharedPtr<TableDef> table_def = TableDef::Make(MakeShared<String>("default_db"), MakeShared<String>("variables"), output_column_defs);
output_ = MakeShared<DataTable>(table_def, TableType::kResult);

Vector<SharedPtr<DataType>> output_column_types{
integer_type,
};

output_block_ptr->Init(output_column_types);

Catalog *catalog_ptr = query_context->storage()->catalog();
Value value = Value::MakeBigInt(catalog_ptr->ProfileHistorySize());
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[0]);
break;
}
default: {
operator_state->status_ = Status::NoSysVar(object_name_);
RecoverableError(operator_state->status_);
Expand Down Expand Up @@ -3373,7 +3352,7 @@ void PhysicalShow::ExecuteShowGlobalVariables(QueryContext *query_context, ShowO
}
{
// option description
Value value = Value::MakeVarchar("Unused object in buffer manager waiting for garbage collection");
Value value = Value::MakeVarchar("Active transaction count");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[2]);
}
Expand All @@ -3395,7 +3374,7 @@ void PhysicalShow::ExecuteShowGlobalVariables(QueryContext *query_context, ShowO
}
{
// option description
Value value = Value::MakeVarchar("Unused object in buffer manager waiting for garbage collection");
Value value = Value::MakeVarchar("Current timestamp");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[2]);
}
Expand Down Expand Up @@ -3467,6 +3446,28 @@ void PhysicalShow::ExecuteShowGlobalVariables(QueryContext *query_context, ShowO
}
break;
}
case GlobalVariable::kProfileRecordCapacity: {
{
// option name
Value value = Value::MakeVarchar(var_name);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[0]);
}
{
// option value
Catalog *catalog_ptr = query_context->storage()->catalog();
Value value = Value::MakeVarchar(std::to_string(catalog_ptr->ProfileHistorySize()));
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[1]);
}
{
// option description
Value value = Value::MakeVarchar("Profile record history capacity");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[2]);
}
break;
}
default: {
operator_state->status_ = Status::NoSysVar(var_name);
RecoverableError(operator_state->status_);
Expand Down
3 changes: 2 additions & 1 deletion src/main/config.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import command_statement;
namespace infinity {

export constexpr std::string_view profile_history_capacity_name = "profile_history_capacity";
export constexpr std::string_view enable_profiling_name = "enable_profile";
export constexpr std::string_view enable_profile_name = "enable_profile";
export constexpr std::string_view worker_cpu_limit = "cpu_count";
export constexpr std::string_view log_level = "log_level";

Expand Down Expand Up @@ -95,6 +95,7 @@ public:
// Get config by name
Tuple<BaseOption *, Status> GetConfigByName(const String& name);

GlobalOptionIndex GetOptionIndex(const String& var_name) const { return global_options_.GetOptionIndex(var_name); }
private:
static void ParseTimeZoneStr(const String &time_zone_str, String &parsed_time_zone, i32 &parsed_time_zone_bias);

Expand Down
3 changes: 3 additions & 0 deletions src/main/infinity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ QueryResult Infinity::SetVariable(const String &variable_name, const String &var
// command_statement->command_info_ = MakeUnique<SetCmd>(infinity::SetScope::kGlobal, infinity::SetVarType::kBool, $3, false);
break;
}
case SetScope::kConfig: {
break;
}
default: {
UnrecoverableError("Invalid set scope.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Status GlobalOptions::AddOption(UniquePtr<BaseOption> option) {
return Status::OK();
}

GlobalOptionIndex GlobalOptions::GetOptionIndex(const String &option_name) {
GlobalOptionIndex GlobalOptions::GetOptionIndex(const String &option_name) const {
auto iter = name2index_.find(option_name);
if(iter == name2index_.end()) {
return GlobalOptionIndex::kInvalid;
Expand Down
Loading

0 comments on commit c6060d2

Please sign in to comment.