From 94b437f7e89225b18cb916f581ec7253c35b2a1a Mon Sep 17 00:00:00 2001
From: Will Jones 
Date: Fri, 5 May 2023 14:49:43 -0700
Subject: [PATCH 01/18] feat: expose parquet sort metadata

---
 docs/source/python/api/formats.rst            |   1 +
 python/pyarrow/_parquet.pxd                   |  24 +-
 python/pyarrow/_parquet.pyx                   | 275 +++++++++++++++++-
 python/pyarrow/parquet/core.py                |  59 +++-
 python/pyarrow/tests/parquet/test_metadata.py |  71 +++++
 5 files changed, 416 insertions(+), 14 deletions(-)

diff --git a/docs/source/python/api/formats.rst b/docs/source/python/api/formats.rst
index 9ca499c0972e5..86e2585ac2537 100644
--- a/docs/source/python/api/formats.rst
+++ b/docs/source/python/api/formats.rst
@@ -97,6 +97,7 @@ Parquet Metadata

     FileMetaData
     RowGroupMetaData
+    SortingColumn
     ColumnChunkMetaData
     Statistics
     ParquetSchema

diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 59b50ceda8c40..f47212daad527 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -24,7 +24,7 @@ from pyarrow.includes.libarrow cimport (CChunkedArray, CScalar, CSchema, CStatus
                                         CKeyValueMetadata,
                                         CRandomAccessFile, COutputStream,
                                         TimeUnit, CRecordBatchReader)
-from pyarrow.lib cimport _Weakrefable
+from pyarrow.lib cimport (_Weakrefable, pyarrow_unwrap_schema)


 cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
@@ -328,11 +328,17 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         optional[ParquetIndexLocation] GetColumnIndexLocation() const
         optional[ParquetIndexLocation] GetOffsetIndexLocation() const

+    struct CSortingColumn" parquet::SortingColumn":
+        int column_idx
+        c_bool descending
+        c_bool nulls_first
+
     cdef cppclass CRowGroupMetaData" parquet::RowGroupMetaData":
         c_bool Equals(const CRowGroupMetaData&) const
-        int num_columns()
-        int64_t num_rows()
-        int64_t total_byte_size()
+        int num_columns() const
+        int64_t num_rows() const
+        int64_t total_byte_size() const
+        vector[CSortingColumn] sorting_columns() const
         unique_ptr[CColumnChunkMetaData] ColumnChunk(int i) const

     cdef cppclass CFileMetaData" parquet::FileMetaData":
@@ -419,6 +425,7 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
             Builder* disable_dictionary()
             Builder* enable_dictionary()
             Builder* enable_dictionary(const c_string& path)
+            Builder* set_sorting_columns(vector[CSortingColumn] sorting_columns)
             Builder* disable_statistics()
             Builder* enable_statistics()
             Builder* enable_statistics(const c_string& path)
@@ -515,8 +522,8 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:

     CStatus ToParquetSchema(
         const CSchema* arrow_schema,
-        const ArrowReaderProperties& properties,
-        const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
+        const WriterProperties& properties,
+        const ArrowWriterProperties& arrow_properties,
         shared_ptr[SchemaDescriptor]* out)
@@ -581,8 +588,9 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
        FileEncryptionProperties encryption_properties=*,
        write_batch_size=*,
        dictionary_pagesize_limit=*,
        write_page_index=*,
-       write_page_checksum=*) except *
+       write_page_checksum=*,
+       sorting_columns=*) except *


 cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 35344eb735516..c33f10c5ca5e0 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -18,6 +18,7 @@
 # cython: profile=False
 # distutils: language = c++

+from collections.abc import Sequence
 from textwrap import indent
 import 
warnings
@@ -506,6 +507,122 @@ cdef class ColumnChunkMetaData(_Weakrefable):
         return self.metadata.GetColumnIndexLocation().has_value()


+cdef class SortingColumn:
+    """Sorting specification for a single column.
+
+    Returned by :meth:`RowGroupMetaData.sorting_columns` and used in :class:`ParquetWriter`
+    to specify the sort order of the data.
+
+    Example
+    -------
+
+    """
+    cdef int column_index
+    cdef c_bool descending
+    cdef c_bool nulls_first
+
+    def __init__(self, int column_index, c_bool descending=False, c_bool nulls_first=False):
+        """
+        Parameters
+        ----------
+        column_index : int
+            Index of column data is sorted by.
+        descending : bool, default False
+            Whether column is sorted in descending order.
+        nulls_first : bool, default False
+            Whether null values appear before valid values.
+        """
+        self.column_index = column_index
+        self.descending = descending
+        self.nulls_first = nulls_first
+
+    @classmethod
+    def from_sort_order(schema, sort_keys, null_placement='at_end'):
+        """
+        Create a tuple of SortingColumn objects from the same arguments as
+        :class:`pyarrow.compute.SortOptions`.
+
+        Parameters
+        ----------
+        schema : Schema
+            Schema of the input data.
+        sort_keys : Sequence of (name, order) tuples
+            Names of field/column keys to sort the input on,
+            along with the order each field/column is sorted in.
+            Accepted values for `order` are "ascending", "descending".
+            The field name can be a string column name or expression.
+        null_placement : {'at_start', 'at_end'}, default 'at_end'
+            Where null values should appear in the sort order.
+
+        Returns
+        -------
+        sorting_columns : tuple of SortingColumn
+        """
+        if null_placement == 'at_start':
+            nulls_first = True
+        elif null_placement == 'at_end':
+            nulls_first = False
+        else:
+            raise ValueError('null_placement must be "at_start" or "at_end"')
+
+        col_map = _name_to_index_map(schema)
+
+        sort_columns = []
+        for name, order in sort_keys:
+            if order == 'ascending':
+                descending = False
+            elif order == 'descending':
+                descending = True
+            else:
+                raise ValueError('order must be "ascending" or "descending"')
+
+            if isinstance(name, str):
+                column_index = col_map[name]
+            elif isinstance(name, int):
+                column_index = name
+            else:
+                raise TypeError('sort key name must be a string or integer')
+
+            sort_columns.append(SortingColumn(column_index, descending, nulls_first))
+
+        return tuple(sort_columns)
+
+    def __repr__(self):
+        return """SortingColumn(column_index={0}, descending={1}, nulls_first={2})""".format(
+            self.column_index, self.descending, self.nulls_first)
+
+    def __eq__(self, SortingColumn other):
+        return (self.column_index == other.column_index and
+                self.descending == other.descending and
+                self.nulls_first == other.nulls_first)
+
+    def __hash__(self):
+        return hash((self.column_index, self.descending, self.nulls_first))
+
+    @property
+    def column_index(self):
+        """Index of column data is sorted by (int)."""
+        return self.column_index
+
+    @property
+    def descending(self):
+        """Whether column is sorted in descending order (bool)."""
+        return self.descending
+
+    @property
+    def nulls_first(self):
+        """Whether null values appear before valid values (bool)."""
+        return self.nulls_first
+
+    def to_dict(self):
+        """Convert to dictionary representation."""
+        return {
+            'column_index': self.column_index,
+            'descending': self.descending,
+            'nulls_first': self.nulls_first
+        }
+
+
 cdef class RowGroupMetaData(_Weakrefable):
     """Metadata for a single row group."""

@@ -565,10 +682,12 @@ cdef class RowGroupMetaData(_Weakrefable):
         return """{0}
  num_columns: {1}
   num_rows: {2}
-  total_byte_size: {3}""".format(object.__repr__(self),
+  total_byte_size: {3}
+  sorting_columns: {4}""".format(object.__repr__(self),
                                  self.num_columns,
                                  self.num_rows,
-                                 self.total_byte_size)
+                                 self.total_byte_size,
+                                 self.sorting_columns)

     def to_dict(self):
         """
@@ -585,6 +704,7 @@ cdef class RowGroupMetaData(_Weakrefable):
             num_rows=self.num_rows,
             total_byte_size=self.total_byte_size,
             columns=columns,
+            sorting_columns=[col.to_dict() for col in self.sorting_columns]
         )
         for i in range(self.num_columns):
             columns.append(self.column(i).to_dict())
@@ -605,6 +725,19 @@ cdef class RowGroupMetaData(_Weakrefable):
         """Total byte size of all the uncompressed column data in this row group (int)."""
         return self.metadata.total_byte_size()

+    @property
+    def sorting_columns(self):
+        """Columns the row group is sorted by (tuple of :class:`SortingColumn`).)"""
+        out = []
+        cdef vector[CSortingColumn] sorting_columns = self.metadata.sorting_columns()
+        for sorting_col in sorting_columns:
+            out.append(SortingColumn(
+                sorting_col.column_idx,
+                sorting_col.descending,
+                sorting_col.nulls_first
+            ))
+        return tuple(out)
+

 def _reconstruct_filemetadata(Buffer serialized):
     cdef:
@@ -1550,6 +1683,31 @@ cdef class ParquetReader(_Weakrefable):
         return closed


+cdef CSortingColumn _convert_sorting_column(SortingColumn sorting_column):
+    cdef CSortingColumn c_sorting_column
+
+    c_sorting_column.column_idx = sorting_column.column_index
+    c_sorting_column.descending = sorting_column.descending
+    c_sorting_column.nulls_first = sorting_column.nulls_first
+
+    return c_sorting_column
+
+
+cdef vector[CSortingColumn] _convert_sorting_columns(sorting_columns) except *:
+    if not (isinstance(sorting_columns, Sequence)
+            and all(isinstance(col, SortingColumn) for col in sorting_columns)):
+        raise ValueError(
+            "'sorting_columns' must be a list of `SortingColumn`")
+
+    cdef vector[CSortingColumn] c_sorting_columns = [_convert_sorting_column(col)
+                                                     for col in sorting_columns]
+
+    if len(sorting_columns) != len(c_sorting_columns):
+        raise ValueError(
+            "could not convert all 'sorting_columns' entries to Parquet sorting columns")
+
+    return c_sorting_columns
+
+
 cdef shared_ptr[WriterProperties] _create_writer_properties(
     use_dictionary=None,
     compression=None,
@@ -1564,7 +1722,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
         write_batch_size=None,
         dictionary_pagesize_limit=None,
         write_page_index=False,
-        write_page_checksum=False) except *:
+        write_page_checksum=False,
+        sorting_columns=None) except *:
     """General writer properties"""
     cdef:
         shared_ptr[WriterProperties] properties
@@ -1649,6 +1808,11 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
         for column in write_statistics:
             props.enable_statistics(tobytes(column))

+    # sorting_columns
+
+    if sorting_columns is not None:
+        props.set_sorting_columns(_convert_sorting_columns(sorting_columns))
+
     # use_byte_stream_split

     if isinstance(use_byte_stream_split, bool):
@@ -1788,6 +1952,82 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
     return arrow_properties


+cdef _name_to_index_map(Schema arrow_schema):
+    cdef:
+        shared_ptr[CSchema] sp_arrow_schema
+        shared_ptr[SchemaDescriptor] sp_parquet_schema
+        shared_ptr[WriterProperties] props = _create_writer_properties()
+        shared_ptr[ArrowWriterProperties] arrow_props = _create_arrow_writer_properties(
+            use_deprecated_int96_timestamps=False,
+            coerce_timestamps=None,
+            allow_truncated_timestamps=False,
+            writer_engine_version="V2"
+        )
+
+    sp_arrow_schema = pyarrow_unwrap_schema(arrow_schema)
+
+    with nogil:
+        
check_status(ToParquetSchema( + sp_arrow_schema.get(), deref(props.get()), deref(arrow_props.get()), &sp_parquet_schema)) + + out = dict() + + cdef SchemaDescriptor* parquet_schema = sp_parquet_schema.get() + + for i in range(parquet_schema.num_columns()): + name = frombytes(parquet_schema.Column(i).name()) + out[name] = i + + return out + + +def _sort_keys_to_sorting_columns(sort_keys, null_placement, Schema schema): + """Convert SortOptions to a list of SortingColumn objects""" + + if null_placement is None or null_placement == "at_end": + nulls_first = False + elif null_placement == "at_start": + nulls_first = True + else: + raise ValueError("Invalid value for null_placement: {0}" + .format(null_placement)) + + name_to_index_map = _name_to_index_map(schema) + + sorting_columns = [] + + for sort_key in sort_keys: + if isinstance(sort_key, str): + name = sort_key + descending = False + elif (isinstance(sort_key, tuple) and len(sort_key) == 2 and + isinstance(sort_key[0], str) and + isinstance(sort_key[1], str)): + name, descending = sort_key + if descending == "descending": + descending = True + elif descending == "ascending": + descending = False + else: + raise ValueError("Invalid sort key direction: {0}" + .format(descending)) + else: + raise ValueError("Invalid sort key: {0}".format(sort_key)) + + try: + column_index = name_to_index_map[name] + except KeyError: + raise ValueError("Sort key name '{0}' not found in schema:\n{1}" + .format(name, schema)) + + sorting_column = SortingColumn( + column_index, + descending, + nulls_first) + sorting_columns.append(sorting_column) + + return sorting_columns + cdef class ParquetWriter(_Weakrefable): cdef: @@ -1835,7 +2075,8 @@ cdef class ParquetWriter(_Weakrefable): dictionary_pagesize_limit=None, store_schema=True, write_page_index=False, - write_page_checksum=False): + write_page_checksum=False, + sorting_columns=None): cdef: shared_ptr[WriterProperties] properties shared_ptr[ArrowWriterProperties] arrow_properties @@ -1853,6 +2094,29 @@ cdef class ParquetWriter(_Weakrefable): self.sink = GetResultValue(FileOutputStream.Open(c_where)) self.own_sink = True + # This needs to be simplified while we still have access to the schema. 
+ if sorting_columns is not None: + # For [(col1, "descending"), (col2, "ascending"), (col3)] + if (isinstance(sorting_columns, Sequence) and + all(isinstance(sort_key, str) or isinstance(sort_key, tuple) for sort_key in sorting_columns)): + sorting_columns = _sort_keys_to_sorting_columns( + sorting_columns, + None, + schema + ) + # For ([(col1, "descending"), (col2, "ascending"), (col3)], "at_end") + if (isinstance(sorting_columns, tuple) + and len(sorting_columns) <= 2 + and isinstance(sorting_columns[0], Sequence)): + if len(sorting_columns) == 1: + sorting_columns, null_placement = sorting_columns[0], None + else: + sorting_columns, null_placement = sorting_columns + sorting_columns = _sort_keys_to_sorting_columns( + sorting_columns, + null_placement, + schema) + properties = _create_writer_properties( use_dictionary=use_dictionary, compression=compression, @@ -1867,7 +2131,8 @@ cdef class ParquetWriter(_Weakrefable): write_batch_size=write_batch_size, dictionary_pagesize_limit=dictionary_pagesize_limit, write_page_index=write_page_index, - write_page_checksum=write_page_checksum + write_page_checksum=write_page_checksum, + sorting_columns=sorting_columns, ) arrow_properties = _create_arrow_writer_properties( use_deprecated_int96_timestamps=use_deprecated_int96_timestamps, diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index db22eb3293c86..004e97a75e1cd 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -48,7 +48,8 @@ ParquetSchema, ColumnSchema, ParquetLogicalType, FileEncryptionProperties, - FileDecryptionProperties) + FileDecryptionProperties, + SortingColumn) from pyarrow.fs import (LocalFileSystem, FileSystem, FileType, _resolve_filesystem_and_path, _ensure_filesystem) from pyarrow import filesystem as legacyfs @@ -718,6 +719,53 @@ def _get_column_indices(self, column_names, use_pandas_metadata=False): return indices + @property + def sort_order(self): + """ + Return the sort order of the file, if any. + + Returns + ------- + sort_order : tuple of (sort_keys, null_placement) or None + The sort order of the file. sort_keys is a sequence of tuples of + (name, order) where name is the column name and order is either + "ascending" or "descending". null_placement is either "at_start" + or "at_end". If the file is not sorted, None is returned. + """ + metadata = self.metadata + sorting_columns = { + metadata.row_group(i).sorting_columns + for i in range(metadata.num_row_groups) + } + + if len(sorting_columns) > 1: + # There are inconsistent sorting columns, so no global sort order + return None + sorting_columns = sorting_columns.pop() + if len(sorting_columns) == 0: + # There are no sorting columns + return None + + # Need to map the Parquet column indices into field references + sort_keys = [] + for sorting_column in sorting_columns: + name = self.schema.column(sorting_column.column_index).name + if sorting_column.descending: + order = "descending" + else: + order = "ascending" + sort_keys.append((name, order)) + + if all(col.nulls_first for col in sorting_columns): + null_placement = "at_start" + elif all(not col.nulls_first for col in sorting_columns): + null_placement = "at_end" + else: + # Mixed null placement is not supported + return None + + return (sort_keys, null_placement) + _SPARK_DISALLOWED_CHARS = re.compile('[ ,;{}()\n\t=]') @@ -895,6 +943,10 @@ def _sanitize_table(table, new_schema, flavor): Whether to write page checksums in general for all columns. 
Page checksums enable detection of data corruption, which might occur during transmission or in the storage. +sorting_columns : Sequence of SortingColumn, default None + Specify the sort order of the data being written. The writer does not sort + the data nor does it verify that the data is sorted. The sort order is + written to the row group metadata, which can then be used by readers. """ _parquet_writer_example_doc = """\ @@ -989,6 +1041,7 @@ def __init__(self, where, schema, filesystem=None, store_schema=True, write_page_index=False, write_page_checksum=False, + sorting_columns=None, **options): if use_deprecated_int96_timestamps is None: # Use int96 timestamps for Spark @@ -1047,6 +1100,7 @@ def __init__(self, where, schema, filesystem=None, store_schema=store_schema, write_page_index=write_page_index, write_page_checksum=write_page_checksum, + sorting_columns=sorting_columns, **options) self.is_open = True @@ -3129,6 +3183,7 @@ def write_table(table, where, row_group_size=None, version='2.6', store_schema=True, write_page_index=False, write_page_checksum=False, + sorting_columns=None, **kwargs): # Implementor's note: when adding keywords here / updating defaults, also # update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions @@ -3158,6 +3213,7 @@ def write_table(table, where, row_group_size=None, version='2.6', store_schema=store_schema, write_page_index=write_page_index, write_page_checksum=write_page_checksum, + sorting_columns=sorting_columns, **kwargs) as writer: writer.write_table(table, row_group_size=row_group_size) except Exception: @@ -3742,6 +3798,7 @@ def read_schema(where, memory_map=False, decryption_properties=None, "ParquetWriter", "PartitionSet", "RowGroupMetaData", + "SortingColumn", "Statistics", "read_metadata", "read_pandas", diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index 3efaf1dbf5526..c23cfce3c9891 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -301,6 +301,77 @@ def test_parquet_write_disable_statistics(tempdir): assert cc_b.statistics is None +def test_parquet_sorting_columns(): + table = pa.table({'a': [1, 2, 3], 'b': ['a', 'b', 'c']}) + + # Rejects invalid sorting_columns + writer = pa.BufferOutputStream() + with pytest.raises(ValueError): + _write_table(table, writer, sorting_columns="string") + with pytest.raises(ValueError): + _write_table(table, writer, sorting_columns=["string"]) + with pytest.raises(ValueError): + _write_table(table, writer, sorting_columns=([("string", None)])) + with pytest.raises(ValueError): + _write_table(table, writer, sorting_columns=([("string")], None)) + + sorting_columns = ( + pq.SortingColumn(column_index=0, descending=True, nulls_first=True), + pq.SortingColumn(column_index=1, descending=False), + ) + writer = pa.BufferOutputStream() + _write_table(table, writer, sorting_columns=sorting_columns) + reader = pa.BufferReader(writer.getvalue()) + + # Can retrieve sorting columns from metadata + metadata = pq.read_metadata(reader) + set_sorting_columns = {metadata.row_group(i).sorting_columns + for i in range(metadata.num_row_groups)} + assert set_sorting_columns == set([sorting_columns]) + + # Sort keys are None since nulls_first is inconsistent. 
+ pq_file = pq.ParquetFile(reader) + assert pq_file.sort_order is None + + expected = ( + pq.SortingColumn(column_index=0, descending=True), + pq.SortingColumn(column_index=1, descending=False), + ) + writer = pa.BufferOutputStream() + _write_table(table, writer, sorting_columns=[("a", "descending"), "b"]) + reader = pa.BufferReader(writer.getvalue()) + + # Can retrieve sorting columns from metadata + metadata = pq.read_metadata(reader) + set_sorting_columns = {metadata.row_group(i).sorting_columns + for i in range(metadata.num_row_groups)} + assert set_sorting_columns == set([expected]) + + # Sort keys can be retrieved at file level + pq_file = pq.ParquetFile(reader) + assert pq_file.sort_order == ([("a", "descending"), ("b", "ascending")], "at_end") + + # Now with nulls_first=True + expected = ( + pq.SortingColumn(column_index=0, descending=True, nulls_first=True), + pq.SortingColumn(column_index=1, descending=False, nulls_first=True), + ) + writer = pa.BufferOutputStream() + _write_table(table, writer, sorting_columns=( + [("a", "descending"), "b"], "at_start")) + reader = pa.BufferReader(writer.getvalue()) + + # Can retrieve sorting columns from metadata + metadata = pq.read_metadata(reader) + set_sorting_columns = {metadata.row_group(i).sorting_columns + for i in range(metadata.num_row_groups)} + assert set_sorting_columns == set([expected]) + + # Sort keys can be retrieved at file level + pq_file = pq.ParquetFile(reader) + assert pq_file.sort_order == ([("a", "descending"), ("b", "ascending")], "at_start") + + def test_field_id_metadata(): # ARROW-7080 field_id = b'PARQUET:field_id' From 23aa525ba06bbdb9dbc672f6106c6993b107d992 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 10 May 2023 15:55:00 -0700 Subject: [PATCH 02/18] refactor: simplify API --- python/pyarrow/_parquet.pyx | 105 ++++++++++++------ python/pyarrow/parquet/core.py | 20 +--- python/pyarrow/tests/parquet/test_metadata.py | 98 ++++++++-------- 3 files changed, 118 insertions(+), 105 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index c33f10c5ca5e0..3b719934de361 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -537,7 +537,7 @@ cdef class SortingColumn: self.nulls_first = nulls_first @classmethod - def from_sort_order(schema, sort_keys, null_placement='at_end'): + def from_sort_order(cls, Schema schema, sort_keys, null_placement='at_end'): """ Create a tuple of SortingColumn objects from the same arguments as :class:`pyarrow.compute.SortOptions`. 
@@ -567,25 +567,79 @@ cdef class SortingColumn: col_map = _name_to_index_map(schema) - sort_columns = [] - for name, order in sort_keys: - if order == 'ascending': + sorting_columns = [] + + for sort_key in sort_keys: + if isinstance(sort_key, str): + name = sort_key descending = False - elif order == 'descending': - descending = True + elif (isinstance(sort_key, tuple) and len(sort_key) == 2 and + isinstance(sort_key[0], str) and + isinstance(sort_key[1], str)): + name, descending = sort_key + if descending == "descending": + descending = True + elif descending == "ascending": + descending = False + else: + raise ValueError("Invalid sort key direction: {0}" + .format(descending)) else: - raise ValueError('order must be "ascending" or "descending"') + raise ValueError("Invalid sort key: {0}".format(sort_key)) - if isinstance(name, str): + try: column_index = col_map[name] - elif isinstance(name, int): - column_index = name - else: - raise TypeError('sort key name must be a string or integer') + except KeyError: + raise ValueError("Sort key name '{0}' not found in schema:\n{1}" + .format(name, schema)) - sort_columns.append(SortingColumn(column_index, descending, nulls_first)) + sorting_columns.append( + cls(column_index, descending=descending, nulls_first=nulls_first) + ) - return tuple(sort_columns) + return tuple(sorting_columns) + + @staticmethod + def as_sort_order(Schema schema, sorting_columns): + """ + Convert a tuple of SortingColumn objects to the same format as + :class:`pyarrow.compute.SortOptions`. + + Parameters + ---------- + schema : Schema + Schema of the input data. + sorting_columns : tuple of SortingColumn + Columns to sort the input on. + + Returns + ------- + sort_keys : tuple of (name, order) tuples + null_placement : {'at_start', 'at_end'} + """ + col_map = {i: name for i, name in _name_to_index_map(schema).items()} + + sort_keys = [] + nulls_first = None + + for sorting_column in sorting_columns: + name = col_map[sorting_column.column_index] + if sorting_column.descending: + order = "descending" + else: + order = "ascending" + sort_keys.append((name, order)) + if nulls_first is None: + nulls_first = sorting_column.nulls_first + elif nulls_first != sorting_column.nulls_first: + raise ValueError("Sorting columns have inconsistent null placement") + + if nulls_first: + null_placement = "at_start" + else: + null_placement = "at_end" + + return tuple(sort_keys), null_placement def __repr__(self): return """SortingColumn(column_index={0}, descending={1}, nulls_first={2})""".format( @@ -2094,29 +2148,6 @@ cdef class ParquetWriter(_Weakrefable): self.sink = GetResultValue(FileOutputStream.Open(c_where)) self.own_sink = True - # This needs to be simplified while we still have access to the schema. 
- if sorting_columns is not None: - # For [(col1, "descending"), (col2, "ascending"), (col3)] - if (isinstance(sorting_columns, Sequence) and - all(isinstance(sort_key, str) or isinstance(sort_key, tuple) for sort_key in sorting_columns)): - sorting_columns = _sort_keys_to_sorting_columns( - sorting_columns, - None, - schema - ) - # For ([(col1, "descending"), (col2, "ascending"), (col3)], "at_end") - if (isinstance(sorting_columns, tuple) - and len(sorting_columns) <= 2 - and isinstance(sorting_columns[0], Sequence)): - if len(sorting_columns) == 1: - sorting_columns, null_placement = sorting_columns[0], None - else: - sorting_columns, null_placement = sorting_columns - sorting_columns = _sort_keys_to_sorting_columns( - sorting_columns, - null_placement, - schema) - properties = _create_writer_properties( use_dictionary=use_dictionary, compression=compression, diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 004e97a75e1cd..fe58c26bc1c72 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -745,26 +745,8 @@ def sort_order(self): if len(sorting_columns) == 0: # There are no sorting columns return None - - # Need to map the Parquet column indices into field references - sort_keys = [] - for sorting_column in sorting_columns: - name = self.schema.column(sorting_column.column_index).name - if sorting_column.descending: - order = "descending" - else: - order = "ascending" - sort_keys.append((name, order)) - - if all(col.nulls_first for col in sorting_columns): - null_placement = "at_start" - elif all(not col.nulls_first for col in sorting_columns): - null_placement = "at_end" else: - # Mixed null placement is not supported - return None - - return (sort_keys, null_placement) + return sorting_columns _SPARK_DISALLOWED_CHARS = re.compile('[ ,;{}()\n\t=]') diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index c23cfce3c9891..9ae40d937c137 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -301,75 +301,75 @@ def test_parquet_write_disable_statistics(tempdir): assert cc_b.statistics is None -def test_parquet_sorting_columns(): - table = pa.table({'a': [1, 2, 3], 'b': ['a', 'b', 'c']}) +def test_parquet_sorting_column(): + sorting_col = pq.SortingColumn(10) + assert sorting_col.column_index == 10 + assert sorting_col.descending is False + assert sorting_col.nulls_first is False + + sorting_col = pq.SortingColumn(0, descending=True, nulls_first=True) + assert sorting_col.column_index == 0 + assert sorting_col.descending is True + assert sorting_col.nulls_first is True + + schema = pa.schema([('a', pa.int64()), ('b', pa.int64())]) + sorting_cols = ( + pq.SortingColumn(1, descending=True), + pq.SortingColumn(0, descending=False), + ) + sort_order, null_placement = pq.SortingColumn.as_sort_order(schema, sorting_cols) + assert sort_order == (('b', "descending"), ('a', "ascending")) + assert null_placement == "at_end" - # Rejects invalid sorting_columns - writer = pa.BufferOutputStream() - with pytest.raises(ValueError): - _write_table(table, writer, sorting_columns="string") - with pytest.raises(ValueError): - _write_table(table, writer, sorting_columns=["string"]) - with pytest.raises(ValueError): - _write_table(table, writer, sorting_columns=([("string", None)])) - with pytest.raises(ValueError): - _write_table(table, writer, sorting_columns=([("string")], None)) + sorting_cols_roundtripped = 
pq.SortingColumn.from_sort_order( + schema, sort_order, null_placement) + assert sorting_cols_roundtripped == sorting_cols - sorting_columns = ( - pq.SortingColumn(column_index=0, descending=True, nulls_first=True), - pq.SortingColumn(column_index=1, descending=False), + sorting_cols = pq.SortingColumn.from_sort_order( + schema, ('a', ('b', "descending")), null_placement="at_start") + expected = ( + pq.SortingColumn(0, descending=False, nulls_first=True), + pq.SortingColumn(1, descending=True, nulls_first=True), ) - writer = pa.BufferOutputStream() - _write_table(table, writer, sorting_columns=sorting_columns) - reader = pa.BufferReader(writer.getvalue()) + assert sorting_cols == expected - # Can retrieve sorting columns from metadata - metadata = pq.read_metadata(reader) - set_sorting_columns = {metadata.row_group(i).sorting_columns - for i in range(metadata.num_row_groups)} - assert set_sorting_columns == set([sorting_columns]) + # Conversions handle empty tuples + empty_sorting_cols = pq.SortingColumn.from_sort_order(schema, ()) + assert empty_sorting_cols == () - # Sort keys are None since nulls_first is inconsistent. - pq_file = pq.ParquetFile(reader) - assert pq_file.sort_order is None + assert pq.SortingColumn.as_sort_order(schema, ()) == ((), "at_end") - expected = ( - pq.SortingColumn(column_index=0, descending=True), - pq.SortingColumn(column_index=1, descending=False), - ) - writer = pa.BufferOutputStream() - _write_table(table, writer, sorting_columns=[("a", "descending"), "b"]) - reader = pa.BufferReader(writer.getvalue()) + with pytest.raises(ValueError): + pq.SortingColumn.from_sort_order(schema, (("a", "not a valid sort order"))) - # Can retrieve sorting columns from metadata - metadata = pq.read_metadata(reader) - set_sorting_columns = {metadata.row_group(i).sorting_columns - for i in range(metadata.num_row_groups)} - assert set_sorting_columns == set([expected]) + with pytest.raises(ValueError, match="inconsistent null placement"): + sorting_cols = ( + pq.SortingColumn(1, nulls_first=True), + pq.SortingColumn(0, nulls_first=False), + ) + pq.SortingColumn.as_sort_order(schema, sorting_cols) - # Sort keys can be retrieved at file level - pq_file = pq.ParquetFile(reader) - assert pq_file.sort_order == ([("a", "descending"), ("b", "ascending")], "at_end") - # Now with nulls_first=True - expected = ( +def test_parquet_file_sorting_columns(): + table = pa.table({'a': [1, 2, 3], 'b': ['a', 'b', 'c']}) + + sorting_columns = ( pq.SortingColumn(column_index=0, descending=True, nulls_first=True), - pq.SortingColumn(column_index=1, descending=False, nulls_first=True), + pq.SortingColumn(column_index=1, descending=False), ) writer = pa.BufferOutputStream() - _write_table(table, writer, sorting_columns=( - [("a", "descending"), "b"], "at_start")) + _write_table(table, writer, sorting_columns=sorting_columns) reader = pa.BufferReader(writer.getvalue()) # Can retrieve sorting columns from metadata metadata = pq.read_metadata(reader) set_sorting_columns = {metadata.row_group(i).sorting_columns for i in range(metadata.num_row_groups)} - assert set_sorting_columns == set([expected]) + assert set_sorting_columns == set([sorting_columns]) - # Sort keys can be retrieved at file level + # Can also retrieve from the file reader pq_file = pq.ParquetFile(reader) - assert pq_file.sort_order == ([("a", "descending"), ("b", "ascending")], "at_start") + assert pq_file.sort_order == sorting_columns def test_field_id_metadata(): From fe0c7c02d6102ac7e749b12c861e137bbf3bff57 Mon Sep 17 00:00:00 2001 
From: Will Jones Date: Wed, 10 May 2023 17:10:07 -0700 Subject: [PATCH 03/18] docs: add a nice example --- python/pyarrow/parquet/core.py | 42 ++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index fe58c26bc1c72..12e233b1d9c43 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -726,11 +726,42 @@ def sort_order(self): Returns ------- - sort_order : tuple of (sort_keys, null_placement) or None - The sort order of the file. sort_keys is a sequence of tuples of - (name, order) where name is the column name and order is either - "ascending" or "descending". null_placement is either "at_start" - or "at_end". If the file is not sorted, None is returned. + sort_order : tuple of SortingColumn or None + The sort order of the file. If the file is not sorted, None is returned. + + Examples + -------- + + Parquet files can be written with a sort order. However, you are responsible + for ensuring the data provided is sorted; the parquet writer doesn't perform + the sorting for you. + + >>> import pyarrow.parquet as pq + >>> import pyarrow as pa + >>> table = pa.table({'n_legs': [2, 2, 4, 4, 5, 100], + ... 'animal': ["Flamingo", "Parrot", "Dog", "Horse", + ... "Brittle stars", "Centipede"]}) + >>> sort_keys = (("n_legs", "descending"), ("animal", "ascending")) + >>> table = table.sort_by(sort_keys) + + While the sort_by function takes arguments in terms of column names, the + Parquet API requires indices. Use the + :meth:`pyarrow.parquet.SortingColumn.from_sort_order` method to convert + the sort keys. + + >>> sorting_columns = pq.SortingColumn.from_sort_order(table.schema, sort_keys) + >>> sorting_columns + (SortingColumn(column_index=0, descending=True, nulls_first=False), + SortingColumn(column_index=1, descending=False, nulls_first=False)) + + Write the table to a Parquet file with the sort order and read it back: + + >>> pq.write_table(table, 'sorted_animals.parquet', + ... sorting_columns = sorting_columns) + >>> parquet_file = pq.ParquetFile('sorted_animals.parquet') + >>> parquet_file.sort_order + (SortingColumn(column_index=0, descending=True, nulls_first=False), + SortingColumn(column_index=1, descending=False, nulls_first=False)) """ metadata = self.metadata sorting_columns = { @@ -929,6 +960,7 @@ def _sanitize_table(table, new_schema, flavor): Specify the sort order of the data being written. The writer does not sort the data nor does it verify that the data is sorted. The sort order is written to the row group metadata, which can then be used by readers. + See example at :attr:`ParquetFile.sort_order`. """ _parquet_writer_example_doc = """\ From f92869ba667037fde33fd703a711e1f71d407750 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 11 May 2023 13:48:58 -0700 Subject: [PATCH 04/18] fix: address nested schemas --- python/pyarrow/_parquet.pyx | 65 ++++++++++++++----- python/pyarrow/parquet/core.py | 2 +- python/pyarrow/tests/parquet/test_metadata.py | 24 ++++++- 3 files changed, 70 insertions(+), 21 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 3b719934de361..e242a3eeb3f0a 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -510,28 +510,59 @@ cdef class ColumnChunkMetaData(_Weakrefable): cdef class SortingColumn: """Sorting specification for a single column. 
- Returned by :meth:`RowGroupMetaData.sorting_columns` and used in :class:`ParquetWriter` - to specify the sort order of the data. + Returned by :meth:`RowGroupMetaData.sorting_columns` and used in + :class:`ParquetWriter` to specify the sort order of the data. - Example - ------- + Parameters + ---------- + column_index : int + Index of column data is sorted by. + descending : bool, default False + Whether column is sorted in descending order. + nulls_first : bool, default False + Whether null values appear before valid values. + .. note:: + Column indices are zero-based, refer only to leaf fields, and are in + depth-first order. This may make the column indices for nested schemas + different from what you expect. In most cases, it will be easier to + specify the sort order using column names instead of column indices + and converting using the ``from_sort_order`` method. + + Examples + -------- + + In other APIs, sort order is specified by names, such as: + + >>> sort_order = [('id', 'ascending'), ('timestamp', 'descending')] + + For Parquet, the column index must be used instead: + + >>> import pyarrow.parquet as pq + >>> [pq.SortingColumn(0), pq.SortingColumn(1, descending=True)] + [SortingColumn(column_index=0, descending=False, nulls_first=False), + SortingColumn(column_index=1, descending=True, nulls_first=False)] + + Convert the sort_order into the list of sorting columns with + ``from_sort_order`` (note that the schema must be provided as well): + + >>> import pyarrow as pa + >>> schema = pa.schema([('id', pa.int64()), ('timestamp', pa.timestamp('ms'))]) + >>> sorting_columns = pq.SortingColumn.from_sort_order(schema, sort_order) + >>> sorting_columns + (SortingColumn(column_index=0, descending=False, nulls_first=False), + SortingColumn(column_index=1, descending=True, nulls_first=False)) + + Convert back to the sort order with ``to_sort_order``: + + >>> pq.SortingColumn.to_sort_order(schema, sorting_columns) + ((('id', 'ascending'), ('timestamp', 'descending')), 'at_end') """ cdef int column_index cdef c_bool descending cdef c_bool nulls_first def __init__(self, int column_index, c_bool descending=False, c_bool nulls_first=False): - """ - Parameters - ---------- - column_index : int - Index of column data is sorted by. - descending : bool, default False - Whether column is sorted in descending order. - nulls_first : bool, default False - Whether null values appear before valid values. - """ self.column_index = column_index self.descending = descending self.nulls_first = nulls_first @@ -600,7 +631,7 @@ cdef class SortingColumn: return tuple(sorting_columns) @staticmethod - def as_sort_order(Schema schema, sorting_columns): + def to_sort_order(Schema schema, sorting_columns): """ Convert a tuple of SortingColumn objects to the same format as :class:`pyarrow.compute.SortOptions`. 
@@ -617,7 +648,7 @@ cdef class SortingColumn: sort_keys : tuple of (name, order) tuples null_placement : {'at_start', 'at_end'} """ - col_map = {i: name for i, name in _name_to_index_map(schema).items()} + col_map = {i: name for name, i in _name_to_index_map(schema).items()} sort_keys = [] nulls_first = None @@ -2029,7 +2060,7 @@ cdef _name_to_index_map(Schema arrow_schema): cdef SchemaDescriptor* parquet_schema = sp_parquet_schema.get() for i in range(parquet_schema.num_columns()): - name = frombytes(parquet_schema.Column(i).name()) + name = frombytes(parquet_schema.Column(i).path().get().ToDotString()) out[name] = i return out diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 12e233b1d9c43..bffd810968fad 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -746,7 +746,7 @@ def sort_order(self): While the sort_by function takes arguments in terms of column names, the Parquet API requires indices. Use the - :meth:`pyarrow.parquet.SortingColumn.from_sort_order` method to convert + :meth:`pyarrow.parquet.SortingColumn.from_sort_order` method to convert the sort keys. >>> sorting_columns = pq.SortingColumn.from_sort_order(table.schema, sort_keys) diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index 9ae40d937c137..5f73332064650 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -317,7 +317,7 @@ def test_parquet_sorting_column(): pq.SortingColumn(1, descending=True), pq.SortingColumn(0, descending=False), ) - sort_order, null_placement = pq.SortingColumn.as_sort_order(schema, sorting_cols) + sort_order, null_placement = pq.SortingColumn.to_sort_order(schema, sorting_cols) assert sort_order == (('b', "descending"), ('a', "ascending")) assert null_placement == "at_end" @@ -337,7 +337,7 @@ def test_parquet_sorting_column(): empty_sorting_cols = pq.SortingColumn.from_sort_order(schema, ()) assert empty_sorting_cols == () - assert pq.SortingColumn.as_sort_order(schema, ()) == ((), "at_end") + assert pq.SortingColumn.to_sort_order(schema, ()) == ((), "at_end") with pytest.raises(ValueError): pq.SortingColumn.from_sort_order(schema, (("a", "not a valid sort order"))) @@ -347,7 +347,25 @@ def test_parquet_sorting_column(): pq.SortingColumn(1, nulls_first=True), pq.SortingColumn(0, nulls_first=False), ) - pq.SortingColumn.as_sort_order(schema, sorting_cols) + pq.SortingColumn.to_sort_order(schema, sorting_cols) + + +def test_parquet_sorting_column_nested(): + schema = pa.schema({ + 'a': pa.struct([('x', pa.int64()), ('y', pa.int64())]), + 'b': pa.int64() + }) + + sorting_columns = [ + pq.SortingColumn(0, descending=True), # a.x + pq.SortingColumn(2, descending=False) # b + ] + + sort_order, null_placement = pq.SortingColumn.to_sort_order(schema, sorting_columns) + assert null_placement == "at_end" + assert len(sort_order) == 2 + assert sort_order[0] == ("a.x", "descending") + assert sort_order[1] == ("b", "ascending") def test_parquet_file_sorting_columns(): From ad128a6f926a8f78b545d66ed923e4a318dee2d4 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 11 May 2023 14:17:24 -0700 Subject: [PATCH 05/18] fix the repr --- python/pyarrow/_parquet.pyx | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index e242a3eeb3f0a..d5cefa02dafeb 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -578,10 +578,9 @@ cdef class 
SortingColumn: schema : Schema Schema of the input data. sort_keys : Sequence of (name, order) tuples - Names of field/column keys to sort the input on, + Names of field/column keys (str) to sort the input on, along with the order each field/column is sorted in. Accepted values for `order` are "ascending", "descending". - The field name can be a string column name or expression. null_placement : {'at_start', 'at_end'}, default 'at_end' Where null values should appear in the sort order. @@ -673,7 +672,11 @@ cdef class SortingColumn: return tuple(sort_keys), null_placement def __repr__(self): - return """SortingColumn(column_index={0}, descending={1}, nulls_first={2})""".format( + return """{} + column_index: {} + descending: {} + nulls_first: {}""".format( + object.__repr__(self), self.column_index, self.descending, self.nulls_first) def __eq__(self, SortingColumn other): From 2ed9b1b35d91fc34dd8562aee711ff09dbed3b3e Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 12 May 2023 08:03:25 -0700 Subject: [PATCH 06/18] docs: fix doc formatting --- python/pyarrow/_parquet.pyx | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index d5cefa02dafeb..92188492ffe34 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -508,7 +508,8 @@ cdef class ColumnChunkMetaData(_Weakrefable): cdef class SortingColumn: - """Sorting specification for a single column. + """ + Sorting specification for a single column. Returned by :meth:`RowGroupMetaData.sorting_columns` and used in :class:`ParquetWriter` to specify the sort order of the data. @@ -522,12 +523,14 @@ cdef class SortingColumn: nulls_first : bool, default False Whether null values appear before valid values. - .. note:: - Column indices are zero-based, refer only to leaf fields, and are in - depth-first order. This may make the column indices for nested schemas - different from what you expect. In most cases, it will be easier to - specify the sort order using column names instead of column indices - and converting using the ``from_sort_order`` method. + Notes + ----- + + Column indices are zero-based, refer only to leaf fields, and are in + depth-first order. This may make the column indices for nested schemas + different from what you expect. In most cases, it will be easier to + specify the sort order using column names instead of column indices + and converting using the ``from_sort_order`` method. 
Examples
     --------

@@ -557,6 +560,10 @@ cdef class SortingColumn:

     >>> pq.SortingColumn.to_sort_order(schema, sorting_columns)
     ((('id', 'ascending'), ('timestamp', 'descending')), 'at_end')
+
+    See Also
+    --------
+    RowGroupMetaData.sorting_columns, ParquetFile.sort_order
     """
     cdef int column_index
     cdef c_bool descending
@@ -815,7 +822,7 @@ cdef class RowGroupMetaData(_Weakrefable):

     @property
     def sorting_columns(self):
-        """Columns the row group is sorted by (tuple of :class:`SortingColumn`).)"""
+        """Columns the row group is sorted by (tuple of :class:`SortingColumn`)."""

From 67584e5094fb74b8b5145ba775a9782d73aa2adf Mon Sep 17 00:00:00 2001
From: Will Jones 
Date: Fri, 12 May 2023 09:18:04 -0700
Subject: [PATCH 07/18] docs: update repr in examples

---
 python/pyarrow/_parquet.pyx    | 20 ++++++++++++++++----
 python/pyarrow/parquet/core.py | 20 ++++++++++++++++----
 2 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 92188492ffe34..6de6de055d243 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -543,8 +543,14 @@ cdef class SortingColumn:

     >>> import pyarrow.parquet as pq
     >>> [pq.SortingColumn(0), pq.SortingColumn(1, descending=True)]
-    [SortingColumn(column_index=0, descending=False, nulls_first=False),
-    SortingColumn(column_index=1, descending=True, nulls_first=False)]
+    [
+    column_index: 0
+    descending: False
+    nulls_first: False,
+
+    column_index: 1
+    descending: True
+    nulls_first: False]

     Convert the sort_order into the list of sorting columns with
     ``from_sort_order`` (note that the schema must be provided as well):
@@ -553,8 +559,14 @@ cdef class SortingColumn:
     >>> schema = pa.schema([('id', pa.int64()), ('timestamp', pa.timestamp('ms'))])
     >>> sorting_columns = pq.SortingColumn.from_sort_order(schema, sort_order)
     >>> sorting_columns
-    (SortingColumn(column_index=0, descending=False, nulls_first=False),
-    SortingColumn(column_index=1, descending=True, nulls_first=False))
+    (
+    column_index: 0
+    descending: False
+    nulls_first: False,
+
+    column_index: 1
+    descending: True
+    nulls_first: False)

     Convert back to the sort order with ``to_sort_order``:

diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index bffd810968fad..964c4e510c75b 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -751,8 +751,14 @@ def sort_order(self):

     >>> sorting_columns = pq.SortingColumn.from_sort_order(table.schema, sort_keys)
     >>> sorting_columns
-    (SortingColumn(column_index=0, descending=True, nulls_first=False),
-    SortingColumn(column_index=1, descending=False, nulls_first=False))
+    (
+    column_index: 0
+    descending: True
+    nulls_first: False,
+
+    column_index: 1
+    descending: False
+    nulls_first: False)

     Write the table to a Parquet file with the sort order and read it back:

     >>> pq.write_table(table, 'sorted_animals.parquet',
     ...                
sorting_columns = sorting_columns) >>> parquet_file = pq.ParquetFile('sorted_animals.parquet') >>> parquet_file.sort_order - (SortingColumn(column_index=0, descending=True, nulls_first=False), - SortingColumn(column_index=1, descending=False, nulls_first=False)) + ( + column_index: 0 + descending: True + nulls_first: False, + + column_index: 1 + descending: False + nulls_first: False) """ metadata = self.metadata sorting_columns = { From bba1a1250034afd3cec408335d0a0d2065a329c5 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 9 Jun 2023 21:57:46 -0700 Subject: [PATCH 08/18] remove sort order aggregation --- python/pyarrow/_parquet.pyx | 24 +++--- python/pyarrow/parquet/core.py | 73 ------------------- python/pyarrow/tests/parquet/test_metadata.py | 4 - 3 files changed, 12 insertions(+), 89 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 6de6de055d243..f3a1a730160ee 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -544,13 +544,13 @@ cdef class SortingColumn: >>> import pyarrow.parquet as pq >>> [pq.SortingColumn(0), pq.SortingColumn(1, descending=True)] [ - column_index: 0 - descending: False - nulls_first: False, + column_index: 0 + descending: False + nulls_first: False, - column_index: 1 - descending: True - nulls_first: False] + column_index: 1 + descending: True + nulls_first: False] Convert the sort_order into the list of sorting columns with ``from_sort_order`` (note that the schema must be provided as well): @@ -560,13 +560,13 @@ cdef class SortingColumn: >>> sorting_columns = pq.SortingColumn.from_sort_order(schema, sort_order) >>> sorting_columns ( - column_index: 0 - descending: False - nulls_first: False, + column_index: 0 + descending: False + nulls_first: False, - column_index: 1 - descending: True - nulls_first: False) + column_index: 1 + descending: True + nulls_first: False) Convert back to the sort order with ``to_sort_order``: diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 964c4e510c75b..852b339211b0d 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -719,78 +719,6 @@ def _get_column_indices(self, column_names, use_pandas_metadata=False): return indices - @property - def sort_order(self): - """ - Return the sort order of the file, if any. - - Returns - ------- - sort_order : tuple of SortingColumn or None - The sort order of the file. If the file is not sorted, None is returned. - - Examples - -------- - - Parquet files can be written with a sort order. However, you are responsible - for ensuring the data provided is sorted; the parquet writer doesn't perform - the sorting for you. - - >>> import pyarrow.parquet as pq - >>> import pyarrow as pa - >>> table = pa.table({'n_legs': [2, 2, 4, 4, 5, 100], - ... 'animal': ["Flamingo", "Parrot", "Dog", "Horse", - ... "Brittle stars", "Centipede"]}) - >>> sort_keys = (("n_legs", "descending"), ("animal", "ascending")) - >>> table = table.sort_by(sort_keys) - - While the sort_by function takes arguments in terms of column names, the - Parquet API requires indices. Use the - :meth:`pyarrow.parquet.SortingColumn.from_sort_order` method to convert - the sort keys. 
- - >>> sorting_columns = pq.SortingColumn.from_sort_order(table.schema, sort_keys) - >>> sorting_columns - ( - column_index: 0 - descending: True - nulls_first: False, - - column_index: 1 - descending: False - nulls_first: False) - - Write the table to a Parquet file with the sort order and read it back: - - >>> pq.write_table(table, 'sorted_animals.parquet', - ... sorting_columns = sorting_columns) - >>> parquet_file = pq.ParquetFile('sorted_animals.parquet') - >>> parquet_file.sort_order - ( - column_index: 0 - descending: True - nulls_first: False, - - column_index: 1 - descending: False - nulls_first: False) - """ - metadata = self.metadata - sorting_columns = { - metadata.row_group(i).sorting_columns - for i in range(metadata.num_row_groups) - } - - if len(sorting_columns) > 1: - # There are inconsistent sorting columns, so no global sort order - return None - sorting_columns = sorting_columns.pop() - if len(sorting_columns) == 0: - # There are no sorting columns - return None - else: - return sorting_columns - _SPARK_DISALLOWED_CHARS = re.compile('[ ,;{}()\n\t=]') @@ -972,7 +900,6 @@ def _sanitize_table(table, new_schema, flavor): Specify the sort order of the data being written. The writer does not sort the data nor does it verify that the data is sorted. The sort order is written to the row group metadata, which can then be used by readers. - See example at :attr:`ParquetFile.sort_order`. """ _parquet_writer_example_doc = """\ diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index 5f73332064650..b5c4106803753 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -385,10 +385,6 @@ def test_parquet_file_sorting_columns(): for i in range(metadata.num_row_groups)} assert set_sorting_columns == set([sorting_columns]) - # Can also retrieve from the file reader - pq_file = pq.ParquetFile(reader) - assert pq_file.sort_order == sorting_columns - def test_field_id_metadata(): # ARROW-7080 From 3a487e3ebb87820e22b47d6601a9bd3853f7c0e2 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 18 Jun 2023 15:01:08 -0700 Subject: [PATCH 09/18] make repr deterministic --- python/pyarrow/_parquet.pyx | 29 +++++++---------------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index f3a1a730160ee..c194ba2170717 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -543,14 +543,8 @@ cdef class SortingColumn: >>> import pyarrow.parquet as pq >>> [pq.SortingColumn(0), pq.SortingColumn(1, descending=True)] - [ - column_index: 0 - descending: False - nulls_first: False, - - column_index: 1 - descending: True - nulls_first: False] + [SortingColumn(column_index=0, descending=False, nulls_first=False), + SortingColumn(column_index=1, descending=True, nulls_first=False)] Convert the sort_order into the list of sorting columns with ``from_sort_order`` (note that the schema must be provided as well): @@ -559,14 +553,8 @@ cdef class SortingColumn: >>> schema = pa.schema([('id', pa.int64()), ('timestamp', pa.timestamp('ms'))]) >>> sorting_columns = pq.SortingColumn.from_sort_order(schema, sort_order) >>> sorting_columns - ( - column_index: 0 - descending: False - nulls_first: False, - - column_index: 1 - descending: True - nulls_first: False) + (SortingColumn(column_index=0, descending=False, nulls_first=False), + SortingColumn(column_index=1, descending=True, nulls_first=False)) Convert back to the 
sort order with ``to_sort_order``: @@ -575,7 +563,7 @@ cdef class SortingColumn: See Also -------- - RowGroupMetaData.sorting_columns, ParquetFile.sort_order + RowGroupMetaData.sorting_columns """ cdef int column_index cdef c_bool descending @@ -691,11 +679,8 @@ cdef class SortingColumn: return tuple(sort_keys), null_placement def __repr__(self): - return """{} - column_index: {} - descending: {} - nulls_first: {}""".format( - object.__repr__(self), + return """{}(column_index={}, descending={}, nulls_first={})""".format( + self.__class__.__name__, self.column_index, self.descending, self.nulls_first) def __eq__(self, SortingColumn other): From 2c98b04d96e2ee14d27aa0ec4532c33cb045fb78 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 18 Jun 2023 18:11:39 -0700 Subject: [PATCH 10/18] fix doctest --- python/pyarrow/_parquet.pyx | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index c194ba2170717..b77ce1f3ac1f4 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -543,8 +543,7 @@ cdef class SortingColumn: >>> import pyarrow.parquet as pq >>> [pq.SortingColumn(0), pq.SortingColumn(1, descending=True)] - [SortingColumn(column_index=0, descending=False, nulls_first=False), - SortingColumn(column_index=1, descending=True, nulls_first=False)] + [SortingColumn(column_index=0, descending=False, nulls_first=False), SortingColumn(column_index=1, descending=True, nulls_first=False)] Convert the sort_order into the list of sorting columns with ``from_sort_order`` (note that the schema must be provided as well): @@ -553,8 +552,7 @@ cdef class SortingColumn: >>> schema = pa.schema([('id', pa.int64()), ('timestamp', pa.timestamp('ms'))]) >>> sorting_columns = pq.SortingColumn.from_sort_order(schema, sort_order) >>> sorting_columns - (SortingColumn(column_index=0, descending=False, nulls_first=False), - SortingColumn(column_index=1, descending=True, nulls_first=False)) + (SortingColumn(column_index=0, descending=False, nulls_first=False), SortingColumn(column_index=1, descending=True, nulls_first=False)) Convert back to the sort order with ``to_sort_order``: From d994b42859623c24a4c5f8a25b9e1037a6a12f29 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 6 Jul 2023 08:02:38 -0600 Subject: [PATCH 11/18] pr feedback --- python/pyarrow/_parquet.pyx | 10 +--------- python/pyarrow/tests/parquet/test_metadata.py | 5 ++--- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index b77ce1f3ac1f4..54612c2bd2e4e 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -517,7 +517,7 @@ cdef class SortingColumn: Parameters ---------- column_index : int - Index of column data is sorted by. + Index of column that data is sorted by. descending : bool, default False Whether column is sorted in descending order. 
nulls_first : bool, default False @@ -704,14 +704,6 @@ cdef class SortingColumn: """Whether null values appear before valid values (bool).""" return self.nulls_first - def to_dict(self): - """Convert to dictionary representation.""" - return { - 'column_index': self.column_index, - 'descending': self.descending, - 'nulls_first': self.nulls_first - } - cdef class RowGroupMetaData(_Weakrefable): """Metadata for a single row group.""" diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index b5c4106803753..a3b51ae431931 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -381,9 +381,8 @@ def test_parquet_file_sorting_columns(): # Can retrieve sorting columns from metadata metadata = pq.read_metadata(reader) - set_sorting_columns = {metadata.row_group(i).sorting_columns - for i in range(metadata.num_row_groups)} - assert set_sorting_columns == set([sorting_columns]) + assert metadata.num_row_groups == 1 + assert sorting_columns == metadata.row_group(0).sorting_columns def test_field_id_metadata(): From dda4fed665735bce98c3df2e3267f0168dc01e7c Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 11 Sep 2023 17:56:07 +0100 Subject: [PATCH 12/18] Add `sorting_columns` to `ParquetFileWriteOptions` --- python/pyarrow/_dataset_parquet.pyx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx index d458ac4ee710d..e8a8c01399fe0 100644 --- a/python/pyarrow/_dataset_parquet.pyx +++ b/python/pyarrow/_dataset_parquet.pyx @@ -608,6 +608,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions): dictionary_pagesize_limit=self._properties["dictionary_pagesize_limit"], write_page_index=self._properties["write_page_index"], write_page_checksum=self._properties["write_page_checksum"], + sorting_columns=self._properties["sorting_columns"], ) def _set_arrow_properties(self): @@ -658,6 +659,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions): write_page_index=False, encryption_config=None, write_page_checksum=False, + sorting_columns=None, ) self._set_properties() From 0de1e68e5d77e0a8b7aa2c7e547959d5e08ee9e3 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 11 Sep 2023 19:18:31 +0100 Subject: [PATCH 13/18] Format for reduced git diff --- python/pyarrow/_parquet.pxd | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index f47212daad527..0b73daf5e50a3 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -590,7 +590,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties( dictionary_pagesize_limit=*, write_page_index=* write_page_checksum=*, - sorting_columns=*) except * + sorting_columns=*, +) except * cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties( @@ -599,7 +600,8 @@ cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties( allow_truncated_timestamps=*, writer_engine_version=*, use_compliant_nested_type=*, - store_schema=*) except * + store_schema=*, +) except * cdef class ParquetSchema(_Weakrefable): cdef: From 280cfee7e3bbbe432a0718adffa21ac73ada3335 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 24 Oct 2023 09:00:02 +0100 Subject: [PATCH 14/18] Remove unused helper --- python/pyarrow/_parquet.pyx | 48 
------------------------------------- 1 file changed, 48 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 54612c2bd2e4e..90960c212a526 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -2063,54 +2063,6 @@ cdef _name_to_index_map(Schema arrow_schema): return out -def _sort_keys_to_sorting_columns(sort_keys, null_placement, Schema schema): - """Convert SortOptions to a list of SortingColumn objects""" - - if null_placement is None or null_placement == "at_end": - nulls_first = False - elif null_placement == "at_start": - nulls_first = True - else: - raise ValueError("Invalid value for null_placement: {0}" - .format(null_placement)) - - name_to_index_map = _name_to_index_map(schema) - - sorting_columns = [] - - for sort_key in sort_keys: - if isinstance(sort_key, str): - name = sort_key - descending = False - elif (isinstance(sort_key, tuple) and len(sort_key) == 2 and - isinstance(sort_key[0], str) and - isinstance(sort_key[1], str)): - name, descending = sort_key - if descending == "descending": - descending = True - elif descending == "ascending": - descending = False - else: - raise ValueError("Invalid sort key direction: {0}" - .format(descending)) - else: - raise ValueError("Invalid sort key: {0}".format(sort_key)) - - try: - column_index = name_to_index_map[name] - except KeyError: - raise ValueError("Sort key name '{0}' not found in schema:\n{1}" - .format(name, schema)) - - sorting_column = SortingColumn( - column_index, - descending, - nulls_first) - sorting_columns.append(sorting_column) - - return sorting_columns - - cdef class ParquetWriter(_Weakrefable): cdef: unique_ptr[FileWriter] writer From 0249bb61f55591c19d6c4c04cb17ecfe5ea034c0 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 30 Oct 2023 11:47:45 +0000 Subject: [PATCH 15/18] Move import to where it is needed! 
--- python/pyarrow/_parquet.pxd | 2 +- python/pyarrow/_parquet.pyx | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 0b73daf5e50a3..54b0f27e30bea 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -24,7 +24,7 @@ from pyarrow.includes.libarrow cimport (CChunkedArray, CScalar, CSchema, CStatus CKeyValueMetadata, CRandomAccessFile, COutputStream, TimeUnit, CRecordBatchReader) -from pyarrow.lib cimport (_Weakrefable, pyarrow_unwrap_schema) +from pyarrow.lib cimport _Weakrefable cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil: diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 90960c212a526..46d58696f0f99 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -32,6 +32,7 @@ from pyarrow.lib cimport (_Weakrefable, Buffer, Schema, Table, NativeFile, pyarrow_wrap_chunked_array, pyarrow_wrap_schema, + pyarrow_unwrap_schema, pyarrow_wrap_table, pyarrow_wrap_batch, pyarrow_wrap_scalar, From 7c8849fa1d3534886e49a7caf4f9a1689d4bb4c1 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 30 Oct 2023 11:56:43 +0000 Subject: [PATCH 16/18] Remove unnecessary exception --- python/pyarrow/_parquet.pyx | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 46d58696f0f99..d26e645db5c3e 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -1785,9 +1785,6 @@ cdef vector[CSortingColumn] _convert_sorting_columns(sorting_columns) except *: cdef vector[CSortingColumn] c_sorting_columns = [_convert_sorting_column(col) for col in sorting_columns] - if len(sorting_columns) != len(c_sorting_columns): - raise ValueError("something has gone wrong") - return c_sorting_columns From 842107f6abf7b6f8f4bb8981460217e783787abd Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Sun, 10 Dec 2023 11:11:46 +0000 Subject: [PATCH 17/18] Fix rebase error --- python/pyarrow/_parquet.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 54b0f27e30bea..8f7bf18924e4e 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -588,7 +588,7 @@ cdef shared_ptr[WriterProperties] _create_writer_properties( FileEncryptionProperties encryption_properties=*, write_batch_size=*, dictionary_pagesize_limit=*, - write_page_index=* + write_page_index=*, write_page_checksum=*, sorting_columns=*, ) except * From 44dff9dff6f736bc97e829951fa5271dc073aa18 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 19 Dec 2023 14:50:58 +0000 Subject: [PATCH 18/18] Rename methods --- python/pyarrow/_parquet.pyx | 16 ++++++++-------- python/pyarrow/tests/parquet/test_metadata.py | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index d26e645db5c3e..0b685245655a2 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -531,7 +531,7 @@ cdef class SortingColumn: depth-first order. This may make the column indices for nested schemas different from what you expect. In most cases, it will be easier to specify the sort order using column names instead of column indices - and converting using the ``from_sort_order`` method. 
+ and converting using the ``from_ordering`` method. Examples -------- @@ -546,18 +546,18 @@ cdef class SortingColumn: >>> [pq.SortingColumn(0), pq.SortingColumn(1, descending=True)] [SortingColumn(column_index=0, descending=False, nulls_first=False), SortingColumn(column_index=1, descending=True, nulls_first=False)] - Convert the sort_order into the list of sorting columns with - ``from_sort_order`` (note that the schema must be provided as well): + Convert the sort_order into the list of sorting columns with + ``from_ordering`` (note that the schema must be provided as well): >>> import pyarrow as pa >>> schema = pa.schema([('id', pa.int64()), ('timestamp', pa.timestamp('ms'))]) - >>> sorting_columns = pq.SortingColumn.from_sort_order(schema, sort_order) + >>> sorting_columns = pq.SortingColumn.from_ordering(schema, sort_order) >>> sorting_columns (SortingColumn(column_index=0, descending=False, nulls_first=False), SortingColumn(column_index=1, descending=True, nulls_first=False)) - Convert back to the sort order with ``to_sort_order``: + Convert back to the sort order with ``to_ordering``: - >>> pq.SortingColumn.to_sort_order(schema, sorting_columns) + >>> pq.SortingColumn.to_ordering(schema, sorting_columns) ((('id', 'ascending'), ('timestamp', 'descending')), 'at_end') See Also @@ -574,7 +574,7 @@ cdef class SortingColumn: self.nulls_first = nulls_first @classmethod - def from_sort_order(cls, Schema schema, sort_keys, null_placement='at_end'): + def from_ordering(cls, Schema schema, sort_keys, null_placement='at_end'): """ Create a tuple of SortingColumn objects from the same arguments as :class:`pyarrow.compute.SortOptions`. @@ -636,7 +636,7 @@ cdef class SortingColumn: return tuple(sorting_columns) @staticmethod - def to_sort_order(Schema schema, sorting_columns): + def to_ordering(Schema schema, sorting_columns): """ Convert a tuple of SortingColumn objects to the same format as :class:`pyarrow.compute.SortOptions`. 
diff --git a/python/pyarrow/tests/parquet/test_metadata.py b/python/pyarrow/tests/parquet/test_metadata.py index a3b51ae431931..73284d2e53b9e 100644 --- a/python/pyarrow/tests/parquet/test_metadata.py +++ b/python/pyarrow/tests/parquet/test_metadata.py @@ -317,15 +317,15 @@ def test_parquet_sorting_column(): pq.SortingColumn(1, descending=True), pq.SortingColumn(0, descending=False), ) - sort_order, null_placement = pq.SortingColumn.to_sort_order(schema, sorting_cols) + sort_order, null_placement = pq.SortingColumn.to_ordering(schema, sorting_cols) assert sort_order == (('b', "descending"), ('a', "ascending")) assert null_placement == "at_end" - sorting_cols_roundtripped = pq.SortingColumn.from_sort_order( + sorting_cols_roundtripped = pq.SortingColumn.from_ordering( schema, sort_order, null_placement) assert sorting_cols_roundtripped == sorting_cols - sorting_cols = pq.SortingColumn.from_sort_order( + sorting_cols = pq.SortingColumn.from_ordering( schema, ('a', ('b', "descending")), null_placement="at_start") expected = ( pq.SortingColumn(0, descending=False, nulls_first=True), @@ -334,20 +334,20 @@ def test_parquet_sorting_column(): assert sorting_cols == expected # Conversions handle empty tuples - empty_sorting_cols = pq.SortingColumn.from_sort_order(schema, ()) + empty_sorting_cols = pq.SortingColumn.from_ordering(schema, ()) assert empty_sorting_cols == () - assert pq.SortingColumn.to_sort_order(schema, ()) == ((), "at_end") + assert pq.SortingColumn.to_ordering(schema, ()) == ((), "at_end") with pytest.raises(ValueError): - pq.SortingColumn.from_sort_order(schema, (("a", "not a valid sort order"))) + pq.SortingColumn.from_ordering(schema, (("a", "not a valid sort order"))) with pytest.raises(ValueError, match="inconsistent null placement"): sorting_cols = ( pq.SortingColumn(1, nulls_first=True), pq.SortingColumn(0, nulls_first=False), ) - pq.SortingColumn.to_sort_order(schema, sorting_cols) + pq.SortingColumn.to_ordering(schema, sorting_cols) def test_parquet_sorting_column_nested(): @@ -361,7 +361,7 @@ def test_parquet_sorting_column_nested(): pq.SortingColumn(2, descending=False) # b ] - sort_order, null_placement = pq.SortingColumn.to_sort_order(schema, sorting_columns) + sort_order, null_placement = pq.SortingColumn.to_ordering(schema, sorting_columns) assert null_placement == "at_end" assert len(sort_order) == 2 assert sort_order[0] == ("a.x", "descending")
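
Taken together, the series exposes a small round-trip API. The sketch below assumes the final state of the branch (the PATCH 18 names ``from_ordering``/``to_ordering`` and the ``sorting_columns`` keyword threaded through to ``write_table``); the table contents and file name are illustrative only and do not appear in the patches:

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'id': [2, 1, 3], 'score': [0.5, 0.1, 0.9]})

    # The writer never sorts or verifies, so sort the data before writing.
    sort_keys = [('id', 'ascending'), ('score', 'descending')]
    table = table.sort_by(sort_keys)

    # Map column names to the leaf-column indices Parquet expects.
    sorting_columns = pq.SortingColumn.from_ordering(table.schema, sort_keys)

    # The order is recorded in each row group's metadata at write time.
    pq.write_table(table, 'sorted.parquet', sorting_columns=sorting_columns)

    # Readers recover it per row group and can turn it back into sort keys.
    metadata = pq.read_metadata('sorted.parquet')
    recovered = metadata.row_group(0).sorting_columns
    assert recovered == sorting_columns
    keys, null_placement = pq.SortingColumn.to_ordering(table.schema, recovered)
    assert keys == tuple(sort_keys) and null_placement == 'at_end'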
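
PATCH 12 threads the same option through the dataset writer. A minimal sketch, assuming ``ParquetFileFormat.make_write_options`` simply forwards ``sorting_columns`` to the writer properties as wired up above; the directory name is illustrative:

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    table = pa.table({'id': [3, 1, 2]}).sort_by('id')
    sorting = pq.SortingColumn.from_ordering(table.schema, [('id', 'ascending')])

    # Pass the sorting columns via the Parquet file write options.
    fmt = ds.ParquetFileFormat()
    opts = fmt.make_write_options(sorting_columns=sorting)
    ds.write_dataset(table, 'sorted_dataset', format=fmt, file_options=opts)

As the writer docstring above notes, only the metadata is affected: readers that understand SortingColumn can skip a redundant sort, and the files remain valid Parquet either way.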