diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index 9a8047d2c1..64ca5040f6 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -157,6 +157,7 @@ class MySQL_Data_Stream int array2buffer_full(); void init(); // initialize the data stream void init(enum MySQL_DS_type, MySQL_Session *, int); // initialize with arguments + void update_client_addr(struct sockaddr *); void shut_soft(); void shut_hard(); int read_from_net(); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 4554c9d687..58f9cbb239 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -4,6 +4,8 @@ #include "proxysql.h" #include "cpp.h" #include "MySQL_Variables.h" +#include "Proxy_Protocol.h" + #ifdef IDLE_THREADS #include #endif // IDLE_THREADS @@ -305,6 +307,12 @@ class MySQL_Threads_Handler bool sessions_sort; char *default_schema; char *interfaces; + /** + * List (",; ") of CIDR blocks for which PROXY protocol headers + * should be parsed for incoming MySQL frontend connections + * (mysql-proxy_protocol_frontend_nets) + */ + char *proxy_protocol_frontend_nets; char *server_version; char *keep_multiplexing_variables; //unsigned int default_charset; // removed in 2.0.13 . Obsoleted previously using MySQL_Variables instead diff --git a/include/Proxy_Protocol.h b/include/Proxy_Protocol.h new file mode 100644 index 0000000000..2eb98ffb79 --- /dev/null +++ b/include/Proxy_Protocol.h @@ -0,0 +1,22 @@ +#ifndef __CLASS_PROXY_PROTOCOL_H +#define __CLASS_PROXY_PROTOCOL_H + +#include +#include +#include +#include + + +class Proxy_Protocol { + private: + static bool add_subnet(const char *cidr, std::vector &subnets); + + public: + Proxy_Protocol() {} + ~Proxy_Protocol() {} + static bool parse_subnets(const char *list, std::vector &subnets); + static bool match_subnet(struct sockaddr *addr, socklen_t addrlen, std::vector &subnets); + static bool parse_header(unsigned char *, size_t n, struct sockaddr_storage *out); +}; + +#endif /* __CLASS_PROXY_PROTOCOL_H */ diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index e9ca940fb1..b5c57e437a 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -219,6 +219,8 @@ typedef struct { enum mysql_data_stream_status { STATE_NOT_INITIALIZED, STATE_NOT_CONNECTED, + /* parse HAProxy PROXY protocol header before server handshake */ + STATE_PROXY_PROTOCOL, STATE_SERVER_HANDSHAKE, STATE_CLIENT_HANDSHAKE, STATE_CLIENT_AUTH_OK, @@ -406,6 +408,11 @@ typedef struct _PtrSize_t PtrSize_t; typedef struct _proxysql_mysql_thread_t proxysql_mysql_thread_t; typedef struct { char * table_name; char * table_def; } table_def_t; typedef struct __SQP_query_parser_t SQP_par_t; +typedef struct { + unsigned short family; + unsigned short bits; + unsigned char addr[16]; +} proxy_protocol_subnet_t; #endif /* PROXYSQL_TYPEDEFS */ //#ifdef __cplusplus @@ -961,6 +968,7 @@ extern __thread char * mysql_thread___monitor_replication_lag_use_percona_heartb extern __thread char * mysql_thread___add_ldap_user_comment; + #ifdef DEBUG extern __thread bool mysql_thread___session_debug; #endif /* DEBUG */ diff --git a/lib/Makefile b/lib/Makefile index db3b10d85c..c41c4b91b5 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -98,7 +98,7 @@ default: libproxysql.a _OBJ = c_tokenizer.o OBJ = $(patsubst %,$(ODIR)/%,$(_OBJ)) -_OBJ_CXX = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo sqlite3db.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo ProxySQL_Config.oo ProxySQL_Restapi.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo MySQL_PreparedStatement.oo ProxySQL_Cluster.oo ClickHouse_Authentication.oo ClickHouse_Server.oo ProxySQL_Statistics.oo Chart_bundle_js.oo ProxySQL_HTTP_Server.oo ProxySQL_RESTAPI_Server.oo font-awesome.min.css.oo main-bundle.min.css.oo set_parser.oo MySQL_Variables.oo +_OBJ_CXX = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo sqlite3db.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo ProxySQL_Config.oo ProxySQL_Restapi.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo MySQL_PreparedStatement.oo ProxySQL_Cluster.oo ClickHouse_Authentication.oo ClickHouse_Server.oo ProxySQL_Statistics.oo Chart_bundle_js.oo ProxySQL_HTTP_Server.oo ProxySQL_RESTAPI_Server.oo font-awesome.min.css.oo main-bundle.min.css.oo set_parser.oo MySQL_Variables.oo Proxy_Protocol.oo OBJ_CXX = $(patsubst %,$(ODIR)/%,$(_OBJ_CXX)) HEADERS = ../include/*.h ../include/*.hpp diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index bf5ed96e23..eabe94bba9 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -1,5 +1,9 @@ #include #include "proxysql.h" +#if !defined(__FreeBSD__) && !defined(__APPLE__) +#define HAVE_BOOL +#endif +#include "ma_global.h" #include "cpp.h" #include "MySQL_PreparedStatement.h" diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index fccdd340bd..fff2cf7d3b 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -2809,6 +2809,8 @@ int MySQL_Session::handler() { case CONNECTING_CLIENT: switch (client_myds->DSS) { + case STATE_PROXY_PROTOCOL: + break; case STATE_SERVER_HANDSHAKE: handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(&pkt, &wrong_pass); break; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index a6a8500224..c0acc47d66 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -14,6 +14,7 @@ #include "StatCounters.h" #include "MySQL_PreparedStatement.h" #include "MySQL_Logger.hpp" +#include "Proxy_Protocol.h" #ifdef DEBUG MySQL_Session *sess_stopat; @@ -36,6 +37,10 @@ extern MySQL_Logger *GloMyLogger; extern mysql_variable_st mysql_tracked_variables[]; + +thread_local std::vector mysql_thread___proxy_protocol_subnets; + + const MARIADB_CHARSET_INFO * proxysql_find_charset_nr(unsigned int nr) { const MARIADB_CHARSET_INFO * c = mariadb_compiled_charsets; do { @@ -375,6 +380,7 @@ static char * mysql_thread_variables_names[]= { (char *)"have_compress", (char *)"client_found_rows", (char *)"interfaces", + (char *)"proxy_protocol_frontend_nets", (char *)"monitor_enabled", (char *)"monitor_history", (char *)"monitor_connect_interval", @@ -596,6 +602,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.default_schema=strdup((char *)"information_schema"); variables.handle_unknown_charset=1; variables.interfaces=strdup((char *)""); + variables.proxy_protocol_frontend_nets = strdup(""); variables.server_version=strdup((char *)"5.5.30"); variables.eventslog_filename=strdup((char *)""); // proxysql-mysql-eventslog is recommended variables.eventslog_filesize=100*1024*1024; @@ -822,6 +829,7 @@ char * MySQL_Threads_Handler::get_variable_string(char *name) { if (!strcmp(name,"eventslog_filename")) return strdup(variables.eventslog_filename); if (!strcmp(name,"auditlog_filename")) return strdup(variables.auditlog_filename); if (!strcmp(name,"interfaces")) return strdup(variables.interfaces); + if (!strcmp(name, "proxy_protocol_frontend_nets")) return strdup(variables.proxy_protocol_frontend_nets); if (!strcmp(name,"keep_multiplexing_variables")) return strdup(variables.keep_multiplexing_variables); proxy_error("Not existing variable: %s\n", name); assert(0); return NULL; @@ -1117,6 +1125,7 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f if (!strcasecmp(name,"default_schema")) return strdup(variables.default_schema); if (!strcasecmp(name,"keep_multiplexing_variables")) return strdup(variables.keep_multiplexing_variables); if (!strcasecmp(name,"interfaces")) return strdup(variables.interfaces); + if (!strcasecmp(name,"proxy_protocol_frontend_nets")) return strdup(variables.proxy_protocol_frontend_nets); if (!strcasecmp(name,"server_capabilities")) { // FIXME : make it human readable sprintf(intbuf,"%d",variables.server_capabilities); @@ -2377,6 +2386,11 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi } } } + if (!strcasecmp(name,"proxy_protocol_frontend_nets")) { + free(variables.proxy_protocol_frontend_nets); + variables.proxy_protocol_frontend_nets=strdup(value); + return true; + } if (!strcasecmp(name,"server_version")) { if (vallen) { free(variables.server_version); @@ -3150,6 +3164,7 @@ void MySQL_Threads_Handler::stop_listeners() { free_tokenizer( &tok ); } + MySQL_Threads_Handler::~MySQL_Threads_Handler() { if (variables.monitor_username) { free(variables.monitor_username); variables.monitor_username=NULL; } if (variables.monitor_password) { free(variables.monitor_password); variables.monitor_password=NULL; } @@ -3159,6 +3174,7 @@ MySQL_Threads_Handler::~MySQL_Threads_Handler() { } if (variables.default_schema) free(variables.default_schema); if (variables.interfaces) free(variables.interfaces); + free(variables.proxy_protocol_frontend_nets); if (variables.server_version) free(variables.server_version); if (variables.keep_multiplexing_variables) free(variables.keep_multiplexing_variables); if (variables.firewall_whitelist_errormsg) free(variables.firewall_whitelist_errormsg); @@ -4062,134 +4078,137 @@ void MySQL_Thread::run() { } bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) { - if (mypolls.fds[n].revents) { + if (mypolls.fds[n].revents) { #ifdef IDLE_THREADS - if (myds->myds_type==MYDS_FRONTEND) { - if (epoll_thread) { - mypolls.remove_index_fast(n); - myds->mypolls=NULL; - unsigned int i; - for (i=0;ilen;i++) { - MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i); - if (mysess==myds->sess) { - mysess->thread=NULL; - unregister_session(i); - //exit_cond=true; - resume_mysql_sessions->add(myds->sess); - return false; - } - } - } + if (myds->myds_type==MYDS_FRONTEND) { + if (epoll_thread) { + mypolls.remove_index_fast(n); + myds->mypolls=NULL; + unsigned int i; + for (i=0;ilen;i++) { + MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i); + if (mysess==myds->sess) { + mysess->thread=NULL; + unregister_session(i); + //exit_cond=true; + resume_mysql_sessions->add(myds->sess); + return false; } + } + } + } #endif // IDLE_THREADS - mypolls.last_recv[n]=curtime; - myds->revents=mypolls.fds[n].revents; - myds->sess->to_process=1; - assert(myds->sess->status!=NONE); - } else { - // no events - if (myds->wait_until && curtime > myds->wait_until) { - // timeout - myds->sess->to_process=1; - assert(myds->sess->status!=NONE); - } else { - if (myds->sess->pause_until && curtime > myds->sess->pause_until) { - // timeout - myds->sess->to_process=1; - } - } + mypolls.last_recv[n]=curtime; + myds->revents=mypolls.fds[n].revents; + myds->sess->to_process=1; + assert(myds->sess->status!=NONE); + } else { + // no events + if (myds->wait_until && curtime > myds->wait_until) { + // timeout + myds->sess->to_process=1; + assert(myds->sess->status!=NONE); + } else { + if (myds->sess->pause_until && curtime > myds->sess->pause_until) { + // timeout + myds->sess->to_process=1; + } + } + } + + if (myds->myds_type==MYDS_BACKEND && myds->sess->status!=FAST_FORWARD) { + if (mypolls.fds[n].revents) { + // this part of the code fixes an important bug + // if a connection in use but idle (ex: running a transaction) + // get data, immediately destroy the session + // + // this can happen, for example, with a low wait_timeout and running transaction + if (myds->sess->status==WAITING_CLIENT_DATA) { + if (myds->myconn->async_state_machine==ASYNC_IDLE) { + proxy_warning("Detected broken idle connection on %s:%d\n", myds->myconn->parent->address, myds->myconn->parent->port); + myds->destroy_MySQL_Connection_From_Pool(false); + myds->sess->set_unhealthy(); + return false; } - if (myds->myds_type==MYDS_BACKEND && myds->sess->status!=FAST_FORWARD) { - if (mypolls.fds[n].revents) { - // this part of the code fixes an important bug - // if a connection in use but idle (ex: running a transaction) - // get data, immediately destroy the session - // - // this can happen, for example, with a low wait_timeout and running transaction - if (myds->sess->status==WAITING_CLIENT_DATA) { - if (myds->myconn->async_state_machine==ASYNC_IDLE) { - proxy_warning("Detected broken idle connection on %s:%d\n", myds->myconn->parent->address, myds->myconn->parent->port); - myds->destroy_MySQL_Connection_From_Pool(false); - myds->sess->set_unhealthy(); - return false; - } + } + } + return true; + } + + if (mypolls.fds[n].revents) { + if (mypolls.myds[n]->DSS < STATE_MARIADB_BEGIN || mypolls.myds[n]->DSS > STATE_MARIADB_END) { + // only if we aren't using MariaDB Client Library + int rb = 0; + do { + rb = myds->read_from_net(); + if (rb > 0 && myds->myds_type == MYDS_FRONTEND) { + status_variables.queries_frontends_bytes_recv += rb; + } + + myds->read_pkts(); + if (rb > 0 && myds->myds_type == MYDS_BACKEND) { + if (myds->sess->session_fast_forward) { + struct pollfd _fds; + nfds_t _nfds = 1; + _fds.fd = mypolls.fds[n].fd; + _fds.events = POLLIN; + _fds.revents = 0; + int _rc = poll(&_fds, _nfds, 0); + if ((_rc > 0) && _fds.revents == POLLIN) { + // there is more data + myds->revents = _fds.revents; + } else { + rb = 0; // exit loop } + } else { + rb = 0; // exit loop } - return true; - } - if (mypolls.fds[n].revents) { - if (mypolls.myds[n]->DSS < STATE_MARIADB_BEGIN || mypolls.myds[n]->DSS > STATE_MARIADB_END) { - // only if we aren't using MariaDB Client Library - int rb = 0; - do { - rb = myds->read_from_net(); - if (rb > 0 && myds->myds_type == MYDS_FRONTEND) { - status_variables.queries_frontends_bytes_recv += rb; - } - myds->read_pkts(); - - if (rb > 0 && myds->myds_type == MYDS_BACKEND) { - if (myds->sess->session_fast_forward) { - struct pollfd _fds; - nfds_t _nfds = 1; - _fds.fd = mypolls.fds[n].fd; - _fds.events = POLLIN; - _fds.revents = 0; - int _rc = poll(&_fds, _nfds, 0); - if ((_rc > 0) && _fds.revents == POLLIN) { - // there is more data - myds->revents = _fds.revents; - } else { - rb = 0; // exit loop - } - } else { - rb = 0; // exit loop - } - } else { - bool set_rb_zero = true; - if (rb > 0 && myds->myds_type == MYDS_FRONTEND) { - if (myds->encrypted == true) { - if (SSL_is_init_finished(myds->ssl)) { - if (myds->data_in_rbio()) { - set_rb_zero = false; - } - } - } + } else { + bool set_rb_zero = true; + if (rb > 0 && myds->myds_type == MYDS_FRONTEND) { + if (myds->encrypted == true) { + if (SSL_is_init_finished(myds->ssl)) { + if (myds->data_in_rbio()) { + set_rb_zero = false; } - if (set_rb_zero) - rb = 0; // exit loop } - } while (rb > 0); - - } else { - if (mypolls.fds[n].revents) { - myds->myconn->handler(mypolls.fds[n].revents); } } - if ( (mypolls.fds[n].events & POLLOUT) - && - ( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) ) - ) { - myds->set_net_failure(); - } - myds->check_data_flow(); + if (set_rb_zero) + rb = 0; // exit loop } + } while (rb > 0); + } else { + if (mypolls.fds[n].revents) { + myds->myconn->handler(mypolls.fds[n].revents); + } + } + if ( (mypolls.fds[n].events & POLLOUT) + && + ( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) ) + ) { + myds->set_net_failure(); + } + myds->check_data_flow(); + } - if (myds->active==0) { - if (myds->sess->client_myds==myds) { - proxy_debug(PROXY_DEBUG_NET,1, "Session=%p, DataStream=%p -- Deleting FD %d\n", myds->sess, myds, myds->fd); - myds->sess->set_unhealthy(); - } else { - // if this is a backend with fast_forward, set unhealthy - // if this is a backend without fast_forward, do not set unhealthy: it will be handled by client library - if (myds->sess->session_fast_forward) { // if fast forward - if (myds->myds_type==MYDS_BACKEND) { // and backend - myds->sess->set_unhealthy(); // set unhealthy - } - } - } + + if (myds->active==0) { + if (myds->sess->client_myds==myds) { + proxy_debug(PROXY_DEBUG_NET,1, "Session=%p, DataStream=%p -- Deleting FD %d\n", myds->sess, myds, myds->fd); + myds->sess->set_unhealthy(); + } else { + // if this is a backend with fast_forward, set unhealthy + // if this is a backend without fast_forward, do not set unhealthy: it will be handled by client library + if (myds->sess->session_fast_forward) { // if fast forward + if (myds->myds_type==MYDS_BACKEND) { // and backend + myds->sess->set_unhealthy(); // set unhealthy } + } + } + } + return true; } @@ -4538,6 +4557,12 @@ void MySQL_Thread::refresh_variables() { #ifdef DEBUG mysql_thread___session_debug=(bool)GloMTH->get_variable_int((char *)"session_debug"); #endif /* DEBUG */ + { + char *s = GloMTH->get_variable_string((char *)"proxy_protocol_frontend_nets"); + Proxy_Protocol::parse_subnets(s, mysql_thread___proxy_protocol_subnets); + free(s); + } + GloMTH->wrunlock(); pthread_mutex_unlock(&GloVars.global.ext_glomth_mutex); } @@ -4678,38 +4703,29 @@ void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsig if (__sync_add_and_fetch(&MyHGM->status.client_connections,1) > mysql_thread___max_connections) { sess->max_connections_reached=true; } - sess->client_myds->client_addrlen=addrlen; - sess->client_myds->client_addr=addr; - - switch (sess->client_myds->client_addr->sa_family) { - case AF_INET: { - struct sockaddr_in *ipv4 = (struct sockaddr_in *)sess->client_myds->client_addr; - char buf[INET_ADDRSTRLEN]; - inet_ntop(sess->client_myds->client_addr->sa_family, &ipv4->sin_addr, buf, INET_ADDRSTRLEN); - sess->client_myds->addr.addr = strdup(buf); - sess->client_myds->addr.port = htons(ipv4->sin_port); - break; - } - case AF_INET6: { - struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)sess->client_myds->client_addr; - char buf[INET6_ADDRSTRLEN]; - inet_ntop(sess->client_myds->client_addr->sa_family, &ipv6->sin6_addr, buf, INET6_ADDRSTRLEN); - sess->client_myds->addr.addr = strdup(buf); - sess->client_myds->addr.port = htons(ipv6->sin6_port); - break; - } - default: - sess->client_myds->addr.addr = strdup("localhost"); - break; - } + // Update client_myds' client IP and port details + sess->client_myds->update_client_addr(addr); iface_info *ifi=NULL; ifi=GloMTH->MLM_find_iface_from_fd(myds->fd); // here we try to get the info about the proxy bind address if (ifi) { sess->client_myds->proxy_addr.addr=strdup(ifi->address); sess->client_myds->proxy_addr.port=ifi->port; } + + // NOTE: This will set sess->status=CONNECTIING_CLIENT, sess-client_myds->DSS=STATE_SERVER_HANDSHAKE sess->client_myds->myprot.generate_pkt_initial_handshake(true,NULL,NULL, &sess->thread_session_id); + + // Check if the peer IP matches a subnet for which we should parse + // PROXY protocol headers + if (Proxy_Protocol::match_subnet(addr, addrlen, mysql_thread___proxy_protocol_subnets)) { + sess->set_status(CONNECTING_CLIENT); + // We'll change this back to STATE_SERVER_HANDSHAKE once the proxy protocol header is parsed + sess->client_myds->setDSS(STATE_PROXY_PROTOCOL); + } + //proxy_info("accepted from %s:%d, PROXY protocol: %s, client_myds:%p\n", + // sess->client_myds->addr.addr, sess->client_myds->addr.port, sess->client_myds->DSS == STATE_PROXY_PROTOCOL? "YES": "NO", sess->client_myds); + ioctl_FIONBIO(sess->client_myds->fd, 1); mypolls.add(POLLIN|POLLOUT, sess->client_myds->fd, sess->client_myds, curtime); proxy_debug(PROXY_DEBUG_NET,1,"Session=%p -- Adding client FD %d\n", sess, sess->client_myds->fd); diff --git a/lib/Proxy_Protocol.cpp b/lib/Proxy_Protocol.cpp new file mode 100644 index 0000000000..d987f4b87d --- /dev/null +++ b/lib/Proxy_Protocol.cpp @@ -0,0 +1,156 @@ +#include "proxysql.h" + +#include +#include +#include +#include +#include +#include + +#include "Proxy_Protocol.h" + + +bool Proxy_Protocol::parse_subnets(const char *list, std::vector &subnets) { + char *input = strdup(list), *cidr, *saveptr = NULL; + const char *delim = ",; "; + + subnets.clear(); + proxy_info("Parsing PROXY protocol frontend nets: %s\n", list); + for(cidr = strtok_r(input, delim, &saveptr); cidr != NULL; cidr = strtok_r(NULL, delim, &saveptr)) { + proxy_info("PROXY protocol subnet: %s\n", cidr); + if(!Proxy_Protocol::add_subnet(cidr, subnets)) { + proxy_warning("PROXY protocol subnet parsing failed, invalid CIDR: %s\n", cidr); + free(input); + return false; + }; + } + + free(input); + return true; +} + +bool Proxy_Protocol::add_subnet(const char *cidr, std::vector &subnets) { + char *addr = strdup(cidr), *mask; + proxy_protocol_subnet_t subnet = { + .family = AF_INET + }; + + if (strchr(addr, ':')) { + subnet.family = AF_INET6; + } + + subnet.bits = (unsigned short)(subnet.family == AF_INET? 32: 128); + mask = strchr(addr, '/'); + if (mask) { + *mask++ = '\0'; + subnet.bits = (unsigned short)atoi(mask); + if (subnet.bits > (subnet.family == AF_INET? 32: 128)) { + free(addr); + return false; + } + } + + if (inet_pton(subnet.family, addr, subnet.addr) == 1) { + subnets.push_back(subnet); + free(addr); + return true; + } + + free(addr); + return false; +} + +bool Proxy_Protocol::match_subnet(struct sockaddr *addr, socklen_t addrlen, std::vector &subnets) { + struct sockaddr_in *sin; + struct sockaddr_in6 *sin6; + unsigned char *a1, *a2; + + for (size_t i = 0; i < subnets.size(); i++) { + if (addr->sa_family != subnets[i].family) { + continue; + } + + switch (addr->sa_family) { + case AF_INET: + sin = (struct sockaddr_in *)addr; + a1 = (unsigned char *)&(sin->sin_addr); + break; + case AF_INET6: + sin6 = (struct sockaddr_in6 *)addr; + a1 = (unsigned char *)&(sin6->sin6_addr); + break; + default: + return false; + break; + } + a2 = subnets[i].addr; + + int n_bytes = subnets[i].bits / 8; + if (n_bytes && memcmp(a1, a2, n_bytes)) { + continue; + } + + int n_bits = subnets[i].bits % 8; + if (n_bits) { + int mask = (1<sin_family = AF_INET; + sin->sin_port = htons(src_port); + if (inet_pton(sin->sin_family, src_ip, (void *)&sin->sin_addr) != 1) + return false; + } + else if(!memcmp(header, "TCP6 ", 5)) { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)out; + + header += 5; + if (sscanf((char *)header, "%63s %63s %d %d", src_ip, dst_ip, &src_port, &dst_port) != 4) + return false; + src_ip[sizeof(dst_ip)-1] = '\0'; + dst_ip[sizeof(dst_ip)-1] = '\0'; + sin6->sin6_family = AF_INET; + sin6->sin6_port = htons(src_port); + if (inet_pton(sin6->sin6_family, src_ip, (void *)&sin6->sin6_addr) != 1) + return false; + } + else if(!memcmp(header, "UNKNOWN", 7)) { + // not sure how to deal with this + out->ss_family = AF_UNIX; + return true; + } + else { + return false; + } + + return true; +} diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index f8f1a55314..04e7490f9a 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -7,6 +7,7 @@ #include "MySQL_PreparedStatement.h" #include "MySQL_Data_Stream.h" +#include "Proxy_Protocol.h" /* @@ -61,6 +62,7 @@ struct bio_st { extern MySQL_Threads_Handler *GloMTH; + #ifdef DEBUG static void __dump_pkt(const char *func, unsigned char *_ptr, unsigned int len) { @@ -382,6 +384,45 @@ void MySQL_Data_Stream::init(enum MySQL_DS_type _type, MySQL_Session *_sess, int if (myconn) myconn->fd=fd; } + +// Update the client MySQL datastream peer sockaddr and ASCII representation +void MySQL_Data_Stream::update_client_addr(struct sockaddr *addr) { + this->client_addr=addr; + + if (this->addr.addr) { + /* Handle an update from the PROXY protocol implementation */ + free(this->addr.addr); + this->addr.port = 0; + } + + switch (this->client_addr->sa_family) { + case AF_INET: { + struct sockaddr_in *ipv4 = (struct sockaddr_in *)this->client_addr; + char buf[INET_ADDRSTRLEN]; + + this->client_addrlen = sizeof(*ipv4); + inet_ntop(this->client_addr->sa_family, &ipv4->sin_addr, buf, INET_ADDRSTRLEN); + this->addr.addr = strdup(buf); + this->addr.port = htons(ipv4->sin_port); + break; + } + case AF_INET6: { + struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)this->client_addr; + char buf[INET6_ADDRSTRLEN]; + + this->client_addrlen = sizeof(*ipv6); + inet_ntop(this->client_addr->sa_family, &ipv6->sin6_addr, buf, INET6_ADDRSTRLEN); + this->addr.addr = strdup(buf); + this->addr.port = htons(ipv6->sin6_port); + break; + } + default: + this->addr.addr = strdup("localhost"); + break; + } +} + + // Soft shutdown of socket : it only deactivate the data stream // TODO: should check the status of the data stream, and identify if it is safe to reconnect or if the session should be destroyed void MySQL_Data_Stream::shut_soft() { @@ -817,6 +858,31 @@ int MySQL_Data_Stream::write_to_net_poll() { int MySQL_Data_Stream::read_pkts() { int rc=0; int r=0; + + if(unlikely(DSS == STATE_PROXY_PROTOCOL)) { + struct sockaddr *sa; + size_t s = queue_data(queueIN), n = 0; + unsigned char *start = queue_r_ptr(queueIN), + *end = (unsigned char *)memchr(start, '\n', s); + + if (end) { + n = end+1-start; + sa = (struct sockaddr *)calloc(1, sizeof(struct sockaddr_storage)); + bool success = Proxy_Protocol::parse_header(start, n, (struct sockaddr_storage *)sa); + if (success) { + this->update_client_addr(sa); + } + else { + free(sa); + } + + queue_r(queueIN, n); + DSS = STATE_SERVER_HANDSHAKE; + } + + if (s == n) return rc; + } + while((r=buffer2array())) rc+=r; return rc; } diff --git a/src/proxysql.cfg b/src/proxysql.cfg index dd230c349f..5e0def9bf2 100644 --- a/src/proxysql.cfg +++ b/src/proxysql.cfg @@ -40,6 +40,8 @@ mysql_variables= have_compress=true poll_timeout=2000 interfaces="0.0.0.0:6033" + proxy_protocol_frontend_nets="" + //proxy_protocol_frontend_nets="127.0.0.0/8" default_schema="information_schema" stacksize=1048576 server_version="5.5.30"