Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement mempool monitoring #31

Merged
merged 16 commits into from
Aug 3, 2024
2 changes: 1 addition & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
inputs: ["{mix,.formatter}.exs", "{config,lib,test,examples}/**/*.{ex,exs}"]
]
5 changes: 2 additions & 3 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
erlang 26.2.1
elixir 1.16.0

erlang 27.0
elixir 1.17.2
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

- Added partial support for Mempool monitoring mini-protocol. Allows reading transactions in the mempool.

## [v0.4.1](https://github.com/wowica/xogmios/releases/tag/v0.4.1) (2024-06-05)

### Fixed
Expand Down
35 changes: 34 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Mini-Protocols supported by this library:
- [x] Chain Synchronization
- [x] State Query
- [x] Tx Submission
- [ ] Mempool Monitoring
- [x] Mempool Monitoring (reads transactions only)

See [Examples](#examples) section below for information on how to use this library.

Expand Down Expand Up @@ -156,6 +156,39 @@ defmodule TxSubmissionClient do
end
```

### Mempool Monitoring

The following illustrates working with the **Mempool Monitoring** protocol. It provides a way to list transactions in the mempool.

```elixir
defmodule MempoolTxsClient do
@moduledoc """
This module prints transactions as they become available
in the mempool
"""
use Xogmios, :mempool_txs

def start_link(opts) do
Xogmios.start_mempool_txs_link(__MODULE__, opts)
end

# Callbacks
@impl true
def handle_acquired(%{"slot" => slot} = _snapshot, state) do
IO.puts("Snapshot acquired at slot #{slot}")

{:ok, :next_transaction, state}
end

@impl true
def handle_transaction(transaction, state) do
IO.puts("Transaction: #{transaction["id"]}")

{:ok, :next_transaction, state}
end
end
```

For examples of applications using this library, see [Blocks](https://github.com/wowica/blocks) and [xogmios_watcher](https://github.com/wowica/xogmios_watcher).

## Test
Expand Down
23 changes: 23 additions & 0 deletions examples/mempool_txs_client.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Xogmios.MempoolTxsClient do
@moduledoc """
This module prints transactions as they become available
in the mempool.
"""
use Xogmios, :mempool_txs

def start_link(opts) do
# set include_details: true to retrieve
# complete information about the transaction.
# set include_details: false (default) to retrieve
# only transaction id.
opts = Keyword.merge(opts, include_details: true)
Xogmios.start_mempool_link(__MODULE__, opts)
end

@impl true
def handle_transaction(transaction, state) do
IO.puts("transaction #{inspect(transaction)}")

{:ok, :next_transaction, state}
end
end
51 changes: 48 additions & 3 deletions lib/xogmios.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,33 @@ defmodule Xogmios do
This is the top level module for Xogmios. It implements functions to be used by client
modules that wish to connect with Ogmios.

When used, the it expects one of the supported mini-protocols as argument. For example:
When you `use` this module, it expects one of the supported mini-protocols as argument. For example:

defmodule ChainSyncClient do
use Xogmios, :chain_sync
# ...
end

or

defmodule StateQueryClient do
use Xogmios, :state_query
# ...
end

defmodule TxSubmissionClient do
use Xogmios, :tx_submission
# ...
end

defmodule MempoolTxsClient do
use Xogmios, :mempool_txs
# ...
end
"""

alias Xogmios.ChainSync
alias Xogmios.StateQuery
alias Xogmios.TxSubmission
alias Xogmios.MempoolTxs

@doc """
Starts a new State Query process linked to the current process.
Expand Down Expand Up @@ -89,6 +98,30 @@ defmodule Xogmios do
TxSubmission.start_link(client, opts)
end

@doc """
Starts a new MempoolTxs (Transactions) process linked to the current process.

`opts` as keyword lists are passed to the underlying :websocket_client.

The `:include_details` flag can be used to determine the level of details
to be returned with each transaction as part of `c:Xogmios.Mempool.handle_transaction/2`.

Setting this option to `false` (default) means only transaction id is returned:

```
Xogmios.start_mempool_txs_link(__MODULE__, url: ogmios_url, include_details: false)
```

Setting it to `true` means all transaction fields are returned:

```
Xogmios.start_mempool_txs_link(__MODULE__, url: ogmios_url, include_details: true)
```
"""
def start_mempool_txs_link(client, opts) do
MempoolTxs.start_link(client, opts)
end

defmacro __using__(:state_query) do
quote do
use Xogmios.StateQuery
Expand All @@ -106,4 +139,16 @@ defmodule Xogmios do
use Xogmios.TxSubmission
end
end

defmacro __using__(:mempool_txs) do
quote do
use Xogmios.MempoolTxs
end
end

defmacro __using__(_opts) do
quote do
raise "Unsupported method"
end
end
end
94 changes: 94 additions & 0 deletions lib/xogmios/mempool/connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
defmodule Xogmios.Mempool.Connection do
@moduledoc """
This module implements a connection with the Ogmios Websocket server
for the Mempool protocol.
"""

alias Xogmios.Mempool.Messages

defmacro __using__(_opts) do
quote do
@behaviour :websocket_client

require Logger

@name __MODULE__
@reconnect_interval 5_000

def child_spec(opts) do
%{
id: Keyword.get(opts, :id, __MODULE__),
start: {__MODULE__, :start_link, [opts]},
shutdown: 5_000,
restart: Keyword.get(opts, :restart, :transient),
type: :worker
}
end

@impl true
def init(state) do
initial_state =
state
|> Enum.into(%{})
|> Map.merge(%{handler: __MODULE__})

{:reconnect, initial_state}
end

@impl true
def onconnect(connection, state) do
state = Map.put(state, :ws_pid, self())

start_message = Messages.acquire_mempool()
:websocket_client.cast(self(), {:text, start_message})
send(state.notify_on_connect, {:connected, connection})

case state.handler.handle_connect(state) do
{:ok, new_state} ->
{:ok, new_state}

_ ->
{:ok, state}
end
end

@impl true
def ondisconnect(reason, state) do
case state.handler.handle_disconnect(reason, state) do
{:ok, state} ->
{:ok, state}

{:reconnect, reconnect_interval_in_ms, new_state} ->
{:reconnect, reconnect_interval_in_ms, new_state}
end
end

@impl true
def websocket_handle({:text, raw_message}, _conn, state) do
case Jason.decode(raw_message) do
{:ok, message} ->
handle_message(message, state)

{:error, reason} ->
Logger.warning("Error decoding message #{inspect(reason)}")
{:ok, state}
end
end

@impl true
def websocket_handle(_message, _conn, state) do
{:ok, state}
end

@impl true
def websocket_info(_any, _arg1, state) do
{:ok, state}
end

@impl true
def websocket_terminate(_arg0, _arg1, _state) do
:ok
end
end
end
end
52 changes: 52 additions & 0 deletions lib/xogmios/mempool/messages.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Xogmios.Mempool.Messages do
@moduledoc """
This module contains messages for the Mempool protocol
"""

alias Jason.DecodeError

def acquire_mempool() do
json = ~S"""
{
"jsonrpc": "2.0",
"method": "acquireMempool"
}
"""

validate_json!(json)
json
end

def next_transaction(include_details \\ false) do
json =
if include_details do
~S"""
{
"jsonrpc": "2.0",
"method": "nextTransaction",
"params": {
"fields": "all"
}
}
"""
else
~S"""
{
"jsonrpc": "2.0",
"method": "nextTransaction",
"params": {}
}
"""
end

validate_json!(json)
json
end

defp validate_json!(json) do
case Jason.decode(json) do
{:ok, _decoded} -> :ok
{:error, %DecodeError{} = error} -> raise "Invalid JSON: #{inspect(error)}"
end
end
end
Loading
Loading