diff --git a/README.md b/README.md index b5b4f6c..3710169 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,17 @@ defmodule ChainSyncClient do IO.puts("final handle_block #{block["height"]}") {:close, state} end + + @impl true + def handle_rollback(point, state) do + IO.puts("handle_rollback") + + # Use this information to update your custom state accordingly + IO.puts("Block id: #{point["id"]}") + IO.puts("Slot: #{point["slot"]}") + + {:ok, :next_block, state} + end end ``` diff --git a/lib/xogmios/chain_sync.ex b/lib/xogmios/chain_sync.ex index dbc8b50..3b609aa 100644 --- a/lib/xogmios/chain_sync.ex +++ b/lib/xogmios/chain_sync.ex @@ -10,6 +10,8 @@ defmodule Xogmios.ChainSync do @doc """ Invoked when a new block is emitted. This callback is required. + Receives block information as argument and the current state of the handler. + Returning `{:ok, :next_block, new_state}` will request the next block once it's made available. @@ -23,6 +25,28 @@ defmodule Xogmios.ChainSync do | {:close, new_state} when state: term(), new_state: term() + @doc """ + Invoked when a rollback event is emitted. This callback is optional. + + Receives as argument a point and the state of the handler. The point is a + map with keys for `id` (block id) and a `slot`. This information can then + be used by the handler module to perform the necessary corrections. + For example, resetting all current known state past this point and then + rewriting it from future invokations of `c:handle_block/2` + + Returning `{:ok, :next_block, new_state}` will request the next block once + it's made available. + + Returning `{:ok, new_state}` will not request anymore blocks. + + Returning `{:close, new_state}` will close the connection to the server. + """ + @callback handle_rollback(point :: map(), state) :: + {:ok, :next_block, new_state} + | {:ok, new_state} + | {:close, new_state} + when state: term(), new_state: term() + @doc """ Invoked upon connecting to the server. This callback is optional. """ @@ -154,9 +178,10 @@ defmodule Xogmios.ChainSync do def handle_connect(state), do: {:ok, state} def handle_disconnect(_reason, state), do: {:ok, state} - defoverridable handle_connect: 1, handle_disconnect: 2 + def handle_rollback(_point, state), do: {:ok, :next_block, state} + defoverridable handle_connect: 1, handle_disconnect: 2, handle_rollback: 2 - def handle_message(%{"id" => "start"} = message, state) do + def handle_message(%{"id" => "initial_sync"} = message, state) do %{ "method" => "nextBlock", "result" => %{"direction" => "backward", "tip" => tip} @@ -185,18 +210,58 @@ defmodule Xogmios.ChainSync do end def handle_message(%{"method" => "findIntersection"}, state) do - message = Messages.next_block() + message = Messages.next_block_start() {:reply, {:text, message}, state} end + # This function handles the initial and unique roll backward event as + # part of finding and intersection. + # + # + # From Ogmios' official docs: + # + # "After successfully finding an intersection, the node will always ask + # to roll backward to that intersection point. This is because it is + # possible to provide many points when looking for an intersection and + # the protocol makes sure that both the node and the client are in sync. + # This allows clients applications to be somewhat “dumb” and blindly + # follow instructions from the node." def handle_message( - %{"method" => "nextBlock", "result" => %{"direction" => "backward"}}, + %{ + "id" => "next_block_start", + "method" => "nextBlock", + "result" => %{"direction" => "backward"} + } = message, state ) do message = Messages.next_block() {:reply, {:text, message}, state} end + # This function handles rollbacks + def handle_message( + %{ + "method" => "nextBlock", + "result" => %{"direction" => "backward"} = result + }, + state + ) do + case state.handler.handle_rollback(result["point"], state) do + {:ok, :next_block, new_state} -> + message = Messages.next_block() + {:reply, {:text, message}, new_state} + + {:ok, new_state} -> + {:ok, new_state} + + {:close, new_state} -> + {:close, "finished", new_state} + + response -> + Logger.warning("Invalid response #{inspect(response)}") + end + end + def handle_message( %{"method" => "nextBlock", "result" => %{"direction" => "forward"} = result}, state diff --git a/lib/xogmios/chain_sync/connection.ex b/lib/xogmios/chain_sync/connection.ex index bd17170..18e99f3 100644 --- a/lib/xogmios/chain_sync/connection.ex +++ b/lib/xogmios/chain_sync/connection.ex @@ -39,7 +39,7 @@ defmodule Xogmios.ChainSync.Connection do def onconnect(connection, state) do state = Map.put(state, :ws_pid, self()) - start_message = Messages.next_block_start() + start_message = Messages.initial_sync() :websocket_client.cast(self(), {:text, start_message}) send(state.notify_on_connect, {:connected, connection}) diff --git a/lib/xogmios/chain_sync/messages.ex b/lib/xogmios/chain_sync/messages.ex index 25ac641..45495a4 100644 --- a/lib/xogmios/chain_sync/messages.ex +++ b/lib/xogmios/chain_sync/messages.ex @@ -11,15 +11,32 @@ defmodule Xogmios.ChainSync.Messages do Once the response from this initial message is received, then the client proceeds with the appropriate syncing strategy. """ - def next_block_start() do - # The `id:"start"` is returned as part of the message response, + def initial_sync() do + # The `id:"initial_sync"` is returned as part of the message response, # and helps the client determine that this is a "nextBlock" response # to the initial message. json = ~S""" { "jsonrpc": "2.0", "method": "nextBlock", - "id": "start" + "id": "initial_sync" + } + """ + + validate_json!(json) + json + end + + @doc """ + Request first block which preceeds the initial rollback received + as a response from Ogmios. + """ + def next_block_start() do + json = ~S""" + { + "jsonrpc": "2.0", + "method": "nextBlock", + "id": "next_block_start" } """ @@ -76,6 +93,7 @@ defmodule Xogmios.ChainSync.Messages do { "jsonrpc": "2.0", "method": "findIntersection", + "id": "start", "params": { "points": [ "origin", @@ -93,7 +111,7 @@ defmodule Xogmios.ChainSync.Messages do end # The following are the last points (absolute slot and block id) of - # the previous era of each entry. The sync is done against the last + # the previous **mainnet** era of each entry. The sync is done against the last # point, so that the next block received is the first of the following era. @era_bounds %{ shelley: {4_492_799, "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457"}, @@ -104,7 +122,7 @@ defmodule Xogmios.ChainSync.Messages do } @doc """ - Syncs with a particular era bound. + Syncs with a particular **mainnet** era bound. Values accepted are #{Map.keys(@era_bounds) |> Enum.map_join(",\n ", fn key -> "`:#{key}`" end)} diff --git a/test/chain_sync_test.exs b/test/chain_sync_test.exs index 2191f81..090ab70 100644 --- a/test/chain_sync_test.exs +++ b/test/chain_sync_test.exs @@ -112,4 +112,42 @@ defmodule Xogmios.ChainSyncTest do assert pid4 == global_pid4 end + + defmodule DummyClientRollback do + use Xogmios, :chain_sync + + def start_link(opts) do + opts = Keyword.merge(opts, after_rollback: false) + Xogmios.start_chain_sync_link(__MODULE__, opts) + end + + @impl true + def handle_block(_block, %{after_rollback: true} = state) do + send(state.test_handler, :after_rollback) + {:close, state} + end + + @impl true + def handle_block(_block, state) do + send(state.test_handler, :handle_block) + {:ok, :next_block, state} + end + + @impl true + def handle_rollback(point, state) do + send(state.test_handler, {:rollback, point}) + new_state = Map.put(state, :after_rollback, true) + {:ok, :next_block, new_state} + end + end + + test "handle_rollback" do + pid = start_supervised!({DummyClientRollback, url: @ws_url, test_handler: self()}) + assert is_pid(pid) + assert_receive :handle_block + assert_receive {:rollback, point} + assert point["id"] + assert point["slot"] + assert_receive :after_rollback + end end diff --git a/test/support/chain_sync/test_handler.ex b/test/support/chain_sync/test_handler.ex index ae3917f..ce37e1d 100644 --- a/test/support/chain_sync/test_handler.ex +++ b/test/support/chain_sync/test_handler.ex @@ -15,17 +15,41 @@ defmodule ChainSync.TestHandler do @impl true def websocket_handle({:text, payload}, state) do + current_counter = Process.get(:counter) || 1 + case Jason.decode(payload) do - {:ok, %{"id" => "start"}} -> + {:ok, %{"id" => "initial_sync"}} -> payload = Jason.encode!(%{ "method" => "nextBlock", - "result" => %{"direction" => "forward", "block" => %{}} + "result" => %{"direction" => "forward", "block" => %{"height" => 123}} }) + Process.put(:counter, current_counter + 1) {:reply, {:text, payload}, state} - _ -> + {:ok, %{"method" => "nextBlock"}} -> + payload = + if current_counter == 2 do + Jason.encode!(%{ + "method" => "nextBlock", + "result" => %{ + "direction" => "backward", + "point" => %{"id" => "abc123", "slot" => 123} + } + }) + else + Jason.encode!(%{ + "method" => "nextBlock", + "result" => %{ + "direction" => "forward", + "block" => %{"height" => 456} + } + }) + end + + Process.put(:counter, current_counter + 1) + {:reply, {:text, payload}, state} end end