diff --git a/.asf.yaml b/.asf.yaml index 760a830ef98c7..1eb019fea9af1 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -26,6 +26,8 @@ github: - jbonofre - js8544 - vibhatha + - zanmato1984 + - ZhangHuiGui notifications: commits: commits@arrow.apache.org diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt index badcf4f2f26ac..fb778be113029 100644 --- a/cpp/src/arrow/compute/CMakeLists.txt +++ b/cpp/src/arrow/compute/CMakeLists.txt @@ -90,7 +90,8 @@ add_arrow_test(internals_test light_array_test.cc registry_test.cc key_hash_test.cc - row/compare_test.cc) + row/compare_test.cc + row/grouper_test.cc) add_arrow_compute_test(expression_test SOURCES expression_test.cc) diff --git a/cpp/src/arrow/compute/row/compare_internal.cc b/cpp/src/arrow/compute/row/compare_internal.cc index 078a8287c71c0..98aea9011266c 100644 --- a/cpp/src/arrow/compute/row/compare_internal.cc +++ b/cpp/src/arrow/compute/row/compare_internal.cc @@ -36,22 +36,22 @@ void KeyCompare::NullUpdateColumnToRow(uint32_t id_col, uint32_t num_rows_to_com const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows, - uint8_t* match_bytevector, - bool are_cols_in_encoding_order) { + bool are_cols_in_encoding_order, + uint8_t* match_bytevector) { if (!rows.has_any_nulls(ctx) && !col.data(0)) { return; } uint32_t num_processed = 0; #if defined(ARROW_HAVE_RUNTIME_AVX2) if (ctx->has_avx2()) { - num_processed = NullUpdateColumnToRow_avx2(use_selection, id_col, num_rows_to_compare, - sel_left_maybe_null, left_to_right_map, - ctx, col, rows, match_bytevector); + num_processed = NullUpdateColumnToRow_avx2( + use_selection, id_col, num_rows_to_compare, sel_left_maybe_null, + left_to_right_map, ctx, col, rows, are_cols_in_encoding_order, match_bytevector); } #endif - uint32_t null_bit_id = - are_cols_in_encoding_order ? id_col : rows.metadata().pos_after_encoding(id_col); + const uint32_t null_bit_id = + ColIdInEncodingOrder(rows, id_col, are_cols_in_encoding_order); if (!col.data(0)) { // Remove rows from the result for which the column value is a null @@ -363,10 +363,9 @@ void KeyCompare::CompareColumnsToRows( continue; } - uint32_t offset_within_row = rows.metadata().encoded_field_offset( - are_cols_in_encoding_order - ? static_cast(icol) - : rows.metadata().pos_after_encoding(static_cast(icol))); + uint32_t offset_within_row = + rows.metadata().encoded_field_offset(ColIdInEncodingOrder( + rows, static_cast(icol), are_cols_in_encoding_order)); if (col.metadata().is_fixed_length) { if (sel_left_maybe_null) { CompareBinaryColumnToRow( @@ -375,9 +374,8 @@ void KeyCompare::CompareColumnsToRows( is_first_column ? match_bytevector_A : match_bytevector_B); NullUpdateColumnToRow( static_cast(icol), num_rows_to_compare, sel_left_maybe_null, - left_to_right_map, ctx, col, rows, - is_first_column ? match_bytevector_A : match_bytevector_B, - are_cols_in_encoding_order); + left_to_right_map, ctx, col, rows, are_cols_in_encoding_order, + is_first_column ? match_bytevector_A : match_bytevector_B); } else { // Version without using selection vector CompareBinaryColumnToRow( @@ -386,9 +384,8 @@ void KeyCompare::CompareColumnsToRows( is_first_column ? match_bytevector_A : match_bytevector_B); NullUpdateColumnToRow( static_cast(icol), num_rows_to_compare, sel_left_maybe_null, - left_to_right_map, ctx, col, rows, - is_first_column ? match_bytevector_A : match_bytevector_B, - are_cols_in_encoding_order); + left_to_right_map, ctx, col, rows, are_cols_in_encoding_order, + is_first_column ? match_bytevector_A : match_bytevector_B); } if (!is_first_column) { AndByteVectors(ctx, num_rows_to_compare, match_bytevector_A, match_bytevector_B); @@ -414,9 +411,8 @@ void KeyCompare::CompareColumnsToRows( } NullUpdateColumnToRow( static_cast(icol), num_rows_to_compare, sel_left_maybe_null, - left_to_right_map, ctx, col, rows, - is_first_column ? match_bytevector_A : match_bytevector_B, - are_cols_in_encoding_order); + left_to_right_map, ctx, col, rows, are_cols_in_encoding_order, + is_first_column ? match_bytevector_A : match_bytevector_B); } else { if (ivarbinary == 0) { CompareVarBinaryColumnToRow( @@ -429,9 +425,8 @@ void KeyCompare::CompareColumnsToRows( } NullUpdateColumnToRow( static_cast(icol), num_rows_to_compare, sel_left_maybe_null, - left_to_right_map, ctx, col, rows, - is_first_column ? match_bytevector_A : match_bytevector_B, - are_cols_in_encoding_order); + left_to_right_map, ctx, col, rows, are_cols_in_encoding_order, + is_first_column ? match_bytevector_A : match_bytevector_B); } if (!is_first_column) { AndByteVectors(ctx, num_rows_to_compare, match_bytevector_A, match_bytevector_B); diff --git a/cpp/src/arrow/compute/row/compare_internal.h b/cpp/src/arrow/compute/row/compare_internal.h index b039ca97ff978..16002ee5184e9 100644 --- a/cpp/src/arrow/compute/row/compare_internal.h +++ b/cpp/src/arrow/compute/row/compare_internal.h @@ -43,13 +43,19 @@ class ARROW_EXPORT KeyCompare { uint8_t* out_match_bitvector_maybe_null = NULLPTR); private: + static uint32_t ColIdInEncodingOrder(const RowTableImpl& rows, uint32_t id_col, + bool are_cols_in_encoding_order) { + return are_cols_in_encoding_order ? id_col + : rows.metadata().pos_after_encoding(id_col); + } + template static void NullUpdateColumnToRow(uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows, - uint8_t* match_bytevector, - bool are_cols_in_encoding_order); + bool are_cols_in_encoding_order, + uint8_t* match_bytevector); template static void CompareBinaryColumnToRowHelper( @@ -92,7 +98,8 @@ class ARROW_EXPORT KeyCompare { static uint32_t NullUpdateColumnToRowImp_avx2( uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col, - const RowTableImpl& rows, uint8_t* match_bytevector); + const RowTableImpl& rows, bool are_cols_in_encoding_order, + uint8_t* match_bytevector); template static uint32_t CompareBinaryColumnToRowHelper_avx2( @@ -118,13 +125,11 @@ class ARROW_EXPORT KeyCompare { static uint32_t AndByteVectors_avx2(uint32_t num_elements, uint8_t* bytevector_A, const uint8_t* bytevector_B); - static uint32_t NullUpdateColumnToRow_avx2(bool use_selection, uint32_t id_col, - uint32_t num_rows_to_compare, - const uint16_t* sel_left_maybe_null, - const uint32_t* left_to_right_map, - LightContext* ctx, const KeyColumnArray& col, - const RowTableImpl& rows, - uint8_t* match_bytevector); + static uint32_t NullUpdateColumnToRow_avx2( + bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare, + const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, + LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows, + bool are_cols_in_encoding_order, uint8_t* match_bytevector); static uint32_t CompareBinaryColumnToRow_avx2( bool use_selection, uint32_t offset_within_row, uint32_t num_rows_to_compare, diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index ff407c51b83cb..18f656a2e458d 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -39,12 +39,14 @@ template uint32_t KeyCompare::NullUpdateColumnToRowImp_avx2( uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col, - const RowTableImpl& rows, uint8_t* match_bytevector) { + const RowTableImpl& rows, bool are_cols_in_encoding_order, + uint8_t* match_bytevector) { if (!rows.has_any_nulls(ctx) && !col.data(0)) { return num_rows_to_compare; } - uint32_t null_bit_id = rows.metadata().pos_after_encoding(id_col); + const uint32_t null_bit_id = + ColIdInEncodingOrder(rows, id_col, are_cols_in_encoding_order); if (!col.data(0)) { // Remove rows from the result for which the column value is a null @@ -569,7 +571,7 @@ uint32_t KeyCompare::NullUpdateColumnToRow_avx2( bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows, - uint8_t* match_bytevector) { + bool are_cols_in_encoding_order, uint8_t* match_bytevector) { int64_t num_rows_safe = TailSkipForSIMD::FixBitAccess(sizeof(uint32_t), col.length(), col.bit_offset(0)); if (sel_left_maybe_null) { @@ -580,13 +582,13 @@ uint32_t KeyCompare::NullUpdateColumnToRow_avx2( } if (use_selection) { - return NullUpdateColumnToRowImp_avx2(id_col, num_rows_to_compare, - sel_left_maybe_null, left_to_right_map, - ctx, col, rows, match_bytevector); + return NullUpdateColumnToRowImp_avx2( + id_col, num_rows_to_compare, sel_left_maybe_null, left_to_right_map, ctx, col, + rows, are_cols_in_encoding_order, match_bytevector); } else { - return NullUpdateColumnToRowImp_avx2(id_col, num_rows_to_compare, - sel_left_maybe_null, left_to_right_map, - ctx, col, rows, match_bytevector); + return NullUpdateColumnToRowImp_avx2( + id_col, num_rows_to_compare, sel_left_maybe_null, left_to_right_map, ctx, col, + rows, are_cols_in_encoding_order, match_bytevector); } } diff --git a/cpp/src/arrow/compute/row/grouper_test.cc b/cpp/src/arrow/compute/row/grouper_test.cc new file mode 100644 index 0000000000000..1e853be5e4af7 --- /dev/null +++ b/cpp/src/arrow/compute/row/grouper_test.cc @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "arrow/compute/exec.h" +#include "arrow/compute/row/grouper.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" + +namespace arrow { +namespace compute { + +// Specialized case for GH-40997 +TEST(Grouper, ResortedColumnsWithLargeNullRows) { + const uint64_t num_rows = 1024; + + // construct random array with plenty of null values + const int32_t kSeed = 42; + const int32_t min = 0; + const int32_t max = 100; + const double null_probability = 0.3; + const double true_probability = 0.5; + auto rng = random::RandomArrayGenerator(kSeed); + auto b_arr = rng.Boolean(num_rows, true_probability, null_probability); + auto i32_arr = rng.Int32(num_rows, min, max, null_probability); + auto i64_arr = rng.Int64(num_rows, min, max * 10, null_probability); + + // construct batches with columns which will be resorted in the grouper make + std::vector exec_batches = {ExecBatch({i64_arr, i32_arr, b_arr}, num_rows), + ExecBatch({i32_arr, i64_arr, b_arr}, num_rows), + ExecBatch({i64_arr, b_arr, i32_arr}, num_rows), + ExecBatch({i32_arr, b_arr, i64_arr}, num_rows), + ExecBatch({b_arr, i32_arr, i64_arr}, num_rows), + ExecBatch({b_arr, i64_arr, i32_arr}, num_rows)}; + + const int num_batches = static_cast(exec_batches.size()); + std::vector group_num_vec; + group_num_vec.reserve(num_batches); + + for (const auto& exec_batch : exec_batches) { + ExecSpan span(exec_batch); + ASSERT_OK_AND_ASSIGN(auto grouper, Grouper::Make(span.GetTypes())); + ASSERT_OK_AND_ASSIGN(Datum group_ids, grouper->Consume(span)); + group_num_vec.emplace_back(grouper->num_groups()); + } + + for (int i = 1; i < num_batches; i++) { + ASSERT_EQ(group_num_vec[i - 1], group_num_vec[i]); + } +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/row/row_internal.cc b/cpp/src/arrow/compute/row/row_internal.cc index f6a62c09fcf24..469205e9b008d 100644 --- a/cpp/src/arrow/compute/row/row_internal.cc +++ b/cpp/src/arrow/compute/row/row_internal.cc @@ -66,7 +66,8 @@ void RowTableMetadata::FromColumnMetadataVector( // // Columns are sorted based on the size in bytes of their fixed-length part. // For the varying-length column, the fixed-length part is the 32-bit field storing - // cumulative length of varying-length fields. + // cumulative length of varying-length fields. This is to make the memory access of + // each individual column within the encoded row alignment-friendly. // // The rules are: // diff --git a/dev/tasks/tasks.yml b/dev/tasks/tasks.yml index 52a235c688eda..126b0fcb6f76a 100644 --- a/dev/tasks/tasks.yml +++ b/dev/tasks/tasks.yml @@ -409,7 +409,7 @@ tasks: arrow_jemalloc: "ON" python_version: "{{ python_version }}" macos_deployment_target: "{{ macos_version }}" - runs_on: "macos-latest" + runs_on: "macos-13" vcpkg_arch: "amd64" artifacts: - pyarrow-{no_rc_version}-{{ python_tag }}-{{ abi_tag }}-{{ platform_tag }}.whl diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 6a11b19ffcdf5..946c82b258241 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -3984,7 +3984,7 @@ cdef class RunEndEncodedArray(Array): ------- RunEndEncodedArray """ - logical_length = run_ends[-1] if len(run_ends) > 0 else 0 + logical_length = scalar(run_ends[-1]).as_py() if len(run_ends) > 0 else 0 return RunEndEncodedArray._from_arrays(type, True, logical_length, run_ends, values, 0) diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index 6a190957879d3..3754daeb9b4bd 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -3578,12 +3578,23 @@ def check_run_end_encoded_from_arrays_with_type(ree_type=None): check_run_end_encoded(ree_array, run_ends, values, 19, 4, 0) +def check_run_end_encoded_from_typed_arrays(ree_type): + run_ends = [3, 5, 10, 19] + values = [1, 2, 1, 3] + typed_run_ends = pa.array(run_ends, ree_type.run_end_type) + typed_values = pa.array(values, ree_type.value_type) + ree_array = pa.RunEndEncodedArray.from_arrays(typed_run_ends, typed_values) + assert ree_array.type == ree_type + check_run_end_encoded(ree_array, run_ends, values, 19, 4, 0) + + def test_run_end_encoded_from_arrays(): check_run_end_encoded_from_arrays_with_type() for run_end_type in [pa.int16(), pa.int32(), pa.int64()]: for value_type in [pa.uint32(), pa.int32(), pa.uint64(), pa.int64()]: ree_type = pa.run_end_encoded(run_end_type, value_type) check_run_end_encoded_from_arrays_with_type(ree_type) + check_run_end_encoded_from_typed_arrays(ree_type) def test_run_end_encoded_from_buffers(): diff --git a/r/R/dplyr-mutate.R b/r/R/dplyr-mutate.R index 880f7799e6316..72882b6afd964 100644 --- a/r/R/dplyr-mutate.R +++ b/r/R/dplyr-mutate.R @@ -84,12 +84,12 @@ mutate.arrow_dplyr_query <- function(.data, agg_query$aggregations <- mask$.aggregations agg_query <- collapse.arrow_dplyr_query(agg_query) if (length(grv)) { - out <- left_join(out, agg_query, by = grv) + out <- dplyr::left_join(out, agg_query, by = grv) } else { # If there are no group_by vars, add a scalar column to both and join on that agg_query$selected_columns[["..tempjoin"]] <- Expression$scalar(1L) out$selected_columns[["..tempjoin"]] <- Expression$scalar(1L) - out <- left_join(out, agg_query, by = "..tempjoin") + out <- dplyr::left_join(out, agg_query, by = "..tempjoin") } }