diff --git a/CHANGELOG.md b/CHANGELOG.md index f552a78..757d011 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- Add manual syncing mechanism for ChainSync. This adds an optional back-pressure when building chain indexers that rely on Xogmios. Tested with GenStage on [the following experimental branch](https://github.com/wowica/xogmios_watcher/tree/chain-indexer) + +- Fix process naming for ChainSync clients. Given different process names and ids as options, multiple clients are now supported. + ### Changed - StateQuery.send_query interface. Now accepts queries as strings from user input. diff --git a/examples/chain_sync_client.ex b/examples/chain_sync_client.ex index 7a58d9e..172c4c2 100644 --- a/examples/chain_sync_client.ex +++ b/examples/chain_sync_client.ex @@ -41,6 +41,6 @@ defmodule ChainSyncClient do @impl true def handle_block(block, state) do IO.puts("final handle_block #{block["height"]}") - {:ok, :close, state} + {:close, state} end end diff --git a/lib/xogmios/chain_sync.ex b/lib/xogmios/chain_sync.ex index de199c5..dbc8b50 100644 --- a/lib/xogmios/chain_sync.ex +++ b/lib/xogmios/chain_sync.ex @@ -5,14 +5,17 @@ defmodule Xogmios.ChainSync do alias Xogmios.ChainSync.Messages + require Logger + @doc """ Invoked when a new block is emitted. This callback is required. - Returning `{:ok, :next_block, new_state}` will request the next block once it's made available. + Returning `{:ok, :next_block, new_state}` will request the next block once + it's made available. Returning `{:ok, new_state}` will not request anymore blocks. - Returning `{:ok, :close, new_state}` will close the connection to the server. + Returning `{:close, new_state}` will close the connection to the server. """ @callback handle_block(block :: map(), state) :: {:ok, :next_block, new_state} @@ -42,6 +45,9 @@ defmodule Xogmios.ChainSync do # This is important because proxies might close idle connections after a few seconds. @keepalive_in_ms 5_000 + # The websocket client library + @client :websocket_client + @doc """ Starts a new Chain Sync process linked to the current process. @@ -50,11 +56,92 @@ defmodule Xogmios.ChainSync do @spec start_link(module(), start_options :: Keyword.t()) :: {:ok, pid()} | {:error, term()} def start_link(client, opts) do {url, opts} = Keyword.pop(opts, :url) - initial_state = Keyword.merge(opts, handler: client) + {name, opts} = Keyword.pop(opts, :name, client) + initial_state = Keyword.merge(opts, handler: client, notify_on_connect: self()) + + with {:ok, process_name} <- build_process_name(name), + {:ok, ws_pid} <- start_link(process_name, url, client, initial_state) do + # Blocks until the connection with the Ogmios server + # is established or until timeout is reached. + receive do + {:connected, _connection} -> {:ok, ws_pid} + after + _timeout = 5_000 -> + Logger.warning("Timeout connecting to Ogmios server") + send(ws_pid, :close) + {:error, :connection_timeout} + end + else + {:error, :invalid_process_name} = error -> + error + + {:error, _} = error -> + Logger.warning("Error connecting with Ogmios server #{inspect(error)}") + error + end + end - :websocket_client.start_link({:local, client}, url, client, initial_state, - keepalive: @keepalive_in_ms - ) + defp start_link(name, url, client, state) do + @client.start_link(name, url, client, state, keepalive: @keepalive_in_ms) + end + + # Builds process name from valid argument or returns error + @spec build_process_name(term() | {:global, term()} | {:via, term(), term()}) :: + {:ok, any()} | {:error, term()} + defp build_process_name(name) do + case name do + name when is_atom(name) -> + {:ok, {:local, name}} + + {:global, term} = tuple when is_atom(term) -> + {:ok, tuple} + + {:via, registry, _term} = tuple when is_atom(registry) -> + {:ok, tuple} + + _ -> + # Returns error if name does not comply with + # values accepted by the websocket client library + {:error, :invalid_process_name} + end + end + + @doc """ + > #### Warning {: .warning} + > + > This is a highly experimental function and should not be relied on just yet. + + Issues a synchronous message for reading the next block. + Potentially useful for building chain indexers with support for backpressure mechanism. + """ + @spec read_next_block(pid()) :: {:ok, block :: map()} | :error + def read_next_block(pid) do + # hacky af but it does the job for now + + state = :sys.get_state(pid) + + {_c, %{ws_pid: ws_pid}} = state |> elem(1) |> elem(5) + + caller = self() + + :sys.replace_state(pid, fn current_state -> + {:connected, {:context, req, transport, empty_list, ws, {module, client_info}, _, _, _}} = + current_state + + updated_client_info = Map.put(client_info, :caller, caller) + + {:connected, + {:context, req, transport, empty_list, ws, {module, updated_client_info}, "", true, 0}} + end) + + next_block_message = Xogmios.ChainSync.Messages.next_block() + :websocket_client.cast(ws_pid, {:text, next_block_message}) + + receive do + {:ok, next_block} -> {:ok, next_block} + after + 5_000 -> :error + end end defmacro __using__(_opts) do @@ -114,19 +201,25 @@ defmodule Xogmios.ChainSync do %{"method" => "nextBlock", "result" => %{"direction" => "forward"} = result}, state ) do - case state.handler.handle_block(result["block"], state) do - {:ok, :next_block, new_state} -> - message = Messages.next_block() - {:reply, {:text, message}, new_state} - - {:ok, new_state} -> - {:ok, new_state} + if caller = Map.get(state, :caller) do + # Returns to sync caller + send(caller, {:ok, result["block"]}) + {:ok, state} + else + case state.handler.handle_block(result["block"], state) do + {:ok, :next_block, new_state} -> + message = Messages.next_block() + {:reply, {:text, message}, new_state} + + {:ok, new_state} -> + {:ok, new_state} - {:close, new_state} -> - {:close, "finished", new_state} + {:close, new_state} -> + {:close, "finished", new_state} - response -> - Logger.warning("Invalid response #{inspect(response)}") + response -> + Logger.warning("Invalid response #{inspect(response)}") + end end end diff --git a/lib/xogmios/chain_sync/connection.ex b/lib/xogmios/chain_sync/connection.ex index 23b2725..bd17170 100644 --- a/lib/xogmios/chain_sync/connection.ex +++ b/lib/xogmios/chain_sync/connection.ex @@ -6,8 +6,6 @@ defmodule Xogmios.ChainSync.Connection do alias Xogmios.ChainSync.Messages - require Logger - defmacro __using__(_opts) do quote do @behaviour :websocket_client @@ -19,7 +17,7 @@ defmodule Xogmios.ChainSync.Connection do def child_spec(opts) do %{ - id: __MODULE__, + id: Keyword.get(opts, :id, __MODULE__), start: {__MODULE__, :start_link, [opts]}, shutdown: 5_000, restart: Keyword.get(opts, :restart, :transient), @@ -38,9 +36,12 @@ defmodule Xogmios.ChainSync.Connection do end @impl true - def onconnect(_arg, state) do + def onconnect(connection, state) do + state = Map.put(state, :ws_pid, self()) + start_message = Messages.next_block_start() :websocket_client.cast(self(), {:text, start_message}) + send(state.notify_on_connect, {:connected, connection}) case state.handler.handle_connect(state) do {:ok, new_state} -> diff --git a/test/chain_sync_test.exs b/test/chain_sync_test.exs index 772b230..2191f81 100644 --- a/test/chain_sync_test.exs +++ b/test/chain_sync_test.exs @@ -40,4 +40,76 @@ defmodule Xogmios.ChainSyncTest do refute Process.alive?(pid) assert GenServer.whereis(DummyClient) == nil end + + defmodule DummyClientWithName do + use Xogmios, :chain_sync + + def start_link(opts) do + Xogmios.start_chain_sync_link(__MODULE__, opts) + end + + @impl true + def handle_block(_block, state) do + send(state.test_handler, :handle_block) + {:ok, state} + end + end + + test "allows multiple named processes" do + opts1 = [url: @ws_url, test_handler: self(), name: :apple, id: :apple] + pid1 = start_supervised!({DummyClientWithName, opts1}) + + assert is_pid(pid1) + assert Process.info(pid1)[:registered_name] == :apple + + opts2 = [url: @ws_url, test_handler: self(), name: :banana, id: :banana] + pid2 = start_supervised!({DummyClientWithName, opts2}) + + assert is_pid(pid2) + assert pid1 != pid2 + assert Process.info(pid2)[:registered_name] == :banana + + # Duplicate process names raise error + assert_raise RuntimeError, fn -> + start_supervised!({DummyClientWithName, opts2}) + end + + # Process name must be atom + opts22 = [url: @ws_url, test_handler: self(), name: "banana", id: :banana_string] + {:error, {:invalid_process_name, _}} = start_supervised({DummyClientWithName, opts22}) + + # Named via Registry + opts3 = [ + url: @ws_url, + test_handler: self(), + name: {:via, Registry, {DummyRegistry, :apple}}, + id: :registry_apple + ] + + _registry_pid = start_supervised!({Registry, keys: :unique, name: DummyRegistry}) + + pid3 = start_supervised!({DummyClientWithName, opts3}) + + assert is_pid(pid3) + assert Process.info(pid3)[:registered_name] == nil + assert [{^pid3, nil}] = Registry.lookup(DummyRegistry, :apple) + + # Global + opts4 = [ + url: @ws_url, + test_handler: self(), + name: {:global, :global_banana}, + id: :global_banana + ] + + pid4 = start_supervised!({DummyClientWithName, opts4}) + + assert is_pid(pid4) + + global_pid4 = :global.whereis_name(:global_banana) + + assert is_pid(global_pid4) + + assert pid4 == global_pid4 + end end