Skip to content

Commit

Permalink
Simplify
Browse files Browse the repository at this point in the history
Co-authored-by: Felipe Oliveira Carvalho <felipekde@gmail.com>
  • Loading branch information
kou and felipecrv committed Jan 5, 2024
1 parent bb7c04e commit 8947d76
Showing 1 changed file with 15 additions and 22 deletions.
37 changes: 15 additions & 22 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -625,28 +625,20 @@ class MessageDecoder::MessageDecoderImpl {
case State::METADATA_LENGTH:
RETURN_NOT_OK(ConsumeMetadataLengthData(data, next_required_size_));
break;
case State::METADATA: {
// We need to copy metadata because it's used in
// ConsumeBody(). ConsumeBody() may be called from another
// ConsumeData(). We can't assume that the given data for
// the current ConsumeData() call is still valid in the
// next ConsumeData() call. So we need to copy metadata
// here.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
AllocateBuffer(next_required_size_, pool_));
memcpy(buffer->mutable_data(), data, next_required_size_);
RETURN_NOT_OK(ConsumeMetadataBuffer(buffer));
} break;
case State::BODY: {
case State::METADATA:
RETURN_NOT_OK(ConsumeMetadataBuffer(
std::make_shared<Buffer>(data, next_required_size_)));
break;
case State::BODY:
// We don't need to copy the given data for body because
// we can assume that a decoded record batch should be
// valid only in a listener_->OnMessageDecoded() call. If
// the passed message is needed to be valid after the
// call, it's a listener_'s responsibility. The listener_
// may copy the data for it.
auto buffer = std::make_shared<Buffer>(data, next_required_size_);
RETURN_NOT_OK(ConsumeBodyBuffer(buffer));
} break;
RETURN_NOT_OK(
ConsumeBodyBuffer(std::make_shared<Buffer>(data, next_required_size_)));
break;
case State::EOS:
return Status::OK();
}
Expand Down Expand Up @@ -819,12 +811,13 @@ class MessageDecoder::MessageDecoderImpl {
}

Status ConsumeMetadataBuffer(const std::shared_ptr<Buffer>& buffer) {
if (buffer->is_cpu()) {
metadata_ = buffer;
} else {
ARROW_ASSIGN_OR_RAISE(metadata_,
Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
}
// We need to copy metadata because it's used in
// ConsumeBody(). ConsumeBody() may be called from another
// ConsumeData(). We can't assume that the given data for the
// current ConsumeData() call is still valid in the next
// ConsumeData() call. So we need to copy metadata here.
ARROW_ASSIGN_OR_RAISE(metadata_,
Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
return ConsumeMetadata();
}

Expand Down

0 comments on commit 8947d76

Please sign in to comment.