Commit cb3f0c3
Improve handling Timestamp fields and values (#513)
ismailsimsek authored Feb 22, 2025
1 parent a27cef4 commit cb3f0c3
Showing 5 changed files with 108 additions and 27 deletions.
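In short: the converter previously treated every numeric TIMESTAMP value as epoch milliseconds and every string as an ISO offset timestamp. After this change, conversion is routed through Debezium's TemporalPrecisionMode and the Iceberg column type (timestamp vs. timestamptz), so epoch values in milli-, micro-, or nanoseconds and ISO-8601 strings each land on the correct type. A minimal sketch of the epoch arithmetic the new helpers rely on, using only java.time (Iceberg's DateTimeUtil.EPOCH is the same 1970-01-01T00:00Z instant; the class and constant names here are illustrative, not part of the patch):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

public class EpochConversionSketch {
  // Same instant as org.apache.iceberg.util.DateTimeUtil.EPOCH
  static final OffsetDateTime EPOCH = Instant.EPOCH.atOffset(ZoneOffset.UTC);

  // timestamp (no zone): add the duration, then drop the UTC offset
  static LocalDateTime timestampFromMillis(long millisFromEpoch) {
    return ChronoUnit.MILLIS.addTo(EPOCH, millisFromEpoch).toLocalDateTime();
  }

  // timestamptz: keep the UTC offset
  static OffsetDateTime timestamptzFromMillis(long millisFromEpoch) {
    return ChronoUnit.MILLIS.addTo(EPOCH, millisFromEpoch);
  }

  public static void main(String[] args) {
    long ms = 1_700_000_000_000L;
    System.out.println(timestampFromMillis(ms));   // 2023-11-14T22:13:20
    System.out.println(timestamptzFromMillis(ms)); // 2023-11-14T22:13:20Z
  }
}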
----------------------------------------------------------------------
@@ -81,4 +81,9 @@ public interface IcebergConsumerConfig {
   default boolean isIsoStringTemporalMode() {
     return temporalPrecisionMode() == TemporalPrecisionMode.ISOSTRING;
   }
+
+  default boolean isAdaptiveTemporalMode() {
+    return temporalPrecisionMode() == TemporalPrecisionMode.ADAPTIVE ||
+        temporalPrecisionMode() == TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS;
+  }
 }
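For reference, TemporalPrecisionMode mirrors the connector's time.precision.mode setting. Under the two adaptive values Debezium sizes the emitted epoch number to the source column's precision, so the unit cannot be recovered from the schema alone; the new predicate lets the sink detect that case and fall back to plain long columns (see the schema change further down).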
----------------------------------------------------------------------
@@ -11,22 +11,26 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.debezium.DebeziumException;
+import io.debezium.jdbc.TemporalPrecisionMode;
 import io.debezium.server.iceberg.tableoperator.Operation;
 import io.debezium.server.iceberg.tableoperator.RecordWrapper;
 import io.debezium.time.IsoDate;
+import io.debezium.time.IsoTimestamp;
+import io.debezium.time.ZonedTimestamp;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -152,7 +156,19 @@ public RecordWrapper convert(Schema schema) {
     return new RecordWrapper(row, op);
   }
 
-  private static GenericRecord convert(Types.StructType tableFields, JsonNode data) {
+  public static LocalDateTime timestampFromMillis(long millisFromEpoch) {
+    return ChronoUnit.MILLIS.addTo(DateTimeUtil.EPOCH, millisFromEpoch).toLocalDateTime();
+  }
+
+  public static OffsetDateTime timestamptzFromNanos(long nanosFromEpoch) {
+    return ChronoUnit.NANOS.addTo(DateTimeUtil.EPOCH, nanosFromEpoch);
+  }
+
+  public static OffsetDateTime timestamptzFromMillis(long millisFromEpoch) {
+    return ChronoUnit.MILLIS.addTo(DateTimeUtil.EPOCH, millisFromEpoch);
+  }
+
+  private GenericRecord convert(Types.StructType tableFields, JsonNode data) {
     LOGGER.debug("Processing nested field:{}", tableFields);
     GenericRecord record = GenericRecord.create(tableFields);
 
@@ -169,7 +185,7 @@ private static GenericRecord convert(Types.StructType tableFields, JsonNode data
     return record;
   }
 
-  private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
+  private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
     LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
     final Object val;
     switch (field.type().typeId()) {
@@ -197,29 +213,32 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
         break;
       case DATE:
         if (node.isNull()) {
-          val = null;
-        } else if ((node.isInt())) {
+          return null;
+        }
+        if ((node.isInt())) {
           // io.debezium.time.Date
           // org.apache.kafka.connect.data.Date
           // Represents the number of days since the epoch.
-          val = LocalDate.ofEpochDay(node.longValue());
-        } else if (node.isTextual()) {
+          return LocalDate.ofEpochDay(node.longValue());
+        }
+        if (node.isTextual()) {
           // io.debezium.time.IsoDate
           // Represents date values in UTC format, according to the ISO 8601 standard, for example, 2017-09-15Z.
-          val = LocalDate.parse(node.asText(), IsoDate.FORMATTER);
-        } else {
-          throw new RuntimeException("Failed to convert date value, field: " + field.name() + " value: " + node);
+          return LocalDate.parse(node.asText(), IsoDate.FORMATTER);
         }
-        break;
+        throw new RuntimeException("Failed to convert date value, field: " + field.name() + " value: " + node);
       case TIMESTAMP:
-        if ((node.isLong() || node.isNumber()) && TS_MS_FIELDS.contains(field.name())) {
-          val = OffsetDateTime.ofInstant(Instant.ofEpochMilli(node.longValue()), ZoneOffset.UTC);
-        } else if (node.isTextual()) {
-          val = OffsetDateTime.parse(node.asText());
-        } else {
-          throw new RuntimeException("Failed to convert timestamp value, field: " + field.name() + " value: " + node);
+        if (node.isNull()) {
+          return null;
         }
-        break;
+        if (node.isNumber() && TS_MS_FIELDS.contains(field.name())) {
+          return timestamptzFromMillis(node.asLong());
+        }
+        boolean isTsWithZone = ((Types.TimestampType) field.type()).shouldAdjustToUTC();
+        if (isTsWithZone) {
+          return convertOffsetDateTimeValue(field, node, config.temporalPrecisionMode());
+        }
+        return convertLocalDateTimeValue(field, node, config.temporalPrecisionMode());
       case BINARY:
         try {
           val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
@@ -274,5 +293,47 @@ private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node
     return val;
   }
 
+  public LocalDateTime convertLocalDateTimeValue(Types.NestedField field, JsonNode node, TemporalPrecisionMode temporalPrecisionMode) {
+
+    if (node.isNumber()) {
+      return switch (temporalPrecisionMode) {
+        case MICROSECONDS -> DateTimeUtil.timestampFromMicros(node.asLong());
+        case NANOSECONDS -> DateTimeUtil.timestampFromNanos(node.asLong());
+        case CONNECT -> timestampFromMillis(node.asLong());
+        default ->
+            throw new RuntimeException("Failed to convert timestamp value, field: " + field.name() + " value: " + node + " temporalPrecisionMode: " + temporalPrecisionMode);
+      };
+    }
+
+    if (node.isTextual()) {
+      return switch (temporalPrecisionMode) {
+        case ISOSTRING -> LocalDateTime.parse(node.asText(), IsoTimestamp.FORMATTER);
+        default ->
+            throw new RuntimeException("Failed to convert timestamp value, field: " + field.name() + " value: " + node + " temporalPrecisionMode: " + temporalPrecisionMode);
+      };
+    }
+    throw new RuntimeException("Failed to convert timestamp value, field: " + field.name() + " value: " + node + " temporalPrecisionMode: " + temporalPrecisionMode);
+  }
+
+  private OffsetDateTime convertOffsetDateTimeValue(Types.NestedField field, JsonNode node, TemporalPrecisionMode temporalPrecisionMode) {
+    if (node.isNumber()) {
+      // a numeric epoch value carries no zone information; it is interpreted as UTC
+      return switch (temporalPrecisionMode) {
+        case MICROSECONDS -> DateTimeUtil.timestamptzFromMicros(node.asLong());
+        case NANOSECONDS -> timestamptzFromNanos(node.asLong());
+        case CONNECT -> timestamptzFromMillis(node.asLong());
+        default ->
+            throw new RuntimeException("Failed to convert timestamp value, field: " + field.name() + " value: " + node + " temporalPrecisionMode: " + temporalPrecisionMode);
+      };
+    }
+
+    if (node.isTextual()) {
+      return switch (temporalPrecisionMode) {
+        default -> OffsetDateTime.parse(node.asText(), ZonedTimestamp.FORMATTER);
+      };
+    }
+
+    throw new RuntimeException("Failed to convert timestamp value, field: " + field.name() + " value: " + node + " temporalPrecisionMode: " + temporalPrecisionMode);
+  }
+
 }
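Two notes on the converter changes. First, convert and jsonValToIcebergVal lose their static modifiers (as do the schema helpers in the next file) because conversion now consults the instance's config.temporalPrecisionMode(). Second, a hedged usage sketch of the numeric paths: the same instant arrives in a different epoch unit per precision mode, and each switch arm normalizes it to the same result (DateTimeUtil is Iceberg's org.apache.iceberg.util.DateTimeUtil; the RecordConverter import assumes this repo's io.debezium.server.iceberg package):

import io.debezium.server.iceberg.RecordConverter;
import java.time.LocalDateTime;
import org.apache.iceberg.util.DateTimeUtil;

class ConversionPathDemo {
  public static void main(String[] args) {
    // 2023-11-14T22:13:20Z, expressed in the unit each precision mode emits
    LocalDateTime fromMicros = DateTimeUtil.timestampFromMicros(1_700_000_000_000_000L); // MICROSECONDS
    LocalDateTime fromMillis = RecordConverter.timestampFromMillis(1_700_000_000_000L);  // CONNECT
    System.out.println(fromMicros);                    // 2023-11-14T22:13:20
    System.out.println(fromMicros.equals(fromMillis)); // true
  }
}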
----------------------------------------------------------------------
@@ -46,7 +46,7 @@ private static String getFieldName(JsonNode fieldSchema) {
    * @param schemaData keeps information of iceberg schema like fields, nextFieldId and identifier fields
    * @return map entry Key being the last id assigned to the iceberg field, Value being the converted iceberg NestedField.
    */
-  private static IcebergSchemaInfo debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergSchemaInfo schemaData, JsonNode keySchemaNode) {
+  private IcebergSchemaInfo debeziumFieldToIcebergField(JsonNode fieldSchema, String fieldName, IcebergSchemaInfo schemaData, JsonNode keySchemaNode) {
     String fieldType = fieldSchema.get("type").textValue();
     String fieldTypeName = getFieldName(fieldSchema);
 
@@ -130,7 +130,7 @@ private static JsonNode findNodeFieldByName(String fieldName, JsonNode node) {
    * @param schemaNode
    * @return
    */
-  private static IcebergSchemaInfo icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode, IcebergSchemaInfo schemaData) {
+  private IcebergSchemaInfo icebergSchemaFields(JsonNode schemaNode, JsonNode keySchemaNode, IcebergSchemaInfo schemaData) {
     RecordConverter.LOGGER.debug("Converting iceberg schema to debezium:{}", schemaNode);
     for (JsonNode field : getNodeFieldsArray(schemaNode)) {
       String fieldName = field.get("field").textValue();
@@ -182,7 +182,7 @@ public Schema icebergSchema() {
 
   }
 
-  private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType, String fieldTypeName) {
+  private Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType, String fieldTypeName) {
     // Debezium Temporal types: https://debezium.io/documentation//reference/connectors/postgresql.html#postgresql-temporal-types
     switch (fieldType) {
       case "int8":
@@ -196,7 +196,16 @@ private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String
       if (RecordConverter.TS_MS_FIELDS.contains(fieldName)) {
         return Types.TimestampType.withZone();
       }
-      return Types.LongType.get();
+      if (config.isAdaptiveTemporalMode()) {
+        return Types.LongType.get();
+      }
+      return switch (fieldTypeName) {
+        case "io.debezium.time.Timestamp" -> Types.TimestampType.withoutZone();
+        case "io.debezium.time.MicroTimestamp" -> Types.TimestampType.withoutZone();
+        case "io.debezium.time.NanoTimestamp" -> Types.TimestampType.withoutZone();
+        case "org.apache.kafka.connect.data.Timestamp" -> Types.TimestampType.withoutZone();
+        default -> Types.LongType.get();
+      };
     case "float8":
     case "float16":
     case "float32": // float is represented in 32 bits,
@@ -209,6 +218,8 @@ private static Type.PrimitiveType icebergPrimitiveField(String fieldName, String
     case "string":
       return switch (fieldTypeName) {
         case "io.debezium.time.IsoDate" -> Types.DateType.get();
+        case "io.debezium.time.IsoTimestamp" -> Types.TimestampType.withoutZone();
+        case "io.debezium.time.ZonedTimestamp" -> Types.TimestampType.withZone();
         default -> Types.StringType.get();
       };
     case "uuid":
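Taken together, the schema changes mean the inferred Iceberg column type now depends on both the Debezium logical type name and the precision mode. A summary of the switch arms above:

Debezium field schema (type / logical name)         non-adaptive modes         adaptive modes
int64 / io.debezium.time.Timestamp                  timestamp (withoutZone)    long
int64 / io.debezium.time.MicroTimestamp             timestamp (withoutZone)    long
int64 / io.debezium.time.NanoTimestamp              timestamp (withoutZone)    long
int64 / org.apache.kafka.connect.data.Timestamp     timestamp (withoutZone)    long
string / io.debezium.time.IsoTimestamp              timestamp (withoutZone)    timestamp (withoutZone)
string / io.debezium.time.ZonedTimestamp            timestamptz (withZone)     timestamptz (withZone)

Fields named in TS_MS_FIELDS (Debezium's ts_ms metadata columns) keep their unconditional timestamptz mapping.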
----------------------------------------------------------------------
@@ -19,8 +19,6 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -58,7 +56,7 @@ public void testConsumingVariousDataTypes() throws Exception {
         "VALUES \n" +
         "(1, null, null, null, null) \n" +
         ",(2, CURRENT_DATE , CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP ) \n" +
-        ",(3, '2024-01-02'::DATE , CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP ) ";
+        ",(3, '2024-01-02'::DATE , CURRENT_TIME, '2023-10-11 10:30:00'::timestamp, '2023-11-12 10:30:00+02'::timestamptz ) ";
 
     SourcePostgresqlDB.runSQL(sql);
     Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
@@ -72,6 +70,12 @@
       Assertions.assertEquals(DataTypes.DateType, getSchemaField(df, "c_date").dataType());
       Assertions.assertEquals(1, df.filter("c_id = 2 AND c_date = CURRENT_DATE()").count());
       Assertions.assertEquals(1, df.filter("c_id = 3 AND c_date = to_date('2024-01-02', 'yyyy-MM-dd')").count());
+      // Validate timestamp fields and values
+      System.out.println(getSchemaField(df, "c_timestamp").dataType());
+      Assertions.assertEquals(DataTypes.TimestampNTZType, getSchemaField(df, "c_timestamp").dataType());
+      Assertions.assertEquals(1, df.filter("c_id = 3 AND c_timestamp = to_timestamp('2023-10-11 10:30:00')").count());
+      Assertions.assertEquals(DataTypes.TimestampType, getSchemaField(df, "c_timestamptz").dataType());
+      Assertions.assertEquals(1, df.filter("c_id = 3 AND c_timestamptz = to_timestamp('2023-11-12 10:30:00+02')").count());
       return true;
     } catch (Exception e) {
       // e.printStackTrace();
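The new assertions pin the round-trip on the Spark side: the plain timestamp column must surface as TimestampNTZType (timestamp without time zone, Iceberg's withoutZone) and the timestamptz column as TimestampType (Iceberg's withZone). Note that TimestampNTZType presumes a Spark version with NTZ timestamp support.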
----------------------------------------------------------------------
@@ -284,7 +284,7 @@ public void testFieldAddition() throws Exception {
         ds.show();
         System.out.println(dataTypeString(ds, "c_timestamptz"));
         return ds.count() >= 3 &&
-            Objects.equals(dataTypeString(ds, "c_timestamptz"), "string");
+            Objects.equals(dataTypeString(ds, "c_timestamptz"), "timestamp");
       } catch (Exception e) {
         return false;
       }
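This expectation flips because ZonedTimestamp strings are now mapped to timestamptz: after the change, the pre-existing field-addition test must see c_timestamptz come back as a timestamp column rather than a string.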
