Skip to content

Commit

Permalink
rabbitmq_peer_discovery_consul: Select the node to join
Browse files Browse the repository at this point in the history
[Why]
The default node selection of the peer discovery subsystem doesn't work
well with Consul. The reason is that that selection is based on the
nodes' uptime. However, the node with the highest uptime may not be the
first to register in Consul.

When this happens, the node that registered first will only discover
itself and boot as a standalone node. Then, the node with the highest
uptime will discover both of them, but will select itself as the node to
join because of its uptime. In the end, we end up with two clusters
instead of one.

[How]
We use the `CreateIndex` property in the Consul response to sort
services. We then derive the name of the node to join after the service
that has the lower `CreateIndex`, meaning it was the first to register.
  • Loading branch information
dumbbell committed May 3, 2024
1 parent 27b8046 commit 9e57581
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ list_nodes() ->
HttpOpts) of
{ok, Nodes} ->
IncludeWithWarnings = get_config_key(consul_include_nodes_with_warnings, M),
Result = extract_nodes(
filter_nodes(Nodes, IncludeWithWarnings)),
Result = extract_node(
sort_nodes(
filter_nodes(Nodes, IncludeWithWarnings))),
{ok, {Result, disc}};
{error, _} = Error ->
Error
Expand Down Expand Up @@ -276,13 +277,24 @@ filter_nodes(Nodes, Warn) ->
false -> Nodes
end.

-spec extract_nodes(ConsulResult :: [#{binary() => term()}]) -> list().
extract_nodes(Data) -> extract_nodes(Data, []).

-spec extract_nodes(ConsulResult :: [#{binary() => term()}], Nodes :: list())
-> list().
extract_nodes([], Nodes) -> Nodes;
extract_nodes([H | T], Nodes) ->
-spec sort_nodes(ConsulResult :: [#{binary() => term()}]) -> [#{binary() => term()}].
sort_nodes(Nodes) ->
lists:sort(
fun(NodeA, NodeB) ->
IndexA = maps:get(
<<"CreateIndex">>,
maps:get(<<"Service">>, NodeA, #{}), undefined),
IndexB = maps:get(
<<"CreateIndex">>,
maps:get(<<"Service">>, NodeB, #{}), undefined),
%% `undefined' is always greater than an integer, so we are fine here.
IndexA =< IndexB
end, Nodes).

-spec extract_node(ConsulResult :: [#{binary() => term()}]) -> list().
extract_node([]) ->
[];
extract_node([H | _]) ->
Service = maps:get(<<"Service">>, H),
Meta = maps:get(<<"Meta">>, Service, #{}),
NodeName = case Meta of
Expand All @@ -299,7 +311,7 @@ extract_nodes([H | T], Nodes) ->
?UTIL_MODULE:node_name(Address)
end
end,
extract_nodes(T, lists:merge(Nodes, [NodeName])).
NodeName.

-spec maybe_add_acl(QArgs :: list()) -> list().
maybe_add_acl(List) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ list_nodes_return_value_basic_test(_Config) ->
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
end),
meck:expect(rabbit_nodes, name_type, fun() -> shortnames end),
?assertEqual({ok, {['rabbit@rabbit1', 'rabbit@rabbit2'], disc}},
?assertEqual({ok, {'rabbit@rabbit2', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand Down Expand Up @@ -388,7 +388,7 @@ list_nodes_return_value_basic_long_node_name_test(_Config) ->
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
end),
meck:expect(rabbit_nodes, name_type, fun() -> longnames end),
?assertEqual({ok, {['rabbit@rabbit1.node.consul', 'rabbit@rabbit2.node.consul'], disc}},
?assertEqual({ok, {'[email protected]', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand Down Expand Up @@ -419,7 +419,7 @@ list_nodes_return_value_long_node_name_and_custom_domain_test(_Config) ->


meck:expect(rabbit_nodes, name_type, fun() -> longnames end),
?assertEqual({ok, {['rabbit@rabbit1.node.internal', 'rabbit@rabbit2.node.internal'], disc}},
?assertEqual({ok, {'[email protected]', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand All @@ -446,7 +446,7 @@ list_nodes_return_value_srv_address_test(_Config) ->
Body = "[{\"Node\": {\"Node\": \"rabbit2.internal.domain\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq:172.172.16.4.50\", \"Output\": \"\"}, {\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.16.4.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.16.4.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1.internal.domain\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.172.16.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.172.16.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]",
rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body))
end),
?assertEqual({ok, {['[email protected]', '[email protected]'], disc}},
?assertEqual({ok, {'[email protected]', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand Down Expand Up @@ -475,7 +475,7 @@ list_nodes_return_value_nodes_in_warning_state_included_test(_Config) ->
rabbit_json:try_decode(list_of_nodes_without_warnings())
end),
os:putenv("CONSUL_INCLUDE_NODES_WITH_WARNINGS", "true"),
?assertEqual({ok, {['[email protected]'], disc}},
?assertEqual({ok, {'[email protected]', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand Down Expand Up @@ -504,7 +504,7 @@ list_nodes_return_value_nodes_in_warning_state_filtered_out_test(_Config) ->
rabbit_json:try_decode(list_of_nodes_without_warnings())
end),
os:putenv("CONSUL_INCLUDE_NODES_WITH_WARNINGS", "false"),
?assertEqual({ok, {['[email protected]', '[email protected]'], disc}},
?assertEqual({ok, {'[email protected]', disc}},
rabbit_peer_discovery_consul:list_nodes()),
?assert(meck:validate(rabbit_peer_discovery_httpc)).

Expand Down

0 comments on commit 9e57581

Please sign in to comment.