Use config to handle data and schema conversion accordingly (#509)
Pass config values to RecordConverter and SchemaConverter to handle data and schema conversion accordingly
ismailsimsek authored Feb 22, 2025
1 parent ea52f61 commit a27cef4
Showing 10 changed files with 83 additions and 74 deletions.
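
In short, RecordConverter and SchemaConverter now receive the IcebergConsumerConfig once at construction time and read values such as cdcOpField(), cdcSourceTsMsField() and createIdentifierFields() from it, instead of having those values passed into every method call. A minimal sketch of the call-site change (variable names are illustrative):

// old signatures: new RecordConverter(destination, value, key), icebergSchema(boolean), convert(schema, cdcOpField)
RecordConverter rec = new RecordConverter(destination, valueBytes, keyBytes, config);
Schema schema = rec.icebergSchema();      // identifier fields now controlled by config.createIdentifierFields()
RecordWrapper row = rec.convert(schema);  // the cdc operation is read from config.cdcOpField()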
@@ -28,6 +28,7 @@
import jakarta.inject.Named;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -42,7 +43,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
@@ -110,7 +110,10 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
Map<String, List<RecordConverter>> result =
records.stream()
.map((ChangeEvent<Object, Object> e)
-> new RecordConverter(e.destination(), getBytes(e.value()), e.key() == null ? null : getBytes(e.key())))
-> new RecordConverter(e.destination(),
getBytes(e.value()),
e.key() == null ? null : getBytes(e.key())
, config))
.collect(Collectors.groupingBy(RecordConverter::destination));

// consume list of events for each destination table
@@ -143,7 +146,16 @@ public Table loadIcebergTable(TableIdentifier tableId, RecordConverter sampleEve
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
try {
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(config.createIdentifierFields()), config.writeFormat());
final Schema schema = sampleEvent.icebergSchema();
// Check if the message is a schema change event (DDL statement).
// Schema change events are identified by the presence of "ddl", "databaseName", and "tableChanges" fields.
// "schema change topic" https://debezium.io/documentation/reference/3.0/connectors/mysql.html#mysql-schema-change-topic
if (sampleEvent.isSchemaChangeEvent()) {
LOGGER.warn("Schema change topic detected. Creating Iceberg schema without identifier fields for append-only mode.");
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, new Schema(schema.columns()), config.writeFormat());
}

return IcebergUtil.createIcebergTable(icebergCatalog, tableId, schema, config.writeFormat());
} catch (Exception e) {
throw new DebeziumException("Failed to create table from debezium event schema:" + tableId + " Error:" + e.getMessage(), e);
}
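
For reference, building a new Schema from just the columns is what strips the identifier-field ids here, so tables created from the schema change topic are written append-only. A minimal sketch of that effect (field names are illustrative):

Schema withIds = new Schema(
    List.of(Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "ddl", Types.StringType.get())),
    Set.of(1));
Schema appendOnly = new Schema(withIds.columns());  // same columns, but identifierFieldIds() is now empty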
@@ -77,4 +77,8 @@ public interface IcebergConsumerConfig {
@WithName(value = "debezium.sink.iceberg.allow-field-addition")
@WithDefault(value = "true")
public boolean allowFieldAddition();

default boolean isIsoStringTemporalMode() {
return temporalPrecisionMode() == TemporalPrecisionMode.ISOSTRING;
}
}
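
The new default method only wraps the existing temporalPrecisionMode() value for readability at call sites. A hypothetical sketch of how a converter might branch on it when reading a Debezium date field (the helper name and branch logic below are illustrative, not the connector's actual code):

Object toIcebergDate(JsonNode node, IcebergConsumerConfig config) {
  if (node == null || node.isNull()) {
    return null;
  }
  if (config.isIsoStringTemporalMode()) {
    // isostring temporal mode: dates arrive as ISO-8601 text, e.g. "2016-02-19"
    return LocalDate.parse(node.asText());
  }
  // adaptive modes: io.debezium.time.Date emits days since the epoch, e.g. 16850
  return LocalDate.ofEpochDay(node.asLong());
}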
@@ -34,7 +34,6 @@
import java.util.UUID;

import static io.debezium.server.iceberg.IcebergChangeConsumer.keyDeserializer;
import static io.debezium.server.iceberg.IcebergChangeConsumer.valDeserializer;

/**
* Converts iceberg json event to Iceberg GenericRecord. Extracts event schema and key fields. Converts event schema to Iceberg Schema.
@@ -50,53 +49,49 @@ public class RecordConverter {
protected final String destination;
protected final byte[] valueData;
protected final byte[] keyData;
private JsonNode value;
private JsonNode key;
private final JsonNode value;
private final JsonNode key;
private final IcebergConsumerConfig config;

public RecordConverter(String destination, byte[] valueData, byte[] keyData) {
public RecordConverter(String destination, byte[] valueData, byte[] keyData, IcebergConsumerConfig config) {
this.destination = destination;
this.valueData = valueData;
this.keyData = keyData;
this.config = config;
this.key = keyDeserializer.deserialize(destination, keyData);
this.value = keyDeserializer.deserialize(destination, valueData);
}

public JsonNode key() {
if (key == null && keyData != null) {
key = keyDeserializer.deserialize(destination, keyData);
}

return key;
}

public JsonNode value() {
if (value == null && valueData != null) {
value = valDeserializer.deserialize(destination, valueData);
}

return value;
}

public Long cdcSourceTsMsValue(String cdcSourceTsMsField) {
public Long cdcSourceTsMsValue() {

final JsonNode element = value().get(cdcSourceTsMsField);
final JsonNode element = value().get(config.cdcSourceTsMsField());
if (element == null) {
throw new DebeziumException("Field '" + cdcSourceTsMsField + "' not found in JSON object: " + value());
throw new DebeziumException("Field '" + config.cdcSourceTsMsField() + "' not found in JSON object: " + value());
}

try {
return element.asLong();
} catch (NumberFormatException e) {
throw new DebeziumException("Error converting field '" + cdcSourceTsMsField + "' value '" + element + "' to Long: " + e.getMessage(), e);
throw new DebeziumException("Error converting field '" + config.cdcSourceTsMsField() + "' value '" + element + "' to Long: " + e.getMessage(), e);
}
}

public Operation cdcOpValue(String cdcOpField) {
if (!value().has(cdcOpField)) {
throw new DebeziumException("The value for field `" + cdcOpField + "` is missing. " +
public Operation cdcOpValue() {
if (!value().has(config.cdcOpField())) {
throw new DebeziumException("The value for field `" + config.cdcOpField() + "` is missing. " +
"This field is required when updating or deleting data, when running in upsert mode."
);
}

final String opFieldValue = value().get(cdcOpField).asText("c");
final String opFieldValue = value().get(config.cdcOpField()).asText("c");

return switch (opFieldValue) {
case "u" -> Operation.UPDATE;
@@ -105,13 +100,17 @@ public Operation cdcOpValue(String cdcOpField) {
case "c" -> Operation.INSERT;
case "i" -> Operation.INSERT;
default ->
throw new DebeziumException("Unexpected `" + cdcOpField + "=" + opFieldValue + "` operation value received, expecting one of ['u','d','r','c', 'i']");
throw new DebeziumException("Unexpected `" + config.cdcOpField() + "=" + opFieldValue + "` operation value received, expecting one of ['u','d','r','c', 'i']");
};
}

public SchemaConverter schemaConverter() {
try {
return new SchemaConverter(mapper.readTree(valueData).get("schema"), keyData == null ? null : mapper.readTree(keyData).get("schema"));
return new SchemaConverter(
mapper.readTree(valueData).get("schema"),
keyData == null ? null : mapper.readTree(keyData).get("schema"),
config
);
} catch (IOException e) {
throw new DebeziumException("Failed to get event schema", e);
}
@@ -124,30 +123,18 @@ public SchemaConverter schemaConverter() {
*
* @return True if it's a schema change event, false otherwise.
*/
private boolean isSchemaChangeEvent() {
public boolean isSchemaChangeEvent() {
return value().has("ddl") && value().has("databaseName") && value().has("tableChanges");
}


/**
* Converts the Kafka Connect schema to an Iceberg schema.
*
* @param createIdentifierFields Whether to include identifier fields in the Iceberg schema.
* Identifier fields are typically used for primary keys and are
* required for upsert/merge operations. They should be *excluded*
* for schema change topic messages to ensure append-only mode.
* @return The Iceberg schema.
*/
public Schema icebergSchema(boolean createIdentifierFields) {
// Check if the message is a schema change event (DDL statement).
// Schema change events are identified by the presence of "ddl", "databaseName", and "tableChanges" fields.
// "schema change topic" https://debezium.io/documentation/reference/3.0/connectors/mysql.html#mysql-schema-change-topic
if (isSchemaChangeEvent()) {
LOGGER.warn("Schema change topic detected. Creating Iceberg schema without identifier fields for append-only mode.");
return schemaConverter().icebergSchema(false); // Force no identifier fields for schema changes
}

return schemaConverter().icebergSchema(createIdentifierFields);
public Schema icebergSchema() {
return schemaConverter().icebergSchema();
}

public String destination() {
@@ -159,9 +146,9 @@ public RecordWrapper convertAsAppend(Schema schema) {
return new RecordWrapper(row, Operation.INSERT);
}

public RecordWrapper convert(Schema schema, String cdcOpField) {
public RecordWrapper convert(Schema schema) {
GenericRecord row = convert(schema.asStruct(), value());
Operation op = cdcOpValue(cdcOpField);
Operation op = cdcOpValue();
return new RecordWrapper(row, op);
}

@@ -209,7 +196,7 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
val = node.isValueNode() ? UUID.fromString(node.asText(null)) : UUID.fromString(node.toString());
break;
case DATE:
if (node.isNull()){
if (node.isNull()) {
val = null;
} else if ((node.isInt())) {
// io.debezium.time.Date
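
With the field names now coming from the injected config, a typical unwrapped event is read like this (a small illustration assuming the default field names "__op" and "__source_ts_ms"; the destination and values are made up):

// value JSON: {"id": 1, "order_date": "2016-02-19", "__op": "u", "__source_ts_ms": 1645523456789, ...}
RecordConverter rec = new RecordConverter("testc.inventory.orders", valueBytes, keyBytes, config);
rec.cdcOpValue();          // -> Operation.UPDATE for "u"; 'd', 'r', 'c' and 'i' map to their operations
rec.cdcSourceTsMsValue();  // -> 1645523456789L, used to order events during batch deduplication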
@@ -12,10 +12,12 @@
public class SchemaConverter {
private final JsonNode valueSchema;
private final JsonNode keySchema;
private final IcebergConsumerConfig config;

public SchemaConverter(JsonNode valueSchema, JsonNode keySchema) {
public SchemaConverter(JsonNode valueSchema, JsonNode keySchema, IcebergConsumerConfig config) {
this.valueSchema = valueSchema;
this.keySchema = keySchema;
this.config = config;
}

protected JsonNode valueSchema() {
@@ -139,15 +141,15 @@ private static IcebergSchemaInfo icebergSchemaFields(JsonNode schemaNode, JsonNo
return schemaData;
}

public Schema icebergSchema(boolean createIdentifierFields) {
public Schema icebergSchema() {

if (this.valueSchema.isNull()) {
throw new RuntimeException("Failed to get schema from debezium event, event schema is null");
}

IcebergSchemaInfo schemaData = new IcebergSchemaInfo();
final JsonNode keySchemaNode;
if (!createIdentifierFields) {
if (!config.createIdentifierFields()) {
RecordConverter.LOGGER.warn("Creating identifier fields is disabled, creating table without identifier fields!");
keySchemaNode = null;
} else if (!RecordConverter.eventsAreUnwrapped && keySchema != null) {
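
To make the config-driven behaviour concrete, here is a hedged sketch of what the converter produces for the same event schema depending on createIdentifierFields() (the node variable names are illustrative):

Schema schema = new SchemaConverter(valueSchemaNode, keySchemaNode, config).icebergSchema();
// config.createIdentifierFields() == true and a key schema containing "id":
//   schema.identifierFieldIds() -> {1}, and "id" is created as a required field
// config.createIdentifierFields() == false: the key schema is ignored,
//   schema.identifierFieldIds() -> {}, so the table can only be written append-only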
@@ -24,7 +24,6 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -93,13 +92,13 @@ protected List<RecordConverter> deduplicateBatch(List<RecordConverter> events) {
*/
private int compareByTsThenOp(RecordConverter lhs, RecordConverter rhs) {

int result = Long.compare(lhs.cdcSourceTsMsValue(config.cdcSourceTsMsField()), rhs.cdcSourceTsMsValue(config.cdcSourceTsMsField()));
int result = Long.compare(lhs.cdcSourceTsMsValue(), rhs.cdcSourceTsMsValue());

if (result == 0) {
// return (x < y) ? -1 : ((x == y) ? 0 : 1);
result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(config.cdcOpField()), -1)
result = CDC_OPERATION_PRIORITY.getOrDefault(lhs.cdcOpValue(), -1)
.compareTo(
CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(config.cdcOpField()), -1)
CDC_OPERATION_PRIORITY.getOrDefault(rhs.cdcOpValue(), -1)
);
}

@@ -159,7 +158,7 @@ public void addToTable(Table icebergTable, List<RecordConverter> events) {

for (Map.Entry<SchemaConverter, List<RecordConverter>> schemaEvents : eventsGroupedBySchema.entrySet()) {
// extend table schema if new fields found
applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema(config.createIdentifierFields()));
applyFieldAddition(icebergTable, schemaEvents.getValue().get(0).icebergSchema());
// add set of events to table
addToTablePerSchema(icebergTable, schemaEvents.getValue());
}
@@ -179,7 +178,7 @@ private void addToTablePerSchema(Table icebergTable, List<RecordConverter> event
BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
try (writer) {
for (RecordConverter e : events) {
final RecordWrapper record = (config.upsert() && !tableSchema.identifierFieldIds().isEmpty()) ? e.convert(tableSchema, config.cdcOpField()) : e.convertAsAppend(tableSchema);
final RecordWrapper record = (config.upsert() && !tableSchema.identifierFieldIds().isEmpty()) ? e.convert(tableSchema) : e.convertAsAppend(tableSchema);
writer.write(record);
}

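
Putting the pieces together, the per-destination flow now reads roughly as below. This is a simplified sketch, not the actual method bodies: the groupBySchema helper is illustrative, and writer lifecycle and error handling are omitted.

// when upsert is enabled the batch is deduplicated first: the latest event per key wins,
// ordered by cdcSourceTsMsValue() and then by operation priority
List<RecordConverter> batch = config.upsert() ? deduplicateBatch(events) : events;
for (List<RecordConverter> sameSchema : groupBySchema(batch)) {
  // extend the table schema if the events introduce new fields
  applyFieldAddition(icebergTable, sameSchema.get(0).icebergSchema());
  Schema tableSchema = icebergTable.schema();
  boolean upsert = config.upsert() && !tableSchema.identifierFieldIds().isEmpty();
  for (RecordConverter e : sameSchema) {
    writer.write(upsert ? e.convert(tableSchema) : e.convertAsAppend(tableSchema));
  }
}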
@@ -44,6 +44,8 @@ class RecordConverterTest {

@Inject
IcebergChangeEventBuilder eventBuilder;
@Inject
IcebergConsumerConfig config;

RecordConverterTest() throws IOException {
}
@@ -61,8 +63,8 @@ static void setup() {
@Test
public void testNestedJsonRecord() {
RecordConverter e = new RecordConverter("test",
serdeWithSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema(true);
serdeWithSchema.getBytes(StandardCharsets.UTF_8), null, config);
Schema schema = e.icebergSchema();
System.out.println(schema.toString());
assertEquals(schema.toString(), ("""
table {
@@ -78,9 +80,9 @@ public void testUnwrapJsonRecord() {
@Test
public void testUnwrapJsonRecord() {
RecordConverter e = new RecordConverter("test",
unwrapWithSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema(true);
RecordWrapper record = e.convert(schema, "__op");
unwrapWithSchema.getBytes(StandardCharsets.UTF_8), null, config);
Schema schema = e.icebergSchema();
RecordWrapper record = e.convert(schema);
assertEquals("orders", record.getField("__table").toString());
assertEquals(LocalDate.parse("2016-02-19"), record.getField("order_date"));
assertEquals(schema.toString(), """
@@ -103,9 +105,9 @@ public void testUnwrapJsonRecord() {
@Test
public void testNestedArrayJsonRecord() {
RecordConverter e = new RecordConverter("test",
unwrapWithArraySchema.getBytes(StandardCharsets.UTF_8), null);
unwrapWithArraySchema.getBytes(StandardCharsets.UTF_8), null, config);

Schema schema = e.icebergSchema(true);
Schema schema = e.icebergSchema();
assertEquals(schema.toString(), """
table {
1: name: optional string
@@ -120,16 +122,16 @@ public void testNestedArrayJsonRecord() {
assertEquals(schema.identifierFieldIds(), Set.of());
assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(), "int");
assertEquals(schema.findField("schedule").type().asListType().elementType().toString(), "string");
RecordWrapper record = e.convert(schema,"__op");
RecordWrapper record = e.convert(schema);
//System.out.println(record);
assertTrue(record.toString().contains("[10000, 10001, 10002, 10003]"));
}

@Test
public void testNestedArray2JsonRecord() {
RecordConverter e = new RecordConverter("test",
unwrapWithArraySchema2.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema(true);
unwrapWithArraySchema2.getBytes(StandardCharsets.UTF_8), null, config);
Schema schema = e.icebergSchema();
System.out.println(schema);
assertEquals(schema.toString(), """
table {
@@ -145,9 +147,9 @@ public void testNestedArray2JsonRecord() {
@Test
public void testNestedGeomJsonRecord() {
RecordConverter e = new RecordConverter("test",
unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null);
Schema schema = e.icebergSchema(true);
RecordWrapper record = e.convert(schema,"__op");
unwrapWithGeomSchema.getBytes(StandardCharsets.UTF_8), null, config);
Schema schema = e.icebergSchema();
RecordWrapper record = e.convert(schema);
assertEquals(schema.toString(), """
table {
1: id: optional int
@@ -198,7 +200,7 @@ public void testIcebergSchemaConverterWithKey() {
.addField("__deleted", false)
.build();

Schema schema = t1.icebergSchema(true);
Schema schema = t1.icebergSchema();
assertEquals(schema.toString(), """
table {
1: id: required int (id)