From acbd941be43c86c4d45d41ba740f55d82171fbe6 Mon Sep 17 00:00:00 2001 From: Mobin Date: Sun, 23 Jul 2023 16:31:43 +0330 Subject: [PATCH] control: Limit memory use by control connections Signed-off-by: Mobin "Hojjat" Aydinfar --- src/control.cc | 29 +++++++++++++++++++++++++---- src/includes/control.h | 9 +++++++++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/control.cc b/src/control.cc index 6c1918ab..094bbc84 100644 --- a/src/control.cc +++ b/src/control.cc @@ -1135,7 +1135,7 @@ void control_conn_t::service_event(service_record *service, service_event_t even bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept { - int in_flag = bad_conn_close ? 0 : IN_EVENTS; + int in_flag = bad_conn_close || input_memlimit() ? 0 : IN_EVENTS; bool was_empty = outbuf.empty(); // If the queue is empty, we can try to write the packet out now rather than queueing it. @@ -1166,6 +1166,7 @@ bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept // Create a vector out of the (remaining part of the) packet: try { outbuf.emplace_back(pkt, pkt + size); + outbuf_size += size; iob.set_watches(in_flag | OUT_EVENTS); return true; } @@ -1190,7 +1191,7 @@ bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept // make them extraordinary difficult to combine into a single method. bool control_conn_t::queue_packet(std::vector &&pkt) noexcept { - int in_flag = bad_conn_close ? 0 : IN_EVENTS; + int in_flag = bad_conn_close || input_memlimit() ? 0 : IN_EVENTS; bool was_empty = outbuf.empty(); if (was_empty) { @@ -1219,6 +1220,7 @@ bool control_conn_t::queue_packet(std::vector &&pkt) noexcept try { outbuf.emplace_back(std::move(pkt)); + outbuf_size += pkt.size(); iob.set_watches(in_flag | OUT_EVENTS); return true; } @@ -1275,8 +1277,9 @@ bool control_conn_t::data_ready() noexcept iob.set_watches(OUT_EVENTS); } else { + int in_flags = !input_memlimit() ? IN_EVENTS : 0; int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0; - iob.set_watches(IN_EVENTS | out_flags); + iob.set_watches(in_flags | out_flags); } return false; @@ -1314,6 +1317,7 @@ bool control_conn_t::send_data() noexcept outpkt_index += written; if (outpkt_index == pkt.size()) { // We've finished this packet, move on to the next: + outbuf_size -= pkt.size(); outbuf.pop_front(); outpkt_index = 0; if (oom_close) { @@ -1324,7 +1328,8 @@ bool control_conn_t::send_data() noexcept if (bad_conn_close) { return true; } - iob.set_watches(IN_EVENTS); + int in_flag = !input_memlimit() ? IN_EVENTS : 0; + iob.set_watches(in_flag); } } @@ -1332,6 +1337,22 @@ bool control_conn_t::send_data() noexcept return false; } +bool control_conn_t::input_memlimit() noexcept +{ + if (outbuf_size >= 1000) { + // // Outbuf contains much data, Temporarily disable "IN_EVENTS" to reduce it + if (!in_memlimit) { + in_memlimit = true; + } + return true; + } + else if (in_memlimit) { + // if "IN_EVENTS" disabled in previous packet and outbuf size has reduced, enable it again + in_memlimit = false; + } + return false; +} + control_conn_t::~control_conn_t() noexcept { int fd = iob.get_watched_fd(); diff --git a/src/includes/control.h b/src/includes/control.h index 82c4f02e..3d06cdaa 100644 --- a/src/includes/control.h +++ b/src/includes/control.h @@ -111,8 +111,14 @@ class control_conn_t : private service_listener // Buffer for outgoing packets. Each outgoing back is represented as a vector. list> outbuf; + // Output buffer size. functions will increase this value when pushing something into + // outbuf and lowering this value when something get removed from outbuf. + unsigned outbuf_size = 0; // Current index within the first outgoing packet (all previous bytes have been sent). unsigned outpkt_index = 0; + // If outbuf is almost full, We will disable "IN_EVENTS" and set this variable as "true" + // and vice-versa + bool in_memlimit = false; // Queue a packet to be sent // Returns: false if the packet could not be queued and a suitable error packet @@ -181,6 +187,9 @@ class control_conn_t : private service_listener bool data_ready() noexcept; bool send_data() noexcept; + + // Check the output buffer size and limit inputs if necessary + bool input_memlimit() noexcept; // Check if any dependents will be affected by stopping a service, generate a response packet if so. // had_dependents will be set true if the service should not be stopped, false otherwise.