Skip to content

Commit

Permalink
Fix field attribute issues
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYunHey <1908166778@qq.com>
  • Loading branch information
ChenYunHey committed Mar 28, 2024
1 parent d4d3e57 commit 6498ff8
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class JdbcCDC {
private static String serverTimezone;
private static String pluginName;
private static int batchSize;
private static String mongoDatabase;

public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
Expand All @@ -79,7 +78,6 @@ public static void main(String[] args) throws Exception {
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
if ( dbType.equalsIgnoreCase("mongodb")){
//mongoDatabase = parameter.get(MONGO_DB_DATABASE.key());
batchSize = parameter.getInt(BATCH_SIZE.key(), BATCH_SIZE.defaultValue());
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,30 +175,30 @@ public LakeSoulRowDataWrapper toLakeSoulDataType(Schema sch, Struct value, Table
Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
RowData insert = convert(after, afterSchema, RowKind.INSERT, sortField);
boolean afterNullable = afterSchema.isOptional();
RowType rt = toFlinkRowType(afterSchema,afterNullable);
//boolean afterNullable = afterSchema.isOptional();
RowType rt = toFlinkRowType(afterSchema,false);
insert.setRowKind(RowKind.INSERT);
builder.setOperation("insert").setAfterRowData(insert).setAfterType(rt);
} else if (op == Envelope.Operation.DELETE) {
Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
RowData delete = convert(before, beforeSchema, RowKind.DELETE, sortField);
boolean nullable = beforeSchema.isOptional();
RowType rt = toFlinkRowType(beforeSchema,nullable);
// boolean nullable = beforeSchema.isOptional();
RowType rt = toFlinkRowType(beforeSchema,false);
builder.setOperation("delete").setBeforeRowData(null).setBeforeRowType(rt);
delete.setRowKind(RowKind.DELETE);
} else {
Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
RowData beforeData = convert(before, beforeSchema, RowKind.UPDATE_BEFORE, sortField);
boolean beforNullable = beforeSchema.isOptional();
RowType beforeRT = toFlinkRowType(beforeSchema,beforNullable);
//boolean beforNullable = beforeSchema.isOptional();
RowType beforeRT = toFlinkRowType(beforeSchema,false);
beforeData.setRowKind(RowKind.UPDATE_BEFORE);
Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
RowData afterData = convert(after, afterSchema, RowKind.UPDATE_AFTER, sortField);
boolean afterNullable = afterSchema.isOptional();
RowType afterRT = toFlinkRowType(afterSchema,afterNullable);
//boolean afterNullable = afterSchema.isOptional();
RowType afterRT = toFlinkRowType(afterSchema,false);
afterData.setRowKind(RowKind.UPDATE_AFTER);
if (partitionFieldsChanged(beforeRT, beforeData, afterRT, afterData)) {
// partition fields changed. we need to emit both before and after RowData
Expand Down Expand Up @@ -229,7 +229,7 @@ public RowType toFlinkRowTypeCDC(RowType rowType) {
return RowType.of(colTypes, colNames);
}

public RowType toFlinkRowType(Schema schema, boolean nullable) {
public RowType toFlinkRowType(Schema schema, boolean isMongoDDL) {
int arity = schema.fields().size() + 1;
if (useCDC) ++arity;
String[] colNames = new String[arity];
Expand All @@ -238,7 +238,11 @@ public RowType toFlinkRowType(Schema schema, boolean nullable) {
for (int i = 0; i < (useCDC ? arity - 2 : arity - 1); i++) {
Field item = fieldNames.get(i);
colNames[i] = item.name();
colTypes[i] = convertToLogical(item.schema(), !item.name().equals("_id") && nullable);
if (isMongoDDL){
colTypes[i] = convertToLogical(item.schema(), !item.name().equals("_id"));
}else {
colTypes[i] = convertToLogical(item.schema(), item.schema().isOptional());
}
}
// colNames[useCDC ? arity - 3 : arity - 2] = BINLOG_FILE_INDEX;
// colTypes[useCDC ? arity - 3 : arity - 2] = new BigIntType();
Expand Down Expand Up @@ -273,8 +277,6 @@ public List<RowType.RowField> getRowFields(Schema schema) {
}

private LogicalType primitiveLogicalType(Schema fieldSchema,boolean nullable) {
// boolean nullable = fieldSchema.isOptional();
// if (isMongoDDl) nullable = true;
switch (fieldSchema.type()) {
case BOOLEAN:
return new BooleanType(nullable);
Expand Down

0 comments on commit 6498ff8

Please sign in to comment.