From 8ea1bff201fae9d76daa11d14ff25a62903979f7 Mon Sep 17 00:00:00 2001 From: Mobin Date: Sun, 23 Jul 2023 16:31:43 +0330 Subject: [PATCH] control: Limit memory use by control connections Signed-off-by: Mobin "Hojjat" Aydinfar --- src/control.cc | 22 ++++++++++++++++++---- src/includes/control.h | 4 +++- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/control.cc b/src/control.cc index 6c1918ab..631b5ee4 100644 --- a/src/control.cc +++ b/src/control.cc @@ -18,6 +18,9 @@ namespace { constexpr auto OUT_EVENTS = dasynq::OUT_EVENTS; constexpr auto IN_EVENTS = dasynq::IN_EVENTS; + // Maximum buffer size for limiting "IN_EVENTS" + constexpr int outbuf_limit = 16384; // 16KB + // Control protocol minimum compatible version and current version: constexpr uint16_t min_compat_version = 1; constexpr uint16_t cp_version = 2; @@ -1135,7 +1138,8 @@ void control_conn_t::service_event(service_record *service, service_event_t even bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept { - int in_flag = bad_conn_close ? 0 : IN_EVENTS; + // Close connection on bad conn or temporarily Limit input if necessary + int in_flag = bad_conn_close || (outbuf_size >= outbuf_limit) ? 0 : IN_EVENTS; bool was_empty = outbuf.empty(); // If the queue is empty, we can try to write the packet out now rather than queueing it. @@ -1166,6 +1170,8 @@ bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept // Create a vector out of the (remaining part of the) packet: try { outbuf.emplace_back(pkt, pkt + size); + outbuf_size += size; + int in_flag = (outbuf_size >= outbuf_limit) ? 0 : IN_EVENTS; iob.set_watches(in_flag | OUT_EVENTS); return true; } @@ -1190,7 +1196,8 @@ bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept // make them extraordinary difficult to combine into a single method. bool control_conn_t::queue_packet(std::vector &&pkt) noexcept { - int in_flag = bad_conn_close ? 0 : IN_EVENTS; + // Close connection on bad conn or temporarily Limit input if necessary + int in_flag = bad_conn_close || (outbuf_size >= outbuf_limit) ? 0 : IN_EVENTS; bool was_empty = outbuf.empty(); if (was_empty) { @@ -1219,6 +1226,8 @@ bool control_conn_t::queue_packet(std::vector &&pkt) noexcept try { outbuf.emplace_back(std::move(pkt)); + outbuf_size += pkt.size(); + int in_flag = (outbuf_size >= outbuf_limit) ? 0 : IN_EVENTS; iob.set_watches(in_flag | OUT_EVENTS); return true; } @@ -1275,8 +1284,10 @@ bool control_conn_t::data_ready() noexcept iob.set_watches(OUT_EVENTS); } else { + // Temporarily Limit input if necessary + int in_flags = (outbuf_size < outbuf_limit) ? IN_EVENTS : 0; int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0; - iob.set_watches(IN_EVENTS | out_flags); + iob.set_watches(in_flags | out_flags); } return false; @@ -1314,6 +1325,7 @@ bool control_conn_t::send_data() noexcept outpkt_index += written; if (outpkt_index == pkt.size()) { // We've finished this packet, move on to the next: + outbuf_size -= pkt.size(); outbuf.pop_front(); outpkt_index = 0; if (oom_close) { @@ -1324,7 +1336,9 @@ bool control_conn_t::send_data() noexcept if (bad_conn_close) { return true; } - iob.set_watches(IN_EVENTS); + // Temporarily Limit input if necessary + int in_flag = (outbuf_size < outbuf_limit) ? IN_EVENTS : 0; + iob.set_watches(in_flag); } } diff --git a/src/includes/control.h b/src/includes/control.h index 82c4f02e..8520bd9a 100644 --- a/src/includes/control.h +++ b/src/includes/control.h @@ -111,6 +111,8 @@ class control_conn_t : private service_listener // Buffer for outgoing packets. Each outgoing back is represented as a vector. list> outbuf; + // Current output buffer size in bytes. + unsigned outbuf_size = 0; // Current index within the first outgoing packet (all previous bytes have been sent). unsigned outpkt_index = 0; @@ -181,7 +183,7 @@ class control_conn_t : private service_listener bool data_ready() noexcept; bool send_data() noexcept; - + // Check if any dependents will be affected by stopping a service, generate a response packet if so. // had_dependents will be set true if the service should not be stopped, false otherwise. // Returns false if the connection must be closed, true otherwise.