Skip to content

Commit

Permalink
chore: more work on glob list - Azure API
Browse files Browse the repository at this point in the history
Prepare the API

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Nov 1, 2024
1 parent 6e77555 commit 1ce2476
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 99 deletions.
36 changes: 23 additions & 13 deletions examples/gcs_demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
// See LICENSE for licensing terms.

#include <absl/strings/str_cat.h>

#include "base/flags.h"
#include "base/init.h"
#include "base/logging.h"
#include "io/file_util.h"

#include "util/cloud/azure/creds_provider.h"
#include "util/cloud/gcp/gcs.h"
#include "util/cloud/gcp/gcs_file.h"
Expand All @@ -31,8 +31,7 @@ static io::Result<string> ReadToString(io::ReadonlyFile* file) {
constexpr size_t kBufSize = 1U << 20;
size_t offset = res_str.size();
res_str.resize(offset + kBufSize);
io::MutableBytes mb{reinterpret_cast<uint8_t*>(res_str.data() + offset),
kBufSize};
io::MutableBytes mb{reinterpret_cast<uint8_t*>(res_str.data() + offset), kBufSize};
io::Result<size_t> res = file->Read(offset, mb);
if (!res) {
return nonstd::make_unexpected(res.error());
Expand Down Expand Up @@ -69,8 +68,7 @@ void Run(SSL_CTX* ctx) {
cloud::GcsWriteFileOptions opts;
opts.creds_provider = &provider;
opts.pool = conn_pool;
io::Result<io::WriteFile*> dest_res =
cloud::OpenWriteGcsFile(bucket, dest_key, opts);
io::Result<io::WriteFile*> dest_res = cloud::OpenWriteGcsFile(bucket, dest_key, opts);
CHECK(dest_res) << "Could not open " << dest_key << " " << dest_res.error().message();
unique_ptr<io::WriteFile> dest(*dest_res);
error_code ec = dest->Write(*src);
Expand All @@ -85,8 +83,7 @@ void Run(SSL_CTX* ctx) {
cloud::GcsReadFileOptions opts;
opts.creds_provider = &provider;
opts.pool = conn_pool;
io::Result<io::ReadonlyFile*> dest_res =
cloud::OpenReadGcsFile(bucket, dest_key, opts);
io::Result<io::ReadonlyFile*> dest_res = cloud::OpenReadGcsFile(bucket, dest_key, opts);
CHECK(dest_res) << "Could not open " << dest_key << " " << dest_res.error().message();
unique_ptr<io::ReadonlyFile> dest(*dest_res);
io::Result<string> dest_str = ReadToString(dest.get());
Expand All @@ -110,6 +107,24 @@ void Run(SSL_CTX* ctx) {
CHECK(!ec) << ec.message();
}

// Azure branch of the demo.
//
// Loads credentials via CredsProvider::Init() and then, depending on the --bucket flag:
//   * empty flag     -> prints every container reported by Storage::ListContainers;
//   * non-empty flag -> lists that container via Storage::List.
// Any failure aborts the process through CHECK.
//
// `ctx` is accepted for symmetry with Run() but is not used by this function.
void RunAzure(SSL_CTX* ctx) {
  util::cloud::azure::CredsProvider provider;
  util::cloud::azure::Storage storage(&provider);

  error_code ec = provider.Init();
  CHECK(!ec) << "Could not load credentials " << ec.message();

  const auto bucket = GetFlag(FLAGS_bucket);
  if (bucket.empty()) {
    ec = storage.ListContainers([](std::string_view item) { CONSOLE_INFO << item << endl; });
    CHECK(!ec) << ec.message();
    return;
  }

  // Storage::List returns an error_code as well; check it instead of dropping it so that
  // a failed listing is reported the same way as a failed ListContainers call.
  ec = storage.List(bucket, [](const util::cloud::azure::Storage::ObjectItem& item) {
    CONSOLE_INFO << "Object: " << item << endl;
  });
  CHECK(!ec) << ec.message();
}

int main(int argc, char** argv) {
MainInitGuard guard(&argc, &argv);

Expand All @@ -130,12 +145,7 @@ int main(int argc, char** argv) {
SSL_CTX* ctx = util::http::TlsClient::CreateSslContext();
bool azure = GetFlag(FLAGS_azure);
if (azure) {
util::cloud::azure::CredsProvider provider;
pp->GetNextProactor()->Await([&] {
error_code ec = provider.Init();
CHECK(!ec) << "Could not load credentials " << ec.message();
provider.ListContainers([](std::string_view item) { CONSOLE_INFO << item << endl; });
});
pp->GetNextProactor()->Await([&] { RunAzure(ctx); });
} else {
pp->GetNextProactor()->Await([ctx] { Run(ctx); });
}
Expand Down
167 changes: 115 additions & 52 deletions util/cloud/azure/azure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <absl/cleanup/cleanup.h>
#include <absl/strings/escaping.h>
#include <absl/strings/str_cat.h>
#include <absl/strings/strip.h>

#include <boost/beast/http/string_body.hpp>
#include <pugixml.hpp>
Expand All @@ -12,6 +13,7 @@
#include "util/cloud/azure/creds_provider.h"
#include "util/cloud/utils.h"
#include "util/http/http_client.h"
#include "util/http/http_common.h"
#include "util/http/https_client_pool.h"

#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
Expand All @@ -22,23 +24,42 @@ namespace cloud::azure {
using namespace std;
namespace h2 = boost::beast::http;

error_code CredsProvider::Init() {
const char* name = getenv("AZURE_STORAGE_ACCOUNT");
const char* secret_key = getenv("AZURE_STORAGE_KEY");
if (!name || !secret_key) {
return make_error_code(errc::permission_denied);
}
namespace {

account_name_ = name;
account_key_ = secret_key;

return {};
}
const char kVersion[] = "2025-01-05";

// Converts a generic errc value into the "unexpected" (error) state used by
// io::Result-returning functions in this file.
auto UnexpectedError(errc code) {
  error_code ec = make_error_code(code);
  return nonstd::make_unexpected(std::move(ec));
}

// Parses the XML body of a "list containers" response and returns the container names.
//
// Expected layout:
//   <EnumerationResults>
//     <Containers>
//       <Container><Name>...</Name> ... </Container>
//       ...
//     </Containers>
//   </EnumerationResults>
//
// Returns errc::bad_message if the document cannot be parsed or if either the
// EnumerationResults or the Containers element is missing.
io::Result<vector<string>> ParseXmlListBuckets(string_view xml_resp) {
  pugi::xml_document doc;
  pugi::xml_parse_result result = doc.load_buffer(xml_resp.data(), xml_resp.size());

  if (!result) {
    LOG(ERROR) << "Could not parse xml response " << result.description();
    return UnexpectedError(errc::bad_message);
  }

  pugi::xml_node root = doc.child("EnumerationResults");
  if (root.type() != pugi::node_element) {
    LOG(ERROR) << "Could not find root node " << xml_resp;
    return UnexpectedError(errc::bad_message);
  }

  pugi::xml_node buckets = root.child("Containers");
  if (buckets.type() != pugi::node_element) {
    LOG(ERROR) << "Could not find buckets node " << xml_resp;
    return UnexpectedError(errc::bad_message);
  }

  vector<string> res;
  // Advance with the named overload of next_sibling(): the unnamed one walks every
  // sibling under <Containers>, so any node that is not a <Container> element would
  // be reported as a container with an empty name.
  for (pugi::xml_node bucket = buckets.child("Container"); bucket;
       bucket = bucket.next_sibling("Container")) {
    res.push_back(bucket.child_value("Name"));
  }
  return res;
}

void HMAC(absl::string_view key, absl::string_view msg, uint8_t dest[32]) {
// HMAC_xxx are deprecated since openssl 3.0
// Ubuntu 20.04 uses openssl 1.1.
Expand Down Expand Up @@ -78,7 +99,22 @@ string Sign(string_view account, const boost::beast::http::header<true>& req_hea
for (const auto& p : x_head) {
absl::StrAppend(&to_sign, p.first, ":", p.second, "\n");
}
string canonic_resource = absl::StrCat("/", account, "/\n", "comp:list");

string_view target = detail::FromBoostSV(req_header.target());
http::QueryParam qparams = http::ParseQuery(target);
string_view path = qparams.first;
DCHECK(absl::StartsWith(path, "/"));

http::QueryArgs args = http::SplitQuery(qparams.second);
sort(args.begin(), args.end());
string query_canon;
for (const auto& p : args) {
absl::StrAppend(&query_canon, "\n", p.first, ":", p.second);
}

string canonic_resource = absl::StrCat("/", account, path, query_canon);
VLOG(1) << "Canonical resource: " << absl::CEscape(canonic_resource);

absl::StrAppend(&to_sign, canonic_resource);

uint8_t dest[32];
Expand All @@ -88,57 +124,55 @@ string Sign(string_view account, const boost::beast::http::header<true>& req_hea
return signature;
}

io::Result<vector<string>> ParseXmlListBuckets(string_view xml_resp) {
pugi::xml_document doc;
pugi::xml_parse_result result = doc.load_buffer(xml_resp.data(), xml_resp.size());
detail::EmptyRequestImpl FillRequest(string_view endpoint, string_view url, CredsProvider* creds) {
detail::EmptyRequestImpl req(h2::verb::get, url);
const absl::TimeZone utc_tz = absl::UTCTimeZone();
string date = absl::FormatTime("%a, %d %b %Y %H:%M:%S GMT", absl::Now(), utc_tz);
req.SetHeader("x-ms-date", date);
req.SetHeader("x-ms-version", kVersion);

if (!result) {
LOG(ERROR) << "Could not parse xml response " << result.description();
return UnexpectedError(errc::bad_message);
}
const string& account_name = creds->account_name();
string signature = Sign(account_name, req.GetHeaders(), creds->account_key());
req.SetHeader("Authorization", absl::StrCat("SharedKey ", account_name, ":", signature));
req.SetHeader(h2::field::host, endpoint);
req.SetHeader(h2::field::accept_encoding, "gzip, deflate");

pugi::xml_node root = doc.child("EnumerationResults");
if (root.type() != pugi::node_element) {
LOG(ERROR) << "Could not find root node " << xml_resp;
return UnexpectedError(errc::bad_message);
}
return req;
}

pugi::xml_node buckets = root.child("Containers");
if (buckets.type() != pugi::node_element) {
LOG(ERROR) << "Could not find buckets node " << xml_resp;
return UnexpectedError(errc::bad_message);
}
// Builds an HTTPS client pool for `endpoint` that runs on proactor `pb`.
// `pb` must be non-null (enforced with CHECK). The pool uses `ctx` as its SSL context.
unique_ptr<http::ClientPool> CreatePool(const string& endpoint, SSL_CTX* ctx,
                                        fb2::ProactorBase* pb) {
  CHECK(pb);
  // make_unique instead of a naked `new`: same object, no raw owning pointer.
  auto pool = std::make_unique<http::ClientPool>(endpoint, ctx, pb);
  pool->set_connect_timeout(2000);  // NOTE(review): presumably milliseconds - confirm.
  return pool;
}

vector<string> res;
for (pugi::xml_node bucket = buckets.child("Container"); bucket; bucket = bucket.next_sibling()) {
res.push_back(bucket.child_value("Name"));
} // namespace

error_code CredsProvider::Init() {
const char* name = getenv("AZURE_STORAGE_ACCOUNT");
const char* secret_key = getenv("AZURE_STORAGE_KEY");
if (!name || !secret_key) {
return make_error_code(errc::permission_denied);
}
return res;

account_name_ = name;
account_key_ = secret_key;

return {};
}

error_code CredsProvider::ListContainers(function<void(ContainerItem)> cb) {
SSL_CTX* ctx = util::http::TlsClient::CreateSslContext();
absl::Cleanup cleanup([ctx] { util::http::TlsClient::FreeContext(ctx); });
error_code Storage::ListContainers(function<void(const ContainerItem&)> cb) {
SSL_CTX* ctx = http::TlsClient::CreateSslContext();
absl::Cleanup cleanup([ctx] { http::TlsClient::FreeContext(ctx); });

fb2::ProactorBase* pb = fb2::ProactorBase::me();
CHECK(pb);
string endpoint = account_name_ + ".blob.core.windows.net";
unique_ptr<http::ClientPool> pool(new http::ClientPool(endpoint, ctx, pb));
pool->set_connect_timeout(2000);
string endpoint = creds_->account_name() + ".blob.core.windows.net";
unique_ptr<http::ClientPool> pool = CreatePool(endpoint, ctx, fb2::ProactorBase::me());

auto client = pool->GetHandle();
CHECK(client);
detail::EmptyRequestImpl req(h2::verb::get, "/?comp=list");
const absl::TimeZone utc_tz = absl::UTCTimeZone();
string date = absl::FormatTime("%a, %d %b %Y %H:%M:%S GMT", absl::Now(), utc_tz);
req.SetHeader("x-ms-date", date);
const char kVersion[] = "2025-01-05";
req.SetHeader("x-ms-version", kVersion);

string signature = Sign(account_name_, req.GetHeaders(), account_key_);
req.SetHeader("Authorization", absl::StrCat("SharedKey ", account_name_, ":", signature));
req.SetHeader(h2::field::host, endpoint);
req.SetHeader(h2::field::accept_encoding, "gzip, deflate");
detail::EmptyRequestImpl req = FillRequest(endpoint, "/?comp=list", creds_);

DVLOG(1) << "Request: " << req.GetHeaders();
auto ec = req.Send(client->get());
Expand All @@ -164,5 +198,34 @@ error_code CredsProvider::ListContainers(function<void(ContainerItem)> cb) {
return {};
}

// Issues a "list blobs" request (GET /<container>?restype=container&comp=list) against
// the account's blob endpoint and logs the raw response body at INFO level.
//
// Current state of the implementation, as written below:
//   * `cb` is accepted but never invoked - the response body is not parsed yet.
//   * Send/ReadHeader failures abort the process via CHECK instead of being returned.
//   * The function always returns a default-constructed (success) error_code.
error_code Storage::List(string_view container, function<void(const ObjectItem&)> cb) {
// A fresh SSL context is created for every call and released when `cleanup` goes out of scope.
SSL_CTX* ctx = http::TlsClient::CreateSslContext();
absl::Cleanup cleanup([ctx] { http::TlsClient::FreeContext(ctx); });

// The pool is bound to fb2::ProactorBase::me(); CreatePool CHECK-fails if that is null.
// NOTE(review): presumably me() is the calling thread's proactor - confirm.
string endpoint = creds_->account_name() + ".blob.core.windows.net";
unique_ptr<http::ClientPool> pool = CreatePool(endpoint, ctx, fb2::ProactorBase::me());

auto client = pool->GetHandle();
CHECK(client);
// FillRequest builds the GET request and adds the x-ms-* and Authorization headers.
string url = absl::StrCat("/", container, "?restype=container&comp=list");
detail::EmptyRequestImpl req = FillRequest(endpoint, url, creds_);

DVLOG(1) << "Request: " << req.GetHeaders();
auto ec = req.Send(client->get());
CHECK(!ec) << ec;

// Read the headers first with an empty-body parser, then hand the parser state over to a
// string-body parser to receive the payload.
h2::response_parser<h2::empty_body> parser;
ec = client->get()->ReadHeader(&parser);
CHECK(!ec) << ec;
DVLOG(1) << "Response: " << parser.get().base();
h2::response_parser<h2::string_body> resp(std::move(parser));
// NOTE(review): the result of Recv() is discarded here; if it reports an error, the body
// logged below may be incomplete - consider checking it like Send/ReadHeader above.
client->get()->Recv(&resp);

auto msg = resp.release();
LOG(INFO) << "Body: " << msg.body();

return {};
}

} // namespace cloud::azure
} // namespace util
26 changes: 21 additions & 5 deletions util/cloud/azure/creds_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,32 @@ class CredsProvider {
public:
std::error_code Init();

const std::string& account_name() const { return account_name_; }
const std::string& account_key() const { return account_key_; }

using ContainerItem = std::string_view;
std::error_code ListContainers(std::function<void(ContainerItem)>);
const std::string& account_name() const {
return account_name_;
}
const std::string& account_key() const {
return account_key_;
}

private:
std::string account_name_;
std::string account_key_;
};

// Entry point for Azure Blob Storage listing operations.
// Requests are made on behalf of the account held by the supplied CredsProvider.
class Storage {
 public:
  // `creds` is stored as a raw, non-owning pointer and is dereferenced by the list calls,
  // so it must stay valid for as long as this object is used.
  // Marked explicit so that a CredsProvider* never converts to a Storage implicitly.
  explicit Storage(CredsProvider* creds) : creds_(creds) {
  }

  using ContainerItem = std::string_view;
  using ObjectItem = std::string_view;

  // Lists the containers of the account.
  // NOTE(review): presumably `cb` is invoked once per container name - confirm in azure.cc.
  std::error_code ListContainers(std::function<void(const ContainerItem&)> cb);

  // Lists the contents of `container`.
  // NOTE(review): presumably `cb` is meant to be invoked once per object - confirm in azure.cc.
  std::error_code List(std::string_view container, std::function<void(const ObjectItem&)> cb);

 private:
  CredsProvider* creds_;  // Not owned.
};

}; // namespace cloud::azure
} // namespace util
24 changes: 24 additions & 0 deletions util/http/http_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "util/http/http_common.h"

#include <absl/strings/str_split.h>

#include "base/logging.h"

namespace util {
Expand All @@ -17,5 +19,27 @@ const char kTextMime[] = "text/plain";
const char kXmlMime[] = "application/xml";
const char kBinMime[] = "application/octet-stream";


// Splits a request target at the first '?'.
// Returns {path, query}: `first` is everything before the '?', `second` is everything after it.
// When there is no '?', `first` is the whole input and `second` is an empty view.
// Both views point into `str`.
QueryParam ParseQuery(std::string_view str) {
  using Parts = std::pair<std::string_view, std::string_view>;

  const size_t qmark = str.find('?');
  if (qmark == std::string_view::npos) {
    return Parts{str, std::string_view{}};
  }
  return Parts{str.substr(0, qmark), str.substr(qmark + 1)};
}

// Splits a query string of the form "k1=v1&k2=v2" into (key, value) pairs.
//
// * Each '&'-separated piece is split at its first '='; a piece without '=' yields the
//   whole piece as the key and an empty value.
// * Empty pieces are skipped, so an empty query returns an empty list (rather than a single
//   ("", "") pair), and stray separators as in "a=1&&b=2" or "a=1&" add no bogus entries.
// * The returned views point into `query`.
QueryArgs SplitQuery(std::string_view query) {
  std::vector<std::string_view> args = absl::StrSplit(query, '&', absl::SkipEmpty());

  std::vector<std::pair<std::string_view, std::string_view>> res;
  res.reserve(args.size());
  for (std::string_view arg : args) {
    const size_t pos = arg.find('=');
    if (pos == std::string_view::npos) {
      res.emplace_back(arg, std::string_view{});
    } else {
      res.emplace_back(arg.substr(0, pos), arg.substr(pos + 1));
    }
  }
  return res;
}

} // namespace http
} // namespace util
Loading

0 comments on commit 1ce2476

Please sign in to comment.