Skip to content

Commit 65479a4

Browse files
authored
Refactor index and bug fix (infiniflow#794)
### What problem does this PR solve? Remove redundant codes Fix several codec errors Add NoCompressed codec for debug usage Have the MemoryIndexer unit test passed under single thread Issue link:infiniflow#551 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] Refactoring - [x] Test cases
1 parent 5150646 commit 65479a4

25 files changed

+136
-217
lines changed

src/storage/invertedindex/format/buffered_byte_slice.cppm

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public:
5757
virtual SizeT EstimateDumpSize() const { return posting_writer_.GetSize(); }
5858

5959
protected:
60-
virtual SizeT DoFlush();
60+
SizeT DoFlush();
6161

6262
protected:
6363
FlushInfo flush_info_;

src/storage/invertedindex/format/buffered_skiplist_writer.cpp

+2-70
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ void BufferedSkipListWriter::AddItem(u32 key, u32 value1, u32 value2) {
2424
last_value1_ = value1;
2525
PushBack(2, value2);
2626
EndPushBack();
27-
2827
if (NeedFlush(SKIP_LIST_BUFFER_SIZE)) {
2928
Flush();
3029
}
@@ -52,74 +51,7 @@ void BufferedSkipListWriter::AddItem(u32 value_delta) {
5251
}
5352
}
5453

55-
SizeT BufferedSkipListWriter::DoFlush() {
56-
u32 flush_size = 0;
57-
u8 size = buffer_.Size();
58-
59-
const PostingValues *posting_values = GetPostingValues();
60-
for (SizeT i = 0; i < posting_values->GetSize(); ++i) {
61-
PostingValue *posting_value = posting_values->GetValue(i);
62-
u8 *buffer = buffer_.GetRow(posting_value->location_);
63-
flush_size += posting_value->Encode(posting_writer_, buffer, size * posting_value->GetSize());
64-
}
65-
return flush_size;
66-
}
67-
68-
void BufferedSkipListWriter::Dump(const SharedPtr<FileWriter> &file) {
69-
posting_writer_.Dump(file);
70-
return;
71-
72-
u32 skiplist_size = GetTotalCount();
73-
if (skiplist_size == 0) {
74-
return;
75-
}
76-
if (GetPostingValues()->GetSize() != 3) {
77-
posting_writer_.Dump(file);
78-
return;
79-
}
80-
81-
const ByteSliceList *skipList = posting_writer_.GetByteSliceList();
82-
ByteSliceListIterator iter(skipList);
83-
u32 start = 0;
84-
const PostingValues *posting_values = GetPostingValues();
85-
for (SizeT i = 0; i < posting_values->GetSize(); ++i) {
86-
PostingValue *posting_value = posting_values->GetValue(i);
87-
u32 len = posting_value->GetSize() * skiplist_size;
88-
if (i > 0) {
89-
// not encode last value after first row
90-
len -= posting_value->GetSize();
91-
}
92-
if (len == 0) {
93-
break;
94-
}
95-
bool ret = iter.SeekSlice(start);
96-
assert(ret);
97-
(void)ret;
98-
while (iter.HasNext(start + len)) {
99-
void *data = nullptr;
100-
SizeT size = 0;
101-
iter.Next(data, size);
102-
assert(data);
103-
assert(size);
104-
file->Write((char *)data, size);
105-
}
106-
start += posting_value->GetSize() * skiplist_size;
107-
}
108-
}
109-
110-
SizeT BufferedSkipListWriter::EstimateDumpSize() const {
111-
u32 skiplist_size = GetTotalCount();
112-
if (skiplist_size == 0) {
113-
return 0;
114-
}
115-
if (GetPostingValues()->GetSize() != 3) {
116-
return posting_writer_.GetSize();
117-
}
54+
void BufferedSkipListWriter::Dump(const SharedPtr<FileWriter> &file) { posting_writer_.Dump(file); }
11855

119-
const PostingValues *posting_values = GetPostingValues();
120-
assert(posting_values->GetSize() == 3);
121-
122-
SizeT opt_size = posting_values->GetValue(1)->GetSize() + posting_values->GetValue(1)->GetSize();
123-
return posting_writer_.GetSize() - opt_size;
124-
}
56+
SizeT BufferedSkipListWriter::EstimateDumpSize() const { return posting_writer_.GetSize(); }
12557
} // namespace infinity

src/storage/invertedindex/format/buffered_skiplist_writer.cppm

-3
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@ public:
2424

2525
SizeT EstimateDumpSize() const override;
2626

27-
protected:
28-
SizeT DoFlush() override;
29-
3027
private:
3128
static const u32 INVALID_LAST_KEY = (u32)-1;
3229

src/storage/invertedindex/format/doc_list_format_option.cppm

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public:
6464
u8 row_count = 0;
6565
u32 offset = 0;
6666
{
67+
// NoCompressPostingValue<u32> *doc_id_value = new NoCompressPostingValue<u32>;
6768
TypedPostingValue<u32> *doc_id_value = new TypedPostingValue<u32>;
6869
doc_id_value->location_ = row_count++;
6970
doc_id_value->offset_ = offset;

src/storage/invertedindex/format/inmem_doc_list_decoder.cpp

+5-8
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ void InMemDocListDecoder::Init(df_t df, SkipListReader *skiplist_reader, Buffere
3434

3535
bool InMemDocListDecoder::DecodeDocBuffer(docid_t start_doc_id,
3636
docid_t *doc_buffer,
37-
docid_t &first_doc_id_,
38-
docid_t &last_doc_id_,
37+
docid_t &first_doc_id,
38+
docid_t &last_doc_id,
3939
ttf_t &current_ttf) {
40-
DocBufferInfo doc_buffer_info(doc_buffer, first_doc_id_, last_doc_id_, current_ttf);
40+
DocBufferInfo doc_buffer_info(doc_buffer, first_doc_id, last_doc_id, current_ttf);
4141
if (skiplist_reader_ == nullptr) {
4242
current_ttf = 0;
4343
return DecodeDocBufferWithoutSkipList(0, 0, start_doc_id, doc_buffer_info);
@@ -47,7 +47,7 @@ bool InMemDocListDecoder::DecodeDocBuffer(docid_t start_doc_id,
4747
u32 record_len;
4848
u32 last_doc_id_in_prev_record;
4949

50-
auto ret = skiplist_reader_->SkipTo((u32)start_doc_id, (u32 &)last_doc_id_, last_doc_id_in_prev_record, offset, record_len);
50+
auto ret = skiplist_reader_->SkipTo((u32)start_doc_id, (u32 &)last_doc_id, last_doc_id_in_prev_record, offset, record_len);
5151
if (!ret) {
5252
// we should decode buffer
5353
last_doc_id_in_prev_record = skiplist_reader_->GetLastKeyInBuffer();
@@ -64,23 +64,20 @@ bool InMemDocListDecoder::DecodeDocBuffer(docid_t start_doc_id,
6464
SizeT acutal_decode_count = 0;
6565
doc_list_reader_.Decode(doc_buffer, MAX_DOC_PER_RECORD, acutal_decode_count);
6666

67-
first_doc_id_ = doc_buffer[0] + last_doc_id_in_prev_record;
67+
first_doc_id = doc_buffer[0] + last_doc_id_in_prev_record;
6868
return true;
6969
}
7070

7171
bool InMemDocListDecoder::DecodeDocBufferWithoutSkipList(docid_t last_doc_id_in_prev_record,
7272
u32 offset,
7373
docid_t start_doc_id,
7474
DocBufferInfo &doc_buffer_info) {
75-
// only decode one time
7675
if (finish_decoded_) {
7776
return false;
7877
}
7978

80-
// TODO: seek return value
8179
doc_list_reader_.Seek(offset);
8280

83-
// short list when no skip
8481
SizeT decode_count;
8582
if (!doc_list_reader_.Decode(doc_buffer_info.doc_buffer_, MAX_DOC_PER_RECORD, decode_count)) {
8683
return false;

src/storage/invertedindex/format/inmem_pair_value_skiplist_reader.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import buffered_byte_slice;
66
import buffered_byte_slice_reader;
77
import pair_value_skiplist_reader;
88
import index_defines;
9+
import logger;
10+
import third_party;
911

1012
module inmem_pair_value_skiplist_reader;
1113

@@ -51,7 +53,7 @@ Pair<int, bool> InMemPairValueSkipListReader::LoadBuffer() {
5153
return MakePair(0, false);
5254
}
5355
if (key_num != value_num) {
54-
// LOG_ERROR(fmt::format("SKipList decode error, doc_num = {} offset_num = {}", key_num, value_num));
56+
LOG_ERROR(fmt::format("SKipList decode error, doc_num = {} offset_num = {}", key_num, value_num));
5557
return MakePair(-1, false);
5658
}
5759
num_in_buffer_ = key_num;

src/storage/invertedindex/format/inmem_pair_value_skiplist_reader.cppm

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ namespace infinity {
1212

1313
export class InMemPairValueSkipListReader : public PairValueSkipListReader {
1414
public:
15-
InMemPairValueSkipListReader(MemoryPool *sessionPool);
15+
InMemPairValueSkipListReader(MemoryPool *session_pool);
1616

1717
~InMemPairValueSkipListReader();
1818

1919
InMemPairValueSkipListReader(const InMemPairValueSkipListReader &other) = delete;
2020

21-
void Load(BufferedByteSlice *postingBuffer);
21+
void Load(BufferedByteSlice *posting_buffer);
2222

2323
private:
2424
Pair<int, bool> LoadBuffer() override;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
module;
2+
3+
import stl;
4+
import byte_slice_reader;
5+
import byte_slice_writer;
6+
7+
export module no_compress_encoder;
8+
9+
namespace infinity {
10+
11+
export template <typename T>
12+
class NoCompressIntEncoder {
13+
public:
14+
NoCompressIntEncoder() = default;
15+
~NoCompressIntEncoder() = default;
16+
17+
inline u32 Encode(ByteSliceWriter &slice_writer, const T *src, u32 src_len) const;
18+
19+
inline u32 Decode(T *dest, u32 dest_len, ByteSliceReader &slice_reader) const;
20+
};
21+
22+
template <typename T>
23+
u32 NoCompressIntEncoder<T>::Encode(ByteSliceWriter &slice_writer, const T *src, u32 src_len) const {
24+
u32 encode_len = 0;
25+
slice_writer.WriteByte((u8)src_len);
26+
encode_len += sizeof(u8);
27+
slice_writer.Write((const void *)src, src_len * sizeof(T));
28+
return encode_len + src_len * sizeof(T);
29+
}
30+
31+
template <typename T>
32+
u32 NoCompressIntEncoder<T>::Decode(T *dest, u32 dest_len, ByteSliceReader &slice_reader) const {
33+
u32 read_count = slice_reader.ReadByte();
34+
slice_reader.Read((void *)dest, read_count * sizeof(T));
35+
return read_count;
36+
}
37+
38+
} // namespace infinity

src/storage/invertedindex/format/pair_value_skiplist_reader.cppm

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public:
3131
u32 GetCurrentKey() const { return current_key_; }
3232

3333
u32 GetLastValueInBuffer() const override;
34+
3435
u32 GetLastKeyInBuffer() const override;
3536

3637
private:

src/storage/invertedindex/format/pos_list_decoder.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ bool PositionListDecoder::LocateRecord(const InDocPositionState *state, u32 &tf)
225225
record_offset_ = record_offset;
226226
offset_in_record_ = offset_in_record;
227227

228-
ByteSliceList *positionList = pos_list_reader_.GetByteSliceList();
228+
ByteSliceList *position_list = pos_list_reader_.GetByteSliceList();
229229
if (!need_reopen_ && last_decode_offset_ == record_offset_) {
230230
// no need to relocate
231231
return false;
@@ -235,7 +235,7 @@ bool PositionListDecoder::LocateRecord(const InDocPositionState *state, u32 &tf)
235235
if (pos_single_slice_) {
236236
pos_list_reader_.Open(pos_single_slice_);
237237
} else {
238-
pos_list_reader_.Open(positionList);
238+
pos_list_reader_.Open(position_list);
239239
}
240240
last_decode_offset_ = 0;
241241
}

src/storage/invertedindex/format/pos_list_decoder.cppm

-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ public:
2828

2929
void SetNeedReopen(bool need_reopen) { need_reopen_ = need_reopen; }
3030

31-
// TODO: separate TfBitmap CalculateRecordOffset from LocateRecord
3231
virtual bool LocateRecord(const InDocPositionState *state, u32 &tf);
3332

3433
virtual u32 DecodeRecord(pos_t *pos_buffer, u32 pos_buffer_len);

src/storage/invertedindex/format/pos_list_encoder.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ InMemPositionListDecoder *PositionListEncoder::GetInMemPositionListDecoder(Memor
105105
// doclist -> ttf -> pos skiplist -> poslist
106106
ttf_t ttf = total_pos_count_;
107107

108-
// TODO: delete buffer in MemoryPool
109108
InMemPairValueSkipListReader *in_mem_skiplist_reader = nullptr;
110109
if (pos_skiplist_writer_) {
111110
// not support tf bitmap in realtime segment

src/storage/invertedindex/format/posting_value.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ module;
22

33
import stl;
44
import int_encoder;
5+
import no_compress_encoder;
56

67
module posting_value;
78

@@ -10,6 +11,7 @@ namespace infinity {
1011
struct EncoderProvider {
1112
UniquePtr<Int32Encoder> int32_encoder_;
1213
UniquePtr<Int16Encoder> int16_encoder_;
14+
UniquePtr<NoCompressIntEncoder<u32>> no_compress_encoder_;
1315

1416
static EncoderProvider *GetInstance() {
1517
static EncoderProvider instance;
@@ -27,6 +29,12 @@ struct EncoderProvider {
2729
int16_encoder_.reset(new Int16Encoder);
2830
return int16_encoder_.get();
2931
}
32+
33+
NoCompressEncoder *GetNoCompressEncoder() {
34+
if (!no_compress_encoder_.get())
35+
no_compress_encoder_.reset(new NoCompressEncoder);
36+
return no_compress_encoder_.get();
37+
}
3038
};
3139

3240
const Int32Encoder *GetDocIDEncoder() { return EncoderProvider::GetInstance()->GetInt32Encoder(); }

src/storage/invertedindex/format/posting_value.cppm

+18
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import stl;
44
import int_encoder;
55
import byte_slice_reader;
66
import byte_slice_writer;
7+
import no_compress_encoder;
78

89
export module posting_value;
910

@@ -42,6 +43,7 @@ struct ValueTypeTraits<u32> {
4243

4344
export typedef IntEncoder<u32, NewPForDeltaCompressor> Int32Encoder;
4445
export typedef IntEncoder<u16, NewPForDeltaCompressor> Int16Encoder;
46+
export typedef NoCompressIntEncoder<u32> NoCompressEncoder;
4547

4648
template <typename T>
4749
struct EncoderTypeTraits {
@@ -84,6 +86,22 @@ struct TypedPostingValue : public PostingValue {
8486
const Encoder *encoder_;
8587
};
8688

89+
export template <typename T>
90+
struct NoCompressPostingValue : public PostingValue {
91+
92+
SizeT GetSize() const override { return sizeof(T); }
93+
94+
u32 Encode(ByteSliceWriter &slice_writer, const u8 *src, u32 len) const override {
95+
return encoder_->Encode(slice_writer, (const T *)src, len / sizeof(T));
96+
}
97+
98+
u32 Decode(u8 *dest, u32 dest_len, ByteSliceReader &slice_reader) const override {
99+
return encoder_->Decode((T *)dest, dest_len / sizeof(T), slice_reader);
100+
}
101+
102+
const NoCompressEncoder *encoder_;
103+
};
104+
87105
export struct PostingValues {
88106
~PostingValues() {
89107
for (SizeT i = 0; i < values_.size(); ++i) {

src/storage/invertedindex/format/skiplist_writer.cpp

-35
This file was deleted.

0 commit comments

Comments
 (0)