Skip to content

Commit

Permalink
Initial support for tensor data type (#1205)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Add initial support for tensor data type

Issue link: #1179

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Breaking Change (fix or feature that could cause existing
functionality not to work as expected)
- [x] Refactoring
- [x] Test cases
  • Loading branch information
yangzq50 authored May 14, 2024
1 parent 74a6ba7 commit 8d0f8c8
Show file tree
Hide file tree
Showing 48 changed files with 3,389 additions and 3,107 deletions.
4 changes: 3 additions & 1 deletion src/bin/infinity_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ void SignalHandler(int signal_number, siginfo_t *, void *) {
switch (signal_number) {
case SIGUSR1: {
fmt::print("Unrecoverable error issued, stop the server");
exit(-1);
break;
}
case SIGINT:
case SIGQUIT:
Expand All @@ -115,7 +117,7 @@ void SignalHandler(int signal_number, siginfo_t *, void *) {
case SIGSEGV: {
// Print back strace
infinity::PrintStacktrace("SEGMENT FAULTS");
exit(0);
exit(-1);
break;
}
#ifdef ENABLE_JEMALLOC_PROF
Expand Down
1 change: 1 addition & 0 deletions src/common/default_values.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export {
constexpr u64 MAX_VECTOR_CHUNK_COUNT = std::numeric_limits<u64>::max();
// Each row has one chunk.
constexpr u64 DEFAULT_FIXLEN_CHUNK_SIZE = 65536L;
constexpr u64 DEFAULT_FIXLEN_TENSOR_CHUNK_SIZE = 512UL * 1024UL * 4UL;

// segment related constants
constexpr SizeT DEFAULT_SEGMENT_CAPACITY = 1024 * 8192; // 1024 * 8192 = 8M rows
Expand Down
77 changes: 77 additions & 0 deletions src/executor/operator/physical_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import column_def;
import constant_expr;
import wal_entry;

import value;
import catalog;
import catalog_delta_entry;
import build_fast_rough_filter_task;
Expand Down Expand Up @@ -518,6 +519,44 @@ void PhysicalImport::CSVRowHandler(void *context) {
parser_context->block_entry_ = std::move(block_entry);
}

// Parse a JSON array field as a flat list of T, validate that its length is a
// whole multiple of the tensor's embedding dimension, and append it to the
// column as a single tensor value.
template <typename T>
void AppendJsonTensorToColumn(const nlohmann::json &line_json,
                              const String &column_name,
                              ColumnVector &column_vector,
                              EmbeddingInfo *embedding_info) {
    const Vector<T> elements = line_json[column_name].get<Vector<T>>();
    const auto dim = embedding_info->Dimension();
    if (elements.size() % dim != 0) {
        RecoverableError(Status::ImportFileFormatError(
            fmt::format("Tensor element count {} isn't multiple of dimension {}.", elements.size(), dim)));
    }
    const auto byte_count = elements.size() * sizeof(T);
    column_vector.AppendValue(
        Value::MakeTensor(reinterpret_cast<const_ptr_t>(elements.data()), byte_count, column_vector.data_type()->type_info()));
}

// Bit-tensor specialization: the JSON values are read as floats, and every
// strictly-positive entry sets the corresponding bit in a zero-initialized
// packed buffer of (count + 7) / 8 bytes, which is appended as the tensor.
template <>
void AppendJsonTensorToColumn<bool>(const nlohmann::json &line_json,
                                    const String &column_name,
                                    ColumnVector &column_vector,
                                    EmbeddingInfo *embedding_info) {
    const Vector<float> elements = line_json[column_name].get<Vector<float>>();
    const auto dim = embedding_info->Dimension();
    if (elements.size() % dim != 0) {
        RecoverableError(Status::ImportFileFormatError(
            fmt::format("Tensor element count {} isn't multiple of dimension {}.", elements.size(), dim)));
    }
    const auto packed_bytes = (elements.size() + 7) / 8;
    auto packed = MakeUnique<u8[]>(packed_bytes); // value-initialized: all bits start cleared
    for (SizeT bit = 0; bit < elements.size(); ++bit) {
        if (elements[bit] > 0) {
            packed[bit / 8] |= u8(1) << (bit % 8);
        }
    }
    column_vector.AppendValue(
        Value::MakeTensor(reinterpret_cast<const_ptr_t>(packed.get()), packed_bytes, column_vector.data_type()->type_info()));
}

void PhysicalImport::JSONLRowHandler(const nlohmann::json &line_json, Vector<ColumnVector> &column_vectors) {
for (SizeT i = 0; auto &column_vector : column_vectors) {
const ColumnDef *column_def = table_entry_->GetColumnDefByID(i++);
Expand Down Expand Up @@ -604,6 +643,44 @@ void PhysicalImport::JSONLRowHandler(const nlohmann::json &line_json, Vector<Col
}
break;
}
case kTensor: {
auto embedding_info = static_cast<EmbeddingInfo *>(column_vector.data_type()->type_info().get());
// SizeT dim = embedding_info->Dimension();
switch (embedding_info->Type()) {
case kElemBit: {
AppendJsonTensorToColumn<bool>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt8: {
AppendJsonTensorToColumn<i8>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt16: {
AppendJsonTensorToColumn<i16>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt32: {
AppendJsonTensorToColumn<i32>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt64: {
AppendJsonTensorToColumn<i64>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemFloat: {
AppendJsonTensorToColumn<float>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemDouble: {
AppendJsonTensorToColumn<double>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
default: {
UnrecoverableError("Not implement: Embedding type.");
}
}
break;
}
default: {
UnrecoverableError("Not implement: Invalid data type.");
}
Expand Down
14 changes: 9 additions & 5 deletions src/function/aggregate_function_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import stl;
import base_expression;
import aggregate_function;
import cast_table;

import logger;
import status;
import infinity_exception;

Expand Down Expand Up @@ -54,23 +54,27 @@ AggregateFunction AggregateFunctionSet::GetMostMatchFunction(const SharedPtr<Bas
if (candidates_index.empty()) {
// No matched function
std::stringstream ss;
ss << "Can't find matched function for " << FunctionSet::ToString(name_, {input_argument}) << std::endl;
String function_str = FunctionSet::ToString(name_, {input_argument});
ss << "Can't find matched function for " << function_str << std::endl;
ss << "Candidate functions: " << std::endl;
for (auto &function : functions_) {
ss << function.ToString() << std::endl;
}
UnrecoverableError(ss.str());
LOG_ERROR(ss.str());
RecoverableError(Status::FunctionNotFound(function_str));
}

if (candidates_index.size() > 1) {
// multiple functions matched
std::stringstream ss;
ss << "Multiple matched functions of " << FunctionSet::ToString(name_, {input_argument}) << std::endl;
String function_str = FunctionSet::ToString(name_, {input_argument});
ss << "Multiple matched functions of " << function_str << std::endl;
ss << "Matched candidate functions: " << std::endl;
for (auto index : candidates_index) {
ss << functions_[index].ToString() << std::endl;
}
UnrecoverableError(ss.str());
LOG_ERROR(ss.str());
RecoverableError(Status::FunctionNotFound(function_str));
}

return functions_[candidates_index[0]];
Expand Down
171 changes: 171 additions & 0 deletions src/function/cast/embedding_cast.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import internal_types;
import embedding_info;
import knn_expr;
import data_type;
import default_values;

namespace infinity {

Expand All @@ -46,6 +47,9 @@ export inline BoundCastFunc BindEmbeddingCast(const DataType &source, const Data
if (source.type() == LogicalType::kEmbedding && target.type() == LogicalType::kVarchar) {
return BoundCastFunc(&ColumnVectorCast::TryCastColumnVectorToVarlenWithType<EmbeddingT, VarcharT, EmbeddingTryCastToVarlen>);
}
if (source.type() == LogicalType::kEmbedding && target.type() == LogicalType::kTensor) {
return BoundCastFunc(&ColumnVectorCast::TryCastColumnVectorToVarlenWithType<EmbeddingT, TensorT, EmbeddingTryCastToVarlen>);
}

if (source.type() != LogicalType::kEmbedding || target.type() != LogicalType::kEmbedding) {
RecoverableError(Status::NotSupportedTypeConversion(source.ToString(), target.ToString()));
Expand Down Expand Up @@ -84,6 +88,9 @@ export inline BoundCastFunc BindEmbeddingCast(const DataType &source, const Data
template <typename SourceElemType>
inline BoundCastFunc BindEmbeddingCast(const EmbeddingInfo *target) {
switch (target->Type()) {
case EmbeddingDataType::kElemBit: {
return BoundCastFunc(&ColumnVectorCast::TryCastColumnVectorEmbedding<SourceElemType, BooleanT, EmbeddingTryCastToFixlen>);
}
case EmbeddingDataType::kElemInt8: {
return BoundCastFunc(&ColumnVectorCast::TryCastColumnVectorEmbedding<SourceElemType, TinyIntT, EmbeddingTryCastToFixlen>);
}
Expand Down Expand Up @@ -112,6 +119,16 @@ inline BoundCastFunc BindEmbeddingCast(const EmbeddingInfo *target) {
struct EmbeddingTryCastToFixlen {
template <typename SourceElemType, typename TargetElemType>
static inline bool Run(const SourceElemType *source, TargetElemType *target, SizeT len) {
if constexpr (std::is_same_v<TargetElemType, bool>) {
auto *dst = reinterpret_cast<u8 *>(target);
std::fill_n(dst, (len + 7) / 8, 0);
for (SizeT i = 0; i < len; ++i) {
if (source[i] > SourceElemType{}) {
dst[i / 8] |= (u8(1) << (i % 8));
}
}
return true;
}
if constexpr (std::is_same<SourceElemType, TinyIntT>() || std::is_same<SourceElemType, SmallIntT>() ||
std::is_same<SourceElemType, IntegerT>() || std::is_same<SourceElemType, BigIntT>()) {
for (SizeT i = 0; i < len; ++i) {
Expand All @@ -130,6 +147,7 @@ struct EmbeddingTryCastToFixlen {
}
UnrecoverableError(
fmt::format("Not support to cast from {} to {}", DataType::TypeToString<SourceElemType>(), DataType::TypeToString<TargetElemType>()));
return false;
}
};

Expand Down Expand Up @@ -178,4 +196,157 @@ inline bool EmbeddingTryCastToVarlen::Run(const EmbeddingT &source,
return true;
}

// Copy one embedding into the tensor heap of `target_vector_ptr`, converting
// the element type from SourceValueType to TargetValueType when they differ.
// The heap chunk id/offset returned by AppendToHeap are written to `target`;
// `target.embedding_num_` is set by the caller (see the Run specialization
// that invokes EmbeddingTryCastToTensor).
template <typename TargetValueType, typename SourceValueType>
void EmbeddingTryCastToTensorImpl(const EmbeddingT &source, const SizeT source_embedding_dim, TensorT &target, ColumnVector *target_vector_ptr) {
    if constexpr (std::is_same_v<TargetValueType, SourceValueType>) {
        // Identical element types: append the raw embedding bytes unchanged.
        // Bit embeddings are packed 8 per byte, hence the (dim + 7) / 8 size.
        const auto source_size =
            std::is_same_v<SourceValueType, BooleanT> ? (source_embedding_dim + 7) / 8 : source_embedding_dim * sizeof(SourceValueType);
        const auto [chunk_id, chunk_offset] = target_vector_ptr->buffer_->fix_heap_mgr_->AppendToHeap(source.ptr, source_size);
        target.chunk_id_ = chunk_id;
        target.chunk_offset_ = chunk_offset;
    } else if constexpr (std::is_same_v<TargetValueType, BooleanT>) {
        // Non-bit source -> bit target: strictly-positive source elements set
        // the corresponding bit in a zero-initialized packed buffer.
        static_assert(sizeof(bool) == 1);
        const SizeT target_ptr_n = (source_embedding_dim + 7) / 8;
        const auto target_size = target_ptr_n;
        auto target_tmp_ptr = MakeUnique<u8[]>(target_size); // zero-initialized
        auto src_ptr = reinterpret_cast<const SourceValueType *>(source.ptr);
        for (SizeT i = 0; i < source_embedding_dim; ++i) {
            if (src_ptr[i] > SourceValueType{}) {
                target_tmp_ptr[i / 8] |= (u8(1) << (i % 8));
            }
        }
        const auto [chunk_id, chunk_offset] =
            target_vector_ptr->buffer_->fix_heap_mgr_->AppendToHeap(reinterpret_cast<const char *>(target_tmp_ptr.get()), target_size);
        target.chunk_id_ = chunk_id;
        target.chunk_offset_ = chunk_offset;
    } else {
        // General element conversion via EmbeddingTryCastToFixlen into a
        // temporary buffer, then append the converted bytes to the heap.
        // NOTE(review): when the source is a bit-packed embedding (BooleanT)
        // and the target is non-bit, source.ptr is reinterpreted as a plain
        // element array here — verify EmbeddingTryCastToFixlen::Run rejects
        // or correctly handles BooleanT sources in that case.
        const auto target_size = source_embedding_dim * sizeof(TargetValueType);
        auto target_tmp_ptr = MakeUniqueForOverwrite<TargetValueType[]>(source_embedding_dim);
        if (!EmbeddingTryCastToFixlen::Run(reinterpret_cast<const SourceValueType *>(source.ptr),
                                           reinterpret_cast<TargetValueType *>(target_tmp_ptr.get()),
                                           source_embedding_dim)) {
            UnrecoverableError(fmt::format("Failed to cast from embedding with type {} to tensor with type {}",
                                           DataType::TypeToString<SourceValueType>(),
                                           DataType::TypeToString<TargetValueType>()));
        }
        const auto [chunk_id, chunk_offset] =
            target_vector_ptr->buffer_->fix_heap_mgr_->AppendToHeap(reinterpret_cast<const char *>(target_tmp_ptr.get()), target_size);
        target.chunk_id_ = chunk_id;
        target.chunk_offset_ = chunk_offset;
    }
}

// Resolve the runtime source element type to a compile-time type and forward
// to the fully-typed EmbeddingTryCastToTensorImpl.
template <typename TargetValueType>
void EmbeddingTryCastToTensorImpl(const EmbeddingT &source,
                                  const EmbeddingDataType src_type,
                                  const SizeT source_embedding_dim,
                                  TensorT &target,
                                  ColumnVector *target_vector_ptr) {
    if (src_type == EmbeddingDataType::kElemBit) {
        EmbeddingTryCastToTensorImpl<TargetValueType, BooleanT>(source, source_embedding_dim, target, target_vector_ptr);
    } else if (src_type == EmbeddingDataType::kElemInt8) {
        EmbeddingTryCastToTensorImpl<TargetValueType, TinyIntT>(source, source_embedding_dim, target, target_vector_ptr);
    } else if (src_type == EmbeddingDataType::kElemInt16) {
        EmbeddingTryCastToTensorImpl<TargetValueType, SmallIntT>(source, source_embedding_dim, target, target_vector_ptr);
    } else if (src_type == EmbeddingDataType::kElemInt32) {
        EmbeddingTryCastToTensorImpl<TargetValueType, IntegerT>(source, source_embedding_dim, target, target_vector_ptr);
    } else if (src_type == EmbeddingDataType::kElemInt64) {
        EmbeddingTryCastToTensorImpl<TargetValueType, BigIntT>(source, source_embedding_dim, target, target_vector_ptr);
    } else if (src_type == EmbeddingDataType::kElemFloat) {
        EmbeddingTryCastToTensorImpl<TargetValueType, FloatT>(source, source_embedding_dim, target, target_vector_ptr);
    } else if (src_type == EmbeddingDataType::kElemDouble) {
        EmbeddingTryCastToTensorImpl<TargetValueType, DoubleT>(source, source_embedding_dim, target, target_vector_ptr);
    } else {
        UnrecoverableError(fmt::format("Can't cast from embedding to tensor with type {}", EmbeddingInfo::EmbeddingDataTypeToString(src_type)));
    }
}

// Resolve the runtime target element type to a compile-time type, then hand
// off to the source-type dispatcher.
void EmbeddingTryCastToTensor(const EmbeddingT &source,
                              const EmbeddingDataType src_type,
                              const SizeT source_embedding_dim,
                              TensorT &target,
                              const EmbeddingDataType dst_type,
                              ColumnVector *target_vector_ptr) {
    if (dst_type == EmbeddingDataType::kElemBit) {
        EmbeddingTryCastToTensorImpl<BooleanT>(source, src_type, source_embedding_dim, target, target_vector_ptr);
    } else if (dst_type == EmbeddingDataType::kElemInt8) {
        EmbeddingTryCastToTensorImpl<TinyIntT>(source, src_type, source_embedding_dim, target, target_vector_ptr);
    } else if (dst_type == EmbeddingDataType::kElemInt16) {
        EmbeddingTryCastToTensorImpl<SmallIntT>(source, src_type, source_embedding_dim, target, target_vector_ptr);
    } else if (dst_type == EmbeddingDataType::kElemInt32) {
        EmbeddingTryCastToTensorImpl<IntegerT>(source, src_type, source_embedding_dim, target, target_vector_ptr);
    } else if (dst_type == EmbeddingDataType::kElemInt64) {
        EmbeddingTryCastToTensorImpl<BigIntT>(source, src_type, source_embedding_dim, target, target_vector_ptr);
    } else if (dst_type == EmbeddingDataType::kElemFloat) {
        EmbeddingTryCastToTensorImpl<FloatT>(source, src_type, source_embedding_dim, target, target_vector_ptr);
    } else if (dst_type == EmbeddingDataType::kElemDouble) {
        EmbeddingTryCastToTensorImpl<DoubleT>(source, src_type, source_embedding_dim, target, target_vector_ptr);
    } else {
        UnrecoverableError(fmt::format("Can't cast from embedding to tensor with type {}", EmbeddingInfo::EmbeddingDataTypeToString(dst_type)));
    }
}

// Cast a fixed-dimension embedding into a tensor column value.
// The source embedding of dimension D is reinterpreted as D / d embeddings of
// the tensor's per-embedding dimension d; D must be an exact multiple of d.
// Raises a recoverable DataTypeMismatch error when the dimensions don't
// divide evenly or the resulting tensor would exceed
// DEFAULT_FIXLEN_TENSOR_CHUNK_SIZE; raises unrecoverable errors on
// type/buffer invariant violations. Returns true on success.
template <>
inline bool EmbeddingTryCastToVarlen::Run(const EmbeddingT &source,
                                          const DataType &source_type,
                                          TensorT &target,
                                          const DataType &target_type,
                                          ColumnVector *target_vector_ptr) {
    if (source_type.type() != LogicalType::kEmbedding) {
        UnrecoverableError(fmt::format("Type here is expected as Embedding, but actually it is: {}", source_type.ToString()));
    }
    // Named casts instead of C-style casts: the downcast target is known, and
    // the pointees are never mutated here.
    const auto *embedding_info = static_cast<const EmbeddingInfo *>(source_type.type_info().get());
    const auto *target_embedding_info = static_cast<const EmbeddingInfo *>(target_type.type_info().get());
    LOG_TRACE(fmt::format("EmbeddingInfo Dimension: {}", embedding_info->Dimension()));
    const auto source_embedding_dim = embedding_info->Dimension();
    const auto target_embedding_dim = target_embedding_info->Dimension();
    if (source_embedding_dim % target_embedding_dim != 0) {
        RecoverableError(Status::DataTypeMismatch(source_type.ToString(), target_type.ToString()));
    }
    const auto target_tensor_num = source_embedding_dim / target_embedding_dim;
    // Estimate the size of the target tensor and reject oversized results.
    if (const auto target_tensor_bytes = target_tensor_num * target_embedding_info->Size(); target_tensor_bytes > DEFAULT_FIXLEN_TENSOR_CHUNK_SIZE) {
        // TODO: better error message: tensor size overflow
        RecoverableError(Status::DataTypeMismatch(source_type.ToString(), target_type.ToString()));
    }
    target.embedding_num_ = target_tensor_num;
    if (target_vector_ptr->buffer_->buffer_type_ != VectorBufferType::kTensorHeap) {
        UnrecoverableError(fmt::format("Tensor column vector should use kTensorHeap VectorBuffer."));
    }
    EmbeddingTryCastToTensor(source, embedding_info->Type(), source_embedding_dim, target, target_embedding_info->Type(), target_vector_ptr);
    return true;
}

} // namespace infinity
Loading

0 comments on commit 8d0f8c8

Please sign in to comment.