Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rabbit_peer_discovery: Fixes and improvements for Consul and etcd #11045

Merged
merged 9 commits into from
May 14, 2024
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ init() ->

ensure_dir_exists(),
rabbit_peer_discovery:maybe_init(),
rabbit_peer_discovery:maybe_register(),

pre_init(IsVirgin),

Expand All @@ -77,7 +78,6 @@ init() ->
"DB: initialization successeful",
#{domain => ?RMQLOG_DOMAIN_DB}),

rabbit_peer_discovery:maybe_register(),
init_finished(),

ok;
Expand Down
43 changes: 34 additions & 9 deletions deps/rabbit/src/rabbit_peer_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,22 @@ sync_desired_cluster(Backend, RetriesLeft, RetryDelay) ->
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok;
{ok, {DiscoveredNodes, NodeType}} ->
NodesAndProps = query_node_props(DiscoveredNodes),
case can_use_discovered_nodes(DiscoveredNodes, NodesAndProps) of
NodeAlreadySelected = is_atom(DiscoveredNodes),
NodesAndProps = case NodeAlreadySelected of
true ->
?LOG_DEBUG(
"Peer discovery: node '~ts' already "
"selected by backend",
[DiscoveredNodes],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
query_node_props([DiscoveredNodes]);
false ->
query_node_props(DiscoveredNodes)
end,
CanUse = (
NodeAlreadySelected orelse
can_use_discovered_nodes(DiscoveredNodes, NodesAndProps)),
case CanUse of
true ->
case select_node_to_join(NodesAndProps) of
SelectedNode when SelectedNode =/= false ->
Expand Down Expand Up @@ -249,8 +263,9 @@ retry_sync_desired_cluster(_Backend, 0, _RetryDelay) ->
ok.

-spec discover_cluster_nodes() -> {ok, Discovery} when
Discovery :: {DiscoveredNodes, NodeType},
Discovery :: {DiscoveredNodes | SelectedNode, NodeType},
DiscoveredNodes :: [node()],
SelectedNode :: node(),
NodeType :: rabbit_types:node_type().
%% @doc Queries the peer discovery backend to discover nodes.
%%
Expand All @@ -262,10 +277,11 @@ discover_cluster_nodes() ->

-spec discover_cluster_nodes(Backend) -> Ret when
Backend :: backend(),
Discovery :: {DiscoveredNodes, NodeType},
Ret :: {ok, Discovery} | {error, Reason},
Discovery :: {DiscoveredNodes | SelectedNode, NodeType},
DiscoveredNodes :: [node()],
SelectedNode :: node(),
NodeType :: rabbit_types:node_type(),
Ret :: {ok, Discovery} | {error, Reason},
Reason :: any().
%% @private

Expand Down Expand Up @@ -295,7 +311,7 @@ discover_cluster_nodes(Backend) ->

-spec check_discovered_nodes_list_validity(DiscoveredNodes, NodeType) ->
Ret when
DiscoveredNodes :: [node()],
DiscoveredNodes :: [node()] | node(),
NodeType :: rabbit_types:node_type(),
Ret :: ok.
%% @private
Expand All @@ -310,6 +326,12 @@ check_discovered_nodes_list_validity(DiscoveredNodes, NodeType)
[] -> ok;
_ -> e({invalid_cluster_node_names, BadNodenames})
end;
check_discovered_nodes_list_validity(SelectedNode, NodeType)
when NodeType =:= disc orelse NodeType =:= disk orelse NodeType =:= ram ->
case is_atom(SelectedNode) of
true -> ok;
false -> e({invalid_cluster_node_names, SelectedNode})
end;
check_discovered_nodes_list_validity(DiscoveredNodes, BadNodeType)
when is_list(DiscoveredNodes) ->
e({invalid_cluster_node_type, BadNodeType}).
Expand Down Expand Up @@ -836,7 +858,7 @@ can_use_discovered_nodes(_DiscoveredNodes, []) ->
false.

-spec select_node_to_join(NodesAndProps) -> SelectedNode when
NodesAndProps :: [node_and_props()],
NodesAndProps :: nonempty_list(node_and_props()),
SelectedNode :: node() | false.
%% @doc Selects the node to join among the sorted list of nodes.
%%
Expand Down Expand Up @@ -1140,10 +1162,10 @@ unlock(Backend, Data) ->
{Nodes :: [node()],
NodeType :: rabbit_types:node_type()} |
{ok, Nodes :: [node()]} |
{ok, {Nodes :: [node()],
{ok, {Nodes :: [node()] | node(),
NodeType :: rabbit_types:node_type()}} |
{error, Reason :: string()}) ->
{ok, {Nodes :: [node()], NodeType :: rabbit_types:node_type()}} |
{ok, {Nodes :: [node()] | node(), NodeType :: rabbit_types:node_type()}} |
{error, Reason :: string()}.

normalize(Nodes) when is_list(Nodes) ->
Expand All @@ -1154,6 +1176,9 @@ normalize({ok, Nodes}) when is_list(Nodes) ->
{ok, {Nodes, disc}};
normalize({ok, {Nodes, NodeType}}) when is_list(Nodes) andalso is_atom(NodeType) ->
{ok, {Nodes, NodeType}};
normalize({ok, {Node, NodeType}})
when is_atom(Node) andalso is_atom(NodeType) ->
{ok, {Node, NodeType}};
normalize({error, Reason}) ->
{error, Reason}.

Expand Down
7 changes: 6 additions & 1 deletion deps/rabbit_common/src/rabbit_peer_discovery_backend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

-callback init() -> ok | {error, Reason :: string()}.

-callback list_nodes() -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} |
-callback list_nodes() -> {ok, {Nodes :: [node()] | node(), NodeType :: rabbit_types:node_type()}} |
{error, Reason :: string()}.

-callback supports_registration() -> boolean().
Expand All @@ -57,3 +57,8 @@
-callback unlock(Data :: term()) -> ok.

-optional_callbacks([init/0]).

-export([api_version/0]).

api_version() ->
2.
11 changes: 8 additions & 3 deletions deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,10 @@ exec([Cmd | Args], Options) when is_list(Cmd) orelse is_binary(Cmd) ->
Env1 = [
begin
Key1 = format_arg(Key),
Value1 = format_arg(Value),
Value1 = case Value of
false -> false;
_ -> format_arg(Value)
end,
Value2 = case is_binary(Value1) of
true -> binary_to_list(Value1);
false -> Value1
Expand All @@ -894,8 +897,10 @@ exec([Cmd | Args], Options) when is_list(Cmd) orelse is_binary(Cmd) ->
| proplists:delete(env, PortOptions1)],
Log ++ "~n~nEnvironment variables:~n" ++
string:join(
[rabbit_misc:format(" ~ts=~ts", [K, string:replace(V, "~", "~~", all)])
|| {K, V} <- Env1],
[rabbit_misc:format(
" ~ts=~ts",
[K, string:replace(V, "~", "~~", all)])
|| {K, V} <- Env1, is_list(V) ],
"~n")
}
end,
Expand Down
5 changes: 5 additions & 0 deletions deps/rabbitmq_peer_discovery_consul/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ rabbitmq_integration_suite(
name = "config_schema_SUITE",
)

rabbitmq_integration_suite(
name = "system_SUITE",
size = "large",
)

rabbitmq_suite(
name = "rabbitmq_peer_discovery_consul_SUITE",
size = "medium",
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbitmq_peer_discovery_consul/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbitmq_peer_discovery_consul",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "system_SUITE_beam_files",
testonly = True,
srcs = ["test/system_SUITE.erl"],
outs = ["test/system_SUITE.beam"],
hdrs = [],
app_name = "rabbitmq_peer_discovery_consul",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "rabbitmq_peer_discovery_consul_SUITE_beam_files",
testonly = True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@
env_variable = "CONSUL_SVC_ADDR_NODENAME",
default_value = false
},
consul_svc_id => #peer_discovery_config_entry_meta{
type = string,
env_variable = "CONSUL_SVC_ID",
default_value = "undefined"
},
consul_svc_port => #peer_discovery_config_entry_meta{
type = integer,
env_variable = "CONSUL_SVC_PORT",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ fun(Conf) ->
end}.


%% use (Erlang) node name when compuing service address?
%% use (Erlang) node name when computing service address?

{mapping, "cluster_formation.consul.svc_addr_use_nodename", "rabbit.cluster_formation.peer_discovery_consul.consul_svc_addr_nodename", [
{datatype, {enum, [true, false]}}
Expand All @@ -155,6 +155,21 @@ fun(Conf) ->
end}.


%% service ID

{mapping, "cluster_formation.consul.svc_id", "rabbit.cluster_formation.peer_discovery_consul.consul_svc_id", [
{datatype, string}
]}.

{translation, "rabbit.cluster_formation.peer_discovery_consul.consul_svc_id",
fun(Conf) ->
case cuttlefish:conf_get("cluster_formation.consul.svc_id", Conf, undefined) of
undefined -> cuttlefish:unset();
Value -> Value
end
end}.


%% (optionally) append a suffix to node names retrieved from Consul

{mapping, "cluster_formation.consul.domain_suffix", "rabbit.cluster_formation.peer_discovery_consul.consul_domain", [
Expand Down
Loading
Loading