From 620fff22f12cf36dca79bf4b5099da20b86f1cf2 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 6 May 2024 08:24:38 -0700 Subject: [PATCH] Log errors from `ranch:handshake` Fixes #11171 An MQTT user encountered TLS handshake timeouts with their IoT device, and the actual error from `ssl:handshake` / `ranch:handshake` was not caught and logged. At this time, `ranch` uses `exit(normal)` in the case of timeouts, but that should change in the future (https://github.com/ninenines/ranch/issues/336) --- deps/rabbit/src/rabbit_networking.erl | 22 +++-- deps/rabbit/src/rabbit_reader.erl | 4 + deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl | 59 +++++++------ .../src/rabbit_stomp_reader.erl | 88 ++++++++++--------- .../src/rabbit_stream_reader.erl | 11 ++- 5 files changed, 104 insertions(+), 80 deletions(-) diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index 4563da64d1b4..da242d0f6d87 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -560,7 +560,7 @@ failed_to_recv_proxy_header(Ref, Error) -> end, rabbit_log:debug(Msg, [Error]), % The following call will clean up resources then exit - _ = ranch:handshake(Ref), + _ = catch ranch:handshake(Ref), exit({shutdown, failed_to_recv_proxy_header}). handshake(Ref, ProxyProtocolEnabled) -> @@ -572,14 +572,22 @@ handshake(Ref, ProxyProtocolEnabled) -> {error, protocol_error, Error} -> failed_to_recv_proxy_header(Ref, Error); {ok, ProxyInfo} -> - {ok, Sock} = ranch:handshake(Ref), - setup_socket(Sock), - {ok, {rabbit_proxy_socket, Sock, ProxyInfo}} + case catch ranch:handshake(Ref) of + {'EXIT', normal} -> + {error, handshake_failed}; + {ok, Sock} -> + setup_socket(Sock), + {ok, {rabbit_proxy_socket, Sock, ProxyInfo}} + end end; false -> - {ok, Sock} = ranch:handshake(Ref), - setup_socket(Sock), - {ok, Sock} + case catch ranch:handshake(Ref) of + {'EXIT', normal} -> + {error, handshake_failed}; + {ok, Sock} -> + setup_socket(Sock), + {ok, Sock} + end end. setup_socket(Sock) -> diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 005c92afc6b3..f85e92b42e4b 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -162,6 +162,10 @@ shutdown(Pid, Explanation) -> no_return(). init(Parent, HelperSups, Ref) -> ?LG_PROCESS_TYPE(reader), + %% Note: + %% This function could return an error if the handshake times out. + %% It is less likely to happen here as compared to MQTT, so + %% crashing with a `badmatch` seems appropriate. {ok, Sock} = rabbit_networking:handshake(Ref, application:get_env(rabbit, proxy_protocol, false)), Deb = sys:debug_options([]), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index c24898288d7d..1050b38f7258 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -71,34 +71,39 @@ close_connection(Pid, Reason) -> init(Ref) -> process_flag(trap_exit, true), logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [mqtt]}), - {ok, Sock} = rabbit_networking:handshake(Ref, - application:get_env(?APP_NAME, proxy_protocol, false)), - RealSocket = rabbit_net:unwrap_socket(Sock), - case rabbit_net:connection_string(Sock, inbound) of - {ok, ConnStr} -> - ConnName = rabbit_data_coercion:to_binary(ConnStr), - ?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]), - _ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), - LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000), - erlang:send_after(LoginTimeout, self(), login_timeout), - State0 = #state{socket = RealSocket, - proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock), - conn_name = ConnName, - await_recv = false, - connection_state = running, - conserve = false, - parse_state = rabbit_mqtt_packet:init_state()}, - State1 = control_throttle(State0), - State = rabbit_event:init_stats_timer(State1, #state.stats_timer), - gen_server:enter_loop(?MODULE, [], State); - {error, Reason = enotconn} -> - ?LOG_INFO("MQTT could not get connection string: ~s", [Reason]), - rabbit_net:fast_close(RealSocket), - ignore; + ProxyProtocolEnabled = application:get_env(?APP_NAME, proxy_protocol, false), + case rabbit_networking:handshake(Ref, ProxyProtocolEnabled) of {error, Reason} -> - ?LOG_ERROR("MQTT could not get connection string: ~p", [Reason]), - rabbit_net:fast_close(RealSocket), - {stop, Reason} + ?LOG_ERROR("MQTT could not establish connection: ~s", [Reason]), + {stop, Reason}; + {ok, Sock} -> + RealSocket = rabbit_net:unwrap_socket(Sock), + case rabbit_net:connection_string(Sock, inbound) of + {ok, ConnStr} -> + ConnName = rabbit_data_coercion:to_binary(ConnStr), + ?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]), + _ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000), + erlang:send_after(LoginTimeout, self(), login_timeout), + State0 = #state{socket = RealSocket, + proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock), + conn_name = ConnName, + await_recv = false, + connection_state = running, + conserve = false, + parse_state = rabbit_mqtt_packet:init_state()}, + State1 = control_throttle(State0), + State = rabbit_event:init_stats_timer(State1, #state.stats_timer), + gen_server:enter_loop(?MODULE, [], State); + {error, Reason = enotconn} -> + ?LOG_INFO("MQTT could not get connection string: ~s", [Reason]), + rabbit_net:fast_close(RealSocket), + ignore; + {error, Reason} -> + ?LOG_ERROR("MQTT could not get connection string: ~p", [Reason]), + rabbit_net:fast_close(RealSocket), + {stop, Reason} + end end. handle_call({info, InfoItems}, _From, State) -> diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl index 7bb9b8986bf6..ccf7af95f24a 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl @@ -63,51 +63,55 @@ close_connection(Pid, Reason) -> init([SupHelperPid, Ref, Configuration]) -> process_flag(trap_exit, true), - {ok, Sock} = rabbit_networking:handshake(Ref, - application:get_env(rabbitmq_stomp, proxy_protocol, false)), - RealSocket = rabbit_net:unwrap_socket(Sock), - - case rabbit_net:connection_string(Sock, inbound) of - {ok, ConnStr} -> - ConnName = rabbit_data_coercion:to_binary(ConnStr), - ProcInitArgs = processor_args(Configuration, Sock), - ProcState = rabbit_stomp_processor:initial_state(Configuration, - ProcInitArgs), - - rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)", - [self(), ConnName]), - - ParseState = rabbit_stomp_frame:initial_state(), - _ = register_resource_alarm(), - - LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000), - MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE), - erlang:send_after(LoginTimeout, self(), login_timeout), - - gen_server2:enter_loop(?MODULE, [], - rabbit_event:init_stats_timer( - run_socket(control_throttle( - #reader_state{socket = RealSocket, - conn_name = ConnName, - parse_state = ParseState, - processor_state = ProcState, - heartbeat_sup = SupHelperPid, - heartbeat = {none, none}, - max_frame_size = MaxFrameSize, - current_frame_size = 0, - state = running, - conserve_resources = false, - recv_outstanding = false})), #reader_state.stats_timer), - {backoff, 1000, 1000, 10000}); - {error, enotconn} -> - rabbit_net:fast_close(RealSocket), - terminate(shutdown, undefined); + ProxyProtocolEnabled = application:get_env(rabbitmq_stomp, proxy_protocol, false), + case rabbit_networking:handshake(Ref, ProxyProtocolEnabled) of {error, Reason} -> - rabbit_net:fast_close(RealSocket), - terminate({network_error, Reason}, undefined) + rabbit_log_connection:error( + "STOMP could not establish connection: ~s", [Reason]), + {stop, Reason}; + {ok, Sock} -> + RealSocket = rabbit_net:unwrap_socket(Sock), + case rabbit_net:connection_string(Sock, inbound) of + {ok, ConnStr} -> + ConnName = rabbit_data_coercion:to_binary(ConnStr), + ProcInitArgs = processor_args(Configuration, Sock), + ProcState = rabbit_stomp_processor:initial_state(Configuration, + ProcInitArgs), + + rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)", + [self(), ConnName]), + + ParseState = rabbit_stomp_frame:initial_state(), + _ = register_resource_alarm(), + + LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000), + MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE), + erlang:send_after(LoginTimeout, self(), login_timeout), + + gen_server2:enter_loop(?MODULE, [], + rabbit_event:init_stats_timer( + run_socket(control_throttle( + #reader_state{socket = RealSocket, + conn_name = ConnName, + parse_state = ParseState, + processor_state = ProcState, + heartbeat_sup = SupHelperPid, + heartbeat = {none, none}, + max_frame_size = MaxFrameSize, + current_frame_size = 0, + state = running, + conserve_resources = false, + recv_outstanding = false})), #reader_state.stats_timer), + {backoff, 1000, 1000, 10000}); + {error, enotconn} -> + rabbit_net:fast_close(RealSocket), + terminate(shutdown, undefined); + {error, Reason} -> + rabbit_net:fast_close(RealSocket), + terminate({network_error, Reason}, undefined) + end end. - handle_call({info, InfoItems}, _From, State) -> Infos = lists:map( fun(InfoItem) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 97541380689b..4c744338ca45 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -135,10 +135,13 @@ init([KeepaliveSup, heartbeat := Heartbeat, transport := ConnTransport}]) -> process_flag(trap_exit, true), - {ok, Sock} = - rabbit_networking:handshake(Ref, - application:get_env(rabbitmq_stream, - proxy_protocol, false)), + ProxyProtocolEnabled = + application:get_env(rabbitmq_stream, proxy_protocol, false), + %% Note: + %% This function could return an error if the handshake times out. + %% It is less likely to happen here as compared to MQTT, so + %% crashing with a `badmatch` seems appropriate. + {ok, Sock} = rabbit_networking:handshake(Ref, ProxyProtocolEnabled), RealSocket = rabbit_net:unwrap_socket(Sock), case rabbit_net:connection_string(Sock, inbound) of {ok, ConnStr} ->