From ccae75e76ac866da9a93e23a8b6d29b0f2128581 Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Thu, 6 Jun 2024 17:54:32 -0400 Subject: [PATCH 01/16] Read tx from mempool --- examples/mempool_client.ex | 18 ++++ lib/xogmios.ex | 12 +++ lib/xogmios/mempool.ex | 172 ++++++++++++++++++++++++++++++ lib/xogmios/mempool/connection.ex | 94 ++++++++++++++++ lib/xogmios/mempool/messages.ex | 41 +++++++ 5 files changed, 337 insertions(+) create mode 100644 examples/mempool_client.ex create mode 100644 lib/xogmios/mempool.ex create mode 100644 lib/xogmios/mempool/connection.ex create mode 100644 lib/xogmios/mempool/messages.ex diff --git a/examples/mempool_client.ex b/examples/mempool_client.ex new file mode 100644 index 0000000..5fb0b78 --- /dev/null +++ b/examples/mempool_client.ex @@ -0,0 +1,18 @@ +defmodule XogmiosWatcher.MempoolClient do + @moduledoc """ + This module prints transactions as they become available + in the mempool + """ + use Xogmios, :mempool + + def start_link(opts) do + Xogmios.start_mempool_link(__MODULE__, opts) + end + + @impl true + def handle_transaction(transaction, state) do + IO.puts("transaction #{transaction["id"]}") + + {:ok, :next_transaction, state} + end +end diff --git a/lib/xogmios.ex b/lib/xogmios.ex index 9505bd7..1e67e5e 100644 --- a/lib/xogmios.ex +++ b/lib/xogmios.ex @@ -21,6 +21,7 @@ defmodule Xogmios do alias Xogmios.ChainSync alias Xogmios.StateQuery alias Xogmios.TxSubmission + alias Xogmios.Mempool @doc """ Starts a new State Query process linked to the current process. @@ -89,6 +90,11 @@ defmodule Xogmios do TxSubmission.start_link(client, opts) end + # TODO: doc + def start_mempool_link(client, opts) do + Mempool.start_link(client, opts) + end + defmacro __using__(:state_query) do quote do use Xogmios.StateQuery @@ -106,4 +112,10 @@ defmodule Xogmios do use Xogmios.TxSubmission end end + + defmacro __using__(:mempool) do + quote do + use Xogmios.Mempool + end + end end diff --git a/lib/xogmios/mempool.ex b/lib/xogmios/mempool.ex new file mode 100644 index 0000000..d0335de --- /dev/null +++ b/lib/xogmios/mempool.ex @@ -0,0 +1,172 @@ +defmodule Xogmios.Mempool do + @moduledoc """ + This module interfaces with the Mempool protocol. + """ + + alias Xogmios.Mempool.Messages + + require Logger + + # TODO: doc + @callback handle_transaction(transaction :: map(), state) :: + {:ok, :next_transaction, new_state} + | {:ok, new_state} + | {:close, new_state} + when state: term(), new_state: term() + + @doc """ + Invoked upon connecting to the server. This callback is optional. + """ + @callback handle_connect(state) :: {:ok, new_state} + when state: term(), new_state: term() + + @doc """ + Invoked upon disconnecting from the server. This callback is optional. + + Returning `{:ok, new_state}` will allow the connection to close. + + Returning `{:reconnect, interval_in_ms}` will attempt a reconnection after `interval_in_ms` + """ + @callback handle_disconnect(reason :: String.t(), state) :: + {:ok, new_state} + | {:reconnect, interval_in_ms :: non_neg_integer(), new_state} + when state: term(), new_state: term() + + # The keepalive option is used to maintain the connection active. + # This is important because proxies might close idle connections after a few seconds. + @keepalive_in_ms 5_000 + + # The websocket client library + @client :websocket_client + + @doc """ + Starts a new Mempool process linked to the current process. + + This function should not be called directly, but rather via `Xogmios.start_mempool_link/2` + """ + @spec start_link(module(), start_options :: Keyword.t()) :: {:ok, pid()} | {:error, term()} + def start_link(client, opts) do + {url, opts} = Keyword.pop(opts, :url) + {name, opts} = Keyword.pop(opts, :name, client) + initial_state = Keyword.merge(opts, handler: client, notify_on_connect: self()) + + with {:ok, process_name} <- build_process_name(name), + {:ok, ws_pid} <- start_link(process_name, url, client, initial_state) do + # Blocks until the connection with the Ogmios server + # is established or until timeout is reached. + + receive do + {:connected, _connection} -> {:ok, ws_pid} + after + _timeout = 5_000 -> + Logger.warning("Timeout connecting to Ogmios server") + send(ws_pid, :close) + {:error, :connection_timeout} + end + else + {:error, :invalid_process_name} = error -> + error + + {:error, _} = error -> + Logger.warning("Error connecting with Ogmios server #{inspect(error)}") + error + end + end + + defp start_link(name, url, client, state) do + @client.start_link(name, url, client, state, keepalive: @keepalive_in_ms) + end + + # Builds process name from valid argument or returns error + @spec build_process_name(term() | {:global, term()} | {:via, term(), term()}) :: + {:ok, any()} | {:error, term()} + defp build_process_name(name) do + case name do + name when is_atom(name) -> + {:ok, {:local, name}} + + {:global, term} = tuple when is_atom(term) -> + {:ok, tuple} + + {:via, registry, _term} = tuple when is_atom(registry) -> + {:ok, tuple} + + _ -> + # Returns error if name does not comply with + # values accepted by the websocket client library + {:error, :invalid_process_name} + end + end + + defmacro __using__(_opts) do + quote do + @behaviour Xogmios.Mempool + + use Xogmios.Mempool.Connection + + require Logger + + def handle_connect(state), do: {:ok, state} + def handle_disconnect(_reason, state), do: {:ok, state} + def handle_acquired(_slot, state), do: {:ok, :next_transaction, state} + defoverridable handle_connect: 1, handle_disconnect: 2, handle_acquired: 2 + + def handle_message( + %{ + "method" => "acquireMempool", + "result" => %{"acquired" => "mempool", "slot" => slot} = _result + }, + state + ) do + case state.handler.handle_acquired(%{"slot" => slot}, state) do + {:ok, :next_transaction, new_state} -> + message = Messages.next_transaction() + {:reply, {:text, message}, new_state} + + {:ok, new_state} -> + {:ok, new_state} + + {:close, new_state} -> + {:close, "finished", new_state} + + response -> + Logger.warning("Invalid response #{inspect(response)}") + end + end + + def handle_message( + %{ + "method" => "nextTransaction", + "result" => %{"transaction" => nil} = _result + }, + state + ) do + message = Messages.acquire_mempool() + {:reply, {:text, message}, state} + end + + def handle_message( + %{ + "method" => "nextTransaction", + "result" => %{"transaction" => transaction} = _result + }, + state + ) do + case state.handler.handle_transaction(transaction, state) do + {:ok, :next_transaction, new_state} -> + message = Messages.next_transaction() + {:reply, {:text, message}, new_state} + + {:ok, new_state} -> + {:ok, new_state} + + {:close, new_state} -> + {:close, "finished", new_state} + + response -> + Logger.warning("Invalid response #{inspect(response)}") + end + end + end + end +end diff --git a/lib/xogmios/mempool/connection.ex b/lib/xogmios/mempool/connection.ex new file mode 100644 index 0000000..bca9c24 --- /dev/null +++ b/lib/xogmios/mempool/connection.ex @@ -0,0 +1,94 @@ +defmodule Xogmios.Mempool.Connection do + @moduledoc """ + This module implements a connection with the Ogmios Websocket server + for the Mempool protocol. + """ + + alias Xogmios.Mempool.Messages + + defmacro __using__(_opts) do + quote do + @behaviour :websocket_client + + require Logger + + @name __MODULE__ + @reconnect_interval 5_000 + + def child_spec(opts) do + %{ + id: Keyword.get(opts, :id, __MODULE__), + start: {__MODULE__, :start_link, [opts]}, + shutdown: 5_000, + restart: Keyword.get(opts, :restart, :transient), + type: :worker + } + end + + @impl true + def init(state) do + initial_state = + state + |> Enum.into(%{}) + |> Map.merge(%{handler: __MODULE__}) + + {:reconnect, initial_state} + end + + @impl true + def onconnect(connection, state) do + state = Map.put(state, :ws_pid, self()) + + start_message = Messages.acquire_mempool() + :websocket_client.cast(self(), {:text, start_message}) + send(state.notify_on_connect, {:connected, connection}) + + case state.handler.handle_connect(state) do + {:ok, new_state} -> + {:ok, new_state} + + _ -> + {:ok, state} + end + end + + @impl true + def ondisconnect(reason, state) do + case state.handler.handle_disconnect(reason, state) do + {:ok, state} -> + {:ok, state} + + {:reconnect, reconnect_interval_in_ms, new_state} -> + {:reconnect, reconnect_interval_in_ms, new_state} + end + end + + @impl true + def websocket_handle({:text, raw_message}, _conn, state) do + case Jason.decode(raw_message) do + {:ok, message} -> + handle_message(message, state) + + {:error, reason} -> + Logger.warning("Error decoding message #{inspect(reason)}") + {:ok, state} + end + end + + @impl true + def websocket_handle(_message, _conn, state) do + {:ok, state} + end + + @impl true + def websocket_info(_any, _arg1, state) do + {:ok, state} + end + + @impl true + def websocket_terminate(_arg0, _arg1, _state) do + :ok + end + end + end +end diff --git a/lib/xogmios/mempool/messages.ex b/lib/xogmios/mempool/messages.ex new file mode 100644 index 0000000..56bfad9 --- /dev/null +++ b/lib/xogmios/mempool/messages.ex @@ -0,0 +1,41 @@ +defmodule Xogmios.Mempool.Messages do + @moduledoc """ + This module contains messages for the Mempool protocol + """ + + alias Jason.DecodeError + + def acquire_mempool() do + json = ~S""" + { + "jsonrpc": "2.0", + "method": "acquireMempool" + } + """ + + validate_json!(json) + json + end + + def next_transaction() do + json = ~S""" + { + "jsonrpc": "2.0", + "method": "nextTransaction", + "params": { + "fields": "all" + } + } + """ + + validate_json!(json) + json + end + + defp validate_json!(json) do + case Jason.decode(json) do + {:ok, _decoded} -> :ok + {:error, %DecodeError{} = error} -> raise "Invalid JSON: #{inspect(error)}" + end + end +end From ddc3cb3fc29e3de5017df8d333ae397fd890cc72 Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Thu, 13 Jun 2024 18:25:31 -0400 Subject: [PATCH 02/16] Implement size_of_mempool and has_transaction Update docs. --- README.md | 2 +- lib/xogmios.ex | 35 +++++++++++-- lib/xogmios/mempool.ex | 91 ++++++++++++++++++++++++++++++++- lib/xogmios/mempool/messages.ex | 44 ++++++++++++++-- 4 files changed, 162 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index fc64711..925c0cf 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Mini-Protocols supported by this library: - [x] Chain Synchronization - [x] State Query - [x] Tx Submission -- [ ] Mempool Monitoring +- [x] Mempool Monitoring See [Examples](#examples) section below for information on how to use this library. diff --git a/lib/xogmios.ex b/lib/xogmios.ex index 1e67e5e..e588d38 100644 --- a/lib/xogmios.ex +++ b/lib/xogmios.ex @@ -3,19 +3,27 @@ defmodule Xogmios do This is the top level module for Xogmios. It implements functions to be used by client modules that wish to connect with Ogmios. - When used, the it expects one of the supported mini-protocols as argument. For example: + When you `use` this module, it expects one of the supported mini-protocols as argument. For example: defmodule ChainSyncClient do use Xogmios, :chain_sync # ... end - or - defmodule StateQueryClient do use Xogmios, :state_query # ... end + + defmodule TxSubmissionClient do + use Xogmios, :tx_submission + # ... + end + + defmodule MempoolClient do + use Xogmios, :mempool + # ... + end """ alias Xogmios.ChainSync @@ -90,7 +98,26 @@ defmodule Xogmios do TxSubmission.start_link(client, opts) end - # TODO: doc + @doc """ + Starts a new Mempool process linked to the current process. + + `opts` as keyword lists are passed to the underlying :websocket_client. + + The `:include_details` option can be used to determine which values + should be returned with each transaction as part of `c:Xogmios.Mempool.handle_transaction/2`. + + Setting this option to `false` (default) means only transaction id is returned: + + ``` + Xogmios.start_mempool_link(__MODULE__, url: ogmios_url, include_details: false) + ``` + + Setting it to `true` means all transaction fields are returned: + + ``` + Xogmios.start_mempool_link(__MODULE__, url: ogmios_url, include_details: true) + ``` + """ def start_mempool_link(client, opts) do Mempool.start_link(client, opts) end diff --git a/lib/xogmios/mempool.ex b/lib/xogmios/mempool.ex index d0335de..37e106b 100644 --- a/lib/xogmios/mempool.ex +++ b/lib/xogmios/mempool.ex @@ -7,7 +7,18 @@ defmodule Xogmios.Mempool do require Logger - # TODO: doc + @doc """ + Invoked when a new transaction is made available in the mempool. + + Receives transaction information as argument and current state of the handler. + + Returning `{:ok, :next_transaction, new_state}` will request the next transaction + once it's made available. + + Returning `{:ok, new_state}` wil not request anymore transactions. + + Returning `{:close, new_state}` will close the connection to the server + """ @callback handle_transaction(transaction :: map(), state) :: {:ok, :next_transaction, new_state} | {:ok, new_state} @@ -48,6 +59,7 @@ defmodule Xogmios.Mempool do def start_link(client, opts) do {url, opts} = Keyword.pop(opts, :url) {name, opts} = Keyword.pop(opts, :name, client) + opts = Keyword.put_new(opts, :include_details, false) initial_state = Keyword.merge(opts, handler: client, notify_on_connect: self()) with {:ok, process_name} <- build_process_name(name), @@ -98,6 +110,61 @@ defmodule Xogmios.Mempool do end end + @doc """ + Issues a synchronous message for getting the size of the mempool. + """ + @spec size_of_mempool(pid()) :: {:ok, response :: map()} | :error + def size_of_mempool(pid) do + # hacky af but it does the job for now + ws_pid = update_ws_with_caller(pid) + + message = Xogmios.Mempool.Messages.size_of_mempool() + :websocket_client.cast(ws_pid, {:text, message}) + + receive do + {:ok, response} -> {:ok, response} + after + 5_000 -> :error + end + end + + @spec has_transaction(pid(), tx_id :: binary()) :: boolean() + def has_transaction(pid, tx_id) do + # hacky af but it does the job for now + ws_pid = update_ws_with_caller(pid) + + message = Xogmios.Mempool.Messages.has_transaction(tx_id) + :websocket_client.cast(ws_pid, {:text, message}) + + receive do + {:ok, response} -> {:ok, response} + after + 5_000 -> :error + end + end + + # Updates Websocket process with self() as + # caller and returns the Websocket process id + defp update_ws_with_caller(pid) do + state = :sys.get_state(pid) + + {_c, %{ws_pid: ws_pid}} = state |> elem(1) |> elem(5) + + caller = self() + + :sys.replace_state(pid, fn current_state -> + {:connected, {:context, req, transport, empty_list, ws, {module, client_info}, _, _, _}} = + current_state + + updated_client_info = Map.put(client_info, :caller, caller) + + {:connected, + {:context, req, transport, empty_list, ws, {module, updated_client_info}, "", true, 0}} + end) + + ws_pid + end + defmacro __using__(_opts) do quote do @behaviour Xogmios.Mempool @@ -120,7 +187,7 @@ defmodule Xogmios.Mempool do ) do case state.handler.handle_acquired(%{"slot" => slot}, state) do {:ok, :next_transaction, new_state} -> - message = Messages.next_transaction() + message = Messages.next_transaction(state.include_details) {:reply, {:text, message}, new_state} {:ok, new_state} -> @@ -167,6 +234,26 @@ defmodule Xogmios.Mempool do Logger.warning("Invalid response #{inspect(response)}") end end + + # Responds to synchronous call + def handle_message( + %{"method" => "sizeOfMempool", "result" => result}, + state + ) do + caller = Map.get(state, :caller) + send(caller, {:ok, result}) + {:ok, state} + end + + # Responds to synchronous call + def handle_message( + %{"method" => "hasTransaction", "result" => has_it}, + state + ) do + caller = Map.get(state, :caller) + send(caller, {:ok, has_it}) + {:ok, state} + end end end end diff --git a/lib/xogmios/mempool/messages.ex b/lib/xogmios/mempool/messages.ex index 56bfad9..2a06514 100644 --- a/lib/xogmios/mempool/messages.ex +++ b/lib/xogmios/mempool/messages.ex @@ -17,13 +17,51 @@ defmodule Xogmios.Mempool.Messages do json end - def next_transaction() do + def next_transaction(include_details \\ false) do + json = + if include_details do + ~S""" + { + "jsonrpc": "2.0", + "method": "nextTransaction", + "params": { + "fields": "all" + } + } + """ + else + ~S""" + { + "jsonrpc": "2.0", + "method": "nextTransaction", + "params": {} + } + """ + end + + validate_json!(json) + json + end + + def size_of_mempool() do json = ~S""" { "jsonrpc": "2.0", - "method": "nextTransaction", + "method": "sizeOfMempool" + } + """ + + validate_json!(json) + json + end + + def has_transaction(tx_id) do + json = ~s""" + { + "jsonrpc": "2.0", + "method": "hasTransaction", "params": { - "fields": "all" + "id": "#{tx_id}" } } """ From 115331a4debdf4c2d747280d97050555e1235243 Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Thu, 13 Jun 2024 18:28:10 -0400 Subject: [PATCH 03/16] Update CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9768a78..bfed6a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +- Support for Mempool monitoring mini-protocol. + ## [v0.4.1](https://github.com/wowica/xogmios/releases/tag/v0.4.1) (2024-06-05) ### Fixed From 264bd027439bea895df0e50532cad3985c099e5d Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Thu, 13 Jun 2024 18:34:17 -0400 Subject: [PATCH 04/16] Fix dialyzer --- lib/xogmios/mempool.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/xogmios/mempool.ex b/lib/xogmios/mempool.ex index 37e106b..df57dfd 100644 --- a/lib/xogmios/mempool.ex +++ b/lib/xogmios/mempool.ex @@ -128,7 +128,7 @@ defmodule Xogmios.Mempool do end end - @spec has_transaction(pid(), tx_id :: binary()) :: boolean() + @spec has_transaction(pid(), tx_id :: binary()) :: {:ok, boolean()} | :error def has_transaction(pid, tx_id) do # hacky af but it does the job for now ws_pid = update_ws_with_caller(pid) From 6226252e70ad11504ae04fdb85cba9ac742264f2 Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Thu, 13 Jun 2024 20:49:19 -0400 Subject: [PATCH 05/16] Add mempool tests --- lib/xogmios/mempool.ex | 22 ++++++++++-- test/mempool_test.exs | 47 ++++++++++++++++++++++++ test/support/mempool/test_handler.ex | 54 ++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 test/mempool_test.exs create mode 100644 test/support/mempool/test_handler.ex diff --git a/lib/xogmios/mempool.ex b/lib/xogmios/mempool.ex index df57dfd..6dd7763 100644 --- a/lib/xogmios/mempool.ex +++ b/lib/xogmios/mempool.ex @@ -7,6 +7,24 @@ defmodule Xogmios.Mempool do require Logger + @doc """ + Invoked when a new snapshot is acquired. + + Receives snapshot information as argument. + + Returning `{:ok, :next_transaction, new_state}` will request the next transaction + once it's made available. + + Returning `{:ok, new_state}` wil not request anymore transactions. + + Returning `{:close, new_state}` will close the connection to the server. + """ + @callback handle_acquired(snapshop :: map(), state) :: + {:ok, :next_transaction, new_state} + | {:ok, new_state} + | {:close, new_state} + when state: term(), new_state: term() + @doc """ Invoked when a new transaction is made available in the mempool. @@ -17,7 +35,7 @@ defmodule Xogmios.Mempool do Returning `{:ok, new_state}` wil not request anymore transactions. - Returning `{:close, new_state}` will close the connection to the server + Returning `{:close, new_state}` will close the connection to the server. """ @callback handle_transaction(transaction :: map(), state) :: {:ok, :next_transaction, new_state} @@ -175,7 +193,7 @@ defmodule Xogmios.Mempool do def handle_connect(state), do: {:ok, state} def handle_disconnect(_reason, state), do: {:ok, state} - def handle_acquired(_slot, state), do: {:ok, :next_transaction, state} + def handle_acquired(_snapshot, state), do: {:ok, :next_transaction, state} defoverridable handle_connect: 1, handle_disconnect: 2, handle_acquired: 2 def handle_message( diff --git a/test/mempool_test.exs b/test/mempool_test.exs new file mode 100644 index 0000000..d7171a1 --- /dev/null +++ b/test/mempool_test.exs @@ -0,0 +1,47 @@ +defmodule Xogmios.MempoolTest do + use ExUnit.Case + + @ws_url TestServer.get_url() + + setup_all do + {:ok, _server} = TestServer.start(handler: Mempool.TestHandler) + + on_exit(fn -> + TestServer.shutdown() + end) + + :ok + end + + defmodule DummyClient do + use Xogmios, :mempool + + def start_link(opts) do + Xogmios.start_mempool_link(__MODULE__, opts) + end + + @impl true + def handle_acquired(_snapshot, state) do + send(state.test_handler, :handle_acquired) + {:ok, :next_transaction, state} + end + + @impl true + def handle_transaction(_block, state) do + send(state.test_handler, :handle_transaction) + {:close, state} + end + end + + test "receives callbacks and closes connection" do + pid = start_supervised!({DummyClient, url: @ws_url, test_handler: self()}) + assert is_pid(pid) + + assert_receive :handle_acquired + assert_receive :handle_transaction + + Process.sleep(500) + refute Process.alive?(pid) + assert GenServer.whereis(DummyClient) == nil + end +end diff --git a/test/support/mempool/test_handler.ex b/test/support/mempool/test_handler.ex new file mode 100644 index 0000000..7a2bdeb --- /dev/null +++ b/test/support/mempool/test_handler.ex @@ -0,0 +1,54 @@ +defmodule Mempool.TestHandler do + @moduledoc false + + @behaviour :cowboy_websocket + + @impl true + def init(request, state) do + {:cowboy_websocket, request, state} + end + + @impl true + def websocket_init(state) do + {:ok, state} + end + + @impl true + def websocket_handle({:text, payload}, state) do + case Jason.decode(payload) do + {:ok, %{"method" => "acquireMempool"}} -> + payload = + Jason.encode!(%{ + "method" => "acquireMempool", + "result" => %{"acquired" => "mempool", "slot" => 123} + }) + + {:reply, {:text, payload}, state} + + {:ok, %{"method" => "nextTransaction"}} -> + payload = + Jason.encode!(%{ + "method" => "nextTransaction", + "result" => %{ + "transaction" => %{"id" => 456} + } + }) + + {:reply, {:text, payload}, state} + end + end + + @impl true + def terminate(_arg1, _arg2, _arg3) do + :ok + end + + def websocket_info(:stop, state) do + {:stop, state} + end + + @impl true + def websocket_info(info, state) do + {:reply, {:text, info}, state} + end +end From e53219a685a2a06fdc29c8f0831f37d7f165d8b4 Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Thu, 13 Jun 2024 20:49:57 -0400 Subject: [PATCH 06/16] Update docs and README --- README.md | 42 ++++++++++++++++++++++++++++++++++++++ examples/mempool_client.ex | 18 +++++++++++++++- 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 925c0cf..5a92a28 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,48 @@ defmodule TxSubmissionClient do end ``` +### Mempool Monitoring + +The following illustrates working with the **Mempool Monitoring** protocol. It provides a way to list and query transactions in the mempool, and to query the size of the mempool. + +```elixir +defmodule MempoolClient do + @moduledoc """ + This module prints transactions as they become available + in the mempool + """ + use Xogmios, :mempool + + def start_link(opts) do + Xogmios.start_mempool_link(__MODULE__, opts) + end + + # Callbacks + @impl true + def handle_acquired(%{"slot" => slot} = _snapshot, state) do + IO.puts("Snapshot acquired at slot #{slot}") + + {:ok, :next_transaction, state} + end + + @impl true + def handle_transaction(transaction, state) do + IO.puts("Transaction: #{transaction["id"]}") + + {:ok, :next_transaction, state} + end + + # Synchronous calls + def size(pid \\ __MODULE__) do + Xogmios.Mempool.size_of_mempool(pid) + end + + def has_tx(pid \\ __MODULE__, tx_id) do + Xogmios.Mempool.has_transaction(pid, tx_id) + end +end +``` + For examples of applications using this library, see [Blocks](https://github.com/wowica/blocks) and [xogmios_watcher](https://github.com/wowica/xogmios_watcher). ## Test diff --git a/examples/mempool_client.ex b/examples/mempool_client.ex index 5fb0b78..5749920 100644 --- a/examples/mempool_client.ex +++ b/examples/mempool_client.ex @@ -9,10 +9,26 @@ defmodule XogmiosWatcher.MempoolClient do Xogmios.start_mempool_link(__MODULE__, opts) end + @impl true + def handle_acquired(%{"slot" => slot} = _snapshot, state) do + IO.puts("Snapshot acquired at slot #{slot}") + + {:ok, :next_transaction, state} + end + @impl true def handle_transaction(transaction, state) do - IO.puts("transaction #{transaction["id"]}") + IO.puts("Transaction: #{transaction["id"]}") {:ok, :next_transaction, state} end + + # Synchronous calls + def size(pid \\ __MODULE__) do + Xogmios.Mempool.size_of_mempool(pid) + end + + def has_tx(pid \\ __MODULE__, tx_id) do + Xogmios.Mempool.has_transaction(pid, tx_id) + end end From 32cae9db1ee09ad370a57fee9e01915c8e7c3727 Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Thu, 13 Jun 2024 21:10:06 -0400 Subject: [PATCH 07/16] Rename functions --- README.md | 8 ++++---- examples/mempool_client.ex | 8 ++++---- lib/xogmios/mempool.ex | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 5a92a28..6dfa983 100644 --- a/README.md +++ b/README.md @@ -188,12 +188,12 @@ defmodule MempoolClient do end # Synchronous calls - def size(pid \\ __MODULE__) do - Xogmios.Mempool.size_of_mempool(pid) + def get_size(pid \\ __MODULE__) do + Xogmios.Mempool.get_size(pid) end - def has_tx(pid \\ __MODULE__, tx_id) do - Xogmios.Mempool.has_transaction(pid, tx_id) + def has_tx?(pid \\ __MODULE__, tx_id) do + Xogmios.Mempool.has_transaction?(pid, tx_id) end end ``` diff --git a/examples/mempool_client.ex b/examples/mempool_client.ex index 5749920..8d8e8c1 100644 --- a/examples/mempool_client.ex +++ b/examples/mempool_client.ex @@ -24,11 +24,11 @@ defmodule XogmiosWatcher.MempoolClient do end # Synchronous calls - def size(pid \\ __MODULE__) do - Xogmios.Mempool.size_of_mempool(pid) + def get_size(pid \\ __MODULE__) do + Xogmios.Mempool.get_size(pid) end - def has_tx(pid \\ __MODULE__, tx_id) do - Xogmios.Mempool.has_transaction(pid, tx_id) + def has_tx?(pid \\ __MODULE__, tx_id) do + Xogmios.Mempool.has_transaction?(pid, tx_id) end end diff --git a/lib/xogmios/mempool.ex b/lib/xogmios/mempool.ex index 6dd7763..de7f5d2 100644 --- a/lib/xogmios/mempool.ex +++ b/lib/xogmios/mempool.ex @@ -131,8 +131,8 @@ defmodule Xogmios.Mempool do @doc """ Issues a synchronous message for getting the size of the mempool. """ - @spec size_of_mempool(pid()) :: {:ok, response :: map()} | :error - def size_of_mempool(pid) do + @spec get_size(pid()) :: {:ok, response :: map()} | :error + def get_size(pid) do # hacky af but it does the job for now ws_pid = update_ws_with_caller(pid) @@ -146,8 +146,8 @@ defmodule Xogmios.Mempool do end end - @spec has_transaction(pid(), tx_id :: binary()) :: {:ok, boolean()} | :error - def has_transaction(pid, tx_id) do + @spec has_transaction?(pid(), tx_id :: binary()) :: {:ok, boolean()} | :error + def has_transaction?(pid, tx_id) do # hacky af but it does the job for now ws_pid = update_ws_with_caller(pid) From 33d3fc5bc1d214692381dbbc9c5deea7bd35c8b3 Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Thu, 13 Jun 2024 21:10:16 -0400 Subject: [PATCH 08/16] Update erlang and elixir --- .tool-versions | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.tool-versions b/.tool-versions index acdeda4..9d6088f 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,3 +1,3 @@ -erlang 26.2.1 -elixir 1.16.0 +erlang 27.0 +elixir 1.17.0-rc.1 From 00b3af98b8e4dcbb864a6b81e3af0622bdacad9d Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Thu, 13 Jun 2024 21:13:17 -0400 Subject: [PATCH 09/16] Rollback due to credo errors --- .tool-versions | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.tool-versions b/.tool-versions index 9d6088f..acdeda4 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,3 +1,3 @@ -erlang 27.0 -elixir 1.17.0-rc.1 +erlang 26.2.1 +elixir 1.16.0 From e994653f22c1147f69cd7cc6c5b4fd8c9a96f842 Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Fri, 14 Jun 2024 16:20:29 -0400 Subject: [PATCH 10/16] Update to Elixir 1.17.0-rc1 --- .tool-versions | 5 ++--- mix.lock | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.tool-versions b/.tool-versions index acdeda4..9a41ca1 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,3 +1,2 @@ -erlang 26.2.1 -elixir 1.16.0 - +erlang 27.0 +elixir 1.17.0-rc.1 diff --git a/mix.lock b/mix.lock index 0e75739..f728a6a 100644 --- a/mix.lock +++ b/mix.lock @@ -3,7 +3,7 @@ "cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, "cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"}, - "credo": {:hex, :credo, "1.7.2", "fdee3a7cb553d8f2e773569181f0a4a2bb7d192e27e325404cc31b354f59d68c", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dd15d6fbc280f6cf9b269f41df4e4992dee6615939653b164ef951f60afcb68e"}, + "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, From 966c2bc343ced6429b812a169db1b81483d9875f Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Mon, 29 Jul 2024 14:16:50 -0400 Subject: [PATCH 11/16] Split mempool txs into its own module. --- .formatter.exs | 2 +- .tool-versions | 2 +- examples/mempool_client.ex | 34 --- examples/mempool_txs_client.ex | 23 ++ lib/xogmios.ex | 10 +- lib/xogmios/mempool_txs.ex | 203 ++++++++++++++++++ ...{mempool_test.exs => mempool_txs_test.exs} | 4 +- .../{mempool => mempool_txs}/test_handler.ex | 2 +- 8 files changed, 239 insertions(+), 41 deletions(-) delete mode 100644 examples/mempool_client.ex create mode 100644 examples/mempool_txs_client.ex create mode 100644 lib/xogmios/mempool_txs.ex rename test/{mempool_test.exs => mempool_txs_test.exs} (90%) rename test/support/{mempool => mempool_txs}/test_handler.ex (96%) diff --git a/.formatter.exs b/.formatter.exs index d2cda26..c185fe1 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,4 +1,4 @@ # Used by "mix format" [ - inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] + inputs: ["{mix,.formatter}.exs", "{config,lib,test,examples}/**/*.{ex,exs}"] ] diff --git a/.tool-versions b/.tool-versions index 9a41ca1..a3b0b43 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ erlang 27.0 -elixir 1.17.0-rc.1 +elixir 1.17.2 diff --git a/examples/mempool_client.ex b/examples/mempool_client.ex deleted file mode 100644 index 8d8e8c1..0000000 --- a/examples/mempool_client.ex +++ /dev/null @@ -1,34 +0,0 @@ -defmodule XogmiosWatcher.MempoolClient do - @moduledoc """ - This module prints transactions as they become available - in the mempool - """ - use Xogmios, :mempool - - def start_link(opts) do - Xogmios.start_mempool_link(__MODULE__, opts) - end - - @impl true - def handle_acquired(%{"slot" => slot} = _snapshot, state) do - IO.puts("Snapshot acquired at slot #{slot}") - - {:ok, :next_transaction, state} - end - - @impl true - def handle_transaction(transaction, state) do - IO.puts("Transaction: #{transaction["id"]}") - - {:ok, :next_transaction, state} - end - - # Synchronous calls - def get_size(pid \\ __MODULE__) do - Xogmios.Mempool.get_size(pid) - end - - def has_tx?(pid \\ __MODULE__, tx_id) do - Xogmios.Mempool.has_transaction?(pid, tx_id) - end -end diff --git a/examples/mempool_txs_client.ex b/examples/mempool_txs_client.ex new file mode 100644 index 0000000..8f1b3f3 --- /dev/null +++ b/examples/mempool_txs_client.ex @@ -0,0 +1,23 @@ +defmodule Xogmios.MempoolTxsClient do + @moduledoc """ + This module prints transactions as they become available + in the mempool. + """ + use Xogmios, :mempool_txs + + def start_link(opts) do + # set include_details: true to retrieve + # complete information about the transaction. + # set include_details: false (default) to retrieve + # only transaction id. + opts = Keyword.merge(opts, include_details: true) + Xogmios.start_mempool_link(__MODULE__, opts) + end + + @impl true + def handle_transaction(transaction, state) do + IO.puts("transaction #{inspect(transaction)}") + + {:ok, :next_transaction, state} + end +end diff --git a/lib/xogmios.ex b/lib/xogmios.ex index e588d38..68c1358 100644 --- a/lib/xogmios.ex +++ b/lib/xogmios.ex @@ -140,9 +140,15 @@ defmodule Xogmios do end end - defmacro __using__(:mempool) do + defmacro __using__(:mempool_txs) do quote do - use Xogmios.Mempool + use Xogmios.MempoolTxs + end + end + + defmacro __using__(_opts) do + quote do + raise "Unsupported method" end end end diff --git a/lib/xogmios/mempool_txs.ex b/lib/xogmios/mempool_txs.ex new file mode 100644 index 0000000..7aa4ab9 --- /dev/null +++ b/lib/xogmios/mempool_txs.ex @@ -0,0 +1,203 @@ +defmodule Xogmios.MempoolTxs do + @moduledoc """ + This module interfaces with the Mempool protocol and specializes + strictly in reading transactions as they come in (`nextTransaction`). + """ + + alias Xogmios.Mempool.Messages + + require Logger + + @doc """ + Invoked when a new snapshot is acquired. + + Receives snapshot information as argument. + + Returning `{:ok, :next_transaction, new_state}` will request the next transaction + once it's made available. + + Returning `{:ok, new_state}` wil not request anymore transactions. + + Returning `{:close, new_state}` will close the connection to the server. + """ + @callback handle_acquired(snapshop :: map(), state) :: + {:ok, :next_transaction, new_state} + | {:ok, new_state} + | {:close, new_state} + when state: term(), new_state: term() + + @doc """ + Invoked when a new transaction is made available in the mempool. + + Receives transaction information as argument and current state of the handler. + + Returning `{:ok, :next_transaction, new_state}` will request the next transaction + once it's made available. + + Returning `{:ok, new_state}` wil not request anymore transactions. + + Returning `{:close, new_state}` will close the connection to the server. + """ + @callback handle_transaction(transaction :: map(), state) :: + {:ok, :next_transaction, new_state} + | {:ok, new_state} + | {:close, new_state} + when state: term(), new_state: term() + + @doc """ + Invoked upon connecting to the server. This callback is optional. + """ + @callback handle_connect(state) :: {:ok, new_state} + when state: term(), new_state: term() + + @doc """ + Invoked upon disconnecting from the server. This callback is optional. + + Returning `{:ok, new_state}` will allow the connection to close. + + Returning `{:reconnect, interval_in_ms}` will attempt a reconnection after `interval_in_ms` + """ + @callback handle_disconnect(reason :: String.t(), state) :: + {:ok, new_state} + | {:reconnect, interval_in_ms :: non_neg_integer(), new_state} + when state: term(), new_state: term() + + # The keepalive option is used to maintain the connection active. + # This is important because proxies might close idle connections after a few seconds. + @keepalive_in_ms 5_000 + + # The websocket client library + @client :websocket_client + + @doc """ + Starts a new Mempool process linked to the current process. + + This function should not be called directly, but rather via `Xogmios.start_mempool_link/2` + """ + @spec start_link(module(), start_options :: Keyword.t()) :: {:ok, pid()} | {:error, term()} + def start_link(client, opts) do + {url, opts} = Keyword.pop(opts, :url) + {name, opts} = Keyword.pop(opts, :name, client) + opts = Keyword.put_new(opts, :include_details, false) + initial_state = Keyword.merge(opts, handler: client, notify_on_connect: self()) + + with {:ok, process_name} <- build_process_name(name), + {:ok, ws_pid} <- start_link(process_name, url, client, initial_state) do + # Blocks until the connection with the Ogmios server + # is established or until timeout is reached. + + receive do + {:connected, _connection} -> {:ok, ws_pid} + after + _timeout = 5_000 -> + Logger.warning("Timeout connecting to Ogmios server") + send(ws_pid, :close) + {:error, :connection_timeout} + end + else + {:error, :invalid_process_name} = error -> + error + + {:error, _} = error -> + Logger.warning("Error connecting with Ogmios server #{inspect(error)}") + error + end + end + + defp start_link(name, url, client, state) do + @client.start_link(name, url, client, state, keepalive: @keepalive_in_ms) + end + + # Builds process name from valid argument or returns error + @spec build_process_name(term() | {:global, term()} | {:via, term(), term()}) :: + {:ok, any()} | {:error, term()} + defp build_process_name(name) do + case name do + name when is_atom(name) -> + {:ok, {:local, name}} + + {:global, term} = tuple when is_atom(term) -> + {:ok, tuple} + + {:via, registry, _term} = tuple when is_atom(registry) -> + {:ok, tuple} + + _ -> + # Returns error if name does not comply with + # values accepted by the websocket client library + {:error, :invalid_process_name} + end + end + + defmacro __using__(_opts) do + quote do + @behaviour Xogmios.MempoolTxs + + use Xogmios.Mempool.Connection + + require Logger + + def handle_connect(state), do: {:ok, state} + def handle_disconnect(_reason, state), do: {:ok, state} + def handle_acquired(_snapshot, state), do: {:ok, :next_transaction, state} + defoverridable handle_connect: 1, handle_disconnect: 2, handle_acquired: 2 + + def handle_message( + %{ + "method" => "acquireMempool", + "result" => %{"acquired" => "mempool", "slot" => slot} = _result + }, + state + ) do + case state.handler.handle_acquired(%{"slot" => slot}, state) do + {:ok, :next_transaction, new_state} -> + message = Messages.next_transaction(state.include_details) + {:reply, {:text, message}, new_state} + + {:ok, new_state} -> + {:ok, new_state} + + {:close, new_state} -> + {:close, "finished", new_state} + + response -> + Logger.warning("Invalid response #{inspect(response)}") + end + end + + def handle_message( + %{ + "method" => "nextTransaction", + "result" => %{"transaction" => nil} = _result + }, + state + ) do + message = Messages.acquire_mempool() + {:reply, {:text, message}, state} + end + + def handle_message( + %{ + "method" => "nextTransaction", + "result" => %{"transaction" => transaction} = _result + }, + state + ) do + case state.handler.handle_transaction(transaction, state) do + {:ok, :next_transaction, new_state} -> + message = Messages.next_transaction() + {:reply, {:text, message}, new_state} + + {:ok, new_state} -> + {:ok, new_state} + + {:close, new_state} -> + {:close, "finished", new_state} + + response -> + Logger.warning("Invalid response #{inspect(response)}") + end + end + end + end +end diff --git a/test/mempool_test.exs b/test/mempool_txs_test.exs similarity index 90% rename from test/mempool_test.exs rename to test/mempool_txs_test.exs index d7171a1..2c45048 100644 --- a/test/mempool_test.exs +++ b/test/mempool_txs_test.exs @@ -4,7 +4,7 @@ defmodule Xogmios.MempoolTest do @ws_url TestServer.get_url() setup_all do - {:ok, _server} = TestServer.start(handler: Mempool.TestHandler) + {:ok, _server} = TestServer.start(handler: MempoolTxs.TestHandler) on_exit(fn -> TestServer.shutdown() @@ -14,7 +14,7 @@ defmodule Xogmios.MempoolTest do end defmodule DummyClient do - use Xogmios, :mempool + use Xogmios, :mempool_txs def start_link(opts) do Xogmios.start_mempool_link(__MODULE__, opts) diff --git a/test/support/mempool/test_handler.ex b/test/support/mempool_txs/test_handler.ex similarity index 96% rename from test/support/mempool/test_handler.ex rename to test/support/mempool_txs/test_handler.ex index 7a2bdeb..1865451 100644 --- a/test/support/mempool/test_handler.ex +++ b/test/support/mempool_txs/test_handler.ex @@ -1,4 +1,4 @@ -defmodule Mempool.TestHandler do +defmodule MempoolTxs.TestHandler do @moduledoc false @behaviour :cowboy_websocket From 3fd009c3112fad6f59b1e7f2e55d01329767693f Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Mon, 29 Jul 2024 14:18:24 -0400 Subject: [PATCH 12/16] Remove single mempool module --- lib/xogmios/mempool.ex | 277 ----------------------------------------- 1 file changed, 277 deletions(-) delete mode 100644 lib/xogmios/mempool.ex diff --git a/lib/xogmios/mempool.ex b/lib/xogmios/mempool.ex deleted file mode 100644 index de7f5d2..0000000 --- a/lib/xogmios/mempool.ex +++ /dev/null @@ -1,277 +0,0 @@ -defmodule Xogmios.Mempool do - @moduledoc """ - This module interfaces with the Mempool protocol. - """ - - alias Xogmios.Mempool.Messages - - require Logger - - @doc """ - Invoked when a new snapshot is acquired. - - Receives snapshot information as argument. - - Returning `{:ok, :next_transaction, new_state}` will request the next transaction - once it's made available. - - Returning `{:ok, new_state}` wil not request anymore transactions. - - Returning `{:close, new_state}` will close the connection to the server. - """ - @callback handle_acquired(snapshop :: map(), state) :: - {:ok, :next_transaction, new_state} - | {:ok, new_state} - | {:close, new_state} - when state: term(), new_state: term() - - @doc """ - Invoked when a new transaction is made available in the mempool. - - Receives transaction information as argument and current state of the handler. - - Returning `{:ok, :next_transaction, new_state}` will request the next transaction - once it's made available. - - Returning `{:ok, new_state}` wil not request anymore transactions. - - Returning `{:close, new_state}` will close the connection to the server. - """ - @callback handle_transaction(transaction :: map(), state) :: - {:ok, :next_transaction, new_state} - | {:ok, new_state} - | {:close, new_state} - when state: term(), new_state: term() - - @doc """ - Invoked upon connecting to the server. This callback is optional. - """ - @callback handle_connect(state) :: {:ok, new_state} - when state: term(), new_state: term() - - @doc """ - Invoked upon disconnecting from the server. This callback is optional. - - Returning `{:ok, new_state}` will allow the connection to close. - - Returning `{:reconnect, interval_in_ms}` will attempt a reconnection after `interval_in_ms` - """ - @callback handle_disconnect(reason :: String.t(), state) :: - {:ok, new_state} - | {:reconnect, interval_in_ms :: non_neg_integer(), new_state} - when state: term(), new_state: term() - - # The keepalive option is used to maintain the connection active. - # This is important because proxies might close idle connections after a few seconds. - @keepalive_in_ms 5_000 - - # The websocket client library - @client :websocket_client - - @doc """ - Starts a new Mempool process linked to the current process. - - This function should not be called directly, but rather via `Xogmios.start_mempool_link/2` - """ - @spec start_link(module(), start_options :: Keyword.t()) :: {:ok, pid()} | {:error, term()} - def start_link(client, opts) do - {url, opts} = Keyword.pop(opts, :url) - {name, opts} = Keyword.pop(opts, :name, client) - opts = Keyword.put_new(opts, :include_details, false) - initial_state = Keyword.merge(opts, handler: client, notify_on_connect: self()) - - with {:ok, process_name} <- build_process_name(name), - {:ok, ws_pid} <- start_link(process_name, url, client, initial_state) do - # Blocks until the connection with the Ogmios server - # is established or until timeout is reached. - - receive do - {:connected, _connection} -> {:ok, ws_pid} - after - _timeout = 5_000 -> - Logger.warning("Timeout connecting to Ogmios server") - send(ws_pid, :close) - {:error, :connection_timeout} - end - else - {:error, :invalid_process_name} = error -> - error - - {:error, _} = error -> - Logger.warning("Error connecting with Ogmios server #{inspect(error)}") - error - end - end - - defp start_link(name, url, client, state) do - @client.start_link(name, url, client, state, keepalive: @keepalive_in_ms) - end - - # Builds process name from valid argument or returns error - @spec build_process_name(term() | {:global, term()} | {:via, term(), term()}) :: - {:ok, any()} | {:error, term()} - defp build_process_name(name) do - case name do - name when is_atom(name) -> - {:ok, {:local, name}} - - {:global, term} = tuple when is_atom(term) -> - {:ok, tuple} - - {:via, registry, _term} = tuple when is_atom(registry) -> - {:ok, tuple} - - _ -> - # Returns error if name does not comply with - # values accepted by the websocket client library - {:error, :invalid_process_name} - end - end - - @doc """ - Issues a synchronous message for getting the size of the mempool. - """ - @spec get_size(pid()) :: {:ok, response :: map()} | :error - def get_size(pid) do - # hacky af but it does the job for now - ws_pid = update_ws_with_caller(pid) - - message = Xogmios.Mempool.Messages.size_of_mempool() - :websocket_client.cast(ws_pid, {:text, message}) - - receive do - {:ok, response} -> {:ok, response} - after - 5_000 -> :error - end - end - - @spec has_transaction?(pid(), tx_id :: binary()) :: {:ok, boolean()} | :error - def has_transaction?(pid, tx_id) do - # hacky af but it does the job for now - ws_pid = update_ws_with_caller(pid) - - message = Xogmios.Mempool.Messages.has_transaction(tx_id) - :websocket_client.cast(ws_pid, {:text, message}) - - receive do - {:ok, response} -> {:ok, response} - after - 5_000 -> :error - end - end - - # Updates Websocket process with self() as - # caller and returns the Websocket process id - defp update_ws_with_caller(pid) do - state = :sys.get_state(pid) - - {_c, %{ws_pid: ws_pid}} = state |> elem(1) |> elem(5) - - caller = self() - - :sys.replace_state(pid, fn current_state -> - {:connected, {:context, req, transport, empty_list, ws, {module, client_info}, _, _, _}} = - current_state - - updated_client_info = Map.put(client_info, :caller, caller) - - {:connected, - {:context, req, transport, empty_list, ws, {module, updated_client_info}, "", true, 0}} - end) - - ws_pid - end - - defmacro __using__(_opts) do - quote do - @behaviour Xogmios.Mempool - - use Xogmios.Mempool.Connection - - require Logger - - def handle_connect(state), do: {:ok, state} - def handle_disconnect(_reason, state), do: {:ok, state} - def handle_acquired(_snapshot, state), do: {:ok, :next_transaction, state} - defoverridable handle_connect: 1, handle_disconnect: 2, handle_acquired: 2 - - def handle_message( - %{ - "method" => "acquireMempool", - "result" => %{"acquired" => "mempool", "slot" => slot} = _result - }, - state - ) do - case state.handler.handle_acquired(%{"slot" => slot}, state) do - {:ok, :next_transaction, new_state} -> - message = Messages.next_transaction(state.include_details) - {:reply, {:text, message}, new_state} - - {:ok, new_state} -> - {:ok, new_state} - - {:close, new_state} -> - {:close, "finished", new_state} - - response -> - Logger.warning("Invalid response #{inspect(response)}") - end - end - - def handle_message( - %{ - "method" => "nextTransaction", - "result" => %{"transaction" => nil} = _result - }, - state - ) do - message = Messages.acquire_mempool() - {:reply, {:text, message}, state} - end - - def handle_message( - %{ - "method" => "nextTransaction", - "result" => %{"transaction" => transaction} = _result - }, - state - ) do - case state.handler.handle_transaction(transaction, state) do - {:ok, :next_transaction, new_state} -> - message = Messages.next_transaction() - {:reply, {:text, message}, new_state} - - {:ok, new_state} -> - {:ok, new_state} - - {:close, new_state} -> - {:close, "finished", new_state} - - response -> - Logger.warning("Invalid response #{inspect(response)}") - end - end - - # Responds to synchronous call - def handle_message( - %{"method" => "sizeOfMempool", "result" => result}, - state - ) do - caller = Map.get(state, :caller) - send(caller, {:ok, result}) - {:ok, state} - end - - # Responds to synchronous call - def handle_message( - %{"method" => "hasTransaction", "result" => has_it}, - state - ) do - caller = Map.get(state, :caller) - send(caller, {:ok, has_it}) - {:ok, state} - end - end - end -end From fd4f68eac1efec4d01a22dacc980d94395577d4c Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Mon, 29 Jul 2024 14:24:19 -0400 Subject: [PATCH 13/16] Update mempool_txs reference --- README.md | 11 +---------- lib/xogmios.ex | 8 ++++---- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 6dfa983..625c486 100644 --- a/README.md +++ b/README.md @@ -166,7 +166,7 @@ defmodule MempoolClient do This module prints transactions as they become available in the mempool """ - use Xogmios, :mempool + use Xogmios, :mempool_txs def start_link(opts) do Xogmios.start_mempool_link(__MODULE__, opts) @@ -186,15 +186,6 @@ defmodule MempoolClient do {:ok, :next_transaction, state} end - - # Synchronous calls - def get_size(pid \\ __MODULE__) do - Xogmios.Mempool.get_size(pid) - end - - def has_tx?(pid \\ __MODULE__, tx_id) do - Xogmios.Mempool.has_transaction?(pid, tx_id) - end end ``` diff --git a/lib/xogmios.ex b/lib/xogmios.ex index 68c1358..8f4a710 100644 --- a/lib/xogmios.ex +++ b/lib/xogmios.ex @@ -20,8 +20,8 @@ defmodule Xogmios do # ... end - defmodule MempoolClient do - use Xogmios, :mempool + defmodule MempoolTxsClient do + use Xogmios, :mempool_txs # ... end """ @@ -29,7 +29,7 @@ defmodule Xogmios do alias Xogmios.ChainSync alias Xogmios.StateQuery alias Xogmios.TxSubmission - alias Xogmios.Mempool + alias Xogmios.MempoolTxs @doc """ Starts a new State Query process linked to the current process. @@ -119,7 +119,7 @@ defmodule Xogmios do ``` """ def start_mempool_link(client, opts) do - Mempool.start_link(client, opts) + MempoolTxs.start_link(client, opts) end defmacro __using__(:state_query) do From e3a303433f17fec39e005d28839ac221191f4ef3 Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Sat, 3 Aug 2024 17:17:02 -0400 Subject: [PATCH 14/16] Remove unused messages --- lib/xogmios/mempool/messages.ex | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/lib/xogmios/mempool/messages.ex b/lib/xogmios/mempool/messages.ex index 2a06514..c43b382 100644 --- a/lib/xogmios/mempool/messages.ex +++ b/lib/xogmios/mempool/messages.ex @@ -43,33 +43,6 @@ defmodule Xogmios.Mempool.Messages do json end - def size_of_mempool() do - json = ~S""" - { - "jsonrpc": "2.0", - "method": "sizeOfMempool" - } - """ - - validate_json!(json) - json - end - - def has_transaction(tx_id) do - json = ~s""" - { - "jsonrpc": "2.0", - "method": "hasTransaction", - "params": { - "id": "#{tx_id}" - } - } - """ - - validate_json!(json) - json - end - defp validate_json!(json) do case Jason.decode(json) do {:ok, _decoded} -> :ok From 615614354e7567b5cd724b22b1dfe613b14b551b Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Sat, 3 Aug 2024 17:17:21 -0400 Subject: [PATCH 15/16] Update docs --- CHANGELOG.md | 2 +- README.md | 8 ++++---- lib/xogmios.ex | 12 ++++++------ lib/xogmios/mempool_txs.ex | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bfed6a2..7307595 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -- Support for Mempool monitoring mini-protocol. +- Added partial support for Mempool monitoring mini-protocol. Allows reading transactions in the mempool. ## [v0.4.1](https://github.com/wowica/xogmios/releases/tag/v0.4.1) (2024-06-05) diff --git a/README.md b/README.md index 625c486..a255dd0 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Mini-Protocols supported by this library: - [x] Chain Synchronization - [x] State Query - [x] Tx Submission -- [x] Mempool Monitoring +- [x] Mempool Monitoring (reads transactions only) See [Examples](#examples) section below for information on how to use this library. @@ -158,10 +158,10 @@ end ### Mempool Monitoring -The following illustrates working with the **Mempool Monitoring** protocol. It provides a way to list and query transactions in the mempool, and to query the size of the mempool. +The following illustrates working with the **Mempool Monitoring** protocol. It provides a way to list transactions in the mempool. ```elixir -defmodule MempoolClient do +defmodule MempoolTxsClient do @moduledoc """ This module prints transactions as they become available in the mempool @@ -169,7 +169,7 @@ defmodule MempoolClient do use Xogmios, :mempool_txs def start_link(opts) do - Xogmios.start_mempool_link(__MODULE__, opts) + Xogmios.start_mempool_txs_link(__MODULE__, opts) end # Callbacks diff --git a/lib/xogmios.ex b/lib/xogmios.ex index 8f4a710..ce7ad18 100644 --- a/lib/xogmios.ex +++ b/lib/xogmios.ex @@ -99,26 +99,26 @@ defmodule Xogmios do end @doc """ - Starts a new Mempool process linked to the current process. + Starts a new MempoolTxs (Transactions) process linked to the current process. `opts` as keyword lists are passed to the underlying :websocket_client. - The `:include_details` option can be used to determine which values - should be returned with each transaction as part of `c:Xogmios.Mempool.handle_transaction/2`. + The `:include_details` flag can be used to determine the level of details + to be returned with each transaction as part of `c:Xogmios.Mempool.handle_transaction/2`. Setting this option to `false` (default) means only transaction id is returned: ``` - Xogmios.start_mempool_link(__MODULE__, url: ogmios_url, include_details: false) + Xogmios.start_mempool_txs_link(__MODULE__, url: ogmios_url, include_details: false) ``` Setting it to `true` means all transaction fields are returned: ``` - Xogmios.start_mempool_link(__MODULE__, url: ogmios_url, include_details: true) + Xogmios.start_mempool_txs_link(__MODULE__, url: ogmios_url, include_details: true) ``` """ - def start_mempool_link(client, opts) do + def start_mempool_txs_link(client, opts) do MempoolTxs.start_link(client, opts) end diff --git a/lib/xogmios/mempool_txs.ex b/lib/xogmios/mempool_txs.ex index 7aa4ab9..23def24 100644 --- a/lib/xogmios/mempool_txs.ex +++ b/lib/xogmios/mempool_txs.ex @@ -1,7 +1,7 @@ defmodule Xogmios.MempoolTxs do @moduledoc """ - This module interfaces with the Mempool protocol and specializes - strictly in reading transactions as they come in (`nextTransaction`). + This module interfaces with the Mempool protocol for reading + transactions as they become available (`nextTransaction`). """ alias Xogmios.Mempool.Messages From 6a6dfd7cb501ea69ae5fb68c4adb18a0c7e9db17 Mon Sep 17 00:00:00 2001 From: Carlos Souza Date: Sat, 3 Aug 2024 17:19:50 -0400 Subject: [PATCH 16/16] Update test --- test/mempool_txs_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/mempool_txs_test.exs b/test/mempool_txs_test.exs index 2c45048..c263eae 100644 --- a/test/mempool_txs_test.exs +++ b/test/mempool_txs_test.exs @@ -17,7 +17,7 @@ defmodule Xogmios.MempoolTest do use Xogmios, :mempool_txs def start_link(opts) do - Xogmios.start_mempool_link(__MODULE__, opts) + Xogmios.start_mempool_txs_link(__MODULE__, opts) end @impl true