Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move cudf._lib.stream_compaction to cudf.core._internals #17456

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/cudf/cudf/_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# the License.
# =============================================================================

set(cython_sources column.pyx groupby.pyx interop.pyx scalar.pyx stream_compaction.pyx
string_casting.pyx strings_udf.pyx types.pyx utils.pyx
set(cython_sources column.pyx groupby.pyx interop.pyx scalar.pyx string_casting.pyx strings_udf.pyx
types.pyx utils.pyx
)
set(linked_libraries cudf::cudf)

Expand Down
1 change: 0 additions & 1 deletion python/cudf/cudf/_lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from . import (
groupby,
interop,
stream_compaction,
string_casting,
strings_udf,
)
Expand Down
181 changes: 0 additions & 181 deletions python/cudf/cudf/_lib/stream_compaction.pyx

This file was deleted.

14 changes: 6 additions & 8 deletions python/cudf/cudf/core/_base_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
from typing_extensions import Self

import cudf
from cudf._lib.stream_compaction import (
apply_boolean_mask,
drop_duplicates,
drop_nulls,
)
from cudf._lib.types import size_type_dtype
from cudf.api.extensions import no_default
from cudf.api.types import is_integer, is_list_like, is_scalar
from cudf.core._internals import copying
from cudf.core._internals.stream_compaction import (
apply_boolean_mask,
drop_duplicates,
drop_nulls,
)
from cudf.core.abc import Serializable
from cudf.core.column import ColumnBase, column
from cudf.core.copy_types import GatherMap
Expand Down Expand Up @@ -414,7 +414,7 @@ def hasnans(self):
raise NotImplementedError

@property
def nlevels(self):
def nlevels(self) -> int:
"""
Number of levels.
"""
Expand Down Expand Up @@ -1944,7 +1944,6 @@ def drop_duplicates(
return self._from_columns_like_self(
drop_duplicates(
list(self._columns),
keys=range(len(self._columns)),
keep=keep,
nulls_are_equal=nulls_are_equal,
),
Expand Down Expand Up @@ -2033,7 +2032,6 @@ def dropna(self, how="any"):
drop_nulls(
data_columns,
how=how,
keys=range(len(data_columns)),
),
self._column_names,
)
Expand Down
121 changes: 121 additions & 0 deletions python/cudf/cudf/core/_internals/stream_compaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
from __future__ import annotations

from typing import TYPE_CHECKING, Literal

import pylibcudf as plc

from cudf._lib.column import Column
from cudf.core.buffer import acquire_spill_lock

if TYPE_CHECKING:
from cudf.core.column import ColumnBase


@acquire_spill_lock()
def drop_nulls(
    columns: list[ColumnBase],
    how: Literal["any", "all"] = "any",
    keys: list[int] | None = None,
    thresh: int | None = None,
) -> list[ColumnBase]:
    """
    Drop rows from ``columns`` based on the nulls found in the key columns.

    Parameters
    ----------
    columns : list of columns
    how : "any" or "all". Only consulted when ``thresh`` is None: "any"
        sets the keep threshold to the number of key columns, "all" sets
        it to 1 (default: "any")
    keys : List of column indices to inspect for nulls. When None, every
        column in ``columns`` is inspected (optional)
    thresh : Minimum number of non-nulls required to keep a row. When
        given, it takes precedence over ``how`` (optional)

    Returns
    -------
    columns with null rows dropped

    Raises
    ------
    ValueError
        If ``how`` is neither "any" nor "all".
    """
    if how not in {"any", "all"}:
        raise ValueError("how must be 'any' or 'all'")

    if keys is None:
        # No subset requested: every column takes part in the null check.
        keys = list(range(len(columns)))

    # An explicit thresh wins over how, even when how == "all".
    if thresh is None:
        min_non_null = 1 if how == "all" else len(keys)
    else:
        min_non_null = thresh

    source = plc.Table([col.to_pylibcudf(mode="read") for col in columns])
    compacted = plc.stream_compaction.drop_nulls(source, keys, min_non_null)
    return [
        Column.from_pylibcudf(plc_col) for plc_col in compacted.columns()
    ]


@acquire_spill_lock()
def apply_boolean_mask(
    columns: list[ColumnBase], boolean_mask: ColumnBase
) -> list[ColumnBase]:
    """
    Filter ``columns`` row-wise, dropping rows where the mask is False.

    Parameters
    ----------
    columns : list of columns whose rows are dropped as per boolean_mask
    boolean_mask : a boolean column of the same size as ``columns``

    Returns
    -------
    columns obtained from applying mask
    """
    source = plc.Table([col.to_pylibcudf(mode="read") for col in columns])
    mask = boolean_mask.to_pylibcudf(mode="read")
    filtered = plc.stream_compaction.apply_boolean_mask(source, mask)
    return [Column.from_pylibcudf(plc_col) for plc_col in filtered.columns()]


@acquire_spill_lock()
def drop_duplicates(
    columns: list[ColumnBase],
    keys: list[int] | None = None,
    keep: Literal["first", "last", False] = "first",
    nulls_are_equal: bool = True,
) -> list[ColumnBase]:
    """
    Drop rows of ``columns`` that are duplicated in the key columns.

    Parameters
    ----------
    columns : List of columns
    keys : List of column indices to compare for duplicates. When None,
        every column in ``columns`` is compared (optional)
    keep : keep 'first' or 'last' or none (False) of the duplicate rows
    nulls_are_equal : if True, nulls are treated equal else not.

    Returns
    -------
    columns with duplicate dropped

    Raises
    ------
    ValueError
        If ``keep`` is not one of "first", "last" or False.
    """
    keep_enum = plc.stream_compaction.DuplicateKeepOption
    _keep_options = {
        "first": keep_enum.KEEP_FIRST,
        "last": keep_enum.KEEP_LAST,
        False: keep_enum.KEEP_NONE,
    }
    keep_option = _keep_options.get(keep)
    if keep_option is None:
        raise ValueError('keep must be either "first", "last" or False')

    source = plc.Table([col.to_pylibcudf(mode="read") for col in columns])

    # No subset requested: every column takes part in the comparison.
    key_indices = keys if keys is not None else list(range(len(columns)))

    if nulls_are_equal:
        null_equality = plc.types.NullEquality.EQUAL
    else:
        null_equality = plc.types.NullEquality.UNEQUAL

    deduplicated = plc.stream_compaction.stable_distinct(
        source,
        key_indices,
        keep_option,
        null_equality,
        plc.types.NanEquality.ALL_EQUAL,
    )
    return [
        Column.from_pylibcudf(plc_col) for plc_col in deduplicated.columns()
    ]
Loading
Loading