Skip to content

Commit

Permalink
Merge branch 'main' into parquet/adding-str-length-for-debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Feb 2, 2024
2 parents 5b531a6 + 129a529 commit e36adcb
Show file tree
Hide file tree
Showing 26 changed files with 630 additions and 183 deletions.
12 changes: 12 additions & 0 deletions cpp/src/arrow/array/array_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,18 @@ Result<std::shared_ptr<Array>> Array::View(
return MakeArray(result);
}

// Copy this array's underlying ArrayData to the destination MemoryManager
// (via ArrayData::CopyTo) and wrap the result in a new Array.
// Any error reported by the ArrayData copy is returned to the caller.
Result<std::shared_ptr<Array>> Array::CopyTo(
    const std::shared_ptr<MemoryManager>& to) const {
  std::shared_ptr<ArrayData> destination_data;
  ARROW_ASSIGN_OR_RAISE(destination_data, data()->CopyTo(to));
  return MakeArray(destination_data);
}

// Produce an array on the destination MemoryManager by delegating to
// ArrayData::ViewOrCopyTo and wrapping the resulting data in a new Array.
// Any error reported by the ArrayData operation is returned to the caller.
Result<std::shared_ptr<Array>> Array::ViewOrCopyTo(
    const std::shared_ptr<MemoryManager>& to) const {
  std::shared_ptr<ArrayData> destination_data;
  ARROW_ASSIGN_OR_RAISE(destination_data, data()->ViewOrCopyTo(to));
  return MakeArray(destination_data);
}

// ----------------------------------------------------------------------
// NullArray

Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/array/array_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,22 @@ class ARROW_EXPORT Array {
/// An error is returned if the types are not layout-compatible.
Result<std::shared_ptr<Array>> View(const std::shared_ptr<DataType>& type) const;

/// \brief Construct a copy of the array with all buffers on destination
/// Memory Manager
///
/// This method recursively copies the array's buffers and those of its children
/// onto the destination MemoryManager device and returns the new Array.
Result<std::shared_ptr<Array>> CopyTo(const std::shared_ptr<MemoryManager>& to) const;

/// \brief Construct a new array attempting to zero-copy view if possible.
///
/// Like CopyTo this method recursively goes through all of the array's buffers
/// and those of its children and first attempts to create zero-copy
/// views on the destination MemoryManager device. If it can't, it falls back
/// to performing a copy. See Buffer::ViewOrCopy.
Result<std::shared_ptr<Array>> ViewOrCopyTo(
const std::shared_ptr<MemoryManager>& to) const;

/// Construct a zero-copy slice of the array with the indicated offset and
/// length
///
Expand Down
39 changes: 39 additions & 0 deletions cpp/src/arrow/array/data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "arrow/array/util.h"
#include "arrow/buffer.h"
#include "arrow/device.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/type.h"
Expand All @@ -36,6 +37,7 @@
#include "arrow/util/dict_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/range.h"
#include "arrow/util/ree_util.h"
#include "arrow/util/slice_util_internal.h"
#include "arrow/util/union_util.h"
Expand Down Expand Up @@ -140,6 +142,43 @@ std::shared_ptr<ArrayData> ArrayData::Make(std::shared_ptr<DataType> type, int64
return std::make_shared<ArrayData>(std::move(type), length, null_count, offset);
}

namespace {

// Shared implementation behind ArrayData::CopyTo and ArrayData::ViewOrCopyTo.
//
// Creates a new ArrayData with the same type, length, null count and offset
// as `data`, then transfers each non-null buffer to `to` by calling
// `copy_fn(buffer, to)`. Null buffer slots stay null in the output. Child
// data and the dictionary (when present) are handled recursively with the
// same `copy_fn`. The first failing transfer aborts and its error is returned.
template <typename Fn>
Result<std::shared_ptr<ArrayData>> CopyToImpl(const ArrayData& data,
                                              const std::shared_ptr<MemoryManager>& to,
                                              Fn&& copy_fn) {
  auto output = ArrayData::Make(data.type, data.length, data.null_count, data.offset);

  // Buffers: same number of slots as the source; only non-null ones are moved.
  const size_t num_buffers = data.buffers.size();
  output->buffers.resize(num_buffers);
  for (size_t i = 0; i < num_buffers; ++i) {
    const auto& source_buffer = data.buffers[i];
    if (!source_buffer) {
      continue;
    }
    ARROW_ASSIGN_OR_RAISE(output->buffers[i], copy_fn(source_buffer, to));
  }

  // Children: recurse, preserving order.
  output->child_data.reserve(data.child_data.size());
  for (const auto& source_child : data.child_data) {
    std::shared_ptr<ArrayData> child_out;
    ARROW_ASSIGN_OR_RAISE(child_out, CopyToImpl(*source_child, to, copy_fn));
    output->child_data.emplace_back(std::move(child_out));
  }

  // Dictionary: recurse only when the source has one.
  if (data.dictionary) {
    ARROW_ASSIGN_OR_RAISE(output->dictionary, CopyToImpl(*data.dictionary, to, copy_fn));
  }

  return output;
}

}  // namespace

// Recursively copy this ArrayData to the destination MemoryManager, using
// MemoryManager::CopyBuffer for each individual buffer.
Result<std::shared_ptr<ArrayData>> ArrayData::CopyTo(
    const std::shared_ptr<MemoryManager>& to) const {
  const ArrayData& self = *this;
  return CopyToImpl(self, to, MemoryManager::CopyBuffer);
}

// Recursively transfer this ArrayData to the destination MemoryManager, using
// Buffer::ViewOrCopy for each individual buffer.
Result<std::shared_ptr<ArrayData>> ArrayData::ViewOrCopyTo(
    const std::shared_ptr<MemoryManager>& to) const {
  const ArrayData& self = *this;
  return CopyToImpl(self, to, Buffer::ViewOrCopy);
}

std::shared_ptr<ArrayData> ArrayData::Slice(int64_t off, int64_t len) const {
ARROW_CHECK_LE(off, length) << "Slice offset (" << off
<< ") greater than array length (" << length << ")";
Expand Down
19 changes: 16 additions & 3 deletions cpp/src/arrow/array/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@
#include "arrow/buffer.h"
#include "arrow/result.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/macros.h"
#include "arrow/util/span.h"
#include "arrow/util/visibility.h"

namespace arrow {

class Array;
struct ArrayData;

namespace internal {
// ----------------------------------------------------------------------
// Null handling for types without a validity bitmap and the dictionary type
Expand Down Expand Up @@ -183,6 +181,21 @@ struct ARROW_EXPORT ArrayData {

/// \brief Return a new ArrayData copy-constructed from this one
std::shared_ptr<ArrayData> Copy() const { return std::make_shared<ArrayData>(*this); }

/// \brief Copy all buffers and children recursively to destination MemoryManager
///
/// This utilizes MemoryManager::CopyBuffer to create a new ArrayData object
/// recursively copying the buffers and all child buffers to the destination
/// memory manager. This includes dictionaries if applicable.
Result<std::shared_ptr<ArrayData>> CopyTo(
const std::shared_ptr<MemoryManager>& to) const;
/// \brief View or Copy this ArrayData to destination memory manager.
///
/// Tries to view the buffer contents on the given memory manager's device
/// if possible (to avoid a copy) but falls back to copying if a no-copy view
/// isn't supported.
Result<std::shared_ptr<ArrayData>> ViewOrCopyTo(
const std::shared_ptr<MemoryManager>& to) const;

/// \brief Return true when IsValid(i) is false for the element at index i
bool IsNull(int64_t i) const { return !IsValid(i); }

bool IsValid(int64_t i) const {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ class ARROW_EXPORT Buffer {
static Result<std::shared_ptr<Buffer>> ViewOrCopy(
std::shared_ptr<Buffer> source, const std::shared_ptr<MemoryManager>& to);

virtual std::shared_ptr<Device::SyncEvent> device_sync_event() { return NULLPTR; }
virtual std::shared_ptr<Device::SyncEvent> device_sync_event() const { return NULLPTR; }

protected:
bool is_mutable_;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ class ImportedBuffer : public Buffer {

~ImportedBuffer() override = default;

std::shared_ptr<Device::SyncEvent> device_sync_event() override {
std::shared_ptr<Device::SyncEvent> device_sync_event() const override {
return import_->device_sync_;
}

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/c/bridge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,9 @@ class MyBuffer final : public MutableBuffer {
default_memory_pool()->Free(const_cast<uint8_t*>(data_), size_);
}

std::shared_ptr<Device::SyncEvent> device_sync_event() override { return device_sync_; }
std::shared_ptr<Device::SyncEvent> device_sync_event() const override {
return device_sync_;
}

protected:
std::shared_ptr<Device::SyncEvent> device_sync_;
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include <cstring>
#include <utility>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/logging.h"

Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/gpu/cuda_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,11 @@ Result<std::shared_ptr<Buffer>> CudaMemoryManager::CopyBufferTo(
Result<std::unique_ptr<Buffer>> CudaMemoryManager::CopyNonOwnedTo(
const Buffer& buf, const std::shared_ptr<MemoryManager>& to) {
if (to->is_cpu()) {
auto sync_event = buf.device_sync_event();
if (sync_event) {
RETURN_NOT_OK(sync_event->Wait());
}

// Device-to-CPU copy
std::unique_ptr<Buffer> dest;
ARROW_ASSIGN_OR_RAISE(auto from_context, cuda_device()->GetContext());
Expand Down
27 changes: 4 additions & 23 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1336,30 +1336,11 @@ class CopyCollectListener : public CollectListener {

Status OnRecordBatchWithMetadataDecoded(
RecordBatchWithMetadata record_batch_with_metadata) override {
auto& record_batch = record_batch_with_metadata.batch;
for (auto column_data : record_batch->column_data()) {
ARROW_RETURN_NOT_OK(CopyArrayData(column_data));
}
return CollectListener::OnRecordBatchWithMetadataDecoded(record_batch_with_metadata);
}
ARROW_ASSIGN_OR_RAISE(
record_batch_with_metadata.batch,
record_batch_with_metadata.batch->CopyTo(default_cpu_memory_manager()));

private:
Status CopyArrayData(std::shared_ptr<ArrayData> data) {
auto& buffers = data->buffers;
for (size_t i = 0; i < buffers.size(); ++i) {
auto& buffer = buffers[i];
if (!buffer) {
continue;
}
ARROW_ASSIGN_OR_RAISE(buffers[i], Buffer::Copy(buffer, buffer->memory_manager()));
}
for (auto child_data : data->child_data) {
ARROW_RETURN_NOT_OK(CopyArrayData(child_data));
}
if (data->dictionary) {
ARROW_RETURN_NOT_OK(CopyArrayData(data->dictionary));
}
return Status::OK();
return CollectListener::OnRecordBatchWithMetadataDecoded(record_batch_with_metadata);
}
};

Expand Down
24 changes: 24 additions & 0 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,30 @@ Status ValidateBatch(const RecordBatch& batch, bool full_validation) {

} // namespace

// Build a new RecordBatch whose columns are the result of Array::CopyTo on
// each column of this batch. The schema and row count are carried over
// unchanged. The first column that fails to copy aborts the operation and
// its error is returned.
Result<std::shared_ptr<RecordBatch>> RecordBatch::CopyTo(
    const std::shared_ptr<MemoryManager>& to) const {
  ArrayVector destination_columns;
  destination_columns.reserve(num_columns());

  for (const auto& source_column : columns()) {
    std::shared_ptr<Array> copied;
    ARROW_ASSIGN_OR_RAISE(copied, source_column->CopyTo(to));
    destination_columns.emplace_back(std::move(copied));
  }

  return Make(schema_, num_rows(), std::move(destination_columns));
}

// Build a new RecordBatch whose columns are the result of Array::ViewOrCopyTo
// on each column of this batch. The schema and row count are carried over
// unchanged. The first column that fails aborts the operation and its error
// is returned.
Result<std::shared_ptr<RecordBatch>> RecordBatch::ViewOrCopyTo(
    const std::shared_ptr<MemoryManager>& to) const {
  ArrayVector destination_columns;
  destination_columns.reserve(num_columns());

  for (const auto& source_column : columns()) {
    std::shared_ptr<Array> transferred;
    ARROW_ASSIGN_OR_RAISE(transferred, source_column->ViewOrCopyTo(to));
    destination_columns.emplace_back(std::move(transferred));
  }

  return Make(schema_, num_rows(), std::move(destination_columns));
}

// Validate this batch via ValidateBatch with full validation disabled.
Status RecordBatch::Validate() const {
  constexpr bool kFullValidation = false;
  return ValidateBatch(*this, kFullValidation);
}
Expand Down
19 changes: 19 additions & 0 deletions cpp/src/arrow/record_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,25 @@ class ARROW_EXPORT RecordBatch {
/// \return the number of rows (the corresponding length of each column)
int64_t num_rows() const { return num_rows_; }

/// \brief Copy the entire RecordBatch to destination MemoryManager
///
/// This uses Array::CopyTo on each column of the record batch to create
/// a new record batch where all underlying buffers for the columns have
/// been copied to the destination MemoryManager. This uses
/// MemoryManager::CopyBuffer under the hood.
Result<std::shared_ptr<RecordBatch>> CopyTo(
const std::shared_ptr<MemoryManager>& to) const;

/// \brief View or Copy the entire RecordBatch to destination MemoryManager
///
/// This uses Array::ViewOrCopyTo on each column of the record batch to create
/// a new record batch where all underlying buffers for the columns have
/// been zero-copy viewed on the destination MemoryManager, falling back
/// to performing a copy if it can't be viewed as a zero-copy buffer. This uses
/// Buffer::ViewOrCopy under the hood.
Result<std::shared_ptr<RecordBatch>> ViewOrCopyTo(
const std::shared_ptr<MemoryManager>& to) const;

/// \brief Slice each of the arrays in the record batch
/// \param[in] offset the starting offset to slice, through end of batch
/// \return new record batch
Expand Down
7 changes: 5 additions & 2 deletions dev/release/binary-task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,7 @@ def available_apt_targets
["ubuntu", "focal", "main"],
["ubuntu", "jammy", "main"],
["ubuntu", "mantic", "main"],
["ubuntu", "noble", "main"],
]
end

Expand Down Expand Up @@ -2121,8 +2122,10 @@ def apt_test_targets_default
# "ubuntu-focal-arm64",
"ubuntu-jammy",
# "ubuntu-jammy-arm64",
"ubuntu-lunar",
# "ubuntu-lunar-arm64",
"ubuntu-mantic",
# "ubuntu-mantic-arm64",
"ubuntu-noble",
# "ubuntu-noble-arm64",
]
end

Expand Down
4 changes: 3 additions & 1 deletion dev/release/verify-release-candidate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ test_apt() {
"ubuntu:jammy" \
"arm64v8/ubuntu:jammy" \
"ubuntu:mantic" \
"arm64v8/ubuntu:mantic"; do \
"arm64v8/ubuntu:mantic" \
"ubuntu:noble" \
"arm64v8/ubuntu:noble"; do \
case "${target}" in
arm64v8/*)
if [ "$(arch)" = "aarch64" -o -e /usr/bin/qemu-aarch64-static ]; then
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Base image: Ubuntu "noble".
FROM ubuntu:noble

# Preseed debconf so its frontend is set to Noninteractive.
RUN \
echo "debconf debconf/frontend select Noninteractive" | \
debconf-set-selections

# Write an apt configuration snippet that turns off Install-Recommends.
RUN \
echo 'APT::Install-Recommends "false";' > \
/etc/apt/apt.conf.d/disable-install-recommends

# Build argument controlling apt verbosity below: when DEBUG is "yes" the
# "-qq" flag is omitted; for any other value "-qq" is passed to apt.
ARG DEBUG

# Refresh the package index, install the packaging tools listed below, then
# clean the apt cache and remove the downloaded package lists.
RUN \
quiet=$([ "${DEBUG}" = "yes" ] || echo "-qq") && \
apt update ${quiet} && \
apt install -y -V ${quiet} \
build-essential \
debhelper \
devscripts \
fakeroot \
gnupg \
lsb-release && \
apt clean && \
rm -rf /var/lib/apt/lists/*
18 changes: 18 additions & 0 deletions dev/tasks/linux-packages/apache-arrow/apt/ubuntu-noble-arm64/from
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

arm64v8/ubuntu:noble
Loading

0 comments on commit e36adcb

Please sign in to comment.