Skip to content

Commit

Permalink
Revert "Log errors from ranch:handshake" (#12304)
Browse files Browse the repository at this point in the history
This reverts commit 620fff2.

It intoduced a regression in another area - a TCP health check,
such as the default (with cluster-operator) readinessProbe,
on a TLS-enabled instance would log a `rabbit_reader` crash
every few seconds:
```
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>   crasher:
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     initial call: rabbit_reader:init/3
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     pid: <0.999.0>
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     registered_name: []
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     exception error: no match of right hand side value {error, handshake_failed}
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>       in function  rabbit_reader:init/3 (rabbit_reader.erl, line 171)
```
  • Loading branch information
mkuratczyk authored Sep 13, 2024
1 parent f78f14a commit f0f7500
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 104 deletions.
22 changes: 7 additions & 15 deletions deps/rabbit/src/rabbit_networking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ failed_to_recv_proxy_header(Ref, Error) ->
end,
rabbit_log:debug(Msg, [Error]),
% The following call will clean up resources then exit
_ = catch ranch:handshake(Ref),
_ = ranch:handshake(Ref),
exit({shutdown, failed_to_recv_proxy_header}).

handshake(Ref, ProxyProtocolEnabled) ->
Expand All @@ -559,22 +559,14 @@ handshake(Ref, ProxyProtocolEnabled) ->
{error, protocol_error, Error} ->
failed_to_recv_proxy_header(Ref, Error);
{ok, ProxyInfo} ->
case catch ranch:handshake(Ref) of
{'EXIT', normal} ->
{error, handshake_failed};
{ok, Sock} ->
ok = tune_buffer_size(Sock),
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}}
end
{ok, Sock} = ranch:handshake(Ref),
ok = tune_buffer_size(Sock),
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}}
end;
false ->
case catch ranch:handshake(Ref) of
{'EXIT', normal} ->
{error, handshake_failed};
{ok, Sock} ->
ok = tune_buffer_size(Sock),
{ok, Sock}
end
{ok, Sock} = ranch:handshake(Ref),
ok = tune_buffer_size(Sock),
{ok, Sock}
end.

tune_buffer_size(Sock) ->
Expand Down
4 changes: 0 additions & 4 deletions deps/rabbit/src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,6 @@ shutdown(Pid, Explanation) ->
no_return().
init(Parent, HelperSups, Ref) ->
?LG_PROCESS_TYPE(reader),
%% Note:
%% This function could return an error if the handshake times out.
%% It is less likely to happen here as compared to MQTT, so
%% crashing with a `badmatch` seems appropriate.
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(rabbit, proxy_protocol, false)),
Deb = sys:debug_options([]),
Expand Down
59 changes: 27 additions & 32 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,39 +71,34 @@ close_connection(Pid, Reason) ->
init(Ref) ->
process_flag(trap_exit, true),
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [mqtt]}),
ProxyProtocolEnabled = application:get_env(?APP_NAME, proxy_protocol, false),
case rabbit_networking:handshake(Ref, ProxyProtocolEnabled) of
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(?APP_NAME, proxy_protocol, false)),
RealSocket = rabbit_net:unwrap_socket(Sock),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]),
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
erlang:send_after(LoginTimeout, self(), login_timeout),
State0 = #state{socket = RealSocket,
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
conn_name = ConnName,
await_recv = false,
connection_state = running,
conserve = false,
parse_state = rabbit_mqtt_packet:init_state()},
State1 = control_throttle(State0),
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),
gen_server:enter_loop(?MODULE, [], State);
{error, Reason = enotconn} ->
?LOG_INFO("MQTT could not get connection string: ~s", [Reason]),
rabbit_net:fast_close(RealSocket),
ignore;
{error, Reason} ->
?LOG_ERROR("MQTT could not establish connection: ~s", [Reason]),
{stop, Reason};
{ok, Sock} ->
RealSocket = rabbit_net:unwrap_socket(Sock),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]),
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
erlang:send_after(LoginTimeout, self(), login_timeout),
State0 = #state{socket = RealSocket,
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
conn_name = ConnName,
await_recv = false,
connection_state = running,
conserve = false,
parse_state = rabbit_mqtt_packet:init_state()},
State1 = control_throttle(State0),
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),
gen_server:enter_loop(?MODULE, [], State);
{error, Reason = enotconn} ->
?LOG_INFO("MQTT could not get connection string: ~s", [Reason]),
rabbit_net:fast_close(RealSocket),
ignore;
{error, Reason} ->
?LOG_ERROR("MQTT could not get connection string: ~p", [Reason]),
rabbit_net:fast_close(RealSocket),
{stop, Reason}
end
?LOG_ERROR("MQTT could not get connection string: ~p", [Reason]),
rabbit_net:fast_close(RealSocket),
{stop, Reason}
end.

handle_call({info, InfoItems}, _From, State) ->
Expand Down
88 changes: 42 additions & 46 deletions deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,55 +63,51 @@ close_connection(Pid, Reason) ->

init([SupHelperPid, Ref, Configuration]) ->
process_flag(trap_exit, true),
ProxyProtocolEnabled = application:get_env(rabbitmq_stomp, proxy_protocol, false),
case rabbit_networking:handshake(Ref, ProxyProtocolEnabled) of
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(rabbitmq_stomp, proxy_protocol, false)),
RealSocket = rabbit_net:unwrap_socket(Sock),

case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
ProcInitArgs = processor_args(Configuration, Sock),
ProcState = rabbit_stomp_processor:initial_state(Configuration,
ProcInitArgs),

rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)",
[self(), ConnName]),

ParseState = rabbit_stomp_frame:initial_state(),
_ = register_resource_alarm(),

LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
erlang:send_after(LoginTimeout, self(), login_timeout),

gen_server2:enter_loop(?MODULE, [],
rabbit_event:init_stats_timer(
run_socket(control_throttle(
#reader_state{socket = RealSocket,
conn_name = ConnName,
parse_state = ParseState,
processor_state = ProcState,
heartbeat_sup = SupHelperPid,
heartbeat = {none, none},
max_frame_size = MaxFrameSize,
current_frame_size = 0,
state = running,
conserve_resources = false,
recv_outstanding = false})), #reader_state.stats_timer),
{backoff, 1000, 1000, 10000});
{error, enotconn} ->
rabbit_net:fast_close(RealSocket),
terminate(shutdown, undefined);
{error, Reason} ->
rabbit_log_connection:error(
"STOMP could not establish connection: ~s", [Reason]),
{stop, Reason};
{ok, Sock} ->
RealSocket = rabbit_net:unwrap_socket(Sock),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
ConnName = rabbit_data_coercion:to_binary(ConnStr),
ProcInitArgs = processor_args(Configuration, Sock),
ProcState = rabbit_stomp_processor:initial_state(Configuration,
ProcInitArgs),

rabbit_log_connection:info("accepting STOMP connection ~tp (~ts)",
[self(), ConnName]),

ParseState = rabbit_stomp_frame:initial_state(),
_ = register_resource_alarm(),

LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
erlang:send_after(LoginTimeout, self(), login_timeout),

gen_server2:enter_loop(?MODULE, [],
rabbit_event:init_stats_timer(
run_socket(control_throttle(
#reader_state{socket = RealSocket,
conn_name = ConnName,
parse_state = ParseState,
processor_state = ProcState,
heartbeat_sup = SupHelperPid,
heartbeat = {none, none},
max_frame_size = MaxFrameSize,
current_frame_size = 0,
state = running,
conserve_resources = false,
recv_outstanding = false})), #reader_state.stats_timer),
{backoff, 1000, 1000, 10000});
{error, enotconn} ->
rabbit_net:fast_close(RealSocket),
terminate(shutdown, undefined);
{error, Reason} ->
rabbit_net:fast_close(RealSocket),
terminate({network_error, Reason}, undefined)
end
rabbit_net:fast_close(RealSocket),
terminate({network_error, Reason}, undefined)
end.


handle_call({info, InfoItems}, _From, State) ->
Infos = lists:map(
fun(InfoItem) ->
Expand Down
11 changes: 4 additions & 7 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,10 @@ init([KeepaliveSup,
heartbeat := Heartbeat,
transport := ConnTransport}]) ->
process_flag(trap_exit, true),
ProxyProtocolEnabled =
application:get_env(rabbitmq_stream, proxy_protocol, false),
%% Note:
%% This function could return an error if the handshake times out.
%% It is less likely to happen here as compared to MQTT, so
%% crashing with a `badmatch` seems appropriate.
{ok, Sock} = rabbit_networking:handshake(Ref, ProxyProtocolEnabled),
{ok, Sock} =
rabbit_networking:handshake(Ref,
application:get_env(rabbitmq_stream,
proxy_protocol, false)),
RealSocket = rabbit_net:unwrap_socket(Sock),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
Expand Down

0 comments on commit f0f7500

Please sign in to comment.