Extend and simplify API for calculation of range-based rolling window offsets #17807
Conversation
In draft because waiting for #17787
This is brilliant.
signposts
std::unique_ptr<table> grouped_range_rolling_window(
  table_view const& group_keys,
  column_view const& orderby,
  order order,
  null_order null_order,
  range_window_type preceding,
  range_window_type following,
  size_type min_periods,
  std::vector<std::pair<column_view const&, rolling_aggregation const&>> requests,
  rmm::cuda_stream_view stream = cudf::get_default_stream(),
  rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
I haven't yet added tests of this function; I want to bikeshed the interface first. For example, should `min_periods` be part of the request? Do we want a `rolling_request` object similar to the `groupby_request` we have for grouped aggregations?
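For concreteness, one hypothetical shape such a request could take, loosely mirroring the groupby request object mentioned above (the name and members here are assumptions for discussion, not part of this PR):

struct rolling_request {
  column_view values;     // column to aggregate over each window
  size_type min_periods;  // per-request minimum number of observations
  std::vector<std::unique_ptr<rolling_aggregation>> aggregations;
};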
I don’t feel like I know the status quo of our rolling APIs or the downstream requirements well enough to opine on this without significant research time. I might be able to circle back to this but it’ll be at least a week.
Perhaps the thing to do for now is to remove this API; then, when using the other pieces from Python, we might notice what we want.
These are no longer required.
enum class window_tag : std::int8_t {
  BOUNDED_OPEN,    ///< Window is bounded by a value-based endpoint, endpoint is excluded.
  BOUNDED_CLOSED,  ///< Window is bounded by a value-based endpoint, endpoint is included.
  UNBOUNDED,       ///< Window runs to beginning (or end) of the group the row is in.
  CURRENT_ROW,     ///< Window contains all rows that compare equal to the current row.
I suppose I can actually use the public types and do constexpr dispatch via `is_same_v` rather than comparison to a compile-time known enum tag.
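A minimal sketch of that pattern, assuming the public window types (e.g. `cudf::bounded_open`, `cudf::bounded_closed`) become the template parameters directly:

template <typename WindowType>
[[nodiscard]] __device__ constexpr bool endpoint_is_included()
{
  if constexpr (cuda::std::is_same_v<WindowType, cudf::bounded_closed>) {
    return true;   // closed endpoint: included in the window
  } else if constexpr (cuda::std::is_same_v<WindowType, cudf::bounded_open>) {
    return false;  // open endpoint: excluded from the window
  } else {
    return false;  // unbounded/current_row: no value-based endpoint
  }
}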
Yes, that seems like a good idea.
I haven't gone through the full diff yet, but if it is possible we should check the compile-time and run-time impact of not using these as template parameters and instead passing them at runtime.
[[nodiscard]] __device__ constexpr cuda::std::
  tuple<size_type, size_type, size_type, size_type, size_type, size_type, size_type>
  row_info(size_type i) const noexcept
{
  if (nulls_at_start) {
    return {null_count, 0, num_rows, 0, null_count, null_count, num_rows};
  } else {
    return {null_count,
            num_rows,
            null_count,
            num_rows - null_count,
            num_rows,
            0,
            num_rows - null_count};
  }
}
I should go back and try again, but I had what seemed to be miscompilation issues if I exposed all of these values via individual function calls instead of returning everything all at once.
Happy to help debug those if something comes up.
cpp/src/rolling/grouped_rolling.cu (outdated)
auto get_window_type = [](range_window_bounds const& bound) -> range_window_type {
  if (bound.is_unbounded()) {
    return unbounded{};
  } else if (bound.is_current_row()) {
    return current_row{};
  } else {
    return bounded_closed{bound.range_scalar()};
  }
};
auto [preceding_column, following_column] =
  make_range_windows(group_keys,
                     order_by_column,
                     order,
                     get_window_type(preceding),
                     get_window_type(following),
                     stream,
                     cudf::get_current_device_resource_ref());
Polyfill to migrate the old interface to the new one. I propose deprecating this API in favour of the one where one passes `range_window_type`s and additionally the `null_order`.
cpp/src/rolling/range_rolling.cu (outdated)
[[nodiscard]] null_order deduce_null_order(column_view const& orderby,
                                           order order,
                                           rmm::device_uvector<size_type> const& offsets,
                                           rmm::device_uvector<size_type> const& per_group_nulls,
                                           rmm::cuda_stream_view stream)
This is needed to polyfill the old interface to the new one.
Ah, thank you for this.
cpp/src/rolling/range_rolling.cu (outdated)
{
  auto d_orderby        = column_device_view::create(orderby, stream);
  auto const num_groups = offsets.size() - 1;
  std::size_t bytes{0};
  auto is_null_it = cudf::detail::make_counting_transform_iterator(
    cudf::size_type{0}, [orderby = *d_orderby] __device__(size_type i) -> size_type {
      return static_cast<size_type>(orderby.is_null_nocheck(i));
    });
  rmm::device_uvector<cudf::size_type> null_counts{num_groups, stream};
  // First call with a null temporary-storage pointer queries the scratch size...
  cub::DeviceSegmentedReduce::Sum(nullptr,
                                  bytes,
                                  is_null_it,
                                  null_counts.begin(),
                                  num_groups,
                                  offsets.begin(),
                                  offsets.begin() + 1,
                                  stream.value());
  auto tmp = rmm::device_buffer(bytes, stream);
  // ...second call performs the per-group (segmented) sum of the null flags.
  cub::DeviceSegmentedReduce::Sum(tmp.data(),
                                  bytes,
                                  is_null_it,
                                  null_counts.begin(),
                                  num_groups,
                                  offsets.begin(),
                                  offsets.begin() + 1,
                                  stream.value());
  return null_counts;
}
This is the major performance improvement in this PR. It makes it much faster to find the nulls per group in a grouped rolling window with a low-cardinality group key and nulls in the orderby column (in bad cases the old code would take more than a second; this new code takes a few milliseconds).
cpp/src/rolling/range_rolling.cu (outdated)
std::pair<std::unique_ptr<column>, std::unique_ptr<column>> make_range_windows(
  table_view const& group_keys,
  column_view const& orderby,
  order order,
  range_window_type preceding,
  range_window_type following,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr)
This function goes away once the old APIs are deprecated.
cpp/src/rolling/range_rolling.cu (outdated)
auto deduced_null_order = [&]() {
  if (null_order.has_value()) { return null_order.value(); }
  if (!orderby.has_nulls()) {
    // Doesn't matter in this case
    return null_order::BEFORE;
  }
  return deduce_null_order(orderby, order, offsets, per_group_nulls, stream);
This also goes away.
I did a quick pass. This PR is huge but the code looks good overall. I tried to focus on architecture but I have some gaps in my knowledge of the rolling code and its downstream API / feature requirements. Some of my comments may be off base for that reason.
row_info(size_type i) const noexcept
{
  if (nulls_at_start) {
    return {null_count, 0, num_rows, 0, null_count, null_count, num_rows};
Is there a way we could use a struct with named members? I have no idea what this tuple represents (though perhaps I haven’t gotten to that point in the diff yet).
Agreed.
I'm posting the call-site here for (my own) reference:
auto const [null_count, group_start, group_end, null_start, null_end, start, end] =
groups.row_info(i);
I think this might be an easier read with a struct/class. The names would be useful.
 * and greater than any non-nan value. These structs implement that logic.
 */
template <typename T>
struct less {
There is some related code here:

__device__ weak_ordering relational_compare(Element lhs, Element rhs)
Thanks
Do you suggest we move these implementations there?
static_assert(cuda::std::is_same_v<T, V>, "Cannot add mismatching types");

if constexpr (cuda::std::is_floating_point_v<T>) {
  // Mimicking spark requirements, inf/nan x propagates
Are we overspecialized to Spark conventions here? Are there places where we are compiling extra kernels, handling extra logic, etc. which could be avoided by some preprocessing steps?
You can't avoid it. If your addition of the delta would saturate to inf, Spark says the window stops at `max_float`.
This code, and all these kernels, already exist (albeit written differently) in the range comparator utils that are removed in this PR.
Put another way, all of the rolling window code is already specialized to Spark conventions. If we remove that specialization, they will have to implement it all themselves.
If asking Spark to implement some specialized kernels themselves allows us to reduce template instantiations / compile times, that could be an option worth exploring.
Asking Spark to implement things themselves requires that either we provide a compile-time customisation point for this addition in the public interface, or Spark reimplements all of this code (as in, all of the code in this PR). It would save two template instantiations (namely `float` and `double`).
I don't think that's worth it.
{
}
Grouping groups;  ///< Group information to determine bounds on current row's window
static_assert(cuda::std::is_same_v<Grouping, ungrouped> ||
Opportunity to reuse this:

inline constexpr bool is_any_v = std::disjunction<std::is_same<T, Ts>...>::value;
That is a useful utility. Would that this were available in a more accessible header.
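For reference, the static_assert above could then read something like this (a sketch; the grouping type names other than `ungrouped` are assumptions):

static_assert(is_any_v<Grouping, ungrouped, grouped, ungrouped_nulls, grouped_nulls>,
              "Invalid grouping type");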
 * @brief Return information about the current row.
 *
 * @param i The row
 * @returns Tuple of `(null_count, group_start, group_end, null_start,
Yes, a struct with named members would be good here.
cpp/src/rolling/range_rolling.cu (outdated)
// null_order was either BEFORE or AFTER
// If at least one group has a null at the beginning and that
// group has more entries than the null count of the group, must
// be nulls at starts of groups (BEFORE),otherwise must be nulls at end (AFTER)
Suggested change:
- // be nulls at starts of groups (BEFORE),otherwise must be nulls at end (AFTER)
+ // be nulls at starts of groups (BEFORE), otherwise must be nulls at end (AFTER)
I see that the JNI changes to hook things up to the new interfaces are out of scope here. I can help with that in a follow-up. For now, this change will need testing with Spark integration tests. I'll try a run with these changes tomorrow.
Here's a first pass of review. I've started to get a sense of what all the code is, but I'll definitely need to let it settle for a day then come back for a second pass to really see how all the parts fit together. I'm hoping some of my suggestions will help simplify the code for future rounds of review, though.
This code is reminiscent of #9027 to me. I suspect that similar techniques might be applicable to simplifying some of the structures and code paths, so it might be worth scanning that PR for organizational ideas in case it gives ideas about what things require templating, what can be done at runtime, how to organize different structs, etc.
cpp/include/cudf/rolling.hpp (outdated)
 * @note Using `make_range_window_bounds(table_view const&, column_view const&, order, null_order,
 * window_type, window_type, rmm::cuda_stream_view, rmm::device_async_resource_ref)` is preferred,
 * since this function requires the launch of an additional kernel to deduce the null order of the
 * orderby column.
Do we have to provide this overload? Can we just require the caller to provide the `null_order`? Is there an easy API they could use to get it?
The public API that calls in to these functions doesn't currently have a `null_order` argument. I have not yet deprecated that API in this PR (but could). But we need a way to deduce the null order (implemented as `deduce_null_order` in range_rolling.cu) to polyfill during the deprecation period.
If we're happy to just remove the old API (without a deprecation period) then I don't need this function.
I suppose I don't actually need to make this API public though...
If we're happy to just remove the old API...
Requiring the null-order in the public APIs would break spark-rapids builds. This might not be that hard to resolve. (We'd need to do that segmented null count at our end.) But a deprecation period would be useful for planning.
You don't need to do the segmented null count, you just need to say where the nulls were sorted, which must be known, I think.
cpp/include/cudf/detail/rolling.hpp (outdated)
/**
 * @copydoc `make_range_window`
 *
 * This behaves as `make_range_window`, but template-specialised to `PRECEDING` windows. This way we
 * can compile it in a separate translation unit from `make_following_range_window`.
 */
[[nodiscard]] std::unique_ptr<column> make_preceding_range_window(
  column_view const& orderby,
  std::optional<rolling::preprocessed_group_info> const& grouping,
  order order,
  null_order null_order,
  range_window_type window,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr);

/**
 * @copydoc `make_range_window`
 *
 * This behaves as `make_range_window`, but template-specialised to `FOLLOWING` windows. This way we
 * can compile it in a separate translation unit from `make_preceding_range_window`.
 */
[[nodiscard]] std::unique_ptr<column> make_following_range_window(
  column_view const& orderby,
  std::optional<rolling::preprocessed_group_info> const& grouping,
  order order,
  null_order null_order,
  range_window_type window,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr);
Unless I'm misunderstanding, you can do this with explicit template instantiation instead of defining helper functions (see the docs under "Function template instantiation", subheading "Explicit instantiation" if you're not familiar). I've made the other corresponding changes in range_(following|preceding).cu and at call sites, but please let me know if there's some other reason that you need these functions.
Suggested change: delete both declarations (the explicit instantiations in the corresponding translation units replace them).
[[nodiscard]] std::unique_ptr<column> make_following_range_window(
  column_view const& orderby,
  std::optional<rolling::preprocessed_group_info> const& grouping,
  order order,
  null_order null_order,
  range_window_type window,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr)
{
  return make_range_window<rolling::direction::FOLLOWING>(
    orderby, grouping, order, null_order, window, stream, mr);
}
Suggested change: replace the wrapper with an explicit instantiation:

template std::unique_ptr<column> make_range_window<rolling::direction::FOLLOWING>(
  column_view const&,
  std::optional<rolling::preprocessed_group_info> const&,
  order,
  null_order,
  range_window_type,
  rmm::cuda_stream_view,
  rmm::device_async_resource_ref);
[[nodiscard]] std::unique_ptr<column> make_preceding_range_window(
  column_view const& orderby,
  std::optional<rolling::preprocessed_group_info> const& grouping,
  order order,
  null_order null_order,
  range_window_type window,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr)
{
  return make_range_window<rolling::direction::PRECEDING>(
    orderby, grouping, order, null_order, window, stream, mr);
}
Suggested change: replace the wrapper with an explicit instantiation:

template std::unique_ptr<column> make_range_window<rolling::direction::PRECEDING>(
  column_view const&,
  std::optional<rolling::preprocessed_group_info> const&,
  order,
  null_order,
  range_window_type,
  rmm::cuda_stream_view,
  rmm::device_async_resource_ref);
begin + start,
begin + end,
value,
comparator_t<OrderbyT, window_tag::BOUNDED_CLOSED, Order>{}),
Could we just set the comparator type in the conditionals and then have a single `thrust::distance` call?
No, because if the type is different, you can't assign it to a single concrete variable.
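A minimal illustration of the constraint (a sketch, not the PR's code; `comparator_t` as in the diff above):

template <window_tag Tag, typename OrderbyT, cudf::order Order>
__device__ cudf::size_type find_bound(OrderbyT const* begin,
                                      cudf::size_type start,
                                      cudf::size_type end,
                                      OrderbyT value)
{
  // Each branch instantiates a differently-typed comparator, so the search
  // cannot be hoisted out behind a single comparator variable.
  if constexpr (Tag == window_tag::BOUNDED_CLOSED) {
    auto it = thrust::lower_bound(thrust::seq, begin + start, begin + end, value,
                                  comparator_t<OrderbyT, window_tag::BOUNDED_CLOSED, Order>{});
    return static_cast<cudf::size_type>(thrust::distance(begin + start, it));
  } else {
    auto it = thrust::lower_bound(thrust::seq, begin + start, begin + end, value,
                                  comparator_t<OrderbyT, window_tag::BOUNDED_OPEN, Order>{});
    return static_cast<cudf::size_type>(thrust::distance(begin + start, it));
  }
}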
if constexpr (Direction == direction::PRECEDING) {
  if constexpr (WindowTag == window_tag::UNBOUNDED) { return i - group_start + 1; }
This is another case where if we are going to stick with compile-time constants for branching I would prefer to see some explicit template specializations. The degree of nesting here makes things harder for me to follow.
How would you do this? The template parameters cannot be on the `operator()` definition AFAIUI, so that means just replicating the body of this struct by hand N-template-variants times, which seems like a backward step to me, but perhaps I do not understand.
The way I grok @vyasr's suggestion is to move the if-else blocks to a separate function template, with specializations for `<PRECEDING, UNBOUNDED>`, `<PRECEDING, CURRENT_ROW>`, etc. We call said function at line#483 instead. This will dispatch to the appropriate specialization.
All that said, I'd personally prefer the `if constexpr` style of it. It makes for less fragmented reading.
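A sketch of the two styles for one case (illustrative names, not the PR's code):

// (a) Explicit specialization per (Direction, WindowTag) pair:
template <direction Direction, window_tag WindowTag>
__device__ size_type window_offset(size_type i, size_type group_start);

template <>
__device__ size_type window_offset<direction::PRECEDING, window_tag::UNBOUNDED>(
  size_type i, size_type group_start)
{
  return i - group_start + 1;
}

// (b) The single if-constexpr body, as in this PR:
// if constexpr (Direction == direction::PRECEDING) {
//   if constexpr (WindowTag == window_tag::UNBOUNDED) { return i - group_start + 1; }
//   ...
// }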
 * saturating addition/subtraction.
 */
template <typename Grouping, typename OrderbyT, typename DeltaT>
struct distance_kernel {
This should be `distance_functor` (or whatever name you prefer; I'm not here to paint the bikeshed, but it's not a kernel).
…ange-rolling-redux
This removes the need for a new public function that bridges the old API to the new one.
Just dispatch on the types directly. Also, simplify variant unpicking.
 *           | ASCENDING | DESCENDING
 * ----------+-----------+-----------
 * PRECEDING | x - delta | x + delta
 * FOLLOWING | x + delta | x - delta
👍
[[nodiscard]] __device__ size_type operator()(size_type i) const
{
  using Comp      = comparator_t<OrderbyT, WindowTag, Order>;
  auto const info = groups.row_info(i);
I might have missed where `info` is used. We seem also to be unpacking the result of `groups.row_info(i)` on the next line. Did we intend to remove the `info` line?
Yes, should be removed, thanks.
I've posted some of my ideas for changes in https://github.com/wence-/cudf/pull/2. I'm not convinced that all of them are good, so I'd like to see what reviewers think. The next step there would be removing a couple of template parameters and benchmarking (should be easier to do after my changes). Note that I had to merge latest 25.04 to resolve conflicts first, so you can ignore the first hundred or so commits. My first commit is wence-@b97e805.
…ange-rolling-redux
Move direction/order to runtime dispatch
Move impls to rolling_utils.cuh. We can also drop the null count from the return value.
{1, 1, 2, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1}},
cudf::test::fixed_width_column_wrapper<cudf::size_type>{
  {0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0}});
This is the part I was a little concerned about. It is important that for `CURRENT_ROW` operations, `expected_following` should cover the first/last occurrence of the current row's value, i.e. `expected_following[1] == 1`, `expected_following[2] == 2`, etc.
Thank you for covering this.
/**
 * @brief Information about group bounds of the current row's group.
 */
struct range_group_info {
  size_type group_start;     // first row of the current row's group
  size_type group_end;       // one past the last row of the group
  size_type null_start;      // first row of the group's null segment
  size_type null_end;        // one past the last row of the null segment
  size_type non_null_start;  // first row of the group's non-null segment
  size_type non_null_end;    // one past the last row of the non-null segment
};
Added this struct and moved the grouped/ungrouped with-nulls structs here from `range_utils.cuh`.
__device__ constexpr bool operator()(T const& x, T const& y) const noexcept
{
  if (order == order::ASCENDING) {
    using op = typename comparator_impl<T, Tag>::op;
    return op{}(x, y);
  } else {
    using op = typename comparator_impl<T, Tag>::rev_op;
    return op{}(x, y);
  }
}
};
Moved the `order` dispatch to runtime.
template <typename Grouping>
struct unbounded_distance_functor {
Split the distance functor by window type which I think simplifies the logic.
Yes thanks, I definitely think that helps. I did this by template specializing in my PR, but this approach is fine too.
 *
 * @tparam WindowType The tag indicating the type of window being computed
 */
template <typename WindowType>
This template parameter could be made runtime by accepting the `range_window_type` variant rather than the type as a parameter here. However, it won't change the compile time, because we must instantiate all of the device kernels separately anyway.
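A sketch of what the runtime form would look like (hypothetical helper names; each variant alternative still instantiates its own device kernel):

std::unique_ptr<column> make_window(range_window_type const& window /*, ... */)
{
  return std::visit(
    [&](auto const& w) {
      // w is one of: unbounded, current_row, bounded_closed, bounded_open
      return make_window_impl(w /*, ... */);  // hypothetical per-type implementation
    },
    window);
}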
Latest CI build.
This is definitely better. I think there's still some room for improving code structure but we're approaching the end of that. I should be able to focus on evaluating the algorithm more in the next round of review and hopefully be done after that.
if constexpr (cuda::std::is_floating_point_v<T>) {
  if (cuda::std::isnan(x)) { return false; }
  return cuda::std::isnan(y) || x < y;
} else {
  return x < y;
Understood. Yes, if we have to do this for Spark then so be it. I see the comment above explaining that so I think we're good here.
struct comparator_impl<T, current_row> {
  using op     = less<T>;
  using rev_op = greater<T>;
};
It seems like you could just default to `less<T>` and `greater<T>` in the default implementation of `comparator_impl` (instead of `void`) and then just specialize `bounded_open`, but I suppose that leaves the `unbounded` case with an invalid combination that is simply never used?
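A sketch of that shape (types as in this diff; the open-endpoint operators are elided):

template <typename T, typename Tag>
struct comparator_impl {
  using op     = less<T>;
  using rev_op = greater<T>;
};

// Only the open endpoint needs different operators.
template <typename T>
struct comparator_impl<T, bounded_open> { /* open-endpoint op / rev_op */ };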
[[nodiscard]] __host__ __device__ constexpr inline cuda::std::pair<T, bool> operator()(
  T x, T y) noexcept
{
  // Mimicking spark requirements, inf/nan x propagates
Can we add a note that we're choosing Spark conventions unconditionally because no other front-ends for libcudf are currently requesting any particular behavior here?
Does inf/nan y not propagate?
{
  // Mimicking spark requirements, inf/nan x propagates
  if (cuda::std::isinf(x) || cuda::std::isnan(x)) { return {x, false}; }
  // Requirement, not checked, y is not inf or nan.
Is this UB anywhere at the level of public APIs that needs to be documented too, or do we validate our inputs higher in the stack before we reach this point?
I think this will only arrive via the Spark route, so @mythrocks?
I can document it somewhere closer to the public API though.
That's funny, I just got to this part of the change, and wanted to quiz you on it.
Let me run some tests and check.
Yeah, I think this should be alright from Spark's perspective. There doesn't seem to be a way to squeeze a NaN/Infinity through as a duration/interval/delta value. It fails out upstream.
// Note: the casts here are implementation defined (until C++20) but all
// the platforms we care about do the twos-complement thing.
if constexpr (cuda::std::is_same_v<Op, cuda::std::plus<>>) {
  auto const did_overflow = static_cast<T>((ux ^ uy) | ~(uy ^ result)) >= 0;
Would this be equivalent and save a flop (no negation)?

Suggested change:
- auto const did_overflow = static_cast<T>((ux ^ uy) | ~(uy ^ result)) >= 0;
+ auto const did_overflow = static_cast<T>((ux ^ result) & (uy ^ result)) < 0;
No, that doesn't work. Consider (32-bit):
#include <cstdint>
#include <limits>

int main()
{
  constexpr int x                = std::numeric_limits<int>::max();
  constexpr int y                = 2;
  constexpr unsigned int ux      = static_cast<unsigned int>(x);
  constexpr unsigned int uy      = static_cast<unsigned int>(y);
  constexpr unsigned int z       = (ux >> std::numeric_limits<int>::digits);
  constexpr unsigned int result  = ux + uy;
  constexpr unsigned int overflowed_res =
    (ux >> std::numeric_limits<int>::digits) + std::numeric_limits<int>::max();
  constexpr unsigned int bit_or  = (ux ^ uy) | ~(uy ^ result);
  constexpr int cast_or          = static_cast<int>(bit_or);
  constexpr bool overflow_or     = cast_or >= 0;
  constexpr unsigned int bit_and = (ux ^ uy) & (uy ^ result);
  constexpr int cast_and         = static_cast<int>(bit_and);
  constexpr bool overflow_and    = cast_and < 0;
  static_assert(overflow_and == overflow_or);
}
The static_assert fails.
thrust::copy_n(rmm::exec_policy_nosync(stream),
               cudf::detail::make_counting_transform_iterator(0, functor),
               orderby.size(),
               result->mutable_view().begin<size_type>());
I believe that we are instantiating more templates here than we strictly need, and in this case, since it's not in device code, we may actually be OK to split it out without seeing any performance drop. Specifically, what happens if we move `copy_n` out from being a lambda to being a standalone function? For the unbounded and current_row cases we should see the number of instances cut in half because we no longer specialize on the ScalarT. If I'm reading this right, I think those cases are maybe also independent of the OrderbyT, and only depend on the order and direction. I think when `copy_n` is a lambda inside `window_bounds` it will be instantiated once per instance of `window_bounds`, whereas with a separate function it should only be instantiated once per type.
Maybe we could go further and rip `expand` out into a separate function too, but I don't think that'll be necessary if we move out the `copy_n` function, since I'm guessing all of the real compile time is in creating the various thrust specializations.
`ScalarT` is always paired with `OrderbyT` (so splitting out `ScalarT` doesn't reduce the number of instantiations, I think). `unbounded` can avoid specialising on `OrderbyT`, but then you'd have to handle it differently in the type-dispatcher, I think.
cudf::data_type_error);
if (row_delta && row_delta->type().scale() != orderby.type().scale()) {
  auto const value =
    dynamic_cast<fixed_point_scalar<OrderbyT> const*>(row_delta)->fixed_point_value(stream);
Can we just static_cast here if we're not checking that the cast is valid anyway?

Suggested change:
- dynamic_cast<fixed_point_scalar<OrderbyT> const*>(row_delta)->fixed_point_value(stream);
+ static_cast<fixed_point_scalar<OrderbyT> const*>(row_delta)->fixed_point_value(stream);
Yes, can do (this cast is guaranteed valid after all)
/**
 * @brief The type of the range-based rolling window endpoint.
 */
using range_window_type = std::variant<unbounded, current_row, bounded_closed, bounded_open>;
Now that I'm more familiar with what the code is doing: is the main reason that we're using a variant here that the bounded windows have a window size (delta) while the other two do not? If so, would things be simpler if we switched to a single class + enum? I guess it would be a little awkward to have a delta in some cases and not in others.
Perhaps the better question in the long run is: is there a way for us to unify `range_window_type` and `range_window_bounds`?
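For comparison, a sketch of the single class + enum alternative (hypothetical, not proposed code); the awkwardness is that `delta` is meaningless for the unbounded and current-row kinds:

struct range_window {
  enum class kind : std::int8_t { UNBOUNDED, CURRENT_ROW, BOUNDED_CLOSED, BOUNDED_OPEN };
  kind type;
  scalar const* delta;  // nullptr unless type is BOUNDED_CLOSED or BOUNDED_OPEN
};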
I want `range_window_bounds` to go away.
rolling_aggregation const& aggr,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
[[nodiscard]] static null_order deduce_null_order(
Do we need to deprecate the overload of the public API that requires this?
My intention was to do the deprecation in a followup to this PR, to avoid extra cognitive load here.
d_offsets = offsets.data(),
nulls_per_group = per_group_nulls.data()] __device__(size_type i) -> bool {
  return nulls_per_group[i] < (d_offsets[i + 1] - d_offsets[i]) &&
         d_orderby.is_null_nocheck(d_offsets[i]);
Slower but simpler: move the order == ascending check into the functor and dedupe the rest of this function. Probably not worth the cost though, especially since this is a polyfill to be removed.
}

template <typename T, CUDF_ENABLE_IF(cuda::std::is_integral_v<T> && cuda::std::is_signed_v<T>)>
[[nodiscard]] __host__ __device__ constexpr inline cuda::std::pair<T, bool> operator()(
Extreme nitpick: as an aside, given that `saturating` has no state, we can probably get away with `operator()` being `const`.
Description
In both cudf.pandas and cudf-polars we would like to be able to use the existing libcudf functionality for computing the `preceding` and `following` columns for range-based (grouped) rolling windows. The current functionality is designed with Spark in mind and supports calculations with slightly different requirements compared to pandas and polars.
In this PR, we unify the construction of these offset column calculations to satisfy all uses.
Specifically:
- Support for `BOUNDED_OPEN` windows. This is a range-based window where the endpoint of the range is not included.

The proposed interface for describing the requested window type going forward is a strongly-typed one using `std::variant`. This removes the need for default-constructed scalars when specifying `UNBOUNDED` and `CURRENT_ROW` windows.
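For illustration, a minimal sketch of constructing the new window types (namespace qualification and the exact `bounded_closed` constructor are assumed; `delta` is a scalar of an orderby-compatible type):

auto const preceding = cudf::range_window_type{cudf::bounded_closed{delta}};  // endpoint included
auto const following = cudf::range_window_type{cudf::unbounded{}};  // no default-constructed scalar needed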
Performance improvements

Spark permits nulls in the `orderby` column. In the grouped-rolling case, these nulls must be sorted at one end of each group. The current implementation finds the partition point in each group using `thrust::for_each` over each group and `thrust::partition_point` to find the break. For low-cardinality grouped rolling aggregations this can be very slow, since we have only a single thread searching over each group. We replace this with a segmented sum with CUB to find the per-group null count (and hence the break).

The dispatched functor for computing the bound given a row value has constexpr dispatch for the common (and, for pandas and polars, required) case that the `orderby` column has no nulls. This shaves some milliseconds.

In the case that the `orderby` column does have nulls, we extend the interface such that the caller (who must have arranged that it is sorted correctly) tells us whether nulls were sorted `BEFORE` or `AFTER`. This avoids multiple kernel launches to deduce the null order, saving some milliseconds and (in the grouped case) memory footprint. We polyfill this deduction so that the existing interface still works until we can deprecate and remove it.

Guide for review
The core logic is implemented in `range_utils.cuh`, specifically the `range_window_clamper`, dispatched to by `make_range_window`. To keep compile times under control, the template instantiation for computing preceding and following windows is moved into separate translation units: locally, compilation takes ~10 mins/TU. Things could be split out further if deemed necessary.