From 858574d0bd1f3ef4157d0446cfb05cef05aac96b Mon Sep 17 00:00:00 2001 From: Joel Lubinitsky <33523178+joellubi@users.noreply.github.com> Date: Thu, 18 Jan 2024 11:09:50 -0500 Subject: [PATCH] GH-39466: [Go][Parquet] Align Arrow and Parquet Timestamp Instant/Local Semantics (#39467) ### Rationale for this change Closes: #39466 ### What changes are included in this PR? - Update logic for determining whether an Arrow Timestamp should have `isAdjustedToUTC=true` on conversion to Parquet. - Update conversion from Parquet Timestamp to Arrow Timestamp to align with Parquet Format [backward-compatibility](https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485) rules. - Refactor Timestamp serialization methods to reduce duplicated code ### Are these changes tested? Yes, - Logical type mapping in existing test updated. - New tests for roundtrip behavior of timestamps with various timezone settings, with/without store_schema enabled. - New test to clarify equality behavior of timestamps with instant semantics, as well as Go-related quirks with timezone-unaware timestamps. ### Are there any user-facing changes? Yes, users of `pqarrow.FileWriter` will produce Parquet files in which the `TIMESTAMP` type is normalized to UTC IFF the Arrow type provided has a timezone specified. This is different from the current Go behavior but aligned with that of other implementations. The conversion from Parquet to Arrow has been updated as well to reflect the Parquet format [document](https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485). Rust already [implements](https://github.com/apache/arrow-rs/blob/a61e824abdd7b38ea214828480430ff2a13f2ead/parquet/src/arrow/schema/primitive.rs#L211-L239) the spec as described and #39489 has been reported due to a mismatch in the handling of convertedTypes in C++. 
* Closes: #39466 Authored-by: Joel Lubinitsky Signed-off-by: Matt Topol --- go/arrow/array/timestamp.go | 11 ++-- go/arrow/array/timestamp_test.go | 49 ++++++++++++++++- go/arrow/datatype_fixedwidth.go | 19 +++---- go/parquet/pqarrow/encode_arrow_test.go | 70 +++++++++++++++++++++++++ go/parquet/pqarrow/schema.go | 13 +++-- go/parquet/pqarrow/schema_test.go | 6 +-- 6 files changed, 140 insertions(+), 28 deletions(-) diff --git a/go/arrow/array/timestamp.go b/go/arrow/array/timestamp.go index 6ffb43e067af0..0cc46a127fc51 100644 --- a/go/arrow/array/timestamp.go +++ b/go/arrow/array/timestamp.go @@ -91,16 +91,15 @@ func (a *Timestamp) ValueStr(i int) string { return NullValueStr } - dt := a.DataType().(*arrow.TimestampType) - z, _ := dt.GetZone() - return a.values[i].ToTime(dt.Unit).In(z).Format("2006-01-02 15:04:05.999999999Z0700") + toTime, _ := a.DataType().(*arrow.TimestampType).GetToTimeFunc() + return toTime(a.values[i]).Format("2006-01-02 15:04:05.999999999Z0700") } func (a *Timestamp) GetOneForMarshal(i int) interface{} { - if a.IsNull(i) { - return nil + if val := a.ValueStr(i); val != NullValueStr { + return val } - return a.values[i].ToTime(a.DataType().(*arrow.TimestampType).Unit).Format("2006-01-02 15:04:05.999999999") + return nil } func (a *Timestamp) MarshalJSON() ([]byte, error) { diff --git a/go/arrow/array/timestamp_test.go b/go/arrow/array/timestamp_test.go index acbad8b586dd4..c172ad811dc37 100644 --- a/go/arrow/array/timestamp_test.go +++ b/go/arrow/array/timestamp_test.go @@ -234,7 +234,7 @@ func TestTimestampBuilder_Resize(t *testing.T) { assert.Equal(t, 5, ab.Len()) } -func TestTimestampValueStr(t *testing.T) { +func TestTimestampValueStr(t *testing.T) { mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) defer mem.AssertSize(t, 0) @@ -251,3 +251,50 @@ func TestTimestampValueStr(t *testing.T) { assert.Equal(t, "1968-11-30 13:30:45-0700", arr.ValueStr(0)) assert.Equal(t, "2016-02-29 10:42:23-0700", arr.ValueStr(1)) } + +func 
TestTimestampEquality(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + tsDatatypes := []*arrow.TimestampType{ + {Unit: arrow.Second}, + {Unit: arrow.Second, TimeZone: "UTC"}, + {Unit: arrow.Second, TimeZone: "America/Phoenix"}, + } + + arrs := make([]*array.Timestamp, 0, len(tsDatatypes)) + for _, dt := range tsDatatypes { + bldr := array.NewTimestampBuilder(mem, dt) + defer bldr.Release() + + bldr.Append(-34226955) + bldr.Append(1456767743) + + arr := bldr.NewTimestampArray() + defer arr.Release() + + arrs = append(arrs, arr) + } + + // No timezone, "wall clock" semantics + // These timestamps have no actual timezone, but we still represent as UTC per Go conventions + assert.Equal(t, "1968-11-30 20:30:45Z", arrs[0].ValueStr(0)) + assert.Equal(t, "2016-02-29 17:42:23Z", arrs[0].ValueStr(1)) + + // UTC timezone, "instant" semantics + assert.Equal(t, "1968-11-30 20:30:45Z", arrs[1].ValueStr(0)) + assert.Equal(t, "2016-02-29 17:42:23Z", arrs[1].ValueStr(1)) + + // America/Phoenix timezone, "instant" semantics + assert.Equal(t, "1968-11-30 13:30:45-0700", arrs[2].ValueStr(0)) + assert.Equal(t, "2016-02-29 10:42:23-0700", arrs[2].ValueStr(1)) + + // Despite timezone and semantics, the physical values are equivalent + assert.Equal(t, arrs[0].Value(0), arrs[1].Value(0)) + assert.Equal(t, arrs[0].Value(0), arrs[2].Value(0)) + assert.Equal(t, arrs[1].Value(0), arrs[2].Value(0)) + + assert.Equal(t, arrs[0].Value(1), arrs[1].Value(1)) + assert.Equal(t, arrs[0].Value(1), arrs[2].Value(1)) + assert.Equal(t, arrs[1].Value(1), arrs[2].Value(1)) +} diff --git a/go/arrow/datatype_fixedwidth.go b/go/arrow/datatype_fixedwidth.go index 1a3074e59e75f..158dbd67b1b5e 100644 --- a/go/arrow/datatype_fixedwidth.go +++ b/go/arrow/datatype_fixedwidth.go @@ -348,8 +348,11 @@ type TemporalWithUnit interface { } // TimestampType is encoded as a 64-bit signed integer since the UNIX epoch (2017-01-01T00:00:00Z). 
-// The zero-value is a second and time zone neutral. Time zone neutral can be -// considered UTC without having "UTC" as a time zone. +// The zero-value is a second and time zone neutral. In Arrow semantics, time zone neutral does not +// represent a physical point in time, but rather a "wall clock" time that only has meaning within +// the context that produced it. In Go, time.Time can only represent instants; there is no notion +// of "wall clock" time. Therefore, time zone neutral timestamps are represented as UTC per Go +// conventions even though the Arrow type itself has no time zone. type TimestampType struct { Unit TimeUnit TimeZone string @@ -454,17 +457,7 @@ func (t *TimestampType) GetToTimeFunc() (func(Timestamp) time.Time, error) { return nil, err } - switch t.Unit { - case Second: - return func(v Timestamp) time.Time { return time.Unix(int64(v), 0).In(tz) }, nil - case Millisecond: - return func(v Timestamp) time.Time { return time.UnixMilli(int64(v)).In(tz) }, nil - case Microsecond: - return func(v Timestamp) time.Time { return time.UnixMicro(int64(v)).In(tz) }, nil - case Nanosecond: - return func(v Timestamp) time.Time { return time.Unix(0, int64(v)).In(tz) }, nil - } - return nil, fmt.Errorf("invalid timestamp unit: %s", t.Unit) + return func(v Timestamp) time.Time { return v.ToTime(t.Unit).In(tz) }, nil } // Time32Type is encoded as a 32-bit signed integer, representing either seconds or milliseconds since midnight. 
diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go index 75eb965d033b5..25d31b54e1b31 100644 --- a/go/parquet/pqarrow/encode_arrow_test.go +++ b/go/parquet/pqarrow/encode_arrow_test.go @@ -171,6 +171,41 @@ func makeDateTypeTable(mem memory.Allocator, expected bool, partialDays bool) ar return array.NewTableFromRecords(arrsc, []arrow.Record{rec}) } +func makeTimestampTypeTable(mem memory.Allocator, expected bool) arrow.Table { + isValid := []bool{true, true, true, false, true, true} + + // Timestamp with relative (i.e. local) semantics. Make sure it roundtrips without being incorrectly converted to an absolute point in time. + f0 := arrow.Field{Name: "f0", Type: &arrow.TimestampType{Unit: arrow.Millisecond}, Nullable: true, Metadata: arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"1"})} + + // Timestamp with absolute (i.e. instant) semantics. The physical representation is always from Unix epoch in UTC timezone. + // TimeZone is used for display purposes and can be stripped on roundtrip without changing the actual instant referred to. + // WithStoreSchema will preserve the original timezone, but the instant will be equivalent even if it's not used. 
+ f1 := arrow.Field{Name: "f1", Type: &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "EST"}, Nullable: true, Metadata: arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"2"})} + f1X := arrow.Field{Name: "f1", Type: &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "UTC"}, Nullable: true, Metadata: arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"2"})} + + fieldList := []arrow.Field{f0} + if expected { + fieldList = append(fieldList, f1X) + } else { + fieldList = append(fieldList, f1) + } + + arrsc := arrow.NewSchema(fieldList, nil) + + ts64msValues := []arrow.Timestamp{1489269, 1489270, 1489271, 1489272, 1489272, 1489273} + + bldr := array.NewRecordBuilder(mem, arrsc) + defer bldr.Release() + + bldr.Field(0).(*array.TimestampBuilder).AppendValues(ts64msValues, isValid) + bldr.Field(1).(*array.TimestampBuilder).AppendValues(ts64msValues, isValid) + + rec := bldr.NewRecord() + defer rec.Release() + + return array.NewTableFromRecords(arrsc, []arrow.Record{rec}) +} + func TestWriteArrowCols(t *testing.T) { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) @@ -954,6 +989,25 @@ func (ps *ParquetIOTestSuite) TestDate64ReadWriteTable() { ps.Truef(array.TableEqual(date32ExpectedOutputTable, roundTripOutputTable), "expected table: %s\ngot table: %s", date32ExpectedOutputTable, roundTripOutputTable) } +func (ps *ParquetIOTestSuite) TestTimestampTZReadWriteTable() { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(ps.T(), 0) + + inputTable := makeTimestampTypeTable(mem, false) + defer inputTable.Release() + buf := writeTableToBuffer(ps.T(), mem, inputTable, inputTable.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))) + defer buf.Release() + + reader := ps.createReader(mem, buf.Bytes()) + roundTripOutputTable := ps.readTable(reader) + defer roundTripOutputTable.Release() + + expectedOutputTable := makeTimestampTypeTable(mem, true) + defer 
expectedOutputTable.Release() + + ps.Truef(array.TableEqual(expectedOutputTable, roundTripOutputTable), "expected table: %s\ngot table: %s", expectedOutputTable, roundTripOutputTable) +} + func (ps *ParquetIOTestSuite) TestDate64ReadWriteTableWithPartialDays() { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(ps.T(), 0) @@ -973,6 +1027,22 @@ func (ps *ParquetIOTestSuite) TestDate64ReadWriteTableWithPartialDays() { ps.Truef(array.TableEqual(date32ExpectedOutputTable, roundTripOutputTable), "expected table: %s\ngot table: %s", date32ExpectedOutputTable, roundTripOutputTable) } +func (ps *ParquetIOTestSuite) TestTimestampTZStoreSchemaReadWriteTable() { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(ps.T(), 0) + + inputTable := makeTimestampTypeTable(mem, false) + defer inputTable.Release() + buf := writeTableToBuffer(ps.T(), mem, inputTable, inputTable.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem), pqarrow.WithStoreSchema())) + defer buf.Release() + + reader := ps.createReader(mem, buf.Bytes()) + roundTripOutputTable := ps.readTable(reader) + defer roundTripOutputTable.Release() + + ps.Truef(array.TableEqual(inputTable, roundTripOutputTable), "expected table: %s\ngot table: %s", inputTable, roundTripOutputTable) +} + func (ps *ParquetIOTestSuite) TestLargeBinaryReadWriteTable() { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(ps.T(), 0) diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go index 383d47fbaabed..f2aa4cdfe05ad 100644 --- a/go/parquet/pqarrow/schema.go +++ b/go/parquet/pqarrow/schema.go @@ -125,7 +125,7 @@ func isDictionaryReadSupported(dt arrow.DataType) bool { } func arrowTimestampToLogical(typ *arrow.TimestampType, unit arrow.TimeUnit) schema.LogicalType { - utc := typ.TimeZone == "" || typ.TimeZone == "UTC" + isAdjustedToUTC := typ.TimeZone != "" // for forward compatibility reasons, and because there's no 
other way // to signal to old readers that values are timestamps, we force @@ -146,7 +146,7 @@ func arrowTimestampToLogical(typ *arrow.TimestampType, unit arrow.TimeUnit) sche return schema.NoLogicalType{} } - return schema.NewTimestampLogicalTypeForce(utc, scunit) + return schema.NewTimestampLogicalTypeForce(isAdjustedToUTC, scunit) } func getTimestampMeta(typ *arrow.TimestampType, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (parquet.Type, schema.LogicalType, error) { @@ -519,9 +519,12 @@ func arrowTime64(logical *schema.TimeLogicalType) (arrow.DataType, error) { } func arrowTimestamp(logical *schema.TimestampLogicalType) (arrow.DataType, error) { - tz := "UTC" - if logical.IsFromConvertedType() { - tz = "" + tz := "" + + // ConvertedTypes are adjusted to UTC per backward compatibility guidelines + // https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485 + if logical.IsAdjustedToUTC() || logical.IsFromConvertedType() { + tz = "UTC" } switch logical.TimeUnit() { diff --git a/go/parquet/pqarrow/schema_test.go b/go/parquet/pqarrow/schema_test.go index a3c2c7a4ff60c..f320b903033db 100644 --- a/go/parquet/pqarrow/schema_test.go +++ b/go/parquet/pqarrow/schema_test.go @@ -304,7 +304,7 @@ func TestCoerceTImestampV1(t *testing.T) { arrowFields := make([]arrow.Field, 0) parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("timestamp", parquet.Repetitions.Required, - schema.NewTimestampLogicalTypeForce(false, schema.TimeUnitMicros), parquet.Types.Int64, 0, -1))) + schema.NewTimestampLogicalTypeForce(true, schema.TimeUnitMicros), parquet.Types.Int64, 0, -1))) arrowFields = append(arrowFields, arrow.Field{Name: "timestamp", Type: &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "EST"}}) arrowSchema := arrow.NewSchema(arrowFields, nil) @@ -323,11 +323,11 @@ func TestAutoCoerceTImestampV1(t *testing.T) { arrowFields := make([]arrow.Field, 0) parquetFields = 
append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("timestamp", parquet.Repetitions.Required, - schema.NewTimestampLogicalTypeForce(false, schema.TimeUnitMicros), parquet.Types.Int64, 0, -1))) + schema.NewTimestampLogicalTypeForce(true, schema.TimeUnitMicros), parquet.Types.Int64, 0, -1))) arrowFields = append(arrowFields, arrow.Field{Name: "timestamp", Type: &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "EST"}}) parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("timestamp[ms]", parquet.Repetitions.Required, - schema.NewTimestampLogicalTypeForce(true, schema.TimeUnitMillis), parquet.Types.Int64, 0, -1))) + schema.NewTimestampLogicalTypeForce(false, schema.TimeUnitMillis), parquet.Types.Int64, 0, -1))) arrowFields = append(arrowFields, arrow.Field{Name: "timestamp[ms]", Type: &arrow.TimestampType{Unit: arrow.Second}}) arrowSchema := arrow.NewSchema(arrowFields, nil)