diff --git a/.formatter.exs b/.formatter.exs index d2cda26..c185fe1 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,4 +1,4 @@ # Used by "mix format" [ - inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] + inputs: ["{mix,.formatter}.exs", "{config,lib,test,examples}/**/*.{ex,exs}"] ] diff --git a/.tool-versions b/.tool-versions index acdeda4..a3b0b43 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,3 +1,2 @@ -erlang 26.2.1 -elixir 1.16.0 - +erlang 27.0 +elixir 1.17.2 diff --git a/CHANGELOG.md b/CHANGELOG.md index 9768a78..7307595 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +- Added partial support for Mempool monitoring mini-protocol. Allows reading transactions in the mempool. + ## [v0.4.1](https://github.com/wowica/xogmios/releases/tag/v0.4.1) (2024-06-05) ### Fixed diff --git a/README.md b/README.md index fc64711..a255dd0 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Mini-Protocols supported by this library: - [x] Chain Synchronization - [x] State Query - [x] Tx Submission -- [ ] Mempool Monitoring +- [x] Mempool Monitoring (reads transactions only) See [Examples](#examples) section below for information on how to use this library. @@ -156,6 +156,39 @@ defmodule TxSubmissionClient do end ``` +### Mempool Monitoring + +The following illustrates working with the **Mempool Monitoring** protocol. It provides a way to list transactions in the mempool. + +```elixir +defmodule MempoolTxsClient do + @moduledoc """ + This module prints transactions as they become available + in the mempool + """ + use Xogmios, :mempool_txs + + def start_link(opts) do + Xogmios.start_mempool_txs_link(__MODULE__, opts) + end + + # Callbacks + @impl true + def handle_acquired(%{"slot" => slot} = _snapshot, state) do + IO.puts("Snapshot acquired at slot #{slot}") + + {:ok, :next_transaction, state} + end + + @impl true + def handle_transaction(transaction, state) do + IO.puts("Transaction: #{transaction["id"]}") + + {:ok, :next_transaction, state} + end +end +``` + For examples of applications using this library, see [Blocks](https://github.com/wowica/blocks) and [xogmios_watcher](https://github.com/wowica/xogmios_watcher). ## Test diff --git a/examples/mempool_txs_client.ex b/examples/mempool_txs_client.ex new file mode 100644 index 0000000..8f1b3f3 --- /dev/null +++ b/examples/mempool_txs_client.ex @@ -0,0 +1,23 @@ +defmodule Xogmios.MempoolTxsClient do + @moduledoc """ + This module prints transactions as they become available + in the mempool. + """ + use Xogmios, :mempool_txs + + def start_link(opts) do + # set include_details: true to retrieve + # complete information about the transaction. + # set include_details: false (default) to retrieve + # only transaction id. + opts = Keyword.merge(opts, include_details: true) + Xogmios.start_mempool_link(__MODULE__, opts) + end + + @impl true + def handle_transaction(transaction, state) do + IO.puts("transaction #{inspect(transaction)}") + + {:ok, :next_transaction, state} + end +end diff --git a/lib/xogmios.ex b/lib/xogmios.ex index 9505bd7..ce7ad18 100644 --- a/lib/xogmios.ex +++ b/lib/xogmios.ex @@ -3,24 +3,33 @@ defmodule Xogmios do This is the top level module for Xogmios. It implements functions to be used by client modules that wish to connect with Ogmios. - When used, the it expects one of the supported mini-protocols as argument. For example: + When you `use` this module, it expects one of the supported mini-protocols as argument. For example: defmodule ChainSyncClient do use Xogmios, :chain_sync # ... end - or - defmodule StateQueryClient do use Xogmios, :state_query # ... end + + defmodule TxSubmissionClient do + use Xogmios, :tx_submission + # ... + end + + defmodule MempoolTxsClient do + use Xogmios, :mempool_txs + # ... + end """ alias Xogmios.ChainSync alias Xogmios.StateQuery alias Xogmios.TxSubmission + alias Xogmios.MempoolTxs @doc """ Starts a new State Query process linked to the current process. @@ -89,6 +98,30 @@ defmodule Xogmios do TxSubmission.start_link(client, opts) end + @doc """ + Starts a new MempoolTxs (Transactions) process linked to the current process. + + `opts` as keyword lists are passed to the underlying :websocket_client. + + The `:include_details` flag can be used to determine the level of details + to be returned with each transaction as part of `c:Xogmios.Mempool.handle_transaction/2`. + + Setting this option to `false` (default) means only transaction id is returned: + + ``` + Xogmios.start_mempool_txs_link(__MODULE__, url: ogmios_url, include_details: false) + ``` + + Setting it to `true` means all transaction fields are returned: + + ``` + Xogmios.start_mempool_txs_link(__MODULE__, url: ogmios_url, include_details: true) + ``` + """ + def start_mempool_txs_link(client, opts) do + MempoolTxs.start_link(client, opts) + end + defmacro __using__(:state_query) do quote do use Xogmios.StateQuery @@ -106,4 +139,16 @@ defmodule Xogmios do use Xogmios.TxSubmission end end + + defmacro __using__(:mempool_txs) do + quote do + use Xogmios.MempoolTxs + end + end + + defmacro __using__(_opts) do + quote do + raise "Unsupported method" + end + end end diff --git a/lib/xogmios/mempool/connection.ex b/lib/xogmios/mempool/connection.ex new file mode 100644 index 0000000..bca9c24 --- /dev/null +++ b/lib/xogmios/mempool/connection.ex @@ -0,0 +1,94 @@ +defmodule Xogmios.Mempool.Connection do + @moduledoc """ + This module implements a connection with the Ogmios Websocket server + for the Mempool protocol. + """ + + alias Xogmios.Mempool.Messages + + defmacro __using__(_opts) do + quote do + @behaviour :websocket_client + + require Logger + + @name __MODULE__ + @reconnect_interval 5_000 + + def child_spec(opts) do + %{ + id: Keyword.get(opts, :id, __MODULE__), + start: {__MODULE__, :start_link, [opts]}, + shutdown: 5_000, + restart: Keyword.get(opts, :restart, :transient), + type: :worker + } + end + + @impl true + def init(state) do + initial_state = + state + |> Enum.into(%{}) + |> Map.merge(%{handler: __MODULE__}) + + {:reconnect, initial_state} + end + + @impl true + def onconnect(connection, state) do + state = Map.put(state, :ws_pid, self()) + + start_message = Messages.acquire_mempool() + :websocket_client.cast(self(), {:text, start_message}) + send(state.notify_on_connect, {:connected, connection}) + + case state.handler.handle_connect(state) do + {:ok, new_state} -> + {:ok, new_state} + + _ -> + {:ok, state} + end + end + + @impl true + def ondisconnect(reason, state) do + case state.handler.handle_disconnect(reason, state) do + {:ok, state} -> + {:ok, state} + + {:reconnect, reconnect_interval_in_ms, new_state} -> + {:reconnect, reconnect_interval_in_ms, new_state} + end + end + + @impl true + def websocket_handle({:text, raw_message}, _conn, state) do + case Jason.decode(raw_message) do + {:ok, message} -> + handle_message(message, state) + + {:error, reason} -> + Logger.warning("Error decoding message #{inspect(reason)}") + {:ok, state} + end + end + + @impl true + def websocket_handle(_message, _conn, state) do + {:ok, state} + end + + @impl true + def websocket_info(_any, _arg1, state) do + {:ok, state} + end + + @impl true + def websocket_terminate(_arg0, _arg1, _state) do + :ok + end + end + end +end diff --git a/lib/xogmios/mempool/messages.ex b/lib/xogmios/mempool/messages.ex new file mode 100644 index 0000000..c43b382 --- /dev/null +++ b/lib/xogmios/mempool/messages.ex @@ -0,0 +1,52 @@ +defmodule Xogmios.Mempool.Messages do + @moduledoc """ + This module contains messages for the Mempool protocol + """ + + alias Jason.DecodeError + + def acquire_mempool() do + json = ~S""" + { + "jsonrpc": "2.0", + "method": "acquireMempool" + } + """ + + validate_json!(json) + json + end + + def next_transaction(include_details \\ false) do + json = + if include_details do + ~S""" + { + "jsonrpc": "2.0", + "method": "nextTransaction", + "params": { + "fields": "all" + } + } + """ + else + ~S""" + { + "jsonrpc": "2.0", + "method": "nextTransaction", + "params": {} + } + """ + end + + validate_json!(json) + json + end + + defp validate_json!(json) do + case Jason.decode(json) do + {:ok, _decoded} -> :ok + {:error, %DecodeError{} = error} -> raise "Invalid JSON: #{inspect(error)}" + end + end +end diff --git a/lib/xogmios/mempool_txs.ex b/lib/xogmios/mempool_txs.ex new file mode 100644 index 0000000..23def24 --- /dev/null +++ b/lib/xogmios/mempool_txs.ex @@ -0,0 +1,203 @@ +defmodule Xogmios.MempoolTxs do + @moduledoc """ + This module interfaces with the Mempool protocol for reading + transactions as they become available (`nextTransaction`). + """ + + alias Xogmios.Mempool.Messages + + require Logger + + @doc """ + Invoked when a new snapshot is acquired. + + Receives snapshot information as argument. + + Returning `{:ok, :next_transaction, new_state}` will request the next transaction + once it's made available. + + Returning `{:ok, new_state}` wil not request anymore transactions. + + Returning `{:close, new_state}` will close the connection to the server. + """ + @callback handle_acquired(snapshop :: map(), state) :: + {:ok, :next_transaction, new_state} + | {:ok, new_state} + | {:close, new_state} + when state: term(), new_state: term() + + @doc """ + Invoked when a new transaction is made available in the mempool. + + Receives transaction information as argument and current state of the handler. + + Returning `{:ok, :next_transaction, new_state}` will request the next transaction + once it's made available. + + Returning `{:ok, new_state}` wil not request anymore transactions. + + Returning `{:close, new_state}` will close the connection to the server. + """ + @callback handle_transaction(transaction :: map(), state) :: + {:ok, :next_transaction, new_state} + | {:ok, new_state} + | {:close, new_state} + when state: term(), new_state: term() + + @doc """ + Invoked upon connecting to the server. This callback is optional. + """ + @callback handle_connect(state) :: {:ok, new_state} + when state: term(), new_state: term() + + @doc """ + Invoked upon disconnecting from the server. This callback is optional. + + Returning `{:ok, new_state}` will allow the connection to close. + + Returning `{:reconnect, interval_in_ms}` will attempt a reconnection after `interval_in_ms` + """ + @callback handle_disconnect(reason :: String.t(), state) :: + {:ok, new_state} + | {:reconnect, interval_in_ms :: non_neg_integer(), new_state} + when state: term(), new_state: term() + + # The keepalive option is used to maintain the connection active. + # This is important because proxies might close idle connections after a few seconds. + @keepalive_in_ms 5_000 + + # The websocket client library + @client :websocket_client + + @doc """ + Starts a new Mempool process linked to the current process. + + This function should not be called directly, but rather via `Xogmios.start_mempool_link/2` + """ + @spec start_link(module(), start_options :: Keyword.t()) :: {:ok, pid()} | {:error, term()} + def start_link(client, opts) do + {url, opts} = Keyword.pop(opts, :url) + {name, opts} = Keyword.pop(opts, :name, client) + opts = Keyword.put_new(opts, :include_details, false) + initial_state = Keyword.merge(opts, handler: client, notify_on_connect: self()) + + with {:ok, process_name} <- build_process_name(name), + {:ok, ws_pid} <- start_link(process_name, url, client, initial_state) do + # Blocks until the connection with the Ogmios server + # is established or until timeout is reached. + + receive do + {:connected, _connection} -> {:ok, ws_pid} + after + _timeout = 5_000 -> + Logger.warning("Timeout connecting to Ogmios server") + send(ws_pid, :close) + {:error, :connection_timeout} + end + else + {:error, :invalid_process_name} = error -> + error + + {:error, _} = error -> + Logger.warning("Error connecting with Ogmios server #{inspect(error)}") + error + end + end + + defp start_link(name, url, client, state) do + @client.start_link(name, url, client, state, keepalive: @keepalive_in_ms) + end + + # Builds process name from valid argument or returns error + @spec build_process_name(term() | {:global, term()} | {:via, term(), term()}) :: + {:ok, any()} | {:error, term()} + defp build_process_name(name) do + case name do + name when is_atom(name) -> + {:ok, {:local, name}} + + {:global, term} = tuple when is_atom(term) -> + {:ok, tuple} + + {:via, registry, _term} = tuple when is_atom(registry) -> + {:ok, tuple} + + _ -> + # Returns error if name does not comply with + # values accepted by the websocket client library + {:error, :invalid_process_name} + end + end + + defmacro __using__(_opts) do + quote do + @behaviour Xogmios.MempoolTxs + + use Xogmios.Mempool.Connection + + require Logger + + def handle_connect(state), do: {:ok, state} + def handle_disconnect(_reason, state), do: {:ok, state} + def handle_acquired(_snapshot, state), do: {:ok, :next_transaction, state} + defoverridable handle_connect: 1, handle_disconnect: 2, handle_acquired: 2 + + def handle_message( + %{ + "method" => "acquireMempool", + "result" => %{"acquired" => "mempool", "slot" => slot} = _result + }, + state + ) do + case state.handler.handle_acquired(%{"slot" => slot}, state) do + {:ok, :next_transaction, new_state} -> + message = Messages.next_transaction(state.include_details) + {:reply, {:text, message}, new_state} + + {:ok, new_state} -> + {:ok, new_state} + + {:close, new_state} -> + {:close, "finished", new_state} + + response -> + Logger.warning("Invalid response #{inspect(response)}") + end + end + + def handle_message( + %{ + "method" => "nextTransaction", + "result" => %{"transaction" => nil} = _result + }, + state + ) do + message = Messages.acquire_mempool() + {:reply, {:text, message}, state} + end + + def handle_message( + %{ + "method" => "nextTransaction", + "result" => %{"transaction" => transaction} = _result + }, + state + ) do + case state.handler.handle_transaction(transaction, state) do + {:ok, :next_transaction, new_state} -> + message = Messages.next_transaction() + {:reply, {:text, message}, new_state} + + {:ok, new_state} -> + {:ok, new_state} + + {:close, new_state} -> + {:close, "finished", new_state} + + response -> + Logger.warning("Invalid response #{inspect(response)}") + end + end + end + end +end diff --git a/mix.lock b/mix.lock index 0e75739..f728a6a 100644 --- a/mix.lock +++ b/mix.lock @@ -3,7 +3,7 @@ "cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, "cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"}, - "credo": {:hex, :credo, "1.7.2", "fdee3a7cb553d8f2e773569181f0a4a2bb7d192e27e325404cc31b354f59d68c", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dd15d6fbc280f6cf9b269f41df4e4992dee6615939653b164ef951f60afcb68e"}, + "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, diff --git a/test/mempool_txs_test.exs b/test/mempool_txs_test.exs new file mode 100644 index 0000000..c263eae --- /dev/null +++ b/test/mempool_txs_test.exs @@ -0,0 +1,47 @@ +defmodule Xogmios.MempoolTest do + use ExUnit.Case + + @ws_url TestServer.get_url() + + setup_all do + {:ok, _server} = TestServer.start(handler: MempoolTxs.TestHandler) + + on_exit(fn -> + TestServer.shutdown() + end) + + :ok + end + + defmodule DummyClient do + use Xogmios, :mempool_txs + + def start_link(opts) do + Xogmios.start_mempool_txs_link(__MODULE__, opts) + end + + @impl true + def handle_acquired(_snapshot, state) do + send(state.test_handler, :handle_acquired) + {:ok, :next_transaction, state} + end + + @impl true + def handle_transaction(_block, state) do + send(state.test_handler, :handle_transaction) + {:close, state} + end + end + + test "receives callbacks and closes connection" do + pid = start_supervised!({DummyClient, url: @ws_url, test_handler: self()}) + assert is_pid(pid) + + assert_receive :handle_acquired + assert_receive :handle_transaction + + Process.sleep(500) + refute Process.alive?(pid) + assert GenServer.whereis(DummyClient) == nil + end +end diff --git a/test/support/mempool_txs/test_handler.ex b/test/support/mempool_txs/test_handler.ex new file mode 100644 index 0000000..1865451 --- /dev/null +++ b/test/support/mempool_txs/test_handler.ex @@ -0,0 +1,54 @@ +defmodule MempoolTxs.TestHandler do + @moduledoc false + + @behaviour :cowboy_websocket + + @impl true + def init(request, state) do + {:cowboy_websocket, request, state} + end + + @impl true + def websocket_init(state) do + {:ok, state} + end + + @impl true + def websocket_handle({:text, payload}, state) do + case Jason.decode(payload) do + {:ok, %{"method" => "acquireMempool"}} -> + payload = + Jason.encode!(%{ + "method" => "acquireMempool", + "result" => %{"acquired" => "mempool", "slot" => 123} + }) + + {:reply, {:text, payload}, state} + + {:ok, %{"method" => "nextTransaction"}} -> + payload = + Jason.encode!(%{ + "method" => "nextTransaction", + "result" => %{ + "transaction" => %{"id" => 456} + } + }) + + {:reply, {:text, payload}, state} + end + end + + @impl true + def terminate(_arg1, _arg2, _arg3) do + :ok + end + + def websocket_info(:stop, state) do + {:stop, state} + end + + @impl true + def websocket_info(info, state) do + {:reply, {:text, info}, state} + end +end