Skip to content

Commit

Permalink
[RFC] V2.1.1: support HAProxy PROXY protocol V1 on MySQL frontends
Browse files Browse the repository at this point in the history
This adds basic support for the HAProxy PROXY protocol V1 on MySQL frontend
client connections.  This allows the true client IP to be seen in the output
of `SHOW FULL PROCESSLIST` in ProxySQL's admin frontend when the ProxySQL
servers sit behind a load balancer such as HAProxy or AWS classic ELBs with
the PROXY protocol feature enabled.

I believe this should resolve #2497 (Support for proxy_protocol for proxysql behind aws load balancer).

The patch adds a Proxy_Protocol class which handles:

- parsing of PROXY protocol headers (only the V1 header supported for now)
- matching client IPs against a list of configured network CIDRs

A new option, `proxy_protocol_frontend_nets` (list of network CIDRs), is
introduced to allow the PROXY protocol feature to be selectively turned on for
specific subnets where e.g. load balancers are running.  This is similar to how
the PROXY protocol is implemented in MariaDB.

This variable can be set either in the config file or via the ProxySQL Admin
interface.  Configuration file example:

    mysql_variables=
    {
        ...
        proxy_protocol_frontend_nets="10.42.0.0/28 127.10.0.0/16"
        ...
    }

ProxySQL Admin interface example:

    UPDATE global_variables
    SET variable_value="192.168.42.0/24"
    WHERE variable_name="mysql-proxy_protocol_frontend_nets";
    LOAD MYSQL VARIABLES TO RUNTIME;

The network CIDRs are stored as a thread-local variable in the MySQL_Thread
class.  It's only updated by MySQL_Thread::refresh_variables().

A new MySQL Data Stream state (DSS), `STATE_PROXY_PROTOCOL`, is introduced to
handle the initial parsing of the PROXY protocol header.  Once this has
completed, the state is reset back to `STATE_SERVER_HANDSHAKE`.

The patch hooks into the MySQL_Thread's main loop where `accept()` is called
to handle incoming connections.  Next, the actual parsing of the PROXY protocol
header is invoked from the `MySQL_Data_Stream::read_pkts()` method, which is
also called from MySQL_Thread's main loop (where incoming data is handled).

I've only tested this with a HAProxy configuration block like:

    listen proxysql-with-proxy
        bind *:6030
        option tcp-check
        mode tcp
        timeout server 6h
        timeout client 6h
        balance roundrobin
        server proxysql-backend-1 127.0.0.1:6033 check send-proxy
    listen proxysql-without-proxy
        bind *:6031
        option tcp-check
        mode tcp
        timeout server 6h
        timeout client 6h
        balance roundrobin
        server proxysql-backend-2 192.168.1.123:6033 check
  • Loading branch information
noahwilliamsson committed Feb 28, 2021
1 parent 6d49f5f commit c3e187c
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 24 deletions.
1 change: 1 addition & 0 deletions include/MySQL_Data_Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class MySQL_Data_Stream
int array2buffer_full();
void init(); // initialize the data stream
void init(enum MySQL_DS_type, MySQL_Session *, int); // initialize with arguments
void update_client_addr(struct sockaddr *);
void shut_soft();
void shut_hard();
int read_from_net();
Expand Down
7 changes: 7 additions & 0 deletions include/MySQL_Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "proxysql.h"
#include "cpp.h"
#include "MySQL_Variables.h"

#ifdef IDLE_THREADS
#include <sys/epoll.h>
#endif // IDLE_THREADS
Expand Down Expand Up @@ -424,6 +425,12 @@ class MySQL_Threads_Handler
bool sessions_sort;
char *default_schema;
char *interfaces;
/**
* List (",; ") of CIDR blocks for which PROXY protocol headers
* should be parsed for incoming MySQL frontend connections
* (mysql-proxy_protocol_frontend_nets)
*/
char *proxy_protocol_frontend_nets;
char *server_version;
char *keep_multiplexing_variables;
//unsigned int default_charset; // removed in 2.0.13 . Obsoleted previously using MySQL_Variables instead
Expand Down
22 changes: 22 additions & 0 deletions include/Proxy_Protocol.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#ifndef __CLASS_PROXY_PROTOCOL_H
#define __CLASS_PROXY_PROTOCOL_H

#include <vector>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>


class Proxy_Protocol {
private:
Proxy_Protocol() {}
~Proxy_Protocol() {}
static bool add_subnet(const char *cidr, std::vector<proxy_protocol_subnet_t> &subnets);

public:
static bool parse_subnets(const char *list, std::vector<proxy_protocol_subnet_t> &subnets);
static bool match_subnet(struct sockaddr *addr, socklen_t addrlen, std::vector<proxy_protocol_subnet_t> &subnets);
static bool parse_header(unsigned char *, size_t n, struct sockaddr_storage *out);
};

#endif /* __CLASS_PROXY_PROTOCOL_H */
8 changes: 8 additions & 0 deletions include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ typedef struct {
enum mysql_data_stream_status {
STATE_NOT_INITIALIZED,
STATE_NOT_CONNECTED,
/* parse HAProxy PROXY protocol header before server handshake */
STATE_PROXY_PROTOCOL,
STATE_SERVER_HANDSHAKE,
STATE_CLIENT_HANDSHAKE,
STATE_CLIENT_AUTH_OK,
Expand Down Expand Up @@ -406,6 +408,11 @@ typedef struct _PtrSize_t PtrSize_t;
typedef struct _proxysql_mysql_thread_t proxysql_mysql_thread_t;
typedef struct { char * table_name; char * table_def; } table_def_t;
typedef struct __SQP_query_parser_t SQP_par_t;
typedef struct {
unsigned short family;
unsigned short bits;
unsigned char addr[16];
} proxy_protocol_subnet_t;
#endif /* PROXYSQL_TYPEDEFS */

//#ifdef __cplusplus
Expand Down Expand Up @@ -973,6 +980,7 @@ extern __thread char * mysql_thread___monitor_replication_lag_use_percona_heartb

extern __thread char * mysql_thread___add_ldap_user_comment;


#ifdef DEBUG
extern __thread bool mysql_thread___session_debug;
#endif /* DEBUG */
Expand Down
2 changes: 1 addition & 1 deletion lib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ default: libproxysql.a

_OBJ = c_tokenizer.o
OBJ = $(patsubst %,$(ODIR)/%,$(_OBJ))
_OBJ_CXX = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo sqlite3db.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo ProxySQL_Config.oo ProxySQL_Restapi.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo MySQL_PreparedStatement.oo ProxySQL_Cluster.oo ClickHouse_Authentication.oo ClickHouse_Server.oo ProxySQL_Statistics.oo Chart_bundle_js.oo ProxySQL_HTTP_Server.oo ProxySQL_RESTAPI_Server.oo font-awesome.min.css.oo main-bundle.min.css.oo set_parser.oo MySQL_Variables.oo
_OBJ_CXX = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo sqlite3db.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo ProxySQL_Config.oo ProxySQL_Restapi.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo MySQL_PreparedStatement.oo ProxySQL_Cluster.oo ClickHouse_Authentication.oo ClickHouse_Server.oo ProxySQL_Statistics.oo Chart_bundle_js.oo ProxySQL_HTTP_Server.oo ProxySQL_RESTAPI_Server.oo font-awesome.min.css.oo main-bundle.min.css.oo set_parser.oo MySQL_Variables.oo Proxy_Protocol.oo
OBJ_CXX = $(patsubst %,$(ODIR)/%,$(_OBJ_CXX))
HEADERS = ../include/*.h ../include/*.hpp

Expand Down
2 changes: 2 additions & 0 deletions lib/MySQL_Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3399,6 +3399,8 @@ int MySQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) {

case CONNECTING_CLIENT:
switch (client_myds->DSS) {
case STATE_PROXY_PROTOCOL:
break;
case STATE_SERVER_HANDSHAKE:
handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(&pkt, &wrong_pass);
break;
Expand Down
54 changes: 31 additions & 23 deletions lib/MySQL_Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "StatCounters.h"
#include "MySQL_PreparedStatement.h"
#include "MySQL_Logger.hpp"
#include "Proxy_Protocol.h"

#ifdef DEBUG
MySQL_Session *sess_stopat;
Expand Down Expand Up @@ -97,6 +98,8 @@ mythr_g_st_vars_t MySQL_Thread_status_variables_gauge_array[] {

extern mysql_variable_st mysql_tracked_variables[];

thread_local std::vector<proxy_protocol_subnet_t> mysql_thread___proxy_protocol_subnets;

const MARIADB_CHARSET_INFO * proxysql_find_charset_nr(unsigned int nr) {
const MARIADB_CHARSET_INFO * c = mariadb_compiled_charsets;
do {
Expand Down Expand Up @@ -439,6 +442,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"have_compress",
(char *)"client_found_rows",
(char *)"interfaces",
(char *)"proxy_protocol_frontend_nets",
(char *)"monitor_enabled",
(char *)"monitor_history",
(char *)"monitor_connect_interval",
Expand Down Expand Up @@ -1104,6 +1108,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.default_schema=strdup((char *)"information_schema");
variables.handle_unknown_charset=1;
variables.interfaces=strdup((char *)"");
variables.proxy_protocol_frontend_nets = strdup("");
variables.server_version=strdup((char *)"5.5.30");
variables.eventslog_filename=strdup((char *)""); // proxysql-mysql-eventslog is recommended
variables.eventslog_filesize=100*1024*1024;
Expand Down Expand Up @@ -1347,6 +1352,7 @@ char * MySQL_Threads_Handler::get_variable_string(char *name) {
if (!strcmp(name,"eventslog_filename")) return strdup(variables.eventslog_filename);
if (!strcmp(name,"auditlog_filename")) return strdup(variables.auditlog_filename);
if (!strcmp(name,"interfaces")) return strdup(variables.interfaces);
if (!strcmp(name, "proxy_protocol_frontend_nets")) return strdup(variables.proxy_protocol_frontend_nets);
if (!strcmp(name,"keep_multiplexing_variables")) return strdup(variables.keep_multiplexing_variables);
proxy_error("Not existing variable: %s\n", name); assert(0);
return NULL;
Expand Down Expand Up @@ -1648,6 +1654,7 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f
if (!strcasecmp(name,"default_schema")) return strdup(variables.default_schema);
if (!strcasecmp(name,"keep_multiplexing_variables")) return strdup(variables.keep_multiplexing_variables);
if (!strcasecmp(name,"interfaces")) return strdup(variables.interfaces);
if (!strcasecmp(name,"proxy_protocol_frontend_nets")) return strdup(variables.proxy_protocol_frontend_nets);
if (!strcasecmp(name,"server_capabilities")) {
// FIXME : make it human readable
sprintf(intbuf,"%d",variables.server_capabilities);
Expand Down Expand Up @@ -2965,6 +2972,11 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi
}
}
}
if (!strcasecmp(name,"proxy_protocol_frontend_nets")) {
free(variables.proxy_protocol_frontend_nets);
variables.proxy_protocol_frontend_nets=strdup(value);
return true;
}
if (!strcasecmp(name,"server_version")) {
if (vallen) {
free(variables.server_version);
Expand Down Expand Up @@ -3779,6 +3791,7 @@ MySQL_Threads_Handler::~MySQL_Threads_Handler() {
}
if (variables.default_schema) free(variables.default_schema);
if (variables.interfaces) free(variables.interfaces);
if (variables.proxy_protocol_frontend_nets) free(variables.proxy_protocol_frontend_nets);
if (variables.server_version) free(variables.server_version);
if (variables.keep_multiplexing_variables) free(variables.keep_multiplexing_variables);
if (variables.firewall_whitelist_errormsg) free(variables.firewall_whitelist_errormsg);
Expand Down Expand Up @@ -5099,6 +5112,11 @@ void MySQL_Thread::refresh_variables() {
#ifdef DEBUG
mysql_thread___session_debug=(bool)GloMTH->get_variable_int((char *)"session_debug");
#endif /* DEBUG */
{
char *s = GloMTH->get_variable_string((char *)"proxy_protocol_frontend_nets");
Proxy_Protocol::parse_subnets(s, mysql_thread___proxy_protocol_subnets);
free(s);
}
GloMTH->wrunlock();
pthread_mutex_unlock(&GloVars.global.ext_glomth_mutex);
}
Expand Down Expand Up @@ -5201,30 +5219,9 @@ void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsig
if (__sync_add_and_fetch(&MyHGM->status.client_connections,1) > mysql_thread___max_connections) {
sess->max_connections_reached=true;
}
sess->client_myds->client_addrlen=addrlen;
sess->client_myds->client_addr=addr;

switch (sess->client_myds->client_addr->sa_family) {
case AF_INET: {
struct sockaddr_in *ipv4 = (struct sockaddr_in *)sess->client_myds->client_addr;
char buf[INET_ADDRSTRLEN];
inet_ntop(sess->client_myds->client_addr->sa_family, &ipv4->sin_addr, buf, INET_ADDRSTRLEN);
sess->client_myds->addr.addr = strdup(buf);
sess->client_myds->addr.port = htons(ipv4->sin_port);
break;
}
case AF_INET6: {
struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)sess->client_myds->client_addr;
char buf[INET6_ADDRSTRLEN];
inet_ntop(sess->client_myds->client_addr->sa_family, &ipv6->sin6_addr, buf, INET6_ADDRSTRLEN);
sess->client_myds->addr.addr = strdup(buf);
sess->client_myds->addr.port = htons(ipv6->sin6_port);
break;
}
default:
sess->client_myds->addr.addr = strdup("localhost");
break;
}
// Update client_myds' client IP and port details
sess->client_myds->update_client_addr(addr);

iface_info *ifi=NULL;
ifi=GloMTH->MLM_find_iface_from_fd(myds->fd); // here we try to get the info about the proxy bind address
Expand All @@ -5233,6 +5230,17 @@ void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsig
sess->client_myds->proxy_addr.port=ifi->port;
}
sess->client_myds->myprot.generate_pkt_initial_handshake(true,NULL,NULL, &sess->thread_session_id, true);

// Check if the peer IP matches a subnet for which we should parse
// PROXY protocol headers
if (Proxy_Protocol::match_subnet(addr, addrlen, mysql_thread___proxy_protocol_subnets)) {
sess->set_status(CONNECTING_CLIENT);
// We'll change this back to STATE_SERVER_HANDSHAKE once the proxy protocol header is parsed
sess->client_myds->setDSS(STATE_PROXY_PROTOCOL);
}
//proxy_info("accepted from %s:%d, PROXY protocol: %s, client_myds:%p\n",
// sess->client_myds->addr.addr, sess->client_myds->addr.port, sess->client_myds->DSS == STATE_PROXY_PROTOCOL? "YES": "NO", sess->client_myds);

ioctl_FIONBIO(sess->client_myds->fd, 1);
mypolls.add(POLLIN|POLLOUT, sess->client_myds->fd, sess->client_myds, curtime);
proxy_debug(PROXY_DEBUG_NET,1,"Session=%p -- Adding client FD %d\n", sess, sess->client_myds->fd);
Expand Down
154 changes: 154 additions & 0 deletions lib/Proxy_Protocol.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#include "proxysql.h"

#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "Proxy_Protocol.h"


bool Proxy_Protocol::parse_subnets(const char *list, std::vector<proxy_protocol_subnet_t> &subnets) {
char *input = strdup(list), *cidr, *saveptr = NULL;
const char *delim = ",; ";

subnets.clear();
for(cidr = strtok_r(input, delim, &saveptr); cidr != NULL; cidr = strtok_r(NULL, delim, &saveptr)) {
if(!Proxy_Protocol::add_subnet(cidr, subnets)) {
proxy_warning("PROXY protocol subnet parsing failed, invalid CIDR: %s\n", cidr);
free(input);
return false;
};
}

free(input);
return true;
}

bool Proxy_Protocol::add_subnet(const char *cidr, std::vector<proxy_protocol_subnet_t> &subnets) {
char *addr = strdup(cidr), *mask;
proxy_protocol_subnet_t subnet = {
.family = AF_INET
};

if (strchr(addr, ':')) {
subnet.family = AF_INET6;
}

subnet.bits = (unsigned short)(subnet.family == AF_INET? 32: 128);
mask = strchr(addr, '/');
if (mask) {
*mask++ = '\0';
subnet.bits = (unsigned short)atoi(mask);
if (subnet.bits > (subnet.family == AF_INET? 32: 128)) {
free(addr);
return false;
}
}

if (inet_pton(subnet.family, addr, subnet.addr) == 1) {
subnets.push_back(subnet);
free(addr);
return true;
}

free(addr);
return false;
}

bool Proxy_Protocol::match_subnet(struct sockaddr *addr, socklen_t addrlen, std::vector<proxy_protocol_subnet_t> &subnets) {
struct sockaddr_in *sin;
struct sockaddr_in6 *sin6;
unsigned char *a1, *a2;

for (size_t i = 0; i < subnets.size(); i++) {
if (addr->sa_family != subnets[i].family) {
continue;
}

switch (addr->sa_family) {
case AF_INET:
sin = (struct sockaddr_in *)addr;
a1 = (unsigned char *)&(sin->sin_addr);
break;
case AF_INET6:
sin6 = (struct sockaddr_in6 *)addr;
a1 = (unsigned char *)&(sin6->sin6_addr);
break;
default:
return false;
break;
}
a2 = subnets[i].addr;

int n_bytes = subnets[i].bits / 8;
if (n_bytes && memcmp(a1, a2, n_bytes)) {
continue;
}

int n_bits = subnets[i].bits % 8;
if (n_bits) {
int mask = (1<<n_bits)-1;
if ((a1[n_bytes] & mask) != (a2[n_bytes] & mask)) {
continue;
}
}

return true;
}

return false;
}

// https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
// TODO: parse TCP6, parse V2 headers
bool Proxy_Protocol::parse_header(unsigned char *header, size_t n, struct sockaddr_storage *out) {
char src_ip[64], dst_ip[64];
int src_port, dst_port;

// skip parsing if there's less data than the minimum V1 header
if (n < 15) return false;
if (memcmp(header, "PROXY ", 6)) return false;

header[n - 2] = '\0'; // Zap CRLF
header[n - 1] = '\0';
header += 6;
if (!memcmp(header, "TCP4 ", 5)) {
struct sockaddr_in *sin = (struct sockaddr_in *)out;

header += 5;
if (sscanf((char *)header, "%15s %15s %d %d", src_ip, dst_ip, &src_port, &dst_port) != 4)
return false;
src_ip[15] = '\0';
dst_ip[15] = '\0';
sin->sin_family = AF_INET;
sin->sin_port = htons(src_port);
if (inet_pton(sin->sin_family, src_ip, (void *)&sin->sin_addr) != 1)
return false;
}
else if(!memcmp(header, "TCP6 ", 5)) {
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)out;

header += 5;
if (sscanf((char *)header, "%63s %63s %d %d", src_ip, dst_ip, &src_port, &dst_port) != 4)
return false;
src_ip[sizeof(dst_ip)-1] = '\0';
dst_ip[sizeof(dst_ip)-1] = '\0';
sin6->sin6_family = AF_INET;
sin6->sin6_port = htons(src_port);
if (inet_pton(sin6->sin6_family, src_ip, (void *)&sin6->sin6_addr) != 1)
return false;
}
else if(!memcmp(header, "UNKNOWN", 7)) {
// not sure how to deal with this
out->ss_family = AF_UNIX;
return true;
}
else {
return false;
}

return true;
}
Loading

0 comments on commit c3e187c

Please sign in to comment.