Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Finish with GCS::ListBuckets #292

Merged
merged 1 commit into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions examples/gcs_demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ void Run(SSL_CTX* ctx) {
cloud::GCS gcs(&provider, ctx, pb);
ec = gcs.Connect(connect_ms);
CHECK(!ec) << "Could not connect " << ec;
auto res = gcs.ListBuckets();
CHECK(res) << res.error().message();
for (auto v : *res) {
CONSOLE_INFO << v;
}
auto cb = [](std::string_view bname) {
CONSOLE_INFO << bname;
};

ec = gcs.ListBuckets(cb);
CHECK(!ec) << ec.message();
}

int main(int argc, char** argv) {
Expand Down
115 changes: 81 additions & 34 deletions util/cloud/gcp/gcs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ auto Unexpected(std::errc code) {
return nonstd::make_unexpected(make_error_code(code));
}

#define RETURN_UNEXPECTED(x) do { \
auto ec = (x); \
if (ec) \
return nonstd::make_unexpected(ec); \
} while(false)

#define RETURN_ERROR(x) do { \
auto ec = (x); \
if (ec) \
return ec; \
} while (false)

string AuthHeader(string_view access_token) {
return absl::StrCat("Bearer ", access_token);
}
Expand Down Expand Up @@ -162,35 +174,34 @@ io::Result<TokenTtl> ParseTokenResponse(std::string&& response) {
return result;
}

template <typename RespBody>
error_code SendWithToken(GCPCredsProvider* provider, http::Client* client, EmptyRequest* req, h2::response<RespBody>* resp) {
using EmptyParserPtr = std::unique_ptr<h2::response_parser<h2::empty_body>>;
io::Result<EmptyParserPtr> SendWithToken(GCPCredsProvider* provider, http::Client* client,
EmptyRequest* req) {
error_code ec;
for (unsigned i = 0; i < 2; ++i) { // Iterate for possible token refresh.
VLOG(1) << "HttpReq" << i << ": " << *req << ", socket " << client->native_handle();

error_code ec = client->Send(*req, resp);
if (ec) {
return ec;
}
VLOG(1) << "HttpResp" << i << ": " << *resp;
RETURN_UNEXPECTED(client->Send(*req));
EmptyParserPtr parser(new h2::response_parser<h2::empty_body>());
RETURN_UNEXPECTED(client->ReadHeader(parser.get()));

if (resp->result() == h2::status::ok) {
break;
};
VLOG(1) << "RespHeader" << i << ": " << parser.get();

if (IsUnauthorized(*resp)) {
ec = provider->RefreshToken(client->proactor());
if (ec) {
return ec;
}
if (parser->get().result() == h2::status::ok) {
return parser;
};

*resp = {};
if (IsUnauthorized(parser->get())) {
RETURN_UNEXPECTED(provider->RefreshToken(client->proactor()));
req->set(h2::field::authorization, AuthHeader(provider->access_token()));

continue;
}
LOG(FATAL) << "Unexpected response " << *resp;
ec = make_error_code(errc::bad_message);
LOG(DFATAL) << "Unexpected response " << parser.get();
}
return {};

return nonstd::make_unexpected(ec);
}

} // namespace
Expand Down Expand Up @@ -218,18 +229,16 @@ error_code GCPCredsProvider::Init(unsigned connect_ms, fb2::ProactorBase* pb) {
use_instance_metadata_ = true;
LOG(FATAL) << "TBD: do not support reading from instance metadata";
} else {
error_code ec = LoadGCPConfig(&account_id_, &project_id_);
if (ec)
return ec;
RETURN_ERROR(LoadGCPConfig(&account_id_, &project_id_));

if (account_id_.empty() || project_id_.empty()) {
LOG(WARNING) << "gcloud config file is not valid";
return make_error_code(errc::not_supported);
}
string adc_file = absl::StrCat(*root_path, "/legacy_credentials/", account_id_, "/adc.json");
VLOG(1) << "ADC file: " << adc_file;
ec = ParseADC(adc_file, &client_id_, &client_secret_, &refresh_token_);
if (ec)
return ec;
RETURN_ERROR(ParseADC(adc_file, &client_id_, &client_secret_, &refresh_token_));

if (client_id_.empty() || client_secret_.empty() || refresh_token_.empty()) {
LOG(WARNING) << "Bad ADC file " << adc_file;
return make_error_code(errc::bad_message);
Expand Down Expand Up @@ -264,9 +273,8 @@ error_code GCPCredsProvider::RefreshToken(fb2::ProactorBase* pb) {
VLOG(1) << "Req: " << req;

h2::response<h2::string_body> resp;
ec = https_client.Send(req, &resp);
if (ec)
return ec;
RETURN_ERROR(https_client.Send(req, &resp));

if (resp.result() != h2::status::ok) {
LOG(WARNING) << "Http error: " << string(resp.reason()) << ", Body: ", resp.body();
return make_error_code(errc::permission_denied);
Expand Down Expand Up @@ -297,18 +305,57 @@ std::error_code GCS::Connect(unsigned msec) {
return client_->Connect(kDomain, "443", ssl_ctx_);
}

auto GCS::ListBuckets() -> ListBucketResult {
error_code GCS::ListBuckets(ListBucketCb cb) {
string url = absl::StrCat("/storage/v1/b?project=", creds_provider_.project_id());
absl::StrAppend(&url, "&fields=items,nextPageToken");
absl::StrAppend(&url, "&maxResults=50&fields=items,nextPageToken");

auto http_req = PrepareRequest(h2::verb::get, url, creds_provider_.access_token());

rj::Document doc;
h2::response<h2::string_body> resp_msg;
error_code ec = SendWithToken(&creds_provider_, client_.get(), &http_req, &resp_msg);
if (ec)
return nonstd::make_unexpected(ec);
VLOG(2) << "ListResponse: " << resp_msg.body();

while (true) {
io::Result<EmptyParserPtr> parse_res =
SendWithToken(&creds_provider_, client_.get(), &http_req);
if (!parse_res)
return parse_res.error();
EmptyParserPtr empty_parser = std::move(*parse_res);
h2::response_parser<h2::string_body> resp(std::move(*empty_parser));
RETURN_ERROR(client_->Recv(&resp));

auto msg = resp.release();

VLOG(2) << "ListResponse: " << msg.body();

doc.ParseInsitu(&msg.body().front());
if (doc.HasParseError()) {
return make_error_code(errc::bad_message);
}

auto it = doc.FindMember("items");
if (it == doc.MemberEnd())
break;

const auto& val = it->value;
if (!val.IsArray()) {
return make_error_code(errc::bad_message);
}
auto array = val.GetArray();

for (size_t i = 0; i < array.Size(); ++i) {
const auto& item = array[i];
auto it = item.FindMember("name");
if (it != item.MemberEnd()) {
cb(string_view{it->value.GetString(), it->value.GetStringLength()});
}
}

it = doc.FindMember("nextPageToken");
if (it == doc.MemberEnd()) {
break;
}
absl::string_view page_token{it->value.GetString(), it->value.GetStringLength()};
http_req.target(absl::StrCat(url, "&pageToken=", page_token));
}
return {};
}

Expand Down
5 changes: 3 additions & 2 deletions util/cloud/gcp/gcs.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ class GCPCredsProvider {

class GCS {
public:
using ListBucketResult = io::Result<std::vector<std::string>>;
using BucketItem = std::string_view;
using ListBucketCb = std::function<void(BucketItem)>;

GCS(GCPCredsProvider* creds_provider, SSL_CTX* ssl_cntx, fb2::ProactorBase* pb);
~GCS();

std::error_code Connect(unsigned msec);

ListBucketResult ListBuckets();
std::error_code ListBuckets(ListBucketCb cb);

private:
GCPCredsProvider& creds_provider_;
Expand Down
Loading