Skip to content
This repository has been archived by the owner on Nov 18, 2020. It is now read-only.

Commit

Permalink
Merge bug26418
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon MacMullen committed Oct 20, 2014
2 parents 55ba086 + ba52b73 commit d8d4332
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/amqp_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,9 @@ handle_call({subscribe, BasicConsume, Subscriber}, From, State) ->
State).

%% @private
handle_cast({set_writer, Writer}, State = #state{driver = direct}) ->
link(Writer),
{noreply, State#state{writer = Writer}};
handle_cast({set_writer, Writer}, State) ->
{noreply, State#state{writer = Writer}};
%% @private
Expand Down
1 change: 0 additions & 1 deletion src/amqp_channel_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector],
rpc:call(Node, rabbit_direct, start_channel,
[ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User,
VHost, ?CLIENT_CAPABILITIES, Collector]),
link(RabbitCh),
RabbitCh;
start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
{ok, Writer} = supervisor2:start_child(
Expand Down
10 changes: 10 additions & 0 deletions src/amqp_direct_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ handle_message({force_event_refresh, Ref}, State = #state{node = Node}) ->
{ok, State};
handle_message(closing_timeout, State = #state{closing_reason = Reason}) ->
{stop, {closing_timeout, Reason}, State};
handle_message({'DOWN', _MRef, process, _ConnSup, Reason},
State = #state{node = Node}) ->
{stop, {remote_node_down, Reason}, State};
handle_message(Msg, State) ->
{stop, {unexpected_msg, Msg}, State}.

Expand Down Expand Up @@ -134,6 +137,13 @@ connect(Params = #amqp_params_direct{username = Username,
{ok, ChMgr, Collector} = SIF(i(name, State1)),
State2 = State1#state{user = User,
collector = Collector},
%% There's no real connection-level process on the remote
%% node for us to monitor or link to, but we want to
%% detect connection death if the remote node goes down
%% when there are no channels. So we monitor the
%% supervisor; that way we find out if the node goes down
%% or the rabbit app stops.
erlang:monitor(process, {rabbit_direct_client_sup, Node}),
{ok, {ServerProperties, 0, ChMgr, State2}};
{error, _} = E ->
E;
Expand Down
5 changes: 3 additions & 2 deletions test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ subscribe_nowait_test_() -> ?RUN([]).
connection_blocked_network_test_() -> ?RUN([]).

non_existent_exchange_test_() -> ?RUN([negative]).
bogus_rpc_test_() -> ?RUN([negative, repeat]).
hard_error_test_() -> ?RUN([negative, repeat]).
bogus_rpc_test_() -> ?RUN([negative, repeat]).
hard_error_test_() -> ?RUN([negative, repeat]).
non_existent_user_test_() -> ?RUN([negative]).
invalid_password_test_() -> ?RUN([negative]).
non_existent_vhost_test_() -> ?RUN([negative]).
no_permission_test_() -> ?RUN([negative]).
channel_writer_death_test_() -> ?RUN([negative]).
connection_failure_test_() -> ?RUN([negative]).
channel_death_test_() -> ?RUN([negative]).
shortstr_overflow_property_test_() -> ?RUN([negative]).
shortstr_overflow_field_test_() -> ?RUN([negative]).
Expand Down
26 changes: 24 additions & 2 deletions test/negative_test_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,33 @@ hard_error_test() ->
test_util:wait_for_death(Channel),
test_util:wait_for_death(Connection).

%% The connection should die if the underlying connection is prematurely
%% closed. For a network connection, this means that the TCP socket is
%% closed. For a direct connection (remotely only, of course), this means that
%% the RabbitMQ node appears as down.
connection_failure_test() ->
{ok, Connection} = test_util:new_connection(),
case amqp_connection:info(Connection, [type, amqp_params]) of
[{type, direct}, {amqp_params, Params}] ->
case Params#amqp_params_direct.node of
N when N == node() ->
amqp_connection:close(Connection);
N ->
true = erlang:disconnect_node(N),
net_adm:ping(N)
end;
[{type, network}, {amqp_params, _}] ->
[{sock, Sock}] = amqp_connection:info(Connection, [sock]),
ok = gen_tcp:close(Sock)
end,
test_util:wait_for_death(Connection),
ok.

%% An error in a channel should result in the death of the entire connection.
%% The death of the channel is caused by an error in generating the frames
%% (writer dies) - only in the network case
%% (writer dies)
channel_writer_death_test() ->
{ok, Connection} = test_util:new_connection(just_network),
{ok, Connection} = test_util:new_connection(),
{ok, Channel} = amqp_connection:open_channel(Connection),
Publish = #'basic.publish'{routing_key = <<>>, exchange = <<>>},
QoS = #'basic.qos'{prefetch_count = 0},
Expand Down

0 comments on commit d8d4332

Please sign in to comment.