diff --git a/src/executor/fragment_builder.cpp b/src/executor/fragment_builder.cpp index 8400fac028..495f255758 100644 --- a/src/executor/fragment_builder.cpp +++ b/src/executor/fragment_builder.cpp @@ -178,7 +178,21 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu current_fragment_ptr->SetFragmentType(FragmentType::kSerialMaterialize); break; } - case PhysicalOperatorType::kFusion: + case PhysicalOperatorType::kFusion: { + if (phys_op->left() == nullptr) { + UnrecoverableError(fmt::format("No input node of {}", phys_op->GetName())); + } + if (phys_op->left()->operator_type() == PhysicalOperatorType::kFusion) { + if (phys_op->right() != nullptr) { + UnrecoverableError("Fusion operator with fusion operator child shouldn't have right child."); + } + current_fragment_ptr->AddOperator(phys_op); + // call next Fusion operator + BuildFragments(phys_op->left(), current_fragment_ptr); + break; + } + [[fallthrough]]; + } case PhysicalOperatorType::kMergeAggregate: case PhysicalOperatorType::kMergeHash: case PhysicalOperatorType::kMergeLimit: diff --git a/src/executor/operator/physical_fusion.cpp b/src/executor/operator/physical_fusion.cpp index f8600d4a39..3389742ee2 100644 --- a/src/executor/operator/physical_fusion.cpp +++ b/src/executor/operator/physical_fusion.cpp @@ -13,20 +13,17 @@ // limitations under the License. module; - #include #include module physical_fusion; import stl; - import query_context; import operator_state; import physical_operator; import physical_operator_type; import query_context; -// import data_table; import operator_state; import status; import data_block; @@ -42,6 +39,16 @@ import infinity_exception; import value; import internal_types; import logger; +import logical_type; +import knn_expr; +import txn; +import buffer_manager; +import table_entry; +import column_def; +import embedding_info; +import block_index; +import block_column_entry; +import mlas_matrix_multiply; namespace infinity { @@ -51,27 +58,41 @@ struct RRFRankDoc { Vector ranks; }; -PhysicalFusion::PhysicalFusion(u64 id, +PhysicalFusion::PhysicalFusion(const u64 id, + SharedPtr base_table_ref, UniquePtr left, UniquePtr right, SharedPtr fusion_expr, SharedPtr> load_metas) - : PhysicalOperator(PhysicalOperatorType::kFusion, std::move(left), std::move(right), id, load_metas), fusion_expr_(fusion_expr) {} + : PhysicalOperator(PhysicalOperatorType::kFusion, std::move(left), std::move(right), id, load_metas), base_table_ref_(std::move(base_table_ref)), + fusion_expr_(std::move(fusion_expr)) {} PhysicalFusion::~PhysicalFusion() {} -void PhysicalFusion::Init() {} - -bool PhysicalFusion::Execute(QueryContext *query_context, OperatorState *operator_state) { - FusionOperatorState *fusion_operator_state = static_cast(operator_state); - if (!fusion_operator_state->input_complete_) { - return false; +void PhysicalFusion::Init() { + { + String &method = fusion_expr_->method_; + std::transform(method.begin(), method.end(), std::back_inserter(to_lower_method_), [](unsigned char c) { return std::tolower(c); }); + } + { + const auto prev_output_names_ptr = left_->GetOutputNames(); + const Vector &prev_output_names = *prev_output_names_ptr; + output_names_ = MakeShared>(prev_output_names); + (*output_names_)[output_names_->size() - 2] = COLUMN_NAME_SCORE; + } + { + const auto prev_output_types_ptr = left_->GetOutputTypes(); + const Vector> &prev_output_types = *prev_output_types_ptr; + output_types_ = MakeShared>>(prev_output_types); + (*output_types_)[output_types_->size() - 2] = 
MakeShared(LogicalType::kFloat); } - if (fusion_expr_->method_.compare("rrf") != 0) { - Status status = Status::NotSupport(fmt::format("Fusion method {} is not implemented.", fusion_expr_->method_)); - LOG_ERROR(status.message()); - RecoverableError(status); + if (output_names_->size() != output_types_->size()) { + UnrecoverableError(fmt::format("output_names_ size {} is not equal to output_types_ size {}.", output_names_->size(), output_types_->size())); } +} + +void PhysicalFusion::ExecuteRRF(const Map>> &input_data_blocks, + Vector> &output_data_block_array) const { SizeT rank_constant = 60; if (fusion_expr_->options_.get() != nullptr) { if (auto it = fusion_expr_->options_->options_.find("rank_constant"); it != fusion_expr_->options_->options_.end()) { @@ -87,10 +108,10 @@ bool PhysicalFusion::Execute(QueryContext *query_context, OperatorState *operato Vector fragment_ids; SizeT fragment_idx = 0; // 1 calculate every doc's ranks - for (auto &[fragment_id, input_blocks] : fusion_operator_state->input_data_blocks_) { + for (const auto &[fragment_id, input_blocks] : input_data_blocks) { fragment_ids.push_back(fragment_id); SizeT base_rank = 1; - for (UniquePtr &input_data_block : input_blocks) { + for (const UniquePtr &input_data_block : input_blocks) { if (input_data_block->column_count() != GetOutputTypes()->size()) { UnrecoverableError(fmt::format("input_data_block column count {} is incorrect, expect {}.", input_data_block->column_count(), @@ -136,8 +157,9 @@ bool PhysicalFusion::Execute(QueryContext *query_context, OperatorState *operato // 4.1 get every doc's columns from input data blocks if (row_count == output_data_block->capacity()) { output_data_block->Finalize(); - operator_state->data_block_array_.push_back(std::move(output_data_block)); + output_data_block_array.push_back(std::move(output_data_block)); output_data_block = DataBlock::MakeUniquePtr(); + output_data_block->Init(*GetOutputTypes()); row_count = 0; } SizeT fragment_idx = 0; @@ -147,7 +169,7 @@ bool PhysicalFusion::Execute(QueryContext *query_context, OperatorState *operato UnrecoverableError(fmt::format("Cannot find fragment_idx")); } u64 fragment_id = fragment_ids[fragment_idx]; - auto &input_blocks = fusion_operator_state->input_data_blocks_[fragment_id]; + const auto &input_blocks = input_data_blocks.at(fragment_id); if (input_blocks.size() == 0) { UnrecoverableError(fmt::format("input_data_blocks_[{}] is empty.", fragment_id)); } @@ -172,10 +194,289 @@ bool PhysicalFusion::Execute(QueryContext *query_context, OperatorState *operato row_count++; } output_data_block->Finalize(); - operator_state->data_block_array_.push_back(std::move(output_data_block)); - fusion_operator_state->input_data_blocks_.clear(); - operator_state->SetComplete(); - return true; + output_data_block_array.push_back(std::move(output_data_block)); +} + +struct MatchTensorRerankDoc { + RowID row_id_; + u64 from_input_data_block_id_; + u32 from_block_idx_; + u32 from_row_idx_; + float score_ = 0.0f; + MatchTensorRerankDoc(const RowID row_id, const u64 from_input_data_block_id, const u32 from_block_idx, const u32 from_row_idx) + : row_id_(row_id), from_input_data_block_id_(from_input_data_block_id), from_block_idx_(from_block_idx), from_row_idx_(from_row_idx) {} +}; + +void PhysicalFusion::ExecuteMatchTensor(QueryContext *query_context, + const Map>> &input_data_blocks, + Vector> &output_data_block_array) const { + const TableEntry *table_entry = base_table_ref_->table_entry_ptr_; + const BlockIndex *block_index = 
base_table_ref_->block_index_.get(); + u32 topn = 10; + ColumnID column_id = std::numeric_limits::max(); + const DataType *column_data_type = nullptr; + const EmbeddingInfo *column_embedding_info = nullptr; + u32 column_unit_embedding_dimension = 0; + u32 column_unit_embedding_bytes = 0; + UniquePtr query_tensor; + u32 query_tensor_data_num = 0; + u32 query_tensor_embedding_num = 0; + EmbeddingDataType query_tensor_data_type = EmbeddingDataType::kElemInvalid; + // prepare query info + { + String column_name; + String search_tensor; + String tensor_data_type; + String match_method; + // find parameters + if (fusion_expr_->options_.get() != nullptr) { + const auto &options = fusion_expr_->options_->options_; + if (const auto it = options.find("column_name"); it != options.end()) { + column_name = it->second; + } + if (const auto it = options.find("search_tensor"); it != options.end()) { + search_tensor = it->second; + } + if (const auto it = options.find("tensor_data_type"); it != options.end()) { + tensor_data_type = it->second; + // to_lower + std::transform(tensor_data_type.begin(), tensor_data_type.end(), tensor_data_type.begin(), [](unsigned char c) { + return std::tolower(c); + }); + } + if (const auto it = options.find("match_method"); it != options.end()) { + match_method = it->second; + // to_lower + std::transform(match_method.begin(), match_method.end(), match_method.begin(), [](unsigned char c) { return std::tolower(c); }); + } + if (const auto it = options.find("topn"); it != options.end()) { + if (const int topn_int = std::stoi(it->second); topn_int > 0) { + topn = topn_int; + } + } + } + // validate column_name + { + column_id = table_entry->GetColumnIdByName(column_name); + const ColumnDef *column_def = table_entry->GetColumnDefByID(column_id); + column_data_type = column_def->type().get(); + switch (column_data_type->type()) { + case LogicalType::kTensor: + case LogicalType::kTensorArray: { + column_embedding_info = static_cast(column_data_type->type_info().get()); + break; + } + default: { + const auto error_info = + fmt::format("Fusion MatchTensor column_name {} is not a Tensor or TensorArray column. column type is : {}.", + column_name, + column_data_type->ToString()); + LOG_ERROR(error_info); + RecoverableError(Status::NotSupport(error_info)); + break; + } + } + column_unit_embedding_dimension = column_embedding_info->Dimension(); + column_unit_embedding_bytes = column_embedding_info->Size(); + switch (column_embedding_info->Type()) { + case EmbeddingDataType::kElemFloat: { + break; + } + // TODO: support bit type + default: { + const auto error_info = fmt::format("Fusion MatchTensor target column {} basic element type is unsupported: {}.", + column_name, + EmbeddingT::EmbeddingDataType2String(column_embedding_info->Type())); + LOG_ERROR(error_info); + RecoverableError(Status::NotSupport(error_info)); + break; + } + } + } + // validate tensor_data_type + // TODO: now only support float32 query tensor + if (tensor_data_type == "float32" or tensor_data_type == "float" or tensor_data_type == "f32") { + query_tensor_data_type = EmbeddingDataType::kElemFloat; + } else { + const auto error_info = "Fusion MatchTensor tensor_data_type option is invalid. 
Now only support float32."; + LOG_ERROR(error_info); + RecoverableError(Status::NotSupport(error_info)); + } + // validate search_tensor + { + const auto split_view = SplitTensorElement(search_tensor, ',', column_unit_embedding_dimension); + query_tensor_data_num = split_view.size(); + query_tensor_embedding_num = query_tensor_data_num / column_unit_embedding_dimension; + query_tensor = MakeUnique(query_tensor_embedding_num * column_unit_embedding_bytes); + switch (query_tensor_data_type) { + case EmbeddingDataType::kElemFloat: { + auto *query_tensor_ptr = reinterpret_cast(query_tensor.get()); + for (u32 i = 0; i < query_tensor_data_num; i++) { + query_tensor_ptr[i] = DataType::StringToValue(split_view[i]); + } + break; + } + default: { + const auto error_info = "Fusion MatchTensor query tensor data type is invalid. Now only support float32."; + LOG_ERROR(error_info); + RecoverableError(Status::NotSupport(error_info)); + break; + } + } + } + // validate match_method + if (match_method != "maxsim") { + const auto error_info = "Fusion MatchTensor match_method option is invalid. Now only support MaxSim."; + LOG_ERROR(error_info); + RecoverableError(Status::NotSupport(error_info)); + } + } + BufferManager *buffer_mgr = query_context->storage()->buffer_manager(); + Vector rerank_docs; + // 1. prepare query target rows + for (std::unordered_set row_id_set; const auto &[input_data_block_id, input_blocks] : input_data_blocks) { + for (u32 block_id = 0; block_id < input_blocks.size(); ++block_id) { + const UniquePtr &input_data_block = input_blocks[block_id]; + if (input_data_block->column_count() != GetOutputTypes()->size()) { + UnrecoverableError(fmt::format("input_data_block column count {} is incorrect, expect {}.", + input_data_block->column_count(), + GetOutputTypes()->size())); + } + auto &row_id_column = *input_data_block->column_vectors[input_data_block->column_count() - 1]; + auto row_ids = reinterpret_cast(row_id_column.data()); + u32 row_n = input_data_block->row_count(); + for (u32 i = 0; i < row_n; i++) { + const RowID doc_id = row_ids[i]; + if (row_id_set.contains(doc_id.ToUint64())) { + // skip duplicate target + continue; + } + row_id_set.insert(doc_id.ToUint64()); + rerank_docs.emplace_back(doc_id, input_data_block_id, block_id, i); + } + } + } + // 2. sort by RowID, and access blocks in order + std::sort(rerank_docs.begin(), rerank_docs.end(), [](const MatchTensorRerankDoc &lhs, const MatchTensorRerankDoc &rhs) noexcept { + return lhs.row_id_ < rhs.row_id_; + }); + // 3. 
calculate score + // TODO: now only consider MaxSim + for (auto &doc : rerank_docs) { + const RowID row_id = doc.row_id_; + const SegmentID segment_id = row_id.segment_id_; + const SegmentOffset segment_offset = row_id.segment_offset_; + const BlockID block_id = segment_offset / DEFAULT_BLOCK_CAPACITY; + const BlockOffset block_offset = segment_offset % DEFAULT_BLOCK_CAPACITY; + BlockColumnEntry *block_column_entry = + block_index->segment_block_index_.at(segment_id).block_map_.at(block_id)->GetColumnBlockEntry(column_id); + auto column_vec = block_column_entry->GetColumnVector(buffer_mgr); + auto tensor_ptr = reinterpret_cast(column_vec.data()); + const auto [embedding_num, chunk_id, chunk_offset] = tensor_ptr[block_offset]; + const char *tensor_data_ptr = column_vec.buffer_->fix_heap_mgr_->GetRawPtrFromChunk(chunk_id, chunk_offset); + // use mlas + auto output_ptr = MakeUniqueForOverwrite(query_tensor_embedding_num * embedding_num); + matrixA_multiply_transpose_matrixB_output_to_C(reinterpret_cast(query_tensor.get()), + reinterpret_cast(tensor_data_ptr), + query_tensor_embedding_num, + embedding_num, + column_unit_embedding_dimension, + output_ptr.get()); + float maxsim_score = 0.0f; + for (u32 query_i = 0; query_i < query_tensor_embedding_num; ++query_i) { + const float *query_ip_ptr = output_ptr.get() + query_i * embedding_num; + float max_score = std::numeric_limits::lowest(); + for (u32 k = 0; k < embedding_num; ++k) { + max_score = std::max(max_score, query_ip_ptr[k]); + } + maxsim_score += max_score; + } + doc.score_ = maxsim_score; + } + // 4. sort by score + std::sort(rerank_docs.begin(), rerank_docs.end(), [](const MatchTensorRerankDoc &lhs, const MatchTensorRerankDoc &rhs) noexcept { + return lhs.score_ > rhs.score_; + }); + if (rerank_docs.size() > topn) { + rerank_docs.erase(rerank_docs.begin() + topn, rerank_docs.end()); + } + // 5. 
generate output data blocks + UniquePtr output_data_block = DataBlock::MakeUniquePtr(); + output_data_block->Init(*GetOutputTypes()); + u32 row_count = 0; + for (MatchTensorRerankDoc &doc : rerank_docs) { + if (row_count == output_data_block->capacity()) { + output_data_block->Finalize(); + output_data_block_array.push_back(std::move(output_data_block)); + output_data_block = DataBlock::MakeUniquePtr(); + output_data_block->Init(*GetOutputTypes()); + row_count = 0; + } + const auto &input_blocks = input_data_blocks.at(doc.from_input_data_block_id_); + const u32 block_idx = doc.from_block_idx_; + const u32 row_idx = doc.from_row_idx_; + const ColumnID column_n = GetOutputTypes()->size() - 2; + for (ColumnID i = 0; i < column_n; ++i) { + output_data_block->column_vectors[i]->AppendWith(*input_blocks[block_idx]->column_vectors[i], row_idx, 1); + } + // 4.2 add hidden columns: score, row_id + Value v = Value::MakeFloat(doc.score_); + output_data_block->column_vectors[column_n]->AppendValue(v); + output_data_block->column_vectors[column_n + 1]->AppendWith(doc.row_id_, 1); + row_count++; + } + output_data_block->Finalize(); + output_data_block_array.push_back(std::move(output_data_block)); +} + +bool PhysicalFusion::Execute(QueryContext *query_context, OperatorState *operator_state) { + if (operator_state->prev_op_state_ != nullptr) { + // not the first op in fragment + return ExecuteNotFirstOp(query_context, operator_state); + } + return ExecuteFirstOp(query_context, static_cast(operator_state)); +} + +bool PhysicalFusion::ExecuteFirstOp(QueryContext *query_context, FusionOperatorState *fusion_operator_state) const { + if (!fusion_operator_state->input_complete_) { + return false; + } + if (to_lower_method_.compare("rrf") == 0) { + ExecuteRRF(fusion_operator_state->input_data_blocks_, fusion_operator_state->data_block_array_); + fusion_operator_state->input_data_blocks_.clear(); + fusion_operator_state->SetComplete(); + return true; + } + if (to_lower_method_.compare("match_tensor") == 0) { + ExecuteMatchTensor(query_context, fusion_operator_state->input_data_blocks_, fusion_operator_state->data_block_array_); + fusion_operator_state->input_data_blocks_.clear(); + fusion_operator_state->SetComplete(); + return true; + } + Status status = Status::NotSupport(fmt::format("Fusion method {} is not implemented.", fusion_expr_->method_)); + LOG_ERROR(status.message()); + RecoverableError(std::move(status)); + return false; +} + +bool PhysicalFusion::ExecuteNotFirstOp(QueryContext *query_context, OperatorState *operator_state) const { + // this op has prev fusion op + if (!operator_state->prev_op_state_->Complete()) { + UnrecoverableError("Fusion with previous fusion op, but prev_op_state_ is not complete."); + return false; + } + if (to_lower_method_.compare("match_tensor") == 0) { + Map>> input_data_blocks; + input_data_blocks.emplace(0, std::move(operator_state->prev_op_state_->data_block_array_)); + operator_state->prev_op_state_->data_block_array_.clear(); + ExecuteMatchTensor(query_context, input_data_blocks, operator_state->data_block_array_); + operator_state->SetComplete(); + return true; + } + Status status = Status::NotSupport(fmt::format("Fusion method {} is not implemented.", fusion_expr_->method_)); + LOG_ERROR(status.message()); + RecoverableError(std::move(status)); + return false; } String PhysicalFusion::ToString(i64 &space) const { diff --git a/src/executor/operator/physical_fusion.cppm b/src/executor/operator/physical_fusion.cppm index 79ffa39e77..a65e2ce280 100644 --- 
a/src/executor/operator/physical_fusion.cppm +++ b/src/executor/operator/physical_fusion.cppm @@ -17,7 +17,7 @@ module; export module physical_fusion; import stl; - +import base_table_ref; import query_context; import operator_state; import physical_operator; @@ -30,10 +30,12 @@ import internal_types; import data_type; namespace infinity { +struct DataBlock; -export class PhysicalFusion final: public PhysicalOperator { +export class PhysicalFusion final : public PhysicalOperator { public: explicit PhysicalFusion(u64 id, + SharedPtr<BaseTableRef> base_table_ref, UniquePtr<PhysicalOperator> left, UniquePtr<PhysicalOperator> right, SharedPtr<FusionExpression> fusion_expr, @@ -42,23 +44,39 @@ public: void Init() override; - bool Execute(QueryContext *query_context, OperatorState *operator_state) final; + bool Execute(QueryContext *query_context, OperatorState *operator_state) override; - SharedPtr<Vector<String>> GetOutputNames() const final { return left_->GetOutputNames(); }; + SharedPtr<Vector<String>> GetOutputNames() const override { return output_names_; } - SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const final { return left_->GetOutputTypes(); }; + SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const override { return output_types_; } SizeT TaskletCount() override { UnrecoverableError("Not implement: TaskletCount not Implement"); return 0; } + void FillingTableRefs(HashMap> &table_refs) override { + table_refs.insert({base_table_ref_->table_index_, base_table_ref_}); + } + String ToString(i64 &space) const; + SharedPtr<BaseTableRef> base_table_ref_{}; SharedPtr<FusionExpression> fusion_expr_; private: - + bool ExecuteFirstOp(QueryContext *query_context, FusionOperatorState *fusion_operator_state) const; + bool ExecuteNotFirstOp(QueryContext *query_context, OperatorState *operator_state) const; + // RRF has multiple input source, must be first op + void ExecuteRRF(const Map<u64, Vector<UniquePtr<DataBlock>>> &input_data_blocks, Vector<UniquePtr<DataBlock>> &output_data_block_array) const; + // MatchTensor may have multiple or single input source, can be first or not first op + void ExecuteMatchTensor(QueryContext *query_context, + const Map<u64, Vector<UniquePtr<DataBlock>>> &input_data_blocks, + Vector<UniquePtr<DataBlock>> &output_data_block_array) const; + + String to_lower_method_; + SharedPtr<Vector<String>> output_names_; + SharedPtr<Vector<SharedPtr<DataType>>> output_types_; }; } // namespace infinity diff --git a/src/executor/physical_planner.cpp b/src/executor/physical_planner.cpp index e3d33fde4a..ac9659590b 100644 --- a/src/executor/physical_planner.cpp +++ b/src/executor/physical_planner.cpp @@ -902,17 +902,16 @@ UniquePtr PhysicalPlanner::BuildMatchTensorScan(const SharedPt } UniquePtr<PhysicalOperator> PhysicalPlanner::BuildFusion(const SharedPtr<LogicalNode> &logical_operator) const { - SharedPtr<LogicalFusion> logical_fusion = static_pointer_cast<LogicalFusion>(logical_operator); + const auto logical_fusion = static_pointer_cast<LogicalFusion>(logical_operator); UniquePtr<PhysicalOperator> left_phy = nullptr, right_phy = nullptr; - auto left_logical_node = logical_operator->left_node(); - if (left_logical_node.get() != nullptr) { + if (const auto &left_logical_node = logical_operator->left_node(); left_logical_node.get() != nullptr) { left_phy = BuildPhysicalOperator(left_logical_node); } - auto right_logical_node = logical_operator->right_node(); - if (right_logical_node.get() != nullptr) { + if (const auto right_logical_node = logical_operator->right_node(); right_logical_node.get() != nullptr) { right_phy = BuildPhysicalOperator(right_logical_node); } return MakeUnique<PhysicalFusion>(logical_fusion->node_id(), + logical_fusion->base_table_ref_, std::move(left_phy), std::move(right_phy), logical_fusion->fusion_expr_, diff --git a/src/expression/fusion_expression.cpp b/src/expression/fusion_expression.cpp index f1ff2ad156..a687b73971 100644 ---
a/src/expression/fusion_expression.cpp +++ b/src/expression/fusion_expression.cpp @@ -29,7 +29,7 @@ import third_party; namespace infinity { FusionExpression::FusionExpression(const String &method, SharedPtr options) - : BaseExpression(ExpressionType::kFusion, Vector>()), method_(method), options_(options) {} + : BaseExpression(ExpressionType::kFusion, Vector>()), method_(method), options_(std::move(options)) {} String FusionExpression::ToString() const { if (!alias_.empty()) { diff --git a/src/expression/search_expression.cpp b/src/expression/search_expression.cpp index 3ccb1bdeaa..c924016fb6 100644 --- a/src/expression/search_expression.cpp +++ b/src/expression/search_expression.cpp @@ -30,9 +30,9 @@ namespace infinity { SearchExpression::SearchExpression(Vector> &match_exprs, Vector> &knn_exprs, Vector> &match_tensor_exprs, - SharedPtr fusion_expr) + Vector> &fusion_exprs) : BaseExpression(ExpressionType::kSearch, Vector>()), match_exprs_(match_exprs), knn_exprs_(knn_exprs), - match_tensor_exprs_(match_tensor_exprs), fusion_expr_(fusion_expr) {} + match_tensor_exprs_(match_tensor_exprs), fusion_exprs_(fusion_exprs) {} String SearchExpression::ToString() const { if (!alias_.empty()) { @@ -60,7 +60,7 @@ String SearchExpression::ToString() const { cnt++; oss << match_tensor_expr->ToString(); } - if (fusion_expr_.get() != nullptr) { + for (auto &fusion_expr_ : fusion_exprs_) { if (cnt != 0) oss << ", "; cnt++; diff --git a/src/expression/search_expression.cppm b/src/expression/search_expression.cppm index bd800b0306..7706d05c01 100644 --- a/src/expression/search_expression.cppm +++ b/src/expression/search_expression.cppm @@ -33,7 +33,7 @@ public: SearchExpression(Vector> &match_exprs, Vector> &knn_exprs, Vector> &match_tensor_exprs, - SharedPtr fusion_expr); + Vector> &fusion_exprs); inline DataType Type() const override { return DataType(LogicalType::kFloat); } @@ -43,7 +43,7 @@ public: Vector> match_exprs_{}; Vector> knn_exprs_{}; Vector> match_tensor_exprs_{}; - SharedPtr fusion_expr_{}; + Vector> fusion_exprs_{}; }; } // namespace infinity \ No newline at end of file diff --git a/src/parser/expr/search_expr.cpp b/src/parser/expr/search_expr.cpp index 2b7dad20c5..2dd1d04f1e 100644 --- a/src/parser/expr/search_expr.cpp +++ b/src/parser/expr/search_expr.cpp @@ -46,13 +46,22 @@ std::string SearchExpr::ToString() const { oss << expr->ToString(); is_first = false; } - if (fusion_expr_ != nullptr) { - oss << ", " << fusion_expr_->ToString(); + for (auto &expr : fusion_exprs_) { + if (!is_first) + oss << ", "; + oss << expr->ToString(); + is_first = false; } return oss.str(); } void SearchExpr::SetExprs(std::vector *exprs) { + if (exprs == nullptr) { + ParserError("SearchExpr::SetExprs parameter is nullptr"); + } + if (exprs_ != nullptr) { + ParserError("SearchExpr::SetExprs member exprs_ is not nullptr"); + } exprs_ = exprs; for (ParsedExpr *expr : *exprs) { AddExpr(expr); @@ -65,8 +74,9 @@ void SearchExpr::Validate() const { if (num_sub_expr <= 0) { ParserError("Need at least one MATCH VECTOR / MATCH TENSOR / MATCH TEXT / QUERY expression"); } else if (num_sub_expr >= 2) { - if (fusion_expr_ == nullptr) + if (fusion_exprs_.empty()) { ParserError("Need FUSION expr since there are multiple MATCH VECTOR / MATCH TENSOR / MATCH TEXT / QUERY expressions"); + } } } @@ -82,10 +92,7 @@ void SearchExpr::AddExpr(infinity::ParsedExpr *expr) { match_tensor_exprs_.push_back(static_cast(expr)); break; case ParsedExprType::kFusion: - if (fusion_expr_ != nullptr) { - ParserError("More than one FUSION expr"); 
- } - fusion_expr_ = static_cast(expr); + fusion_exprs_.push_back(static_cast(expr)); break; default: ParserError("Invalid expr type for SEARCH"); diff --git a/src/parser/expr/search_expr.h b/src/parser/expr/search_expr.h index d334a680f4..31ed1b3b0d 100644 --- a/src/parser/expr/search_expr.h +++ b/src/parser/expr/search_expr.h @@ -36,7 +36,7 @@ class SearchExpr : public ParsedExpr { std::vector match_exprs_{}; std::vector knn_exprs_{}; std::vector match_tensor_exprs_{}; - FusionExpr *fusion_expr_{}; + std::vector fusion_exprs_{}; private: std::vector *exprs_{}; diff --git a/src/planner/bind_context.cppm b/src/planner/bind_context.cppm index ae05aa473f..b7420282e7 100644 --- a/src/planner/bind_context.cppm +++ b/src/planner/bind_context.cppm @@ -172,8 +172,8 @@ public: } auto search_expr = (SearchExpr *)expr; - allow_distance = !search_expr->knn_exprs_.empty() && search_expr->fusion_expr_ == nullptr; - allow_score = !search_expr->match_exprs_.empty() || !search_expr->match_tensor_exprs_.empty() || search_expr->fusion_expr_ != nullptr; + allow_distance = !search_expr->knn_exprs_.empty() && search_expr->fusion_exprs_.empty(); + allow_score = !search_expr->match_exprs_.empty() || !search_expr->match_tensor_exprs_.empty() || !(search_expr->fusion_exprs_.empty()); } void AddSubqueryBinding(const String &name, diff --git a/src/planner/bound_select_statement.cpp b/src/planner/bound_select_statement.cpp index 0dd8c49773..4ae1534909 100644 --- a/src/planner/bound_select_statement.cpp +++ b/src/planner/bound_select_statement.cpp @@ -143,65 +143,60 @@ SharedPtr BoundSelectStatement::BuildPlan(QueryContext *query_conte return root; } else { SharedPtr root = nullptr; - SizeT num_children = search_expr_->match_exprs_.size() + search_expr_->knn_exprs_.size() + search_expr_->match_tensor_exprs_.size(); + const SizeT num_children = search_expr_->match_exprs_.size() + search_expr_->knn_exprs_.size() + search_expr_->match_tensor_exprs_.size(); if (num_children <= 0) { UnrecoverableError("SEARCH shall have at least one MATCH TEXT or MATCH VECTOR or MATCH TENSOR expression"); } else if (num_children >= 3) { UnrecoverableError("SEARCH shall have at max two MATCH TEXT or MATCH VECTOR expression"); } - + if (table_ref_ptr_->type() != TableRefType::kTable) { + UnrecoverableError("Not base table reference"); + } + auto base_table_ref = static_pointer_cast(table_ref_ptr_); // FIXME: need check if there is subquery inside the where conditions auto filter_expr = ComposeExpressionWithDelimiter(where_conditions_, ConjunctionType::kAnd); - auto common_query_filter = - MakeShared(filter_expr, static_pointer_cast(table_ref_ptr_), query_context->GetTxn()->BeginTS()); + auto common_query_filter = MakeShared(filter_expr, base_table_ref, query_context->GetTxn()->BeginTS()); Vector> match_knn_nodes; - match_knn_nodes.reserve(search_expr_->match_exprs_.size()); + match_knn_nodes.reserve(num_children); for (auto &match_expr : search_expr_->match_exprs_) { - if (table_ref_ptr_->type() != TableRefType::kTable) { - UnrecoverableError("Not base table reference"); - } - auto base_table_ref = static_pointer_cast(table_ref_ptr_); SharedPtr matchNode = MakeShared(bind_context->GetNewLogicalNodeId(), base_table_ref, match_expr); matchNode->filter_expression_ = filter_expr; matchNode->common_query_filter_ = common_query_filter; match_knn_nodes.push_back(std::move(matchNode)); } for (auto &match_tensor_expr : search_expr_->match_tensor_exprs_) { - if (table_ref_ptr_->type() != TableRefType::kTable) { - UnrecoverableError("Not base table 
reference"); - } - auto base_table_ref = static_pointer_cast(table_ref_ptr_); auto match_tensor_node = MakeShared(bind_context->GetNewLogicalNodeId(), base_table_ref, match_tensor_expr); match_tensor_node->filter_expression_ = filter_expr; match_tensor_node->common_query_filter_ = common_query_filter; match_tensor_node->InitExtraOptions(); match_knn_nodes.push_back(std::move(match_tensor_node)); } - bind_context->GenerateTableIndex(); for (auto &knn_expr : search_expr_->knn_exprs_) { - if (table_ref_ptr_->type() != TableRefType::kTable) { - UnrecoverableError("Not base table reference"); - } SharedPtr knn_scan = BuildInitialKnnScan(table_ref_ptr_, knn_expr, query_context, bind_context); knn_scan->filter_expression_ = filter_expr; knn_scan->common_query_filter_ = common_query_filter; match_knn_nodes.push_back(std::move(knn_scan)); } - - if (search_expr_->fusion_expr_.get() != nullptr) { - SharedPtr fusionNode = MakeShared(bind_context->GetNewLogicalNodeId(), search_expr_->fusion_expr_); - fusionNode->set_left_node(match_knn_nodes[0]); + if (!(search_expr_->fusion_exprs_.empty())) { + auto firstfusionNode = MakeShared(bind_context->GetNewLogicalNodeId(), base_table_ref, search_expr_->fusion_exprs_[0]); + firstfusionNode->set_left_node(match_knn_nodes[0]); if (match_knn_nodes.size() > 1) - fusionNode->set_right_node(match_knn_nodes[1]); - root = fusionNode; + firstfusionNode->set_right_node(match_knn_nodes[1]); + root = std::move(firstfusionNode); + // extra fusion nodes + for (u32 i = 1; i < search_expr_->fusion_exprs_.size(); ++i) { + auto extrafusionNode = MakeShared(bind_context->GetNewLogicalNodeId(), base_table_ref, search_expr_->fusion_exprs_[i]); + extrafusionNode->set_left_node(root); + root = std::move(extrafusionNode); + } } else { - root = match_knn_nodes[0]; + root = std::move(match_knn_nodes[0]); } auto project = MakeShared(bind_context->GetNewLogicalNodeId(), projection_expressions_, projection_index_); project->set_left_node(root); - root = project; + root = std::move(project); return root; } diff --git a/src/planner/expression_binder.cpp b/src/planner/expression_binder.cpp index 4ff212d817..38a5fa1c17 100644 --- a/src/planner/expression_binder.cpp +++ b/src/planner/expression_binder.cpp @@ -72,6 +72,7 @@ import between_expr; import subquery_expr; import match_expr; import match_tensor_expr; +import fusion_expr; import data_type; import catalog; @@ -834,7 +835,7 @@ SharedPtr ExpressionBinder::BuildSearchExpr(const SearchExpr &ex Vector> match_exprs; Vector> knn_exprs; Vector> match_tensor_exprs; - SharedPtr fusion_expr = nullptr; + Vector> fusion_exprs; for (MatchExpr *match_expr : expr.match_exprs_) { match_exprs.push_back(MakeShared(match_expr->fields_, match_expr->matching_text_, match_expr->options_text_)); } @@ -845,10 +846,10 @@ SharedPtr ExpressionBinder::BuildSearchExpr(const SearchExpr &ex match_tensor_exprs.push_back( static_pointer_cast(BuildMatchTensorExpr(*match_tensor_expr, bind_context_ptr, depth, false))); } - if (expr.fusion_expr_ != nullptr) { - fusion_expr = MakeShared(expr.fusion_expr_->method_, expr.fusion_expr_->options_); + for (FusionExpr *fusion_expr : expr.fusion_exprs_) { + fusion_exprs.push_back(MakeShared(fusion_expr->method_, fusion_expr->options_)); } - SharedPtr bound_search_expr = MakeShared(match_exprs, knn_exprs, match_tensor_exprs, fusion_expr); + SharedPtr bound_search_expr = MakeShared(match_exprs, knn_exprs, match_tensor_exprs, fusion_exprs); return bound_search_expr; } diff --git a/src/planner/node/logical_fusion.cpp 
b/src/planner/node/logical_fusion.cpp index ea8af00cdf..fed9c71b9d 100644 --- a/src/planner/node/logical_fusion.cpp +++ b/src/planner/node/logical_fusion.cpp @@ -29,9 +29,8 @@ import internal_types; namespace infinity { -LogicalFusion::LogicalFusion(u64 node_id, - SharedPtr fusion_expr) - : LogicalNode(node_id, LogicalNodeType::kFusion), fusion_expr_(fusion_expr) {} +LogicalFusion::LogicalFusion(const u64 node_id, SharedPtr base_table_ref, SharedPtr fusion_expr) + : LogicalNode(node_id, LogicalNodeType::kFusion), base_table_ref_(std::move(base_table_ref)), fusion_expr_(std::move(fusion_expr)) {} String LogicalFusion::ToString(i64 &space) const { std::stringstream ss; diff --git a/src/planner/node/logical_fusion.cppm b/src/planner/node/logical_fusion.cppm index 55925c1793..03111bc9f9 100644 --- a/src/planner/node/logical_fusion.cppm +++ b/src/planner/node/logical_fusion.cppm @@ -32,8 +32,7 @@ namespace infinity { export class LogicalFusion : public LogicalNode { public: - explicit LogicalFusion(u64 node_id, - SharedPtr fusion_expr); + explicit LogicalFusion(u64 node_id, SharedPtr base_table_ref, SharedPtr fusion_expr); Vector GetColumnBindings() const final { return left_node_->GetColumnBindings(); }; @@ -45,6 +44,7 @@ public: inline String name() final { return "LogicalFusion"; } + SharedPtr base_table_ref_{}; SharedPtr fusion_expr_{}; }; diff --git a/src/storage/column_vector/column_vector.cpp b/src/storage/column_vector/column_vector.cpp index 5583cebeea..7d51950262 100644 --- a/src/storage/column_vector/column_vector.cpp +++ b/src/storage/column_vector/column_vector.cpp @@ -1343,54 +1343,6 @@ Vector SplitArrayElement(std::string_view data, char delimiter return ret; } -Vector SplitTensorElement(std::string_view data, char delimiter, const u32 unit_embedding_dim) { - SizeT data_size = data.size(); - if (data_size < 2 || data[0] != '[' || data[data_size - 1] != ']') { - Status status = Status::ImportFileFormatError("Tensor data must be surrounded by [ and ]"); - LOG_ERROR(status.message()); - RecoverableError(status); - } - bool have_child_embedding = false; - for (SizeT i = 1; i < data_size - 1; ++i) { - if (data[i] == '[') { - have_child_embedding = true; - break; - } - } - if (!have_child_embedding) { - return SplitArrayElement(data, delimiter); - } - std::string_view child_data = data.substr(1, data_size - 2); - Vector ret; - size_t bg_id = 0; - while (true) { - const auto next_bg_id = child_data.find('[', bg_id); - if (next_bg_id == std::string_view::npos) { - break; - } - const auto ed_id = child_data.find(']', next_bg_id); - if (ed_id == std::string_view::npos) { - Status status = Status::ImportFileFormatError("Tensor data member embedding must be surrounded by [ and ]"); - LOG_ERROR(status.message()); - RecoverableError(status); - } - if (const auto check_inner_valid = child_data.find('[', next_bg_id + 1); check_inner_valid < ed_id) { - Status status = Status::ImportFileFormatError("Tensor data format invalid: mismatch of inner '[', ']'."); - LOG_ERROR(status.message()); - RecoverableError(status); - } - Vector sub_result = SplitArrayElement(child_data.substr(next_bg_id, ed_id - next_bg_id + 1), delimiter); - if (sub_result.size() != unit_embedding_dim) { - Status status = Status::ImportFileFormatError("Tensor data member embedding size must be equal to unit embedding dimension."); - LOG_ERROR(status.message()); - RecoverableError(status); - } - ret.insert(ret.end(), sub_result.begin(), sub_result.end()); - bg_id = ed_id + 1; - } - return ret; -} - Vector> 
SplitTensorArrayElement(std::string_view data, char delimiter, const u32 unit_embedding_dim) { SizeT data_size = data.size(); if (data_size < 2 || data[0] != '[' || data[data_size - 1] != ']') { @@ -2110,4 +2062,52 @@ void CopyTensorArray(TensorArrayT &dst_ref, dst_buffer->fix_heap_mgr_->AppendToHeap(reinterpret_cast(dst_tensor_data.data()), tensor_num * sizeof(TensorT)); } +Vector SplitTensorElement(std::string_view data, char delimiter, const u32 unit_embedding_dim) { + SizeT data_size = data.size(); + if (data_size < 2 || data[0] != '[' || data[data_size - 1] != ']') { + Status status = Status::ImportFileFormatError("Tensor data must be surrounded by [ and ]"); + LOG_ERROR(status.message()); + RecoverableError(status); + } + bool have_child_embedding = false; + for (SizeT i = 1; i < data_size - 1; ++i) { + if (data[i] == '[') { + have_child_embedding = true; + break; + } + } + if (!have_child_embedding) { + return SplitArrayElement(data, delimiter); + } + std::string_view child_data = data.substr(1, data_size - 2); + Vector ret; + size_t bg_id = 0; + while (true) { + const auto next_bg_id = child_data.find('[', bg_id); + if (next_bg_id == std::string_view::npos) { + break; + } + const auto ed_id = child_data.find(']', next_bg_id); + if (ed_id == std::string_view::npos) { + Status status = Status::ImportFileFormatError("Tensor data member embedding must be surrounded by [ and ]"); + LOG_ERROR(status.message()); + RecoverableError(status); + } + if (const auto check_inner_valid = child_data.find('[', next_bg_id + 1); check_inner_valid < ed_id) { + Status status = Status::ImportFileFormatError("Tensor data format invalid: mismatch of inner '[', ']'."); + LOG_ERROR(status.message()); + RecoverableError(status); + } + Vector sub_result = SplitArrayElement(child_data.substr(next_bg_id, ed_id - next_bg_id + 1), delimiter); + if (sub_result.size() != unit_embedding_dim) { + Status status = Status::ImportFileFormatError("Tensor data member embedding size must be equal to unit embedding dimension."); + LOG_ERROR(status.message()); + RecoverableError(status); + } + ret.insert(ret.end(), sub_result.begin(), sub_result.end()); + bg_id = ed_id + 1; + } + return ret; +} + } // namespace infinity diff --git a/src/storage/column_vector/column_vector.cppm b/src/storage/column_vector/column_vector.cppm index dc9224f2cb..beb492ca13 100644 --- a/src/storage/column_vector/column_vector.cppm +++ b/src/storage/column_vector/column_vector.cppm @@ -1026,4 +1026,6 @@ export using BooleanColumnWriter = ColumnVectorPtrAndIdx; export template using ColumnValueReader = ColumnVectorPtrAndIdx; +export Vector SplitTensorElement(std::string_view data, char delimiter, const u32 unit_embedding_dim); + } // namespace infinity diff --git a/src/unit_test/parser/sql_select_statement.cpp b/src/unit_test/parser/sql_select_statement.cpp index 1a27c1803c..0f01147017 100644 --- a/src/unit_test/parser/sql_select_statement.cpp +++ b/src/unit_test/parser/sql_select_statement.cpp @@ -1509,8 +1509,9 @@ TEST_F(SelectStatementParsingTest, good_search_test) { } EXPECT_EQ(knn_expr5->topn_, 3); - EXPECT_NE(search_expr->fusion_expr_, nullptr); - auto *fusion_expr = search_expr->fusion_expr_; + EXPECT_EQ(search_expr->fusion_exprs_.size(), 1); + EXPECT_NE(search_expr->fusion_exprs_[0], nullptr); + auto *fusion_expr = search_expr->fusion_exprs_[0]; EXPECT_EQ(fusion_expr->method_, String("rrf")); EXPECT_NE(select_statement->where_expr_, nullptr); diff --git a/test/data/csv/tensor_maxsim.csv b/test/data/csv/tensor_maxsim.csv index 
a5070ed1ed..deb5ed8407 100644 --- a/test/data/csv/tensor_maxsim.csv +++ b/test/data/csv/tensor_maxsim.csv @@ -1,8 +1,8 @@ -"test00",2,"[0.1, 0.2, 0.3, -0.2, 0.3, -0.2, 0.2, 0.3]" -"test11",4,"[0.2, 0.1, 0.3, 0.4]" -"test22",6,"[0.3, 0.2, -11.1, 0.4, 0.4, 0.3, 0.2, -88.5, 0.1, -0.4, 9.4, 0.3]" -"test33",8,"[0.4, 0.3, 0.2, 0.1]" -"test44",12,"[0.1, 0.2, 0.3, -0.2, 0.3, -0.2, 0.2, 0.3]" -"test55",14,"[0.2, 0.1, 0.3, 0.4, -0.4, 0.3, 0.2, 0.1, 0.0, 0.2, -0.3, -0.2, 0.3, -0.2, 0.2, 0.3]" -"test66",16,"[0.3, 0.2, 0.1, 0.4, 0.4, 0.3, 0.2, -88.5, 0.1, -0.4, 9.4, 0.3, 0.3, 0.2, 0.1, 0.4, 0.4, 0.3, 0.2, -88.5, 0.1, -0.4, 9.4, 0.3]" -"test77",18,"[0.4, -0.3, 0.2, 0.1, -0.4, 0.3, 0.2, 0.1]" +"test00",2,"[0.1, 0.2, 0.3, -0.2, 0.3, -0.2, 0.2, 0.3]","this" +"test11",4,"[0.2, 0.1, 0.3, 0.4]","tree" +"test22",6,"[0.3, 0.2, -11.1, 0.4, 0.4, 0.3, 0.2, -88.5, 0.1, -0.4, 9.4, 0.3]","tell off" +"test33",8,"[0.4, 0.3, 0.2, 0.1]","that" +"test44",12,"[0.1, 0.2, 0.3, -0.2, 0.3, -0.2, 0.2, 0.3]","time off" +"test55",14,"[0.2, 0.1, 0.3, 0.4, -0.4, 0.3, 0.2, 0.1, 0.0, 0.2, -0.3, -0.2, 0.3, -0.2, 0.2, 0.3]","where" +"test66",16,"[0.3, 0.2, 0.1, 0.4, 0.4, 0.3, 0.2, -88.5, 0.1, -0.4, 9.4, 0.3, 0.3, 0.2, 0.1, 0.4, 0.4, 0.3, 0.2, -88.5, 0.1, -0.4, 9.4, 0.3]","on" +"test77",18,"[0.4, -0.3, 0.2, 0.1, -0.4, 0.3, 0.2, 0.1]","off" diff --git a/test/sql/dql/tensor/fusion_rerank_maxsim.slt b/test/sql/dql/tensor/fusion_rerank_maxsim.slt new file mode 100644 index 0000000000..eed3435f6c --- /dev/null +++ b/test/sql/dql/tensor/fusion_rerank_maxsim.slt @@ -0,0 +1,150 @@ + +statement ok +DROP TABLE IF EXISTS fusion_rerank_maxsim; + +statement ok +CREATE TABLE fusion_rerank_maxsim (title VARCHAR, num INT, t TENSOR(FLOAT, 4), body VARCHAR); + +statement ok +COPY fusion_rerank_maxsim FROM '/var/infinity/test_data/tensor_maxsim.csv' WITH ( DELIMITER ',' ); + +query I +SELECT * FROM fusion_rerank_maxsim; +---- +test00 2 [0.1,0.2,0.3,-0.2],[0.3,-0.2,0.2,0.3] this +test11 4 [0.2,0.1,0.3,0.4] tree +test22 6 [0.3,0.2,-11.1,0.4],[0.4,0.3,0.2,-88.5],[0.1,-0.4,9.4,0.3] tell off +test33 8 [0.4,0.3,0.2,0.1] that +test44 12 [0.1,0.2,0.3,-0.2],[0.3,-0.2,0.2,0.3] time off +test55 14 [0.2,0.1,0.3,0.4],[-0.4,0.3,0.2,0.1],[0,0.2,-0.3,-0.2],[0.3,-0.2,0.2,0.3] where +test66 16 [0.3,0.2,0.1,0.4],[0.4,0.3,0.2,-88.5],[0.1,-0.4,9.4,0.3],[0.3,0.2,0.1,0.4],[0.4,0.3,0.2,-88.5],[0.1,-0.4,9.4,0.3] on +test77 18 [0.4,-0.3,0.2,0.1],[-0.4,0.3,0.2,0.1] off + +statement ok +CREATE INDEX ft_index ON fusion_rerank_maxsim(body) USING FULLTEXT; + +query I +SELECT title, SCORE() FROM fusion_rerank_maxsim SEARCH MATCH TENSOR (t, [[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]], 'float', 'maxsim'); +---- +test22 636.870056 +test55 27.369999 +test66 11.910000 +test33 3.620001 +test77 2.260000 +test44 -5.190000 +test00 -5.190000 +test11 -9.660001 + +query I +SELECT title, SCORE() FROM fusion_rerank_maxsim SEARCH MATCH TEXT ('body', 'off', 'topn=4'); +---- +test77 1.028622 +test22 0.758327 +test44 0.758327 + +query I +SELECT title, SCORE() FROM fusion_rerank_maxsim SEARCH MATCH TENSOR (t, [1.0, 0.0, 0.0, 0.0], 'float', 'maxsim', 'topn=2'); +---- +test22 0.400000 +test33 0.400000 + +query I +EXPLAIN SELECT title, SCORE() FROM fusion_rerank_maxsim SEARCH MATCH TEXT ('body', 'off', 'topn=4'), FUSION('match_tensor', 'column_name=t;search_tensor=[[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]];tensor_data_type=float;match_method=MaxSim;topn=2'); +---- +PROJECT (4) + - table index: #4 + - expressions: [title (#0), SCORE (#1)] +-> FUSION (3) + - fusion: #FUSION('match_tensor', 
'column_name=t,match_method=MaxSim,search_tensor=[[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]],tensor_data_type=float,topn=2') + - output columns: [title, __score, __rowid] + -> MATCH (2) + - table name: fusion_rerank_maxsim(default_db.fusion_rerank_maxsim) + - table index: #1 + - match expression: MATCH TEXT ('body', 'off', 'topn=4') + - filter for secondary index: None + - filter except secondary index: None + - output columns: [title, __score, __rowid] + +query I +SELECT title, SCORE() FROM fusion_rerank_maxsim SEARCH MATCH TEXT ('body', 'off', 'topn=4'), FUSION('match_tensor', 'column_name=t;search_tensor=[[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]];tensor_data_type=float;match_method=MaxSim;topn=2'); +---- +test22 636.870056 +test77 2.260000 + +query I +EXPLAIN SELECT title, SCORE() FROM fusion_rerank_maxsim SEARCH MATCH TEXT ('body', 'off', 'topn=4'), MATCH TENSOR (t, [1.0, 0.0, 0.0, 0.0], 'float', 'maxsim', 'topn=2'), FUSION('match_tensor', 'column_name=t;search_tensor=[[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]];tensor_data_type=float;match_method=MaxSim;topn=2'); +---- +PROJECT (5) + - table index: #4 + - expressions: [title (#0), SCORE (#1)] +-> FUSION (4) + - fusion: #FUSION('match_tensor', 'column_name=t,match_method=MaxSim,search_tensor=[[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]],tensor_data_type=float,topn=2') + - output columns: [title, __score, __rowid] + -> MATCH (2) + - table name: fusion_rerank_maxsim(default_db.fusion_rerank_maxsim) + - table index: #1 + - match expression: MATCH TEXT ('body', 'off', 'topn=4') + - filter for secondary index: None + - filter except secondary index: None + - output columns: [title, __score, __rowid] + -> MatchTensorScan (3) + - table name: fusion_rerank_maxsim(default_db.fusion_rerank_maxsim) + - table index: #1 + - MatchTensor expression: MATCH TENSOR (t, [[1,0,0,0]], MAX_SIM, 'topn=2') + - Top N: 2 + - filter for secondary index: None + - filter except secondary index: None + - output columns: [title, __score, __rowid] + +query I +SELECT title, SCORE() FROM fusion_rerank_maxsim SEARCH MATCH TEXT ('body', 'off', 'topn=4'), MATCH TENSOR (t, [1.0, 0.0, 0.0, 0.0], 'float', 'maxsim', 'topn=2'), FUSION('match_tensor', 'column_name=t;search_tensor=[[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]];tensor_data_type=float;match_method=MaxSim;topn=2'); +---- +test22 636.870056 +test33 3.620001 + + +query I +EXPLAIN FRAGMENT SELECT title, SCORE() FROM fusion_rerank_maxsim SEARCH MATCH TEXT ('body', 'off', 'topn=4'), MATCH TENSOR (t, [1.0, 0.0, 0.0, 0.0], 'float', 'maxsim', 'topn=2'), FUSION('rrf'), FUSION('match_tensor', 'column_name=t;search_tensor=[[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]];tensor_data_type=float;match_method=MaxSim;topn=2'); +---- +FRAGMENT (1) +-> PROJECT (6) + - table index: #4 + - expressions: [title (#0), SCORE (#1)] +-> FUSION (5) + - fusion: #FUSION('match_tensor', 'column_name=t,match_method=MaxSim,search_tensor=[[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]],tensor_data_type=float,topn=2') + - output columns: [title, __score, __rowid] +-> FUSION (4) + - fusion: #FUSION('rrf', '') + - output columns: [title, __score, __rowid]: FRAGMENT #2, FRAGMENT #3 +(empty) +FRAGMENT (2) +-> MATCH (2) + - table name: fusion_rerank_maxsim(default_db.fusion_rerank_maxsim) + - table index: #1 + - match expression: MATCH TEXT ('body', 'off', 'topn=4') + - filter for secondary index: None + - filter except secondary index: None + - output columns: [title, __score, __rowid] +(empty) +(empty) +FRAGMENT (3) +-> 
MatchTensorScan (3) + - table name: fusion_rerank_maxsim(default_db.fusion_rerank_maxsim) + - table index: #1 + - MatchTensor expression: MATCH TENSOR (t, [[1,0,0,0]], MAX_SIM, 'topn=2') + - Top N: 2 + - filter for secondary index: None + - filter except secondary index: None + - output columns: [title, __score, __rowid] +(empty) +(empty) + +query I +SELECT title, SCORE() FROM fusion_rerank_maxsim SEARCH MATCH TEXT ('body', 'off', 'topn=4'), MATCH TENSOR (t, [1.0, 0.0, 0.0, 0.0], 'float', 'maxsim', 'topn=2'), FUSION('rrf'), FUSION('match_tensor', 'column_name=t;search_tensor=[[0.0, -10.0, 0.0, 0.7], [9.2, 45.6, -55.8, 3.5]];tensor_data_type=float;match_method=MaxSim;topn=2'); +---- +test22 636.870056 +test33 3.620001 + +# Cleanup +statement ok +DROP TABLE fusion_rerank_maxsim; diff --git a/test/sql/dql/tensor/tensor_maxsim.slt b/test/sql/dql/tensor/tensor_maxsim.slt index 970ef067ea..3e59f0578d 100644 --- a/test/sql/dql/tensor/tensor_maxsim.slt +++ b/test/sql/dql/tensor/tensor_maxsim.slt @@ -3,7 +3,7 @@ statement ok DROP TABLE IF EXISTS tensor_maxsim; statement ok -CREATE TABLE tensor_maxsim (title VARCHAR, num INT, t TENSOR(FLOAT, 4)); +CREATE TABLE tensor_maxsim (title VARCHAR, num INT, t TENSOR(FLOAT, 4), body VARCHAR); statement ok COPY tensor_maxsim FROM '/var/infinity/test_data/tensor_maxsim.csv' WITH ( DELIMITER ',' ); @@ -11,14 +11,14 @@ COPY tensor_maxsim FROM '/var/infinity/test_data/tensor_maxsim.csv' WITH ( DELIM query I SELECT * FROM tensor_maxsim; ---- -test00 2 [0.1,0.2,0.3,-0.2],[0.3,-0.2,0.2,0.3] -test11 4 [0.2,0.1,0.3,0.4] -test22 6 [0.3,0.2,-11.1,0.4],[0.4,0.3,0.2,-88.5],[0.1,-0.4,9.4,0.3] -test33 8 [0.4,0.3,0.2,0.1] -test44 12 [0.1,0.2,0.3,-0.2],[0.3,-0.2,0.2,0.3] -test55 14 [0.2,0.1,0.3,0.4],[-0.4,0.3,0.2,0.1],[0,0.2,-0.3,-0.2],[0.3,-0.2,0.2,0.3] -test66 16 [0.3,0.2,0.1,0.4],[0.4,0.3,0.2,-88.5],[0.1,-0.4,9.4,0.3],[0.3,0.2,0.1,0.4],[0.4,0.3,0.2,-88.5],[0.1,-0.4,9.4,0.3] -test77 18 [0.4,-0.3,0.2,0.1],[-0.4,0.3,0.2,0.1] +test00 2 [0.1,0.2,0.3,-0.2],[0.3,-0.2,0.2,0.3] this +test11 4 [0.2,0.1,0.3,0.4] tree +test22 6 [0.3,0.2,-11.1,0.4],[0.4,0.3,0.2,-88.5],[0.1,-0.4,9.4,0.3] tell off +test33 8 [0.4,0.3,0.2,0.1] that +test44 12 [0.1,0.2,0.3,-0.2],[0.3,-0.2,0.2,0.3] time off +test55 14 [0.2,0.1,0.3,0.4],[-0.4,0.3,0.2,0.1],[0,0.2,-0.3,-0.2],[0.3,-0.2,0.2,0.3] where +test66 16 [0.3,0.2,0.1,0.4],[0.4,0.3,0.2,-88.5],[0.1,-0.4,9.4,0.3],[0.3,0.2,0.1,0.4],[0.4,0.3,0.2,-88.5],[0.1,-0.4,9.4,0.3] on +test77 18 [0.4,-0.3,0.2,0.1],[-0.4,0.3,0.2,0.1] off query I EXPLAIN SELECT title, SCORE() FROM tensor_maxsim SEARCH MATCH TENSOR (t, [0.0, -10.0, 0.0, 0.7, 9.2, 45.6, -55.8, 3.5], 'float', 'maxsim') WHERE 10 > num;
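
The MaxSim scoring added in PhysicalFusion::ExecuteMatchTensor delegates the query-by-document inner products to mlas (matrixA_multiply_transpose_matrixB_output_to_C) and then, for each query embedding, keeps the maximum inner product over the document's embeddings and sums those maxima. The following is a minimal standalone sketch of just that scoring step, with plain loops in place of the mlas call; the function name MaxSimScore and the row-major float32 buffer layout are assumptions for illustration, not part of the patch.

#include <cstddef>
#include <limits>

// Sketch only: MaxSim between a query tensor (query_n x dim) and a document
// tensor (doc_n x dim), both row-major float32. For each query embedding take
// the maximum inner product over all document embeddings, then sum the maxima.
// Mirrors step 3 of ExecuteMatchTensor, which computes the inner products via mlas.
float MaxSimScore(const float *query, std::size_t query_n,
                  const float *doc, std::size_t doc_n, std::size_t dim) {
    float score = 0.0f;
    for (std::size_t q = 0; q < query_n; ++q) {
        float best = std::numeric_limits<float>::lowest();
        for (std::size_t d = 0; d < doc_n; ++d) {
            float ip = 0.0f;
            for (std::size_t k = 0; k < dim; ++k) {
                ip += query[q * dim + k] * doc[d * dim + k];
            }
            best = ip > best ? ip : best;
        }
        score += best; // best inner product for this query embedding
    }
    return score;
}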
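
The 'rrf' path, by contrast, only needs the per-input ranks collected in ExecuteRRF. The visible hunks show the rank bookkeeping and the "rank_constant" option (default 60) but not the final scoring line, so the sketch below follows the usual reciprocal-rank-fusion definition (sum of 1 / (rank_constant + rank) over the inputs); the RRFScore helper and its rank encoding are hypothetical, for illustration only.

#include <cstddef>
#include <vector>

// Sketch only: reciprocal rank fusion over the per-input ranks of one document.
// ranks[i] is the 1-based rank of the document in input i, or 0 if the document
// did not appear in that input. rank_constant defaults to 60, matching the
// "rank_constant" option handled in ExecuteRRF.
float RRFScore(const std::vector<std::size_t> &ranks, std::size_t rank_constant = 60) {
    float score = 0.0f;
    for (std::size_t rank : ranks) {
        if (rank == 0) {
            continue; // absent from this input source
        }
        score += 1.0f / static_cast<float>(rank_constant + rank);
    }
    return score;
}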