Skip to content

Commit 6f9ab13

Browse files
authored
Fix issues caused by background index optimize (#1141)
Fix PostingWriter::Dump Serialize ChunkIndexEntry::deprecate_ts_ Fix MemIndexRecover crash - [x] Bug Fix (non-breaking change which fixes an issue)
1 parent c30d3c6 commit 6f9ab13

19 files changed

+88
-37
lines changed

benchmark/local_infinity/fulltext/fulltext_benchmark.cpp

+1-3
Original file line numberDiff line numberDiff line change
@@ -301,9 +301,7 @@ void BenchmarkMoreQuery(SharedPtr<Infinity> infinity, const String &db_name, con
301301
profiler.End();
302302
}
303303

304-
int main(int argc, char *argn[]) {
305-
argc = 3;
306-
const char* argv[] = {"fulltext", "--mode", "query"};
304+
int main(int argc, char *argv[]) {
307305
CLI::App app{"fulltext_benchmark"};
308306
// https://github.com/CLIUtils/CLI11/blob/main/examples/enum.cpp
309307
// Using enumerations in an option

python/hello_infinity.py

-7
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import json
1615
import infinity
1716
import infinity.index as index
1817
from infinity.common import REMOTE_HOST
@@ -75,12 +74,6 @@ def test_english():
7574
except Exception as e:
7675
print(str(e))
7776

78-
# Print the JSON data
79-
# print("------json-------")
80-
# print(json_data)
81-
# print("------tabular - querybuilder-------")
82-
# print(qb_result)
83-
8477

8578
def test_chinese():
8679
"""

src/storage/buffer/file_worker/raw_file_worker.cpp

+2-6
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,8 @@ import status;
2525

2626
namespace infinity {
2727

28-
RawFileWorker::RawFileWorker(SharedPtr<String> file_dir, SharedPtr<String> file_name)
29-
: FileWorker(std::move(file_dir), std::move(file_name)), buffer_size_(0) {
30-
LocalFileSystem fs;
31-
String full_path = fs.ConcatenateFilePath(*file_dir_, *file_name_);
32-
buffer_size_ = fs.GetFileSizeByPath(full_path);
33-
}
28+
RawFileWorker::RawFileWorker(SharedPtr<String> file_dir, SharedPtr<String> file_name, u32 file_size)
29+
: FileWorker(std::move(file_dir), std::move(file_name)), buffer_size_(file_size) {}
3430

3531
RawFileWorker::~RawFileWorker() {
3632
if (data_ != nullptr) {

src/storage/buffer/file_worker/raw_file_worker.cppm

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ namespace infinity {
2727
// - The file size is consistant since creation.
2828
export class RawFileWorker : public FileWorker {
2929
public:
30-
explicit RawFileWorker(SharedPtr<String> file_dir, SharedPtr<String> file_name);
30+
explicit RawFileWorker(SharedPtr<String> file_dir, SharedPtr<String> file_name, u32 file_size);
3131

3232
virtual ~RawFileWorker() override;
3333

src/storage/invertedindex/column_inverter.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ void ColumnInverter::MergePrepare() {
108108
}
109109

110110
void ColumnInverter::Merge(ColumnInverter &rhs) {
111-
assert(begin_doc_id_ + doc_count_ == rhs.begin_doc_id_);
111+
assert(begin_doc_id_ + doc_count_ <= rhs.begin_doc_id_);
112112
MergePrepare();
113113
for (auto &doc_terms : rhs.terms_per_doc_) {
114114
u32 doc_id = doc_terms.first;

src/storage/invertedindex/memory_indexer.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,14 @@ void MemoryIndexer::Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset
144144
}
145145
}
146146

147+
void MemoryIndexer::InsertGap(u32 row_count) {
148+
if (is_spilled_)
149+
Load();
150+
151+
std::unique_lock<std::mutex> lock(mutex_);
152+
doc_count_ += row_count;
153+
}
154+
147155
void MemoryIndexer::Commit(bool offline) {
148156
if (offline) {
149157
commiting_thread_pool_.push([this](int id) { this->CommitOffline(); });

src/storage/invertedindex/memory_indexer.cppm

+3
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ public:
6565
// Insert is non-blocking. Caller must ensure there's no RowID gap between each call.
6666
void Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count, bool offline = false);
6767

68+
// InsertGap insert some empty documents. This is for abnormal case.
69+
void InsertGap(u32 row_count);
70+
6871
// Commit is non-blocking and thread-safe. There shall be a background thread which call this method regularly.
6972
void Commit(bool offline = false);
7073

src/storage/invertedindex/posting_writer.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ tf_t PostingWriter::GetCurrentTF() const { return doc_list_encoder_->GetCurrentT
6060
void PostingWriter::SetCurrentTF(tf_t tf) { doc_list_encoder_->SetCurrentTF(tf); }
6161

6262
void PostingWriter::Dump(const SharedPtr<FileWriter> &file_writer, TermMeta &term_meta, bool spill) {
63+
term_meta.doc_freq_ = GetDF();
64+
term_meta.total_tf_ = GetTotalTF();
65+
term_meta.payload_ = 0;
6366
term_meta.doc_start_ = file_writer->TotalWrittenBytes();
6467
doc_list_encoder_->Dump(file_writer, spill);
6568
if (position_list_encoder_) {

src/storage/meta/catalog.cpp

+7-2
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,13 @@ void Catalog::CommitCreateIndex(TxnIndexStore *txn_index_store, TxnTimeStamp com
331331
// table_index_entry->RollbackPopulateIndex(txn_index_store, txn);
332332
// }
333333

334-
void Catalog::Append(TableEntry *table_entry, TransactionID txn_id, void *txn_store, TxnTimeStamp commit_ts, BufferManager *buffer_mgr) {
335-
return table_entry->AppendData(txn_id, txn_store, commit_ts, buffer_mgr);
334+
void Catalog::Append(TableEntry *table_entry,
335+
TransactionID txn_id,
336+
void *txn_store,
337+
TxnTimeStamp commit_ts,
338+
BufferManager *buffer_mgr,
339+
bool is_replay) {
340+
return table_entry->AppendData(txn_id, txn_store, commit_ts, buffer_mgr, is_replay);
336341
}
337342

338343
void Catalog::RollbackAppend(TableEntry *table_entry, TransactionID txn_id, TxnTimeStamp commit_ts, void *txn_store) {

src/storage/meta/catalog.cppm

+2-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,8 @@ public:
198198
// static void RollbackPopulateIndex(TxnIndexStore *txn_index_store, Txn *txn);
199199

200200
// Append related functions
201-
static void Append(TableEntry *table_entry, TransactionID txn_id, void *txn_store, TxnTimeStamp commit_ts, BufferManager *buffer_mgr);
201+
static void
202+
Append(TableEntry *table_entry, TransactionID txn_id, void *txn_store, TxnTimeStamp commit_ts, BufferManager *buffer_mgr, bool is_replay = false);
202203

203204
static void RollbackAppend(TableEntry *table_entry, TransactionID txn_id, TxnTimeStamp commit_ts, void *txn_store);
204205

src/storage/meta/entry/block_entry.cppm

+6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
module;
1616

17+
#include "type/complex/row_id.h"
18+
1719
export module block_entry;
1820

1921
import stl;
@@ -124,6 +126,10 @@ public:
124126

125127
u32 segment_id() const;
126128

129+
u32 segment_offset() const { return u32(block_id() * row_capacity()); }
130+
131+
RowID base_row_id() const { return RowID(segment_id(), segment_offset()); }
132+
127133
const SharedPtr<String> &base_dir() const { return block_dir_; }
128134

129135
BlockColumnEntry *GetColumnBlockEntry(SizeT column_id) const { return columns_[column_id].get(); }

src/storage/meta/entry/chunk_index_entry.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewFtChunkIndexEntry(SegmentIndexEnt
106106
assert(index_dir.get() != nullptr);
107107
if (buffer_mgr != nullptr) {
108108
auto column_length_file_name = MakeShared<String>(base_name + LENGTH_SUFFIX);
109-
auto file_worker = MakeUnique<RawFileWorker>(index_dir, column_length_file_name);
109+
auto file_worker = MakeUnique<RawFileWorker>(index_dir, column_length_file_name, row_count * sizeof(u32));
110110
chunk_index_entry->buffer_obj_ = buffer_mgr->GetBufferObject(std::move(file_worker));
111111
}
112112
return chunk_index_entry;
@@ -123,7 +123,7 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::NewReplayChunkIndexEntry(ChunkID chu
123123
if (param->index_base_->index_type_ == IndexType::kFullText) {
124124
const auto &index_dir = segment_index_entry->index_dir();
125125
auto column_length_file_name = MakeShared<String>(base_name + LENGTH_SUFFIX);
126-
auto file_worker = MakeUnique<RawFileWorker>(index_dir, column_length_file_name);
126+
auto file_worker = MakeUnique<RawFileWorker>(index_dir, column_length_file_name, row_count * sizeof(u32));
127127
chunk_index_entry->buffer_obj_ = buffer_mgr->GetBufferObject(std::move(file_worker));
128128
} else {
129129
const auto &index_dir = segment_index_entry->index_dir();
@@ -145,6 +145,7 @@ nlohmann::json ChunkIndexEntry::Serialize() {
145145
index_entry_json["base_rowid"] = this->base_rowid_.ToUint64();
146146
index_entry_json["row_count"] = this->row_count_;
147147
index_entry_json["commit_ts"] = this->commit_ts_.load();
148+
index_entry_json["deprecate_ts_"] = this->deprecate_ts_.load();
148149
return index_entry_json;
149150
}
150151

@@ -158,6 +159,7 @@ SharedPtr<ChunkIndexEntry> ChunkIndexEntry::Deserialize(const nlohmann::json &in
158159
u32 row_count = index_entry_json["row_count"];
159160
auto ret = NewReplayChunkIndexEntry(chunk_id, segment_index_entry, param, base_name, base_rowid, row_count, buffer_mgr);
160161
ret->commit_ts_.store(index_entry_json["commit_ts"]);
162+
ret->deprecate_ts_.store(index_entry_json["deprecate_ts_"]);
161163
return ret;
162164
}
163165

src/storage/meta/entry/chunk_index_entry.cppm

+6-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
module;
1616

1717
#include "type/complex/row_id.h"
18+
#include <cassert>
1819

1920
export module chunk_index_entry;
2021

@@ -89,11 +90,15 @@ public:
8990

9091
BufferObj *GetBufferObj() { return buffer_obj_; }
9192

92-
void DeprecateChunk(TxnTimeStamp commit_ts) { deprecate_ts_.store(commit_ts); }
93+
void DeprecateChunk(TxnTimeStamp commit_ts) {
94+
assert(commit_ts_.load() < commit_ts);
95+
deprecate_ts_.store(commit_ts);
96+
}
9397

9498
bool CheckVisible(TxnTimeStamp ts) {
9599
TxnTimeStamp deprecate_ts = deprecate_ts_.load();
96100
TxnTimeStamp commit_ts = commit_ts_.load();
101+
assert(commit_ts == UNCOMMIT_TS || commit_ts < deprecate_ts);
97102
return ts >= commit_ts && ts <= deprecate_ts;
98103
}
99104

src/storage/meta/entry/segment_index_entry.cpp

+22-5
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,8 @@ void SegmentIndexEntry::MemIndexInsert(SharedPtr<BlockEntry> block_entry,
241241
u32 row_count,
242242
TxnTimeStamp commit_ts,
243243
BufferManager *buffer_manager) {
244-
u32 seg_id = block_entry->segment_id();
245-
u16 block_id = block_entry->block_id();
246-
SegmentOffset block_offset = u32(block_id) * block_entry->row_capacity();
247-
RowID begin_row_id(seg_id, row_offset + block_offset);
244+
SegmentOffset block_offset = block_entry->segment_offset();
245+
RowID begin_row_id = block_entry->base_row_id() + row_offset;
248246

249247
const SharedPtr<IndexBase> &index_base = table_index_entry_->table_index_def();
250248
const SharedPtr<ColumnDef> &column_def = table_index_entry_->column_def();
@@ -267,7 +265,15 @@ void SegmentIndexEntry::MemIndexInsert(SharedPtr<BlockEntry> block_entry,
267265
table_index_entry_->GetFulltextCommitingThreadPool());
268266
table_index_entry_->UpdateFulltextSegmentTs(commit_ts);
269267
} else {
270-
assert(begin_row_id == memory_indexer_->GetBaseRowId() + memory_indexer_->GetDocCount());
268+
RowID exp_begin_row_id = memory_indexer_->GetBaseRowId() + memory_indexer_->GetDocCount();
269+
assert(begin_row_id >= exp_begin_row_id);
270+
if (begin_row_id > exp_begin_row_id) {
271+
LOG_WARN(fmt::format("Begin row id: {}, expect begin row id: {}, insert gap: {}",
272+
begin_row_id.ToUint64(),
273+
exp_begin_row_id.ToUint64(),
274+
begin_row_id - exp_begin_row_id));
275+
memory_indexer_->InsertGap(begin_row_id - exp_begin_row_id);
276+
}
271277
}
272278
BlockColumnEntry *block_column_entry = block_entry->GetColumnBlockEntry(column_id);
273279
SharedPtr<ColumnVector> column_vector = MakeShared<ColumnVector>(block_column_entry->GetColumnVector(buffer_manager));
@@ -408,6 +414,17 @@ void SegmentIndexEntry::PopulateEntirely(const SegmentEntry *segment_entry, Txn
408414
auto block_entry_iter = BlockEntryIter(segment_entry);
409415
for (const auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) {
410416
BlockColumnEntry *block_column_entry = block_entry->GetColumnBlockEntry(column_id);
417+
RowID begin_row_id = block_entry->base_row_id();
418+
RowID exp_begin_row_id = memory_indexer_->GetBaseRowId() + memory_indexer_->GetDocCount();
419+
assert(begin_row_id >= exp_begin_row_id);
420+
if (begin_row_id > exp_begin_row_id) {
421+
LOG_WARN(fmt::format("Begin row id: {}, expect begin row id: {}, insert gap: {}",
422+
begin_row_id.ToUint64(),
423+
exp_begin_row_id.ToUint64(),
424+
begin_row_id - exp_begin_row_id));
425+
memory_indexer_->InsertGap(begin_row_id - exp_begin_row_id);
426+
}
427+
411428
SharedPtr<ColumnVector> column_vector = MakeShared<ColumnVector>(block_column_entry->GetColumnVector(buffer_mgr));
412429
memory_indexer_->Insert(column_vector, 0, block_entry->row_count(), true);
413430
memory_indexer_->Commit(true);

src/storage/meta/entry/segment_index_entry.cppm

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public:
145145
std::sort(std::begin(chunk_index_entries),
146146
std::end(chunk_index_entries),
147147
[](const SharedPtr<ChunkIndexEntry> &lhs, const SharedPtr<ChunkIndexEntry> &rhs) noexcept {
148-
return lhs->base_rowid_ < rhs->base_rowid_;
148+
return (lhs->base_rowid_ < rhs->base_rowid_ || (lhs->base_rowid_ == rhs->base_rowid_ && lhs->row_count_ < rhs->row_count_));
149149
});
150150
}
151151

src/storage/meta/entry/table_entry.cpp

+14-4
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ void TableEntry::AddCompactNew(SharedPtr<SegmentEntry> segment_entry) {
355355
}
356356
}
357357

358-
void TableEntry::AppendData(TransactionID txn_id, void *txn_store, TxnTimeStamp commit_ts, BufferManager *buffer_mgr) {
358+
void TableEntry::AppendData(TransactionID txn_id, void *txn_store, TxnTimeStamp commit_ts, BufferManager *buffer_mgr, bool is_replay) {
359359
SizeT row_count = 0;
360360

361361
// Read-only no lock needed.
@@ -402,8 +402,11 @@ void TableEntry::AppendData(TransactionID txn_id, void *txn_store, TxnTimeStamp
402402
row_count += actual_appended;
403403
}
404404

405-
// Realtime index insertion
406-
MemIndexInsert(txn, append_state_ptr->append_ranges_);
405+
// Needn't inserting into MemIndex since MemIndexRecover is responsible for recovering MemIndex
406+
if (!is_replay) {
407+
// Realtime index insertion.
408+
MemIndexInsert(txn, append_state_ptr->append_ranges_);
409+
}
407410

408411
this->row_count_ += row_count;
409412
}
@@ -665,6 +668,7 @@ void TableEntry::MemIndexInsertInner(TableIndexEntry *table_index_entry, Txn *tx
665668
if (i == dump_idx && segment_index_entry->MemIndexRowCount() >= 1000000) {
666669
SharedPtr<ChunkIndexEntry> chunk_index_entry = segment_index_entry->MemIndexDump();
667670
if (chunk_index_entry.get() != nullptr) {
671+
chunk_index_entry->Commit(txn->CommitTS());
668672
txn_table_store->AddChunkIndexStore(table_index_entry, chunk_index_entry.get());
669673

670674
if (index_base->index_type_ == IndexType::kFullText) {
@@ -685,6 +689,7 @@ void TableEntry::MemIndexDump(Txn *txn, bool spill) {
685689
TxnIndexStore *txn_index_store = txn_table_store->GetIndexStore(table_index_entry);
686690
SharedPtr<ChunkIndexEntry> chunk_index_entry = table_index_entry->MemIndexDump(txn_index_store, spill);
687691
if (chunk_index_entry.get() != nullptr) {
692+
chunk_index_entry->Commit(txn->CommitTS());
688693
txn_table_store->AddChunkIndexStore(table_index_entry, chunk_index_entry.get());
689694
table_index_entry->UpdateFulltextSegmentTs(txn->CommitTS());
690695
}
@@ -737,7 +742,12 @@ void TableEntry::MemIndexRecover(BufferManager *buffer_manager) {
737742
append_ranges.emplace_back(segment_id, block_id, start_offset, row_count);
738743
for (SizeT i = block_id + 1; i < block_entries.size(); i++) {
739744
assert(block_entries[i - 1]->row_capacity() == block_capacity);
740-
assert(block_entries[i - 1]->GetAvailableCapacity() <= 0);
745+
if (block_entries[i - 1]->GetAvailableCapacity() > 0) {
746+
LOG_ERROR(fmt::format("MemIndexRecover got a non-full BlockEntry. block_id {}, row_count {}, block_entries {}",
747+
block_entries[i - 1]->block_id(),
748+
block_entries[i - 1]->row_count(),
749+
block_entries.size()));
750+
}
741751
append_ranges.emplace_back(segment_id, i, 0, block_entries[i]->row_count());
742752
}
743753
}

src/storage/meta/entry/table_entry.cppm

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public:
150150

151151
void AddCompactNew(SharedPtr<SegmentEntry> segment_entry);
152152

153-
void AppendData(TransactionID txn_id, void *txn_store, TxnTimeStamp commit_ts, BufferManager *buffer_mgr);
153+
void AppendData(TransactionID txn_id, void *txn_store, TxnTimeStamp commit_ts, BufferManager *buffer_mgr, bool is_replay = false);
154154

155155
void RollbackAppend(TransactionID txn_id, TxnTimeStamp commit_ts, void *txn_store);
156156

src/storage/txn/txn_store.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,11 @@ void TxnIndexStore::Commit(TransactionID txn_id, TxnTimeStamp commit_ts) const {
100100
segment_index_entry->CommitSegmentIndex(txn_id, commit_ts);
101101
}
102102
for (auto *chunk_index_entry : chunk_index_entries_) {
103-
chunk_index_entry->Commit(commit_ts);
103+
// uncommitted: insert or import, create index
104+
// committed: create index, insert or import
105+
if (!chunk_index_entry->Committed()) {
106+
chunk_index_entry->Commit(commit_ts);
107+
}
104108
}
105109
}
106110

src/storage/wal/wal_manager.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,7 @@ void WalManager::WalCmdAppendReplay(const WalCmdAppend &cmd, TransactionID txn_i
779779
table_store->append_state_ = std::move(append_state);
780780

781781
fake_txn->FakeCommit(commit_ts);
782-
Catalog::Append(table_store->table_entry_, fake_txn->TxnID(), table_store, commit_ts, storage_->buffer_manager());
782+
Catalog::Append(table_store->table_entry_, fake_txn->TxnID(), table_store, commit_ts, storage_->buffer_manager(), true);
783783
Catalog::CommitWrite(table_store->table_entry_, fake_txn->TxnID(), commit_ts, table_store->txn_segments());
784784
}
785785

0 commit comments

Comments
 (0)