From 7e693de225d6c965e7c96bf9fa41f09188d879f9 Mon Sep 17 00:00:00 2001
From: Matt Topol
Date: Fri, 6 Dec 2024 12:38:12 -0500
Subject: [PATCH] switch to per-column option

---
 parquet/pqarrow/encode_arrow_test.go |  6 +++--
 parquet/pqarrow/properties.go        | 38 +++++++++++++++++++++++-----
 parquet/pqarrow/schema.go            | 34 ++++++++++++++++---------
 3 files changed, 59 insertions(+), 19 deletions(-)

diff --git a/parquet/pqarrow/encode_arrow_test.go b/parquet/pqarrow/encode_arrow_test.go
index 266d8911..78e9c684 100644
--- a/parquet/pqarrow/encode_arrow_test.go
+++ b/parquet/pqarrow/encode_arrow_test.go
@@ -1976,8 +1976,10 @@ func TestForceLargeTypes(t *testing.T) {
 	require.NoError(t, err)
 	defer rdr.Close()
 
-	pqrdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{
-		ForceLarge: true}, mem)
+	props := pqarrow.ArrowReadProperties{}
+	props.SetForceLarge(0, true)
+	props.SetForceLarge(1, true)
+	pqrdr, err := pqarrow.NewFileReader(rdr, props, mem)
 	require.NoError(t, err)
 
 	recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
diff --git a/parquet/pqarrow/properties.go b/parquet/pqarrow/properties.go
index b721cc04..d349a398 100755
--- a/parquet/pqarrow/properties.go
+++ b/parquet/pqarrow/properties.go
@@ -165,13 +165,39 @@ type ArrowReadProperties struct {
 	Parallel bool
 	// BatchSize is the size used for calls to NextBatch when reading whole columns
 	BatchSize int64
-	// Setting ForceLarge to true will force the reader to use LargeString/LargeBinary
-	// for string and binary columns respectively, instead of the default variants. This
-	// can be necessary if you know that there are columns which contain more than 2GB of
-	// data, which would prevent use of int32 offsets.
-	ForceLarge bool
 
-	readDictIndices map[int]struct{}
+	readDictIndices   map[int]struct{}
+	forceLargeIndices map[int]struct{}
+}
+
+// SetForceLarge determines whether a particular column, if it is a String or Binary
+// column, will use the LargeString/LargeBinary variant (with int64 offsets) instead
+// of the default variant (with int32 offsets). This is useful if you know that a
+// column contains more than 2GB of byte data, which would overflow int32 offsets.
+//
+// Passing true selects the large variant, while passing false restores the default.
+// If the column at the given index is not a string or binary column, this setting
+// has no effect.
+func (props *ArrowReadProperties) SetForceLarge(colIdx int, forceLarge bool) {
+	if props.forceLargeIndices == nil {
+		props.forceLargeIndices = make(map[int]struct{})
+	}
+
+	if forceLarge {
+		props.forceLargeIndices[colIdx] = struct{}{}
+	} else {
+		delete(props.forceLargeIndices, colIdx)
+	}
+}
+
+// ForceLarge reports whether the column at the given index should use the large variant.
+func (props *ArrowReadProperties) ForceLarge(colIdx int) bool {
+	if props.forceLargeIndices == nil {
+		return false
+	}
+
+	_, ok := props.forceLargeIndices[colIdx]
+	return ok
 }
 
 // SetReadDict determines whether to read a particular column as dictionary
diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go
index 8afa6c92..efb95519 100644
--- a/parquet/pqarrow/schema.go
+++ b/parquet/pqarrow/schema.go
@@ -494,12 +494,9 @@ func arrowFromInt64(logical schema.LogicalType) (arrow.DataType, error) {
 	}
 }
 
-func arrowFromByteArray(ctx *schemaTree, logical schema.LogicalType) (arrow.DataType, error) {
+func arrowFromByteArray(logical schema.LogicalType) (arrow.DataType, error) {
 	switch logtype := logical.(type) {
 	case schema.StringLogicalType:
-		if ctx.props.ForceLarge {
-			return arrow.BinaryTypes.LargeString, nil
-		}
 		return arrow.BinaryTypes.String, nil
 	case schema.DecimalLogicalType:
 		return arrowDecimal(logtype), nil
@@ -507,9 +504,6 @@ func arrowFromByteArray(ctx *schemaTree, logical schema.LogicalType) (arrow.Data
 		schema.EnumLogicalType,
 		schema.JSONLogicalType,
 		schema.BSONLogicalType:
-		if ctx.props.ForceLarge {
-			return arrow.BinaryTypes.LargeBinary, nil
-		}
 		return arrow.BinaryTypes.Binary, nil
 	default:
 		return nil, xerrors.New("unhandled logicaltype " + logical.String() + " for byte_array")
@@ -613,7 +607,7 @@ func getParquetType(typ arrow.DataType, props *parquet.WriterProperties, arrprop
 	}
 }
 
-func getArrowType(ctx *schemaTree, physical parquet.Type, logical schema.LogicalType, typeLen int) (arrow.DataType, error) {
+func getArrowType(physical parquet.Type, logical schema.LogicalType, typeLen int) (arrow.DataType, error) {
 	if !logical.IsValid() || logical.Equals(schema.NullLogicalType{}) {
 		return arrow.Null, nil
 	}
@@ -632,7 +626,7 @@ func getArrowType(ctx *schemaTree, physical parquet.Type, logical schema.Logical
 	case parquet.Types.Double:
 		return arrow.PrimitiveTypes.Float64, nil
 	case parquet.Types.ByteArray:
-		return arrowFromByteArray(ctx, logical)
+		return arrowFromByteArray(logical)
 	case parquet.Types.FixedLenByteArray:
 		return arrowFromFLBA(logical, typeLen)
 	default:
@@ -714,7 +708,7 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
 	// }
 	primitiveNode := listNode.(*schema.PrimitiveNode)
 	colIndex := ctx.schema.ColumnIndexByNode(primitiveNode)
-	arrowType, err := getArrowType(ctx, primitiveNode.PhysicalType(), primitiveNode.LogicalType(), primitiveNode.TypeLength())
+	arrowType, err := getArrowType(primitiveNode.PhysicalType(), primitiveNode.LogicalType(), primitiveNode.TypeLength())
 	if err != nil {
 		return err
 	}
@@ -723,6 +717,15 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
 		arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
 	}
 
+	if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+		switch arrowType.ID() {
+		case arrow.STRING:
+			arrowType = arrow.BinaryTypes.LargeString
+		case arrow.BINARY:
+			arrowType = arrow.BinaryTypes.LargeBinary
+		}
+	}
+
 	itemField := arrow.Field{Name: listNode.Name(), Type: arrowType, Nullable: false, Metadata: createFieldMeta(int(listNode.FieldID()))}
 	populateLeaf(colIndex, &itemField, currentLevels, ctx, out, &out.Children[0])
 }
@@ -888,7 +891,7 @@ func nodeToSchemaField(n schema.Node, currentLevels file.LevelInfo, ctx *schemaT
 
 	primitive := n.(*schema.PrimitiveNode)
 	colIndex := ctx.schema.ColumnIndexByNode(primitive)
-	arrowType, err := getArrowType(ctx, primitive.PhysicalType(), primitive.LogicalType(), primitive.TypeLength())
+	arrowType, err := getArrowType(primitive.PhysicalType(), primitive.LogicalType(), primitive.TypeLength())
 	if err != nil {
 		return err
 	}
@@ -897,6 +900,15 @@ func nodeToSchemaField(n schema.Node, currentLevels file.LevelInfo, ctx *schemaT
 		arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
 	}
 
+	if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+		switch arrowType.ID() {
+		case arrow.STRING:
+			arrowType = arrow.BinaryTypes.LargeString
+		case arrow.BINARY:
+			arrowType = arrow.BinaryTypes.LargeBinary
+		}
+	}
+
 	if primitive.RepetitionType() == parquet.Repetitions.Repeated {
 		// one-level list encoding e.g. a: repeated int32;
 		repeatedAncestorDefLevel := currentLevels.IncrementRepeated()
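
For reviewers: a minimal usage sketch of the new per-column API, mirroring the
updated test above (not part of the patch). The path "data.parquet" and the
choice of columns 0 and 1 are hypothetical, and the import paths assume the
arrow-go v18 module layout.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

func main() {
	// Hypothetical input whose first two columns hold >2GB of string/binary data.
	rdr, err := file.OpenParquetFile("data.parquet", false)
	if err != nil {
		log.Fatal(err)
	}
	defer rdr.Close()

	// Opt in to int64 offsets only where needed: columns 0 and 1 map to
	// LargeString/LargeBinary, while every other string/binary column keeps
	// the default int32-offset types.
	props := pqarrow.ArrowReadProperties{}
	props.SetForceLarge(0, true)
	props.SetForceLarge(1, true)

	pqrdr, err := pqarrow.NewFileReader(rdr, props, memory.DefaultAllocator)
	if err != nil {
		log.Fatal(err)
	}

	recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer recrdr.Release()

	for recrdr.Next() {
		rec := recrdr.Record()
		fmt.Println(rec.Schema()) // columns 0 and 1 appear as large_utf8/large_binary
	}
}

Compared with the old global ForceLarge flag, the per-column map keeps the cheaper
int32-offset types for every column that does not actually need 64-bit offsets.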