Skip to content

Commit 38572f7

Browse files
committed
temp
1 parent 34b7f01 commit 38572f7

File tree

4 files changed

+111
-4
lines changed

4 files changed

+111
-4
lines changed

src/executor/operator/physical_import.cppm

+5-1
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ public:
6969
bool header,
7070
char delimiter,
7171
CopyFileType type,
72+
SizeT task_count,
7273
SharedPtr<Vector<LoadMeta>> load_metas)
7374
: PhysicalOperator(PhysicalOperatorType::kImport, nullptr, nullptr, id, load_metas), table_entry_(table_entry), file_type_(type),
74-
file_path_(std::move(file_path)), header_(header), delimiter_(delimiter) {}
75+
file_path_(std::move(file_path)), header_(header), delimiter_(delimiter), task_count_(task_count) {}
7576

7677
~PhysicalImport() override = default;
7778

@@ -108,6 +109,8 @@ public:
108109

109110
inline char delimiter() const { return delimiter_; }
110111

112+
inline SizeT task_count() const { return task_count_; }
113+
111114
static void SaveSegmentData(TableEntry *table_entry, Txn *txn, SharedPtr<SegmentEntry> segment_entry);
112115

113116
private:
@@ -126,6 +129,7 @@ private:
126129
String file_path_{};
127130
bool header_{false};
128131
char delimiter_{','};
132+
SizeT task_count_{0};
129133
};
130134

131135
} // namespace infinity

src/executor/physical_planner.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,7 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildImport(const SharedPtr<Logical
506506
logical_import->header(),
507507
logical_import->delimiter(),
508508
logical_import->FileType(),
509+
logical_import->segment_count(),
509510
logical_operator->load_metas());
510511
}
511512

src/planner/logical_planner.cpp

+99-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
module;
1616

1717
#include <cassert>
18+
#include <cstring>
1819
#include <string>
1920
#include <tuple>
2021
#include <vector>
@@ -23,6 +24,11 @@ module logical_planner;
2324

2425
import stl;
2526
import bind_context;
27+
import defer_op;
28+
import file_system;
29+
import file_system_type;
30+
import statement_common;
31+
import zsv;
2632

2733
import infinity_exception;
2834
import query_binder;
@@ -885,12 +891,104 @@ Status LogicalPlanner::BuildImport(const CopyStatement *statement, SharedPtr<Bin
885891
RecoverableError(Status::FileNotFound(statement->file_path_));
886892
}
887893

894+
UniquePtr<FileHandler> file_handler = fs.OpenFile(statement->file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
895+
SizeT file_size = fs.GetFileSize(*file_handler);
896+
DeferFn defer_fn([&]() { fs.Close(*file_handler); });
897+
898+
SizeT row_count = 0;
899+
switch (statement->copy_file_type_) {
900+
case CopyFileType::kCSV: {
901+
FILE *fp = fopen(statement->file_path_.c_str(), "rb");
902+
if (!fp) {
903+
UnrecoverableError(strerror(errno));
904+
}
905+
auto opts = MakeUnique<ZsvOpts>();
906+
if (statement->header_) {
907+
opts->row_handler = [&](void *){};
908+
} else {
909+
opts->row_handler = [&](void *){ ++row_count; };
910+
}
911+
opts->delimiter = statement->delimiter_;
912+
opts->stream = fp;
913+
opts->buffsize = (1 << 20);
914+
915+
auto parser = ZsvParser(opts.get());
916+
917+
ZsvStatus csv_parser_status;
918+
while ((csv_parser_status = parser.ParseMore()) == zsv_status_ok) {
919+
;
920+
}
921+
parser.Finish();
922+
fclose(fp);
923+
break;
924+
}
925+
case CopyFileType::kJSON: {
926+
String jsonl_str(file_size + 1, 0);
927+
SizeT read_n = file_handler->Read(jsonl_str.data(), file_size);
928+
if (read_n != file_size) {
929+
UnrecoverableError(fmt::format("Read file size {} doesn't match with file size {}.", read_n, file_size));
930+
}
931+
if (read_n == 0) {
932+
break;
933+
}
934+
nlohmann::json json_arr;
935+
json_arr = nlohmann::json::parse(jsonl_str);
936+
if (!json_arr.is_array()) {
937+
break;
938+
}
939+
row_count = json_arr.size();
940+
break;
941+
}
942+
case CopyFileType::kJSONL: {
943+
String jsonl_str(file_size + 1, 0);
944+
SizeT read_n = file_handler->Read(jsonl_str.data(), file_size);
945+
if (read_n != file_size) {
946+
UnrecoverableError(fmt::format("Read file size {} doesn't match with file size {}.", read_n, file_size));
947+
}
948+
if (read_n == 0) {
949+
break;
950+
}
951+
SizeT start_pos = 0;
952+
SizeT end_pos = 0;
953+
while (true) {
954+
if (start_pos >= read_n) {
955+
break;
956+
}
957+
end_pos = jsonl_str.find('\n', start_pos);
958+
if (end_pos == String::npos) {
959+
end_pos = file_size;
960+
}
961+
start_pos = end_pos + 1;
962+
++row_count;
963+
}
964+
break;
965+
}
966+
case CopyFileType::kFVECS: {
967+
int dimension = 0;
968+
i64 nbytes = fs.Read(*file_handler, &dimension, sizeof(dimension));
969+
fs.Seek(*file_handler, 0);
970+
if (nbytes == 0) {
971+
break;
972+
}
973+
if (nbytes != sizeof(dimension)) {
974+
RecoverableError(Status::ImportFileFormatError(fmt::format("Read dimension which length isn't {}.", nbytes)));
975+
}
976+
SizeT row_size = dimension * sizeof(FloatT) + sizeof(dimension);
977+
row_count = file_size / row_size;
978+
break;
979+
}
980+
case CopyFileType::kInvalid: {
981+
UnrecoverableError("Invalid file type");
982+
}
983+
}
984+
888985
SharedPtr<LogicalNode> logical_import = MakeShared<LogicalImport>(bind_context_ptr->GetNewLogicalNodeId(),
889986
table_entry,
890987
statement->file_path_,
891988
statement->header_,
892989
statement->delimiter_,
893-
statement->copy_file_type_);
990+
statement->copy_file_type_,
991+
row_count / DEFAULT_SEGMENT_CAPACITY + 1);
894992

895993
this->logical_plan_ = logical_import;
896994
return Status::OK();

src/planner/node/logical_import.cppm

+6-2
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ public:
3636
String file_path,
3737
bool header,
3838
char delimiter,
39-
CopyFileType type)
39+
CopyFileType type,
40+
SizeT segment_count)
4041
: LogicalNode(node_id, LogicalNodeType::kImport), table_entry_(table_entry), file_type_(type),
41-
file_path_(std::move(file_path)), header_(header), delimiter_(delimiter) {}
42+
file_path_(std::move(file_path)), header_(header), delimiter_(delimiter), segment_count_(segment_count) {}
4243

4344
[[nodiscard]] Vector<ColumnBinding> GetColumnBindings() const final;
4445

@@ -62,12 +63,15 @@ public:
6263

6364
[[nodiscard]] char delimiter() const { return delimiter_; }
6465

66+
[[nodiscard]] SizeT segment_count() const { return segment_count_; }
67+
6568
private:
6669
TableEntry *table_entry_{};
6770
CopyFileType file_type_{CopyFileType::kCSV};
6871
String file_path_{};
6972
bool header_{false};
7073
char delimiter_{','};
74+
SizeT segment_count_{0};
7175
};
7276

7377
} // namespace infinity

0 commit comments

Comments
 (0)