Skip to content

Commit

Permalink
DF-20372 (pt 2): Update Streams PluginConfig checks and FeedID parsing (
Browse files Browse the repository at this point in the history
#14504)

* Update Streams PluginConfig checks and FeedID parsing

* Update changeset comment

* Linting fixes

* Update v4/data_source_test.go

* Pull out duplicate code into helper
  • Loading branch information
austinborn authored Sep 20, 2024
1 parent 85a8d09 commit 10f7aab
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 54 deletions.
7 changes: 7 additions & 0 deletions .changeset/thirty-emus-enjoy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"chainlink": minor
---

#bugfix Fix potential nil ptr reference for LinkFeedID and NativeFeedID in Mercury specs
#bugfix Ensure Streams PluginConfig is checked for contents correctly when validated
#changed New Feed IDs with 0x01 prefix can be parsed for Mercury report schemas
4 changes: 2 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,10 +889,10 @@ func (d *Delegate) newServicesMercury(
}

var telemetryType synchronization.TelemetryType
if relayConfig.EnableTriggerCapability && jb.OCR2OracleSpec.PluginConfig == nil {
if relayConfig.EnableTriggerCapability && len(jb.OCR2OracleSpec.PluginConfig) == 0 {
telemetryType = synchronization.OCR3DataFeeds
// First use case for TriggerCapability transmission is Data Feeds, so telemetry should be routed accordingly.
// This is only true if TriggerCapability is the *only* transmission method (PluginConfig == nil).
// This is only true if TriggerCapability is the *only* transmission method (PluginConfig is empty).
} else {
telemetryType = synchronization.OCR3Mercury
}
Expand Down
30 changes: 23 additions & 7 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewServices(

var err error
var pluginConfig config.PluginConfig
if jb.OCR2OracleSpec.PluginConfig == nil {
if len(jb.OCR2OracleSpec.PluginConfig) == 0 {
if !enableTriggerCapability {
return nil, fmt.Errorf("at least one transmission option must be configured")
}
Expand Down Expand Up @@ -180,10 +180,22 @@ type factoryCfg struct {
feedID utils.FeedID
}

func getPluginFeedIDs(pluginConfig config.PluginConfig) (linkFeedID utils.FeedID, nativeFeedID utils.FeedID) {
if pluginConfig.LinkFeedID != nil {
linkFeedID = *pluginConfig.LinkFeedID
}
if pluginConfig.NativeFeedID != nil {
nativeFeedID = *pluginConfig.NativeFeedID
}
return linkFeedID, nativeFeedID
}

func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) {
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

linkFeedID, nativeFeedID := getPluginFeedIDs(factoryCfg.reportingPluginConfig)

ds := mercuryv4.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
Expand All @@ -194,8 +206,8 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryServerFetcher(),
*factoryCfg.reportingPluginConfig.LinkFeedID,
*factoryCfg.reportingPluginConfig.NativeFeedID,
linkFeedID,
nativeFeedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
Expand All @@ -221,6 +233,8 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

linkFeedID, nativeFeedID := getPluginFeedIDs(factoryCfg.reportingPluginConfig)

ds := mercuryv3.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
Expand All @@ -231,8 +245,8 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryServerFetcher(),
*factoryCfg.reportingPluginConfig.LinkFeedID,
*factoryCfg.reportingPluginConfig.NativeFeedID,
linkFeedID,
nativeFeedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
Expand All @@ -258,6 +272,8 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

linkFeedID, nativeFeedID := getPluginFeedIDs(factoryCfg.reportingPluginConfig)

ds := mercuryv2.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
Expand All @@ -268,8 +284,8 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryServerFetcher(),
*factoryCfg.reportingPluginConfig.LinkFeedID,
*factoryCfg.reportingPluginConfig.NativeFeedID,
linkFeedID,
nativeFeedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func validateOCR2MercurySpec(spec *job.OCR2OracleSpec, feedID [32]byte) error {
return pkgerrors.Wrap(err, "error while unmarshalling relay config")
}

if spec.PluginConfig == nil {
if len(spec.PluginConfig) == 0 {
if !relayConfig.EnableTriggerCapability {
return pkgerrors.Wrap(err, "at least one transmission option must be configured")
}
Expand Down
3 changes: 3 additions & 0 deletions core/services/relay/evm/mercury/utils/feeds.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ func (f *FeedID) UnmarshalText(input []byte) error {
func (f FeedID) Version() FeedVersion {
if _, exists := legacyV1FeedIDM[f]; exists {
return REPORT_V1
} else if f[0] == 0x01 { // Keystone Feed IDs
return FeedVersion(binary.BigEndian.Uint16(f[5:7]))
}

return FeedVersion(binary.BigEndian.Uint16(f[:2]))
}

Expand Down
55 changes: 38 additions & 17 deletions core/services/relay/evm/mercury/utils/feeds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,48 @@ import (
)

var (
v1FeedId = (FeedID)([32]uint8{00, 01, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})
v2FeedId = (FeedID)([32]uint8{00, 02, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})
v3FeedId = (FeedID)([32]uint8{00, 03, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})
v1FeedID = (FeedID)([32]uint8{00, 01, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})
v2FeedID = (FeedID)([32]uint8{00, 02, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})
v3FeedID = (FeedID)([32]uint8{00, 03, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})
keystonev2Feed = (FeedID)([32]uint8{01, 12, 34, 56, 78, 00, 02, 04, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00})
keystonev3Feed = (FeedID)([32]uint8{01, 12, 34, 56, 78, 00, 03, 04, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00})
keystonev4Feed = (FeedID)([32]uint8{01, 12, 34, 56, 78, 00, 04, 04, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00})
)

func Test_FeedID_Version(t *testing.T) {
t.Run("versioned feed ID", func(t *testing.T) {
assert.Equal(t, REPORT_V1, v1FeedId.Version())
assert.True(t, v1FeedId.IsV1())
assert.False(t, v1FeedId.IsV2())
assert.False(t, v1FeedId.IsV3())

assert.Equal(t, REPORT_V2, v2FeedId.Version())
assert.False(t, v2FeedId.IsV1())
assert.True(t, v2FeedId.IsV2())
assert.False(t, v2FeedId.IsV3())

assert.Equal(t, REPORT_V3, v3FeedId.Version())
assert.False(t, v3FeedId.IsV1())
assert.False(t, v3FeedId.IsV2())
assert.True(t, v3FeedId.IsV3())
assert.Equal(t, REPORT_V1, v1FeedID.Version())
assert.True(t, v1FeedID.IsV1())
assert.False(t, v1FeedID.IsV2())
assert.False(t, v1FeedID.IsV3())

assert.Equal(t, REPORT_V2, v2FeedID.Version())
assert.False(t, v2FeedID.IsV1())
assert.True(t, v2FeedID.IsV2())
assert.False(t, v2FeedID.IsV3())

assert.Equal(t, REPORT_V3, v3FeedID.Version())
assert.False(t, v3FeedID.IsV1())
assert.False(t, v3FeedID.IsV2())
assert.True(t, v3FeedID.IsV3())

assert.Equal(t, REPORT_V2, keystonev2Feed.Version())
assert.False(t, keystonev2Feed.IsV1())
assert.True(t, keystonev2Feed.IsV2())
assert.False(t, keystonev2Feed.IsV3())
assert.False(t, keystonev2Feed.IsV4())

assert.Equal(t, REPORT_V3, keystonev3Feed.Version())
assert.False(t, keystonev3Feed.IsV1())
assert.False(t, keystonev3Feed.IsV2())
assert.True(t, keystonev3Feed.IsV3())
assert.False(t, keystonev3Feed.IsV4())

assert.Equal(t, REPORT_V4, keystonev4Feed.Version())
assert.False(t, keystonev4Feed.IsV1())
assert.False(t, keystonev4Feed.IsV2())
assert.False(t, keystonev4Feed.IsV3())
assert.True(t, keystonev4Feed.IsV4())
})
t.Run("legacy special cases", func(t *testing.T) {
for _, feedID := range legacyV1FeedIDs {
Expand Down
12 changes: 2 additions & 10 deletions core/services/relay/evm/mercury/v2/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package v2

import (
"context"
"encoding/json"
"fmt"
"math/big"
"sync"
Expand All @@ -23,7 +22,6 @@ import (
mercurytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2/reportcodec"
relayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -63,12 +61,6 @@ func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec

func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedTimestamp bool) (obs v2types.Observation, pipelineExecutionErr error) {
var wg sync.WaitGroup
var relayConfig relayTypes.RelayConfig
err := json.Unmarshal(ds.jb.OCR2OracleSpec.RelayConfig.Bytes(), &relayConfig)
if err != nil {
pipelineExecutionErr = fmt.Errorf("failed to deserialize relay config: %w", err)
return
}
ctx, cancel := context.WithCancel(ctx)

if fetchMaxFinalizedTimestamp {
Expand Down Expand Up @@ -116,7 +108,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()

var isLink, isNative bool
if ds.jb.OCR2OracleSpec.PluginConfig == nil {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.LinkPrice.Val = v2.MissingPrice
} else if ds.feedID == ds.linkFeedID {
isLink = true
Expand All @@ -136,7 +128,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()
}

if ds.jb.OCR2OracleSpec.PluginConfig == nil {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.NativePrice.Val = v2.MissingPrice
} else if ds.feedID == ds.nativeFeedID {
isNative = true
Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/mercury/v2/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,15 @@ func Test_Datasource(t *testing.T) {
assert.EqualError(t, obs.NativePrice.Err, "some error fetching native price")
})

t.Run("when PluginConfig=nil skips fetching link and native prices", func(t *testing.T) {
t.Run("when PluginConfig is empty", func(t *testing.T) {
t.Cleanup(func() {
ds.jb = jb
})

fetcher.linkPriceErr = errors.New("some error fetching link price")
fetcher.nativePriceErr = errors.New("some error fetching native price")

ds.jb.OCR2OracleSpec.PluginConfig = nil
ds.jb.OCR2OracleSpec.PluginConfig = job.JSONConfig{}

obs, err := ds.Observe(ctx, repts, false)
assert.NoError(t, err)
Expand Down
12 changes: 2 additions & 10 deletions core/services/relay/evm/mercury/v3/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package v3

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
Expand All @@ -23,7 +22,6 @@ import (
mercurytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec"
relayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -65,12 +63,6 @@ func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec

func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedTimestamp bool) (obs v3types.Observation, pipelineExecutionErr error) {
var wg sync.WaitGroup
var relayConfig relayTypes.RelayConfig
err := json.Unmarshal(ds.jb.OCR2OracleSpec.RelayConfig.Bytes(), &relayConfig)
if err != nil {
pipelineExecutionErr = fmt.Errorf("failed to deserialize relay config: %w", err)
return
}
ctx, cancel := context.WithCancel(ctx)

if fetchMaxFinalizedTimestamp {
Expand Down Expand Up @@ -120,7 +112,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()

var isLink, isNative bool
if ds.jb.OCR2OracleSpec.PluginConfig == nil {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.LinkPrice.Val = v3.MissingPrice
} else if ds.feedID == ds.linkFeedID {
isLink = true
Expand All @@ -140,7 +132,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()
}

if ds.jb.OCR2OracleSpec.PluginConfig == nil {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.NativePrice.Val = v3.MissingPrice
} else if ds.feedID == ds.nativeFeedID {
isNative = true
Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/mercury/v3/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,15 @@ func Test_Datasource(t *testing.T) {
assert.EqualError(t, obs.NativePrice.Err, "some error fetching native price")
})

t.Run("when PluginConfig=nil skips fetching link and native prices", func(t *testing.T) {
t.Run("when PluginConfig is empty", func(t *testing.T) {
t.Cleanup(func() {
ds.jb = jb
})

fetcher.linkPriceErr = errors.New("some error fetching link price")
fetcher.nativePriceErr = errors.New("some error fetching native price")

ds.jb.OCR2OracleSpec.PluginConfig = nil
ds.jb.OCR2OracleSpec.PluginConfig = job.JSONConfig{}

obs, err := ds.Observe(ctx, repts, false)
assert.NoError(t, err)
Expand Down
8 changes: 6 additions & 2 deletions core/services/relay/evm/mercury/v4/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()

var isLink, isNative bool
if ds.feedID == ds.linkFeedID {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.LinkPrice.Val = v4.MissingPrice
} else if ds.feedID == ds.linkFeedID {
isLink = true
} else {
wg.Add(1)
Expand All @@ -126,7 +128,9 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()
}

if ds.feedID == ds.nativeFeedID {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.NativePrice.Val = v4.MissingPrice
} else if ds.feedID == ds.nativeFeedID {
isNative = true
} else {
wg.Add(1)
Expand Down
31 changes: 30 additions & 1 deletion core/services/relay/evm/mercury/v4/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
relaymercuryv4 "github.com/smartcontractkit/chainlink-data-streams/mercury/v4"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
mercurymocks "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
Expand Down Expand Up @@ -70,7 +71,16 @@ func (ms *mockSaver) Save(r *pipeline.Run) {

func Test_Datasource(t *testing.T) {
orm := &mockORM{}
ds := &datasource{orm: orm, lggr: logger.TestLogger(t)}
jb := job.Job{
Type: job.Type(pipeline.OffchainReporting2JobType),
OCR2OracleSpec: &job.OCR2OracleSpec{
CaptureEATelemetry: true,
PluginConfig: map[string]interface{}{
"serverURL": "a",
},
},
}
ds := &datasource{orm: orm, lggr: logger.TestLogger(t), jb: jb}
ctx := testutils.Context(t)
repts := ocrtypes.ReportTimestamp{}

Expand Down Expand Up @@ -286,6 +296,25 @@ func Test_Datasource(t *testing.T) {
assert.EqualError(t, obs.NativePrice.Err, "some error fetching native price")
})

t.Run("when PluginConfig is empty", func(t *testing.T) {
t.Cleanup(func() {
ds.jb = jb
})

fetcher.linkPriceErr = errors.New("some error fetching link price")
fetcher.nativePriceErr = errors.New("some error fetching native price")

ds.jb.OCR2OracleSpec.PluginConfig = job.JSONConfig{}

obs, err := ds.Observe(ctx, repts, false)
assert.NoError(t, err)
assert.Nil(t, obs.LinkPrice.Err)
assert.Equal(t, obs.LinkPrice.Val, relaymercuryv4.MissingPrice)
assert.Nil(t, obs.NativePrice.Err)
assert.Equal(t, obs.NativePrice.Val, relaymercuryv4.MissingPrice)
assert.Equal(t, big.NewInt(122), obs.BenchmarkPrice.Val)
})

t.Run("when succeeds to fetch linkPrice or nativePrice but got nil (new feed)", func(t *testing.T) {
obs, err := ds.Observe(ctx, repts, false)
assert.NoError(t, err)
Expand Down

0 comments on commit 10f7aab

Please sign in to comment.