Skip to content

Commit

Permalink
better stats engine
Browse files Browse the repository at this point in the history
  • Loading branch information
dpep committed Oct 30, 2023
1 parent cedc673 commit a9f4f9b
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 28 deletions.
28 changes: 26 additions & 2 deletions lib/network_resiliency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ module Adapter

extend self

attr_accessor :statsd
attr_accessor :statsd, :redis

# Applies user configuration, then boots the background sync worker.
#
# Yields self (when a block is given) so callers can assign +statsd+
# and +redis+; if a Redis client ends up configured, syncing starts.
def configure
  yield self if block_given?

  start_syncing if redis
end

def patch(*adapters)
Expand Down Expand Up @@ -99,17 +101,39 @@ def record(adapter:, action:, destination:, duration:, error: nil)
error: error,
}.compact,
)

key = [ adapter, action, destination ].join(":")
StatsEngine.add(key, duration)
rescue => e
warn "[ERROR] NetworkResiliency: #{e.class}: #{e.message}"
end

# Restores the library to a pristine state: clears the enabled flag,
# per-thread storage, accumulated stats, and stops any sync worker.
def reset
  @enabled = nil
  Thread.current["network_resiliency"] = nil
  StatsEngine.reset
  @sync_worker&.kill
end

private

# Per-thread scratch storage for this library, created on first access.
def thread_state
  existing = Thread.current["network_resiliency"]
  existing || (Thread.current["network_resiliency"] = {})
end

# Starts (or restarts) the background thread that periodically flushes
# StatsEngine data to Redis. Safe to call repeatedly: any previous
# worker is killed first, so at most one worker runs at a time.
#
# @param interval [Numeric] seconds to sleep between sync rounds
# @raise [RuntimeError] if no Redis client has been configured
def start_syncing(interval: 3)
  @sync_worker&.kill

  raise "Redis not configured" unless redis

  @sync_worker = Thread.new do
    loop do
      StatsEngine.sync(redis)

      sleep(interval)
    end
  rescue Interrupt
    # shutdown requested; exit the worker quietly
  end
end
end
63 changes: 52 additions & 11 deletions lib/network_resiliency/stats_engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module StatsEngine

# Guards all access to STATS.
LOCK = Thread::Mutex.new
# key => [local Stats, remote Stats] — local accumulates new samples,
# remote mirrors what has been synced upstream.
STATS = {}
# Max number of keys flushed per sync round.
SYNC_LIMIT = 100

def add(key, value)
local, _ = synchronize do
Expand All @@ -14,37 +15,77 @@ def add(key, value)
end

# Returns the combined (local + remote) stats for +key+.
#
# Initializes an empty [local, remote] pair on first access so callers
# always receive a Stats object, never nil.
def get(key)
  local, remote = synchronize do
    STATS[key] ||= [ Stats.new, Stats.new ]
  end

  local + remote
end

# Drops every accumulated stat, under the lock.
def reset
  synchronize do
    STATS.clear
  end
end

def sync(redis, keys)
def sync(redis)
dirty_keys = {}

# select data to be synced
data = synchronize do
dirty_keys = STATS.map do |key, (local, remote)|
# skip if no new local stats and remote already synced
next if local.n == 0 && remote.n > 0

[ key, local.n ]
end.compact.to_h

# select keys to sync, prioritizing most used
keys = dirty_keys.sort_by do |key, weight|
-weight
end.take(SYNC_LIMIT).map(&:first)

# update stats for keys being synced
keys.map do |key|
local, remote = STATS[key]
next unless local && remote
next unless local.n > 0

remote << local
STATS[key] = [ Stats.new, remote ]
remote << local # update remote stats until sync completes
STATS[key][0] = Stats.new # reset local stats

[ key, local ]
end.compact.to_h
end.to_h
end

NetworkResiliency.statsd&.distribution(
"network_resiliency.sync.keys",
data.size,
tags: {
empty: data.empty?,
truncated: data.size < dirty_keys.size,
}.select { |_, v| v },
)

NetworkResiliency.statsd&.distribution(
"network_resiliency.sync.keys.dirty",
dirty_keys.select { |_, n| n > 0 }.count,
)

return [] if data.empty?

# sync data to redis
remote_stats = Stats.sync(redis, **data)
remote_stats = if NetworkResiliency.statsd
NetworkResiliency.statsd&.time("network_resiliency.sync") do
Stats.sync(NetworkResiliency.redis, **data)
end
else
Stats.sync(NetworkResiliency.redis, **data)
end

# integrate remote results
# integrate new remote stats
synchronize do
remote_stats.each do |key, stats|
local, remote = STATS[key]
STATS[key] = [ local, stats ]

remote.reset
remote << stats
end
end

Expand Down
57 changes: 57 additions & 0 deletions spec/network_resiliency_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -246,5 +246,62 @@ def expect_enabled
is_expected.not_to have_received(:distribution)
end
end

# NOTE(review): asserts instrumentation failures are swallowed — .record
# should only warn to stderr, never raise into the caller's request path.
context "when errors arise" do
before do
allow(NetworkResiliency::StatsEngine).to receive(:add).and_raise
end

it "warns, but don't explode" do
expect { subject }.to output(/ERROR/).to_stderr
end
end

# NOTE(review): statsd is optional — stats collection must proceed
# even when no Datadog client is configured.
context "when Datadog is not configured" do
before { NetworkResiliency.statsd = nil }

it "still works" do
expect(NetworkResiliency::StatsEngine).to receive(:add).with(String, duration)
subject
end
end
end

# Covers the background sync bootstrap. The spec_helper stub on
# start_syncing is removed here so the real implementation runs.
describe ".start_syncing" do
before do
# mocking not supported in Threads
NetworkResiliency.statsd = nil

# unstub from spec_helper
allow(NetworkResiliency).to receive(:start_syncing).and_call_original
end

context "when Redis is configured" do
it { expect(NetworkResiliency.redis).to be }

it "will be called by .configure" do
NetworkResiliency.configure
expect(NetworkResiliency).to have_received(:start_syncing)
end
end

# idempotency: each call kills the previous worker before starting anew
it "can be called many times without error" do
3.times { NetworkResiliency.send :start_syncing }
end

context "when Redis is not configured" do
before { NetworkResiliency.redis = nil }

it "raises an error" do
expect {
NetworkResiliency.send :start_syncing
}.to raise_error(/Redis/)
end

it "does not get called from .configure" do
NetworkResiliency.configure
expect(NetworkResiliency).not_to have_received(:start_syncing)
end
end
end
end
8 changes: 7 additions & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,17 @@
# Global per-example setup: reset library state, wire fresh Redis and
# statsd doubles, disable the background sync thread, and stub the
# monotonic clock so duration math is deterministic.
config.before do
NetworkResiliency.reset

NetworkResiliency.redis = Redis.new

NetworkResiliency.statsd = instance_double(Datadog::Statsd)
allow(NetworkResiliency.statsd).to receive(:distribution)
allow(NetworkResiliency.statsd).to receive(:time).and_yield

# disable background sync
allow(NetworkResiliency).to receive(:start_syncing)

# since Timecop doesn't work with Process.clock_gettime
allow(Process).to receive(:clock_gettime).and_return(*(1..1_000))

Redis.new.flushall rescue nil
end
Expand Down
Loading

0 comments on commit a9f4f9b

Please sign in to comment.