Skip to content

Commit

Permalink
Revert changes for CollectListener
Browse files Browse the repository at this point in the history
Add CopyCollectListener for tests only, because I'm not sure whether
the API is good or not.
  • Loading branch information
kou committed Jan 5, 2024
1 parent 8947d76 commit ea46f4f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 44 deletions.
37 changes: 34 additions & 3 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1330,11 +1330,44 @@ struct StreamWriterHelper {
std::shared_ptr<RecordBatchWriter> writer_;
};

/// \brief Test-only CollectListener that copies the buffers of every decoded
/// record batch before collecting it.
///
/// Each non-null buffer reachable from the batch's columns (including child
/// array data and dictionaries) is replaced in place by the result of
/// Buffer::Copy() targeting the buffer's own memory manager.
class CopyCollectListener : public CollectListener {
 public:
  CopyCollectListener() : CollectListener() {}

  /// \brief Copy the buffers of the decoded batch, then hand the batch over to
  /// CollectListener::OnRecordBatchWithMetadataDecoded().
  ///
  /// \param[in] record_batch_with_metadata the decoded batch and its metadata
  /// \return the first error raised while copying a buffer, otherwise the
  /// status returned by the base class implementation
  Status OnRecordBatchWithMetadataDecoded(
      RecordBatchWithMetadata record_batch_with_metadata) override {
    // Bind by const reference: copying each std::shared_ptr would only add
    // needless reference-count traffic.
    for (const auto& column_data : record_batch_with_metadata.batch->column_data()) {
      ARROW_RETURN_NOT_OK(CopyArrayData(column_data.get()));
    }
    // The argument is not used afterwards, so move it instead of copying it.
    return CollectListener::OnRecordBatchWithMetadataDecoded(
        std::move(record_batch_with_metadata));
  }

 private:
  /// \brief Recursively replace every non-null buffer of `data` with a copy.
  ///
  /// \param[in,out] data array data whose buffers, children and dictionary
  /// are rewritten in place; must not be null
  /// \return the first error returned by Buffer::Copy(), otherwise OK
  Status CopyArrayData(ArrayData* data) {
    for (auto& buffer : data->buffers) {
      if (!buffer) {
        // Absent buffers are left untouched.
        continue;
      }
      ARROW_ASSIGN_OR_RAISE(buffer, Buffer::Copy(buffer, buffer->memory_manager()));
    }
    for (const auto& child_data : data->child_data) {
      ARROW_RETURN_NOT_OK(CopyArrayData(child_data.get()));
    }
    if (data->dictionary) {
      ARROW_RETURN_NOT_OK(CopyArrayData(data->dictionary.get()));
    }
    return Status::OK();
  }
};

struct StreamDecoderWriterHelper : public StreamWriterHelper {
Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
ReadStats* out_stats = nullptr,
MetadataVector* out_metadata_list = nullptr) override {
auto listener = std::make_shared<CollectListener>(true);
auto listener = std::make_shared<CopyCollectListener>();
StreamDecoder decoder(listener, options);
RETURN_NOT_OK(DoConsume(&decoder));
*out_batches = listener->record_batches();
Expand Down Expand Up @@ -2177,7 +2210,6 @@ TEST(TestRecordBatchStreamReader, MalformedInput) {
ASSERT_RAISES(Invalid, RecordBatchStreamReader::Open(&garbage_reader));
}

namespace {
class EndlessCollectListener : public CollectListener {
public:
EndlessCollectListener() : CollectListener(), decoder_(nullptr) {}
Expand All @@ -2189,7 +2221,6 @@ class EndlessCollectListener : public CollectListener {
private:
StreamDecoder* decoder_;
};
}; // namespace

TEST(TestStreamDecoder, Reset) {
auto listener = std::make_shared<EndlessCollectListener>();
Expand Down
33 changes: 0 additions & 33 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2052,39 +2052,6 @@ Status Listener::OnRecordBatchWithMetadataDecoded(
return OnRecordBatchDecoded(std::move(record_batch_with_metadata.batch));
}

namespace {
/// \brief Recursively replace every non-null buffer of `data` with a copy.
///
/// Each buffer is copied with Buffer::Copy() targeting the buffer's own
/// memory manager and stored back in place; child array data and the
/// dictionary (when present) are processed the same way.
///
/// \param[in] data array data whose buffers are rewritten in place; must not
/// be null
/// \return the first error returned by Buffer::Copy(), otherwise OK
Status CopyArrayData(const std::shared_ptr<ArrayData>& data) {
  for (auto& buffer : data->buffers) {
    if (!buffer) {
      // Absent buffers are left untouched.
      continue;
    }
    ARROW_ASSIGN_OR_RAISE(buffer, Buffer::Copy(buffer, buffer->memory_manager()));
  }
  // Bind children by const reference to avoid copying each std::shared_ptr.
  for (const auto& child_data : data->child_data) {
    ARROW_RETURN_NOT_OK(CopyArrayData(child_data));
  }
  if (data->dictionary) {
    ARROW_RETURN_NOT_OK(CopyArrayData(data->dictionary));
  }
  return Status::OK();
}
}  // namespace

/// \brief Collect a decoded record batch together with its custom metadata.
///
/// When `copy_record_batch_` is set, CopyArrayData() is first applied to every
/// column of the batch; the batch and its metadata are appended only if all of
/// those calls succeed.
///
/// \param[in] record_batch_with_metadata the decoded batch and its metadata
/// \return the first error returned by CopyArrayData(), otherwise OK
Status CollectListener::OnRecordBatchWithMetadataDecoded(
    RecordBatchWithMetadata record_batch_with_metadata) {
  auto record_batch = std::move(record_batch_with_metadata.batch);
  if (copy_record_batch_) {
    // Bind by const reference: the loop itself does not need to own the column.
    for (const auto& column_data : record_batch->column_data()) {
      ARROW_RETURN_NOT_OK(CopyArrayData(column_data));
    }
  }
  record_batches_.push_back(std::move(record_batch));
  metadatas_.push_back(std::move(record_batch_with_metadata.custom_metadata));
  return Status::OK();
}

class StreamDecoder::StreamDecoderImpl : public StreamDecoderInternal {
public:
explicit StreamDecoderImpl(std::shared_ptr<Listener> listener, IpcReadOptions options)
Expand Down
14 changes: 6 additions & 8 deletions cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,7 @@ class ARROW_EXPORT Listener {
/// \since 0.17.0
class ARROW_EXPORT CollectListener : public Listener {
public:
explicit CollectListener(bool copy_record_batch = false)
: copy_record_batch_(copy_record_batch),
schema_(),
filtered_schema_(),
record_batches_(),
metadatas_() {}
CollectListener() : schema_(), filtered_schema_(), record_batches_(), metadatas_() {}
virtual ~CollectListener() = default;

Status OnSchemaDecoded(std::shared_ptr<Schema> schema,
Expand All @@ -333,7 +328,11 @@ class ARROW_EXPORT CollectListener : public Listener {
}

Status OnRecordBatchWithMetadataDecoded(
RecordBatchWithMetadata record_batch_with_metadata) override;
RecordBatchWithMetadata record_batch_with_metadata) override {
record_batches_.push_back(std::move(record_batch_with_metadata.batch));
metadatas_.push_back(std::move(record_batch_with_metadata.custom_metadata));
return Status::OK();
}

/// \return the decoded schema
std::shared_ptr<Schema> schema() const { return schema_; }
Expand Down Expand Up @@ -376,7 +375,6 @@ class ARROW_EXPORT CollectListener : public Listener {
}

private:
bool copy_record_batch_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<Schema> filtered_schema_;
std::vector<std::shared_ptr<RecordBatch>> record_batches_;
Expand Down

0 comments on commit ea46f4f

Please sign in to comment.