Skip to content

Commit e22ef94

Browse files
authored
Supporting export one embedding column to FVECS file (#1321)
### What problem does this PR solve?

SQL: `COPY test_knn_ip(c2) TO '/var/infinity/embedding.FVECS' WITH (DELIMITER ',', FORMAT fvecs);`

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
1 parent 5d13de7 commit e22ef94

17 files changed

+1808
-1581
lines changed

src/executor/operator/physical_export.cpp

+72
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import file_system_type;
3333
import defer_op;
3434
import stl;
3535
import logical_type;
36+
import embedding_info;
37+
import status;
3638

3739
namespace infinity {
3840

@@ -50,6 +52,10 @@ bool PhysicalExport::Execute(QueryContext *query_context, OperatorState *operato
5052
exported_row_count = ExportToJSONL(query_context, export_op_state);
5153
break;
5254
}
55+
case CopyFileType::kFVECS: {
56+
exported_row_count = ExportToFVECS(query_context, export_op_state);
57+
break;
58+
}
5359
default: {
5460
String error_message = "Not supported file type";
5561
LOG_CRITICAL(error_message);
@@ -192,4 +198,70 @@ SizeT PhysicalExport::ExportToJSONL(QueryContext *query_context, ExportOperatorS
192198
return row_count;
193199
}
194200

201+
/// Export exactly one float-embedding column of the table to `file_path_` as FVECS records.
///
/// Every exported row is written as one record: the embedding dimension as an i32
/// (in the host's in-memory representation), immediately followed by the raw embedding
/// bytes. No file-level header is written.
///
/// @param query_context   Provides the storage buffer manager used to load column data.
/// @param export_op_state Operator state; not read by this function.
/// @return Number of rows written to the file.
///
/// Errors are reported through UnrecoverableError when the column selection is not a
/// single, in-range embedding column or when block/row data is inconsistent, and through
/// RecoverableError when the element type is not float or the file cannot be opened.
SizeT PhysicalExport::ExportToFVECS(QueryContext *query_context, ExportOperatorState *export_op_state) {

    if (column_idx_array_.size() != 1) {
        String error_message = "Only one column with embedding data type can be exported as FVECS file";
        LOG_CRITICAL(error_message);
        UnrecoverableError(error_message);
    }

    u64 exported_column_idx = column_idx_array_[0];
    const Vector<SharedPtr<ColumnDef>> &column_defs = table_entry_->column_defs();

    // Reject an out-of-range index before it is used to subscript column_defs.
    if (exported_column_idx >= column_defs.size()) {
        String error_message =
            fmt::format("Exported column index {} is out of range, table has {} columns", exported_column_idx, column_defs.size());
        LOG_CRITICAL(error_message);
        UnrecoverableError(error_message);
    }

    DataType *data_type = column_defs[exported_column_idx]->type().get();
    if (data_type->type() != LogicalType::kEmbedding) {
        String error_message = fmt::format("Only embedding column can be exported as FVECS file, but it is {}", data_type->ToString());
        LOG_CRITICAL(error_message);
        UnrecoverableError(error_message);
    }

    EmbeddingInfo *embedding_type_info = static_cast<EmbeddingInfo *>(data_type->type_info().get());
    if (embedding_type_info->Type() != EmbeddingDataType::kElemFloat) {
        Status status = Status::NotSupport("Only float element type embedding is supported now.");
        LOG_ERROR(status.message());
        RecoverableError(status);
    }

    // The per-record dimension prefix is a 4-byte integer, so narrow explicitly.
    const i32 dimension = static_cast<i32>(embedding_type_info->Dimension());
    // Payload size every float embedding of this column must have.
    const SizeT embedding_bytes = static_cast<SizeT>(dimension) * sizeof(float);

    // NOTE(review): the file is opened with WRITE_FLAG | CREATE_FLAG only. If that does not
    // truncate an existing file, exporting over a longer file would leave stale trailing
    // records behind -- confirm against the LocalFileSystem::OpenFile semantics.
    LocalFileSystem fs;
    auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
    if (!status.ok()) {
        RecoverableError(status);
    }
    DeferFn file_defer([&]() { fs.Close(*file_handler); });

    SizeT row_count{0};
    Map<SegmentID, SegmentSnapshot> &segment_block_index_ref = block_index_->segment_block_index_;

    // Walk every segment and block of the snapshot, appending one record per row.
    LOG_DEBUG(fmt::format("Going to export segment count: {}", segment_block_index_ref.size()));
    for (auto &[segment_id, segment_snapshot] : segment_block_index_ref) {
        SizeT block_count = segment_snapshot.block_map_.size();
        LOG_DEBUG(fmt::format("Export segment_id: {}, with block count: {}", segment_id, block_count));
        for (SizeT block_idx = 0; block_idx < block_count; ++block_idx) {
            LOG_DEBUG(fmt::format("Export block_idx: {}", block_idx));
            BlockEntry *block_entry = segment_snapshot.block_map_[block_idx];
            SizeT block_row_count = block_entry->row_count();

            ColumnVector exported_column_vector =
                block_entry->GetColumnBlockEntry(exported_column_idx)->GetColumnVector(query_context->storage()->buffer_manager());
            if (exported_column_vector.Size() != block_row_count) {
                String error_message = "Unmatched row_count between block and block_column";
                LOG_CRITICAL(error_message);
                UnrecoverableError(error_message);
            }

            for (SizeT row_idx = 0; row_idx < block_row_count; ++row_idx) {
                Value v = exported_column_vector.GetValue(row_idx);
                Span<char> embedding = v.GetEmbedding();

                // A payload whose size disagrees with the dimension prefix would make the
                // record (and everything after it) unreadable, so refuse to write it.
                if (embedding.size_bytes() != embedding_bytes) {
                    String error_message = fmt::format("Unmatched embedding size: {} bytes, expected {} bytes for dimension {}",
                                                       embedding.size_bytes(),
                                                       embedding_bytes,
                                                       dimension);
                    LOG_CRITICAL(error_message);
                    UnrecoverableError(error_message);
                }

                fs.Write(*file_handler, &dimension, sizeof(dimension));
                fs.Write(*file_handler, embedding.data(), embedding.size_bytes());
                ++row_count;
            }
        }
    }
    LOG_DEBUG(fmt::format("Export to FVECS, db {}, table {}, file: {}, row: {}", schema_name_, table_name_, file_path_, row_count));
    return row_count;
}
266+
195267
} // namespace infinity

src/executor/operator/physical_export.cppm

+5-2
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,11 @@ public:
4343
bool header,
4444
char delimiter,
4545
CopyFileType type,
46+
Vector<u64> column_idx_array,
4647
SharedPtr<BlockIndex> block_index,
4748
SharedPtr<Vector<LoadMeta>> load_metas)
4849
: PhysicalOperator(PhysicalOperatorType::kExport, nullptr, nullptr, id, load_metas), table_entry_(table_entry), file_type_(type), file_path_(std::move(file_path)),
49-
table_name_(std::move(table_name)), schema_name_(std::move(schema_name)), header_(header), delimiter_(delimiter), block_index_(std::move(block_index)) {}
50+
table_name_(std::move(table_name)), schema_name_(std::move(schema_name)), header_(header), delimiter_(delimiter), column_idx_array_(std::move(column_idx_array)), block_index_(std::move(block_index)) {}
5051

5152
~PhysicalExport() override = default;
5253

@@ -69,6 +70,8 @@ public:
6970

7071
SizeT ExportToJSONL(QueryContext *query_context, ExportOperatorState *export_op_state);
7172

73+
SizeT ExportToFVECS(QueryContext *query_context, ExportOperatorState *export_op_state);
74+
7275
inline CopyFileType FileType() const { return file_type_; }
7376

7477
inline const String &file_path() const { return file_path_; }
@@ -92,7 +95,7 @@ private:
9295
String schema_name_{"default_db"};
9396
bool header_{false};
9497
char delimiter_{','};
95-
98+
Vector<u64> column_idx_array_;
9699
SharedPtr<BlockIndex> block_index_{};
97100
};
98101

src/executor/operator/physical_import.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ void PhysicalImport::ImportFVECS(QueryContext *query_context, ImportOperatorStat
136136
}
137137
DeferFn defer_fn([&]() { fs.Close(*file_handler); });
138138

139-
int dimension = 0;
139+
i32 dimension = 0;
140140
i64 nbytes = fs.Read(*file_handler, &dimension, sizeof(dimension));
141141
fs.Seek(*file_handler, 0);
142142
if (nbytes == 0) {

src/executor/physical_planner.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,7 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildExport(const SharedPtr<Logical
567567
logical_export->header(),
568568
logical_export->delimiter(),
569569
logical_export->FileType(),
570+
logical_export->column_idx_array(),
570571
logical_export->block_index(),
571572
logical_operator->load_metas());
572573
}

src/parser/expression_lexer.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
#line 1 "expression_lexer.cpp"
1+
#line 2 "expression_lexer.cpp"
22

3-
#line 3 "expression_lexer.cpp"
3+
#line 4 "expression_lexer.cpp"
44

55
#define YY_INT_ALIGNED short int
66

@@ -808,10 +808,10 @@ static const flex_int16_t yy_rule_linenum[28] =
808808

809809
static thread_local std::stringstream string_buffer;
810810

811-
#line 811 "expression_lexer.cpp"
811+
#line 812 "expression_lexer.cpp"
812812
#define YY_NO_INPUT 1
813813

814-
#line 814 "expression_lexer.cpp"
814+
#line 815 "expression_lexer.cpp"
815815

816816
#define INITIAL 0
817817
#define SINGLE_QUOTED_STRING 1
@@ -1165,7 +1165,7 @@ YY_DECL
11651165
#line 27 "expression_lexer.l"
11661166

11671167

1168-
#line 1168 "expression_lexer.cpp"
1168+
#line 1169 "expression_lexer.cpp"
11691169

11701170
while ( /*CONSTCOND*/1 ) /* loops until end-of-file is reached */
11711171
{
@@ -1405,7 +1405,7 @@ YY_RULE_SETUP
14051405
#line 88 "expression_lexer.l"
14061406
ECHO;
14071407
YY_BREAK
1408-
#line 1408 "expression_lexer.cpp"
1408+
#line 1409 "expression_lexer.cpp"
14091409
case YY_STATE_EOF(INITIAL):
14101410
yyterminate();
14111411

src/parser/expression_lexer.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
#define expressionHEADER_H 1
33
#define expressionIN_HEADER 1
44

5-
#line 5 "expression_lexer.h"
5+
#line 6 "expression_lexer.h"
66

7-
#line 7 "expression_lexer.h"
7+
#line 8 "expression_lexer.h"
88

99
#define YY_INT_ALIGNED short int
1010

@@ -849,6 +849,6 @@ extern int yylex \
849849
#line 88 "expression_lexer.l"
850850

851851

852-
#line 852 "expression_lexer.h"
852+
#line 853 "expression_lexer.h"
853853
#undef expressionIN_HEADER
854854
#endif /* expressionHEADER_H */

src/parser/lexer.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
#line 1 "lexer.cpp"
1+
#line 2 "lexer.cpp"
22

3-
#line 3 "lexer.cpp"
3+
#line 4 "lexer.cpp"
44

55
#define YY_INT_ALIGNED short int
66

@@ -1258,10 +1258,10 @@ static const flex_int16_t yy_rule_linenum[180] =
12581258

12591259
static thread_local std::stringstream string_buffer;
12601260

1261-
#line 1261 "lexer.cpp"
1261+
#line 1262 "lexer.cpp"
12621262
#define YY_NO_INPUT 1
12631263

1264-
#line 1264 "lexer.cpp"
1264+
#line 1265 "lexer.cpp"
12651265

12661266
#define INITIAL 0
12671267
#define SINGLE_QUOTED_STRING 1
@@ -1615,7 +1615,7 @@ YY_DECL
16151615
#line 27 "lexer.l"
16161616

16171617

1618-
#line 1618 "lexer.cpp"
1618+
#line 1619 "lexer.cpp"
16191619

16201620
while ( /*CONSTCOND*/1 ) /* loops until end-of-file is reached */
16211621
{
@@ -2615,7 +2615,7 @@ YY_RULE_SETUP
26152615
#line 240 "lexer.l"
26162616
ECHO;
26172617
YY_BREAK
2618-
#line 2618 "lexer.cpp"
2618+
#line 2619 "lexer.cpp"
26192619
case YY_STATE_EOF(INITIAL):
26202620
yyterminate();
26212621

src/parser/lexer.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
#define sqlHEADER_H 1
33
#define sqlIN_HEADER 1
44

5-
#line 5 "lexer.h"
5+
#line 6 "lexer.h"
66

7-
#line 7 "lexer.h"
7+
#line 8 "lexer.h"
88

99
#define YY_INT_ALIGNED short int
1010

@@ -849,6 +849,6 @@ extern int yylex \
849849
#line 240 "lexer.l"
850850

851851

852-
#line 852 "lexer.h"
852+
#line 853 "lexer.h"
853853
#undef sqlIN_HEADER
854854
#endif /* sqlHEADER_H */

0 commit comments

Comments
 (0)