diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc
index b41e4257af215..909c33f5d7037 100644
--- a/cpp/src/arrow/io/buffered.cc
+++ b/cpp/src/arrow/io/buffered.cc
@@ -434,6 +434,33 @@ class BufferedInputStream::Impl : public BufferedBase {
     return std::shared_ptr<Buffer>(std::move(buffer));
   }
 
+  // Skip the next `nbytes` bytes of the stream.
+  //
+  // Bytes still held in the read buffer are consumed first.  Anything beyond
+  // that is skipped by calling Read() and discarding the returned data.
+  // Returns Invalid if `nbytes` is negative.
+  Status Advance(int64_t nbytes) {
+    if (nbytes < 0) {
+      return Status::Invalid("Bytes to advance must be non-negative. Received:", nbytes);
+    }
+    if (nbytes == 0) {
+      return Status::OK();
+    }
+
+    if (nbytes < bytes_buffered_) {
+      ConsumeBuffer(nbytes);
+      return Status::OK();
+    }
+
+    // Invalidate buffered data, as with a Seek or large Read
+    int64_t remain_skip_bytes = nbytes - bytes_buffered_;
+    RewindBuffer();
+    // TODO(mwish): Consider using raw_->Advance if available.  Currently there
+    // is no way to know whether the underlying stream supports fast skipping,
+    // so we just read and discard the data.
+    return Read(remain_skip_bytes).status();
+  }
+
   // For providing access to the raw file handles
   std::shared_ptr<InputStream> raw() const { return raw_; }
 
@@ -498,6 +525,8 @@ Result<std::shared_ptr<Buffer>> BufferedInputStream::DoRead(int64_t nbytes) {
   return impl_->Read(nbytes);
 }
 
+Status BufferedInputStream::DoAdvance(int64_t nbytes) { return impl_->Advance(nbytes); }
+
 Result<std::shared_ptr<const KeyValueMetadata>> BufferedInputStream::ReadMetadata() {
   return impl_->raw()->ReadMetadata();
 }
diff --git a/cpp/src/arrow/io/buffered.h b/cpp/src/arrow/io/buffered.h
index 01c0a016daba0..6030d166d43c3 100644
--- a/cpp/src/arrow/io/buffered.h
+++ b/cpp/src/arrow/io/buffered.h
@@ -159,6 +159,9 @@ class ARROW_EXPORT BufferedInputStream
   /// expands the buffer size if necessary
   Result<std::string_view> DoPeek(int64_t nbytes) override;
 
+  /// \brief Advance the position of the stream by nbytes.
+  Status DoAdvance(int64_t nbytes) override;
+
   class ARROW_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
 };
diff --git a/cpp/src/arrow/io/concurrency.h b/cpp/src/arrow/io/concurrency.h
index 43ceb8debcecb..3078862fb4cc7 100644
--- a/cpp/src/arrow/io/concurrency.h
+++ b/cpp/src/arrow/io/concurrency.h
@@ -121,6 +121,11 @@ class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream {
     return derived()->DoPeek(nbytes);
   }
 
+  Status Advance(int64_t nbytes) override {
+    auto guard = lock_.exclusive_guard();
+    return derived()->DoAdvance(nbytes);
+  }
+
   /*
   Methods to implement in derived class:
 
@@ -132,6 +137,7 @@ class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream {
   And optionally:
 
     Status DoAbort() override;
+    Status DoAdvance(int64_t nbytes) override;
     Result<std::string_view> DoPeek(int64_t nbytes) override;
 
   These methods should be protected in the derived class and
@@ -145,6 +151,12 @@ class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream {
   // have derived classes itself.
   virtual Status DoAbort() { return derived()->DoClose(); }
 
+  // Default implementation: read and discard `nbytes` bytes.  This must not call
+  // Advance(), which dispatches back to DoAdvance() and would recurse forever.
+  virtual Status DoAdvance(int64_t nbytes) {
+    return derived()->DoRead(nbytes).status();
+  }
+
   virtual Result<std::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) {
     return Status::NotImplemented("Peek not implemented");
   }
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 1d35549cc4345..05ec6cdd8d819 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -256,6 +256,16 @@ class FileSegmentReader
     return buffer;
   }
 
+  // Skip up to `nbytes` bytes, stopping at the end of the segment.
+  Status DoAdvance(int64_t nbytes) override {
+    RETURN_NOT_OK(CheckOpen());
+    if (nbytes < 0) {
+      return Status::Invalid("Bytes to advance must be non-negative. Received:", nbytes);
+    }
+    position_ += std::min(nbytes, nbytes_ - position_);
+    return Status::OK();
+  }
+
  private:
   std::shared_ptr<RandomAccessFile> file_;
   bool closed_;
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index b36c38c6d4868..f974a33073fcc 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -210,7 +210,7 @@ class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public Re
   /// \brief Advance or skip stream indicated number of bytes
   /// \param[in] nbytes the number to move forward
   /// \return Status
-  Status Advance(int64_t nbytes);
+  virtual Status Advance(int64_t nbytes);
 
   /// \brief Return zero-copy string_view to upcoming bytes.
   ///