Skip to content
This repository has been archived by the owner on Jun 12, 2018. It is now read-only.

Commit

Permalink
Fixes #168: created non-templated base classes
Browse files Browse the repository at this point in the history
  • Loading branch information
eidheim committed Mar 4, 2018
1 parent 1056bd2 commit e030cb3
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 315 deletions.
273 changes: 150 additions & 123 deletions client_http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <unordered_set>
#include <vector>

; // <- libclang bug workaround

#ifdef USE_STANDALONE_ASIO
#include <asio.hpp>
#include <asio/steady_timer.hpp>
Expand All @@ -33,14 +35,11 @@ namespace SimpleWeb {
#endif

namespace SimpleWeb {
template <class socket_type>
class Client;

template <class socket_type>
class ClientBase {
public:
class Response;
class Content : public std::istream {
friend class ClientBase<socket_type>;
friend class Response;

public:
std::size_t size() noexcept {
Expand All @@ -64,8 +63,10 @@ namespace SimpleWeb {
};

class Response {
friend class ClientBase<socket_type>;
friend class Client<socket_type>;
template <typename SocketType>
friend class ClientTemplate;
template <typename SocketType>
friend class Client;

asio::streambuf streambuf;

Expand All @@ -80,7 +81,7 @@ namespace SimpleWeb {
};

class Config {
friend class ClientBase<socket_type>;
friend class ClientBase;

private:
Config() noexcept {}
Expand All @@ -97,66 +98,54 @@ namespace SimpleWeb {
std::string proxy_server;
};

protected:
class Connection : public std::enable_shared_from_this<Connection> {
public:
template <typename... Args>
Connection(std::shared_ptr<ScopeRunner> handler_runner, long timeout, Args &&... args) noexcept
: handler_runner(std::move(handler_runner)), timeout(timeout), socket(new socket_type(std::forward<Args>(args)...)) {}

std::shared_ptr<ScopeRunner> handler_runner;
long timeout;

std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
bool in_use = false;
bool attempt_reconnect = true;
/// Set before calling request
Config config;

std::unique_ptr<asio::steady_timer> timer;
/// If you have your own asio::io_service, store its pointer here before calling request().
/// When using asynchronous requests, running the io_service is up to the programmer.
std::shared_ptr<asio::io_service> io_service;

void set_timeout(long seconds = 0) noexcept {
if(seconds == 0)
seconds = timeout;
if(seconds == 0) {
timer = nullptr;
return;
}
timer = std::unique_ptr<asio::steady_timer>(new asio::steady_timer(socket->get_io_service()));
timer->expires_from_now(std::chrono::seconds(seconds));
auto self = this->shared_from_this();
timer->async_wait([self](const error_code &ec) {
if(!ec) {
error_code ec;
self->socket->lowest_layer().cancel(ec);
}
});
}
ClientBase(const std::string &host_port, unsigned short default_port) : default_port(default_port) {
auto parsed_host_port = parse_host_port(host_port, default_port);
host = parsed_host_port.first;
port = parsed_host_port.second;
}

void cancel_timeout() noexcept {
if(timer) {
error_code ec;
timer->cancel(ec);
}
}
};
/// Asynchronous request where setting and/or running Client's io_service is required.
/// Do not use concurrently with the synchronous request functions.
virtual void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap &header,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) = 0;
/// Asynchronous request where setting and/or running Client's io_service is required.
/// Do not use concurrently with the synchronous request functions.
virtual void request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) = 0;

class Session {
public:
Session(std::size_t max_response_streambuf_size, std::shared_ptr<Connection> connection, std::unique_ptr<asio::streambuf> request_streambuf) noexcept
: connection(std::move(connection)), request_streambuf(std::move(request_streambuf)), response(new Response(max_response_streambuf_size)) {}
/// Asynchronous request where setting and/or running Client's io_service is required.
/// Do not use concurrently with the synchronous request functions.
void request(const std::string &method, const std::string &path, string_view content,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback) {
request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback));
}

std::shared_ptr<Connection> connection;
std::unique_ptr<asio::streambuf> request_streambuf;
std::shared_ptr<Response> response;
std::function<void(const std::shared_ptr<Connection> &, const error_code &)> callback;
};
/// Asynchronous request where setting and/or running Client's io_service is required.
/// Do not use concurrently with the synchronous request functions.
void request(const std::string &method, const std::string &path,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback) {
request(method, path, std::string(), CaseInsensitiveMultimap(), std::move(request_callback));
}

public:
/// Set before calling request
Config config;
/// Asynchronous request where setting and/or running Client's io_service is required.
/// Do not use concurrently with the synchronous request functions.
void request(const std::string &method, std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback) {
request(method, std::string("/"), std::string(), CaseInsensitiveMultimap(), std::move(request_callback));
}

/// If you have your own asio::io_service, store its pointer here before calling request().
/// When using asynchronous requests, running the io_service is up to the programmer.
std::shared_ptr<asio::io_service> io_service;
/// Asynchronous request where setting and/or running Client's io_service is required.
/// Do not use concurrently with the synchronous request functions.
void request(const std::string &method, const std::string &path, std::istream &content,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback) {
request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback));
}

/// Convenience function to perform synchronous request. The io_service is run within this function.
/// If reusing the io_service for other tasks, use the asynchronous request functions instead.
Expand Down Expand Up @@ -218,10 +207,100 @@ namespace SimpleWeb {
return response;
}

/// Close connections
virtual void stop() noexcept = 0;

virtual ~ClientBase() {}

protected:
bool internal_io_service = false;

std::string host;
unsigned short port;
unsigned short default_port;

std::unique_ptr<asio::ip::tcp::resolver::query> query;

std::size_t concurrent_synchronous_requests = 0;
std::mutex concurrent_synchronous_requests_mutex;

std::pair<std::string, unsigned short> parse_host_port(const std::string &host_port, unsigned short default_port) const noexcept {
std::pair<std::string, unsigned short> parsed_host_port;
std::size_t host_end = host_port.find(':');
if(host_end == std::string::npos) {
parsed_host_port.first = host_port;
parsed_host_port.second = default_port;
}
else {
parsed_host_port.first = host_port.substr(0, host_end);
parsed_host_port.second = static_cast<unsigned short>(stoul(host_port.substr(host_end + 1)));
}
return parsed_host_port;
}
};

template <typename SocketType>
class ClientTemplate : public ClientBase {
protected:
class Connection : public std::enable_shared_from_this<Connection> {
public:
template <typename... Args>
Connection(std::shared_ptr<ScopeRunner> handler_runner, long timeout, Args &&... args) noexcept
: handler_runner(std::move(handler_runner)), timeout(timeout), socket(new SocketType(std::forward<Args>(args)...)) {}

std::shared_ptr<ScopeRunner> handler_runner;
long timeout;

std::unique_ptr<SocketType> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
bool in_use = false;
bool attempt_reconnect = true;

std::unique_ptr<asio::steady_timer> timer;

void set_timeout(long seconds = 0) noexcept {
if(seconds == 0)
seconds = timeout;
if(seconds == 0) {
timer = nullptr;
return;
}
timer = std::unique_ptr<asio::steady_timer>(new asio::steady_timer(socket->get_io_service()));
timer->expires_from_now(std::chrono::seconds(seconds));
auto self = this->shared_from_this();
timer->async_wait([self](const error_code &ec) {
if(!ec) {
error_code ec;
self->socket->lowest_layer().cancel(ec);
}
});
}

void cancel_timeout() noexcept {
if(timer) {
error_code ec;
timer->cancel(ec);
}
}
};

class Session {
public:
Session(std::size_t max_response_streambuf_size, std::shared_ptr<Connection> connection, std::unique_ptr<asio::streambuf> request_streambuf) noexcept
: connection(std::move(connection)), request_streambuf(std::move(request_streambuf)), response(new Response(max_response_streambuf_size)) {}

std::shared_ptr<Connection> connection;
std::unique_ptr<asio::streambuf> request_streambuf;
std::shared_ptr<Response> response;
std::function<void(const std::shared_ptr<Connection> &, const error_code &)> callback;
};

public:
using ClientBase::request;

/// Asynchronous request where setting and/or running Client's io_service is required.
/// Do not use concurrently with the synchronous request functions.
void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap &header,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) override {
auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header));
auto response = session->response;
auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_));
Expand Down Expand Up @@ -268,25 +347,8 @@ namespace SimpleWeb {

/// Asynchronous request where setting and/or running Client's io_service is required.
/// Do not use concurrently with the synchronous request functions.
void request(const std::string &method, const std::string &path, string_view content,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback) {
request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback));
}

/// Asynchronous request where setting and/or running Client's io_service is required.
void request(const std::string &method, const std::string &path,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback) {
request(method, path, std::string(), CaseInsensitiveMultimap(), std::move(request_callback));
}

/// Asynchronous request where setting and/or running Client's io_service is required.
void request(const std::string &method, std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback) {
request(method, std::string("/"), std::string(), CaseInsensitiveMultimap(), std::move(request_callback));
}

/// Asynchronous request where setting and/or running Client's io_service is required.
void request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) override {
auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header));
auto response = session->response;
auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_));
Expand Down Expand Up @@ -335,14 +397,8 @@ namespace SimpleWeb {
connect(session);
}

/// Asynchronous request where setting and/or running Client's io_service is required.
void request(const std::string &method, const std::string &path, std::istream &content,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback) {
request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback));
}

/// Close connections
void stop() noexcept {
void stop() noexcept override {
std::unique_lock<std::mutex> lock(connections_mutex);
for(auto it = connections.begin(); it != connections.end();) {
error_code ec;
Expand All @@ -351,33 +407,18 @@ namespace SimpleWeb {
}
}

virtual ~ClientBase() noexcept {
~ClientTemplate() {
handler_runner->stop();
stop();
}

protected:
bool internal_io_service = false;

std::string host;
unsigned short port;
unsigned short default_port;

std::unique_ptr<asio::ip::tcp::resolver::query> query;

std::unordered_set<std::shared_ptr<Connection>> connections;
std::mutex connections_mutex;

std::shared_ptr<ScopeRunner> handler_runner;

std::size_t concurrent_synchronous_requests = 0;
std::mutex concurrent_synchronous_requests_mutex;

ClientBase(const std::string &host_port, unsigned short default_port) noexcept : default_port(default_port), handler_runner(new ScopeRunner()) {
auto parsed_host_port = parse_host_port(host_port, default_port);
host = parsed_host_port.first;
port = parsed_host_port.second;
}
ClientTemplate(const std::string &host_port, unsigned short default_port) noexcept : ClientBase(host_port, default_port), handler_runner(new ScopeRunner()) {}

std::shared_ptr<Connection> get_connection() noexcept {
std::shared_ptr<Connection> connection;
Expand Down Expand Up @@ -420,7 +461,7 @@ namespace SimpleWeb {
auto corrected_path = path;
if(corrected_path == "")
corrected_path = "/";
if(!config.proxy_server.empty() && std::is_same<socket_type, asio::ip::tcp::socket>::value)
if(!config.proxy_server.empty() && std::is_same<SocketType, asio::ip::tcp::socket>::value)
corrected_path = "http://" + host + ':' + std::to_string(port) + corrected_path;

std::unique_ptr<asio::streambuf> streambuf(new asio::streambuf());
Expand All @@ -435,20 +476,6 @@ namespace SimpleWeb {
return streambuf;
}

std::pair<std::string, unsigned short> parse_host_port(const std::string &host_port, unsigned short default_port) const noexcept {
std::pair<std::string, unsigned short> parsed_host_port;
std::size_t host_end = host_port.find(':');
if(host_end == std::string::npos) {
parsed_host_port.first = host_port;
parsed_host_port.second = default_port;
}
else {
parsed_host_port.first = host_port.substr(0, host_end);
parsed_host_port.second = static_cast<unsigned short>(stoul(host_port.substr(host_end + 1)));
}
return parsed_host_port;
}

void write(const std::shared_ptr<Session> &session) {
session->connection->set_timeout();
asio::async_write(*session->connection->socket, session->request_streambuf->data(), [this, session](const error_code &ec, std::size_t /*bytes_transferred*/) {
Expand Down Expand Up @@ -638,15 +665,15 @@ namespace SimpleWeb {
}
};

template <class socket_type>
class Client : public ClientBase<socket_type> {};
template <typename = void>
class Client : public ClientBase {};

using HTTP = asio::ip::tcp::socket;

template <>
class Client<HTTP> : public ClientBase<HTTP> {
class Client<HTTP> : public ClientTemplate<HTTP> {
public:
Client(const std::string &server_port_path) noexcept : ClientBase<HTTP>::ClientBase(server_port_path, 80) {}
Client(const std::string &server_port_path) noexcept : ClientTemplate<HTTP>::ClientTemplate(server_port_path, 80) {}

protected:
std::shared_ptr<Connection> create_connection() noexcept override {
Expand Down
Loading

0 comments on commit e030cb3

Please sign in to comment.