Skip to content

Commit

Permalink
Add TCP connection rate throttling to SvxReflector
Browse files Browse the repository at this point in the history
  • Loading branch information
sm0svx committed Jan 15, 2025
1 parent 203d09c commit 39a9d4b
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 71 deletions.
2 changes: 2 additions & 0 deletions src/async/ChangeLog
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
* Async::Config now have a mechanism for subscribing to changes for specific
configuration variables.

* Add connection rate throttling, using token buckets, to Async::TcpServer.



1.7.0 -- 25 Feb 2024
Expand Down
32 changes: 28 additions & 4 deletions src/async/core/AsyncTcpConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ to a remote host. See usage instructions in the class definition.
\verbatim
Async - A library for programming event driven applications
Copyright (C) 2003-2022 Tobias Blomberg
Copyright (C) 2003-2025 Tobias Blomberg
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -293,6 +293,21 @@ void TcpConnection::setSslContext(SslContext& ctx, bool is_server)
} /* TcpConnection::setSslContext */


void TcpConnection::freeze(void)
{
m_freezed = true;
m_wr_watch.setEnabled(false);
} /* TcpConnection::freeze */


void TcpConnection::unfreeze(void)
{
m_freezed = false;
m_wr_watch.setEnabled(!m_write_buf.empty());
processRecvBuf();
} /* TcpConnection::unfreeze */


Async::SslX509 TcpConnection::sslPeerCertificate(void)
{
#if OPENSSL_VERSION_NUMBER >= 0x30000000L
Expand Down Expand Up @@ -501,6 +516,15 @@ void TcpConnection::recvHandler(FdWatch *watch)
m_recv_buf.reserve(new_capacity);
}

if (!m_freezed)
{
processRecvBuf();
}
} /* TcpConnection::recvHandler */


void TcpConnection::processRecvBuf(void)
{
ssize_t processed = -1;
if (m_ssl != nullptr)
{
Expand Down Expand Up @@ -534,13 +558,13 @@ void TcpConnection::recvHandler(FdWatch *watch)
onDisconnected(DR_PROTOCOL_ERROR);
}
}
} /* TcpConnection::recvHandler */
} /* TcpConnection::processRecvBuf */


void TcpConnection::addToWriteBuf(const char *buf, size_t len)
{
m_write_buf.insert(m_write_buf.end(), buf, buf+len);
m_wr_watch.setEnabled(!m_write_buf.empty());
m_wr_watch.setEnabled(!m_freezed && !m_write_buf.empty());
} /* TcpConnection::addToWriteBuf */


Expand Down Expand Up @@ -699,7 +723,7 @@ int TcpConnection::sslRecvHandler(char* src, int count)
}

return (orig_count - count);
} /* TcpConnection::readHandler */
} /* TcpConnection::sslRecvHandler */


enum TcpConnection::SslStatus TcpConnection::sslDoHandshake(void)
Expand Down
22 changes: 21 additions & 1 deletion src/async/core/AsyncTcpConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ to a remote host. See usage instructions in the class definition.
\verbatim
Async - A library for programming event driven applications
Copyright (C) 2003-2022 Tobias Blomberg
Copyright (C) 2003-2025 Tobias Blomberg
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -328,6 +328,23 @@ class TcpConnection : virtual public sigc::trackable

bool isServer(void) const { return m_ssl_is_server; }

/**
* @brief Stop all communication
*
* When a connection is freezed incoming data will be buffered but the
* dataReceived signal will not be emitted. Written data is also buffered.
*/
void freeze(void);

/**
* @brief Reenable all communication
*
* When a frozen connection is unfreezed the dataReceived signal will be
* emitted for any received and buffered data. If there are any written
* data that has been buffered it will be transmitted.
*/
void unfreeze(void);

/**
* @brief Get common name for the SSL connection
* @return Returns the common name for the associated X509 certificate
Expand Down Expand Up @@ -515,6 +532,8 @@ class TcpConnection : virtual public sigc::trackable
BIO* m_ssl_wr_bio = nullptr; // SSL writes, we read
std::vector<char> m_ssl_encrypt_buf;

bool m_freezed = false;

static TcpConnection* lookupConnection(SSL* ssl)
{
auto it = ssl_con_map.find(ssl);
Expand All @@ -524,6 +543,7 @@ class TcpConnection : virtual public sigc::trackable
X509_STORE_CTX* x509_store_ctx);

void recvHandler(FdWatch *watch);
void processRecvBuf(void);
void addToWriteBuf(const char *buf, size_t len);
void onWriteSpaceAvailable(Async::FdWatch* w);
int rawWrite(const void* buf, int count);
Expand Down
46 changes: 38 additions & 8 deletions src/async/core/AsyncTcpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This class is used to create a TCP server that listens to a TCP-port.
\verbatim
Async - A library for programming event driven applications
Copyright (C) 2003-2014 Tobias Blomberg / SM0SVX
Copyright (C) 2003-2025 Tobias Blomberg / SM0SVX
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -170,20 +170,50 @@ class TcpServer : public TcpServerBase

protected:
virtual void createConnection(int sock, const IpAddress& remote_addr,
uint16_t remote_port)
uint16_t remote_port) override
{
ConT *con = new ConT(sock, remote_addr, remote_port);
ConT *con = new TcpServerConnection(sock, remote_addr, remote_port);
con->disconnected.connect(
mem_fun(*this, &TcpServer<ConT>::onDisconnected));
addConnection(con);
clientConnected(con);
}


virtual void emitClientConnected(TcpConnection *con_base) override
{
auto con = dynamic_cast<TcpServerConnection*>(con_base);
con->m_is_connected = true;
clientConnected(reinterpret_cast<ConT*>(con));
}

private:
void onDisconnected(ConT *con, typename ConT::DisconnectReason reason)
struct TcpServerConnection : public ConT
{
TcpServerConnection(int sock, const IpAddress& remote_addr,
uint16_t remote_port)
: ConT(sock, remote_addr, remote_port)
{
}
virtual TcpConnection& operator=(TcpConnection&& other_base) override
{
this->TcpConnection::operator=(std::move(other_base));
auto& other = dynamic_cast<TcpServerConnection&>(other_base);
m_is_connected = other.m_is_connected;
m_is_connected = false;
return *this;
}

bool m_is_connected = false;
};

void onDisconnected(ConT *con_base, typename ConT::DisconnectReason reason)
{
clientDisconnected(con, reason);
removeConnection(con);
auto con = dynamic_cast<TcpServerConnection*>(con_base);
if (con->m_is_connected)
{
con->m_is_connected = false;
clientDisconnected(con_base, reason);
}
removeConnection(con_base);
}

}; /* class TcpServer */
Expand Down
Loading

0 comments on commit 39a9d4b

Please sign in to comment.