Skip to content

Commit

Permalink
Unique sender (#51)
Browse files Browse the repository at this point in the history
* make sender unique pointer

* bump version to v0.5.0

* just use latest nightly

* remove PR build MACOS
  • Loading branch information
Congyuwang authored Nov 1, 2023
1 parent 9d660f0 commit 9513749
Show file tree
Hide file tree
Showing 14 changed files with 57 additions and 139 deletions.
86 changes: 0 additions & 86 deletions .github/workflows/PR.yml

This file was deleted.

2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set(CMAKE_CXX_STANDARD 17)
set(CMAKE_TOOLCHAIN_FILE ${CMAKE_SOURCE_DIR}/toolchain.cmake)

# define project
project(socket_manager LANGUAGES C CXX VERSION 0.4.0)
project(socket_manager LANGUAGES C CXX VERSION 0.5.0)

# set default build type as shared
option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tokio-socket-manager"
version = "0.4.0"
version = "0.5.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
2 changes: 1 addition & 1 deletion examples/echo_server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ set(CMAKE_CXX_STANDARD 20)
add_executable(echo_server src/echo_server.cpp)
set_property(TARGET echo_server PROPERTY INTERPROCEDURAL_OPTIMIZATION TRUE)

find_package(socket_manager 0.4.0 REQUIRED)
find_package(socket_manager 0.5.0 REQUIRED)
target_link_libraries(echo_server PUBLIC socket_manager)
6 changes: 3 additions & 3 deletions examples/echo_server/src/echo_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class WrapWaker : public socket_manager::Notifier {
*/
class EchoReceiver : public socket_manager::MsgReceiverAsync {
public:
explicit EchoReceiver(std::shared_ptr<socket_manager::MsgSender> &&sender,
explicit EchoReceiver(std::unique_ptr<socket_manager::MsgSender> &&sender,
const std::shared_ptr<WrapWaker> &waker)
: waker(waker), sender(std::move(sender)){};

Expand All @@ -48,7 +48,7 @@ class EchoReceiver : public socket_manager::MsgReceiverAsync {
return sender->send_async(data);
};
std::shared_ptr<WrapWaker> waker;
std::shared_ptr<socket_manager::MsgSender> sender;
std::unique_ptr<socket_manager::MsgSender> sender;
};

/**
Expand All @@ -57,7 +57,7 @@ class EchoReceiver : public socket_manager::MsgReceiverAsync {
class EchoCallback : public socket_manager::ConnCallback {
private:
void on_connect(std::shared_ptr<socket_manager::Connection> conn,
std::shared_ptr<socket_manager::MsgSender> sender) override {
std::unique_ptr<socket_manager::MsgSender> sender) override {
auto waker = std::make_shared<WrapWaker>(socket_manager::Waker());
auto recv = std::make_shared<EchoReceiver>(std::move(sender), waker);
{
Expand Down
2 changes: 1 addition & 1 deletion include/socket_manager/conn_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ConnCallback {
* @param sender a `Sender` object for sending data.
*/
virtual void on_connect(std::shared_ptr<Connection> conn,
std::shared_ptr<MsgSender> sender) = 0;
std::unique_ptr<MsgSender> sender) = 0;

/**
* Called when a connection is closed.
Expand Down
2 changes: 1 addition & 1 deletion socket_manager/socket_manager_c_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ socket_manager_extern_on_conn(SOCKET_MANAGER_C_API_OnConnObj this_,

std::shared_ptr<socket_manager::Connection> conn(
new socket_manager::Connection(on_connect.Conn));
std::shared_ptr<socket_manager::MsgSender> sender(
std::unique_ptr<socket_manager::MsgSender> sender(
new socket_manager::MsgSender(on_connect.Send, conn));

// keep the connection alive
Expand Down
13 changes: 7 additions & 6 deletions tests/test_auto_flush.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <memory>
#undef NDEBUG

#include "test_utils.h"
Expand Down Expand Up @@ -32,33 +33,33 @@ class HelloWorldManager : public DoNothingConnCallback {
received(std::make_shared<std::atomic_bool>(false)) {}

void on_connect(std::shared_ptr<Connection> conn,
std::shared_ptr<MsgSender> send) override {
std::unique_ptr<MsgSender> send) override {
auto do_nothing =
std::make_unique<ReceiverHelloWorld>(mutex, cond, received);
conn->start(std::move(do_nothing));
this->sender = send;
this->sender = std::move(send);
sender->send_block("hello world");
}

std::shared_ptr<std::mutex> mutex;
std::shared_ptr<std::condition_variable> cond;
std::shared_ptr<std::atomic_bool> received;
std::shared_ptr<MsgSender> sender;
std::unique_ptr<MsgSender> sender;
};

class SendHelloWorldDoNotClose : public DoNothingConnCallback {
void on_connect(std::shared_ptr<Connection> conn,
std::shared_ptr<MsgSender> sender) override {
std::unique_ptr<MsgSender> sender) override {
auto do_nothing = std::make_unique<DoNothingReceiver>();
conn->start(std::move(do_nothing));
this->sender = sender;
this->sender = std::move(sender);
std::thread sender_t([this] { this->sender->send_block("hello world"); });
sender_t.detach();
}

private:
// store sender, do not close connection
std::shared_ptr<MsgSender> sender;
std::unique_ptr<MsgSender> sender;
};

int test_auto_flush(int argc, char **argv) {
Expand Down
16 changes: 8 additions & 8 deletions tests/test_callback_throw_error.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <memory>
#undef NDEBUG

#include "test_utils.h"
Expand All @@ -9,14 +10,14 @@ using namespace socket_manager;

class OnConnectErrorBeforeStartCallback : public DoNothingConnCallback {
void on_connect(std::shared_ptr<Connection> conn,
std::shared_ptr<MsgSender> sender) override {
std::unique_ptr<MsgSender> sender) override {
throw std::runtime_error("throw some error before calling start");
}
};

class OnConnectErrorAfterStartCallback : public DoNothingConnCallback {
void on_connect(std::shared_ptr<Connection> conn,
std::shared_ptr<MsgSender> sender) override {
std::unique_ptr<MsgSender> sender) override {
conn->start(std::make_unique<DoNothingReceiver>());
throw std::runtime_error("throw some error after calling start");
}
Expand All @@ -30,10 +31,9 @@ class OnMsgErrorReceiver : public MsgReceiver {

class OnMsgErrorCallback : public ConnCallback {
void on_connect(std::shared_ptr<Connection> conn,
std::shared_ptr<MsgSender> send) override {
std::unique_ptr<MsgSender> send) override {
conn->start(std::make_unique<OnMsgErrorReceiver>());
this->sender = send;
sender.use_count();
this->sender = std::move(send);
}

void on_remote_close(const std::string &local_addr,
Expand All @@ -48,19 +48,19 @@ class OnMsgErrorCallback : public ConnCallback {
void on_connect_error(const std::string &addr,
const std::string &err) override {}

std::shared_ptr<MsgSender> sender;
std::unique_ptr<MsgSender> sender;
};

class StoreAllEventsConnHelloCallback : public StoreAllEventsConnCallback {
void on_connect(std::shared_ptr<Connection> conn,
std::shared_ptr<MsgSender> sender) override {
std::unique_ptr<MsgSender> sender) override {
std::unique_lock<std::mutex> lock(*mutex);
auto conn_id = conn->local_address() + "->" + conn->peer_address();
events->emplace_back(CONNECTED, conn_id);
auto msg_storer =
std::make_unique<MsgStoreReceiver>(conn_id, mutex, cond, buffer);
conn->start(std::move(msg_storer));
std::thread sender_t([sender]() {
std::thread sender_t([sender = std::move(sender)]() {
try {
sender->send_block("hello");
} catch (std::runtime_error &e) { /* ignore */
Expand Down
2 changes: 1 addition & 1 deletion tests/test_find_package/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ set(CMAKE_CXX_STANDARD 17)
add_executable(helloworld_server helloworld_server.cpp)
set_property(TARGET helloworld_server PROPERTY INTERPROCEDURAL_OPTIMIZATION TRUE)

find_package(socket_manager 0.4.0 REQUIRED)
find_package(socket_manager 0.5.0 REQUIRED)
target_link_libraries(helloworld_server PUBLIC socket_manager)
17 changes: 9 additions & 8 deletions tests/test_find_package/helloworld_server.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <iostream>
#include <memory>
#include <mutex>
#include <socket_manager.h>
#include <unordered_map>
Expand All @@ -9,15 +10,15 @@ class HelloWorldReceiver : public socket_manager::MsgReceiver {
HelloWorldReceiver(
std::string conn_id, std::mutex &mutex,
std::unordered_map<std::string,
std::shared_ptr<socket_manager::MsgSender>> &senders)
std::unique_ptr<socket_manager::MsgSender>> &senders)
: conn_id(std::move(conn_id)), mutex(mutex), senders(senders) {}

void on_message(std::string_view data) override {
try {
std::unique_lock<std::mutex> my_lock(mutex);
auto sender = senders.at(conn_id);
sender->send_block("HTTP/1.1 200 OK\r\nContent-Length: 12\r\nConnection: "
"close\r\n\r\nHello, world");
senders.at(conn_id)->send_block(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\nConnection: "
"close\r\n\r\nHello, world");
senders.erase(conn_id);
} catch (const std::out_of_range &e) {
std::cerr << "Exception at " << e.what() << std::endl;
Expand All @@ -27,19 +28,19 @@ class HelloWorldReceiver : public socket_manager::MsgReceiver {
private:
std::string conn_id;
std::mutex &mutex;
std::unordered_map<std::string, std::shared_ptr<socket_manager::MsgSender>>
std::unordered_map<std::string, std::unique_ptr<socket_manager::MsgSender>>
&senders;
};

class MyCallback : public socket_manager::ConnCallback {
public:
void on_connect(std::shared_ptr<socket_manager::Connection> conn,
std::shared_ptr<socket_manager::MsgSender> sender) override {
std::unique_ptr<socket_manager::MsgSender> sender) override {
auto id = conn->local_address() + "->" + conn->peer_address();
conn->start(std::make_unique<HelloWorldReceiver>(id, mutex, senders));
{
std::unique_lock<std::mutex> my_lock(mutex);
senders[id] = sender;
senders[id] = std::move(sender);
}
}

Expand Down Expand Up @@ -67,7 +68,7 @@ class MyCallback : public socket_manager::ConnCallback {

private:
std::mutex mutex;
std::unordered_map<std::string, std::shared_ptr<socket_manager::MsgSender>>
std::unordered_map<std::string, std::unique_ptr<socket_manager::MsgSender>>
senders;
};

Expand Down
15 changes: 8 additions & 7 deletions tests/test_manual_flush.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,18 @@ class EchoReceiver : public MsgReceiver {

class HelloCallback : public ConnCallback {
void on_connect(std::shared_ptr<Connection> conn,
std::shared_ptr<MsgSender> sender) override {
std::unique_ptr<MsgSender> sender) override {
auto rcv = std::make_unique<FinalReceiver>(has_received, mutex, cond);
// disable write auto flush
conn->start(std::move(rcv), nullptr, DEFAULT_MSG_BUF_SIZE,
DEFAULT_READ_MSG_FLUSH_MILLI_SEC, 0);
std::thread sender_t([sender] {
sender->send_block("hello world");
sender->flush();
std::shared_ptr<MsgSender> sender_shared = std::move(sender);
std::thread sender_t([sender_shared] {
sender_shared->send_block("hello world");
sender_shared->flush();
});
sender_t.detach();
_sender = sender;
_sender = sender_shared;
std::cout << "hello world sent" << std::endl;
}

Expand Down Expand Up @@ -91,11 +92,11 @@ class HelloCallback : public ConnCallback {

class EchoCallback : public ConnCallback {
void on_connect(std::shared_ptr<Connection> conn,
std::shared_ptr<MsgSender> sender) override {
std::unique_ptr<MsgSender> sender) override {
auto rcv = std::make_unique<EchoReceiver>(has_received, _data, mutex, cond);
// disable write auto flush
conn->start(std::move(rcv), nullptr, DEFAULT_MSG_BUF_SIZE, 1, 0);
std::thread sender_t([sender, this]() {
std::thread sender_t([sender = std::move(sender), this]() {
std::unique_lock<std::mutex> lock(*mutex);
cond->wait(lock, [this]() { return has_received; });
sender->send_block(*_data);
Expand Down
Loading

0 comments on commit 9513749

Please sign in to comment.