Skip to content

Commit

Permalink
resolve some comment
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Apr 17, 2024
1 parent 60f8fc3 commit d7afd5b
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ class GcsInputStream : public arrow::io::InputStream {

class GcsOutputStream : public arrow::io::OutputStream {
public:
explicit GcsOutputStream(gcs::ObjectWriteStream stream, io::IOContext context)
: stream_(std::move(stream)), io_context_(std::move(context)) {}
explicit GcsOutputStream(io::IOContext context, gcs::ObjectWriteStream stream)
: io_context_(std::move(context)), stream_(std::move(stream)) {}
~GcsOutputStream() override {
if (!closed_) {
// The common pattern is to close OutputStreams from destructor in arrow.
Expand All @@ -196,19 +196,28 @@ class GcsOutputStream : public arrow::io::OutputStream {
if (closed_) {
return Status::OK();
}
if (future_.is_valid()) {
return future_.status();
}
stream_.Close();
closed_ = true;
return internal::ToArrowStatus(stream_.last_status());
}

Future<> CloseAsync() override {
if (closed_) return Status::OK();

if (this->future_.is_valid()) {
return future_;
}
auto self = std::dynamic_pointer_cast<GcsOutputStream>(shared_from_this());
auto deferred = [self = std::move(self)]() -> Status { return self->Close(); };
ARROW_ASSIGN_OR_RAISE(auto fut,
auto deferred = [self = std::move(self)]() -> Status {
self->stream_.Close();
self->closed_ = true;
return internal::ToArrowStatus(self->stream_.last_status());
};
ARROW_ASSIGN_OR_RAISE(future_,
io::internal::SubmitIO(io_context_, std::move(deferred)));
return fut;
return future_;
}

Result<int64_t> Tell() const override {
Expand All @@ -225,7 +234,14 @@ class GcsOutputStream : public arrow::io::OutputStream {
// "closed",
// - uploads that prescribe their total size using the `x-upload-content-length` header
// are completed and "closed" as soon as the upload reaches that size.
bool closed() const override { return closed_ || !stream_.IsOpen(); }
bool closed() const override {
if (future_.is_valid()) {
if (!future_.is_finished()) {
return false;
}
}
return closed_ || !stream_.IsOpen();
}

Status Write(const void* data, int64_t nbytes) override {
if (closed()) return Status::Invalid("Cannot write to a closed stream");
Expand All @@ -243,8 +259,10 @@ class GcsOutputStream : public arrow::io::OutputStream {
}

private:
gcs::ObjectWriteStream stream_;
const io::IOContext io_context_;
// future for the close operation
Future<> future_;
gcs::ObjectWriteStream stream_;
int64_t tell_ = 0;
bool closed_ = false;
};
Expand Down Expand Up @@ -641,7 +659,7 @@ class GcsFileSystem::Impl {
auto stream = client_.WriteObject(path.bucket, path.object, encryption_key,
predefined_acl, kms_key_name, with_object_metadata);
ARROW_GCS_RETURN_NOT_OK(stream.last_status());
return std::make_shared<GcsOutputStream>(std::move(stream), io_context);
return std::make_shared<GcsOutputStream>(io_context, std::move(stream));
}

google::cloud::StatusOr<gcs::ObjectMetadata> GetObjectMetadata(const GcsPath& path) {
Expand Down

0 comments on commit d7afd5b

Please sign in to comment.