Skip to content

Commit

Permalink
Support MatchTensor Scan and Reranker on TensorArray column (#1267)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Support MatchTensor Scan and Reranker on TensorArray column

Issue link: #1179

### Type of change

- [x] Refactoring
- [x] Test cases
  • Loading branch information
yangzq50 authored May 31, 2024
1 parent 0b93397 commit 5dca233
Show file tree
Hide file tree
Showing 12 changed files with 710 additions and 249 deletions.
135 changes: 32 additions & 103 deletions src/executor/operator/physical_fusion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import embedding_info;
import block_index;
import block_column_entry;
import mlas_matrix_multiply;
import physical_match_tensor_scan;

namespace infinity {

Expand Down Expand Up @@ -208,28 +209,40 @@ void PhysicalFusion::ExecuteRRF(const Map<u64, Vector<UniquePtr<DataBlock>>> &in
output_data_block_array.push_back(std::move(output_data_block));
}

// Candidate row tracked while re-ranking fusion results with MatchTensor.
// Besides the row id (used for scoring), it records which input DataBlock the
// row came from so the output columns can be located again after the rerank.
struct MatchTensorRerankDoc {
    RowID row_id_;                 // identifier of the candidate row
    u64 from_input_data_block_id_; // key of the source entry in the input block map
    u32 from_block_idx_;           // index of the DataBlock within that entry
    u32 from_row_idx_;             // row offset inside that DataBlock
    float score_ = 0.0f;           // rerank score, filled in by the scoring pass
    MatchTensorRerankDoc(const RowID id, const u64 input_block_id, const u32 block_idx, const u32 row_idx)
        : row_id_{id}, from_input_data_block_id_{input_block_id}, from_block_idx_{block_idx}, from_row_idx_{row_idx} {}
};

void PhysicalFusion::ExecuteMatchTensor(QueryContext *query_context,
const Map<u64, Vector<UniquePtr<DataBlock>>> &input_data_blocks,
Vector<UniquePtr<DataBlock>> &output_data_block_array) const {
const TableEntry *table_entry = base_table_ref_->table_entry_ptr_;
const BlockIndex *block_index = base_table_ref_->block_index_.get();
u32 topn = 10; // default topn
ColumnID column_id = std::numeric_limits<ColumnID>::max();
const char *query_tensor_ptr = fusion_expr_->match_tensor_expr_->query_embedding_.ptr;
const u32 column_unit_embedding_dimension = fusion_expr_->match_tensor_expr_->tensor_basic_embedding_dimension_;
const u32 query_tensor_embedding_num = fusion_expr_->match_tensor_expr_->num_of_embedding_in_query_tensor_;
const EmbeddingDataType query_tensor_data_type = fusion_expr_->match_tensor_expr_->embedding_data_type_;
// prepare query info
const TableEntry *table_entry = base_table_ref_->table_entry_ptr_;
const ColumnID column_id = fusion_expr_->match_tensor_expr_->column_expr_->binding().column_idx;
const ColumnDef *column_def = table_entry->GetColumnDefByID(column_id);
const DataType *column_data_type = column_def->type().get();
switch (column_data_type->type()) {
case LogicalType::kTensor:
case LogicalType::kTensorArray: {
break;
}
default: {
const auto error_info = fmt::format("Fusion MatchTensor column_name {} is not a Tensor or TensorArray column. column type is : {}.",
fusion_expr_->match_tensor_expr_->column_expr_->column_name(),
column_data_type->ToString());
LOG_ERROR(error_info);
RecoverableError(Status::NotSupport(error_info));
break;
}
}
const EmbeddingInfo *column_embedding_info = static_cast<const EmbeddingInfo *>(column_data_type->type_info().get());
if (fusion_expr_->match_tensor_expr_->tensor_basic_embedding_dimension_ != column_embedding_info->Dimension()) {
UnrecoverableError("Dimension of column and query tensor mismatch!");
}
// validate match_method
if (fusion_expr_->match_tensor_expr_->search_method_ == MatchTensorSearchMethod::kInvalid) {
const auto error_info = "Fusion MatchTensor match_method option is invalid.";
LOG_ERROR(error_info);
RecoverableError(Status::NotSupport(error_info));
}
// prepare topn
u32 topn = DEFAULT_MATCH_TENSOR_OPTION_TOP_N;
{
// find topn
if (fusion_expr_->options_.get() != nullptr) {
Expand All @@ -252,59 +265,6 @@ void PhysicalFusion::ExecuteMatchTensor(QueryContext *query_context,
}
}
}
// validate column_name
{
column_id = fusion_expr_->match_tensor_expr_->column_expr_->binding().column_idx;
const ColumnDef *column_def = table_entry->GetColumnDefByID(column_id);
const DataType *column_data_type = column_def->type().get();
const EmbeddingInfo *column_embedding_info = nullptr;
switch (column_data_type->type()) {
case LogicalType::kTensor:
case LogicalType::kTensorArray: {
column_embedding_info = static_cast<const EmbeddingInfo *>(column_data_type->type_info().get());
break;
}
default: {
const auto error_info =
fmt::format("Fusion MatchTensor column_name {} is not a Tensor or TensorArray column. column type is : {}.",
fusion_expr_->match_tensor_expr_->column_expr_->column_name(),
column_data_type->ToString());
LOG_ERROR(error_info);
RecoverableError(Status::NotSupport(error_info));
break;
}
}
if (column_unit_embedding_dimension != column_embedding_info->Dimension()) {
UnrecoverableError("Dimension of column and query tensor mismatch!");
}
switch (column_embedding_info->Type()) {
case EmbeddingDataType::kElemFloat: {
break;
}
// TODO: support bit type
default: {
const auto error_info = fmt::format("Fusion MatchTensor target column {} basic element type is unsupported: {}.",
fusion_expr_->match_tensor_expr_->column_expr_->column_name(),
EmbeddingT::EmbeddingDataType2String(column_embedding_info->Type()));
LOG_ERROR(error_info);
RecoverableError(Status::NotSupport(error_info));
break;
}
}
}
// validate tensor_data_type
// TODO: now only support float32 query tensor
if (query_tensor_data_type != EmbeddingDataType::kElemFloat) {
const auto error_info = "Fusion MatchTensor tensor_data_type option is invalid. Now only support float32.";
LOG_ERROR(error_info);
RecoverableError(Status::NotSupport(error_info));
}
// validate match_method
if (fusion_expr_->match_tensor_expr_->search_method_ != MatchTensorSearchMethod::kMaxSim) {
const auto error_info = "Fusion MatchTensor match_method option is invalid. Now only support MaxSim.";
LOG_ERROR(error_info);
RecoverableError(Status::NotSupport(error_info));
}
}
BufferManager *buffer_mgr = query_context->storage()->buffer_manager();
Vector<MatchTensorRerankDoc> rerank_docs;
Expand Down Expand Up @@ -338,38 +298,7 @@ void PhysicalFusion::ExecuteMatchTensor(QueryContext *query_context,
return lhs.row_id_ < rhs.row_id_;
});
// 3. calculate score
// TODO: now only consider MaxSim
for (auto &doc : rerank_docs) {
const RowID row_id = doc.row_id_;
const SegmentID segment_id = row_id.segment_id_;
const SegmentOffset segment_offset = row_id.segment_offset_;
const BlockID block_id = segment_offset / DEFAULT_BLOCK_CAPACITY;
const BlockOffset block_offset = segment_offset % DEFAULT_BLOCK_CAPACITY;
BlockColumnEntry *block_column_entry =
block_index->segment_block_index_.at(segment_id).block_map_.at(block_id)->GetColumnBlockEntry(column_id);
auto column_vec = block_column_entry->GetColumnVector(buffer_mgr);
auto tensor_ptr = reinterpret_cast<const TensorT *>(column_vec.data());
const auto [embedding_num, chunk_id, chunk_offset] = tensor_ptr[block_offset];
const char *tensor_data_ptr = column_vec.buffer_->fix_heap_mgr_->GetRawPtrFromChunk(chunk_id, chunk_offset);
// use mlas
auto output_ptr = MakeUniqueForOverwrite<float[]>(query_tensor_embedding_num * embedding_num);
matrixA_multiply_transpose_matrixB_output_to_C(reinterpret_cast<const float *>(query_tensor_ptr),
reinterpret_cast<const float *>(tensor_data_ptr),
query_tensor_embedding_num,
embedding_num,
column_unit_embedding_dimension,
output_ptr.get());
float maxsim_score = 0.0f;
for (u32 query_i = 0; query_i < query_tensor_embedding_num; ++query_i) {
const float *query_ip_ptr = output_ptr.get() + query_i * embedding_num;
float max_score = std::numeric_limits<float>::lowest();
for (u32 k = 0; k < embedding_num; ++k) {
max_score = std::max(max_score, query_ip_ptr[k]);
}
maxsim_score += max_score;
}
doc.score_ = maxsim_score;
}
CalculateFusionMatchTensorRerankerScores(rerank_docs, buffer_mgr, column_data_type, column_id, block_index, *fusion_expr_->match_tensor_expr_);
// 4. sort by score
std::sort(rerank_docs.begin(), rerank_docs.end(), [](const MatchTensorRerankDoc &lhs, const MatchTensorRerankDoc &rhs) noexcept {
return lhs.score_ > rhs.score_;
Expand Down
10 changes: 10 additions & 0 deletions src/executor/operator/physical_fusion.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,14 @@ private:
SharedPtr<Vector<SharedPtr<DataType>>> output_types_;
};

// Candidate row tracked while re-ranking fusion results with MatchTensor.
// Exported so the reranker scoring helper in other modules can fill in score_;
// the from_* members remember where the row originated so its output columns
// can be located again after the rerank.
export struct MatchTensorRerankDoc {
    RowID row_id_;                 // identifier of the candidate row
    u64 from_input_data_block_id_; // key of the source entry in the input block map
    u32 from_block_idx_;           // index of the DataBlock within that entry
    u32 from_row_idx_;             // row offset inside that DataBlock
    float score_ = 0.0f;           // rerank score, filled in by the scoring pass
    MatchTensorRerankDoc(const RowID id, const u64 input_block_id, const u32 block_idx, const u32 row_idx)
        : row_id_{id}, from_input_data_block_id_{input_block_id}, from_block_idx_{block_idx}, from_row_idx_{row_idx} {}
};

} // namespace infinity
Loading

0 comments on commit 5dca233

Please sign in to comment.