Skip to content

Commit

Permalink
include a logger exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
the-mikedavis committed Jun 15, 2021
1 parent 7aab90f commit 848ac09
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 36 deletions.
66 changes: 66 additions & 0 deletions lib/beeline/health_checker/logger.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
defmodule Beeline.HealthChecker.Logger do
@moduledoc """
A Task to attach a logger exporter for health-checker telemetry
Attaches a telemetry handler which writes out how far a producer is behind
the head of the stream to the logger.
This task can be started in a supervision tree such as an app's application
supervision tree:
```elixir
def start(_type, _args) do
children = [
Beeline.HealthChecker.Logger,
MyApp.MyBeelineTopology
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
```
The log messages are 'info' level in a format of the producer name
concatenated with "is caught up." if the producer's current stream position
matches the latest available stream position and a 'warn' level message
with "is behind: n events." when the producer is behind, with `n` being
the number of events. The log messages also include metadata fields
`:event_listener` - the name of the producer - and `delta`: the number
of events by which the producer is behind.
"""

use Task
require Logger

@doc false
def start_link(_opts) do
Task.start_link(__MODULE__, :attach, [])
end

@doc false
def attach do
:telemetry.attach(
"beeline-health-checker-logger",
[:beeline, :health_check, :stop],
&__MODULE__.handle_event/4,
:ok
)
end

@doc false
def handle_event(_event, _measurement, metadata, state) do
producer = inspect(metadata[:producer])
delta = metadata[:head_position] - metadata[:current_position]
log_metadata = [delta: delta, event_listener: producer]

if delta == 0 do
Logger.info("#{producer} is caught up.", log_metadata)
else
# coveralls-ignore-start
Logger.warn("#{producer} is behind: #{delta} events.", log_metadata)

# coveralls-ignore-stop
end

state
end
end
82 changes: 46 additions & 36 deletions test/beeline/good_handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ defmodule Beeline.GoodHandlerTest do

use ExUnit.Case, async: true

import ExUnit.CaptureLog

@moduletag :capture_log

alias Beeline.Fixtures.{
Expand All @@ -23,7 +25,8 @@ defmodule Beeline.GoodHandlerTest do
setup do
[
grpc_stream_name: "Beeline.Test-#{Spear.Event.uuid_v4()}",
tcp_stream_name: "Beeline.Test-#{Spear.Event.uuid_v4()}"
tcp_stream_name: "Beeline.Test-#{Spear.Event.uuid_v4()}",
self: self()
]
end

Expand All @@ -45,51 +48,58 @@ defmodule Beeline.GoodHandlerTest do

test "the consumer will store these events in state and increment stream positions",
c do
self_pid = self()
_exporter = start_supervised!(Beeline.HealthChecker.Logger)

:telemetry.attach(
"beeline-health-checker-tester",
[:beeline, :health_check, :stop],
fn event, measurements, metadata, _state ->
send(self_pid, {:health_check, event, measurements, metadata})
send(c.self, {:health_check, event, measurements, metadata})
end,
:ok
)

start_supervised!(
{GoodHandler,
grpc_stream_name: c.grpc_stream_name,
tcp_stream_name: c.tcp_stream_name}
)

spawn(fn -> check_position(GoodHandler.Producer_grpc, 3, self_pid) end)
spawn(fn -> check_position(GoodHandler.Producer_tcp, 3, self_pid) end)

assert_receive {:done, GoodHandler.Producer_grpc}, 2000
assert_receive {:done, GoodHandler.Producer_tcp}, 2000

assert GoodHandler.get_stream_position(GoodHandler.Producer_grpc) == 2
assert GoodHandler.get_stream_position(GoodHandler.Producer_tcp) == 2

state = GoodHandler.get_state()

assert length(state.events) == 6

assert_receive {:health_check, _event, _measurements,
%{
current_position: 2,
head_position: 2,
producer: GoodHandler.Producer_tcp
}},
2_000
log =
capture_log([level: :info], fn ->
start_supervised!(
{GoodHandler,
grpc_stream_name: c.grpc_stream_name,
tcp_stream_name: c.tcp_stream_name}
)

spawn(fn -> check_position(GoodHandler.Producer_grpc, 3, c.self) end)
spawn(fn -> check_position(GoodHandler.Producer_tcp, 3, c.self) end)

assert_receive {:done, GoodHandler.Producer_grpc}, 2000
assert_receive {:done, GoodHandler.Producer_tcp}, 2000

assert GoodHandler.get_stream_position(GoodHandler.Producer_grpc) == 2
assert GoodHandler.get_stream_position(GoodHandler.Producer_tcp) == 2

state = GoodHandler.get_state()

assert length(state.events) == 6

assert_receive {:health_check, _event, _measurements,
%{
current_position: 2,
head_position: 2,
producer: GoodHandler.Producer_tcp
}},
2_000

assert_receive {:health_check, _event, _measurements,
%{
current_position: 2,
head_position: 2,
producer: GoodHandler.Producer_grpc
}},
2_000
end)

assert_receive {:health_check, _event, _measurements,
%{
current_position: 2,
head_position: 2,
producer: GoodHandler.Producer_grpc
}},
2_000
assert log =~ inspect(GoodHandler.Producer_tcp)
assert log =~ inspect(GoodHandler.Producer_grpc)
assert log =~ "caught up"
end
end

Expand Down

0 comments on commit 848ac09

Please sign in to comment.