Skip to content

Commit bbde4c9

Browse files
authored
Make external sort based offline index building work (infiniflow#812)
### What problem does this PR solve? Previously, offline index building worked the same way as the near-real-time index building, which is both time-inefficient and can take a lot of memory. Issue link:infiniflow#633 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] Test cases
1 parent 7a62226 commit bbde4c9

File tree

8 files changed

+130
-51
lines changed

8 files changed

+130
-51
lines changed

src/storage/invertedindex/column_inverter.cpp

+15-13
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ module;
1919
#include <cstdio>
2020
#include <cstring>
2121
#include <vector>
22-
2322
module column_inverter;
2423
import stl;
2524
import analyzer;
@@ -200,32 +199,37 @@ void ColumnInverter::GeneratePosting() {
200199
}
201200
}
202201

202+
void ColumnInverter::SortForOfflineDump() {
203+
MergePrepare();
204+
Sort();
205+
}
206+
203207
/// Layout of the input of external sort file
204-
// +-----------++----------------++--------------------++--------------------------++-------------------------------------------------------+
205-
// | || || || || |
206-
// | Count || Size of A Run || Num of records || Position of Next Run || Data of a Run |
207-
// | || || within a Run || || |
208-
// +-----------++----------------++--------------------++--------------------------++-------------------------------------------------------+
209-
// ----------------------------------------------------------------------------------------------------------------------------+
208+
// +-----------+ +----------------++--------------------++--------------------------++-------------------------------------------------------+
209+
// | | | || || || |
210+
// | Count | | Size of A Run || Num of records || Position of Next Run || Data of a Run |
211+
// | | | || within a Run || || |
212+
// +-----------+ +----------------++--------------------++--------------------------++-------------------------------------------------------+
213+
// ----------------------------------------------------------------------------------------------------------------------------+
210214
// Data within each group
211215
void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count) {
212216
// spill sort results for external merge sort
217+
218+
// size of this Run in bytes
213219
u32 data_size = 0;
214220
u32 data_size_pos = ftell(spill_file);
215221
fwrite(&data_size, sizeof(u32), 1, spill_file);
216222
// number of tuples
217223
u32 num_of_tuples = positions_.size();
218224
tuple_count += num_of_tuples;
219225
fwrite(&num_of_tuples, sizeof(u32), 1, spill_file);
220-
221226
// start offset for next spill
222227
u64 next_start_offset = 0;
223228
u64 next_start_offset_pos = ftell(spill_file);
224229
fwrite(&next_start_offset, sizeof(u64), 1, spill_file);
225-
226230
u32 data_start_offset = ftell(spill_file);
227231
// sorted data
228-
u32 last_term_num = 0;
232+
u32 last_term_num = std::numeric_limits<u32>::max();
229233
StringRef term;
230234
u16 record_length = 0;
231235
char str_null = '\0';
@@ -246,9 +250,7 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count) {
246250
next_start_offset = ftell(spill_file);
247251
data_size = next_start_offset - data_start_offset;
248252
fseek(spill_file, data_size_pos, SEEK_SET);
249-
fwrite(&data_size, sizeof(u32), 1, spill_file);
250-
251-
// update offset for next spill
253+
fwrite(&data_size, sizeof(u32), 1, spill_file); // update offset for next spill
252254
fseek(spill_file, next_start_offset_pos, SEEK_SET);
253255
fwrite(&next_start_offset, sizeof(u64), 1, spill_file);
254256
fseek(spill_file, next_start_offset, SEEK_SET);

src/storage/invertedindex/column_inverter.cppm

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public:
4343

4444
void InvertColumn(u32 doc_id, const String &val);
4545

46+
void SortForOfflineDump();
47+
4648
void Merge(ColumnInverter &rhs);
4749

4850
static void Merge(Vector<SharedPtr<ColumnInverter>> &inverters);

src/storage/invertedindex/common/external_sort_merger.cppm

+3-2
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export struct TermTuple {
5353
u32 term_pos_;
5454
int Compare(const TermTuple &rhs) const {
5555
int ret = term_.compare(rhs.term_);
56+
5657
if (ret == 0) {
5758
if (doc_id_ != rhs.doc_id_) {
5859
if (doc_id_ < rhs.doc_id_)
@@ -74,9 +75,9 @@ export struct TermTuple {
7475

7576
bool operator==(const TermTuple &other) const { return Compare(other) == 0; }
7677

77-
bool operator>(const TermTuple &other) const { return Compare(other) < 0; }
78+
bool operator>(const TermTuple &other) const { return Compare(other) > 0; }
7879

79-
bool operator<(const TermTuple &other) const { return Compare(other) > 0; }
80+
bool operator<(const TermTuple &other) const { return Compare(other) < 0; }
8081

8182
TermTuple(char *p, u16 len) : term_(p, len - sizeof(doc_id_) - sizeof(term_pos_) - 1) {
8283
doc_id_ = *((u32 *)(p + term_.size() + 1));

src/storage/invertedindex/index_defines.cppm

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ export {
4949
constexpr u32 MAX_UNCOMPRESSED_SKIP_LIST_SIZE = 10;
5050
constexpr u8 SKIP_LIST_BUFFER_SIZE = 32;
5151

52-
constexpr const char *DICT_SUFFIX = "dic";
53-
constexpr const char *POSTING_SUFFIX = "pos";
52+
constexpr const char *DICT_SUFFIX = ".dic";
53+
constexpr const char *POSTING_SUFFIX = ".pos";
5454

5555
using ScoredId = Pair<float, u32>;
5656
using ScoredIds = Vector<ScoredId>;

src/storage/invertedindex/memory_indexer.cpp

+53-29
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ MemoryIndexer::~MemoryIndexer() {
7878
Reset();
7979
}
8080

81-
void MemoryIndexer::Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count) {
81+
void MemoryIndexer::Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count, bool offline) {
8282
u64 seq_inserted(0);
8383
u32 doc_count(0);
8484
{
@@ -90,16 +90,38 @@ void MemoryIndexer::Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset
9090
}
9191
auto task = MakeShared<BatchInvertTask>(seq_inserted, column_vector, row_offset, row_count, doc_count);
9292
PostingWriterProvider provider = [this](const String &term) -> SharedPtr<PostingWriter> { return GetOrAddPosting(term); };
93-
auto func = [this, task, provider](int id) {
94-
auto inverter = MakeShared<ColumnInverter>(this->analyzer_, &this->byte_slice_pool_, provider);
95-
inverter->InvertColumn(task->column_vector_, task->row_offset_, task->row_count_, task->start_doc_id_);
96-
this->ring_inverted_.Put(task->task_seq_, inverter);
97-
};
98-
thread_pool_.push(func);
93+
if (offline) {
94+
auto func = [this, task, provider](int id) {
95+
auto inverter = MakeShared<ColumnInverter>(this->analyzer_, &this->byte_slice_pool_, provider);
96+
inverter->InvertColumn(task->column_vector_, task->row_offset_, task->row_count_, task->start_doc_id_);
97+
inverter->SortForOfflineDump();
98+
this->ring_sorted_.Put(task->task_seq_, inverter);
99+
};
100+
thread_pool_.push(func);
101+
} else {
102+
auto func = [this, task, provider](int id) {
103+
auto inverter = MakeShared<ColumnInverter>(this->analyzer_, &this->byte_slice_pool_, provider);
104+
inverter->InvertColumn(task->column_vector_, task->row_offset_, task->row_count_, task->start_doc_id_);
105+
this->ring_inverted_.Put(task->task_seq_, inverter);
106+
};
107+
thread_pool_.push(func);
108+
}
99109
}
100110

101-
void MemoryIndexer::Commit() {
102-
thread_pool_.push([this](int id) { this->CommitSync(); });
111+
void MemoryIndexer::Commit(bool offline) {
112+
if (offline) {
113+
if (nullptr == spill_file_handle_) {
114+
PrepareSpillFile();
115+
}
116+
while (inflight_tasks_ != 0) {
117+
this->ring_sorted_.Iterate([this](SharedPtr<ColumnInverter> &inverter) {
118+
inverter->SpillSortResults(this->spill_file_handle_, this->tuple_count_);
119+
inflight_tasks_--;
120+
num_runs_++;
121+
});
122+
}
123+
} else
124+
thread_pool_.push([this](int id) { this->CommitSync(); });
103125
}
104126

105127
SizeT MemoryIndexer::CommitSync() {
@@ -122,12 +144,17 @@ SizeT MemoryIndexer::CommitSync() {
122144
return num;
123145
}
124146

125-
void MemoryIndexer::Dump() {
147+
void MemoryIndexer::Dump(bool offline) {
148+
if (offline) {
149+
Commit(true);
150+
OfflineDump();
151+
return;
152+
}
153+
126154
while (GetInflightTasks() > 0) {
127155
sleep(1);
128156
CommitSync();
129157
}
130-
131158
Path path = Path(index_dir_) / base_name_;
132159
String index_prefix = path.string();
133160
LocalFileSystem fs;
@@ -191,52 +218,53 @@ void MemoryIndexer::OfflineDump() {
191218
// 2. Generate posting
192219
// 3. Dump disk segment data
193220
FinalSpillFile();
221+
std::cout << "num_runs_ " << num_runs_ << std::endl;
194222
SortMerger<TermTuple, u16> *merger = new SortMerger<TermTuple, u16>(spill_full_path_.c_str(), num_runs_, 100000000, 2);
195223
merger->Run();
196224
delete merger;
197225
FILE *f = fopen(spill_full_path_.c_str(), "r");
198226
u64 count;
199227
fread((char *)&count, sizeof(u64), 1, f);
200-
201-
String index_prefix; /// TODO, to be integrated
228+
Path path = Path(index_dir_) / base_name_;
229+
String index_prefix = path.string();
202230
LocalFileSystem fs;
203231
String posting_file = index_prefix + POSTING_SUFFIX;
204232
SharedPtr<FileWriter> posting_file_writer = MakeShared<FileWriter>(fs, posting_file, 128000);
205233
String dict_file = index_prefix + DICT_SUFFIX;
206234
SharedPtr<FileWriter> dict_file_writer = MakeShared<FileWriter>(fs, dict_file, 128000);
207235
TermMetaDumper term_meta_dumpler((PostingFormatOption(flag_)));
208-
209236
String fst_file = index_prefix + DICT_SUFFIX + ".fst";
210237
std::ofstream ofs(fst_file.c_str(), std::ios::binary | std::ios::trunc);
211238
OstreamWriter wtr(ofs);
212239
FstBuilder builder(wtr);
213240

241+
u16 record_length;
242+
char buf[MAX_TUPLE_LENGTH];
214243
std::string_view last_term;
215244
u32 last_term_pos = 0;
216245
u32 last_doc_id = INVALID_DOCID;
217-
u16 record_length;
218246
Deque<String> term_buffer;
219-
char buf[MAX_TUPLE_LENGTH];
220247
UniquePtr<PostingWriter> posting;
221248
SizeT term_meta_offset = 0;
249+
222250
for (u64 i = 0; i < count; ++i) {
223251
fread(&record_length, sizeof(u16), 1, f);
224252
fread(buf, record_length, 1, f);
225253
TermTuple tuple(buf, record_length);
226254
if (tuple.term_ != last_term) {
227255
if (last_doc_id != INVALID_DOCID) {
228256
posting->EndDocument(last_doc_id, 0);
257+
if (posting.get()) {
258+
TermMeta term_meta(posting->GetDF(), posting->GetTotalTF());
259+
posting->Dump(posting_file_writer, term_meta);
260+
term_meta_dumpler.Dump(dict_file_writer, term_meta);
261+
builder.Insert((u8 *)last_term.data(), last_term.length(), term_meta_offset);
262+
term_meta_offset = dict_file_writer->TotalWrittenBytes();
263+
}
229264
}
230265
term_buffer.push_back(String(tuple.term_));
231266
std::string_view view(term_buffer.back());
232267
last_term.swap(view);
233-
if (posting.get()) {
234-
TermMeta term_meta(posting->GetDF(), posting->GetTotalTF());
235-
posting->Dump(posting_file_writer, term_meta);
236-
term_meta_dumpler.Dump(dict_file_writer, term_meta);
237-
builder.Insert((u8 *)last_term.data(), last_term.length(), term_meta_offset);
238-
term_meta_offset = dict_file_writer->TotalWrittenBytes();
239-
}
240268
posting = MakeUnique<PostingWriter>(&byte_slice_pool_, &buffer_pool_, PostingFormatOption(flag_));
241269
} else if (last_doc_id != tuple.doc_id_) {
242270
posting->EndDocument(last_doc_id, 0);
@@ -247,15 +275,11 @@ void MemoryIndexer::OfflineDump() {
247275
posting->AddPosition(last_term_pos);
248276
}
249277
}
278+
250279
if (last_doc_id != INVALID_DOCID) {
251280
posting->EndDocument(last_doc_id, 0);
252281
}
253-
if (posting->GetDF() > 0) {
254-
TermMeta term_meta(posting->GetDF(), posting->GetTotalTF());
255-
posting->Dump(posting_file_writer, term_meta);
256-
term_meta_dumpler.Dump(dict_file_writer, term_meta);
257-
builder.Insert((u8 *)last_term.data(), last_term.length(), term_meta_offset);
258-
}
282+
posting_file_writer->Sync();
259283
dict_file_writer->Sync();
260284
builder.Finish();
261285
fs.AppendFile(dict_file, fst_file);

src/storage/invertedindex/memory_indexer.cppm

+3-3
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,18 @@ public:
5151
~MemoryIndexer();
5252

5353
// Insert is non-blocking. Caller must ensure there's no RowID gap between each call.
54-
void Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count);
54+
void Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count, bool offline = false);
5555

5656
// Commit is non-blocking. There shall be a background thread which calls this method regularly (for example, every 2 seconds).
5757
// Other threads can also call this method.
58-
void Commit();
58+
void Commit(bool offline = false);
5959

6060
// CommitSync waits at max 100ms to get a batch of insertions and commit them. Returns the size of the batch.
6161
SizeT CommitSync();
6262

6363
// Dump is blocking and shall be called only once after inserting all documents.
6464
// WARN: Don't reuse MemoryIndexer after calling Dump!
65-
void Dump();
65+
void Dump(bool offline = false);
6666

6767
SizeT GetInflightTasks() {
6868
std::unique_lock<std::mutex> lock(mutex_);

src/unit_test/storage/invertedindex/common/external_sort.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,15 @@ class ExternalSortTest : public BaseTest {
124124
while (run_num < 100 || SIZE % run_num != 0)
125125
run_num = rand() % 300;
126126

127+
run_num = 10;
127128
char buffer[200];
128129
for (u32 i = 0; i < run_num; ++i) {
129130
u64 pos = ftell(f);
130131
fseek(f, 2 * sizeof(u32) + sizeof(u64), SEEK_CUR);
131132
u32 s = 0;
132133
for (u32 j = 0; j < SIZE / run_num; ++j) {
133-
str = RandStr();
134+
// str = RandStr();
135+
String str = std::to_string(i * SIZE / run_num + j);
134136
u32 doc_id = 34567; // i * SIZE / run_num + j;
135137
u32 term_pos = i;
136138
memcpy(buffer, str.data(), str.size());
@@ -178,4 +180,4 @@ TEST_F(ExternalSortTest, test1) {
178180
CheckMerger<u32, u8>(10000, 1000000);
179181
}
180182

181-
TEST_F(ExternalSortTest, test2) { CheckTermTuple(10000, 1000000); }
183+
TEST_F(ExternalSortTest, test2) { CheckTermTuple(100, 1000000); }

src/unit_test/storage/invertedindex/memory_indexer.cpp

+48
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,51 @@ TEST_F(MemoryIndexerTest, Insert) {
110110
}
111111
}
112112
}
113+
114+
TEST_F(MemoryIndexerTest, test2) {
115+
// https://en.wikipedia.org/wiki/Finite-state_transducer
116+
const char *paragraphs[] = {
117+
R"#(A finite-state transducer (FST) is a finite-state machine with two memory tapes, following the terminology for Turing machines: an input tape and an output tape. This contrasts with an ordinary finite-state automaton, which has a single tape. An FST is a type of finite-state automaton (FSA) that maps between two sets of symbols.[1] An FST is more general than an FSA. An FSA defines a formal language by defining a set of accepted strings, while an FST defines a relation between sets of strings.)#",
118+
R"#(An FST will read a set of strings on the input tape and generates a set of relations on the output tape. An FST can be thought of as a translator or relater between strings in a set.)#",
119+
R"#(In morphological parsing, an example would be inputting a string of letters into the FST, the FST would then output a string of morphemes.)#",
120+
R"#(An automaton can be said to recognize a string if we view the content of its tape as input. In other words, the automaton computes a function that maps strings into the set {0,1}. Alternatively, we can say that an automaton generates strings, which means viewing its tape as an output tape. On this view, the automaton generates a formal language, which is a set of strings. The two views of automata are equivalent: the function that the automaton computes is precisely the indicator function of the set of strings it generates. The class of languages generated by finite automata is known as the class of regular languages.)#",
121+
R"#(The two tapes of a transducer are typically viewed as an input tape and an output tape. On this view, a transducer is said to transduce (i.e., translate) the contents of its input tape to its output tape, by accepting a string on its input tape and generating another string on its output tape. It may do so nondeterministically and it may produce more than one output for each input string. A transducer may also produce no output for a given input string, in which case it is said to reject the input. In general, a transducer computes a relation between two formal languages.)#",
122+
};
123+
const SizeT num_paragraph = sizeof(paragraphs) / sizeof(char *);
124+
SharedPtr<ColumnVector> column = ColumnVector::Make(MakeShared<DataType>(LogicalType::kVarchar));
125+
column->Initialize();
126+
for (SizeT i = 0; i < num_paragraph; ++i) {
127+
Value v = Value::MakeVarchar(String(paragraphs[i]));
128+
column->AppendValue(v);
129+
}
130+
Vector<ExpectedPosting> expected_postings = {{"fst", {0, 1, 2}, {4, 2, 2}}, {"automaton", {0, 3}, {2, 5}}, {"transducer", {0, 4}, {1, 4}}};
131+
132+
MemoryIndexer
133+
indexer1("/tmp/infinity/fulltext_tbl1_col1", "chunk1", RowID(0U, 0U), flag_, "standard", byte_slice_pool_, buffer_pool_, thread_pool_);
134+
indexer1.Insert(column, 0, 2, true);
135+
indexer1.Insert(column, 2, 2, true);
136+
indexer1.Insert(column, 4, 1, true);
137+
indexer1.Dump(true);
138+
139+
ColumnIndexReader reader;
140+
reader.Open("/tmp/infinity/fulltext_tbl1_col1", Vector<String>{"chunk1"}, Vector<RowID>{RowID(0U, 0U)}, flag_, nullptr);
141+
for (SizeT i = 0; i < expected_postings.size(); ++i) {
142+
const ExpectedPosting &expected = expected_postings[i];
143+
const String &term = expected.term;
144+
145+
UniquePtr<PostingIterator> post_iter(reader.Lookup(term, &byte_slice_pool_));
146+
ASSERT_TRUE(post_iter != nullptr);
147+
148+
RowID doc_id = INVALID_ROWID;
149+
for (SizeT j = 0; j < expected.doc_ids.size(); ++j) {
150+
doc_id = post_iter->SeekDoc(expected.doc_ids[j]);
151+
ASSERT_EQ(doc_id, expected.doc_ids[j]);
152+
u32 tf = post_iter->GetCurrentTF();
153+
ASSERT_EQ(tf, expected.tfs[j]);
154+
}
155+
if (doc_id != INVALID_ROWID) {
156+
doc_id = post_iter->SeekDoc(doc_id + 1);
157+
ASSERT_EQ(doc_id, INVALID_ROWID);
158+
}
159+
}
160+
}

0 commit comments

Comments
 (0)