From 24c9edba757f034dafa05934d6abf3adb0091b21 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Mon, 24 Feb 2025 05:36:33 -0800 Subject: [PATCH 1/9] Support sorted merges in cudf.polars --- python/cudf_polars/cudf_polars/dsl/ir.py | 30 +++++++++++++++---- .../cudf_polars/cudf_polars/dsl/translate.py | 4 +-- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 603f51e9d40..cdfa30d6594 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -1653,11 +1653,31 @@ def do_evaluate(cls, schema: Schema, df: DataFrame) -> DataFrame: class MergeSorted(IR): """Merge sorted operation.""" - def __init__(self, schema: Schema, left: IR, right: IR, key: str): - # libcudf merge is not stable wrt order of inputs, since - # it uses a priority queue to manage the tables it produces. - # See: https://github.com/rapidsai/cudf/issues/16010 - raise NotImplementedError("MergeSorted not yet implemented") + __slots__ = ("key",) + _non_child = ("schema", "key") + key: str + """Key that is sorted.""" + + def __init__(self, schema: Schema, key: str, left: IR, right: IR): + self.schema = schema + self.key = key + self.children = (left, right) + self._non_child_args = ( + schema, + key, + ) + + @classmethod + def do_evaluate(cls, schema: Schema, key: str, *dfs: DataFrame) -> DataFrame: + return DataFrame.from_table( + plc.merge.merge( + [df.table for df in dfs], + [0, 0], + [plc.types.Order.ASCENDING] * 2, + [plc.types.NullOrder.AFTER] * 2, + ), + dfs[0].column_names, + ) class MapFunction(IR): diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py index 22f97f2bf52..1c51fec6039 100644 --- a/python/cudf_polars/cudf_polars/dsl/translate.py +++ b/python/cudf_polars/cudf_polars/dsl/translate.py @@ -467,14 +467,14 @@ def _( def _( node: pl_ir.MergeSorted, translator: Translator, schema: dict[str, 
plc.DataType] ) -> ir.IR: + key = node.key inp_left = translator.translate_ir(n=node.input_left) inp_right = translator.translate_ir(n=node.input_right) - key = node.key return ir.MergeSorted( schema, + key, inp_left, inp_right, - key, ) From d01f1dfe704740f6f0e20c03be3cb88bed26390b Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Mon, 24 Feb 2025 12:16:13 -0800 Subject: [PATCH 2/9] add a test --- python/cudf_polars/cudf_polars/dsl/ir.py | 5 +++-- python/cudf_polars/tests/test_merge_sorted.py | 21 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) create mode 100644 python/cudf_polars/tests/test_merge_sorted.py diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index cdfa30d6594..b934d0881ef 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -1669,10 +1669,11 @@ def __init__(self, schema: Schema, key: str, left: IR, right: IR): @classmethod def do_evaluate(cls, schema: Schema, key: str, *dfs: DataFrame) -> DataFrame: + left, right = dfs return DataFrame.from_table( plc.merge.merge( - [df.table for df in dfs], - [0, 0], + [left.table, right.table], + [left.column_names.index(key), right.column_names.index(key)], [plc.types.Order.ASCENDING] * 2, [plc.types.NullOrder.AFTER] * 2, ), diff --git a/python/cudf_polars/tests/test_merge_sorted.py b/python/cudf_polars/tests/test_merge_sorted.py new file mode 100644 index 00000000000..57257fb4dd3 --- /dev/null +++ b/python/cudf_polars/tests/test_merge_sorted.py @@ -0,0 +1,21 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +def test_merge_sorted(): + df0 = pl.LazyFrame( + {"name": ["steve", "elise", "bob"], "age": [42, 44, 18]} + ).sort("age") + df1 = pl.LazyFrame( + {"name": ["anna", "megan", "steve", "thomas"], "age": [21, 33, 42, 20]} + ).sort("age") + q = df0.merge_sorted(df1, key="age") + assert_gpu_result_equal(q) + From a614ee7dc02c849b1714346571e8ac08d50adbb3 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Tue, 25 Feb 2025 09:53:46 -0800 Subject: [PATCH 3/9] more test cases --- python/cudf_polars/cudf_polars/dsl/ir.py | 11 +++++--- python/cudf_polars/tests/test_merge_sorted.py | 26 ++++++++++++++----- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index b934d0881ef..6a998928950 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -1670,14 +1670,17 @@ def __init__(self, schema: Schema, key: str, left: IR, right: IR): @classmethod def do_evaluate(cls, schema: Schema, key: str, *dfs: DataFrame) -> DataFrame: left, right = dfs + left, right = dfs + on_col_left = left.select_columns({key})[0] + on_col_right = right.select_columns({key})[0] return DataFrame.from_table( plc.merge.merge( - [left.table, right.table], + [right.table, left.table], [left.column_names.index(key), right.column_names.index(key)], - [plc.types.Order.ASCENDING] * 2, - [plc.types.NullOrder.AFTER] * 2, + [on_col_left.order, on_col_right.order], + [plc.types.NullOrder.BEFORE, plc.types.NullOrder.BEFORE], ), - dfs[0].column_names, + left.column_names, ) diff --git a/python/cudf_polars/tests/test_merge_sorted.py b/python/cudf_polars/tests/test_merge_sorted.py index 57257fb4dd3..649dee20d51 100644 --- a/python/cudf_polars/tests/test_merge_sorted.py +++ 
b/python/cudf_polars/tests/test_merge_sorted.py @@ -9,13 +9,27 @@ from cudf_polars.testing.asserts import assert_gpu_result_equal -def test_merge_sorted(): +@pytest.mark.parametrize( + "descending", + [ + pytest.param( + True, + marks=pytest.mark.xfail( + reason="https://github.com/rapidsai/cudf/issues/18089" + ), + ), + False, + ], +) +def test_merge_sorted(descending): df0 = pl.LazyFrame( - {"name": ["steve", "elise", "bob"], "age": [42, 44, 18]} - ).sort("age") + {"name": ["steve", "elise", "bob", "john"], "age": [42, 44, 18, None]} + ).sort("age", descending=descending) df1 = pl.LazyFrame( - {"name": ["anna", "megan", "steve", "thomas"], "age": [21, 33, 42, 20]} - ).sort("age") + { + "name": ["anna", "megan", "steve", "thomas", "john"], + "age": [21, 33, 42, 20, None], + } + ).sort("age", descending=descending) q = df0.merge_sorted(df1, key="age") assert_gpu_result_equal(q) - From 0131fb12cf4c04941447a0d6a08ce184a9e35295 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Tue, 25 Feb 2025 09:57:20 -0800 Subject: [PATCH 4/9] copyright --- python/cudf_polars/tests/test_merge_sorted.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf_polars/tests/test_merge_sorted.py b/python/cudf_polars/tests/test_merge_sorted.py index 649dee20d51..10acd5170c5 100644 --- a/python/cudf_polars/tests/test_merge_sorted.py +++ b/python/cudf_polars/tests/test_merge_sorted.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. 
# SPDX-License-Identifier: Apache-2.0 from __future__ import annotations From 9163acb3fa57f739c82c1b7ac5969e245d8604f2 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Tue, 25 Feb 2025 10:57:36 -0800 Subject: [PATCH 5/9] improve test coverage --- python/cudf_polars/cudf_polars/dsl/ir.py | 6 +++- python/cudf_polars/tests/test_merge_sorted.py | 32 ++++++++++++++++--- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 6a998928950..0deec2d50dc 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -1666,11 +1666,15 @@ def __init__(self, schema: Schema, key: str, left: IR, right: IR): schema, key, ) + assert isinstance(left, Sort) + assert isinstance(right, Sort) + assert left.order == right.order + assert len(left.schema.keys()) <= len(right.schema.keys()) @classmethod def do_evaluate(cls, schema: Schema, key: str, *dfs: DataFrame) -> DataFrame: left, right = dfs - left, right = dfs + right = right.discard_columns(right.column_names_set - left.column_names_set) on_col_left = left.select_columns({key})[0] on_col_right = right.select_columns({key})[0] return DataFrame.from_table( diff --git a/python/cudf_polars/tests/test_merge_sorted.py b/python/cudf_polars/tests/test_merge_sorted.py index 10acd5170c5..131d0540982 100644 --- a/python/cudf_polars/tests/test_merge_sorted.py +++ b/python/cudf_polars/tests/test_merge_sorted.py @@ -14,14 +14,37 @@ [ pytest.param( True, - marks=pytest.mark.xfail( - reason="https://github.com/rapidsai/cudf/issues/18089" - ), + marks=pytest.mark.xfail(reason="polars/issues/21464"), ), False, ], ) -def test_merge_sorted(descending): +def test_merge_sorted_without_nulls(descending): + df0 = pl.LazyFrame({"name": ["steve", "elise", "bob"], "age": [42, 44, 18]}).sort( + "age", descending=descending + ) + df1 = pl.LazyFrame( + { + "name": ["anna", "megan", "steve", "thomas"], + "age": [21, 33, 42, 
20], + "height": [5, 5, 5, 5], + } + ).sort("age", descending=descending) + q = df0.merge_sorted(df1, key="age") + assert_gpu_result_equal(q) + + +@pytest.mark.parametrize( + "descending", + [ + pytest.param( + True, + marks=pytest.mark.xfail(reason="polars/issues/21464 and cudf/issues/18089"), + ), + False, + ], +) +def test_merge_sorted_with_nulls(descending): df0 = pl.LazyFrame( {"name": ["steve", "elise", "bob", "john"], "age": [42, 44, 18, None]} ).sort("age", descending=descending) @@ -29,6 +52,7 @@ def test_merge_sorted(descending): { "name": ["anna", "megan", "steve", "thomas", "john"], "age": [21, 33, 42, 20, None], + "height": [5, 5, 5, 5, 5], } ).sort("age", descending=descending) q = df0.merge_sorted(df1, key="age") From cc3b45efbd9cfc021c4a6e9e722b77bb38ad7584 Mon Sep 17 00:00:00 2001 From: Matt711 Date: Thu, 27 Feb 2025 08:43:23 -0500 Subject: [PATCH 6/9] fix merge sorted when key column is sorted in descending order with nulls --- python/cudf_polars/cudf_polars/dsl/ir.py | 22 ++++++++++--------- python/cudf_polars/tests/test_merge_sorted.py | 2 +- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 0deec2d50dc..14728051507 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -1654,25 +1654,22 @@ class MergeSorted(IR): """Merge sorted operation.""" __slots__ = ("key",) - _non_child = ("schema", "key") + _non_child = ("key",) key: str """Key that is sorted.""" def __init__(self, schema: Schema, key: str, left: IR, right: IR): - self.schema = schema - self.key = key - self.children = (left, right) - self._non_child_args = ( - schema, - key, - ) assert isinstance(left, Sort) assert isinstance(right, Sort) assert left.order == right.order assert len(left.schema.keys()) <= len(right.schema.keys()) + self.schema = schema + self.key = key + self.children = (left, right) + self._non_child_args = (key,) 
@classmethod - def do_evaluate(cls, schema: Schema, key: str, *dfs: DataFrame) -> DataFrame: + def do_evaluate(cls, key: str, *dfs: DataFrame) -> DataFrame: left, right = dfs right = right.discard_columns(right.column_names_set - left.column_names_set) on_col_left = left.select_columns({key})[0] @@ -1682,7 +1679,12 @@ def do_evaluate(cls, schema: Schema, key: str, *dfs: DataFrame) -> DataFrame: [right.table, left.table], [left.column_names.index(key), right.column_names.index(key)], [on_col_left.order, on_col_right.order], - [plc.types.NullOrder.BEFORE, plc.types.NullOrder.BEFORE], + [ + plc.types.NullOrder.BEFORE + if on_col_left.order == plc.types.Order.ASCENDING + else plc.types.NullOrder.AFTER + ] + * 2, ), left.column_names, ) diff --git a/python/cudf_polars/tests/test_merge_sorted.py b/python/cudf_polars/tests/test_merge_sorted.py index 131d0540982..d21d9bf0b72 100644 --- a/python/cudf_polars/tests/test_merge_sorted.py +++ b/python/cudf_polars/tests/test_merge_sorted.py @@ -39,7 +39,7 @@ def test_merge_sorted_without_nulls(descending): [ pytest.param( True, - marks=pytest.mark.xfail(reason="polars/issues/21464 and cudf/issues/18089"), + marks=pytest.mark.xfail(reason="polars/issues/21464"), ), False, ], From 637bc9514a04ab2dcd8550f4736ce51efe88d87f Mon Sep 17 00:00:00 2001 From: Matt711 Date: Thu, 27 Feb 2025 08:51:41 -0500 Subject: [PATCH 7/9] simplify a bit --- python/cudf_polars/cudf_polars/dsl/ir.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 14728051507..37f4805e562 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -1679,12 +1679,7 @@ def do_evaluate(cls, key: str, *dfs: DataFrame) -> DataFrame: [right.table, left.table], [left.column_names.index(key), right.column_names.index(key)], [on_col_left.order, on_col_right.order], - [ - plc.types.NullOrder.BEFORE - if on_col_left.order == 
plc.types.Order.ASCENDING - else plc.types.NullOrder.AFTER - ] - * 2, + [on_col_left.null_order, on_col_right.null_order], ), left.column_names, ) From 8a4619cf5831f1309e85093bca08b064e3edd508 Mon Sep 17 00:00:00 2001 From: Matt711 Date: Thu, 27 Feb 2025 09:55:06 -0500 Subject: [PATCH 8/9] remove outdated test --- python/cudf_polars/tests/test_mapfunction.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/python/cudf_polars/tests/test_mapfunction.py b/python/cudf_polars/tests/test_mapfunction.py index 7a9f4a56545..0f88bd5fae9 100644 --- a/python/cudf_polars/tests/test_mapfunction.py +++ b/python/cudf_polars/tests/test_mapfunction.py @@ -12,16 +12,6 @@ ) -def test_merge_sorted_raises(): - df1 = pl.LazyFrame({"a": [1, 6, 9], "b": [1, -10, 4]}) - df2 = pl.LazyFrame({"a": [-1, 5, 11, 20], "b": [2, 7, -4, None]}) - df3 = pl.LazyFrame({"a": [-10, 20, 21], "b": [1, 2, 3]}) - - q = df1.merge_sorted(df2, key="a").merge_sorted(df3, key="a") - - assert_ir_translation_raises(q, NotImplementedError) - - def test_explode_multiple_raises(): df = pl.LazyFrame({"a": [[1, 2], [3, 4]], "b": [[5, 6], [7, 8]]}) q = df.explode("a", "b") From c966377238b841cd64869971c7db839fc9ee0b23 Mon Sep 17 00:00:00 2001 From: Matt711 Date: Thu, 27 Feb 2025 11:59:55 -0500 Subject: [PATCH 9/9] add schema to non_child_args --- python/cudf_polars/cudf_polars/dsl/ir.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 37f4805e562..41a2b5880e3 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -1654,7 +1654,7 @@ class MergeSorted(IR): """Merge sorted operation.""" __slots__ = ("key",) - _non_child = ("key",) + _non_child = ("schema", "key") key: str """Key that is sorted."""