Commit 7e693de: switch to per-column option

zeroshade committed Dec 6, 2024 (1 parent: 5ea2f7b)
Showing 3 changed files with 58 additions and 19 deletions.
parquet/pqarrow/encode_arrow_test.go (6 changes: 4 additions & 2 deletions)
@@ -1976,8 +1976,10 @@ func TestForceLargeTypes(t *testing.T) {
     require.NoError(t, err)
     defer rdr.Close()

-    pqrdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{
-        ForceLarge: true}, mem)
+    props := pqarrow.ArrowReadProperties{}
+    props.SetForceLarge(0, true)
+    props.SetForceLarge(1, true)
+    pqrdr, err := pqarrow.NewFileReader(rdr, props, mem)
     require.NoError(t, err)

     recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
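Note (not part of the commit): an end-to-end sketch of the new per-column API, assuming arrow-go v18 import paths and a hypothetical strings.parquet whose leaf columns 0 and 1 are a String and a Binary column. As in the test above, the index passed to SetForceLarge is the leaf column index in the Parquet schema.

package main

import (
	"fmt"
	"os"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

func main() {
	// strings.parquet is a hypothetical file with a String leaf column 0
	// and a Binary leaf column 1.
	f, err := os.Open("strings.parquet")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	rdr, err := file.NewParquetReader(f)
	if err != nil {
		panic(err)
	}
	defer rdr.Close()

	// Force only column 0 to the int64-offset variant; column 1 keeps
	// the default int32-offset String/Binary types.
	props := pqarrow.ArrowReadProperties{}
	props.SetForceLarge(0, true)

	pqrdr, err := pqarrow.NewFileReader(rdr, props, memory.DefaultAllocator)
	if err != nil {
		panic(err)
	}

	sc, err := pqrdr.Schema()
	if err != nil {
		panic(err)
	}
	fmt.Println(sc) // field 0 should report large_utf8, field 1 plain binary
}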
parquet/pqarrow/properties.go (37 changes: 31 additions & 6 deletions)
@@ -165,13 +165,38 @@ type ArrowReadProperties struct {
     Parallel bool
     // BatchSize is the size used for calls to NextBatch when reading whole columns
     BatchSize int64
-    // Setting ForceLarge to true will force the reader to use LargeString/LargeBinary
-    // for string and binary columns respectively, instead of the default variants. This
-    // can be necessary if you know that there are columns which contain more than 2GB of
-    // data, which would prevent use of int32 offsets.
-    ForceLarge bool

-    readDictIndices map[int]struct{}
+    readDictIndices   map[int]struct{}
+    forceLargeIndices map[int]struct{}
 }

+// SetForceLarge determines whether a particular column, if it is String or Binary,
+// will use the LargeString/LargeBinary variants (with int64 offsets) instead of int32
+// offsets. This is specifically useful if you know that particular columns contain more
+// than 2GB worth of byte data which would prevent use of int32 offsets.
+//
+// Passing false will use the default variants while passing true will use the large
+// variant. If the passed column index is not a string or binary column, then this will
+// have no effect.
+func (props *ArrowReadProperties) SetForceLarge(colIdx int, forceLarge bool) {
+    if props.forceLargeIndices == nil {
+        props.forceLargeIndices = make(map[int]struct{})
+    }
+
+    if forceLarge {
+        props.forceLargeIndices[colIdx] = struct{}{}
+    } else {
+        delete(props.forceLargeIndices, colIdx)
+    }
+}
+
+func (props *ArrowReadProperties) ForceLarge(colIdx int) bool {
+    if props.forceLargeIndices == nil {
+        return false
+    }
+
+    _, ok := props.forceLargeIndices[colIdx]
+    return ok
+}
+
 // SetReadDict determines whether to read a particular column as dictionary
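Note (not part of the commit): the getter/setter contract of the two methods above, as a short sketch. The zero value of ArrowReadProperties works because both methods treat a nil map as empty:

package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

func main() {
	var props pqarrow.ArrowReadProperties // zero value: nil map is fine

	fmt.Println(props.ForceLarge(0)) // false: nothing recorded yet

	props.SetForceLarge(0, true) // lazily allocates the map, records index 0
	fmt.Println(props.ForceLarge(0)) // true

	props.SetForceLarge(0, false) // deletes the entry instead of storing false
	fmt.Println(props.ForceLarge(0)) // false again
}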
parquet/pqarrow/schema.go (34 changes: 23 additions & 11 deletions)
@@ -494,22 +494,16 @@ func arrowFromInt64(logical schema.LogicalType) (arrow.DataType, error) {
     }
 }

-func arrowFromByteArray(ctx *schemaTree, logical schema.LogicalType) (arrow.DataType, error) {
+func arrowFromByteArray(logical schema.LogicalType) (arrow.DataType, error) {
     switch logtype := logical.(type) {
     case schema.StringLogicalType:
-        if ctx.props.ForceLarge {
-            return arrow.BinaryTypes.LargeString, nil
-        }
         return arrow.BinaryTypes.String, nil
     case schema.DecimalLogicalType:
         return arrowDecimal(logtype), nil
     case schema.NoLogicalType,
         schema.EnumLogicalType,
         schema.JSONLogicalType,
         schema.BSONLogicalType:
-        if ctx.props.ForceLarge {
-            return arrow.BinaryTypes.LargeBinary, nil
-        }
         return arrow.BinaryTypes.Binary, nil
     default:
         return nil, xerrors.New("unhandled logicaltype " + logical.String() + " for byte_array")
@@ -613,7 +607,7 @@ func getParquetType(typ arrow.DataType, props *parquet.WriterProperties, arrprop
     }
 }

-func getArrowType(ctx *schemaTree, physical parquet.Type, logical schema.LogicalType, typeLen int) (arrow.DataType, error) {
+func getArrowType(physical parquet.Type, logical schema.LogicalType, typeLen int) (arrow.DataType, error) {
     if !logical.IsValid() || logical.Equals(schema.NullLogicalType{}) {
         return arrow.Null, nil
     }
@@ -632,7 +626,7 @@ func getArrowType(ctx *schemaTree, physical parquet.Type, logical schema.Logical
     case parquet.Types.Double:
         return arrow.PrimitiveTypes.Float64, nil
     case parquet.Types.ByteArray:
-        return arrowFromByteArray(ctx, logical)
+        return arrowFromByteArray(logical)
     case parquet.Types.FixedLenByteArray:
         return arrowFromFLBA(logical, typeLen)
     default:
@@ -714,7 +708,7 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
     // }
     primitiveNode := listNode.(*schema.PrimitiveNode)
     colIndex := ctx.schema.ColumnIndexByNode(primitiveNode)
-    arrowType, err := getArrowType(ctx, primitiveNode.PhysicalType(), primitiveNode.LogicalType(), primitiveNode.TypeLength())
+    arrowType, err := getArrowType(primitiveNode.PhysicalType(), primitiveNode.LogicalType(), primitiveNode.TypeLength())
     if err != nil {
         return err
     }
@@ -723,6 +717,15 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
         arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
     }

+    if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+        switch arrowType.ID() {
+        case arrow.STRING:
+            arrowType = arrow.BinaryTypes.LargeString
+        case arrow.BINARY:
+            arrowType = arrow.BinaryTypes.LargeBinary
+        }
+    }
+
     itemField := arrow.Field{Name: listNode.Name(), Type: arrowType, Nullable: false, Metadata: createFieldMeta(int(listNode.FieldID()))}
     populateLeaf(colIndex, &itemField, currentLevels, ctx, out, &out.Children[0])
 }
@@ -888,7 +891,7 @@ func nodeToSchemaField(n schema.Node, currentLevels file.LevelInfo, ctx *schemaT

     primitive := n.(*schema.PrimitiveNode)
     colIndex := ctx.schema.ColumnIndexByNode(primitive)
-    arrowType, err := getArrowType(ctx, primitive.PhysicalType(), primitive.LogicalType(), primitive.TypeLength())
+    arrowType, err := getArrowType(primitive.PhysicalType(), primitive.LogicalType(), primitive.TypeLength())
     if err != nil {
         return err
     }
@@ -897,6 +900,15 @@ func nodeToSchemaField(n schema.Node, currentLevels file.LevelInfo, ctx *schemaT
         arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
     }

+    if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+        switch arrowType.ID() {
+        case arrow.STRING:
+            arrowType = arrow.BinaryTypes.LargeString
+        case arrow.BINARY:
+            arrowType = arrow.BinaryTypes.LargeBinary
+        }
+    }
+
     if primitive.RepetitionType() == parquet.Repetitions.Repeated {
         // one-level list encoding e.g. a: repeated int32;
         repeatedAncestorDefLevel := currentLevels.IncrementRepeated()
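Note (not part of the commit): both overrides run during Parquet-to-Arrow schema conversion, so the change is also visible through pqarrow.FromParquet. A hedged sketch, assuming the schema-construction helpers below have the signatures recalled here and that nil is a valid KeyValueMetadata:

package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
	"github.com/apache/arrow-go/v18/parquet/schema"
)

func main() {
	// Build a one-column parquet schema: message root { required binary s (String); }
	strNode, err := schema.NewPrimitiveNodeLogical("s", parquet.Repetitions.Required,
		schema.StringLogicalType{}, parquet.Types.ByteArray, -1, -1)
	if err != nil {
		panic(err)
	}
	root, err := schema.NewGroupNode("root", parquet.Repetitions.Required,
		schema.FieldList{strNode}, -1)
	if err != nil {
		panic(err)
	}
	pqsc := schema.NewSchema(root)

	props := pqarrow.ArrowReadProperties{}
	props.SetForceLarge(0, true)

	arrsc, err := pqarrow.FromParquet(pqsc, &props, nil)
	if err != nil {
		panic(err)
	}
	// Without SetForceLarge this would be utf8; the override widens it.
	fmt.Println(arrsc.Field(0).Type) // large_utf8
}

One consequence of the ordering in the diff: the dictionary wrap happens before the override, and arrow.IsBinaryLike is false for dictionary types, so a column registered via SetReadDict keeps its dictionary type and is not widened.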
