Skip to content

Commit 0f5e6b6

Browse files
authored
Stream import (#1298)
### What problem does this PR solve? Use stream import to replace batch import to save memory consumption. ### Type of change - [x] Refactoring --------- Signed-off-by: Jin Hai <haijin.chn@gmail.com>
1 parent dbbb652 commit 0f5e6b6

File tree

5 files changed

+144
-54
lines changed

5 files changed

+144
-54
lines changed

src/common/stl.cppm

+1
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ export namespace std {
185185
using std::ostream;
186186
using std::ofstream;
187187
using std::ifstream;
188+
using std::fstream;
188189
using std::ios;
189190

190191
using std::align;

src/executor/operator/physical_export.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,11 @@ SizeT PhysicalExport::ExportToJSONL(QueryContext *query_context, ExportOperatorS
144144

145145
SizeT row_count{0};
146146
Map<SegmentID, SegmentSnapshot>& segment_block_index_ref = block_index_->segment_block_index_;
147+
148+
LOG_DEBUG(fmt::format("Going to export segment count: {}", segment_block_index_ref.size()));
147149
for(auto& [segment_id, segment_snapshot]: segment_block_index_ref) {
148-
LOG_DEBUG(fmt::format("Export segment_id: {}", segment_id));
149150
SizeT block_count = segment_snapshot.block_map_.size();
151+
LOG_DEBUG(fmt::format("Export segment_id: {}, with block count: {}", segment_id, block_count));
150152
for(SizeT block_idx = 0; block_idx < block_count; ++ block_idx) {
151153
LOG_DEBUG(fmt::format("Export block_idx: {}", block_idx));
152154
BlockEntry *block_entry = segment_snapshot.block_map_[block_idx];

src/executor/operator/physical_import.cpp

+32-53
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ import value;
6262
import catalog;
6363
import catalog_delta_entry;
6464
import build_fast_rough_filter_task;
65+
import stream_io;
6566

6667
namespace infinity {
6768

@@ -428,34 +429,15 @@ void PhysicalImport::ImportCSV(QueryContext *query_context, ImportOperatorState
428429
}
429430

430431
void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorState *import_op_state) {
431-
LocalFileSystem fs;
432-
auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
433-
if(!status.ok()) {
434-
UnrecoverableError(status.message());
435-
}
436-
DeferFn file_defer([&]() { fs.Close(*file_handler); });
437-
438-
SizeT file_size = fs.GetFileSize(*file_handler);
439-
String jsonl_str(file_size + 1, 0);
440-
SizeT read_n = file_handler->Read(jsonl_str.data(), file_size);
441-
if (read_n != file_size) {
442-
String error_message = fmt::format("Read file size {} doesn't match with file size {}.", read_n, file_size);
443-
LOG_CRITICAL(error_message);
444-
UnrecoverableError(error_message);
445-
}
446-
447-
if (read_n == 0) {
448-
auto result_msg = MakeUnique<String>(fmt::format("Empty JSONL file, IMPORT 0 Rows"));
449-
import_op_state->result_msg_ = std::move(result_msg);
450-
return;
451-
}
432+
StreamIO stream_io;
433+
stream_io.Init(file_path_, FileFlags::READ_FLAG);
434+
DeferFn file_defer([&]() { stream_io.Close(); });
452435

453436
Txn *txn = query_context->GetTxn();
454437
u64 segment_id = Catalog::GetNextSegmentID(table_entry_);
455438
SharedPtr<SegmentEntry> segment_entry = SegmentEntry::NewSegmentEntry(table_entry_, segment_id, txn);
456439
UniquePtr<BlockEntry> block_entry = BlockEntry::NewBlockEntry(segment_entry.get(), 0, 0, table_entry_->ColumnCount(), txn);
457440

458-
SizeT start_pos = 0;
459441
Vector<ColumnVector> column_vectors;
460442
for (SizeT i = 0; i < table_entry_->ColumnCount(); ++i) {
461443
auto *block_column_entry = block_entry->GetColumnBlockEntry(i);
@@ -464,8 +446,34 @@ void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorStat
464446

465447
SizeT row_count{0};
466448
while (true) {
467-
if (start_pos >= file_size) {
449+
String json_str;
450+
if (stream_io.ReadLine(json_str)) {
451+
nlohmann::json line_json = nlohmann::json::parse(json_str);
452+
453+
JSONLRowHandler(line_json, column_vectors);
454+
block_entry->IncreaseRowCount(1);
455+
++row_count;
456+
457+
if (block_entry->GetAvailableCapacity() <= 0) {
458+
LOG_DEBUG(fmt::format("Block {} saved, total rows: {}", block_entry->block_id(), row_count));
459+
segment_entry->AppendBlockEntry(std::move(block_entry));
460+
if (segment_entry->Room() <= 0) {
461+
LOG_DEBUG(fmt::format("Segment {} saved, total rows: {}", segment_entry->segment_id(), row_count));
462+
SaveSegmentData(table_entry_, txn, segment_entry);
463+
u64 segment_id = Catalog::GetNextSegmentID(table_entry_);
464+
segment_entry = SegmentEntry::NewSegmentEntry(table_entry_, segment_id, txn);
465+
}
466+
467+
block_entry = BlockEntry::NewBlockEntry(segment_entry.get(), segment_entry->GetNextBlockID(), 0, table_entry_->ColumnCount(), txn);
468+
column_vectors.clear();
469+
for (SizeT i = 0; i < table_entry_->ColumnCount(); ++i) {
470+
auto *block_column_entry = block_entry->GetColumnBlockEntry(i);
471+
column_vectors.emplace_back(block_column_entry->GetColumnVector(txn->buffer_mgr()));
472+
}
473+
}
474+
} else {
468475
if (block_entry->row_count() == 0) {
476+
column_vectors.clear();
469477
std::move(*block_entry).Cleanup();
470478
} else {
471479
segment_entry->AppendBlockEntry(std::move(block_entry));
@@ -474,39 +482,10 @@ void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorStat
474482
std::move(*segment_entry).Cleanup();
475483
} else {
476484
SaveSegmentData(table_entry_, txn, segment_entry);
485+
LOG_DEBUG(fmt::format("Last segment {} saved, total rows: {}", segment_entry->segment_id(), row_count));
477486
}
478487
break;
479488
}
480-
SizeT end_pos = jsonl_str.find('\n', start_pos);
481-
if (end_pos == String::npos) {
482-
end_pos = file_size;
483-
}
484-
std::string_view json_sv(jsonl_str.data() + start_pos, end_pos - start_pos);
485-
start_pos = end_pos + 1;
486-
487-
nlohmann::json line_json = nlohmann::json::parse(json_sv);
488-
489-
JSONLRowHandler(line_json, column_vectors);
490-
block_entry->IncreaseRowCount(1);
491-
++ row_count;
492-
493-
if (block_entry->GetAvailableCapacity() <= 0) {
494-
LOG_DEBUG(fmt::format("Block {} saved", block_entry->block_id()));
495-
segment_entry->AppendBlockEntry(std::move(block_entry));
496-
if (segment_entry->Room() <= 0) {
497-
LOG_DEBUG(fmt::format("Segment {} saved", segment_entry->segment_id()));
498-
SaveSegmentData(table_entry_, txn, segment_entry);
499-
u64 segment_id = Catalog::GetNextSegmentID(table_entry_);
500-
segment_entry = SegmentEntry::NewSegmentEntry(table_entry_, segment_id, txn);
501-
}
502-
503-
block_entry = BlockEntry::NewBlockEntry(segment_entry.get(), segment_entry->GetNextBlockID(), 0, table_entry_->ColumnCount(), txn);
504-
column_vectors.clear();
505-
for (SizeT i = 0; i < table_entry_->ColumnCount(); ++i) {
506-
auto *block_column_entry = block_entry->GetColumnBlockEntry(i);
507-
column_vectors.emplace_back(block_column_entry->GetColumnVector(txn->buffer_mgr()));
508-
}
509-
}
510489
}
511490

512491
auto result_msg = MakeUnique<String>(fmt::format("IMPORT {} Rows", row_count));

src/storage/io/stream_io.cpp

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
module;
16+
17+
#include <fstream>
18+
19+
module stream_io;
20+
21+
import stl;
22+
import logger;
23+
import status;
24+
import file_system_type;
25+
import infinity_exception;
26+
import third_party;
27+
28+
namespace infinity {
29+
30+
StreamIO::~StreamIO() = default;
31+
32+
void StreamIO::Init(const String& file_name, u8 flags) {
33+
bool reader_ = flags & FileFlags::READ_FLAG;
34+
bool writer_ = flags & FileFlags::WRITE_FLAG;
35+
if (reader_ && writer_) {
36+
file_.open(file_name, std::ios::in | std::ios::out);
37+
} else if (reader_) {
38+
file_.open(file_name, std::ios::in);
39+
} else if (writer_) {
40+
file_.open(file_name, std::ios::out);
41+
} else {
42+
Status status = Status::InvalidCommand("Not reachable");
43+
LOG_ERROR(status.message());
44+
RecoverableError(status);
45+
}
46+
47+
if (!file_.is_open()) {
48+
Status status = Status::IOError(fmt::format("{} can't open", file_name));
49+
LOG_ERROR(file_name);
50+
RecoverableError(status);
51+
}
52+
}
53+
54+
bool StreamIO::ReadLine(String& line) {
55+
if(getline(file_, line)) {
56+
return true;
57+
} else {
58+
return false;
59+
}
60+
}
61+
62+
void StreamIO::Close() {
63+
file_.close();
64+
}
65+
66+
} // namespace infinity

src/storage/io/stream_io.cppm

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
module;
16+
17+
#include <fstream>
18+
19+
export module stream_io;
20+
21+
import stl;
22+
import status;
23+
24+
namespace infinity {
25+
26+
export class StreamIO {
27+
28+
public:
29+
StreamIO() = default;
30+
~StreamIO();
31+
32+
void Init(const String& file_name, u8 flags);
33+
bool ReadLine(String& line);
34+
void Close();
35+
36+
private:
37+
std::fstream file_;
38+
bool reader_{false};
39+
bool writer_{false};
40+
};
41+
42+
} // namespace infinity

0 commit comments

Comments
 (0)