Skip to content

Commit

Permalink
Add chainsync rollback (#29)
Browse files Browse the repository at this point in the history
* Add support for rollback

* Update docs
  • Loading branch information
caike authored May 31, 2024
1 parent 6f22150 commit a3e977c
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 13 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ defmodule ChainSyncClient do
IO.puts("final handle_block #{block["height"]}")
{:close, state}
end

@impl true
def handle_rollback(point, state) do
IO.puts("handle_rollback")

# Use this information to update your custom state accordingly
IO.puts("Block id: #{point["id"]}")
IO.puts("Slot: #{point["slot"]}")

{:ok, :next_block, state}
end
end
```

Expand Down
73 changes: 69 additions & 4 deletions lib/xogmios/chain_sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ defmodule Xogmios.ChainSync do
@doc """
Invoked when a new block is emitted. This callback is required.
Receives block information as argument and the current state of the handler.
Returning `{:ok, :next_block, new_state}` will request the next block once
it's made available.
Expand All @@ -23,6 +25,28 @@ defmodule Xogmios.ChainSync do
| {:close, new_state}
when state: term(), new_state: term()

@doc """
Invoked when a rollback event is emitted. This callback is optional.
Receives as argument a point and the state of the handler. The point is a
map with keys for `id` (block id) and a `slot`. This information can then
be used by the handler module to perform the necessary corrections.
For example, resetting all current known state past this point and then
rewriting it from future invokations of `c:handle_block/2`
Returning `{:ok, :next_block, new_state}` will request the next block once
it's made available.
Returning `{:ok, new_state}` will not request anymore blocks.
Returning `{:close, new_state}` will close the connection to the server.
"""
@callback handle_rollback(point :: map(), state) ::
{:ok, :next_block, new_state}
| {:ok, new_state}
| {:close, new_state}
when state: term(), new_state: term()

@doc """
Invoked upon connecting to the server. This callback is optional.
"""
Expand Down Expand Up @@ -154,9 +178,10 @@ defmodule Xogmios.ChainSync do

def handle_connect(state), do: {:ok, state}
def handle_disconnect(_reason, state), do: {:ok, state}
defoverridable handle_connect: 1, handle_disconnect: 2
def handle_rollback(_point, state), do: {:ok, :next_block, state}
defoverridable handle_connect: 1, handle_disconnect: 2, handle_rollback: 2

def handle_message(%{"id" => "start"} = message, state) do
def handle_message(%{"id" => "initial_sync"} = message, state) do
%{
"method" => "nextBlock",
"result" => %{"direction" => "backward", "tip" => tip}
Expand Down Expand Up @@ -185,18 +210,58 @@ defmodule Xogmios.ChainSync do
end

def handle_message(%{"method" => "findIntersection"}, state) do
message = Messages.next_block()
message = Messages.next_block_start()
{:reply, {:text, message}, state}
end

# This function handles the initial and unique roll backward event as
# part of finding and intersection.
#
#
# From Ogmios' official docs:
#
# "After successfully finding an intersection, the node will always ask
# to roll backward to that intersection point. This is because it is
# possible to provide many points when looking for an intersection and
# the protocol makes sure that both the node and the client are in sync.
# This allows clients applications to be somewhat “dumb” and blindly
# follow instructions from the node."
def handle_message(
%{"method" => "nextBlock", "result" => %{"direction" => "backward"}},
%{
"id" => "next_block_start",
"method" => "nextBlock",
"result" => %{"direction" => "backward"}
} = message,
state
) do
message = Messages.next_block()
{:reply, {:text, message}, state}
end

# This function handles rollbacks
def handle_message(
%{
"method" => "nextBlock",
"result" => %{"direction" => "backward"} = result
},
state
) do
case state.handler.handle_rollback(result["point"], state) do
{:ok, :next_block, new_state} ->
message = Messages.next_block()
{:reply, {:text, message}, new_state}

{:ok, new_state} ->
{:ok, new_state}

{:close, new_state} ->
{:close, "finished", new_state}

response ->
Logger.warning("Invalid response #{inspect(response)}")
end
end

def handle_message(
%{"method" => "nextBlock", "result" => %{"direction" => "forward"} = result},
state
Expand Down
2 changes: 1 addition & 1 deletion lib/xogmios/chain_sync/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule Xogmios.ChainSync.Connection do
def onconnect(connection, state) do
state = Map.put(state, :ws_pid, self())

start_message = Messages.next_block_start()
start_message = Messages.initial_sync()
:websocket_client.cast(self(), {:text, start_message})
send(state.notify_on_connect, {:connected, connection})

Expand Down
28 changes: 23 additions & 5 deletions lib/xogmios/chain_sync/messages.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,32 @@ defmodule Xogmios.ChainSync.Messages do
Once the response from this initial message is received, then
the client proceeds with the appropriate syncing strategy.
"""
def next_block_start() do
# The `id:"start"` is returned as part of the message response,
def initial_sync() do
# The `id:"initial_sync"` is returned as part of the message response,
# and helps the client determine that this is a "nextBlock" response
# to the initial message.
json = ~S"""
{
"jsonrpc": "2.0",
"method": "nextBlock",
"id": "start"
"id": "initial_sync"
}
"""

validate_json!(json)
json
end

@doc """
Request first block which preceeds the initial rollback received
as a response from Ogmios.
"""
def next_block_start() do
json = ~S"""
{
"jsonrpc": "2.0",
"method": "nextBlock",
"id": "next_block_start"
}
"""

Expand Down Expand Up @@ -76,6 +93,7 @@ defmodule Xogmios.ChainSync.Messages do
{
"jsonrpc": "2.0",
"method": "findIntersection",
"id": "start",
"params": {
"points": [
"origin",
Expand All @@ -93,7 +111,7 @@ defmodule Xogmios.ChainSync.Messages do
end

# The following are the last points (absolute slot and block id) of
# the previous era of each entry. The sync is done against the last
# the previous **mainnet** era of each entry. The sync is done against the last
# point, so that the next block received is the first of the following era.
@era_bounds %{
shelley: {4_492_799, "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457"},
Expand All @@ -104,7 +122,7 @@ defmodule Xogmios.ChainSync.Messages do
}

@doc """
Syncs with a particular era bound.
Syncs with a particular **mainnet** era bound.
Values accepted are #{Map.keys(@era_bounds) |> Enum.map_join(",\n ", fn key -> "`:#{key}`" end)}
Expand Down
38 changes: 38 additions & 0 deletions test/chain_sync_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,42 @@ defmodule Xogmios.ChainSyncTest do

assert pid4 == global_pid4
end

defmodule DummyClientRollback do
use Xogmios, :chain_sync

def start_link(opts) do
opts = Keyword.merge(opts, after_rollback: false)
Xogmios.start_chain_sync_link(__MODULE__, opts)
end

@impl true
def handle_block(_block, %{after_rollback: true} = state) do
send(state.test_handler, :after_rollback)
{:close, state}
end

@impl true
def handle_block(_block, state) do
send(state.test_handler, :handle_block)
{:ok, :next_block, state}
end

@impl true
def handle_rollback(point, state) do
send(state.test_handler, {:rollback, point})
new_state = Map.put(state, :after_rollback, true)
{:ok, :next_block, new_state}
end
end

test "handle_rollback" do
pid = start_supervised!({DummyClientRollback, url: @ws_url, test_handler: self()})
assert is_pid(pid)
assert_receive :handle_block
assert_receive {:rollback, point}
assert point["id"]
assert point["slot"]
assert_receive :after_rollback
end
end
30 changes: 27 additions & 3 deletions test/support/chain_sync/test_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,41 @@ defmodule ChainSync.TestHandler do

@impl true
def websocket_handle({:text, payload}, state) do
current_counter = Process.get(:counter) || 1

case Jason.decode(payload) do
{:ok, %{"id" => "start"}} ->
{:ok, %{"id" => "initial_sync"}} ->
payload =
Jason.encode!(%{
"method" => "nextBlock",
"result" => %{"direction" => "forward", "block" => %{}}
"result" => %{"direction" => "forward", "block" => %{"height" => 123}}
})

Process.put(:counter, current_counter + 1)
{:reply, {:text, payload}, state}

_ ->
{:ok, %{"method" => "nextBlock"}} ->
payload =
if current_counter == 2 do
Jason.encode!(%{
"method" => "nextBlock",
"result" => %{
"direction" => "backward",
"point" => %{"id" => "abc123", "slot" => 123}
}
})
else
Jason.encode!(%{
"method" => "nextBlock",
"result" => %{
"direction" => "forward",
"block" => %{"height" => 456}
}
})
end

Process.put(:counter, current_counter + 1)

{:reply, {:text, payload}, state}
end
end
Expand Down

0 comments on commit a3e977c

Please sign in to comment.