Skip to content

Commit

Permalink
Move append ut out of new catalog
Browse files Browse the repository at this point in the history
Signed-off-by: Jin Hai <haijin.chn@gmail.com>
  • Loading branch information
JinHai-CN committed Mar 10, 2025
1 parent ac3dfc6 commit 7b13e09
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 149 deletions.
225 changes: 225 additions & 0 deletions src/unit_test/storage/new_catalog/append.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Copyright(C) 2025 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "gtest/gtest.h"

import base_test;
import stl;
import third_party;
import status;
import new_catalog;
import new_txn_manager;
import infinity_context;
import txn_state;
import extra_ddl_info;
import infinity_exception;
import column_def;
import data_type;
import logical_type;
import table_def;
import index_base;
import index_secondary;
import index_ivf;
import index_full_text;
import index_hnsw;
import embedding_info;
import sparse_info;
import index_bmp;
import internal_types;
import defer_op;
import statement_common;
import meta_info;
import data_block;
import column_vector;
import value;
import data_access_state;
import kv_code;
import kv_store;
import new_txn;
import new_txn_store;
import buffer_obj;
import buffer_handle;
import secondary_index_in_mem;
import secondary_index_data;
import segment_meta;
import block_meta;
import column_meta;
import table_meeta;
import table_index_meeta;
import segment_index_meta;
import chunk_index_meta;
import db_meeta;
import catalog_meta;
import mem_index;
import roaring_bitmap;
import index_filter_evaluators;
import index_emvb;
import constant_expr;

using namespace infinity;

class TestAppend : public BaseTestParamStr {};

INSTANTIATE_TEST_SUITE_P(TestWithDifferentParams,
TestAppend,
::testing::Values(BaseTestParamStr::NULL_CONFIG_PATH, BaseTestParamStr::VFS_OFF_CONFIG_PATH));

TEST_P(TestAppend, test_append) {
using namespace infinity;

NewTxnManager *new_txn_mgr = infinity::InfinityContext::instance().storage()->new_txn_manager();

SharedPtr<String> db_name = std::make_shared<String>("db1");
auto column_def1 = std::make_shared<ColumnDef>(0, std::make_shared<DataType>(LogicalType::kInteger), "col1", std::set<ConstraintType>());
auto column_def2 = std::make_shared<ColumnDef>(1, std::make_shared<DataType>(LogicalType::kVarchar), "col2", std::set<ConstraintType>());
auto table_name = std::make_shared<std::string>("tb1");
auto table_def = TableDef::Make(db_name, table_name, MakeShared<String>(), {column_def1, column_def2});
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("create db"), TransactionType::kNormal);
Status status = txn->CreateDatabase(*db_name, ConflictType::kError, MakeShared<String>());
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("create table"), TransactionType::kNormal);
Status status = txn->CreateTable(*db_name, std::move(table_def), ConflictType::kIgnore);
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}

auto input_block = MakeShared<DataBlock>();
{
// Initialize input block
{
auto col1 = ColumnVector::Make(column_def1->type());
col1->Initialize();
col1->AppendValue(Value::MakeInt(1));
col1->AppendValue(Value::MakeInt(2));
input_block->InsertVector(col1, 0);
}
{
auto col2 = ColumnVector::Make(column_def2->type());
col2->Initialize();
col2->AppendValue(Value::MakeVarchar("abc"));
col2->AppendValue(Value::MakeVarchar("abcdefghijklmnopqrstuvwxyz"));
input_block->InsertVector(col2, 1);
}
input_block->Finalize();
}
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("append"), TransactionType::kNormal);

Status status = txn->Append(*db_name, *table_name, input_block);
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("append again"), TransactionType::kNormal);

Status status = txn->Append(*db_name, *table_name, input_block);
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}

{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("concurrent append 1"), TransactionType::kNormal);
auto *txn2 = new_txn_mgr->BeginTxn(MakeUnique<String>("concurrent append 2"), TransactionType::kNormal);

Status status1 = txn->Append(*db_name, *table_name, input_block);
EXPECT_TRUE(status1.ok());
Status status2 = txn->Append(*db_name, *table_name, input_block);
EXPECT_TRUE(status2.ok());

status1 = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status1.ok());
status2 = new_txn_mgr->CommitTxn(txn2);
EXPECT_TRUE(status2.ok());
}
SizeT total_row_count = 8;

// Check the appended data.
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("scan"), TransactionType::kNormal);
TxnTimeStamp begin_ts = txn->BeginTS();

Optional<DBMeeta> db_meta;
Optional<TableMeeta> table_meta;
Status status = txn->GetTableMeta(*db_name, *table_name, db_meta, table_meta);
EXPECT_TRUE(status.ok());

auto [segment_ids, seg_status] = table_meta->GetSegmentIDs();
EXPECT_TRUE(seg_status.ok());
EXPECT_EQ(segment_ids->size(), 1);
SegmentID segment_id = segment_ids->at(0);
EXPECT_EQ(segment_id, 0);
SegmentMeta segment_meta(segment_id, *table_meta, *txn->kv_instance());

Vector<BlockID> *block_ids{};
status = segment_meta.GetBlockIDs(block_ids);

EXPECT_TRUE(status.ok());
EXPECT_EQ(block_ids->size(), 1);
BlockID block_id = block_ids->at(0);
EXPECT_EQ(block_id, 0);
BlockMeta block_meta(block_id, segment_meta, *txn->kv_instance());

NewTxnGetVisibleRangeState state;
status = NewCatalog::GetBlockVisibleRange(block_meta, begin_ts, state);
EXPECT_TRUE(status.ok());
{
Pair<BlockOffset, BlockOffset> range;
BlockOffset offset = 0;
bool has_next = state.Next(offset, range);
EXPECT_TRUE(has_next);
EXPECT_EQ(range.first, 0);
EXPECT_EQ(range.second, static_cast<BlockOffset>(total_row_count));
offset = range.second;
has_next = state.Next(offset, range);
EXPECT_FALSE(has_next);
}

SizeT row_count = state.block_offset_end();

{
SizeT column_idx = 0;
ColumnMeta column_meta(column_idx, block_meta, *txn->kv_instance());
ColumnVector col;

status = NewCatalog::GetColumnVector(column_meta, row_count, ColumnVectorTipe::kReadOnly, col);
EXPECT_TRUE(status.ok());

Value v1 = col.GetValue(0);
EXPECT_EQ(v1, Value::MakeInt(1));
Value v2 = col.GetValue(1);
EXPECT_EQ(v2, Value::MakeInt(2));
}

{
SizeT column_idx = 1;
ColumnMeta column_meta(column_idx, block_meta, *txn->kv_instance());
ColumnVector col;
status = NewCatalog::GetColumnVector(column_meta, row_count, ColumnVectorTipe::kReadOnly, col);

EXPECT_TRUE(status.ok());
Value v1 = col.GetValue(0);
EXPECT_EQ(v1, Value::MakeVarchar("abc"));
Value v2 = col.GetValue(1);
EXPECT_EQ(v2, Value::MakeVarchar("abcdefghijklmnopqrstuvwxyz"));
}
}
}
149 changes: 0 additions & 149 deletions src/unit_test/storage/new_catalog/new_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6752,155 +6752,6 @@ TEST_P(NewCatalogTest, test_import) {
}
}

TEST_P(NewCatalogTest, test_append) {
using namespace infinity;

NewTxnManager *new_txn_mgr = infinity::InfinityContext::instance().storage()->new_txn_manager();

SharedPtr<String> db_name = std::make_shared<String>("db1");
auto column_def1 = std::make_shared<ColumnDef>(0, std::make_shared<DataType>(LogicalType::kInteger), "col1", std::set<ConstraintType>());
auto column_def2 = std::make_shared<ColumnDef>(1, std::make_shared<DataType>(LogicalType::kVarchar), "col2", std::set<ConstraintType>());
auto table_name = std::make_shared<std::string>("tb1");
auto table_def = TableDef::Make(db_name, table_name, MakeShared<String>(), {column_def1, column_def2});
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("create db"), TransactionType::kNormal);
Status status = txn->CreateDatabase(*db_name, ConflictType::kError, MakeShared<String>());
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("create table"), TransactionType::kNormal);
Status status = txn->CreateTable(*db_name, std::move(table_def), ConflictType::kIgnore);
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}

auto input_block = MakeShared<DataBlock>();
{
// Initialize input block
{
auto col1 = ColumnVector::Make(column_def1->type());
col1->Initialize();
col1->AppendValue(Value::MakeInt(1));
col1->AppendValue(Value::MakeInt(2));
input_block->InsertVector(col1, 0);
}
{
auto col2 = ColumnVector::Make(column_def2->type());
col2->Initialize();
col2->AppendValue(Value::MakeVarchar("abc"));
col2->AppendValue(Value::MakeVarchar("abcdefghijklmnopqrstuvwxyz"));
input_block->InsertVector(col2, 1);
}
input_block->Finalize();
}
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("append"), TransactionType::kNormal);

Status status = txn->Append(*db_name, *table_name, input_block);
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("append again"), TransactionType::kNormal);

Status status = txn->Append(*db_name, *table_name, input_block);
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}

{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("concurrent append 1"), TransactionType::kNormal);
auto *txn2 = new_txn_mgr->BeginTxn(MakeUnique<String>("concurrent append 2"), TransactionType::kNormal);

Status status1 = txn->Append(*db_name, *table_name, input_block);
EXPECT_TRUE(status1.ok());
Status status2 = txn->Append(*db_name, *table_name, input_block);
EXPECT_TRUE(status2.ok());

status1 = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status1.ok());
status2 = new_txn_mgr->CommitTxn(txn2);
EXPECT_TRUE(status2.ok());
}
SizeT total_row_count = 8;

// Check the appended data.
{
auto *txn = new_txn_mgr->BeginTxn(MakeUnique<String>("scan"), TransactionType::kNormal);
TxnTimeStamp begin_ts = txn->BeginTS();

Optional<DBMeeta> db_meta;
Optional<TableMeeta> table_meta;
Status status = txn->GetTableMeta(*db_name, *table_name, db_meta, table_meta);
EXPECT_TRUE(status.ok());

auto [segment_ids, seg_status] = table_meta->GetSegmentIDs();
EXPECT_TRUE(seg_status.ok());
EXPECT_EQ(segment_ids->size(), 1);
SegmentID segment_id = segment_ids->at(0);
EXPECT_EQ(segment_id, 0);
SegmentMeta segment_meta(segment_id, *table_meta, *txn->kv_instance());

Vector<BlockID> *block_ids{};
status = segment_meta.GetBlockIDs(block_ids);

EXPECT_TRUE(status.ok());
EXPECT_EQ(block_ids->size(), 1);
BlockID block_id = block_ids->at(0);
EXPECT_EQ(block_id, 0);
BlockMeta block_meta(block_id, segment_meta, *txn->kv_instance());

NewTxnGetVisibleRangeState state;
status = NewCatalog::GetBlockVisibleRange(block_meta, begin_ts, state);
EXPECT_TRUE(status.ok());
{
Pair<BlockOffset, BlockOffset> range;
BlockOffset offset = 0;
bool has_next = state.Next(offset, range);
EXPECT_TRUE(has_next);
EXPECT_EQ(range.first, 0);
EXPECT_EQ(range.second, static_cast<BlockOffset>(total_row_count));
offset = range.second;
has_next = state.Next(offset, range);
EXPECT_FALSE(has_next);
}

SizeT row_count = state.block_offset_end();

{
SizeT column_idx = 0;
ColumnMeta column_meta(column_idx, block_meta, *txn->kv_instance());
ColumnVector col;

status = NewCatalog::GetColumnVector(column_meta, row_count, ColumnVectorTipe::kReadOnly, col);
EXPECT_TRUE(status.ok());

Value v1 = col.GetValue(0);
EXPECT_EQ(v1, Value::MakeInt(1));
Value v2 = col.GetValue(1);
EXPECT_EQ(v2, Value::MakeInt(2));
}

{
SizeT column_idx = 1;
ColumnMeta column_meta(column_idx, block_meta, *txn->kv_instance());
ColumnVector col;
status = NewCatalog::GetColumnVector(column_meta, row_count, ColumnVectorTipe::kReadOnly, col);

EXPECT_TRUE(status.ok());
Value v1 = col.GetValue(0);
EXPECT_EQ(v1, Value::MakeVarchar("abc"));
Value v2 = col.GetValue(1);
EXPECT_EQ(v2, Value::MakeVarchar("abcdefghijklmnopqrstuvwxyz"));
}
}
}

TEST_P(NewCatalogTest, test_delete) {
using namespace infinity;

Expand Down

0 comments on commit 7b13e09

Please sign in to comment.