Is your feature request related to a problem? Please describe.
I have been exploring extending the rolling window interfaces we have in libcudf to support the use cases that cudf-polars (and cudf-classic) need.
In doing so, I think there is some opportunity for both cleanup and performance improvement in the construction of the inputs to cudf::detail::rolling_window (which is where every frontend caller eventually ends up).
[Aside: This perf improvement is separate from improving the algorithmic complexity of some rolling window operations when the window itself is large.]
To do so, we first need to classify the required types of rolling windows and how the implementation proceeds.
libcudf low-level API
To perform a rolling (or grouped rolling) aggregation, libcudf provides cudf::detail::rolling_window with signature:
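The embedded snippet did not survive here; from memory, the detail API has roughly this shape (parameter names and types are approximate and version-dependent; consult cpp/src/rolling/detail/rolling.cuh for the authoritative signature):

```cpp
// Approximate sketch of the low-level detail API (not verbatim): a template
// over the iterator types used for the per-row window bounds.
template <typename PrecedingWindowIterator, typename FollowingWindowIterator>
std::unique_ptr<cudf::column> rolling_window(
  cudf::column_view const& input,
  cudf::column_view const& default_outputs,
  PrecedingWindowIterator preceding_window_begin,  // P: per-row preceding bound
  FollowingWindowIterator following_window_begin,  // F: per-row following bound
  cudf::size_type min_periods,
  cudf::rolling_aggregation const& agg,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr);
```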
Different window types can be controlled by providing appropriate iterators to preceding_window_begin ($P$) and following_window_begin ($F$). For this API, the window begin/end are always row-based integer offsets. That is, for each row $i$ in the input, the window is defined by all rows $j$ such that

$$
i - P_i < j \le i + F_i,
$$

where $P_i$ and $F_i$ are the values produced by the respective iterators at row $i$ (so the preceding window counts the current row).
Currently, the kernel that performs the windowed aggregation clamps the window bounds to be in-range for the column; additionally, various public interfaces appear to pre-clamp.
polars, pandas, spark window specifications
In all cases (polars, pandas, spark) windows can be described either as row-based (using integer offsets) or range-based (using a delta from the current row value to select rows in the window). As we can see above, to interface with libcudf we eventually need to turn these into row-based offsets in all cases.
All three also offer both grouped and ungrouped rolling windows. A grouped rolling window partitions the input column by some grouping table and then applies normal rolling windows group-wise. An ungrouped rolling window can be treated as a grouped window with a single group.
In terms of constructing the $P$ and $F$ iterators, grouping keys merely provide extra constraints on the offset values: offsets may never result in a window crossing a group boundary.
Row-based window specifications
spark: A window is specified by a pair of (preceding_offset, following_offset). Each offset can be an integer, subtracted from the current row to produce the left endpoint (preceding) or added to the current row to produce the right endpoint (following), or the magic flag UNBOUNDED, which is equivalent to setting the row offset to $\infty$. Note that in spark the window intervals defined by these offsets are always closed, that is, they include the endpoints. This specification maps, pretty much directly, onto what libcudf requires.
pandas: A window is specified by a pair of (length, is_centered=True/False). The length is an integer; windows always trail the current row unless is_centered=True (in which case the window is centered over the current row). In pandas one can specify whether the window interval is closed, left-half-open, right-half-open, or open. To match up with libcudf, we just need to convert the length/is_centered pair plus interval bounds to appropriate integers. Note: pandas supports the equivalent of left-UNBOUNDED windows via DataFrame.expanding.
polars: A window is specified by a pair of (length, offset). Both offset and length are integers, the window is defined by [i + offset, ..., i + offset + length]. One can specify, like pandas, whether the interval is closed, left-half-open, right-half-open, or open. To match up with libcudf, we again just need to map the length and offset + interval bounds to appropriate integers. Note: polars doesn't AFAICT support the equivalent of UNBOUNDED windows (though one can achieve the result by setting length=len(dataframe)).
All three libraries also support grouped versions of the above.
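To make these conversions concrete, here is a minimal sketch of mapping each row-based spec onto libcudf's (preceding, following) integers, assuming the convention above that preceding counts the current row. The helper names are hypothetical, the interval-closure adjustments (±1 per endpoint) are elided, and the even-length split for pandas' centered windows is an assumption worth checking against pandas itself:

```cpp
#include <utility>

using size_type = int;  // stand-in for cudf::size_type

// spark: closed window [i - preceding_offset, i + following_offset].
std::pair<size_type, size_type> from_spark(size_type preceding_offset,
                                           size_type following_offset) {
  // libcudf's preceding counts the current row; spark's offset does not.
  return {preceding_offset + 1, following_offset};
}

// pandas: (length, is_centered); trailing window unless centered.
std::pair<size_type, size_type> from_pandas(size_type length, bool centered) {
  if (!centered) { return {length, 0}; }  // current row plus length-1 trailing rows
  // Assumed split for even lengths; pandas' exact convention should be checked.
  size_type before = (length - 1) / 2;
  return {before + 1, length - 1 - before};
}

// polars: taking the interval [i + offset, ..., i + offset + length] above at
// face value.
std::pair<size_type, size_type> from_polars(size_type length, size_type offset) {
  return {1 - offset, offset + length};  // preceding may be negative (leading window)
}
```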
Range-based window specifications
This code is, as far as I can tell, all implemented with spark semantics only. As well as row-based (integer offset) windows, all libraries support window specifications (on some subset of types) that are range-based. That is, given a preceding value $P$, a following value $F$, and an orderby column $O$, for a row $i$ with value $O_i$, the rows of window $i$ are the ordered rows $j$ such that
$$
O_i - P \le O_j \le O_i + F.
$$
Note, this is the spark definition (windows are closed at the endpoints). One can provide the magic value UNBOUNDED to mean $\infty$, and the magic value CURRENT_ROW equivalent to $P$ (respectively $F$) being equal to zero.
It is not documented in the API, but both $P$ and $F$ must be non-negative.
Polars and pandas also support specifying range-based windows in their APIs. However, they do not restrict the preceding and following values to be non-negative. They also allow (as above) both open and half-open intervals (as well as closed intervals, like spark), but do not have magic sigils for UNBOUNDED and CURRENT_ROW windows.
In all cases, the orderby column must be in sorted order (spark and pandas allow sorted in ascending and descending orders, polars only in ascending). Polars and pandas do not permit nulls in the orderby column, spark does.
All libraries permit providing grouping keys, in which case the orderby column must be sorted within each group (polars does not require that the group keys are sorted, and does not guarantee that group-key order is maintained).
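For intuition, the range-to-row-offset conversion is essentially a pair of binary searches per row within each group. A minimal host-side sketch (the real implementation runs on-device), assuming an ascending, null-free orderby column and spark-style closed intervals, with a hypothetical helper name:

```cpp
#include <algorithm>
#include <vector>

// Convert range bounds (P, F) into per-row (preceding, following) row offsets
// for one group occupying rows [group_begin, group_end). preceding counts the
// current row. For illustration only.
void range_to_row_offsets(std::vector<long> const& orderby, long P, long F,
                          int group_begin, int group_end,
                          std::vector<int>& preceding,
                          std::vector<int>& following) {
  auto first = orderby.begin() + group_begin;
  auto last  = orderby.begin() + group_end;
  for (int i = group_begin; i < group_end; ++i) {
    // First row in the group with value >= O_i - P ...
    auto lo = std::lower_bound(first, last, orderby[i] - P);
    // ... and one past the last row with value <= O_i + F.
    auto hi = std::upper_bound(first, last, orderby[i] + F);
    preceding[i] = i - static_cast<int>(lo - orderby.begin()) + 1;
    following[i] = static_cast<int>(hi - orderby.begin()) - i - 1;
  }
}
```

In these terms, UNBOUNDED corresponds to clamping directly to the group boundary, and CURRENT_ROW to searching with $P$ (respectively $F$) equal to zero.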
Implementation issues
Needless work
Depending on exactly how one calls the full-column rolling window code, the libcudf code path either clamps preceding/following bounds to be in-bounds at least once, often twice, sometimes materialising two columns of integers along the way. Specifically:

- cudf::rolling_window(input, size_type preceding, size_type following, ...) calls cudf::detail::rolling_window(input, default_output, size_type, size_type, ...), which materialises the preceding and following integers into columns via a transform_iterator (cudf/cpp/src/rolling/detail/rolling_fixed_window.cu, lines 69 to 79, and cudf/cpp/src/rolling/detail/rolling.cuh, lines 1044 to 1048, at 0c5bd66).
- cudf::rolling_window(input, column_view preceding, column_view following, ...) calls cudf::detail::rolling_window(input, column_view, column_view, ...), which clamps the preceding and following iterators using a transform_iterator, e.g.:

```cpp
[col_size = input.size(), following = following_window.begin<size_type>()] __device__(
  size_type i) { return thrust::min(col_size - i - 1, following[i]); }
```

(cudf/cpp/src/rolling/detail/rolling_variable_window.cu, lines 69 to 79, at 0c5bd66). This eventually calls the same rolling kernel, which again ensures that the start/stop are in bounds (note, once again, if the rolling aggregation is a UDF [CUDA or PTX], then we only clamp in the kernel).
There are other code paths (used by spark, I believe) for grouped_rolling_window; what happens here again depends on whether the rolling aggregation is a UDF. If it is, then the preceding/following integers are clamped to the group bounds by a transform_iterator (really, a struct that provides operator[]) and no additional materialisation is done.
If it is not, then the preceding/following integers are expanded into columns (cudf/cpp/src/rolling/grouped_rolling.cu, lines 232 to 237, at 0c5bd66).
In both cases, the rolling kernel clamps the start/stop to the bounds of the input column.
As a consequence of these divergences, the UDF-based aggregations do less work (and require less memory), but do not support (because they were not modified) all the cases that non-UDF aggregations support: specifically, UDF-based aggregations require that both preceding and following are non-negative (and hence that the window covers the current row), whereas non-UDF aggregations allow preceding and following to be negative (so the window need not cover the current row).
spark-specific semantics for some interfaces
There are APIs to do grouped range-based and grouped row-based rolling windows, but they conform to spark requirements. The range-based rolling window API cudf::grouped_range_rolling_window does a lot of work, turning the range-based bounds into columns of preceding and following windows (for passing to cudf::detail::rolling_window) and then doing a single rolling aggregation.
This precludes computing the window bounds once and reusing them for multiple aggregations (an API usage supported by both polars and pandas). Again, these functions guarantee that they produce in-bounds window offsets, but the kernel that does rolling windows does not know this and does extra work.
I would like the computation of the window offsets from range bounds to be more flexible (to support polars/pandas requirements) and separate from the calculation of the rolling aggregation (so that I can reuse it).
Concrete suggestions/requests
Bounds checking
I think clamping the provided windows to be in bounds should not be the responsibility of the rolling aggregation kernel; instead, the kernel should require iterators for the preceding/following windows that always produce in-bounds values.
For the cudf::rolling_window API that takes columns of preceding/following, we should document and require that they always produce in-bounds values.
Remove need for materialisation of intermediates where possible
For row-based windows (both grouped and ungrouped), I think we should always use unmaterialised transform_iterators to turn the input row offsets into clamped (grouped) row offsets.
That is, the APIs that take size_type for preceding and following should not need to materialise a column.
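A sketch of what that could look like with thrust follows; the group_starts lookup is a hypothetical stand-in for however group boundaries are exposed (the existing grouped code uses group offsets plus labels):

```cpp
#include <thrust/extrema.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>

#include <cudf/types.hpp>

// Unmaterialised, group-aware clamping of a scalar preceding offset.
// group_starts[i] is assumed to hold the first row of row i's group.
struct clamped_preceding {
  cudf::size_type preceding;            // user-provided offset (counts current row)
  cudf::size_type const* group_starts;  // hypothetical per-row group-start lookup
  __device__ cudf::size_type operator()(cudf::size_type i) const
  {
    // Never reach back before the start of row i's group.
    return thrust::min(preceding, i - group_starts[i] + 1);
  }
};

// Usage: no column of integers is ever materialised.
// auto preceding_begin = thrust::make_transform_iterator(
//   thrust::make_counting_iterator<cudf::size_type>(0),
//   clamped_preceding{preceding, group_starts});
```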
Provide API to construct window offset bounds from ranges
Extend the existing (spark-specific) detail API for constructing window bounds from ranges to a public one, and expand it to support the use cases that polars and pandas need: I have a PoC for this that does everything except handle nulls.
Remove/deprecate dead code
Originally range-based windows were only for timestamp order-by columns (hence cudf::grouped_time_range_rolling_window). But this (since 2021) has just called into cudf::grouped_range_rolling_window, so we can probably deprecate it.
Rationalise API
I think we want:

- construction of window offsets from range bounds (grouped and ungrouped). Suggestion: std::pair<std::unique_ptr<column>, std::unique_ptr<column>> cudf::compute_range_window_bounds(table_view const &group_keys, column_view &orderby, order column_order, null_order null_order, range_window const& preceding, std::optional<range_window> const& following, stream, mr)
- computing row-based rolling windows on a std::vector of aggregation requests (like groupby requests). Suggestion: std::unique_ptr<table> cudf::rolling_window(table_view const &group_keys, size_type preceding, size_type following, std::vector<size_type> min_periods, std::vector<RollingAggRequests>, stream, mr)
- computing row-based rolling windows on a std::vector of aggregation requests, given columns of window bounds (same as above, but column_view for preceding and following).
I think we can build all the pieces we want out of these three things.
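For illustration, usage of these suggested pieces might look like the following (all names and signatures here are proposals from this issue, not existing libcudf API):

```cpp
// Compute the window bound columns once ...
auto [preceding, following] = cudf::compute_range_window_bounds(
  group_keys, orderby, cudf::order::ASCENDING, cudf::null_order::BEFORE,
  preceding_window, following_window, stream, mr);

// ... then reuse them across several aggregations in a single request vector.
auto results = cudf::rolling_window(group_keys, preceding->view(),
                                    following->view(), min_periods, requests,
                                    stream, mr);
```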
Note that I have a bunch of ideas about how to improve the performance of computing the windows (for large bounded windows) for some subset of the aggregations we care about, but that will be easier if the current code is a bit more straightforward to follow and provides stronger guarantees and preconditions.