Skip to content

Commit

Permalink
Naive advance impl
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Jul 24, 2024
1 parent f54b120 commit 39d09ac
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 1 deletion.
28 changes: 28 additions & 0 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,32 @@ class BufferedInputStream::Impl : public BufferedBase {
return std::shared_ptr<Buffer>(std::move(buffer));
}

// Skip `nbytes` bytes of the stream, using already-buffered data first.
//
// Returns Status::Invalid when `nbytes` is negative. A skip strictly smaller
// than the buffered byte count is served by consuming from the buffer; any
// larger (or equal) skip drops the buffer and reads the remainder via Read().
Status Advance(int64_t nbytes) {
  if (nbytes < 0) {
    return Status::Invalid("Bytes to advance must be non-negative. Received:", nbytes);
  }
  if (nbytes == 0) {
    // Nothing to skip.
    return Status::OK();
  }

  if (nbytes >= bytes_buffered_) {
    // The skip reaches past the buffered bytes: compute how much is left to
    // skip beyond the buffer *before* the buffer bookkeeping is reset.
    const int64_t bytes_past_buffer = nbytes - bytes_buffered_;
    // Invalidate buffered data, as with a Seek or large Read.
    RewindBuffer();
    // TODO(mwish): Consider using raw_->Advance if available. Currently there
    // is no way to know whether the underlying stream supports fast skipping,
    // so the data is read and discarded.
    auto discarded = Read(bytes_past_buffer);
    return discarded.ok() ? Status::OK() : discarded.status();
  }

  // The whole skip fits inside the buffered bytes.
  ConsumeBuffer(nbytes);
  return Status::OK();
}

// Accessor exposing the wrapped raw input stream (raw_) to callers.
std::shared_ptr<InputStream> raw() const {
  return raw_;
}

Expand Down Expand Up @@ -498,6 +524,8 @@ Result<std::shared_ptr<Buffer>> BufferedInputStream::DoRead(int64_t nbytes) {
return impl_->Read(nbytes);
}

// Forwards the skip request to the Impl object, which owns the buffer state.
Status BufferedInputStream::DoAdvance(int64_t nbytes) {
  return impl_->Advance(nbytes);
}

// Metadata is not buffered: the request is forwarded to the wrapped raw stream.
Result<std::shared_ptr<const KeyValueMetadata>> BufferedInputStream::ReadMetadata() {
  const auto underlying = impl_->raw();
  return underlying->ReadMetadata();
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/io/buffered.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ class ARROW_EXPORT BufferedInputStream
/// expands the buffer size if necessary
Result<std::string_view> DoPeek(int64_t nbytes) override;

/// \brief Advance the position of the stream by nbytes.
/// \param[in] nbytes the number of bytes to move forward
/// \return Status
Status DoAdvance(int64_t nbytes) override;

class ARROW_NO_EXPORT Impl;
std::unique_ptr<Impl> impl_;
};
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/io/concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream {
return derived()->DoPeek(nbytes);
}

// Public entry point for skipping bytes: takes the exclusive guard on lock_,
// then delegates to the derived class's DoAdvance(). The guard stays alive
// until the function returns.
Status Advance(int64_t nbytes) override {
  auto exclusive = lock_.exclusive_guard();
  Status outcome = derived()->DoAdvance(nbytes);
  return outcome;
}

/*
Methods to implement in derived class:
Expand All @@ -132,6 +137,7 @@ class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream {
And optionally:
Status DoAbort() override;
Status DoAdvance(int64_t nbytes) override;
Result<std::string_view> DoPeek(int64_t nbytes) override;
These methods should be protected in the derived class and
Expand All @@ -145,6 +151,8 @@ class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream {
// have derived classes itself.
virtual Status DoAbort() {
  // Default abort behaviour: delegate to the derived class's DoClose().
  return derived()->DoClose();
}

// Default DoAdvance: skip `nbytes` by reading them through the derived class's
// DoRead() and discarding the returned buffer; only the Status is propagated.
//
// This must not call derived()->Advance(): Advance() is the locking entry
// point of this wrapper and itself calls DoAdvance(), so routing back through
// it would recurse without end (and re-take the exclusive guard) for every
// derived class that does not provide its own DoAdvance().
virtual Status DoAdvance(int64_t nbytes) { return derived()->DoRead(nbytes).status(); }

// Default DoPeek: peeking is optional, so unless a derived class overrides
// this the call reports NotImplemented. The argument is intentionally unused.
virtual Result<std::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) {
  return Status::NotImplemented(
      "Peek not implemented");
}
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,13 @@ class FileSegmentReader
return buffer;
}

// Skip up to `nbytes` bytes of the segment by moving position_ forward; no
// data is read.
//
// Returns the CheckOpen() error if that check fails, and Status::Invalid when
// `nbytes` is negative. The skip is clamped to the bytes remaining in the
// segment (nbytes_ - position_), so advancing past the end stops at the end.
Status DoAdvance(int64_t nbytes) override {
  RETURN_NOT_OK(CheckOpen());
  if (nbytes < 0) {
    // Without this check std::min() below would pick the negative value and
    // move position_ backwards.
    return Status::Invalid("Bytes to advance must be non-negative. Received:", nbytes);
  }
  const int64_t bytes_to_skip = std::min(nbytes, nbytes_ - position_);
  position_ += bytes_to_skip;
  return Status::OK();
}

private:
std::shared_ptr<RandomAccessFile> file_;
bool closed_;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public Re
/// \brief Advance or skip stream indicated number of bytes
/// \param[in] nbytes the number to move forward
/// \return Status
Status Advance(int64_t nbytes);
virtual Status Advance(int64_t nbytes);

/// \brief Return zero-copy string_view to upcoming bytes.
///
Expand Down

0 comments on commit 39d09ac

Please sign in to comment.