Skip to content

Commit

Permalink
Add columns.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed Mar 10, 2025
1 parent 9844da1 commit c8cb7bb
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 74 deletions.
58 changes: 5 additions & 53 deletions src/storage/new_txn/new_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,6 @@ Status NewTxn::AddColumns(const String &db_name, const String &table_name, const
}
}

this->AddColumnsData(*table_meta, column_defs);

// Generate add column cmd
SharedPtr<WalCmdAddColumns> wal_command = MakeShared<WalCmdAddColumns>(db_name, table_name, column_defs);
wal_command->table_key_ = table_key;
Expand Down Expand Up @@ -397,57 +395,6 @@ Status NewTxn::DropColumns(const String &db_name, const String &table_name, cons
return Status::OK();
}

// Status NewTxn::AddColumns(TableEntry *table_entry, const Vector<SharedPtr<ColumnDef>> &column_defs) {
// TxnTimeStamp begin_ts = this->BeginTS();

// auto [db_entry, db_status] = catalog_->GetDatabase(*table_entry->GetDBName(), txn_context_ptr_->txn_id_, begin_ts);
// if (!db_status.ok()) {
// return db_status;
// }
// UniquePtr<TableEntry> new_table_entry = table_entry->Clone();
// new_table_entry->InitCompactionAlg(begin_ts);

// // const String &table_name = *table_entry->GetTableName();
// // NewTxnTableStore *txn_table_store = txn_store_.GetNewTxnTableStore(table_name);
// // TODO: adapt nullptr
// new_table_entry->AddColumns(column_defs, nullptr);
// auto add_status = db_entry->AddTable(std::move(new_table_entry), txn_context_ptr_->txn_id_, begin_ts, nullptr, true /*add_if_found*/);
// if (!add_status.ok()) {
// return add_status;
// }

// SharedPtr<WalCmd> wal_command = MakeShared<WalCmdAddColumns>(*table_entry->GetDBName(), *table_entry->GetTableName(), column_defs);
// wal_entry_->cmds_.push_back(wal_command);
// txn_context_ptr_->AddOperation(MakeShared<String>(wal_command->ToString()));

// return Status::OK();
// }

// Status NewTxn::DropColumns(TableEntry *table_entry, const Vector<String> &column_names) {
// TxnTimeStamp begin_ts = this->BeginTS();

// auto [db_entry, db_status] = catalog_->GetDatabase(*table_entry->GetDBName(), txn_context_ptr_->txn_id_, begin_ts);
// if (!db_status.ok()) {
// return db_status;
// }
// UniquePtr<TableEntry> new_table_entry = table_entry->Clone();
// new_table_entry->InitCompactionAlg(begin_ts);
// // const String &table_name = *table_entry->GetTableName();
// // NewTxnTableStore *txn_table_store = txn_store_.GetNewTxnTableStore(table_name);
// // TODO: adapt nullptr
// new_table_entry->DropColumns(column_names, nullptr);
// auto drop_status = db_entry->AddTable(std::move(new_table_entry), txn_context_ptr_->txn_id_, begin_ts, nullptr, true /*add_if_found*/);
// if (!drop_status.ok()) {
// return drop_status;
// }

// SharedPtr<WalCmd> wal_command = MakeShared<WalCmdDropColumns>(*table_entry->GetDBName(), *table_entry->GetTableName(), column_names);
// wal_entry_->cmds_.push_back(wal_command);
// txn_context_ptr_->AddOperation(MakeShared<String>(wal_command->ToString()));

// return Status::OK();
// }

Status NewTxn::DropTable(const String &db_name, const String &table_name, ConflictType conflict_type) {

if (conflict_type == ConflictType::kReplace) {
Expand Down Expand Up @@ -1193,6 +1140,11 @@ Status NewTxn::CommitAddColumns(const WalCmdAddColumns *add_columns_cmd) {
return status;
}
}

status = this->AddColumnsData(*table_meta, add_columns_cmd->column_defs_);
if (!status.ok()) {
return status;
}
return Status::OK();
}

Expand Down
52 changes: 31 additions & 21 deletions src/storage/new_txn/new_txn_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ import table_index_meeta;
import meta_key;
import db_meeta;

import base_expression;
import cast_expression;
import value_expression;
import expression_binder;
import cast_function;
import bound_cast_func;
import constant_expr;
import expression_state;
import expression_evaluator;

namespace infinity {

struct NewTxnCompactState {
Expand Down Expand Up @@ -646,32 +656,32 @@ Status NewTxn::AddColumnsData(TableMeeta &table_meta, const Vector<SharedPtr<Col
}

Vector<Value> default_values;
ExpressionBinder tmp_binder(nullptr);
for (const auto &column_def : column_defs) {
if (!column_def->default_value()) {
return Status::NotSupport(fmt::format("Column {} has no default value", column_def->name()));
}
SharedPtr<ConstantExpr> default_expr = column_def->default_value();
auto expr = tmp_binder.BuildValueExpr(*default_expr, nullptr, 0, false);
auto *value_expr = static_cast<ValueExpression *>(expr.get());

// SharedPtr<ConstantExpr> default_expr = column_def->default_value();
// auto expr = tmp_binder.BuildValueExpr(*default_expr, nullptr, 0, false);
// auto *value_expr = static_cast<ValueExpression *>(expr.get());

// const SharedPtr<DataType> &column_type = column_def->type();
// if (value_expr->Type() == *column_type) {
// default_values.push_back(value_expr->GetValue());
// } else {
// const SharedPtr<DataType> &column_type = column_def->type();
const SharedPtr<DataType> &column_type = column_def->type();
if (value_expr->Type() == *column_type) {
default_values.push_back(value_expr->GetValue());
} else {
const SharedPtr<DataType> &column_type = column_def->type();

// BoundCastFunc cast = CastFunction::GetBoundFunc(value_expr->Type(), *column_type);
// SharedPtr<BaseExpression> cast_expr = MakeShared<CastExpression>(cast, expr, *column_type);
// SharedPtr<ExpressionState> expr_state = ExpressionState::CreateState(cast_expr);
// SharedPtr<ColumnVector> output_column_vector = ColumnVector::Make(column_type);
// output_column_vector->Initialize(ColumnVectorType::kConstant, 1);
// ExpressionEvaluator evaluator;
// evaluator.Init(nullptr);
// evaluator.Execute(cast_expr, expr_state, output_column_vector);
BoundCastFunc cast = CastFunction::GetBoundFunc(value_expr->Type(), *column_type);
SharedPtr<BaseExpression> cast_expr = MakeShared<CastExpression>(cast, expr, *column_type);
SharedPtr<ExpressionState> expr_state = ExpressionState::CreateState(cast_expr);
SharedPtr<ColumnVector> output_column_vector = ColumnVector::Make(column_type);
output_column_vector->Initialize(ColumnVectorType::kConstant, 1);
ExpressionEvaluator evaluator;
evaluator.Init(nullptr);
evaluator.Execute(cast_expr, expr_state, output_column_vector);

// default_values.push_back(output_column_vector->GetValue(0));
// }
default_values.push_back(output_column_vector->GetValue(0));
}
}

for (SegmentID segment_id : *segment_ids_ptr) {
Expand Down Expand Up @@ -725,13 +735,13 @@ Status NewTxn::AddColumnsDataInBlock(BlockMeta &block_meta, const Vector<SharedP
}

ColumnVector column_vector;
status = NewCatalog::GetColumnVector(*column_meta, 0/*row_count*/, ColumnVectorTipe::kReadWrite, column_vector);
status = NewCatalog::GetColumnVector(*column_meta, 0 /*row_count*/, ColumnVectorTipe::kReadWrite, column_vector);
if (!status.ok()) {
return status;
}

for (SizeT i = 0; i < block_row_count; ++i) {
column_vector.SetValue(i, default_value);
column_vector.AppendValue(default_value);
}
}

Expand Down
103 changes: 103 additions & 0 deletions src/unit_test/storage/new_catalog/new_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import mem_index;
import roaring_bitmap;
import index_filter_evaluators;
import index_emvb;
import constant_expr;

using namespace infinity;

Expand Down Expand Up @@ -8082,3 +8083,105 @@ TEST_P(NewCatalogTest, test_cleanup) {
}
new_txn_mgr->PrintAllKeyValue();
}

TEST_P(NewCatalogTest, test_add_columns) {
using namespace infinity;

NewTxnManager *new_txn_mgr = infinity::InfinityContext::instance().storage()->new_txn_manager();

SharedPtr<String> db_name = std::make_shared<String>("db1");
auto column_def1 = std::make_shared<ColumnDef>(0, std::make_shared<DataType>(LogicalType::kInteger), "col1", std::set<ConstraintType>());
auto default_value2 = std::make_shared<ConstantExpr>(LiteralType::kString);
{
const char *str = "abcdefghijklmnopqrstuvwxyz";
size_t str_len = strlen(str);
default_value2->str_value_ = (char *)malloc(str_len + 1);
strncpy(default_value2->str_value_, str, str_len + 1);
}
auto column_def2 =
std::make_shared<ColumnDef>(1, std::make_shared<DataType>(LogicalType::kVarchar), "col2", std::set<ConstraintType>(), "", default_value2);
auto table_name = std::make_shared<std::string>("tb1");
auto table_def = TableDef::Make(db_name, table_name, MakeShared<String>(), {column_def1});

{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("create db"), TransactionType::kNormal);
Status status = txn->CreateDatabase(*db_name, ConflictType::kError, MakeShared<String>());
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("create table"), TransactionType::kNormal);
Status status = txn->CreateTable(*db_name, std::move(table_def), ConflictType::kIgnore);
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}

u32 block_row_cnt = 8192;
auto make_input_block = [&](Value v1) {
auto input_block = MakeShared<DataBlock>();
auto append_to_col = [&](ColumnVector &col, Value v) {
for (u32 i = 0; i < block_row_cnt; ++i) {
col.AppendValue(std::move(v));
}
};
// Initialize input block
{
auto col1 = ColumnVector::Make(column_def1->type());
col1->Initialize();
append_to_col(*col1, v1);
input_block->InsertVector(col1, 0);
}
input_block->Finalize();
return input_block;
};
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("append"), TransactionType::kNormal);

auto input_block = make_input_block(Value::MakeInt(1));
Status status = txn->Append(*db_name, *table_name, input_block);
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("add column"), TransactionType::kNormal);

Vector<SharedPtr<ColumnDef>> columns;
columns.emplace_back(column_def2);
Status status = txn->AddColumns(*db_name, *table_name, columns);
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("scan"), TransactionType::kNormal);

Optional<DBMeeta> db_meta;
Optional<TableMeeta> table_meta;
Status status = txn->GetTableMeta(*db_name, *table_name, db_meta, table_meta);
EXPECT_TRUE(status.ok());

SegmentID segment_id = 0;
BlockID block_id = 0;

SegmentMeta segment_meta(segment_id, *table_meta, table_meta->kv_instance());
BlockMeta block_meta(block_id, segment_meta, segment_meta.kv_instance());

SizeT row_count = 0;
status = block_meta.GetRowCnt(row_count);
EXPECT_TRUE(status.ok());
EXPECT_EQ(row_count, block_row_cnt);

ColumnMeta column_meta(1, block_meta, block_meta.kv_instance());

ColumnVector col;
status = NewCatalog::GetColumnVector(column_meta, row_count, ColumnVectorTipe::kReadOnly, col);
EXPECT_TRUE(status.ok());

for (u32 i = 0; i < row_count; ++i) {
EXPECT_EQ(col.GetValue(i), Value::MakeVarchar("abcdefghijklmnopqrstuvwxyz"));
}
}
}

0 comments on commit c8cb7bb

Please sign in to comment.