Skip to content

Commit

Permalink
Gracefully closes connection.
Browse files Browse the repository at this point in the history
  • Loading branch information
caike committed Jan 2, 2024
1 parent 2a8f1a8 commit abffd6c
Show file tree
Hide file tree
Showing 14 changed files with 156 additions and 75 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ jobs:
restore-keys: |
xogmios-versions-${{ hashFiles('.tool-versions') }}-${{ hashFiles('mix.lock') }}-YYYY-MM-DD
- name: Credo
run: mix credo

- name: Mix Audit
run: mix deps.audit

Expand Down
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,26 @@ Clone this repo, populate `OGMIOS_URL` and run the following to start following

```shell
OGMIOS_URL="ws://..." mix run --no-halt
connected!
Finding intersection...
Compiling 4 files (.ex)

Intersection found.
Waiting for next block...
21:29:25.609 [info] Finding intersection...

New Block!
Height: 9739937 ID: 1a13643c99270355251808eec434f99f2ac439971b88b9afda5b055752b546b2
21:29:25.619 [info] Intersection found.

New Block!
Height: 9739938 ID: eba6cfb41f2e1c777ec282b24ea27cf23d118d4522fd26397d8f4b179ea70340
21:29:25.619 [info] Waiting for next block...

21:29:36.330 [info] Elixir.Xogmios.ClientExampleA handle_block 9751015

21:29:53.522 [info] Elixir.Xogmios.ClientExampleA handle_block 9751016

21:30:08.763 [info] Elixir.Xogmios.ClientExampleA handle_block 9751017

21:30:12.240 [info] Elixir.Xogmios.ClientExampleA handle_block 9751018

...
```

## Text
## Test

Run `mix test`

14 changes: 14 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import Config

# Set LOGGER_LEVEL env to one of the valid
# values for level https://hexdocs.pm/logger/1.12.3/Logger.html#module-levels
# In order to help with debugging, set LOGGER_LEVEL="debug"
case config_env() do
:test ->
config :logger,
level: System.get_env("LOGGER_LEVEL", "warning") |> String.to_existing_atom()

_ ->
config :logger,
level: System.get_env("LOGGER_LEVEL", "info") |> String.to_existing_atom()
end
1 change: 1 addition & 0 deletions lib/xogmios.ex
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
defmodule Xogmios do
@moduledoc false
end
2 changes: 2 additions & 0 deletions lib/xogmios/application.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
defmodule Xogmios.Application do
@moduledoc false

use Application

@impl true
Expand Down
100 changes: 40 additions & 60 deletions lib/xogmios/chain_sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,35 @@ defmodule Xogmios.ChainSync do
implements the connection with the Websocket server
"""

alias Xogmios.ChainSync.Messages

@callback init(keyword()) :: {:ok, map()}
@callback handle_block(map(), any()) ::
{:ok, :next_block, map()} | {:ok, map()}
{:ok, :next_block, map()} | {:ok, map()} | {:ok, :close, map()}

defmacro __using__(_opts) do
quote do
use WebSockex
@behaviour Xogmios.ChainSync

use WebSockex

require Logger

@name __MODULE__

def init(_opts), do: {:ok, %{}}
defoverridable init: 1

def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]},
shutdown: 5_000,
restart: Keyword.get(opts, :restart, :transient),
type: :worker
}
end

def start_connection(opts),
do: do_start_link(opts)

Expand All @@ -29,15 +47,8 @@ defmodule Xogmios.ChainSync do
{:ok, ws} ->
receive do
{:connected, _connection} ->
start = ~S"""
{
"jsonrpc": "2.0",
"method": "nextBlock",
"id": "start"
}
"""

send_frame(ws, start)
message = Messages.next_block_start()
send_frame(ws, message)

{:ok, ws}
after
Expand All @@ -51,12 +62,6 @@ defmodule Xogmios.ChainSync do
end
end

def init(_opts) do
{:ok, %{}}
end

defoverridable init: 1

def send_frame(connection, frame) do
try do
case WebSockex.send_frame(connection, {:text, frame}) do
Expand All @@ -74,7 +79,7 @@ defmodule Xogmios.ChainSync do
handle_message(message, state)

{:error, error} ->
IO.puts("Error decoding response #{inspect(error)}")
Logger.warning("Error decoding response #{inspect(error)}")
{:close, state}
end
end
Expand All @@ -87,38 +92,20 @@ defmodule Xogmios.ChainSync do
} = _message,
state
) do
IO.puts("Finding intersection...\n")

reply = ~s"""
{
"jsonrpc": "2.0",
"method": "findIntersection",
"params": {
"points": [
{
"slot": #{tip["slot"]},
"id": "#{tip["id"]}"
}
]
}
}
"""
Logger.info("Finding intersection...")

{:reply, {:text, reply}, state}
message = Messages.find_intersection(tip["slot"], tip["id"])

{:reply, {:text, message}, state}
end

defp handle_message(
%{"method" => "nextBlock", "result" => %{"direction" => "backward"}} = _message,
state
) do
reply = ~S"""
{
"jsonrpc": "2.0",
"method": "nextBlock"
}
"""
message = Messages.next_block()

{:reply, {:text, reply}, state}
{:reply, {:text, message}, state}
end

defp handle_message(
Expand All @@ -130,38 +117,31 @@ defmodule Xogmios.ChainSync do

case apply(__MODULE__, :handle_block, [block, state]) do
{:ok, :next_block, new_state} ->
reply = ~S"""
{
"jsonrpc": "2.0",
"method": "nextBlock"
}
"""

{:reply, {:text, reply}, new_state}
message = Messages.next_block()
{:reply, {:text, message}, new_state}

{:ok, new_state} ->
{:ok, new_state}

{:ok, :close, new_state} ->
{:close, new_state}

_ ->
raise "Invalid return type"
end
end

defp handle_message(%{"method" => "findIntersection"}, state) do
IO.puts("Intersection found.\nWaiting for next block...\n")
Logger.info("Intersection found.")
Logger.info("Waiting for next block...")

reply = ~S"""
{
"jsonrpc": "2.0",
"method": "nextBlock"
}
"""
message = Messages.next_block()

{:reply, {:text, reply}, state}
{:reply, {:text, message}, state}
end

defp handle_message(message, state) do
IO.puts("handle message: #{message}")
Logger.info("handle message: #{message}")
{:ok, state}
end

Expand All @@ -171,7 +151,7 @@ defmodule Xogmios.ChainSync do
end

def handle_disconnect(%{reason: {:local, reason}}, state) do
IO.puts("Local close with reason: #{inspect(reason)}")
Logger.info("#{__MODULE__} local close with reason: #{inspect(reason)}")
{:ok, state}
end
end
Expand Down
41 changes: 41 additions & 0 deletions lib/xogmios/chain_sync/messages.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule Xogmios.ChainSync.Messages do
@moduledoc """
This module returns messages according to the Ogmios API
"""

def next_block_start() do
~S"""
{
"jsonrpc": "2.0",
"method": "nextBlock",
"id": "start"
}
"""
end

def next_block() do
~S"""
{
"jsonrpc": "2.0",
"method": "nextBlock"
}
"""
end

def find_intersection(slot, id) do
~s"""
{
"jsonrpc": "2.0",
"method": "findIntersection",
"params": {
"points": [
{
"slot": #{slot},
"id": "#{id}"
}
]
}
}
"""
end
end
4 changes: 3 additions & 1 deletion lib/xogmios/client_example_a.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ defmodule Xogmios.ClientExampleA do

use Xogmios.ChainSync

require Logger

def start_link(opts),
do: start_connection(opts)

@impl true
def handle_block(block, state) do
IO.puts("#{__MODULE__} handle_block #{block["height"]}")
Logger.info("#{__MODULE__} handle_block #{block["height"]}")
{:ok, :next_block, state}
end
end
10 changes: 6 additions & 4 deletions lib/xogmios/client_example_b.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,26 @@ defmodule Xogmios.ClientExampleB do

use Xogmios.ChainSync

require Logger

def start_link(opts),
do: start_connection(opts)

@impl true
def init(_args) do
{:ok, %{counter: 3}}
{:ok, %{counter: 2}}
end

@impl true
def handle_block(block, %{counter: counter} = state) when counter > 1 do
IO.puts("#{__MODULE__} handle_block #{block["height"]}")
Logger.info("#{__MODULE__} handle_block #{block["height"]}")
new_state = Map.merge(state, %{counter: counter - 1})
{:ok, :next_block, new_state}
end

@impl true
def handle_block(block, state) do
IO.puts("#{__MODULE__} final handle_block #{block["height"]}")
{:ok, state}
Logger.info("#{__MODULE__} final handle_block #{block["height"]}")
{:ok, :close, state}
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ defmodule Xogmios.MixProject do
defp deps do
[
{:cowboy, "~> 2.10", only: :test},
{:credo, "~> 1.7", only: [:dev, :test]},
{:dialyxir, "~> 1.4.1", only: [:dev, :test], runtime: false},
{:jason, "~> 1.4"},
{:mix_audit, "~> 2.1", only: [:dev, :test], runtime: false},
Expand Down
7 changes: 7 additions & 0 deletions test/support/chain_sync/test_handler.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
defmodule ChainSync.TestHandler do
@moduledoc false

@behaviour :cowboy_websocket

@impl true
Expand Down Expand Up @@ -28,6 +30,11 @@ defmodule ChainSync.TestHandler do
end
end

@impl true
def terminate(_arg1, _arg2, _arg3) do
:ok
end

def websocket_info(:stop, state) do
{:stop, state}
end
Expand Down
2 changes: 2 additions & 0 deletions test/support/chain_sync/test_router.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
defmodule ChainSync.TestRouter do
@moduledoc false

use Plug.Router

plug(:match)
Expand Down
2 changes: 2 additions & 0 deletions test/support/chain_sync/test_server.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
defmodule ChainSync.TestServer do
@moduledoc false

@default_port 8989

def get_url(port \\ @default_port) do
Expand Down
Loading

0 comments on commit abffd6c

Please sign in to comment.