Skip to content

Commit

Permalink
Use cache instead of map in the plugin promwrapper (#14800)
Browse files Browse the repository at this point in the history
* use cache

* changed interval

* bump

* replace all maps with caches

* changeset

* fix tests

* addressing comments

* key formatting
  • Loading branch information
0xnogo committed Nov 4, 2024
1 parent 8e65aa8 commit ca4b33a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 43 deletions.
5 changes: 5 additions & 0 deletions .changeset/tricky-candles-matter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix Memory leak fix on promwrapper
64 changes: 40 additions & 24 deletions core/services/ocr2/plugins/promwrapper/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,23 @@ import (
"context"
"fmt"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/patrickmn/go-cache"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
)

// Settings shared by every end-time cache created in New. Entries are keyed
// by report timestamp and hold the time at which an OCR phase finished.
const (
// defaultExpiration is the default expiration time for cache items. It is
// passed as the expiration argument to cache.New for each end-time cache,
// so entries that are never read again are eventually dropped instead of
// accumulating.
defaultExpiration = 30 * time.Minute

// defaultCleanupInterval is the default interval for cache cleanup. It is
// passed as the cleanup-interval argument to cache.New for each end-time
// cache.
defaultCleanupInterval = 5 * time.Minute
)

// Type assertions, buckets and labels.
var (
_ types.ReportingPlugin = &promPlugin{}
Expand Down Expand Up @@ -160,10 +168,10 @@ type (
chainID *big.Int
oracleID string
configDigest string
queryEndTimes sync.Map
observationEndTimes sync.Map
reportEndTimes sync.Map
acceptFinalizedReportEndTimes sync.Map
queryEndTimes *cache.Cache
observationEndTimes *cache.Cache
reportEndTimes *cache.Cache
acceptFinalizedReportEndTimes *cache.Cache
prometheusBackend PrometheusBackend
}
)
Expand Down Expand Up @@ -223,13 +231,17 @@ func New(
}

return &promPlugin{
wrapped: plugin,
name: name,
chainType: chainType,
chainID: chainID,
oracleID: fmt.Sprintf("%d", config.OracleID),
configDigest: common.Bytes2Hex(config.ConfigDigest[:]),
prometheusBackend: prometheusBackend,
wrapped: plugin,
name: name,
chainType: chainType,
chainID: chainID,
oracleID: fmt.Sprintf("%d", config.OracleID),
configDigest: common.Bytes2Hex(config.ConfigDigest[:]),
prometheusBackend: prometheusBackend,
queryEndTimes: cache.New(defaultExpiration, defaultCleanupInterval),
observationEndTimes: cache.New(defaultExpiration, defaultCleanupInterval),
reportEndTimes: cache.New(defaultExpiration, defaultCleanupInterval),
acceptFinalizedReportEndTimes: cache.New(defaultExpiration, defaultCleanupInterval),
}
}

Expand All @@ -238,7 +250,7 @@ func (p *promPlugin) Query(ctx context.Context, timestamp types.ReportTimestamp)
defer func() {
duration := float64(time.Now().UTC().Sub(start))
p.prometheusBackend.SetQueryDuration(getLabelsValues(p, timestamp), duration)
p.queryEndTimes.Store(timestamp, time.Now().UTC()) // note time at end of Query()
p.setEndTime(timestamp, p.queryEndTimes) // note time at end of Query()
}()

return p.wrapped.Query(ctx, timestamp)
Expand All @@ -249,17 +261,16 @@ func (p *promPlugin) Observation(ctx context.Context, timestamp types.ReportTime

// Report latency between Query() and Observation().
labelValues := getLabelsValues(p, timestamp)
if queryEndTime, ok := p.queryEndTimes.Load(timestamp); ok {
if queryEndTime, ok := p.queryEndTimes.Get(timestampToKey(timestamp)); ok {
latency := float64(start.Sub(queryEndTime.(time.Time)))
p.prometheusBackend.SetQueryToObservationLatency(labelValues, latency)
p.queryEndTimes.Delete(timestamp)
}

// Report latency for Observation() at end of call.
defer func() {
duration := float64(time.Now().UTC().Sub(start))
p.prometheusBackend.SetObservationDuration(labelValues, duration)
p.observationEndTimes.Store(timestamp, time.Now().UTC()) // note time at end of Observe()
p.setEndTime(timestamp, p.observationEndTimes) // note time at end of Observe()
}()

return p.wrapped.Observation(ctx, timestamp, query)
Expand All @@ -270,17 +281,16 @@ func (p *promPlugin) Report(ctx context.Context, timestamp types.ReportTimestamp

// Report latency between Observation() and Report().
labelValues := getLabelsValues(p, timestamp)
if observationEndTime, ok := p.observationEndTimes.Load(timestamp); ok {
if observationEndTime, ok := p.observationEndTimes.Get(timestampToKey(timestamp)); ok {
latency := float64(start.Sub(observationEndTime.(time.Time)))
p.prometheusBackend.SetObservationToReportLatency(labelValues, latency)
p.observationEndTimes.Delete(timestamp)
}

// Report latency for Report() at end of call.
defer func() {
duration := float64(time.Now().UTC().Sub(start))
p.prometheusBackend.SetReportDuration(labelValues, duration)
p.reportEndTimes.Store(timestamp, time.Now().UTC()) // note time at end of Report()
p.setEndTime(timestamp, p.reportEndTimes) // note time at end of Report()
}()

return p.wrapped.Report(ctx, timestamp, query, observations)
Expand All @@ -291,17 +301,16 @@ func (p *promPlugin) ShouldAcceptFinalizedReport(ctx context.Context, timestamp

// Report latency between Report() and ShouldAcceptFinalizedReport().
labelValues := getLabelsValues(p, timestamp)
if reportEndTime, ok := p.reportEndTimes.Load(timestamp); ok {
if reportEndTime, ok := p.reportEndTimes.Get(timestampToKey(timestamp)); ok {
latency := float64(start.Sub(reportEndTime.(time.Time)))
p.prometheusBackend.SetReportToAcceptFinalizedReportLatency(labelValues, latency)
p.reportEndTimes.Delete(timestamp)
}

// Report latency for ShouldAcceptFinalizedReport() at end of call.
defer func() {
duration := float64(time.Now().UTC().Sub(start))
p.prometheusBackend.SetShouldAcceptFinalizedReportDuration(labelValues, duration)
p.acceptFinalizedReportEndTimes.Store(timestamp, time.Now().UTC()) // note time at end of ShouldAcceptFinalizedReport()
p.setEndTime(timestamp, p.acceptFinalizedReportEndTimes) // note time at end of ShouldAcceptFinalizedReport()
}()

return p.wrapped.ShouldAcceptFinalizedReport(ctx, timestamp, report)
Expand All @@ -312,10 +321,9 @@ func (p *promPlugin) ShouldTransmitAcceptedReport(ctx context.Context, timestamp

// Report latency between ShouldAcceptFinalizedReport() and ShouldTransmitAcceptedReport().
labelValues := getLabelsValues(p, timestamp)
if acceptFinalizedReportEndTime, ok := p.acceptFinalizedReportEndTimes.Load(timestamp); ok {
if acceptFinalizedReportEndTime, ok := p.acceptFinalizedReportEndTimes.Get(timestampToKey(timestamp)); ok {
latency := float64(start.Sub(acceptFinalizedReportEndTime.(time.Time)))
p.prometheusBackend.SetAcceptFinalizedReportToTransmitAcceptedReportLatency(labelValues, latency)
p.acceptFinalizedReportEndTimes.Delete(timestamp)
}

defer func() {
Expand Down Expand Up @@ -343,3 +351,11 @@ func (p *promPlugin) Close() error {

return p.wrapped.Close()
}

// setEndTime records the current UTC time in endTimes under the key derived
// from timestamp. The entry is stored with the cache's default expiration, so
// it does not need to be deleted explicitly once the next phase has read it.
func (p *promPlugin) setEndTime(timestamp types.ReportTimestamp, endTimes *cache.Cache) {
	key := timestampToKey(timestamp)
	finishedAt := time.Now().UTC()
	endTimes.SetDefault(key, finishedAt)
}

// timestampToKey builds the string cache key for a report timestamp. The key
// is the hex-encoded config digest followed by the epoch and the round, joined
// with underscores: "<digest>_<epoch>_<round>".
func timestampToKey(timestamp types.ReportTimestamp) string {
	const keyFormat = "%x_%d_%d"

	digest := timestamp.ConfigDigest[:]
	epoch, round := timestamp.Epoch, timestamp.Round

	return fmt.Sprintf(keyFormat, digest, epoch, round)
}
30 changes: 11 additions & 19 deletions core/services/ocr2/plugins/promwrapper/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ func TestPlugin_MustInstantiate(t *testing.T) {
// Ensure instantiation without panic for no override backend.
var reportingPlugin = &fakeReportingPlugin{}
promPlugin := New(reportingPlugin, "test", "EVM", big.NewInt(1), types.ReportingPluginConfig{}, nil)
require.NotEqual(t, nil, promPlugin)
require.NotNil(t, promPlugin)

// Ensure instantiation without panic for override provided.
backend := mocks.NewPrometheusBackend(t)
promPlugin = New(reportingPlugin, "test-2", "EVM", big.NewInt(1), types.ReportingPluginConfig{}, backend)
require.NotEqual(t, nil, promPlugin)
require.NotNil(t, promPlugin)
}

func TestPlugin_GetLatencies(t *testing.T) {
Expand Down Expand Up @@ -194,45 +194,37 @@ func TestPlugin_GetLatencies(t *testing.T) {
types.ReportingPluginConfig{ConfigDigest: reportTimestamp.ConfigDigest},
backend,
).(*promPlugin)
require.NotEqual(t, nil, promPlugin)
require.NotNil(t, promPlugin)

ctx := testutils.Context(t)

// Run OCR methods.
_, err := promPlugin.Query(ctx, reportTimestamp)
require.NoError(t, err)
_, ok := promPlugin.queryEndTimes.Load(reportTimestamp)
require.Equal(t, true, ok)
_, ok := promPlugin.queryEndTimes.Get(timestampToKey(reportTimestamp))
require.True(t, ok)
time.Sleep(qToOLatency)

_, err = promPlugin.Observation(ctx, reportTimestamp, nil)
require.NoError(t, err)
_, ok = promPlugin.queryEndTimes.Load(reportTimestamp)
require.Equal(t, false, ok)
_, ok = promPlugin.observationEndTimes.Load(reportTimestamp)
require.Equal(t, true, ok)
_, ok = promPlugin.observationEndTimes.Get(timestampToKey(reportTimestamp))
require.True(t, ok)
time.Sleep(oToRLatency)

_, _, err = promPlugin.Report(ctx, reportTimestamp, nil, nil)
require.NoError(t, err)
_, ok = promPlugin.observationEndTimes.Load(reportTimestamp)
require.Equal(t, false, ok)
_, ok = promPlugin.reportEndTimes.Load(reportTimestamp)
require.Equal(t, true, ok)
_, ok = promPlugin.reportEndTimes.Get(timestampToKey(reportTimestamp))
require.True(t, ok)
time.Sleep(rToALatency)

_, err = promPlugin.ShouldAcceptFinalizedReport(ctx, reportTimestamp, nil)
require.NoError(t, err)
_, ok = promPlugin.reportEndTimes.Load(reportTimestamp)
require.Equal(t, false, ok)
_, ok = promPlugin.acceptFinalizedReportEndTimes.Load(reportTimestamp)
require.Equal(t, true, ok)
_, ok = promPlugin.acceptFinalizedReportEndTimes.Get(timestampToKey(reportTimestamp))
require.True(t, ok)
time.Sleep(aToTLatency)

_, err = promPlugin.ShouldTransmitAcceptedReport(ctx, reportTimestamp, nil)
require.NoError(t, err)
_, ok = promPlugin.acceptFinalizedReportEndTimes.Load(reportTimestamp)
require.Equal(t, false, ok)

// Close.
err = promPlugin.Close()
Expand Down

0 comments on commit ca4b33a

Please sign in to comment.