Skip to content

Commit

Permalink
control: Limit memory use by control connections
Browse files Browse the repository at this point in the history
Signed-off-by: Mobin "Hojjat" Aydinfar <[email protected]>
  • Loading branch information
mobin-2008 committed Jul 25, 2023
1 parent 7d491be commit 8ea1bff
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
22 changes: 18 additions & 4 deletions src/control.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ namespace {
constexpr auto OUT_EVENTS = dasynq::OUT_EVENTS;
constexpr auto IN_EVENTS = dasynq::IN_EVENTS;

// Maximum buffer size for limiting "IN_EVENTS"
constexpr int outbuf_limit = 16384; // 16KB

// Control protocol minimum compatible version and current version:
constexpr uint16_t min_compat_version = 1;
constexpr uint16_t cp_version = 2;
Expand Down Expand Up @@ -1135,7 +1138,8 @@ void control_conn_t::service_event(service_record *service, service_event_t even

bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
{
int in_flag = bad_conn_close ? 0 : IN_EVENTS;
// Close connection on bad conn or temporarily Limit input if necessary
int in_flag = bad_conn_close || (outbuf_size >= outbuf_limit) ? 0 : IN_EVENTS;
bool was_empty = outbuf.empty();

// If the queue is empty, we can try to write the packet out now rather than queueing it.
Expand Down Expand Up @@ -1166,6 +1170,8 @@ bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
// Create a vector out of the (remaining part of the) packet:
try {
outbuf.emplace_back(pkt, pkt + size);
outbuf_size += size;
int in_flag = (outbuf_size >= outbuf_limit) ? 0 : IN_EVENTS;
iob.set_watches(in_flag | OUT_EVENTS);
return true;
}
Expand All @@ -1190,7 +1196,8 @@ bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
// make them extraordinary difficult to combine into a single method.
bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
{
int in_flag = bad_conn_close ? 0 : IN_EVENTS;
// Close connection on bad conn or temporarily Limit input if necessary
int in_flag = bad_conn_close || (outbuf_size >= outbuf_limit) ? 0 : IN_EVENTS;
bool was_empty = outbuf.empty();

if (was_empty) {
Expand Down Expand Up @@ -1219,6 +1226,8 @@ bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept

try {
outbuf.emplace_back(std::move(pkt));
outbuf_size += pkt.size();
int in_flag = (outbuf_size >= outbuf_limit) ? 0 : IN_EVENTS;
iob.set_watches(in_flag | OUT_EVENTS);
return true;
}
Expand Down Expand Up @@ -1275,8 +1284,10 @@ bool control_conn_t::data_ready() noexcept
iob.set_watches(OUT_EVENTS);
}
else {
// Temporarily Limit input if necessary
int in_flags = (outbuf_size < outbuf_limit) ? IN_EVENTS : 0;
int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
iob.set_watches(IN_EVENTS | out_flags);
iob.set_watches(in_flags | out_flags);
}

return false;
Expand Down Expand Up @@ -1314,6 +1325,7 @@ bool control_conn_t::send_data() noexcept
outpkt_index += written;
if (outpkt_index == pkt.size()) {
// We've finished this packet, move on to the next:
outbuf_size -= pkt.size();
outbuf.pop_front();
outpkt_index = 0;
if (oom_close) {
Expand All @@ -1324,7 +1336,9 @@ bool control_conn_t::send_data() noexcept
if (bad_conn_close) {
return true;
}
iob.set_watches(IN_EVENTS);
// Temporarily Limit input if necessary
int in_flag = (outbuf_size < outbuf_limit) ? IN_EVENTS : 0;
iob.set_watches(in_flag);
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/includes/control.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class control_conn_t : private service_listener

// Buffer for outgoing packets. Each outgoing back is represented as a vector<char>.
list<vector<char>> outbuf;
// Current output buffer size in bytes.
unsigned outbuf_size = 0;
// Current index within the first outgoing packet (all previous bytes have been sent).
unsigned outpkt_index = 0;

Expand Down Expand Up @@ -181,7 +183,7 @@ class control_conn_t : private service_listener
bool data_ready() noexcept;

bool send_data() noexcept;

// Check if any dependents will be affected by stopping a service, generate a response packet if so.
// had_dependents will be set true if the service should not be stopped, false otherwise.
// Returns false if the connection must be closed, true otherwise.
Expand Down

0 comments on commit 8ea1bff

Please sign in to comment.