diff --git a/config/test.exs b/config/test.exs index f5854ff802..279842bce6 100755 --- a/config/test.exs +++ b/config/test.exs @@ -136,6 +136,11 @@ config :archethic, Archethic.P2P.GeoPatch.GeoIP, MockGeoIP config :archethic, Archethic.P2P.BootstrappingSeeds, enabled: false +config :archethic, Archethic.P2P.Client.Connection, + backoff_strategy: :static, + heartbeat_interval: 200, + reconnect_delay: 50 + config :archethic, Archethic.Mining.PendingTransactionValidation, validate_node_ip: true config :archethic, Archethic.Metrics.Poller, enabled: false diff --git a/lib/archethic/p2p/client/connection.ex b/lib/archethic/p2p/client/connection.ex index 32601d416b..97f59d4ae5 100644 --- a/lib/archethic/p2p/client/connection.ex +++ b/lib/archethic/p2p/client/connection.ex @@ -21,8 +21,22 @@ defmodule Archethic.P2P.Client.Connection do require Logger use GenStateMachine, callback_mode: [:handle_event_function, :state_enter], restart: :temporary - @vsn 1 + @vsn 2 @table_name :connection_status + @max_reconnect_delay :timer.hours(6) + + @heartbeat_interval Keyword.get( + Application.compile_env(:archethic, __MODULE__, []), + :heartbeat_interval, + 10_000 + ) + @reconnect_delay Keyword.get( + Application.compile_env(:archethic, __MODULE__, []), + :reconnect_delay, + 500 + ) + # we cap the attemps to avoid doing 2 ** BIGNUM + @max_attempts (@max_reconnect_delay / @reconnect_delay) |> :math.log2() |> :math.ceil() @doc """ Starts a new connection @@ -59,6 +73,18 @@ defmodule Archethic.P2P.Client.Connection do end end + @doc """ + When called, if disconnect, it will try to connect to socket + Noop if it's already connected + + It's used when some node has been offline for a long time + It has connected to us so we know we can connect to it as well + """ + @spec wake_up(Crypto.key()) :: :ok + def wake_up(public_key) do + GenStateMachine.cast(via_tuple(public_key), :wake_up) + end + @doc """ Get the availability timer and reset it with a new start time if it was already started """ @@ -102,7 +128,10 @@ defmodule Archethic.P2P.Client.Connection do request_id: 0, messages: %{}, send_tasks: %{}, - availability_timer: {nil, 0} + availability_timer: {nil, 0}, + reconnect_attempts: 0, + heartbeats_sent: 0, + heartbeats_received: 0 } {:ok, :initializing, data, [{:next_event, :internal, {:connect, from}}]} @@ -190,7 +219,7 @@ defmodule Archethic.P2P.Client.Connection do end) # Reconnect with backoff - actions = [{{:timeout, :reconnect}, 500, nil} | actions] + actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil} | actions] {:keep_state, new_data, actions} end @@ -204,7 +233,11 @@ defmodule Archethic.P2P.Client.Connection do # Start availability timer new_data = - Map.update!(data, :availability_timer, fn + data + |> Map.put(:reconnect_attempts, 0) + |> Map.put(:heartbeats_sent, 0) + |> Map.put(:heartbeats_received, 0) + |> Map.update!(:availability_timer, fn {nil, time} -> {System.monotonic_time(:second), time} @@ -212,12 +245,13 @@ defmodule Archethic.P2P.Client.Connection do timer end) + Process.send_after(self(), :heartbeat, @heartbeat_interval) + {:keep_state, new_data} end def handle_event(:enter, _old_state, :initializing, _data), do: :keep_state_and_data def handle_event(:enter, _old_state, :disconnected, _data), do: :keep_state_and_data - def handle_event(:enter, _old_state, {:connected, _socket}, _data), do: :keep_state_and_data # called from the :disconnected or :initializing state def handle_event( @@ -258,9 +292,11 @@ defmodule Archethic.P2P.Client.Connection do end # this message is used to delay next connection attempt - def handle_event({:timeout, :reconnect}, _event_data, _state, _data) do + def handle_event({:timeout, :reconnect}, _event_data, _state, data) do actions = [{:next_event, :internal, {:connect, nil}}] - {:keep_state_and_data, actions} + + new_data = Map.update!(data, :reconnect_attempts, &min(@max_attempts, &1 + 1)) + {:keep_state, new_data, actions} end def handle_event( @@ -273,6 +309,25 @@ defmodule Archethic.P2P.Client.Connection do :keep_state_and_data end + def handle_event( + :cast, + :wake_up, + :disconnected, + data + ) do + actions = [{:next_event, :internal, {:connect, nil}}] + {:keep_state, %{data | reconnect_attempts: 0}, actions} + end + + def handle_event( + :cast, + :wake_up, + _, + _data + ) do + :keep_state_and_data + end + def handle_event( :cast, {:send_message, ref, from, message, timeout}, @@ -381,6 +436,36 @@ defmodule Archethic.P2P.Client.Connection do end end + def handle_event( + :info, + :heartbeat, + {:connected, socket}, + data = %{ + transport: transport, + heartbeats_sent: heartbeats_sent, + heartbeats_received: heartbeats_received + } + ) do + # disconnect if missed more than 2 heartbeats + if heartbeats_sent - heartbeats_received >= 2 do + transport.handle_close(socket) + {:next_state, :disconnected, data} + else + transport.handle_send(socket, "hb") + Process.send_after(self(), :heartbeat, @heartbeat_interval) + {:keep_state, %{data | heartbeats_sent: heartbeats_sent + 1}} + end + end + + def handle_event( + :info, + :heartbeat, + _state, + _data + ) do + :keep_state_and_data + end + def handle_event(:info, {ref, :ok}, {:connected, _socket}, data = %{send_tasks: send_tasks}) do case Map.pop(send_tasks, ref) do {nil, _} -> @@ -440,13 +525,13 @@ defmodule Archethic.P2P.Client.Connection do # Task.async sending us the result of the handle_connect def handle_event(:info, {_ref, {:error, _reason, nil}}, _, data) do - actions = [{{:timeout, :reconnect}, 500, nil}] + actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil}] {:next_state, :disconnected, data, actions} end def handle_event(:info, {_ref, {:error, reason, from}}, _, data) do send(from, {:error, reason}) - actions = [{{:timeout, :reconnect}, 500, nil}] + actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil}] {:next_state, :disconnected, data, actions} end @@ -456,7 +541,8 @@ defmodule Archethic.P2P.Client.Connection do {:connected, _socket}, data = %{ transport: transport, - node_public_key: node_public_key + node_public_key: node_public_key, + heartbeats_received: heartbeats_received } ) do case transport.handle_message(event) do @@ -467,6 +553,9 @@ defmodule Archethic.P2P.Client.Connection do {:next_state, :disconnected, data} + {:ok, "hb"} -> + {:keep_state, %{data | heartbeats_received: heartbeats_received + 1}} + {:ok, msg} -> set_node_connected(node_public_key) @@ -540,5 +629,28 @@ defmodule Archethic.P2P.Client.Connection do :ets.delete(@table_name, node_public_key) end + def code_change(1, state, data, _extra) do + {:ok, state, + data + |> Map.merge(%{ + reconnect_attempts: 0, + heartbeats_sent: 0, + heartbeats_received: 0 + })} + end + def code_change(_old_vsn, state, data, _extra), do: {:ok, state, data} + + defp backoff(attempts) do + config = Application.get_env(:archethic, __MODULE__, []) + + case Keyword.get(config, :backoff_strategy, :exponential) do + :static -> + @reconnect_delay + + :exponential -> + # cap at a few hours + min(@max_reconnect_delay, 2 ** attempts * @reconnect_delay) + end + end end diff --git a/lib/archethic/p2p/client/transport.ex b/lib/archethic/p2p/client/transport.ex index 4e9dc5b615..37a84d4c5f 100644 --- a/lib/archethic/p2p/client/transport.ex +++ b/lib/archethic/p2p/client/transport.ex @@ -4,4 +4,5 @@ defmodule Archethic.P2P.Client.Transport do @callback handle_connect(:inet.ip_address(), :inet.port_number()) :: {:ok, :inet.socket()} @callback handle_message(tuple()) :: {:ok, binary()} | {:error, :closed} | {:error, any()} @callback handle_send(:inet.socket(), binary()) :: :ok + @callback handle_close(:inet.socket()) :: :ok end diff --git a/lib/archethic/p2p/client/transport/tcp_impl.ex b/lib/archethic/p2p/client/transport/tcp_impl.ex index f4d1c0144b..d1bab0a019 100644 --- a/lib/archethic/p2p/client/transport/tcp_impl.ex +++ b/lib/archethic/p2p/client/transport/tcp_impl.ex @@ -20,6 +20,11 @@ defmodule Archethic.P2P.Client.Transport.TCPImpl do :gen_tcp.connect(ip, port, @options, 4000) end + @impl Transport + def handle_close(socket) do + :gen_tcp.close(socket) + end + @impl Transport def handle_message({:tcp, socket, data}) do :inet.setopts(socket, active: :once) diff --git a/lib/archethic/p2p/listener_protocol.ex b/lib/archethic/p2p/listener_protocol.ex index 4c98945228..1dacd951d1 100644 --- a/lib/archethic/p2p/listener_protocol.ex +++ b/lib/archethic/p2p/listener_protocol.ex @@ -9,6 +9,7 @@ defmodule Archethic.P2P.ListenerProtocol do alias Archethic.Crypto alias Archethic.P2P + alias Archethic.P2P.Client.Connection alias Archethic.P2P.Message alias Archethic.P2P.MessageEnvelop alias Archethic.TaskSupervisor @@ -35,6 +36,19 @@ defmodule Archethic.P2P.ListenerProtocol do }) end + def handle_info( + {_transport, socket, "hb"}, + state = %{transport: transport} + ) do + :inet.setopts(socket, active: :once) + + Task.Supervisor.start_child(TaskSupervisor, fn -> + transport.send(socket, "hb") + end) + + {:noreply, state} + end + def handle_info( {_transport, socket, err}, state = %{transport: transport, ip: ip} @@ -45,6 +59,7 @@ defmodule Archethic.P2P.ListenerProtocol do end transport.close(socket) + {:noreply, state} end @@ -84,6 +99,9 @@ defmodule Archethic.P2P.ListenerProtocol do ) if valid_signature? do + # we may attempt to wakeup a connection that offline + Connection.wake_up(sender_pkey) + message |> process_msg(sender_pkey) |> encode_response(message_id, sender_pkey) diff --git a/test/archethic/p2p/client/connection_test.exs b/test/archethic/p2p/client/connection_test.exs index e3cc0bf040..28234b1a07 100644 --- a/test/archethic/p2p/client/connection_test.exs +++ b/test/archethic/p2p/client/connection_test.exs @@ -14,6 +14,12 @@ defmodule Archethic.P2P.Client.ConnectionTest do alias Archethic.Utils + @heartbeat_interval Keyword.get( + Application.compile_env(:archethic, Connection, []), + :heartbeat_interval, + 10_000 + ) + test "start_link/1 should open a socket and a connection worker and initialize the backlog and lookup tables" do {:ok, pid} = Connection.start_link( @@ -83,6 +89,10 @@ defmodule Archethic.P2P.Client.ConnectionTest do {:error, :timeout} end + def handle_close(_socket) do + :ok + end + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok def handle_message({_, _, _}), do: {:error, :closed} @@ -118,6 +128,10 @@ defmodule Archethic.P2P.Client.ConnectionTest do {:error, :timeout} end + def handle_close(_socket) do + :ok + end + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok def handle_message({_, _, _}), do: {:error, :closed} @@ -166,7 +180,11 @@ defmodule Archethic.P2P.Client.ConnectionTest do {:ok, make_ref()} end - def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + def handle_close(_socket) do + :ok + end + + def handle_send(_socket, _), do: :ok def handle_message({_, _, _}), do: {:error, :closed} end @@ -268,6 +286,10 @@ defmodule Archethic.P2P.Client.ConnectionTest do {:ok, make_ref()} end + def handle_close(_socket) do + :ok + end + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok def handle_send(_socket, <<_::32, _rest::bitstring>>), do: {:error, :closed} @@ -334,6 +356,10 @@ defmodule Archethic.P2P.Client.ConnectionTest do end end + def handle_close(_socket) do + :ok + end + def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok def handle_send(_socket, <<_::32, _rest::bitstring>>) do @@ -555,6 +581,64 @@ defmodule Archethic.P2P.Client.ConnectionTest do end end + describe "Stale detection" do + test "should change state to disconnected once a few heartbeats are missed" do + defmodule MockTransportStale do + alias Archethic.P2P.Client.Transport + + @behaviour Transport + + def handle_connect({127, 0, 0, 1}, _port) do + conn_count = :persistent_term.get(:conn_count, 0) + :persistent_term.put(:conn_count, conn_count + 1) + + if conn_count == 0 do + {:ok, make_ref()} + else + {:error, :timeout} + end + end + + def handle_close(_socket) do + :persistent_term.put(:transport_closed, true) + :ok + end + + def handle_send(_socket, "hb") do + hb_count = :persistent_term.get(:hb_count, 0) + :persistent_term.put(:hb_count, hb_count + 1) + + # become stale after 5 hbs + if hb_count <= 5 do + send(self(), {:tcp, make_ref(), "hb"}) + end + + :ok + end + + def handle_send(_socket, _), do: :ok + + def handle_message({_, _, data}), do: {:ok, data} + end + + {:ok, pid} = + Connection.start_link( + transport: MockTransportStale, + ip: {127, 0, 0, 1}, + port: 3000, + node_public_key: Crypto.first_node_public_key() + ) + + Process.sleep(@heartbeat_interval * 5) + assert {{:connected, _}, _} = :sys.get_state(pid) + + Process.sleep(@heartbeat_interval * 5) + assert {:disconnected, _} = :sys.get_state(pid) + + assert :persistent_term.get(:transport_closed) + end + end + defmodule MockTransport do alias Archethic.P2P.Client.Transport @@ -564,6 +648,15 @@ defmodule Archethic.P2P.Client.ConnectionTest do {:ok, make_ref()} end + def handle_close(_socket) do + :ok + end + + def handle_send(_socket, "hb") do + send(self(), {:tcp, make_ref(), "hb"}) + :ok + end + def handle_send(_socket, _data), do: :ok def handle_message({_, _, data}), do: {:ok, data} @@ -578,7 +671,11 @@ defmodule Archethic.P2P.Client.ConnectionTest do {:ok, make_ref()} end - def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + def handle_close(_socket) do + :ok + end + + def handle_send(_socket, _), do: :ok def handle_message({_, _, _}), do: {:error, :closed} end