Skip to content

Commit 6f8b3cb

Browse files
authored
Support string sort in external merge sort (#680)
### What problem does this PR solve?

Improve the external merge sort so that it can sort tuple[string, uint32, uint32] records.

Issue link: #633

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
1 parent 0d948cf commit 6f8b3cb

File tree

3 files changed

+216
-69
lines changed

3 files changed

+216
-69
lines changed

src/storage/invertedindex/common/external_sort_merger.cpp

+1-3
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,8 @@ void SortMerger<KeyType, LenType>::Init(DirectIO &io_stream) {
172172
micro_run_pos_[i] = KeyAddr(micro_buf_[i], -1, i).LEN() + sizeof(LenType);
173173
num_micro_run_[i] = 0;
174174

175-
// if size_run_[i]>PRE_BUF_SIZE_, it needs to the end of a run and turn to the next run
176175
io_stream.Seek(next_run_pos);
177176
}
178-
// LOG_INFO << "Run number: " << group_size_;
179177

180178
// initialize predict heap and records number of every microrun
181179
for (u32 i = 0; i < group_size_; ++i) {
@@ -409,5 +407,5 @@ void SortMerger<KeyType, LenType>::Run() {
409407
}
410408

411409
template class SortMerger<u32, u8>;
412-
410+
template class SortMerger<TermTuple, u16>;
413411
} // namespace infinity

src/storage/invertedindex/common/external_sort_merger.cppm

+142-63
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,149 @@ export struct DirectIO {
4747
SizeT length_;
4848
};
4949

50+
export struct TermTuple {
51+
std::string_view term_;
52+
u32 doc_id_;
53+
u32 term_pos_;
54+
int Compare(const TermTuple &rhs) const {
55+
int ret = term_.compare(rhs.term_);
56+
if (ret == 0) {
57+
if (doc_id_ != rhs.doc_id_) {
58+
if (doc_id_ < rhs.doc_id_)
59+
return -1;
60+
else
61+
return 1;
62+
} else {
63+
if (term_pos_ != rhs.term_pos_) {
64+
if (term_pos_ < rhs.term_pos_)
65+
return -1;
66+
else
67+
return 1;
68+
}
69+
return 0;
70+
}
71+
} else
72+
return ret < 0 ? -1 : 1;
73+
}
74+
75+
bool operator==(const TermTuple &other) const { return Compare(other) == 0; }
76+
77+
bool operator>(const TermTuple &other) const { return Compare(other) < 0; }
78+
79+
bool operator<(const TermTuple &other) const { return Compare(other) > 0; }
80+
81+
TermTuple(char *p, u16 len) : term_(p, len - sizeof(doc_id_) - sizeof(term_pos_) - 1) {
82+
doc_id_ = *((u32 *)(p + term_.size() + 1));
83+
term_pos_ = *((u32 *)(p + term_.size() + 1 + sizeof(doc_id_)));
84+
}
85+
};
86+
87+
/// Reads a T from `src` one byte at a time. Records are packed back to back,
/// so `src` has no alignment guarantee and must not be dereferenced through a
/// T pointer.
template <typename T>
inline T LoadUnaligned(const char *src) {
    T value{};
    unsigned char *dst = reinterpret_cast<unsigned char *>(&value);
    for (unsigned i = 0; i < sizeof(T); ++i)
        dst[i] = static_cast<unsigned char>(src[i]);
    return value;
}

/// Non-owning handle to one length-prefixed record: `data` points at a LenType
/// length header that is immediately followed by the key bytes.
///
/// Generic version: the key is materialised on demand by calling
/// KeyType(char *key_bytes, LenType length).
template <typename KeyType, typename LenType, typename = void>
struct KeyAddress {
    char *data{nullptr};
    u64 addr;
    u32 idx;

    KeyAddress(char *p, u64 ad, u32 i) : data(p), addr(ad), idx(i) {}

    /// Default state: null data, addr and idx set to their all-ones value.
    KeyAddress() : data(nullptr), addr(static_cast<u64>(-1)), idx(static_cast<u32>(-1)) {}

    KeyType KEY() { return KeyType(data + sizeof(LenType), LEN()); }
    KeyType KEY() const { return KeyType(data + sizeof(LenType), LEN()); }
    LenType LEN() const { return LoadUnaligned<LenType>(data); }
    u64 &ADDR() { return addr; }
    u64 ADDR() const { return addr; }
    u32 IDX() const { return idx; }
    u32 &IDX() { return idx; }

    /// @return 0 when the keys compare equal, 1 when KEY() > p.KEY() according
    /// to KeyType's own operator>, and -1 otherwise.
    int Compare(const KeyAddress &p) const {
        // Build each key once; the original rebuilt them for every comparison.
        KeyType lhs = KEY();
        KeyType rhs = p.KEY();
        if (lhs == rhs)
            return 0;
        return lhs > rhs ? 1 : -1;
    }

    bool operator==(const KeyAddress &other) const { return Compare(other) == 0; }

    // Deliberately mirrored: a > b holds when Compare() is negative.
    bool operator>(const KeyAddress &other) const { return Compare(other) < 0; }

    bool operator<(const KeyAddress &other) const { return Compare(other) > 0; }
};

/// Scalar-key version: the key is stored inline right after the length header
/// and is read directly from the record bytes.
template <typename KeyType, typename LenType>
struct KeyAddress<KeyType, LenType, typename std::enable_if<std::is_scalar<KeyType>::value>::type> {
    char *data{nullptr};
    u64 addr;
    u32 idx;

    KeyAddress(char *p, u64 ad, u32 i) : data(p), addr(ad), idx(i) {}

    /// Default state: null data, addr and idx set to their all-ones value.
    KeyAddress() : data(nullptr), addr(static_cast<u64>(-1)), idx(static_cast<u32>(-1)) {}

    // Returns a reference into the record buffer, so this overload cannot copy
    // the bytes out; callers must ensure the key is suitably aligned.
    KeyType &KEY() { return *reinterpret_cast<KeyType *>(data + sizeof(LenType)); }
    KeyType KEY() const { return LoadUnaligned<KeyType>(data + sizeof(LenType)); }

    LenType LEN() const { return LoadUnaligned<LenType>(data); }
    u64 &ADDR() { return addr; }
    u64 ADDR() const { return addr; }
    u32 IDX() const { return idx; }
    u32 &IDX() { return idx; }

    /// @return 0 when the keys are equal, 1 when this key is greater, else -1.
    /// The multi-key tail that used to follow the if/else chain could never
    /// execute and has been removed.
    int Compare(const KeyAddress &p) const {
        const KeyType lhs = KEY();
        const KeyType rhs = p.KEY();
        if (lhs == rhs)
            return 0;
        return lhs > rhs ? 1 : -1;
    }

    bool operator==(const KeyAddress &other) const { return Compare(other) == 0; }

    // Deliberately mirrored: a > b holds when Compare() is negative, so a
    // std::priority_queue<KeyAddress> keeps the smallest scalar key on top.
    bool operator>(const KeyAddress &other) const { return Compare(other) < 0; }

    bool operator<(const KeyAddress &other) const { return Compare(other) > 0; }
};
188+
50189
export template <typename KeyType, typename LenType>
51190
class SortMerger {
52191
typedef SortMerger<KeyType, LenType> self_t;
53-
54-
struct KeyAddr;
192+
typedef KeyAddress<KeyType, LenType> KeyAddr;
55193
String filenm_;
56194
const u32 MAX_GROUP_SIZE_; //!< max group size
57195
const u32 BS_SIZE_; //!< in fact it equals to memory size
@@ -60,8 +198,8 @@ class SortMerger {
60198
u32 OUT_BUF_SIZE_; //!< max size of output buffer
61199
const u32 OUT_BUF_NUM_; //!< output threads number
62200

63-
std::priority_queue<struct KeyAddr> pre_heap_; //!< predict heap
64-
std::priority_queue<struct KeyAddr> merge_heap_; //!< merge heap
201+
std::priority_queue<KeyAddr> pre_heap_; //!< predict heap
202+
std::priority_queue<KeyAddr> merge_heap_; //!< merge heap
65203

66204
u32 *micro_run_idx_{nullptr}; //!< the access index of each microruns
67205
u32 *micro_run_pos_{nullptr}; //!< the access position within each microruns
@@ -102,65 +240,6 @@ class SortMerger {
102240

103241
u64 FILE_LEN_;
104242

105-
struct KeyAddr {
106-
char *data{nullptr};
107-
u64 addr;
108-
u32 idx;
109-
110-
KeyAddr(char *p, u64 ad, u32 i) {
111-
data = p;
112-
addr = ad;
113-
idx = i;
114-
}
115-
116-
KeyAddr() {
117-
data = nullptr;
118-
addr = -1;
119-
idx = -1;
120-
}
121-
122-
KeyType &KEY() { return *(KeyType *)(data + sizeof(LenType)); }
123-
KeyType KEY() const { return *(KeyType *)(data + sizeof(LenType)); }
124-
LenType LEN() const { return *(LenType *)data; }
125-
u64 &ADDR() { return addr; }
126-
u64 ADDR() const { return addr; }
127-
u32 IDX() const { return idx; }
128-
u32 &IDX() { return idx; }
129-
130-
int Compare(const KeyAddr &p) const {
131-
if (KEY() == p.KEY())
132-
return 0;
133-
134-
if (KEY() > p.KEY())
135-
return 1;
136-
if (KEY() < p.KEY())
137-
return -1;
138-
139-
LenType len1 = LEN() / sizeof(KeyType);
140-
LenType len2 = p.LEN() / sizeof(KeyType);
141-
142-
for (LenType i = 1; i < len1 && i < len2; ++i) {
143-
if (((KeyType *)(data + sizeof(LenType)))[i] > ((KeyType *)(p.data + sizeof(LenType)))[i])
144-
return 1;
145-
if (((KeyType *)(data + sizeof(LenType)))[i] < ((KeyType *)(p.data + sizeof(LenType)))[i])
146-
return -1;
147-
}
148-
149-
if (len1 == len2)
150-
return 0;
151-
152-
if (len1 > len2)
153-
return 1;
154-
return -1;
155-
}
156-
157-
bool operator==(const KeyAddr &other) const { return Compare(other) == 0; }
158-
159-
bool operator>(const KeyAddr &other) const { return Compare(other) < 0; }
160-
161-
bool operator<(const KeyAddr &other) const { return Compare(other) > 0; }
162-
};
163-
164243
void NewBuffer();
165244

166245
void Init(DirectIO &io_stream);

src/unit_test/storage/invertedindex/common/external_sort.cpp

+73-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <cassert>
1717
#include <filesystem>
1818
#include <iostream>
19+
#include <string.h>
1920

2021
import stl;
2122
import third_party;
@@ -38,6 +39,17 @@ Vector<char> RandStr(T key) {
3839
return str;
3940
}
4041

42+
/// Builds a random string of lowercase ASCII letters.
/// The length is rand() % 100, so it may be empty and is at most 99 characters.
Vector<char> RandStr() {
    const u32 length = rand() % 100;

    Vector<char> letters;
    letters.reserve(length);
    for (u32 remaining = length; remaining != 0; --remaining) {
        letters.push_back('a' + rand() % 26);
    }

    return letters;
}
52+
4153
class ExternalSortTest : public BaseTest {
4254
public:
4355
ExternalSortTest() {}
@@ -58,7 +70,6 @@ class ExternalSortTest : public BaseTest {
5870
while (run_num < 100 || SIZE % run_num != 0)
5971
run_num = rand() % 300;
6072

61-
// run_num = 800;
6273
for (u32 i = 0; i < run_num; ++i) {
6374
u64 pos = ftell(f);
6475
fseek(f, 2 * sizeof(u32) + sizeof(u64), SEEK_CUR);
@@ -69,8 +80,6 @@ class ExternalSortTest : public BaseTest {
6980
fwrite(&len, sizeof(LenType), 1, f);
7081
fwrite(str.data(), len, 1, f);
7182
s += len + sizeof(LenType);
72-
73-
// cout<<"\rAdd data: "<<(double)(i*SIZE/run_num+j)/SIZE*100.<<"%"<<std::Flush;
7483
}
7584
u64 next_run_pos = ftell(f);
7685
fseek(f, pos, SEEK_SET);
@@ -103,9 +112,70 @@ class ExternalSortTest : public BaseTest {
103112
}
104113
std::filesystem::remove("./tt");
105114
}
115+
116+
void CheckTermTuple(const u64 SIZE, u32 bs = 100000000) {
117+
std::filesystem::remove("./tt");
118+
119+
Vector<char> str;
120+
FILE *f = fopen("./tt", "w+");
121+
fwrite(&SIZE, sizeof(u64), 1, f);
122+
123+
u32 run_num = rand() % 300;
124+
while (run_num < 100 || SIZE % run_num != 0)
125+
run_num = rand() % 300;
126+
127+
char buffer[200];
128+
for (u32 i = 0; i < run_num; ++i) {
129+
u64 pos = ftell(f);
130+
fseek(f, 2 * sizeof(u32) + sizeof(u64), SEEK_CUR);
131+
u32 s = 0;
132+
for (u32 j = 0; j < SIZE / run_num; ++j) {
133+
str = RandStr();
134+
u32 doc_id = 34567; // i * SIZE / run_num + j;
135+
u32 term_pos = i;
136+
memcpy(buffer, str.data(), str.size());
137+
buffer[str.size()] = '\0';
138+
memcpy(buffer + str.size() + 1, &doc_id, sizeof(u32));
139+
memcpy(buffer + str.size() + 1 + sizeof(u32), &term_pos, sizeof(u32));
140+
u16 len = str.size() + 1 + sizeof(u32) + sizeof(u32);
141+
fwrite(&len, sizeof(u16), 1, f);
142+
fwrite(buffer, len, 1, f);
143+
s += len + sizeof(u16);
144+
}
145+
u64 next_run_pos = ftell(f);
146+
fseek(f, pos, SEEK_SET);
147+
fwrite(&s, sizeof(u32), 1, f);
148+
s = SIZE / run_num;
149+
fwrite(&s, sizeof(u32), 1, f);
150+
fwrite(&next_run_pos, sizeof(u64), 1, f);
151+
fseek(f, 0, SEEK_END);
152+
}
153+
fclose(f);
154+
155+
SortMerger<TermTuple, u16> merger("./tt", run_num, bs, 2);
156+
merger.Run();
157+
158+
f = fopen("./tt", "r");
159+
u64 count = 0;
160+
u32 doc_id = 34567;
161+
fread(&count, sizeof(u64), 1, f);
162+
EXPECT_EQ(count, SIZE);
163+
for (u32 i = 0; i < count; ++i) {
164+
u16 len = 0;
165+
fread(&len, sizeof(u16), 1, f);
166+
char *buf = new char[len];
167+
fread(buf, len, 1, f);
168+
TermTuple tuple(buf, len);
169+
EXPECT_EQ(tuple.doc_id_, doc_id);
170+
delete[] buf;
171+
}
172+
std::filesystem::remove("./tt");
173+
}
106174
};
107175

108176
// Runs CheckMerger with u32 keys and u8 length prefixes for two
// argument pairs: (1000, 100000) and (10000, 1000000).
// NOTE(review): presumably (record count, buffer size) -- confirm against CheckMerger.
TEST_F(ExternalSortTest, test1) {
109177
CheckMerger<u32, u8>(1000, 100000);
110178
CheckMerger<u32, u8>(10000, 1000000);
111179
}
180+
181+
// Exercises the TermTuple merge path with 10000 records and bs = 1000000.
TEST_F(ExternalSortTest, test2) {
    const u64 record_count = 10000;
    const u32 bs = 1000000;
    CheckTermTuple(record_count, bs);
}

0 commit comments

Comments
 (0)