Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

router: Add support for removing callables. #80

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/wampcc/rpc_man.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class rpc_man
const std::string& uri, on_call_fn,
void* user);

void unregister_internal_rpc(const std::string& realm, t_request_id);

rpc_details get_rpc_details(const std::string& rpcname,
const std::string& realm);

Expand All @@ -56,6 +58,8 @@ class rpc_man
rpc_man& operator=(const rpc_man&) = delete;

void register_rpc(session_handle, std::string realm, rpc_details& r);
void unregister_rpc(session_handle, std::string realm, t_session_id,
t_registration_id);

logger& __logger; /* name chosen for log macros */
rpc_added_cb m_rpc_added_cb;
Expand Down
4 changes: 3 additions & 1 deletion include/wampcc/wamp_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,13 @@ class wamp_router : public std::enable_shared_from_this<wamp_router>
/** Associate a callback function with a procedure uri. The callback is
* called when a CALL request is received for the procedure. The callback
* should reply to the caller with a RESULT or ERROR message. */
void callable(const std::string& realm,
t_registration_id callable(const std::string& realm,
const std::string& uri,
on_call_fn,
void * user = nullptr);

/** Unregister a callable previously added. */
void unregister_callable(const std::string& realm, t_registration_id id);

/** Return list of addresses the router is currently listening on. */
std::vector<socket_address> get_listen_addresses() const;
Expand Down
83 changes: 46 additions & 37 deletions libs/wampcc/rpc_man.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ uint64_t rpc_man::register_internal_rpc(const std::string& realm,
return r.registration_id;
}

void rpc_man::unregister_internal_rpc(const std::string& realm, t_request_id id) {
unregister_rpc(session_handle(), realm, 0, id);
}


void rpc_man::handle_inbound_register(wamp_session& ws, t_request_id request_id,
const std::string& ___uri, const json_object& options) {
Expand Down Expand Up @@ -107,6 +111,45 @@ void rpc_man::register_rpc(session_handle session, std::string realm,
<< "::" << r.uri);
}

void rpc_man::unregister_rpc(session_handle session, std::string realm, t_session_id session_id,
t_registration_id registration_id) {
std::lock_guard<std::mutex> guard(m_rpc_map_lock);

auto session_iter = m_session_to_rpcs.find(session);
if (session_iter != m_session_to_rpcs.end()) {
auto rpc_iter = session_iter->second.find(registration_id);
if (rpc_iter != session_iter->second.end()) {
if (rpc_iter->second->registration_id) {
LOG_INFO("procedure unregistered, " //
<< rpc_iter->second->registration_id << ", " << realm
<< "::" << rpc_iter->second->uri);

/* Perform user-defined call-back, if present. */
if (m_rpc_removed_cb)
m_rpc_removed_cb(*(rpc_iter->second));

/* remove from realm index */
auto realm_iter = m_realm_registry.find(realm);
if (realm_iter != m_realm_registry.end())
realm_iter->second.erase(rpc_iter->second->uri);
}

/* remove from session index */
session_iter->second.erase(rpc_iter);
}
else {
LOG_WARN("unregister failed, registration_id " //
<< registration_id << " not found");
throw wamp_error(WAMP_ERROR_NO_SUCH_REGISTRATION);
}
}
else {
LOG_WARN("unregister failed, session #" //
<< session_id << " not found");
throw wamp_error(WAMP_ERROR_NO_SUCH_REGISTRATION);
}
}


void rpc_man::session_closed(std::shared_ptr<wamp_session>& session) {
/* EV thread */
Expand Down Expand Up @@ -142,43 +185,9 @@ void rpc_man::handle_inbound_unregister(wamp_session& session,
t_request_id request_id,
t_registration_id registration_id) {
/* EV thread */

std::lock_guard<std::mutex> guard(m_rpc_map_lock);

auto session_iter = m_session_to_rpcs.find(session.handle());
if (session_iter != m_session_to_rpcs.end()) {
auto rpc_iter = session_iter->second.find(registration_id);
if (rpc_iter != session_iter->second.end()) {
LOG_INFO("procedure unregistered, " //
<< rpc_iter->second->registration_id << ", " << session.realm()
<< "::" << rpc_iter->second->uri);

/* Perform user-defined call-back, if present. */
if (m_rpc_removed_cb)
m_rpc_removed_cb(*(rpc_iter->second));

/* remove from realm index */
auto realm_iter = m_realm_registry.find(session.realm());
if (realm_iter != m_realm_registry.end())
realm_iter->second.erase(rpc_iter->second->uri);

/* remove from session index */
session_iter->second.erase(rpc_iter);

/* reply to client, indicate success */
session.unregistered(request_id);
}
else {
LOG_WARN("unregister failed, registration_id " //
<< registration_id << " not found");
throw wamp_error(WAMP_ERROR_NO_SUCH_REGISTRATION);
}
}
else {
LOG_WARN("unregister failed, session #" //
<< session.unique_id() << " not found");
throw wamp_error(WAMP_ERROR_NO_SUCH_REGISTRATION);
}
unregister_rpc(session.handle(), session.realm(), session.unique_id(), registration_id);
/* reply to client, indicate success */
session.unregistered(request_id);
}


Expand Down
8 changes: 6 additions & 2 deletions libs/wampcc/wamp_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,18 @@ std::future<uverr> wamp_router::listen(auth_provider auth, int p)
}


void wamp_router::callable(const std::string& realm,
t_registration_id wamp_router::callable(const std::string& realm,
const std::string& uri,
on_call_fn fn,
void * user)
{
m_rpcman->register_internal_rpc(realm, uri, std::move(fn), user);
return m_rpcman->register_internal_rpc(realm, uri, std::move(fn), user);
}

void wamp_router::unregister_callable(const std::string& realm,
t_registration_id id) {
m_rpcman->unregister_internal_rpc(realm, id);
}

void wamp_router::publish(const std::string& realm, const std::string& topic,
const json_object& options, wamp_args args)
Expand Down
69 changes: 69 additions & 0 deletions tests/wampcc/test_register_unregister.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,75 @@ TEST_CASE("test_unregister")
}
}

TEST_CASE("test_unregister_router") {
const std::string uri = "my_rpc";
const std::string realm = "default_realm";
internal_server iserver(logger::nolog());
int port = iserver.start(global_port++);

unique_ptr<kernel> the_kernel(new kernel({}, logger::nolog()));
auto session = establish_session(the_kernel, port);
perform_realm_logon(session);

auto registration_id = iserver.router()->callable(realm, uri,
[](wampcc::wamp_router& rtr, wampcc::wamp_session& caller, wampcc::call_info info) {
caller.result(info.request_id, {"hello"});
}, nullptr);

std::promise<result_info> promised_result_info;

session->call(uri, {}, {},
[&promised_result_info](wamp_session&, result_info info) {
promised_result_info.set_value(std::move(info));
},
nullptr);

auto future_result_info = promised_result_info.get_future();

REQUIRE(future_result_info.wait_for(std::chrono::seconds(3)) ==
std::future_status::ready);

REQUIRE(future_result_info.get().was_error == false);

// check that second registration fails
std::string error_uri;
try {
iserver.router()->callable(realm, uri,
[](wampcc::wamp_router&, wampcc::wamp_session&, wampcc::call_info) {},
nullptr);
}
catch (const wampcc::wamp_error& error) {
error_uri = error.error_uri();
}
REQUIRE(error_uri == WAMP_ERROR_PROCEDURE_ALREADY_EXISTS);

// check that incorrect unregister fails
try {
iserver.router()->unregister_callable(realm, registration_id + 1);
}
catch (const wampcc::wamp_error& error) {
error_uri = error.error_uri();
}
REQUIRE(error_uri == WAMP_ERROR_NO_SUCH_REGISTRATION);

// unregister
iserver.router()->unregister_callable(realm, registration_id);

// check that the procedure can no longer be called
promised_result_info = {};
session->call(uri, {}, {},
[&promised_result_info](wamp_session&, result_info info) {
promised_result_info.set_value(std::move(info));
},
nullptr);

future_result_info = promised_result_info.get_future();

REQUIRE(future_result_info.wait_for(std::chrono::seconds(3)) ==
std::future_status::ready);

REQUIRE(future_result_info.get().was_error == true);
}

int main(int argc, char** argv)
{
Expand Down