Skip to content

Commit

Permalink
Add Dataset.assert_sorted to Python API
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Feb 12, 2025
1 parent 574e4a5 commit 23823d9
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 2 deletions.
7 changes: 5 additions & 2 deletions python/pyarrow/_compute.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1981,8 +1981,11 @@ class PartitionNthOptions(_PartitionNthOptions):


cdef class Ordering(_Weakrefable):
def __init__(self):
_forbid_instantiation(self.__class__)
def __init__(self, sort_keys, *, null_placement="at_end"):
    """
    Create an Ordering from one or more sort keys.

    Parameters
    ----------
    sort_keys : sequence of (name, order) tuples
        Columns to order by. A bare string is rejected here
        (``allow_str=False``) — callers must pass explicit
        (name, order) pairs.
    null_placement : str, default "at_end"
        Where nulls should be placed; "at_start" or "at_end".
    """
    c_sort_keys = unwrap_sort_keys(sort_keys, allow_str=False)
    c_null_placement = unwrap_null_placement(null_placement)
    # Build the C++ ordering and store it on the wrapper.
    ordering = COrdering(c_sort_keys, c_null_placement)
    self.init(ordering)

cdef void init(self, const COrdering& sp):
self.wrapped = sp
Expand Down
29 changes: 29 additions & 0 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,35 @@ cdef class Dataset(_Weakrefable):
)
return res

def assert_sort(self, sorting, *, null_placement="at_end"):
    """
    Assert the Dataset is sorted by one or multiple columns.

    Parameters
    ----------
    sorting : str or list[tuple(name, order)]
        Name of the column to use to sort (ascending), or
        a list of multiple sorting conditions where
        each entry is a tuple with column name
        and sorting order ("ascending" or "descending")
    null_placement : str, default "at_end"
        Where nulls in input should be sorted, only applying to
        columns/fields mentioned in `sorting`.
        Accepted values are "at_start", "at_end".

    Returns
    -------
    InMemoryDataset
        A new dataset where sorted order is guaranteed or an exception is raised.
    """
    # A bare column name is shorthand for ascending order on that column.
    if isinstance(sorting, str):
        sorting = [(sorting, "ascending")]

    # Delegate to acero; the assertion itself is raised lazily when the
    # returned dataset is consumed (e.g. via .to_table()).
    res = _pac()._assert_sorted(
        self, output_type=InMemoryDataset, sort_keys=sorting, null_placement=null_placement
    )
    return res

def join(self, right_dataset, keys, right_keys=None, join_type="left outer",
left_suffix=None, right_suffix=None, coalesce_keys=True,
use_threads=True):
Expand Down
14 changes: 14 additions & 0 deletions python/pyarrow/acero.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,20 @@ def _sort_source(table_or_dataset, sort_keys, output_type=Table, **kwargs):
raise TypeError("Unsupported output type")


def _assert_sorted(dataset, sort_keys, *, null_placement="at_end", output_type=Table):
    """
    Scan *dataset* declaring the given ordering, materializing the result.

    Parameters
    ----------
    dataset : Dataset
        The dataset whose scan is asserted to be ordered.
    sort_keys : sequence of (name, order) tuples
        Ordering to assert, forwarded to ``Ordering``.
    null_placement : str, default "at_end"
        Null placement of the asserted ordering ("at_start" or "at_end").
    output_type : type, default Table
        Either ``Table`` or ``ds.InMemoryDataset``; selects the return type.

    Returns
    -------
    Table or ds.InMemoryDataset
        The materialized (ordering-checked) data.

    Raises
    ------
    TypeError
        If *output_type* is not a supported type.
    """
    ordering = Ordering(sort_keys, null_placement=null_placement)
    declaration = _dataset_to_decl(dataset, use_threads=True, ordering=ordering)
    materialized = declaration.to_table(use_threads=True)

    # Early returns instead of an if/elif chain; unknown types are rejected.
    if output_type == Table:
        return materialized
    if output_type == ds.InMemoryDataset:
        return ds.InMemoryDataset(materialized)
    raise TypeError("Unsupported output type")


def _group_by(table, aggregates, keys, use_threads=True):

decl = Declaration.from_sequence([
Expand Down
45 changes: 45 additions & 0 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from shutil import copytree
from urllib.parse import quote

from pyarrow import ArrowException

try:
import numpy as np
except ImportError:
Expand Down Expand Up @@ -5654,6 +5656,49 @@ def test_dataset_sort_by(tempdir, dstype):
assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"]


def do_test_dataset_assert_sorted(tempdir, dstype, table, expect_sorted, **kwargs):
    """
    Exercise ``Dataset.assert_sort(**kwargs)`` against *table*.

    Builds either an in-memory dataset (``dstype == "mem"``) or a
    filesystem-backed IPC dataset under *tempdir* (``dstype == "fs"``),
    then checks that consuming the asserted dataset succeeds when
    *expect_sorted* is true and raises ``ArrowException`` otherwise.
    """
    if dstype == "mem":
        dataset = ds.dataset(table)
    elif dstype == "fs":
        # One directory per kwargs combination so runs don't collide.
        target = tempdir / "-".join(f"{k}={v}" for k, v in kwargs.items())
        ds.write_dataset(table, target, format="ipc")
        dataset = ds.dataset(target, format="ipc")
    else:
        raise NotImplementedError

    if not expect_sorted:
        with pytest.raises(ArrowException, match="Data is not ordered"):
            dataset.assert_sort(**kwargs).to_table()
    else:
        dataset.assert_sort(**kwargs).to_table()


@pytest.mark.parametrize('dstype', [
    "fs", "mem"
])
def test_dataset_assert_sorted(tempdir, dstype):
    """Dataset.assert_sort accepts sorted data and rejects unsorted data."""
    # "values" is ascending with a trailing null; "keys" is unsorted.
    table = pa.table(
        {
            "values": [1, 2, 3, 4, None],
            "keys": ["b", "a", "b", "a", "c"],
        }
    )

    sorted_cases = [
        dict(sorting="values"),
        dict(sorting="values", null_placement="at_end"),
        dict(sorting=[("values", "ascending")]),
        dict(sorting=[("values", "ascending")], null_placement="at_end"),
    ]
    for kwargs in sorted_cases:
        do_test_dataset_assert_sorted(tempdir, dstype, table, True, **kwargs)

    unsorted_cases = [
        dict(sorting="keys"),
        dict(sorting="values", null_placement="at_start"),
        dict(sorting=[("values", "descending")]),
        dict(sorting=[("values", "ascending")], null_placement="at_start"),
    ]
    for kwargs in unsorted_cases:
        do_test_dataset_assert_sorted(tempdir, dstype, table, False, **kwargs)


def test_checksum_write_dataset_read_dataset_to_table(tempdir):
"""Check that checksum verification works for datasets created with
ds.write_dataset and read with ds.dataset.to_table"""
Expand Down

0 comments on commit 23823d9

Please sign in to comment.