Skip to content

Commit

Permalink
Merge pull request #11647 from rabbitmq/mergify/bp/v3.13.x/pr-11045
Browse files Browse the repository at this point in the history
rabbit_peer_discovery: Fixes and improvements for Consul and etcd (backport #11045)
  • Loading branch information
dumbbell authored Jul 10, 2024
2 parents 2af9b09 + 4108700 commit cb288ab
Show file tree
Hide file tree
Showing 17 changed files with 911 additions and 128 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ init() ->

ensure_dir_exists(),
rabbit_peer_discovery:maybe_init(),
rabbit_peer_discovery:maybe_register(),

pre_init(IsVirgin),

Expand All @@ -77,7 +78,6 @@ init() ->
"DB: initialization successeful",
#{domain => ?RMQLOG_DOMAIN_DB}),

rabbit_peer_discovery:maybe_register(),
init_finished(),

ok;
Expand Down
43 changes: 34 additions & 9 deletions deps/rabbit/src/rabbit_peer_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,22 @@ sync_desired_cluster(Backend, RetriesLeft, RetryDelay) ->
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok;
{ok, {DiscoveredNodes, NodeType}} ->
NodesAndProps = query_node_props(DiscoveredNodes),
case can_use_discovered_nodes(DiscoveredNodes, NodesAndProps) of
NodeAlreadySelected = is_atom(DiscoveredNodes),
NodesAndProps = case NodeAlreadySelected of
true ->
?LOG_DEBUG(
"Peer discovery: node '~ts' already "
"selected by backend",
[DiscoveredNodes],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
query_node_props([DiscoveredNodes]);
false ->
query_node_props(DiscoveredNodes)
end,
CanUse = (
NodeAlreadySelected orelse
can_use_discovered_nodes(DiscoveredNodes, NodesAndProps)),
case CanUse of
true ->
case select_node_to_join(NodesAndProps) of
SelectedNode when SelectedNode =/= false ->
Expand Down Expand Up @@ -249,8 +263,9 @@ retry_sync_desired_cluster(_Backend, 0, _RetryDelay) ->
ok.

-spec discover_cluster_nodes() -> {ok, Discovery} when
Discovery :: {DiscoveredNodes, NodeType},
Discovery :: {DiscoveredNodes | SelectedNode, NodeType},
DiscoveredNodes :: [node()],
SelectedNode :: node(),
NodeType :: rabbit_types:node_type().
%% @doc Queries the peer discovery backend to discover nodes.
%%
Expand All @@ -262,10 +277,11 @@ discover_cluster_nodes() ->

-spec discover_cluster_nodes(Backend) -> Ret when
Backend :: backend(),
Discovery :: {DiscoveredNodes, NodeType},
Ret :: {ok, Discovery} | {error, Reason},
Discovery :: {DiscoveredNodes | SelectedNode, NodeType},
DiscoveredNodes :: [node()],
SelectedNode :: node(),
NodeType :: rabbit_types:node_type(),
Ret :: {ok, Discovery} | {error, Reason},
Reason :: any().
%% @private

Expand Down Expand Up @@ -295,7 +311,7 @@ discover_cluster_nodes(Backend) ->

-spec check_discovered_nodes_list_validity(DiscoveredNodes, NodeType) ->
Ret when
DiscoveredNodes :: [node()],
DiscoveredNodes :: [node()] | node(),
NodeType :: rabbit_types:node_type(),
Ret :: ok.
%% @private
Expand All @@ -310,6 +326,12 @@ check_discovered_nodes_list_validity(DiscoveredNodes, NodeType)
[] -> ok;
_ -> e({invalid_cluster_node_names, BadNodenames})
end;
check_discovered_nodes_list_validity(SelectedNode, NodeType)
when NodeType =:= disc orelse NodeType =:= disk orelse NodeType =:= ram ->
case is_atom(SelectedNode) of
true -> ok;
false -> e({invalid_cluster_node_names, SelectedNode})
end;
check_discovered_nodes_list_validity(DiscoveredNodes, BadNodeType)
when is_list(DiscoveredNodes) ->
e({invalid_cluster_node_type, BadNodeType}).
Expand Down Expand Up @@ -836,7 +858,7 @@ can_use_discovered_nodes(_DiscoveredNodes, []) ->
false.

-spec select_node_to_join(NodesAndProps) -> SelectedNode when
NodesAndProps :: [node_and_props()],
NodesAndProps :: nonempty_list(node_and_props()),
SelectedNode :: node() | false.
%% @doc Selects the node to join among the sorted list of nodes.
%%
Expand Down Expand Up @@ -1140,10 +1162,10 @@ unlock(Backend, Data) ->
{Nodes :: [node()],
NodeType :: rabbit_types:node_type()} |
{ok, Nodes :: [node()]} |
{ok, {Nodes :: [node()],
{ok, {Nodes :: [node()] | node(),
NodeType :: rabbit_types:node_type()}} |
{error, Reason :: string()}) ->
{ok, {Nodes :: [node()], NodeType :: rabbit_types:node_type()}} |
{ok, {Nodes :: [node()] | node(), NodeType :: rabbit_types:node_type()}} |
{error, Reason :: string()}.

normalize(Nodes) when is_list(Nodes) ->
Expand All @@ -1154,6 +1176,9 @@ normalize({ok, Nodes}) when is_list(Nodes) ->
{ok, {Nodes, disc}};
normalize({ok, {Nodes, NodeType}}) when is_list(Nodes) andalso is_atom(NodeType) ->
{ok, {Nodes, NodeType}};
normalize({ok, {Node, NodeType}})
when is_atom(Node) andalso is_atom(NodeType) ->
{ok, {Node, NodeType}};
normalize({error, Reason}) ->
{error, Reason}.

Expand Down
7 changes: 6 additions & 1 deletion deps/rabbit_common/src/rabbit_peer_discovery_backend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

-callback init() -> ok | {error, Reason :: string()}.

-callback list_nodes() -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} |
-callback list_nodes() -> {ok, {Nodes :: [node()] | node(), NodeType :: rabbit_types:node_type()}} |
{error, Reason :: string()}.

-callback supports_registration() -> boolean().
Expand All @@ -57,3 +57,8 @@
-callback unlock(Data :: term()) -> ok.

-optional_callbacks([init/0]).

-export([api_version/0]).

api_version() ->
2.
11 changes: 8 additions & 3 deletions deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,10 @@ exec([Cmd | Args], Options) when is_list(Cmd) orelse is_binary(Cmd) ->
Env1 = [
begin
Key1 = format_arg(Key),
Value1 = format_arg(Value),
Value1 = case Value of
false -> false;
_ -> format_arg(Value)
end,
Value2 = case is_binary(Value1) of
true -> binary_to_list(Value1);
false -> Value1
Expand All @@ -899,8 +902,10 @@ exec([Cmd | Args], Options) when is_list(Cmd) orelse is_binary(Cmd) ->
| proplists:delete(env, PortOptions1)],
Log ++ "~n~nEnvironment variables:~n" ++
string:join(
[rabbit_misc:format(" ~ts=~ts", [K, string:replace(V, "~", "~~", all)])
|| {K, V} <- Env1],
[rabbit_misc:format(
" ~ts=~ts",
[K, string:replace(V, "~", "~~", all)])
|| {K, V} <- Env1, is_list(V) ],
"~n")
}
end,
Expand Down
5 changes: 5 additions & 0 deletions deps/rabbitmq_peer_discovery_consul/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ rabbitmq_integration_suite(
name = "config_schema_SUITE",
)

rabbitmq_integration_suite(
name = "system_SUITE",
size = "large",
)

rabbitmq_suite(
name = "rabbitmq_peer_discovery_consul_SUITE",
size = "medium",
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbitmq_peer_discovery_consul/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbitmq_peer_discovery_consul",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "system_SUITE_beam_files",
testonly = True,
srcs = ["test/system_SUITE.erl"],
outs = ["test/system_SUITE.beam"],
hdrs = [],
app_name = "rabbitmq_peer_discovery_consul",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "rabbitmq_peer_discovery_consul_SUITE_beam_files",
testonly = True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@
env_variable = "CONSUL_SVC_ADDR_NODENAME",
default_value = false
},
consul_svc_id => #peer_discovery_config_entry_meta{
type = string,
env_variable = "CONSUL_SVC_ID",
default_value = "undefined"
},
consul_svc_port => #peer_discovery_config_entry_meta{
type = integer,
env_variable = "CONSUL_SVC_PORT",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ fun(Conf) ->
end}.


%% use (Erlang) node name when compuing service address?
%% use (Erlang) node name when computing service address?

{mapping, "cluster_formation.consul.svc_addr_use_nodename", "rabbit.cluster_formation.peer_discovery_consul.consul_svc_addr_nodename", [
{datatype, {enum, [true, false]}}
Expand All @@ -155,6 +155,21 @@ fun(Conf) ->
end}.


%% service ID

{mapping, "cluster_formation.consul.svc_id", "rabbit.cluster_formation.peer_discovery_consul.consul_svc_id", [
{datatype, string}
]}.

{translation, "rabbit.cluster_formation.peer_discovery_consul.consul_svc_id",
fun(Conf) ->
case cuttlefish:conf_get("cluster_formation.consul.svc_id", Conf, undefined) of
undefined -> cuttlefish:unset();
Value -> Value
end
end}.


%% (optionally) append a suffix to node names retrieved from Consul

{mapping, "cluster_formation.consul.domain_suffix", "rabbit.cluster_formation.peer_discovery_consul.consul_domain", [
Expand Down
Loading

0 comments on commit cb288ab

Please sign in to comment.