Skip to content

Commit

Permalink
proxy client: close old socket when restarting listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
aberaud committed Apr 19, 2018
1 parent c16a65a commit e140116
Showing 1 changed file with 42 additions and 38 deletions.
80 changes: 42 additions & 38 deletions src/dht_proxy_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va
auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
if (reader->parse(char_data, char_data + body.size(), &json, &err)) {
auto value = std::make_shared<Value>(json);
if ((not filter or filter(*value)) && cb) {
if ((not filter or filter(*value)) and cb) {
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, finished]() {
if (not *finished and not cb({value}))
Expand Down Expand Up @@ -747,7 +747,7 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi
auto* char_data = reinterpret_cast<const char*>(&body[0]);
if (reader->parse(char_data, char_data + body.size(), &json, &err)) {
auto value = std::make_shared<Value>(json);
if (not filter or filter(*value)) {
if ((not filter or filter(*value)) and vcb) {
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([vcb, value, state]() {
if (not state->cancel and not vcb({value}, false))
Expand All @@ -761,7 +761,7 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi
}
}
} catch (std::runtime_error&) {
state->ok = false;
// NOTE: Http::close() can occurs here. Ignore this.
}
} else {
state->ok = false;
Expand Down Expand Up @@ -792,6 +792,7 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken)
DHT_LOG.d(key, "[search %s] cancel listen", key.to_c_str());

auto& listener = it->second;
listener.state->cancel = true;
if (not deviceKey_.empty()) {
// First, be sure to have a token
if (listener.thread.joinable()) {
Expand All @@ -818,8 +819,7 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken)
} else {
// Just stop the request
if (listener.thread.joinable()) {
// Close connection to stop listener?
listener.state->cancel = true;
// Close connection to stop listener
if (listener.req)
restbed::Http::close(listener.req);
listener.thread.join();
Expand Down Expand Up @@ -864,67 +864,70 @@ DhtProxyClient::restartListeners()
for (auto& search: searches_) {
for (auto& l: search.second.listeners) {
auto& listener = l.second;
if (listener.thread.joinable())
auto state = listener.state;
if (listener.thread.joinable()) {
state->cancel = true;
if (listener.req)
restbed::Http::close(listener.req);
listener.thread.join();
}
// Redo listen
state->cancel = false;
state->ok = true;
auto filter = listener.filter;
auto cb = listener.cb;
restbed::Uri uri(proxy::HTTP_PROTO + serverHost_ + "/" + search.first.toString());
auto req = std::make_shared<restbed::Request>(uri);
req->set_method("LISTEN");
listener.req = req;
listener.thread = std::thread([this, filter, cb, req]()
listener.thread = std::thread([this, filter, cb, req, state]()
{
auto settings = std::make_shared<restbed::Settings>();
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.

auto ok = std::make_shared<std::atomic_bool>(true);
restbed::Http::async(req,
[this, filter, cb, ok](const std::shared_ptr<restbed::Request>& req,
[this, filter, cb, state](const std::shared_ptr<restbed::Request>& req,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();

if (code == 200) {
try {
while (restbed::Http::is_open(req)) {
while (restbed::Http::is_open(req) and not state->cancel) {
restbed::Http::fetch("\n", reply);
if (state->cancel)
break;
std::string body;
reply->get_body(body);
reply->set_body(""); // Reset the body for the next fetch

Json::Value json;
std::string err;
Json::CharReaderBuilder rbuilder;
auto* char_data = reinterpret_cast<const char*>(&body[0]);
auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
if (reader->parse(char_data, char_data + body.size(), &json, &err)) {
if (reader->parse(body.data(), body.data() + body.size(), &json, &err)) {
auto value = std::make_shared<Value>(json);
if ((not filter or filter(*value)) && cb) {
auto okCb = std::make_shared<std::promise<bool>>();
auto futureCb = okCb->get_future();
{
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, okCb](){
okCb->set_value(cb({value}, false));
});
loopSignal_();
}
futureCb.wait();
if (!futureCb.get()) {
return;
}
if ((not filter or filter(*value)) and cb) {
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, state]() {
if (not state->cancel and not cb({value}, false))
state->cancel = true;
});
loopSignal_();
}
}
}
} catch (std::runtime_error&) {
// NOTE: Http::close() can occurs here. Ignore this.
}
} else {
*ok = false;
state->ok = false;
}
}, settings).get();
if (!ok) opFailed();
auto& s = *state;
if (not s.ok and not s.cancel)
opFailed();
}
);
}
Expand Down Expand Up @@ -957,12 +960,12 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string
auto token = std::stoull(notification.at("token"));
for (auto& search: searches_) {
for (auto& list : search.second.listeners) {
if (*list.second.pushNotifToken != token)
if (*list.second.pushNotifToken != token or list.second.state->cancel)
continue;
DHT_LOG.d(search.first, "[search %s] handling push notification", search.first.to_c_str());
auto cb = list.second.cb;
auto filter = list.second.filter;
get(search.first, [cb](const std::vector<Sp<Value>>& vals){
get(search.first, [cb](const std::vector<Sp<Value>>& vals) {
cb(vals, false);
return true;
}, DoneCallbackSimple{}, std::move(filter));
Expand All @@ -987,18 +990,18 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener)
req->set_method("SUBSCRIBE");

auto pushNotifToken = std::make_shared<proxy::ListenToken>(0);

auto state = listener.state;
if (listener.thread.joinable())
listener.thread.join();
state->cancel = false;
state->ok = true;
listener.req = req;
listener.pushNotifToken = pushNotifToken;
listener.thread = std::thread([=]()
{
listener.thread = std::thread([this, req, pushNotifToken, state]() {
fillBodyToGetToken(req);
auto settings = std::make_shared<restbed::Settings>();
auto ok = std::make_shared<std::atomic_bool>(true);
restbed::Http::async(req,
[pushNotifToken, ok](const std::shared_ptr<restbed::Request>&,
[pushNotifToken, state](const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();
if (code == 200) {
Expand All @@ -1010,20 +1013,21 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener)
std::string err;
Json::Value json;
Json::CharReaderBuilder rbuilder;
auto* char_data = reinterpret_cast<const char*>(&body[0]);
auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
if (reader->parse(char_data, char_data + body.size(), &json, &err)) {
if (reader->parse(body.data(), body.data() + body.size(), &json, &err)) {
if (!json.isMember("token")) return;
*pushNotifToken = unpackId(json, "token");
}
} catch (std::runtime_error&) {
// NOTE: Http::close() can occurs here. Ignore this.
}
} else {
*ok = false;
state->ok = false;
}
}, settings).get();
if (!ok) opFailed();
auto& s = *state;
if (s.ok and not s.cancel)
opFailed();
});
#endif
}
Expand Down

0 comments on commit e140116

Please sign in to comment.