diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index c38c6fbd51c23..366420825d982 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -183,8 +183,8 @@ class GcsInputStream : public arrow::io::InputStream { class GcsOutputStream : public arrow::io::OutputStream { public: - explicit GcsOutputStream(gcs::ObjectWriteStream stream, io::IOContext context) - : stream_(std::move(stream)), io_context_(std::move(context)) {} + explicit GcsOutputStream(io::IOContext context, gcs::ObjectWriteStream stream) + : io_context_(std::move(context)), stream_(std::move(stream)) {} ~GcsOutputStream() override { if (!closed_) { // The common pattern is to close OutputStreams from destructor in arrow. @@ -196,6 +196,9 @@ class GcsOutputStream : public arrow::io::OutputStream { if (closed_) { return Status::OK(); } + if (future_.is_valid()) { + return future_.status(); + } stream_.Close(); closed_ = true; return internal::ToArrowStatus(stream_.last_status()); @@ -203,12 +206,18 @@ class GcsOutputStream : public arrow::io::OutputStream { Future<> CloseAsync() override { if (closed_) return Status::OK(); - + if (this->future_.is_valid()) { + return future_; + } auto self = std::dynamic_pointer_cast(shared_from_this()); - auto deferred = [self = std::move(self)]() -> Status { return self->Close(); }; - ARROW_ASSIGN_OR_RAISE(auto fut, + auto deferred = [self = std::move(self)]() -> Status { + self->stream_.Close(); + self->closed_ = true; + return internal::ToArrowStatus(self->stream_.last_status()); + }; + ARROW_ASSIGN_OR_RAISE(future_, io::internal::SubmitIO(io_context_, std::move(deferred))); - return fut; + return future_; } Result Tell() const override { @@ -225,7 +234,14 @@ class GcsOutputStream : public arrow::io::OutputStream { // "closed", // - uploads that prescribe their total size using the `x-upload-content-length` header // are completed and "closed" as soon as the upload reaches that size. - bool closed() const override { return closed_ || !stream_.IsOpen(); } + bool closed() const override { + if (future_.is_valid()) { + if (!future_.is_finished()) { + return false; + } + } + return closed_ || !stream_.IsOpen(); + } Status Write(const void* data, int64_t nbytes) override { if (closed()) return Status::Invalid("Cannot write to a closed stream"); @@ -243,8 +259,10 @@ class GcsOutputStream : public arrow::io::OutputStream { } private: - gcs::ObjectWriteStream stream_; const io::IOContext io_context_; + // future for the close operation + Future<> future_; + gcs::ObjectWriteStream stream_; int64_t tell_ = 0; bool closed_ = false; }; @@ -641,7 +659,7 @@ class GcsFileSystem::Impl { auto stream = client_.WriteObject(path.bucket, path.object, encryption_key, predefined_acl, kms_key_name, with_object_metadata); ARROW_GCS_RETURN_NOT_OK(stream.last_status()); - return std::make_shared(std::move(stream), io_context); + return std::make_shared(io_context, std::move(stream)); } google::cloud::StatusOr GetObjectMetadata(const GcsPath& path) {