Skip to content

Commit

Permalink
Group by 1 (#2488)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Add group by in psql.
Add some test case.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Test cases
  • Loading branch information
small-turtle-1 authored Jan 22, 2025
1 parent d503d83 commit ef36c4f
Show file tree
Hide file tree
Showing 15 changed files with 784 additions and 506 deletions.
176 changes: 97 additions & 79 deletions src/executor/hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,109 +16,127 @@ module;

#include <string>
import stl;

import logical_type;
import column_vector;

import status;
import infinity_exception;
import third_party;
import internal_types;

module hash_table;

#if 0
namespace infinity {

void HashTable::Init(const Vector<DataType> &types) {
types_ = types;
SizeT type_count = types.size();

void HashTableBase::Init(Vector<SharedPtr<DataType>> types) {
types_ = std::move(types);
SizeT key_size = 0;
SizeT type_count = types_.size();
for (SizeT idx = 0; idx < type_count; ++idx) {
const DataType &data_type = types[idx];
const DataType &data_type = *types_[idx];
switch (data_type.type()) {
case kBoolean:
case kTinyInt:
case kSmallInt:
case kInteger:
case kBigInt:
case kHugeInt:
case kFloat:
case kDouble:
case kDecimal:
case kVarchar:
case kDate:
case kTime:
case kDateTime:
case kMixed: {
case LogicalType::kBoolean:
case LogicalType::kTinyInt:
case LogicalType::kSmallInt:
case LogicalType::kInteger:
case LogicalType::kBigInt:
case LogicalType::kFloat:
case LogicalType::kDouble:
case LogicalType::kDate:
case LogicalType::kTime:
case LogicalType::kDateTime:
case LogicalType::kTimestamp: {
SizeT type_size = data_type.Size();
key_size += type_size;
break; // All these type can be hashed.
}
case kTimestamp:
case kInterval:
case kArray:
case kTuple:
case kPoint:
case kLine:
case kLineSeg:
case kBox:
// case kPath:
// case kPolygon:
case kCircle:
// case kBitmap:
case kUuid:
// case kBlob:
case kEmbedding:
case kRowID:
case kNull:
case kMissing:
case kInvalid: {
case LogicalType::kVarchar: {
key_size = 0;
break; // Varchar can be hashed.
}
default: {
RecoverableError(Status::NotSupport(fmt::format("Attempt to construct hash key for type: {}", data_type.ToString())));
}
}

SizeT type_size = data_type.Size();
key_size_ += type_size;
if (key_size == 0) {
break;
}
}

// Key layout: col1\0col2\0col3\0.
key_size_ += type_count;
if (key_size) {
// Key layout: col1\0col2\0col3\0.
key_size += type_count;
}
key_size_ = key_size;
}

void HashTable::Append(const Vector<SharedPtr<ColumnVector>> &columns, SizeT block_id, SizeT row_count) {
UniquePtr<char[]> hash_key = MakeUnique<char[]>(key_size_);
void HashTableBase::GetHashKey(const Vector<SharedPtr<ColumnVector>> &columns, SizeT row_id, String &hash_key) const {
SizeT column_count = columns.size();
for (SizeT row_id = 0; row_id < row_count; ++row_id) {
std::memset(hash_key.get(), 0, key_size_);
SizeT offset = 0;
hash_key.clear();
bool has_null = false;
for (SizeT column_id = 0; column_id < column_count; ++column_id) {
if (!columns[column_id]->nulls_ptr_->IsTrue(row_id)) {
hash_key += "\0\0";
has_null = true;
continue;
}

for (SizeT column_id = 0; column_id < column_count; ++column_id) {
char *target_ptr = hash_key.get() + offset;
if (!columns[column_id]->nulls_ptr_->IsTrue(row_id)) {
*(target_ptr) = '\0';
offset += 2;
continue;
}
const DataType &data_type = *types_[column_id];

DataType &data_type = types_[column_id];
if (data_type.type() == kMixed) {
// Only float/boolean/integer/string can be built as hash key. Array/Tuple will be treated as null
RecoverableError(Status::NotSupport("Attempt to construct hash key for heterogeneous type"));
}
if (data_type.type() == LogicalType::kVarchar) {
Span<const char> text = columns[column_id]->GetVarchar(row_id);
hash_key.append(text.begin(), text.end());
} else {
SizeT type_size = types_[column_id]->Size();
Span<const char> binary(reinterpret_cast<const char *>(columns[column_id]->data() + type_size * row_id), type_size);
hash_key.append(binary.begin(), binary.end());
}
hash_key += '\0';
}
if (!has_null && key_size_ && hash_key.size() != key_size_) {
UnrecoverableError(fmt::format("Hash key size mismatch: {} vs {}", hash_key.size(), key_size_));
}
}

if (data_type.type() == kVarchar) {
VarcharT *vchar_ptr = &((VarcharT *)(columns[column_id]->data_ptr_))[row_id];
if (vchar_ptr->IsInlined()) {
std::memcpy(target_ptr, vchar_ptr->prefix, vchar_ptr->length);
} else {
std::memcpy(target_ptr, vchar_ptr->ptr, vchar_ptr->length);
}
offset += (vchar_ptr->length + 1);
} else {
SizeT type_size = types_[column_id].Size();
std::memcpy(target_ptr, columns[column_id]->data_ptr_ + type_size * row_id, type_size);
offset += (type_size + 1);
}
void HashTable::Append(const Vector<SharedPtr<ColumnVector>> &columns, SizeT block_id, SizeT row_count) {
String hash_key;
if (key_size_) {
hash_key.reserve(key_size_);
}
for (SizeT row_id = 0; row_id < row_count; ++row_id) {
GetHashKey(columns, row_id, hash_key);
hash_table_[std::move(hash_key)][block_id].emplace_back(row_id);
}
}

void MergeHashTable::Append(const Vector<SharedPtr<ColumnVector>> &columns, SizeT block_id, SizeT row_count) {
String hash_key;
if (key_size_) {
hash_key.reserve(key_size_);
}
for (SizeT row_id = 0; row_id < row_count; ++row_id) {
GetHashKey(columns, row_id, hash_key);
if (auto iter = hash_table_.find(hash_key); iter != hash_table_.end()) {
UnrecoverableError("Duplicate key in merge hash table");
} else {
hash_table_.emplace_hint(iter, std::move(hash_key), Pair<SizeT, SizeT>(block_id, row_id));
}
}
}

String key(hash_key.get(), key_size_);
hash_table_[key][block_id].emplace_back(row_id);
bool MergeHashTable::GetOrInsert(const Vector<SharedPtr<ColumnVector>> &columns, SizeT row_id, Pair<SizeT, SizeT> &block_row_id) {
String hash_key;
if (key_size_) {
hash_key.reserve(key_size_);
}
GetHashKey(columns, row_id, hash_key);
auto iter = hash_table_.find(hash_key);
if (iter == hash_table_.end()) {
hash_table_.emplace_hint(iter, std::move(hash_key), block_row_id);
return false;
}
block_row_id = iter->second;
return true;
}

} // namespace infinity
#endif
} // namespace infinity
27 changes: 23 additions & 4 deletions src/executor/hash_table.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,37 @@ import data_type;

namespace infinity {

export class HashTable {
class HashTableBase {
public:
void Init(const Vector<DataType> &types);
bool Initialized() const { return !types_.empty(); }

void Append(const Vector<SharedPtr<ColumnVector>> &columns, SizeT block_id, SizeT row_count);
void Init(Vector<SharedPtr<DataType>> types);

void GetHashKey(const Vector<SharedPtr<ColumnVector>> &columns, SizeT row_id, String &hash_key) const;

public:
Vector<DataType> types_{};
Vector<SharedPtr<DataType>> types_{};
SizeT key_size_{};
};

export class HashTable : public HashTableBase {
public:
void Append(const Vector<SharedPtr<ColumnVector>> &columns, SizeT block_id, SizeT row_count);

public:
// Key -> (block id -> row array)
HashMap<String, HashMap<SizeT, Vector<SizeT>>> hash_table_{};
};

export class MergeHashTable : public HashTableBase {
public:
void Append(const Vector<SharedPtr<ColumnVector>> &columns, SizeT block_id, SizeT row_count);

bool GetOrInsert(const Vector<SharedPtr<ColumnVector>> &columns, SizeT row_id, Pair<SizeT, SizeT> &block_row_id);

public:
// Key -> (block id, row id)
HashMap<String, Pair<SizeT, SizeT>> hash_table_{};
};

} // namespace infinity
Loading

0 comments on commit ef36c4f

Please sign in to comment.