diff --git a/examples/gcs_demo.cc b/examples/gcs_demo.cc index 0baf631f..8539b19e 100644 --- a/examples/gcs_demo.cc +++ b/examples/gcs_demo.cc @@ -28,11 +28,12 @@ void Run(SSL_CTX* ctx) { cloud::GCS gcs(&provider, ctx, pb); ec = gcs.Connect(connect_ms); CHECK(!ec) << "Could not connect " << ec; - auto res = gcs.ListBuckets(); - CHECK(res) << res.error().message(); - for (auto v : *res) { - CONSOLE_INFO << v; - } + auto cb = [](std::string_view bname) { + CONSOLE_INFO << bname; + }; + + ec = gcs.ListBuckets(cb); + CHECK(!ec) << ec.message(); } int main(int argc, char** argv) { diff --git a/util/cloud/gcp/gcs.cc b/util/cloud/gcp/gcs.cc index 8c4ac5ec..0977f027 100644 --- a/util/cloud/gcp/gcs.cc +++ b/util/cloud/gcp/gcs.cc @@ -31,6 +31,18 @@ auto Unexpected(std::errc code) { return nonstd::make_unexpected(make_error_code(code)); } +#define RETURN_UNEXPECTED(x) do { \ + auto ec = (x); \ + if (ec) \ + return nonstd::make_unexpected(ec); \ +} while(false) + +#define RETURN_ERROR(x) do { \ + auto ec = (x); \ + if (ec) \ + return ec; \ +} while (false) + string AuthHeader(string_view access_token) { return absl::StrCat("Bearer ", access_token); } @@ -162,35 +174,34 @@ io::Result ParseTokenResponse(std::string&& response) { return result; } -template -error_code SendWithToken(GCPCredsProvider* provider, http::Client* client, EmptyRequest* req, h2::response* resp) { +using EmptyParserPtr = std::unique_ptr>; +io::Result SendWithToken(GCPCredsProvider* provider, http::Client* client, + EmptyRequest* req) { + error_code ec; for (unsigned i = 0; i < 2; ++i) { // Iterate for possible token refresh. VLOG(1) << "HttpReq" << i << ": " << *req << ", socket " << client->native_handle(); - error_code ec = client->Send(*req, resp); - if (ec) { - return ec; - } - VLOG(1) << "HttpResp" << i << ": " << *resp; + RETURN_UNEXPECTED(client->Send(*req)); + EmptyParserPtr parser(new h2::response_parser()); + RETURN_UNEXPECTED(client->ReadHeader(parser.get())); - if (resp->result() == h2::status::ok) { - break; - }; + VLOG(1) << "RespHeader" << i << ": " << parser.get(); - if (IsUnauthorized(*resp)) { - ec = provider->RefreshToken(client->proactor()); - if (ec) { - return ec; - } + if (parser->get().result() == h2::status::ok) { + return parser; + }; - *resp = {}; + if (IsUnauthorized(parser->get())) { + RETURN_UNEXPECTED(provider->RefreshToken(client->proactor())); req->set(h2::field::authorization, AuthHeader(provider->access_token())); continue; } - LOG(FATAL) << "Unexpected response " << *resp; + ec = make_error_code(errc::bad_message); + LOG(DFATAL) << "Unexpected response " << parser.get(); } - return {}; + + return nonstd::make_unexpected(ec); } } // namespace @@ -218,18 +229,16 @@ error_code GCPCredsProvider::Init(unsigned connect_ms, fb2::ProactorBase* pb) { use_instance_metadata_ = true; LOG(FATAL) << "TBD: do not support reading from instance metadata"; } else { - error_code ec = LoadGCPConfig(&account_id_, &project_id_); - if (ec) - return ec; + RETURN_ERROR(LoadGCPConfig(&account_id_, &project_id_)); + if (account_id_.empty() || project_id_.empty()) { LOG(WARNING) << "gcloud config file is not valid"; return make_error_code(errc::not_supported); } string adc_file = absl::StrCat(*root_path, "/legacy_credentials/", account_id_, "/adc.json"); VLOG(1) << "ADC file: " << adc_file; - ec = ParseADC(adc_file, &client_id_, &client_secret_, &refresh_token_); - if (ec) - return ec; + RETURN_ERROR(ParseADC(adc_file, &client_id_, &client_secret_, &refresh_token_)); + if (client_id_.empty() || client_secret_.empty() || refresh_token_.empty()) { LOG(WARNING) << "Bad ADC file " << adc_file; return make_error_code(errc::bad_message); @@ -264,9 +273,8 @@ error_code GCPCredsProvider::RefreshToken(fb2::ProactorBase* pb) { VLOG(1) << "Req: " << req; h2::response resp; - ec = https_client.Send(req, &resp); - if (ec) - return ec; + RETURN_ERROR(https_client.Send(req, &resp)); + if (resp.result() != h2::status::ok) { LOG(WARNING) << "Http error: " << string(resp.reason()) << ", Body: ", resp.body(); return make_error_code(errc::permission_denied); @@ -297,18 +305,57 @@ std::error_code GCS::Connect(unsigned msec) { return client_->Connect(kDomain, "443", ssl_ctx_); } -auto GCS::ListBuckets() -> ListBucketResult { +error_code GCS::ListBuckets(ListBucketCb cb) { string url = absl::StrCat("/storage/v1/b?project=", creds_provider_.project_id()); - absl::StrAppend(&url, "&fields=items,nextPageToken"); + absl::StrAppend(&url, "&maxResults=50&fields=items,nextPageToken"); auto http_req = PrepareRequest(h2::verb::get, url, creds_provider_.access_token()); rj::Document doc; - h2::response resp_msg; - error_code ec = SendWithToken(&creds_provider_, client_.get(), &http_req, &resp_msg); - if (ec) - return nonstd::make_unexpected(ec); - VLOG(2) << "ListResponse: " << resp_msg.body(); + + while (true) { + io::Result parse_res = + SendWithToken(&creds_provider_, client_.get(), &http_req); + if (!parse_res) + return parse_res.error(); + EmptyParserPtr empty_parser = std::move(*parse_res); + h2::response_parser resp(std::move(*empty_parser)); + RETURN_ERROR(client_->Recv(&resp)); + + auto msg = resp.release(); + + VLOG(2) << "ListResponse: " << msg.body(); + + doc.ParseInsitu(&msg.body().front()); + if (doc.HasParseError()) { + return make_error_code(errc::bad_message); + } + + auto it = doc.FindMember("items"); + if (it == doc.MemberEnd()) + break; + + const auto& val = it->value; + if (!val.IsArray()) { + return make_error_code(errc::bad_message); + } + auto array = val.GetArray(); + + for (size_t i = 0; i < array.Size(); ++i) { + const auto& item = array[i]; + auto it = item.FindMember("name"); + if (it != item.MemberEnd()) { + cb(string_view{it->value.GetString(), it->value.GetStringLength()}); + } + } + + it = doc.FindMember("nextPageToken"); + if (it == doc.MemberEnd()) { + break; + } + absl::string_view page_token{it->value.GetString(), it->value.GetStringLength()}; + http_req.target(absl::StrCat(url, "&pageToken=", page_token)); + } return {}; } diff --git a/util/cloud/gcp/gcs.h b/util/cloud/gcp/gcs.h index 23b78cc7..382b6093 100644 --- a/util/cloud/gcp/gcs.h +++ b/util/cloud/gcp/gcs.h @@ -69,14 +69,15 @@ class GCPCredsProvider { class GCS { public: - using ListBucketResult = io::Result>; + using BucketItem = std::string_view; + using ListBucketCb = std::function; GCS(GCPCredsProvider* creds_provider, SSL_CTX* ssl_cntx, fb2::ProactorBase* pb); ~GCS(); std::error_code Connect(unsigned msec); - ListBucketResult ListBuckets(); + std::error_code ListBuckets(ListBucketCb cb); private: GCPCredsProvider& creds_provider_;