Skip to content

Commit

Permalink
control: Limit memory use by control connections
Browse files Browse the repository at this point in the history
Signed-off-by: Mobin "Hojjat" Aydinfar <[email protected]>
  • Loading branch information
mobin-2008 committed Jul 24, 2023
1 parent 7d491be commit acbd941
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
29 changes: 25 additions & 4 deletions src/control.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,7 @@ void control_conn_t::service_event(service_record *service, service_event_t even

bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
{
int in_flag = bad_conn_close ? 0 : IN_EVENTS;
int in_flag = bad_conn_close || input_memlimit() ? 0 : IN_EVENTS;
bool was_empty = outbuf.empty();

// If the queue is empty, we can try to write the packet out now rather than queueing it.
Expand Down Expand Up @@ -1166,6 +1166,7 @@ bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
// Create a vector out of the (remaining part of the) packet:
try {
outbuf.emplace_back(pkt, pkt + size);
outbuf_size += size;
iob.set_watches(in_flag | OUT_EVENTS);
return true;
}
Expand All @@ -1190,7 +1191,7 @@ bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
// make them extraordinary difficult to combine into a single method.
bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
{
int in_flag = bad_conn_close ? 0 : IN_EVENTS;
int in_flag = bad_conn_close || input_memlimit() ? 0 : IN_EVENTS;
bool was_empty = outbuf.empty();

if (was_empty) {
Expand Down Expand Up @@ -1219,6 +1220,7 @@ bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept

try {
outbuf.emplace_back(std::move(pkt));
outbuf_size += pkt.size();
iob.set_watches(in_flag | OUT_EVENTS);
return true;
}
Expand Down Expand Up @@ -1275,8 +1277,9 @@ bool control_conn_t::data_ready() noexcept
iob.set_watches(OUT_EVENTS);
}
else {
int in_flags = !input_memlimit() ? IN_EVENTS : 0;
int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
iob.set_watches(IN_EVENTS | out_flags);
iob.set_watches(in_flags | out_flags);
}

return false;
Expand Down Expand Up @@ -1314,6 +1317,7 @@ bool control_conn_t::send_data() noexcept
outpkt_index += written;
if (outpkt_index == pkt.size()) {
// We've finished this packet, move on to the next:
outbuf_size -= pkt.size();
outbuf.pop_front();
outpkt_index = 0;
if (oom_close) {
Expand All @@ -1324,14 +1328,31 @@ bool control_conn_t::send_data() noexcept
if (bad_conn_close) {
return true;
}
iob.set_watches(IN_EVENTS);
int in_flag = !input_memlimit() ? IN_EVENTS : 0;
iob.set_watches(in_flag);
}
}

// more to send
return false;
}

bool control_conn_t::input_memlimit() noexcept
{
if (outbuf_size >= 1000) {
// // Outbuf contains much data, Temporarily disable "IN_EVENTS" to reduce it
if (!in_memlimit) {
in_memlimit = true;
}
return true;
}
else if (in_memlimit) {
// if "IN_EVENTS" disabled in previous packet and outbuf size has reduced, enable it again
in_memlimit = false;
}
return false;
}

control_conn_t::~control_conn_t() noexcept
{
int fd = iob.get_watched_fd();
Expand Down
9 changes: 9 additions & 0 deletions src/includes/control.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,14 @@ class control_conn_t : private service_listener

// Buffer for outgoing packets. Each outgoing back is represented as a vector<char>.
list<vector<char>> outbuf;
// Output buffer size. functions will increase this value when pushing something into
// outbuf and lowering this value when something get removed from outbuf.
unsigned outbuf_size = 0;
// Current index within the first outgoing packet (all previous bytes have been sent).
unsigned outpkt_index = 0;
// If outbuf is almost full, We will disable "IN_EVENTS" and set this variable as "true"
// and vice-versa
bool in_memlimit = false;

// Queue a packet to be sent
// Returns: false if the packet could not be queued and a suitable error packet
Expand Down Expand Up @@ -181,6 +187,9 @@ class control_conn_t : private service_listener
bool data_ready() noexcept;

bool send_data() noexcept;

// Check the output buffer size and limit inputs if necessary
bool input_memlimit() noexcept;

// Check if any dependents will be affected by stopping a service, generate a response packet if so.
// had_dependents will be set true if the service should not be stopped, false otherwise.
Expand Down

0 comments on commit acbd941

Please sign in to comment.