Skip to content

Commit

Permalink
updated wire.h to be the same as tateyama
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Dec 13, 2023
1 parent 0629c12 commit e969307
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/tateyama/transport/client_wire.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class session_wire_container
wire_->write(bip_buffer_, data.data(), message_header(index, data.length()));
}
void disconnect() {
wire_->write(bip_buffer_, nullptr, message_header(message_header::not_use, 0));
wire_->write(bip_buffer_, nullptr, message_header(message_header::termination_request, 0));
}

private:
Expand Down
105 changes: 73 additions & 32 deletions src/tateyama/transport/wire.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 Project Tsurugi.
* Copyright 2018-2023 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,7 @@ class message_header {
public:
using length_type = std::uint32_t;
using index_type = std::uint16_t;
static constexpr index_type not_use = 0xffff;
static constexpr index_type termination_request = 0xffff;

static constexpr std::size_t size = sizeof(length_type) + sizeof(index_type);

Expand Down Expand Up @@ -360,6 +360,12 @@ static constexpr std::int64_t MAX_TIMEOUT = 10L * 365L * 24L * 3600L * 1000L * 1
inline static std::int64_t u_cap(std::int64_t timeout) {
return (timeout > MAX_TIMEOUT) ? MAX_TIMEOUT : timeout;
}
inline static std::int64_t u_round(std::int64_t timeout) {
if (timeout == 0) {
return timeout;
}
return (((timeout-500)/1000)+1) * 1000;
}
inline static std::int64_t n_cap(std::int64_t timeout) {
return (timeout > (MAX_TIMEOUT * 1000)) ? (MAX_TIMEOUT * 1000) : timeout;
}
Expand All @@ -374,22 +380,48 @@ class unidirectional_message_wire : public simple_wire<message_header> {
*/
message_header peep(const char* base, bool wait_flag = false) {
while (true) {
if(stored() >= message_header::size) {
if(stored() >= message_header::size || termination_requested_.load()) {
break;
}
if (wait_flag) {
boost::interprocess::scoped_lock lock(m_mutex_);
wait_for_read_ = true;
std::atomic_thread_fence(std::memory_order_acq_rel);
c_empty_.wait(lock, [this](){ return stored() >= message_header::size; });
c_empty_.wait(lock, [this](){ return (stored() >= message_header::size) || termination_requested_.load(); });
wait_for_read_ = false;
} else {
if (stored() < message_header::size) { return {}; }
}
}
copy_header(base);
if (!termination_requested_.load()) {
copy_header(base);
} else {
header_received_ = message_header(message_header::termination_request, 0);
}
return header_received_;
}

/**
* @brief wake up the worker thread waiting for request arrival, supposed to be used in server termination.
*/
void terminate() {
termination_requested_.store(true);
std::atomic_thread_fence(std::memory_order_acq_rel);
if (wait_for_read_) {
boost::interprocess::scoped_lock lock(m_mutex_);
c_empty_.notify_one();
}
}
/**
* @brief check if an termination request has been made
* @retrun true if terminate request has been made
*/
[[nodiscard]] bool terminate_requested() {
return termination_requested_.load();
}

private:
std::atomic_bool termination_requested_{};
};


Expand Down Expand Up @@ -420,7 +452,7 @@ class unidirectional_response_wire : public simple_wire<response_header> {
wait_for_read_ = true;
std::atomic_thread_fence(std::memory_order_acq_rel);

if (!c_empty_.timed_wait(lock, boost::get_system_time() + boost::posix_time::microseconds(u_cap(timeout)), [this](){ return (stored() >= response_header::size) || closed_.load(); })) {
if (!c_empty_.timed_wait(lock, boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(timeout))), [this](){ return (stored() >= response_header::size) || closed_.load(); })) {
wait_for_read_ = false;
throw std::runtime_error("response has not been received within the specified time");
}
Expand Down Expand Up @@ -465,6 +497,7 @@ class unidirectional_response_wire : public simple_wire<response_header> {

// for resultset
class unidirectional_simple_wires {
constexpr static std::size_t watch_interval = 5;
public:

class unidirectional_simple_wire : public simple_wire<length_header> {
Expand Down Expand Up @@ -595,6 +628,9 @@ class unidirectional_simple_wires {
}

void write(char* base, const char* from, std::size_t length) {
if (length > room()) {
wait_to_resultset_write(length);
}
write_in_buffer(base, buffer_address(base, pushed_.load()), from, length);
pushed_.fetch_add(length);
}
Expand Down Expand Up @@ -716,6 +752,10 @@ class unidirectional_simple_wires {
* used by clinet
*/
unidirectional_simple_wire* active_wire(std::int64_t timeout = 0) {
if (timeout == 0) {
timeout = watch_interval * 1000 * 1000;
}

do {
for (auto&& wire: unidirectional_simple_wires_) {
if(wire.has_record()) {
Expand All @@ -727,35 +767,23 @@ class unidirectional_simple_wires {
wait_for_record_ = true;
std::atomic_thread_fence(std::memory_order_acq_rel);
unidirectional_simple_wire* active_wire = nullptr;
if (timeout <= 0) {
c_record_.wait(lock,
[this, &active_wire](){
for (auto&& wire: unidirectional_simple_wires_) {
if (wire.has_record()) {
active_wire = &wire;
return true;
}
}
return is_eor();
});
} else {
if (!c_record_.timed_wait(lock,
if (!c_record_.timed_wait(lock,
#ifdef BOOST_DATE_TIME_HAS_NANOSECONDS
boost::get_system_time() + boost::posix_time::nanoseconds(n_cap(timeout)),
boost::get_system_time() + boost::posix_time::nanoseconds(n_cap(timeout)),
#else
boost::get_system_time() + boost::posix_time::microseconds(u_cap(((timeout-500)/1000)+1)),
boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(timeout))),
#endif
[this, &active_wire](){
for (auto&& wire: unidirectional_simple_wires_) {
if (wire.has_record()) {
active_wire = &wire;
return true;
}
[this, &active_wire](){
for (auto&& wire: unidirectional_simple_wires_) {
if (wire.has_record()) {
active_wire = &wire;
return true;
}
return is_eor();
})) {
throw std::runtime_error("record has not been received within the specified time");
}
}
return is_eor();
})) {
wait_for_record_ = false;
throw std::runtime_error("record has not been received within the specified time");
}
wait_for_record_ = false;
if (active_wire != nullptr) {
Expand Down Expand Up @@ -936,6 +964,11 @@ class connection_queue
void notify() {
condition_.notify_one();
}

// for diagnostic
[[nodiscard]] std::size_t size() const {
return pushed_.load() - poped_.load();
}
private:
boost::interprocess::vector<std::size_t, long_allocator> queue_;
std::size_t capacity_;
Expand Down Expand Up @@ -980,7 +1013,7 @@ class connection_queue
#ifdef BOOST_DATE_TIME_HAS_NANOSECONDS
boost::get_system_time() + boost::posix_time::nanoseconds(n_cap(timeout)),
#else
boost::get_system_time() + boost::posix_time::microseconds(u_cap(((timeout-500)/1000)+1)),
boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(timeout))),
#endif
[this](){ return (session_id_ != 0); })) {
throw std::runtime_error("connection response has not been accepted within the specified time");
Expand Down Expand Up @@ -1057,6 +1090,14 @@ class connection_queue
bool is_terminated() noexcept { return terminate_; }
void confirm_terminated() { s_terminated_.post(); }

// for diagnostic
[[nodiscard]] std::size_t pending_requests() const {
return q_requested_.size();
}
[[nodiscard]] std::size_t session_id_accepted() const {
return session_id_;
}

private:
index_queue q_free_;
index_queue q_requested_;
Expand Down

0 comments on commit e969307

Please sign in to comment.