Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add manual sync mechanism #27

Merged
merged 15 commits into from
May 29, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Add manual syncing mechanism for ChainSync. This adds an optional back-pressure when building chain indexers that rely on Xogmios. Tested with GenStage on [the following experimental branch](https://github.com/wowica/xogmios_watcher/tree/chain-indexer)

- Fix process naming for ChainSync clients. Given different process names and ids as options, multiple clients are now supported.

### Changed

- StateQuery.send_query interface. Now accepts queries as strings from user input.
Expand Down
2 changes: 1 addition & 1 deletion examples/chain_sync_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ defmodule ChainSyncClient do
@impl true
def handle_block(block, state) do
IO.puts("final handle_block #{block["height"]}")
{:ok, :close, state}
{:close, state}
end
end
127 changes: 110 additions & 17 deletions lib/xogmios/chain_sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ defmodule Xogmios.ChainSync do

alias Xogmios.ChainSync.Messages

require Logger

@doc """
Invoked when a new block is emitted. This callback is required.

Returning `{:ok, :next_block, new_state}` will request the next block once it's made available.
Returning `{:ok, :next_block, new_state}` will request the next block once
it's made available.

Returning `{:ok, new_state}` will not request anymore blocks.

Returning `{:ok, :close, new_state}` will close the connection to the server.
Returning `{:close, new_state}` will close the connection to the server.
"""
@callback handle_block(block :: map(), state) ::
{:ok, :next_block, new_state}
Expand Down Expand Up @@ -42,6 +45,9 @@ defmodule Xogmios.ChainSync do
# This is important because proxies might close idle connections after a few seconds.
@keepalive_in_ms 5_000

# The websocket client library
@client :websocket_client

@doc """
Starts a new Chain Sync process linked to the current process.

Expand All @@ -50,11 +56,92 @@ defmodule Xogmios.ChainSync do
@spec start_link(module(), start_options :: Keyword.t()) :: {:ok, pid()} | {:error, term()}
def start_link(client, opts) do
{url, opts} = Keyword.pop(opts, :url)
initial_state = Keyword.merge(opts, handler: client)
{name, opts} = Keyword.pop(opts, :name, client)
initial_state = Keyword.merge(opts, handler: client, notify_on_connect: self())

with {:ok, process_name} <- build_process_name(name),
{:ok, ws_pid} <- start_link(process_name, url, client, initial_state) do
# Blocks until the connection with the Ogmios server
# is established or until timeout is reached.
receive do
{:connected, _connection} -> {:ok, ws_pid}
after
_timeout = 5_000 ->
Logger.warning("Timeout connecting to Ogmios server")
send(ws_pid, :close)
{:error, :connection_timeout}
end
else
{:error, :invalid_process_name} = error ->
error

{:error, _} = error ->
Logger.warning("Error connecting with Ogmios server #{inspect(error)}")
error
end
end

:websocket_client.start_link({:local, client}, url, client, initial_state,
keepalive: @keepalive_in_ms
)
defp start_link(name, url, client, state) do
@client.start_link(name, url, client, state, keepalive: @keepalive_in_ms)
end

# Builds process name from valid argument or returns error
@spec build_process_name(term() | {:global, term()} | {:via, term(), term()}) ::
{:ok, any()} | {:error, term()}
defp build_process_name(name) do
case name do
name when is_atom(name) ->
{:ok, {:local, name}}

{:global, term} = tuple when is_atom(term) ->
{:ok, tuple}

{:via, registry, _term} = tuple when is_atom(registry) ->
{:ok, tuple}

_ ->
# Returns error if name does not comply with
# values accepted by the websocket client library
{:error, :invalid_process_name}
end
end

@doc """
> #### Warning {: .warning}
>
> This is a highly experimental function and should not be relied on just yet.

Issues a synchronous message for reading the next block.
Potentially useful for building chain indexers with support for backpressure mechanism.
"""
@spec read_next_block(pid()) :: {:ok, block :: map()} | :error
def read_next_block(pid) do
# hacky af but it does the job for now

state = :sys.get_state(pid)

{_c, %{ws_pid: ws_pid}} = state |> elem(1) |> elem(5)

caller = self()

:sys.replace_state(pid, fn current_state ->
{:connected, {:context, req, transport, empty_list, ws, {module, client_info}, _, _, _}} =
current_state

updated_client_info = Map.put(client_info, :caller, caller)

{:connected,
{:context, req, transport, empty_list, ws, {module, updated_client_info}, "", true, 0}}
end)

next_block_message = Xogmios.ChainSync.Messages.next_block()
:websocket_client.cast(ws_pid, {:text, next_block_message})

receive do
{:ok, next_block} -> {:ok, next_block}
after
5_000 -> :error
end
end

defmacro __using__(_opts) do
Expand Down Expand Up @@ -114,19 +201,25 @@ defmodule Xogmios.ChainSync do
%{"method" => "nextBlock", "result" => %{"direction" => "forward"} = result},
state
) do
case state.handler.handle_block(result["block"], state) do
{:ok, :next_block, new_state} ->
message = Messages.next_block()
{:reply, {:text, message}, new_state}

{:ok, new_state} ->
{:ok, new_state}
if caller = Map.get(state, :caller) do
# Returns to sync caller
send(caller, {:ok, result["block"]})
{:ok, state}
else
case state.handler.handle_block(result["block"], state) do
{:ok, :next_block, new_state} ->
message = Messages.next_block()
{:reply, {:text, message}, new_state}

{:ok, new_state} ->
{:ok, new_state}

{:close, new_state} ->
{:close, "finished", new_state}
{:close, new_state} ->
{:close, "finished", new_state}

response ->
Logger.warning("Invalid response #{inspect(response)}")
response ->
Logger.warning("Invalid response #{inspect(response)}")
end
end
end

Expand Down
9 changes: 5 additions & 4 deletions lib/xogmios/chain_sync/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ defmodule Xogmios.ChainSync.Connection do

alias Xogmios.ChainSync.Messages

require Logger

defmacro __using__(_opts) do
quote do
@behaviour :websocket_client
Expand All @@ -19,7 +17,7 @@ defmodule Xogmios.ChainSync.Connection do

def child_spec(opts) do
%{
id: __MODULE__,
id: Keyword.get(opts, :id, __MODULE__),
start: {__MODULE__, :start_link, [opts]},
shutdown: 5_000,
restart: Keyword.get(opts, :restart, :transient),
Expand All @@ -38,9 +36,12 @@ defmodule Xogmios.ChainSync.Connection do
end

@impl true
def onconnect(_arg, state) do
def onconnect(connection, state) do
state = Map.put(state, :ws_pid, self())

start_message = Messages.next_block_start()
:websocket_client.cast(self(), {:text, start_message})
send(state.notify_on_connect, {:connected, connection})

case state.handler.handle_connect(state) do
{:ok, new_state} ->
Expand Down
72 changes: 72 additions & 0 deletions test/chain_sync_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,76 @@ defmodule Xogmios.ChainSyncTest do
refute Process.alive?(pid)
assert GenServer.whereis(DummyClient) == nil
end

defmodule DummyClientWithName do
use Xogmios, :chain_sync

def start_link(opts) do
Xogmios.start_chain_sync_link(__MODULE__, opts)
end

@impl true
def handle_block(_block, state) do
send(state.test_handler, :handle_block)
{:ok, state}
end
end

test "allows multiple named processes" do
opts1 = [url: @ws_url, test_handler: self(), name: :apple, id: :apple]
pid1 = start_supervised!({DummyClientWithName, opts1})

assert is_pid(pid1)
assert Process.info(pid1)[:registered_name] == :apple

opts2 = [url: @ws_url, test_handler: self(), name: :banana, id: :banana]
pid2 = start_supervised!({DummyClientWithName, opts2})

assert is_pid(pid2)
assert pid1 != pid2
assert Process.info(pid2)[:registered_name] == :banana

# Duplicate process names raise error
assert_raise RuntimeError, fn ->
start_supervised!({DummyClientWithName, opts2})
end

# Process name must be atom
opts22 = [url: @ws_url, test_handler: self(), name: "banana", id: :banana_string]
{:error, {:invalid_process_name, _}} = start_supervised({DummyClientWithName, opts22})

# Named via Registry
opts3 = [
url: @ws_url,
test_handler: self(),
name: {:via, Registry, {DummyRegistry, :apple}},
id: :registry_apple
]

_registry_pid = start_supervised!({Registry, keys: :unique, name: DummyRegistry})

pid3 = start_supervised!({DummyClientWithName, opts3})

assert is_pid(pid3)
assert Process.info(pid3)[:registered_name] == nil
assert [{^pid3, nil}] = Registry.lookup(DummyRegistry, :apple)

# Global
opts4 = [
url: @ws_url,
test_handler: self(),
name: {:global, :global_banana},
id: :global_banana
]

pid4 = start_supervised!({DummyClientWithName, opts4})

assert is_pid(pid4)

global_pid4 = :global.whereis_name(:global_banana)

assert is_pid(global_pid4)

assert pid4 == global_pid4
end
end
Loading