Skip to content

Commit

Permalink
Improve Time handling (#515)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek authored Feb 23, 2025
1 parent 7291e9e commit 160ab71
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import io.debezium.server.iceberg.tableoperator.Operation;
import io.debezium.server.iceberg.tableoperator.RecordWrapper;
import io.debezium.time.IsoDate;
import io.debezium.time.IsoTime;
import io.debezium.time.IsoTimestamp;
import io.debezium.time.ZonedTime;
import io.debezium.time.ZonedTimestamp;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
Expand All @@ -29,7 +31,9 @@
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -221,6 +225,34 @@ private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
return LocalDate.parse(node.asText(), IsoDate.FORMATTER);
}
throw new RuntimeException("Failed to convert date value, field: " + field.name() + " value: " + node);

case TIME:
if (node.isTextual()) {
return switch (config.temporalPrecisionMode()) {
// io.debezium.time.IsoTime
case ISOSTRING -> LocalTime.parse(node.asText(), IsoTime.FORMATTER);
// io.debezium.time.ZonedTime
// A string representation of a time value with timezone information,
// Iceberg using LocalTime for time values
default -> OffsetTime.parse(node.asText(), ZonedTime.FORMATTER).toLocalTime();
};
}
if (node.isNumber()) {
return switch (config.temporalPrecisionMode()) {
// io.debezium.time.MicroTime
// Represents the time value in microseconds
case MICROSECONDS -> LocalTime.ofNanoOfDay(node.asLong() * 1000);
// io.debezium.time.NanoTime
// Represents the time value in nanoseconds
case NANOSECONDS -> LocalTime.ofNanoOfDay(node.asLong());
//org.apache.kafka.connect.data.Time
//Represents the number of milliseconds since midnight,
case CONNECT -> LocalTime.ofNanoOfDay(node.asLong() * 1_000_000);
default ->
throw new RuntimeException("Failed to convert time value, field: " + field.name() + " value: " + node);
};
}
throw new RuntimeException("Failed to convert time value, field: " + field.name() + " value: " + node);
case TIMESTAMP:
if (node.isNumber() && TS_MS_FIELDS.contains(field.name())) {
return timestamptzFromMillis(node.asLong());
Expand Down Expand Up @@ -317,4 +349,5 @@ private OffsetDateTime convertOffsetDateTimeValue(Types.NestedField field, JsonN

throw new RuntimeException(eexMessage);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,15 @@ private Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldT
case "int32": // int 4 bytes
return switch (fieldTypeName) {
case "io.debezium.time.Date", "org.apache.kafka.connect.data.Date" -> Types.DateType.get();
// NOTE: Time type is disable for the moment, it's not supported by spark
// //"Represents the number of milliseconds"
// case "io.debezium.time.Time" -> Types.TimeType.get();
// //"Represents the time value in microseconds
// case "io.debezium.time.MicroTime" -> Types.TimeType.get();
// //"Represents the time value in nanoseconds"
// case "io.debezium.time.NanoTime" -> Types.TimeType.get();
// //"Represents the time value in microseconds"
// case "org.apache.kafka.connect.data.Time" -> Types.TimeType.get();
default -> Types.IntegerType.get();
};
case "int64": // long 8 bytes
Expand All @@ -204,6 +213,15 @@ private Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldT
case "io.debezium.time.MicroTimestamp" -> Types.TimestampType.withoutZone();
case "io.debezium.time.NanoTimestamp" -> Types.TimestampType.withoutZone();
case "org.apache.kafka.connect.data.Timestamp" -> Types.TimestampType.withoutZone();
// NOTE: Time type is disable for the moment, it's not supported by spark
// //"Represents the number of milliseconds"
// case "io.debezium.time.Time" -> Types.TimeType.get();
// //"Represents the time value in microseconds
// case "io.debezium.time.MicroTime" -> Types.TimeType.get();
// //"Represents the time value in nanoseconds"
// case "io.debezium.time.NanoTime" -> Types.TimeType.get();
// //"Represents the time value in microseconds"
// case "org.apache.kafka.connect.data.Time" -> Types.TimeType.get();
default -> Types.LongType.get();
};
case "float8":
Expand All @@ -221,6 +239,9 @@ private Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldT
case "io.debezium.time.IsoDate" -> Types.DateType.get();
case "io.debezium.time.IsoTimestamp" -> Types.TimestampType.withoutZone();
case "io.debezium.time.ZonedTimestamp" -> Types.TimestampType.withZone();
// NOTE: Time type is disable for the moment, it's not supported by spark
// case "io.debezium.time.IsoTime" -> Types.TimeType.get();
// case "io.debezium.time.ZonedTime" -> Types.TimeType.get();
default -> Types.StringType.get();
};
case "uuid":
Expand Down

0 comments on commit 160ab71

Please sign in to comment.