Skip to content

Commit

Permalink
maint: Refactor metrics registration to streamline declaration and en…
Browse files Browse the repository at this point in the history
…able easier documentation generation (#1350)

## Which problem is this PR solving?

- part 1 for: #1152 
This PR prepares us to have a consistent metrics registration pattern so
that in a later PR we can use
https://pkg.go.dev/golang.org/x/tools/go/packages to automatically
generate metrics documentation.

- part 2 is implemented in #1351

## Short description of the changes

- Introduce a new struct type `metrics.Metadata` to represent all
information needed for a Refinery metric
- Change `metrics.Register` signature to accept `metrics.Metadata` as
its argument
- Refactor the metrics registration calls in each package so that the
metrics we want to register are first declared in a package-level
variable
- Set a zero value for each OTel metric during registration
- Set Unit and Description for each OTel metric
  • Loading branch information
VinozzZ authored Sep 26, 2024
1 parent ebc1aed commit cd77129
Show file tree
Hide file tree
Showing 36 changed files with 525 additions and 311 deletions.
20 changes: 18 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ func (a *App) Start() error {
}

a.Logger.Debug().Logf("Starting up App...")
a.Metrics.Register("config_hash", "gauge")
a.Metrics.Register("rule_config_hash", "gauge")
for _, metric := range configHashMetrics {
a.Metrics.Register(metric)
}
a.IncomingRouter.SetVersion(a.Version)
a.PeerRouter.SetVersion(a.Version)

Expand All @@ -64,3 +65,18 @@ func (a *App) Stop() error {
a.Logger.Debug().Logf("Shutting down App...")
return nil
}

// configHashMetrics lists the gauges that App.Start registers for the
// configuration hashes: one for the main configuration and one for the rules
// configuration.
//
// The element type is elided inside the slice literal (as gofmt -s would
// write it), matching the other metrics.Metadata tables in the codebase.
var configHashMetrics = []metrics.Metadata{
	{
		Name:        "config_hash",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "The hash of the current configuration",
	},
	{
		Name:        "rule_config_hash",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "The hash of the current rules configuration",
	},
}
61 changes: 48 additions & 13 deletions cmd/refinery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,19 +307,9 @@ func main() {

// these have to be done after the injection (of metrics)
// these are the metrics that libhoney will emit; we preregister them so that they always appear
libhoneyMetricsName := map[string]string{
"queue_length": "gauge",
"queue_overflow": "counter",
"send_errors": "counter",
"send_retries": "counter",
"batches_sent": "counter",
"messages_sent": "counter",
"response_decode_errors": "counter",
}

for name, typ := range libhoneyMetricsName {
upstreamMetricsRecorder.Register(name, typ)
peerMetricsRecorder.Register(name, typ)
for _, metric := range libhoneyMetrics {
upstreamMetricsRecorder.Register(metric)
peerMetricsRecorder.Register(metric)
}

// Register metrics after the metrics object has been created
Expand Down Expand Up @@ -381,3 +371,48 @@ func main() {
close(monitorDone)
close(sigsToExit)
}

// libhoneyMetrics lists the metrics that libhoney emits. main registers each
// of them on both the upstream and the peer metrics recorders so that they
// always appear, even before libhoney has reported a value.
//
// The element type is elided inside the slice literal (as gofmt -s would
// write it), matching the other metrics.Metadata tables in the codebase.
var libhoneyMetrics = []metrics.Metadata{
	{
		Name:        "queue_length",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "number of events waiting to be sent to destination",
	},
	{
		Name:        "queue_overflow",
		Type:        metrics.Counter,
		Unit:        metrics.Dimensionless,
		Description: "number of events dropped due to queue overflow",
	},
	{
		Name:        "send_errors",
		Type:        metrics.Counter,
		Unit:        metrics.Dimensionless,
		Description: "number of errors encountered while sending events to destination",
	},
	{
		Name:        "send_retries",
		Type:        metrics.Counter,
		Unit:        metrics.Dimensionless,
		Description: "number of times a batch of events was retried",
	},
	{
		Name:        "batches_sent",
		Type:        metrics.Counter,
		Unit:        metrics.Dimensionless,
		Description: "number of batches of events sent to destination",
	},
	{
		Name:        "messages_sent",
		Type:        metrics.Counter,
		Unit:        metrics.Dimensionless,
		Description: "number of messages sent to destination",
	},
	{
		Name:        "response_decode_errors",
		Type:        metrics.Counter,
		Unit:        metrics.Dimensionless,
		Description: "number of errors encountered while decoding responses from destination",
	},
}
18 changes: 11 additions & 7 deletions collect/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,30 @@ type DefaultInMemCache struct {

const DefaultInMemCacheCapacity = 10000

// collectCacheMetrics declares the metrics that NewInMemCache registers for
// the in-memory trace cache.
var collectCacheMetrics = []metrics.Metadata{
	{
		// Incremented when a trace is overwritten in the circular buffer
		// before it has been sent.
		Name:        "collect_cache_buffer_overrun",
		Type:        metrics.Counter,
		Unit:        metrics.Dimensionless,
		Description: "The number of times the trace overwritten in the circular buffer has not yet been sent",
	},
	{
		Name:        "collect_cache_capacity",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "The number of traces that can be stored in the cache",
	},
	{
		Name:        "collect_cache_entries",
		Type:        metrics.Histogram,
		Unit:        metrics.Dimensionless,
		Description: "The number of traces currently stored in the cache",
	},
}

func NewInMemCache(
capacity int,
metrics metrics.Metrics,
met metrics.Metrics,
logger logger.Logger,
) *DefaultInMemCache {
logger.Debug().Logf("Starting DefaultInMemCache")
defer func() { logger.Debug().Logf("Finished starting DefaultInMemCache") }()

// buffer_overrun increments when the trace overwritten in the circular
// buffer has not yet been sent
metrics.Register("collect_cache_buffer_overrun", "counter")
metrics.Register("collect_cache_capacity", "gauge")
metrics.Register("collect_cache_entries", "histogram")
for _, metadata := range collectCacheMetrics {
met.Register(metadata)
}

if capacity == 0 {
capacity = DefaultInMemCacheCapacity
}

return &DefaultInMemCache{
Metrics: metrics,
Metrics: met,
Logger: logger,
cache: make(map[string]*types.Trace, capacity),
traceBuffer: make([]*types.Trace, capacity),
Expand Down
9 changes: 9 additions & 0 deletions collect/cache/cuckoo.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ const (
AddQueueSleepTime = 100 * time.Microsecond
)

// cuckooTraceCheckerMetrics declares the gauges that NewCuckooTraceChecker
// registers for the cuckoo filters.
//
// NOTE(review): the load-factor descriptions say "fraction" while the Unit is
// metrics.Percent — confirm which scale is actually reported.
var cuckooTraceCheckerMetrics = []metrics.Metadata{
	{
		Name:        CurrentCapacity,
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "current capacity of the cuckoo filter",
	},
	{
		Name:        FutureLoadFactor,
		Type:        metrics.Gauge,
		Unit:        metrics.Percent,
		Description: "the fraction of slots occupied in the future cuckoo filter",
	},
	{
		Name:        CurrentLoadFactor,
		Type:        metrics.Gauge,
		Unit:        metrics.Percent,
		Description: "the fraction of slots occupied in the current cuckoo filter",
	},
}

func NewCuckooTraceChecker(capacity uint, m metrics.Metrics) *CuckooTraceChecker {
c := &CuckooTraceChecker{
capacity: capacity,
Expand All @@ -52,6 +58,9 @@ func NewCuckooTraceChecker(capacity uint, m metrics.Metrics) *CuckooTraceChecker
met: m,
addch: make(chan string, AddQueueDepth),
}
for _, metric := range cuckooTraceCheckerMetrics {
m.Register(metric)
}

// To try to avoid blocking on Add, we have a goroutine that pulls from a
// channel and adds to the filter.
Expand Down
8 changes: 7 additions & 1 deletion collect/cache/cuckooSentCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ type cuckooSentCache struct {
// Make sure it implements TraceSentCache
var _ TraceSentCache = (*cuckooSentCache)(nil)

// cuckooSentCacheMetrics declares the metrics that NewCuckooSentCache
// registers.
var cuckooSentCacheMetrics = []metrics.Metadata{
	{
		Name:        "cache_recent_dropped_traces",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "the current size of the most recent dropped trace cache",
	},
}

func NewCuckooSentCache(cfg config.SampleCacheConfig, met metrics.Metrics) (TraceSentCache, error) {
stc, err := lru.New[string, *keptTraceCacheEntry](int(cfg.KeptSize))
if err != nil {
Expand All @@ -180,7 +184,9 @@ func NewCuckooSentCache(cfg config.SampleCacheConfig, met metrics.Metrics) (Trac
// request.
recentDroppedIDs := generics.NewSetWithTTL[string](3 * time.Second)

met.Register("cache_recent_dropped_traces", "gauge")
for _, metric := range cuckooSentCacheMetrics {
met.Register(metric)
}

cache := &cuckooSentCache{
met: met,
Expand Down
12 changes: 9 additions & 3 deletions collect/cache/kept_reasons_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ type KeptReasonsCache struct {
hashSeed uint64
}

// keptReasonCacheMetrics declares the metrics that NewKeptReasonsCache
// registers.
var keptReasonCacheMetrics = []metrics.Metadata{
	{
		Name:        "collect_sent_reasons_cache_entries",
		Type:        metrics.Histogram,
		Unit:        metrics.Dimensionless,
		Description: "Number of entries in the sent reasons cache",
	},
}

// NewKeptReasonsCache returns a new SentReasonsCache.
func NewKeptReasonsCache(metrics metrics.Metrics) *KeptReasonsCache {
metrics.Register("collect_sent_reasons_cache_entries", "histogram")
func NewKeptReasonsCache(met metrics.Metrics) *KeptReasonsCache {
for _, metric := range keptReasonCacheMetrics {
met.Register(metric)
}

return &KeptReasonsCache{
Metrics: metrics,
Metrics: met,
keys: make(map[uint64]uint32),
hashSeed: rand.Uint64(),
}
Expand Down
72 changes: 38 additions & 34 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,39 @@ type InMemCollector struct {
hostname string
}

// inMemCollectorMetrics declares every metric that InMemCollector.Start
// registers. Keeping the declarations in one package-level table gives each
// metric its type, unit, and description in a single place.
var inMemCollectorMetrics = []metrics.Metadata{
	// Trace shape, queue depth, and memory measurements.
	{Name: "trace_duration_ms", Type: metrics.Histogram, Unit: metrics.Milliseconds, Description: "time taken to process a trace from arrival to send"},
	{Name: "trace_span_count", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of spans in a trace"},
	{Name: "collector_incoming_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of spans currently in the incoming queue"},
	{Name: "collector_peer_queue_length", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans in the peer queue"},
	{Name: "collector_incoming_queue_length", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans in the incoming queue"},
	{Name: "collector_peer_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of spans currently in the peer queue"},
	{Name: "collector_cache_size", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of traces currently stored in the trace cache"},
	{Name: "memory_heap_allocation", Type: metrics.Gauge, Unit: metrics.Bytes, Description: "current heap allocation"},

	// Span and trace throughput.
	{Name: "span_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans received by the collector"},
	{Name: "span_processed", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans processed by the collector"},
	{Name: "spans_waiting", Type: metrics.UpDown, Unit: metrics.Dimensionless, Description: "number of spans waiting to be processed by the collector"},
	{Name: "trace_sent_cache_hit", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of late spans received for traces that have already been sent"},
	{Name: "trace_accepted", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of new traces received by the collector"},
	{Name: "trace_send_kept", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that have been kept"},
	{Name: "trace_send_dropped", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that have been dropped"},
	{Name: "trace_send_has_root", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of kept traces that have a root span"},
	{Name: "trace_send_no_root", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of kept traces that do not have a root span"},
	{Name: "trace_forwarded_on_peer_change", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of traces forwarded due to peer membership change"},
	{Name: "trace_redistribution_count", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of traces redistributed due to peer membership change"},
	{Name: "trace_send_on_shutdown", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces sent during shutdown"},
	{Name: "trace_forwarded_on_shutdown", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces forwarded during shutdown"},

	// Counters named by the TraceSend* constants, one per send reason.
	{Name: TraceSendGotRoot, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to root span arrival"},
	{Name: TraceSendExpired, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to TraceTimeout or SendDelay"},
	{Name: TraceSendSpanLimit, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to span limit"},
	{Name: TraceSendEjectedFull, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to cache capacity overrun"},
	{Name: TraceSendEjectedMemsize, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to memory overrun"},
	{Name: TraceSendLateSpan, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans that are sent due to late span arrival"},

	{Name: "dropped_from_stress", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces dropped due to stress relief"},
}

func (i *InMemCollector) Start() error {
i.Logger.Debug().Logf("Starting InMemCollector")
defer func() { i.Logger.Debug().Logf("Finished starting InMemCollector") }()
Expand All @@ -107,39 +140,11 @@ func (i *InMemCollector) Start() error {

i.Health.Register(CollectorHealthKey, 3*time.Second)

i.Metrics.Register("trace_duration_ms", "histogram")
i.Metrics.Register("trace_span_count", "histogram")
i.Metrics.Register("collector_incoming_queue", "histogram")
i.Metrics.Register("collector_peer_queue_length", "gauge")
i.Metrics.Register("collector_incoming_queue_length", "gauge")
i.Metrics.Register("collector_peer_queue", "histogram")
i.Metrics.Register("collector_cache_size", "gauge")
i.Metrics.Register("memory_heap_allocation", "gauge")
i.Metrics.Register("span_received", "counter")
i.Metrics.Register("span_processed", "counter")
i.Metrics.Register("spans_waiting", "updown")
i.Metrics.Register("trace_sent_cache_hit", "counter")
i.Metrics.Register("trace_accepted", "counter")
i.Metrics.Register("trace_send_kept", "counter")
i.Metrics.Register("trace_send_dropped", "counter")
i.Metrics.Register("trace_send_has_root", "counter")
i.Metrics.Register("trace_send_no_root", "counter")
i.Metrics.Register("trace_forwarded_on_peer_change", "gauge")
i.Metrics.Register("trace_redistribution_count", "gauge")
i.Metrics.Register("trace_send_on_shutdown", "counter")
i.Metrics.Register("trace_forwarded_on_shutdown", "counter")

i.Metrics.Register(TraceSendGotRoot, "counter")
i.Metrics.Register(TraceSendExpired, "counter")
i.Metrics.Register(TraceSendSpanLimit, "counter")
i.Metrics.Register(TraceSendEjectedFull, "counter")
i.Metrics.Register(TraceSendEjectedMemsize, "counter")
i.Metrics.Register(TraceSendLateSpan, "counter")
for _, metric := range inMemCollectorMetrics {
i.Metrics.Register(metric)
}

sampleCacheConfig := i.Config.GetSampleCacheConfig()
i.Metrics.Register(cache.CurrentCapacity, "gauge")
i.Metrics.Register(cache.FutureLoadFactor, "gauge")
i.Metrics.Register(cache.CurrentLoadFactor, "gauge")
var err error
i.sampleTraceCache, err = cache.NewCuckooSentCache(sampleCacheConfig, i.Metrics)
if err != nil {
Expand Down Expand Up @@ -1061,19 +1066,18 @@ func (i *InMemCollector) addAdditionalAttributes(sp *types.Span) {
}
}

func newRedistributeNotifier(logger logger.Logger, metrics metrics.Metrics, clock clockwork.Clock) *redistributeNotifier {
func newRedistributeNotifier(logger logger.Logger, met metrics.Metrics, clock clockwork.Clock) *redistributeNotifier {
r := &redistributeNotifier{
initialDelay: 3 * time.Second,
maxDelay: 30 * time.Second,
maxAttempts: 5,
done: make(chan struct{}),
clock: clock,
logger: logger,
metrics: metrics,
metrics: met,
triggered: make(chan struct{}),
reset: make(chan struct{}),
}
r.metrics.Register("trace_redistribution_count", "gauge")

return r
}
Expand Down
14 changes: 10 additions & 4 deletions collect/stressRelief.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ type StressRelief struct {

const StressReliefHealthKey = "stress_relief"

// stressReliefMetrics declares the stress-level gauges that
// StressRelief.Start registers.
var stressReliefMetrics = []metrics.Metadata{
	{
		Name:        "cluster_stress_level",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "The overall stress level of the cluster",
	},
	{
		Name:        "individual_stress_level",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "The stress level of the individual node",
	},
	{
		Name:        "stress_level",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "The stress level that's being used to determine whether to activate stress relief",
	},
	{
		Name:        "stress_relief_activated",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "Whether stress relief is currently activated",
	},
}

func (s *StressRelief) Start() error {
s.Logger.Debug().Logf("Starting StressRelief system")
defer func() { s.Logger.Debug().Logf("Finished starting StressRelief system") }()
Expand All @@ -115,10 +122,9 @@ func (s *StressRelief) Start() error {
s.Health.Register(StressReliefHealthKey, 3*time.Second)

// register stress level metrics
s.RefineryMetrics.Register("cluster_stress_level", "gauge")
s.RefineryMetrics.Register("individual_stress_level", "gauge")
s.RefineryMetrics.Register("stress_level", "gauge")
s.RefineryMetrics.Register("stress_relief_activated", "gauge")
for _, m := range stressReliefMetrics {
s.RefineryMetrics.Register(m)
}

// We use an algorithms map so that we can name these algorithms, which makes it easier for several things:
// - change our mind about which algorithm to use
Expand Down
15 changes: 12 additions & 3 deletions collect/stress_relief_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ func TestStressRelief_Monitor(t *testing.T) {
defer stop()
require.NoError(t, sr.Start())

sr.RefineryMetrics.Register("collector_incoming_queue_length", "gauge")
sr.RefineryMetrics.Register(metrics.Metadata{
Name: "collector_incoming_queue_length",
Type: metrics.Gauge,
})

sr.RefineryMetrics.Store("INCOMING_CAP", 1200)

Expand Down Expand Up @@ -81,7 +84,10 @@ func TestStressRelief_Peer(t *testing.T) {
defer stop()
require.NoError(t, sr.Start())

sr.RefineryMetrics.Register("collector_incoming_queue_length", "gauge")
sr.RefineryMetrics.Register(metrics.Metadata{
Name: "collector_incoming_queue_length",
Type: metrics.Gauge,
})

sr.RefineryMetrics.Store("INCOMING_CAP", 1200)

Expand Down Expand Up @@ -139,7 +145,10 @@ func TestStressRelief_OverallStressLevel(t *testing.T) {
sr.disableStressLevelReport = true
sr.Start()

sr.RefineryMetrics.Register("collector_incoming_queue_length", "gauge")
sr.RefineryMetrics.Register(metrics.Metadata{
Name: "collector_incoming_queue_length",
Type: metrics.Gauge,
})

sr.RefineryMetrics.Store("INCOMING_CAP", 1200)

Expand Down
8 changes: 8 additions & 0 deletions internal/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ type Health struct {
Reporter
}

// healthMetrics declares the readiness and liveness gauges that Health.Start
// registers.
var healthMetrics = []metrics.Metadata{
	{
		Name:        "is_ready",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "Whether the system is ready to receive traffic",
	},
	{
		Name:        "is_alive",
		Type:        metrics.Gauge,
		Unit:        metrics.Dimensionless,
		Description: "Whether the system is alive and reporting in",
	},
}

func (h *Health) Start() error {
// if we don't have a logger or metrics object, we'll use the null ones (makes testing easier)
if h.Logger == nil {
Expand All @@ -76,6 +81,9 @@ func (h *Health) Start() error {
if h.Metrics == nil {
h.Metrics = &metrics.NullMetrics{}
}
for _, metric := range healthMetrics {
h.Metrics.Register(metric)
}
h.timeouts = make(map[string]time.Duration)
h.timeLeft = make(map[string]time.Duration)
h.readies = make(map[string]bool)
Expand Down
Loading

0 comments on commit cd77129

Please sign in to comment.