From 6e25ca3e417c6efeb5c0c46c4d5dadb9ea0a4456 Mon Sep 17 00:00:00 2001 From: Graham Goh Date: Thu, 22 Aug 2024 15:44:10 +1000 Subject: [PATCH 1/4] fix(service): start multiple feeds managers In order to support connection to multiple feed managers, when on start of the node, we want to attempt to connect to list of registered feed managers. --- .changeset/gorgeous-lobsters-argue.md | 5 +++++ core/services/feeds/service.go | 7 ++++--- core/services/feeds/service_test.go | 16 ++++++++++++++++ 3 files changed, 25 insertions(+), 3 deletions(-) create mode 100644 .changeset/gorgeous-lobsters-argue.md diff --git a/.changeset/gorgeous-lobsters-argue.md b/.changeset/gorgeous-lobsters-argue.md new file mode 100644 index 00000000000..e1960eb61c1 --- /dev/null +++ b/.changeset/gorgeous-lobsters-argue.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#changed Connect to multiple feeds managers on app start instead of just one (default to first) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 5e8e743109a..7e31dcf5f7f 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -1023,7 +1023,6 @@ func (s *service) Start(ctx context.Context) error { return err } - // We only support a single feeds manager right now mgrs, err := s.ListManagers(ctx) if err != nil { return err @@ -1034,8 +1033,10 @@ func (s *service) Start(ctx context.Context) error { return nil } - mgr := mgrs[0] - s.connectFeedManager(ctx, mgr, privkey) + s.lggr.Infof("starting connection to %d feeds managers", len(mgrs)) + for _, mgr := range mgrs { + s.connectFeedManager(ctx, mgr, privkey) + } if err = s.observeJobProposalCounts(ctx); err != nil { s.lggr.Error("failed to observe job proposal count when starting service", err) diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index e98ae984fb7..ff8b04d6eb6 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -3851,6 +3851,10 @@ func Test_Service_StartStop(t *testing.T) { ID: 1, URI: "localhost:2000", } + mgr2 = feeds.FeedsManager{ + ID: 2, + URI: "localhost:2001", + } pubKeyHex = "0f17c3bf72de8beef6e2d17a14c0a972f5d7e0e66e70722373f12b88382d40f9" ) @@ -3873,6 +3877,18 @@ func Test_Service_StartStop(t *testing.T) { svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) }, }, + { + name: "success with multiple feeds managers connection", + beforeFunc: func(svc *TestService) { + svc.csaKeystore.On("GetAll").Return([]csakey.KeyV2{key}, nil) + svc.orm.On("ListManagers", mock.Anything).Return([]feeds.FeedsManager{mgr, mgr2}, nil) + svc.connMgr.On("IsConnected", mgr.ID).Return(false) + svc.connMgr.On("IsConnected", mgr2.ID).Return(false) + svc.connMgr.On("Connect", mock.IsType(feeds.ConnectOpts{})).Twice() + svc.connMgr.On("Close") + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + }, + }, { name: "success with no registered managers", beforeFunc: func(svc *TestService) { From 145b695012f3425d02b2a173cfa891c59c067d95 Mon Sep 17 00:00:00 2001 From: Graham Goh Date: Fri, 23 Aug 2024 15:05:05 +1000 Subject: [PATCH 2/4] fix: feature flag for multi feeds managers Introduce a new feature flag for enabling support for multi feeds managers. --- core/config/app_config.go | 2 + core/config/docs/core.toml | 2 + core/config/feature_config.go | 1 + core/config/toml/types.go | 12 +++-- core/services/chainlink/config_feature.go | 4 ++ .../services/chainlink/config_feature_test.go | 1 + core/services/chainlink/config_general.go | 4 ++ core/services/chainlink/config_test.go | 10 +++-- .../chainlink/mocks/general_config.go | 45 +++++++++++++++++++ .../testdata/config-empty-effective.toml | 1 + .../chainlink/testdata/config-full.toml | 1 + .../config-multi-chain-effective.toml | 1 + core/services/feeds/config.go | 1 + core/services/feeds/service.go | 10 +++-- core/services/feeds/service_test.go | 12 +++-- core/web/features_controller.go | 6 ++- core/web/features_controller_test.go | 6 ++- core/web/resolver/features.go | 5 +++ core/web/resolver/features_test.go | 5 ++- .../testdata/config-empty-effective.toml | 1 + core/web/resolver/testdata/config-full.toml | 1 + .../config-multi-chain-effective.toml | 1 + core/web/schema/type/features.graphql | 1 + docs/CONFIG.md | 7 +++ testdata/scripts/node/validate/default.txtar | 1 + .../disk-based-logging-disabled.txtar | 1 + .../validate/disk-based-logging-no-dir.txtar | 1 + .../node/validate/disk-based-logging.txtar | 1 + .../node/validate/invalid-ocr-p2p.txtar | 1 + testdata/scripts/node/validate/invalid.txtar | 1 + testdata/scripts/node/validate/valid.txtar | 1 + testdata/scripts/node/validate/warnings.txtar | 1 + 32 files changed, 129 insertions(+), 19 deletions(-) diff --git a/core/config/app_config.go b/core/config/app_config.go index 112e242636f..27d56bb4cb8 100644 --- a/core/config/app_config.go +++ b/core/config/app_config.go @@ -56,6 +56,8 @@ type AppConfig interface { Threshold() Threshold WebServer() WebServer Tracing() Tracing + + FeatureMultiFeedsManagers() bool } type DatabaseBackupMode string diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index d0960779c6c..3783689db38 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -15,6 +15,8 @@ LogPoller = false # Default UICSAKeys = false # Default # CCIP enables the CCIP service. CCIP = true # Default +# MultiFeedsManagers enables support for multiple feeds manager connections. +MultiFeedsManagers = false # Default [Database] # DefaultIdleInTxSessionTimeout is the maximum time allowed for a transaction to be open and idle before timing out. See Postgres `idle_in_transaction_session_timeout` for more details. diff --git a/core/config/feature_config.go b/core/config/feature_config.go index fbb3a4ea541..200a1fd8ed8 100644 --- a/core/config/feature_config.go +++ b/core/config/feature_config.go @@ -4,4 +4,5 @@ type Feature interface { FeedsManager() bool UICSAKeys() bool LogPoller() bool + MultiFeedsManagers() bool } diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 0c91ddd81a9..427e3f01cb5 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -300,10 +300,11 @@ func (p *PrometheusSecrets) validateMerge(f *PrometheusSecrets) (err error) { } type Feature struct { - FeedsManager *bool - LogPoller *bool - UICSAKeys *bool - CCIP *bool + FeedsManager *bool + LogPoller *bool + UICSAKeys *bool + CCIP *bool + MultiFeedsManagers *bool } func (f *Feature) setFrom(f2 *Feature) { @@ -319,6 +320,9 @@ func (f *Feature) setFrom(f2 *Feature) { if v := f2.CCIP; v != nil { f.CCIP = v } + if v := f2.MultiFeedsManagers; v != nil { + f.MultiFeedsManagers = v + } } type Database struct { diff --git a/core/services/chainlink/config_feature.go b/core/services/chainlink/config_feature.go index 2e968df052d..f5cc8786411 100644 --- a/core/services/chainlink/config_feature.go +++ b/core/services/chainlink/config_feature.go @@ -17,3 +17,7 @@ func (f *featureConfig) LogPoller() bool { func (f *featureConfig) UICSAKeys() bool { return *f.c.UICSAKeys } + +func (f *featureConfig) MultiFeedsManagers() bool { + return *f.c.MultiFeedsManagers +} diff --git a/core/services/chainlink/config_feature_test.go b/core/services/chainlink/config_feature_test.go index bc0418c157b..8fa5884450a 100644 --- a/core/services/chainlink/config_feature_test.go +++ b/core/services/chainlink/config_feature_test.go @@ -18,4 +18,5 @@ func TestFeatureConfig(t *testing.T) { assert.True(t, f.LogPoller()) assert.True(t, f.FeedsManager()) assert.True(t, f.UICSAKeys()) + assert.True(t, f.MultiFeedsManagers()) } diff --git a/core/services/chainlink/config_general.go b/core/services/chainlink/config_general.go index 79c92f82145..d329fb0facc 100644 --- a/core/services/chainlink/config_general.go +++ b/core/services/chainlink/config_general.go @@ -282,6 +282,10 @@ func (g *generalConfig) FeatureFeedsManager() bool { return *g.c.Feature.FeedsManager } +func (g *generalConfig) FeatureMultiFeedsManagers() bool { + return *g.c.Feature.MultiFeedsManagers +} + func (g *generalConfig) OCR() config.OCR { return &ocrConfig{c: g.c.OCR} } diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 253e4aa067f..83b47b438b4 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -258,10 +258,11 @@ func TestConfig_Marshal(t *testing.T) { } full.Feature = toml.Feature{ - FeedsManager: ptr(true), - LogPoller: ptr(true), - UICSAKeys: ptr(true), - CCIP: ptr(true), + FeedsManager: ptr(true), + LogPoller: ptr(true), + UICSAKeys: ptr(true), + CCIP: ptr(true), + MultiFeedsManagers: ptr(true), } full.Database = toml.Database{ DefaultIdleInTxSessionTimeout: commoncfg.MustNewDuration(time.Minute), @@ -774,6 +775,7 @@ FeedsManager = true LogPoller = true UICSAKeys = true CCIP = true +MultiFeedsManagers = true `}, {"Database", Config{Core: toml.Core{Database: full.Database}}, `[Database] DefaultIdleInTxSessionTimeout = '1m0s' diff --git a/core/services/chainlink/mocks/general_config.go b/core/services/chainlink/mocks/general_config.go index f4594a43225..2339cf9656d 100644 --- a/core/services/chainlink/mocks/general_config.go +++ b/core/services/chainlink/mocks/general_config.go @@ -694,6 +694,51 @@ func (_c *GeneralConfig_Feature_Call) RunAndReturn(run func() config.Feature) *G return _c } +// FeatureMultiFeedsManagers provides a mock function with given fields: +func (_m *GeneralConfig) FeatureMultiFeedsManagers() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for FeatureMultiFeedsManagers") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// GeneralConfig_FeatureMultiFeedsManagers_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FeatureMultiFeedsManagers' +type GeneralConfig_FeatureMultiFeedsManagers_Call struct { + *mock.Call +} + +// FeatureMultiFeedsManagers is a helper method to define mock.On call +func (_e *GeneralConfig_Expecter) FeatureMultiFeedsManagers() *GeneralConfig_FeatureMultiFeedsManagers_Call { + return &GeneralConfig_FeatureMultiFeedsManagers_Call{Call: _e.mock.On("FeatureMultiFeedsManagers")} +} + +func (_c *GeneralConfig_FeatureMultiFeedsManagers_Call) Run(run func()) *GeneralConfig_FeatureMultiFeedsManagers_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *GeneralConfig_FeatureMultiFeedsManagers_Call) Return(_a0 bool) *GeneralConfig_FeatureMultiFeedsManagers_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *GeneralConfig_FeatureMultiFeedsManagers_Call) RunAndReturn(run func() bool) *GeneralConfig_FeatureMultiFeedsManagers_Call { + _c.Call.Return(run) + return _c +} + // FluxMonitor provides a mock function with given fields: func (_m *GeneralConfig) FluxMonitor() config.FluxMonitor { ret := _m.Called() diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index f1325d824ea..d549e4024ed 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -7,6 +7,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index a7fc9dcb94c..4305c1d5a00 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -7,6 +7,7 @@ FeedsManager = true LogPoller = true UICSAKeys = true CCIP = true +MultiFeedsManagers = true [Database] DefaultIdleInTxSessionTimeout = '1m0s' diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index 8bfc93c7be0..640e6708bd0 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -7,6 +7,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' diff --git a/core/services/feeds/config.go b/core/services/feeds/config.go index e2ec889b23b..690117a5100 100644 --- a/core/services/feeds/config.go +++ b/core/services/feeds/config.go @@ -10,6 +10,7 @@ import ( type GeneralConfig interface { OCR() coreconfig.OCR Insecure() coreconfig.Insecure + FeatureMultiFeedsManagers() bool } type JobConfig interface { diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 7e31dcf5f7f..528bbd5188d 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -1033,9 +1033,13 @@ func (s *service) Start(ctx context.Context) error { return nil } - s.lggr.Infof("starting connection to %d feeds managers", len(mgrs)) - for _, mgr := range mgrs { - s.connectFeedManager(ctx, mgr, privkey) + if s.gCfg.FeatureMultiFeedsManagers() { + s.lggr.Infof("starting connection to %d feeds managers", len(mgrs)) + for _, mgr := range mgrs { + s.connectFeedManager(ctx, mgr, privkey) + } + } else { + s.connectFeedManager(ctx, mgrs[0], privkey) } if err = s.observeJobProposalCounts(ctx); err != nil { diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index ff8b04d6eb6..bca99747e8d 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -3863,8 +3863,9 @@ func Test_Service_StartStop(t *testing.T) { require.NoError(t, err) tests := []struct { - name string - beforeFunc func(svc *TestService) + name string + enableMultiFeedsManagers bool + beforeFunc func(svc *TestService) }{ { name: "success with a feeds manager connection", @@ -3878,7 +3879,8 @@ func Test_Service_StartStop(t *testing.T) { }, }, { - name: "success with multiple feeds managers connection", + name: "success with multiple feeds managers connection", + enableMultiFeedsManagers: true, beforeFunc: func(svc *TestService) { svc.csaKeystore.On("GetAll").Return([]csakey.KeyV2{key}, nil) svc.orm.On("ListManagers", mock.Anything).Return([]feeds.FeedsManager{mgr, mgr2}, nil) @@ -3905,7 +3907,9 @@ func Test_Service_StartStop(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - svc := setupTestService(t) + svc := setupTestServiceCfg(t, func(c *chainlink.Config, s *chainlink.Secrets) { + c.Feature.MultiFeedsManagers = &tt.enableMultiFeedsManagers + }) if tt.beforeFunc != nil { tt.beforeFunc(svc) diff --git a/core/web/features_controller.go b/core/web/features_controller.go index 76f04fe262c..66cd9e60c44 100644 --- a/core/web/features_controller.go +++ b/core/web/features_controller.go @@ -13,8 +13,9 @@ type FeaturesController struct { } const ( - FeatureKeyCSA string = "csa" - FeatureKeyFeedsManager string = "feeds_manager" + FeatureKeyCSA string = "csa" + FeatureKeyFeedsManager string = "feeds_manager" + FeatureKeyMultiFeedsManagers string = "multi_feeds_managers" ) // Index retrieves the features @@ -24,6 +25,7 @@ func (fc *FeaturesController) Index(c *gin.Context) { resources := []presenters.FeatureResource{ *presenters.NewFeatureResource(FeatureKeyCSA, fc.App.GetConfig().Feature().UICSAKeys()), *presenters.NewFeatureResource(FeatureKeyFeedsManager, fc.App.GetConfig().Feature().FeedsManager()), + *presenters.NewFeatureResource(FeatureKeyMultiFeedsManagers, fc.App.GetConfig().Feature().MultiFeedsManagers()), } jsonAPIResponse(c, resources, "features") diff --git a/core/web/features_controller_test.go b/core/web/features_controller_test.go index 727d7db5476..02f75f14bc5 100644 --- a/core/web/features_controller_test.go +++ b/core/web/features_controller_test.go @@ -19,6 +19,7 @@ func Test_FeaturesController_List(t *testing.T) { app := cltest.NewApplicationWithConfig(t, configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { csa := true c.Feature.UICSAKeys = &csa + c.Feature.MultiFeedsManagers = &csa })) require.NoError(t, app.Start(testutils.Context(t))) client := app.NewHTTPClient(nil) @@ -30,11 +31,14 @@ func Test_FeaturesController_List(t *testing.T) { resources := []presenters.FeatureResource{} err := web.ParseJSONAPIResponse(cltest.ParseResponseBody(t, resp), &resources) require.NoError(t, err) - require.Len(t, resources, 2) + require.Len(t, resources, 3) assert.Equal(t, "csa", resources[0].ID) assert.True(t, resources[0].Enabled) assert.Equal(t, "feeds_manager", resources[1].ID) assert.True(t, resources[1].Enabled) + + assert.Equal(t, "multi_feeds_managers", resources[2].ID) + assert.True(t, resources[2].Enabled) } diff --git a/core/web/resolver/features.go b/core/web/resolver/features.go index 9ce39a773c6..9d6c135a93c 100644 --- a/core/web/resolver/features.go +++ b/core/web/resolver/features.go @@ -20,6 +20,11 @@ func (r *FeaturesResolver) FeedsManager() bool { return r.cfg.FeedsManager() } +// MultiFeedsManagers resolves to whether multiple feed managers support is enable. +func (r *FeaturesResolver) MultiFeedsManagers() bool { + return r.cfg.MultiFeedsManagers() +} + type FeaturesPayloadResolver struct { cfg config.Feature } diff --git a/core/web/resolver/features_test.go b/core/web/resolver/features_test.go index 76394f038b0..4e8f9e8c26c 100644 --- a/core/web/resolver/features_test.go +++ b/core/web/resolver/features_test.go @@ -15,6 +15,7 @@ func Test_ToFeatures(t *testing.T) { ... on Features { csa feedsManager + multiFeedsManagers } } }` @@ -29,6 +30,7 @@ func Test_ToFeatures(t *testing.T) { t, f := true, false c.Feature.UICSAKeys = &f c.Feature.FeedsManager = &t + c.Feature.MultiFeedsManagers = &f })) }, query: query, @@ -36,7 +38,8 @@ func Test_ToFeatures(t *testing.T) { { "features": { "csa": false, - "feedsManager": true + "feedsManager": true, + "multiFeedsManagers": false } }`, }, diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index f1325d824ea..d549e4024ed 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -7,6 +7,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index f67d4737b57..a3743aeeac2 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -7,6 +7,7 @@ FeedsManager = true LogPoller = true UICSAKeys = true CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1m0s' diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index 55f998156c8..d3a9819de33 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -7,6 +7,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' diff --git a/core/web/schema/type/features.graphql b/core/web/schema/type/features.graphql index 4254bdecb6c..ff434ab4bdc 100644 --- a/core/web/schema/type/features.graphql +++ b/core/web/schema/type/features.graphql @@ -1,6 +1,7 @@ type Features { csa: Boolean! feedsManager: Boolean! + multiFeedsManagers: Boolean! } # FeaturesPayload defines the response of fetching the features availability in the UI diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 7c54682367d..e879a877990 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -52,6 +52,7 @@ FeedsManager = true # Default LogPoller = false # Default UICSAKeys = false # Default CCIP = true # Default +MultiFeedsManagers = false # Default ``` @@ -79,6 +80,12 @@ CCIP = true # Default ``` CCIP enables the CCIP service. +### MultiFeedsManagers +```toml +MultiFeedsManagers = false # Default +``` +MultiFeedsManagers enables support for multiple feeds manager connections. + ## Database ```toml [Database] diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index ff8b4889c49..114bb9f29a8 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -19,6 +19,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index 016d416d5f6..6f110190467 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -63,6 +63,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index f8a98b2c49a..2d84d1f70b3 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -63,6 +63,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index aef3b106a59..7bf4c3543f2 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -63,6 +63,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index 0cdf001eccd..14a8449ee13 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -48,6 +48,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 2912a803274..6e5e3b932a5 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -53,6 +53,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index ce40c91f669..71a055b0215 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -60,6 +60,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index dea40ec8da0..b6ebc1dc125 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -42,6 +42,7 @@ FeedsManager = true LogPoller = false UICSAKeys = false CCIP = true +MultiFeedsManagers = false [Database] DefaultIdleInTxSessionTimeout = '1h0m0s' From 5220fb97c3052fa5beb481800c5c5cc5a8ed967c Mon Sep 17 00:00:00 2001 From: Graham Goh Date: Mon, 26 Aug 2024 13:41:33 +1000 Subject: [PATCH 3/4] refactor(service): delete ListJobProposals No longer used, since it was replaced by ListJobProposalsByManagersIDs --- core/services/feeds/mocks/orm.go | 58 ---------------------------- core/services/feeds/mocks/service.go | 58 ---------------------------- core/services/feeds/orm.go | 12 ------ core/services/feeds/orm_test.go | 33 ---------------- core/services/feeds/service.go | 9 ----- core/services/feeds/service_test.go | 19 --------- 6 files changed, 189 deletions(-) diff --git a/core/services/feeds/mocks/orm.go b/core/services/feeds/mocks/orm.go index 3fce89eb60a..54897c65d40 100644 --- a/core/services/feeds/mocks/orm.go +++ b/core/services/feeds/mocks/orm.go @@ -1269,64 +1269,6 @@ func (_c *ORM_ListChainConfigsByManagerIDs_Call) RunAndReturn(run func(context.C return _c } -// ListJobProposals provides a mock function with given fields: ctx -func (_m *ORM) ListJobProposals(ctx context.Context) ([]feeds.JobProposal, error) { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for ListJobProposals") - } - - var r0 []feeds.JobProposal - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) ([]feeds.JobProposal, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) []feeds.JobProposal); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]feeds.JobProposal) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_ListJobProposals_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListJobProposals' -type ORM_ListJobProposals_Call struct { - *mock.Call -} - -// ListJobProposals is a helper method to define mock.On call -// - ctx context.Context -func (_e *ORM_Expecter) ListJobProposals(ctx interface{}) *ORM_ListJobProposals_Call { - return &ORM_ListJobProposals_Call{Call: _e.mock.On("ListJobProposals", ctx)} -} - -func (_c *ORM_ListJobProposals_Call) Run(run func(ctx context.Context)) *ORM_ListJobProposals_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *ORM_ListJobProposals_Call) Return(jps []feeds.JobProposal, err error) *ORM_ListJobProposals_Call { - _c.Call.Return(jps, err) - return _c -} - -func (_c *ORM_ListJobProposals_Call) RunAndReturn(run func(context.Context) ([]feeds.JobProposal, error)) *ORM_ListJobProposals_Call { - _c.Call.Return(run) - return _c -} - // ListJobProposalsByManagersIDs provides a mock function with given fields: ctx, ids func (_m *ORM) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]feeds.JobProposal, error) { ret := _m.Called(ctx, ids) diff --git a/core/services/feeds/mocks/service.go b/core/services/feeds/mocks/service.go index d37c327850d..4f637959d45 100644 --- a/core/services/feeds/mocks/service.go +++ b/core/services/feeds/mocks/service.go @@ -799,64 +799,6 @@ func (_c *Service_ListChainConfigsByManagerIDs_Call) RunAndReturn(run func(conte return _c } -// ListJobProposals provides a mock function with given fields: ctx -func (_m *Service) ListJobProposals(ctx context.Context) ([]feeds.JobProposal, error) { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for ListJobProposals") - } - - var r0 []feeds.JobProposal - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) ([]feeds.JobProposal, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) []feeds.JobProposal); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]feeds.JobProposal) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Service_ListJobProposals_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListJobProposals' -type Service_ListJobProposals_Call struct { - *mock.Call -} - -// ListJobProposals is a helper method to define mock.On call -// - ctx context.Context -func (_e *Service_Expecter) ListJobProposals(ctx interface{}) *Service_ListJobProposals_Call { - return &Service_ListJobProposals_Call{Call: _e.mock.On("ListJobProposals", ctx)} -} - -func (_c *Service_ListJobProposals_Call) Run(run func(ctx context.Context)) *Service_ListJobProposals_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *Service_ListJobProposals_Call) Return(_a0 []feeds.JobProposal, _a1 error) *Service_ListJobProposals_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *Service_ListJobProposals_Call) RunAndReturn(run func(context.Context) ([]feeds.JobProposal, error)) *Service_ListJobProposals_Call { - _c.Call.Return(run) - return _c -} - // ListJobProposalsByManagersIDs provides a mock function with given fields: ctx, ids func (_m *Service) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]feeds.JobProposal, error) { ret := _m.Called(ctx, ids) diff --git a/core/services/feeds/orm.go b/core/services/feeds/orm.go index d130316fb2d..602154ad716 100644 --- a/core/services/feeds/orm.go +++ b/core/services/feeds/orm.go @@ -34,7 +34,6 @@ type ORM interface { DeleteProposal(ctx context.Context, id int64) error GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) GetJobProposalByRemoteUUID(ctx context.Context, uuid uuid.UUID) (*JobProposal, error) - ListJobProposals(ctx context.Context) (jps []JobProposal, err error) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]JobProposal, error) UpdateJobProposalStatus(ctx context.Context, id int64, status JobProposalStatus) error // NEEDED? UpsertJobProposal(ctx context.Context, jp *JobProposal) (int64, error) @@ -373,17 +372,6 @@ AND status <> $2; return jp, errors.Wrap(err, "GetJobProposalByRemoteUUID failed") } -// ListJobProposals lists all job proposals. -func (o *orm) ListJobProposals(ctx context.Context) (jps []JobProposal, err error) { - stmt := ` -SELECT * -FROM job_proposals; -` - - err = o.ds.SelectContext(ctx, &jps, stmt) - return jps, errors.Wrap(err, "ListJobProposals failed") -} - // ListJobProposalsByManagersIDs gets job proposals by feeds managers IDs. func (o *orm) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]JobProposal, error) { stmt := ` diff --git a/core/services/feeds/orm_test.go b/core/services/feeds/orm_test.go index c4c9ced2ce3..298a878a8de 100644 --- a/core/services/feeds/orm_test.go +++ b/core/services/feeds/orm_test.go @@ -555,39 +555,6 @@ func Test_ORM_GetJobProposal(t *testing.T) { }) } -func Test_ORM_ListJobProposals(t *testing.T) { - t.Parallel() - ctx := testutils.Context(t) - - orm := setupORM(t) - fmID := createFeedsManager(t, orm) - uuid := uuid.New() - name := null.StringFrom("jp1") - - jp := &feeds.JobProposal{ - Name: name, - RemoteUUID: uuid, - Status: feeds.JobProposalStatusPending, - FeedsManagerID: fmID, - } - - id, err := orm.CreateJobProposal(ctx, jp) - require.NoError(t, err) - - jps, err := orm.ListJobProposals(ctx) - require.NoError(t, err) - require.Len(t, jps, 1) - - actual := jps[0] - assert.Equal(t, id, actual.ID) - assert.Equal(t, name, actual.Name) - assert.Equal(t, uuid, actual.RemoteUUID) - assert.Equal(t, jp.Status, actual.Status) - assert.False(t, actual.ExternalJobID.Valid) - assert.False(t, actual.PendingUpdate) - assert.Equal(t, jp.FeedsManagerID, actual.FeedsManagerID) -} - func Test_ORM_CountJobProposalsByStatus(t *testing.T) { t.Parallel() diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 528bbd5188d..219e41465b7 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -98,7 +98,6 @@ type Service interface { CountJobProposalsByStatus(ctx context.Context) (*JobProposalCounts, error) GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) - ListJobProposals(ctx context.Context) ([]JobProposal, error) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]JobProposal, error) ApproveSpec(ctx context.Context, id int64, force bool) error @@ -421,14 +420,6 @@ func (s *service) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64 return id, nil } -// Lists all JobProposals -// -// When we support multiple feed managers, we will need to change this to filter -// by feeds manager -func (s *service) ListJobProposals(ctx context.Context) ([]JobProposal, error) { - return s.orm.ListJobProposals(ctx) -} - // ListJobProposalsByManagersIDs gets job proposals by feeds managers IDs func (s *service) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]JobProposal, error) { return s.orm.ListJobProposalsByManagersIDs(ctx, ids) diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index bca99747e8d..0c8e6a7fa55 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -1527,25 +1527,6 @@ func Test_Service_IsJobManaged(t *testing.T) { assert.True(t, isManaged) } -func Test_Service_ListJobProposals(t *testing.T) { - t.Parallel() - ctx := testutils.Context(t) - - var ( - jp = feeds.JobProposal{} - jps = []feeds.JobProposal{jp} - ) - svc := setupTestService(t) - - svc.orm.On("ListJobProposals", mock.Anything). - Return(jps, nil) - - actual, err := svc.ListJobProposals(ctx) - require.NoError(t, err) - - assert.Equal(t, actual, jps) -} - func Test_Service_ListJobProposalsByManagersIDs(t *testing.T) { t.Parallel() ctx := testutils.Context(t) From 87b77cff936ef2b85f31ae5c5dac9a2d5f7b8110 Mon Sep 17 00:00:00 2001 From: Graham Goh Date: Thu, 29 Aug 2024 10:47:18 +1000 Subject: [PATCH 4/4] [OPCORE-855]: fix: support creating more than 1 manager (#14225) --- .changeset/moody-turkeys-provide.md | 5 + core/services/feeds/mocks/orm.go | 59 +++++++++++ core/services/feeds/mocks/service.go | 56 ---------- core/services/feeds/orm.go | 21 +++- core/services/feeds/orm_test.go | 29 +++++- core/services/feeds/service.go | 41 +++++--- core/services/feeds/service_test.go | 114 +++++++++++++++++---- core/web/resolver/feeds_manager.go | 29 ++++++ core/web/resolver/feeds_manager_test.go | 23 +++++ core/web/resolver/mutation.go | 2 +- core/web/schema/type/feeds_manager.graphql | 10 +- 11 files changed, 295 insertions(+), 94 deletions(-) create mode 100644 .changeset/moody-turkeys-provide.md diff --git a/.changeset/moody-turkeys-provide.md b/.changeset/moody-turkeys-provide.md new file mode 100644 index 00000000000..5a89e4596e1 --- /dev/null +++ b/.changeset/moody-turkeys-provide.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#changed Allow registration of more than 1 feeds manager on CreateFeedsManager diff --git a/core/services/feeds/mocks/orm.go b/core/services/feeds/mocks/orm.go index 54897c65d40..d6cae81dc6c 100644 --- a/core/services/feeds/mocks/orm.go +++ b/core/services/feeds/mocks/orm.go @@ -6,6 +6,8 @@ import ( context "context" feeds "github.com/smartcontractkit/chainlink/v2/core/services/feeds" + crypto "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" + mock "github.com/stretchr/testify/mock" sqlutil "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -1504,6 +1506,63 @@ func (_c *ORM_ListSpecsByJobProposalIDs_Call) RunAndReturn(run func(context.Cont return _c } +// ManagerExists provides a mock function with given fields: ctx, publicKey +func (_m *ORM) ManagerExists(ctx context.Context, publicKey crypto.PublicKey) (bool, error) { + ret := _m.Called(ctx, publicKey) + + if len(ret) == 0 { + panic("no return value specified for ManagerExists") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, crypto.PublicKey) (bool, error)); ok { + return rf(ctx, publicKey) + } + if rf, ok := ret.Get(0).(func(context.Context, crypto.PublicKey) bool); ok { + r0 = rf(ctx, publicKey) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, crypto.PublicKey) error); ok { + r1 = rf(ctx, publicKey) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_ManagerExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ManagerExists' +type ORM_ManagerExists_Call struct { + *mock.Call +} + +// ManagerExists is a helper method to define mock.On call +// - ctx context.Context +// - publicKey crypto.PublicKey +func (_e *ORM_Expecter) ManagerExists(ctx interface{}, publicKey interface{}) *ORM_ManagerExists_Call { + return &ORM_ManagerExists_Call{Call: _e.mock.On("ManagerExists", ctx, publicKey)} +} + +func (_c *ORM_ManagerExists_Call) Run(run func(ctx context.Context, publicKey crypto.PublicKey)) *ORM_ManagerExists_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(crypto.PublicKey)) + }) + return _c +} + +func (_c *ORM_ManagerExists_Call) Return(_a0 bool, _a1 error) *ORM_ManagerExists_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_ManagerExists_Call) RunAndReturn(run func(context.Context, crypto.PublicKey) (bool, error)) *ORM_ManagerExists_Call { + _c.Call.Return(run) + return _c +} + // RejectSpec provides a mock function with given fields: ctx, id func (_m *ORM) RejectSpec(ctx context.Context, id int64) error { ret := _m.Called(ctx, id) diff --git a/core/services/feeds/mocks/service.go b/core/services/feeds/mocks/service.go index 4f637959d45..d84879bb700 100644 --- a/core/services/feeds/mocks/service.go +++ b/core/services/feeds/mocks/service.go @@ -220,62 +220,6 @@ func (_c *Service_CountJobProposalsByStatus_Call) RunAndReturn(run func(context. return _c } -// CountManagers provides a mock function with given fields: ctx -func (_m *Service) CountManagers(ctx context.Context) (int64, error) { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for CountManagers") - } - - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (int64, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) int64); ok { - r0 = rf(ctx) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Service_CountManagers_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CountManagers' -type Service_CountManagers_Call struct { - *mock.Call -} - -// CountManagers is a helper method to define mock.On call -// - ctx context.Context -func (_e *Service_Expecter) CountManagers(ctx interface{}) *Service_CountManagers_Call { - return &Service_CountManagers_Call{Call: _e.mock.On("CountManagers", ctx)} -} - -func (_c *Service_CountManagers_Call) Run(run func(ctx context.Context)) *Service_CountManagers_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *Service_CountManagers_Call) Return(_a0 int64, _a1 error) *Service_CountManagers_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *Service_CountManagers_Call) RunAndReturn(run func(context.Context) (int64, error)) *Service_CountManagers_Call { - _c.Call.Return(run) - return _c -} - // CreateChainConfig provides a mock function with given fields: ctx, cfg func (_m *Service) CreateChainConfig(ctx context.Context, cfg feeds.ChainConfig) (int64, error) { ret := _m.Called(ctx, cfg) diff --git a/core/services/feeds/orm.go b/core/services/feeds/orm.go index 602154ad716..7f1e0194170 100644 --- a/core/services/feeds/orm.go +++ b/core/services/feeds/orm.go @@ -9,11 +9,13 @@ import ( "github.com/google/uuid" "github.com/lib/pq" "github.com/pkg/errors" + "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" ) type ORM interface { + ManagerExists(ctx context.Context, publicKey crypto.PublicKey) (bool, error) CountManagers(ctx context.Context) (int64, error) CreateManager(ctx context.Context, ms *FeedsManager) (int64, error) GetManager(ctx context.Context, id int64) (*FeedsManager, error) @@ -73,6 +75,7 @@ func (o *orm) Transact(ctx context.Context, fn func(ORM) error) error { func (o *orm) WithDataSource(ds sqlutil.DataSource) ORM { return &orm{ds} } // Count counts the number of feeds manager records. +// TODO: delete once multiple feeds managers support is released func (o *orm) CountManagers(ctx context.Context) (count int64, err error) { stmt := ` SELECT COUNT(*) @@ -83,6 +86,21 @@ FROM feeds_managers return count, errors.Wrap(err, "CountManagers failed") } +// ManagerExists checks if a feeds manager exists by public key. +func (o *orm) ManagerExists(ctx context.Context, publicKey crypto.PublicKey) (bool, error) { + stmt := ` +SELECT EXISTS ( + SELECT 1 + FROM feeds_managers + WHERE public_key = $1 +); + ` + + var exists bool + err := o.ds.GetContext(ctx, &exists, stmt, publicKey) + return exists, errors.Wrap(err, "ManagerExists failed") +} + // CreateManager creates a feeds manager. func (o *orm) CreateManager(ctx context.Context, ms *FeedsManager) (id int64, err error) { stmt := ` @@ -263,7 +281,8 @@ WHERE id = $1 func (o *orm) ListManagers(ctx context.Context) (mgrs []FeedsManager, err error) { stmt := ` SELECT id, name, uri, public_key, created_at, updated_at -FROM feeds_managers; +FROM feeds_managers +ORDER BY created_at; ` err = o.ds.SelectContext(ctx, &mgrs, stmt) diff --git a/core/services/feeds/orm_test.go b/core/services/feeds/orm_test.go index 298a878a8de..4ff1a85aea6 100644 --- a/core/services/feeds/orm_test.go +++ b/core/services/feeds/orm_test.go @@ -53,7 +53,7 @@ func setupORM(t *testing.T) *TestORM { // Managers -func Test_ORM_CreateManager(t *testing.T) { +func Test_ORM_CreateManager_CountManagers(t *testing.T) { t.Parallel() ctx := testutils.Context(t) @@ -80,6 +80,33 @@ func Test_ORM_CreateManager(t *testing.T) { assert.NotZero(t, id) } +func Test_ORM_CreateManager(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + + var ( + orm = setupORM(t) + mgr = &feeds.FeedsManager{ + URI: uri, + Name: name, + PublicKey: publicKey, + } + ) + + exists, err := orm.ManagerExists(ctx, publicKey) + require.NoError(t, err) + require.Equal(t, false, exists) + + id, err := orm.CreateManager(ctx, mgr) + require.NoError(t, err) + + exists, err = orm.ManagerExists(ctx, publicKey) + require.NoError(t, err) + require.Equal(t, true, exists) + + assert.NotZero(t, id) +} + func Test_ORM_GetManager(t *testing.T) { t.Parallel() ctx := testutils.Context(t) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 219e41465b7..9671900309a 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -37,11 +37,13 @@ import ( ) var ( - ErrOCR2Disabled = errors.New("ocr2 is disabled") - ErrOCRDisabled = errors.New("ocr is disabled") - ErrSingleFeedsManager = errors.New("only a single feeds manager is supported") - ErrJobAlreadyExists = errors.New("a job for this contract address already exists - please use the 'force' option to replace it") - ErrFeedsManagerDisabled = errors.New("feeds manager is disabled") + ErrOCR2Disabled = errors.New("ocr2 is disabled") + ErrOCRDisabled = errors.New("ocr is disabled") + // TODO: delete once multiple feeds managers support is released + ErrSingleFeedsManager = errors.New("only a single feeds manager is supported") + ErrDuplicateFeedsManager = errors.New("manager was previously registered using the same public key") + ErrJobAlreadyExists = errors.New("a job for this contract address already exists - please use the 'force' option to replace it") + ErrFeedsManagerDisabled = errors.New("feeds manager is disabled") promJobProposalRequest = promauto.NewCounter(prometheus.CounterOpts{ Name: "feeds_job_proposal_requests", @@ -77,7 +79,6 @@ type Service interface { Start(ctx context.Context) error Close() error - CountManagers(ctx context.Context) (int64, error) GetManager(ctx context.Context, id int64) (*FeedsManager, error) ListManagers(ctx context.Context) ([]FeedsManager, error) ListManagersByIDs(ctx context.Context, ids []int64) ([]FeedsManager, error) @@ -185,15 +186,23 @@ type RegisterManagerParams struct { // RegisterManager registers a new ManagerService and attempts to establish a // connection. -// -// Only a single feeds manager is currently supported. func (s *service) RegisterManager(ctx context.Context, params RegisterManagerParams) (int64, error) { - count, err := s.CountManagers(ctx) - if err != nil { - return 0, err - } - if count >= 1 { - return 0, ErrSingleFeedsManager + if s.gCfg.FeatureMultiFeedsManagers() { + exists, err := s.orm.ManagerExists(ctx, params.PublicKey) + if err != nil { + return 0, err + } + if exists { + return 0, ErrDuplicateFeedsManager + } + } else { + count, err := s.CountManagers(ctx) + if err != nil { + return 0, err + } + if count >= 1 { + return 0, ErrSingleFeedsManager + } } mgr := FeedsManager{ @@ -204,7 +213,7 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar var id int64 - err = s.orm.Transact(ctx, func(tx ORM) error { + err := s.orm.Transact(ctx, func(tx ORM) error { var txerr error id, txerr = tx.CreateManager(ctx, &mgr) @@ -324,6 +333,7 @@ func (s *service) ListManagersByIDs(ctx context.Context, ids []int64) ([]FeedsMa } // CountManagers gets the total number of manager services +// TODO: delete once multiple feeds managers support is released func (s *service) CountManagers(ctx context.Context) (int64, error) { return s.orm.CountManagers(ctx) } @@ -1446,7 +1456,6 @@ func (ns NullService) Close() error { return nil } func (ns NullService) ApproveSpec(ctx context.Context, id int64, force bool) error { return ErrFeedsManagerDisabled } -func (ns NullService) CountManagers(ctx context.Context) (int64, error) { return 0, nil } func (ns NullService) CountJobProposalsByStatus(ctx context.Context) (*JobProposalCounts, error) { return nil, ErrFeedsManagerDisabled } diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 0c8e6a7fa55..41c62ea5e06 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -265,6 +265,63 @@ func Test_Service_RegisterManager(t *testing.T) { assert.Equal(t, actual, id) } +func Test_Service_RegisterManager_MultiFeedsManager(t *testing.T) { + t.Parallel() + + key := cltest.DefaultCSAKey + + var ( + id = int64(1) + pubKeyHex = "0f17c3bf72de8beef6e2d17a14c0a972f5d7e0e66e70722373f12b88382d40f9" + ) + + var pubKey crypto.PublicKey + _, err := hex.Decode([]byte(pubKeyHex), pubKey) + require.NoError(t, err) + + var ( + mgr = feeds.FeedsManager{ + Name: "FMS", + URI: "localhost:8080", + PublicKey: pubKey, + } + params = feeds.RegisterManagerParams{ + Name: "FMS", + URI: "localhost:8080", + PublicKey: pubKey, + } + ) + + svc := setupTestServiceCfg(t, func(c *chainlink.Config, s *chainlink.Secrets) { + var multiFeedsManagers = true + c.Feature.MultiFeedsManagers = &multiFeedsManagers + }) + ctx := testutils.Context(t) + + svc.orm.On("ManagerExists", ctx, params.PublicKey).Return(false, nil) + svc.orm.On("CreateManager", mock.Anything, &mgr, mock.Anything). + Return(id, nil) + svc.orm.On("CreateBatchChainConfig", mock.Anything, params.ChainConfigs, mock.Anything). + Return([]int64{}, nil) + svc.csaKeystore.On("GetAll").Return([]csakey.KeyV2{key}, nil) + // ListManagers runs in a goroutine so it might be called. + svc.orm.On("ListManagers", ctx).Return([]feeds.FeedsManager{mgr}, nil).Maybe() + transactCall := svc.orm.On("Transact", mock.Anything, mock.Anything) + transactCall.Run(func(args mock.Arguments) { + fn := args[1].(func(orm feeds.ORM) error) + transactCall.ReturnArguments = mock.Arguments{fn(svc.orm)} + }) + svc.connMgr.On("Connect", mock.IsType(feeds.ConnectOpts{})) + + actual, err := svc.RegisterManager(ctx, params) + // We need to stop the service because the manager will attempt to make a + // connection + svc.Close() + require.NoError(t, err) + + assert.Equal(t, actual, id) +} + func Test_Service_RegisterManager_InvalidCreateManager(t *testing.T) { t.Parallel() @@ -311,6 +368,45 @@ func Test_Service_RegisterManager_InvalidCreateManager(t *testing.T) { assert.Equal(t, "orm error", err.Error()) } +func Test_Service_RegisterManager_DuplicateFeedsManager(t *testing.T) { + t.Parallel() + + var pubKeyHex = "0f17c3bf72de8beef6e2d17a14c0a972f5d7e0e66e70722373f12b88382d40f9" + var pubKey crypto.PublicKey + _, err := hex.Decode([]byte(pubKeyHex), pubKey) + + var ( + mgr = feeds.FeedsManager{ + Name: "FMS", + URI: "localhost:8080", + PublicKey: pubKey, + } + params = feeds.RegisterManagerParams{ + Name: "FMS", + URI: "localhost:8080", + PublicKey: pubKey, + } + ) + + svc := setupTestServiceCfg(t, func(c *chainlink.Config, s *chainlink.Secrets) { + var multiFeedsManagers = true + c.Feature.MultiFeedsManagers = &multiFeedsManagers + }) + ctx := testutils.Context(t) + + svc.orm.On("ManagerExists", ctx, params.PublicKey).Return(true, nil) + // ListManagers runs in a goroutine so it might be called. + svc.orm.On("ListManagers", ctx).Return([]feeds.FeedsManager{mgr}, nil).Maybe() + + _, err = svc.RegisterManager(ctx, params) + // We need to stop the service because the manager will attempt to make a + // connection + svc.Close() + require.Error(t, err) + + assert.Equal(t, "manager was previously registered using the same public key", err.Error()) +} + func Test_Service_ListManagers(t *testing.T) { t.Parallel() ctx := testutils.Context(t) @@ -388,24 +484,6 @@ func Test_Service_ListManagersByIDs(t *testing.T) { assert.Equal(t, mgrs, actual) } -func Test_Service_CountManagers(t *testing.T) { - t.Parallel() - ctx := testutils.Context(t) - - var ( - count = int64(1) - ) - svc := setupTestService(t) - - svc.orm.On("CountManagers", mock.Anything). - Return(count, nil) - - actual, err := svc.CountManagers(ctx) - require.NoError(t, err) - - assert.Equal(t, count, actual) -} - func Test_Service_CreateChainConfig(t *testing.T) { var ( mgr = feeds.FeedsManager{ID: 1} diff --git a/core/web/resolver/feeds_manager.go b/core/web/resolver/feeds_manager.go index 86705cf2071..0711cb1354b 100644 --- a/core/web/resolver/feeds_manager.go +++ b/core/web/resolver/feeds_manager.go @@ -145,6 +145,7 @@ func (r *CreateFeedsManagerPayloadResolver) ToCreateFeedsManagerSuccess() (*Crea return nil, false } +// TODO: delete once multiple feeds managers support is released func (r *CreateFeedsManagerPayloadResolver) ToSingleFeedsManagerError() (*SingleFeedsManagerErrorResolver, bool) { if r.err != nil && errors.Is(r.err, feeds.ErrSingleFeedsManager) { return NewSingleFeedsManagerError(r.err.Error()), true @@ -153,6 +154,14 @@ func (r *CreateFeedsManagerPayloadResolver) ToSingleFeedsManagerError() (*Single return nil, false } +func (r *CreateFeedsManagerPayloadResolver) ToDuplicateFeedsManagerError() (*DuplicateFeedsManagerErrorResolver, bool) { + if r.err != nil && errors.Is(r.err, feeds.ErrDuplicateFeedsManager) { + return NewDuplicateFeedsManagerError(r.err.Error()), true + } + + return nil, false +} + func (r *CreateFeedsManagerPayloadResolver) ToInputErrors() (*InputErrorsResolver, bool) { if r.inputErrs != nil { var errs []*InputErrorResolver @@ -182,6 +191,7 @@ func (r *CreateFeedsManagerSuccessResolver) FeedsManager() *FeedsManagerResolver } // SingleFeedsManagerErrorResolver - +// TODO: delete once multiple feeds managers support is released type SingleFeedsManagerErrorResolver struct { message string } @@ -200,6 +210,25 @@ func (r *SingleFeedsManagerErrorResolver) Code() ErrorCode { return ErrorCodeUnprocessable } +// DuplicateFeedsManagerErrorResolver - +type DuplicateFeedsManagerErrorResolver struct { + message string +} + +func NewDuplicateFeedsManagerError(message string) *DuplicateFeedsManagerErrorResolver { + return &DuplicateFeedsManagerErrorResolver{ + message: message, + } +} + +func (r *DuplicateFeedsManagerErrorResolver) Message() string { + return r.message +} + +func (r *DuplicateFeedsManagerErrorResolver) Code() ErrorCode { + return ErrorCodeUnprocessable +} + // -- UpdateFeedsManager Mutation -- // UpdateFeedsManagerPayloadResolver - diff --git a/core/web/resolver/feeds_manager_test.go b/core/web/resolver/feeds_manager_test.go index bafb50ab0d5..4237c6a7749 100644 --- a/core/web/resolver/feeds_manager_test.go +++ b/core/web/resolver/feeds_manager_test.go @@ -183,6 +183,10 @@ func Test_CreateFeedsManager(t *testing.T) { message code } + ... on DuplicateFeedsManagerError { + message + code + } ... on NotFoundError { message code @@ -264,6 +268,25 @@ func Test_CreateFeedsManager(t *testing.T) { } }`, }, + { + name: "register duplicate feeds manager error", + authenticated: true, + before: func(ctx context.Context, f *gqlTestFramework) { + f.App.On("GetFeedsService").Return(f.Mocks.feedsSvc) + f.Mocks.feedsSvc. + On("RegisterManager", mock.Anything, mock.IsType(feeds.RegisterManagerParams{})). + Return(int64(0), feeds.ErrDuplicateFeedsManager) + }, + query: mutation, + variables: variables, + result: ` + { + "createFeedsManager": { + "message": "manager was previously registered using the same public key", + "code": "UNPROCESSABLE" + } + }`, + }, { name: "not found", authenticated: true, diff --git a/core/web/resolver/mutation.go b/core/web/resolver/mutation.go index 4da5b1da651..a9c1f634dc3 100644 --- a/core/web/resolver/mutation.go +++ b/core/web/resolver/mutation.go @@ -424,7 +424,7 @@ func (r *Resolver) CreateFeedsManager(ctx context.Context, args struct { id, err := feedsService.RegisterManager(ctx, params) if err != nil { - if errors.Is(err, feeds.ErrSingleFeedsManager) { + if errors.Is(err, feeds.ErrSingleFeedsManager) || errors.Is(err, feeds.ErrDuplicateFeedsManager) { return NewCreateFeedsManagerPayload(nil, err, nil), nil } return nil, err diff --git a/core/web/schema/type/feeds_manager.graphql b/core/web/schema/type/feeds_manager.graphql index 12e8732c8e0..9da8f64e1c2 100644 --- a/core/web/schema/type/feeds_manager.graphql +++ b/core/web/schema/type/feeds_manager.graphql @@ -77,6 +77,13 @@ type CreateFeedsManagerSuccess { feedsManager: FeedsManager! } +type DuplicateFeedsManagerError implements Error { + message: String! + code: ErrorCode! +} + +# DEPRECATED: No longer used since we now support multiple feeds manager. +# Keeping this to avoid breaking change. type SingleFeedsManagerError implements Error { message: String! code: ErrorCode! @@ -84,7 +91,8 @@ type SingleFeedsManagerError implements Error { # CreateFeedsManagerPayload defines the response when creating a feeds manager union CreateFeedsManagerPayload = CreateFeedsManagerSuccess - | SingleFeedsManagerError + | DuplicateFeedsManagerError + | SingleFeedsManagerError # // TODO: delete once multiple feeds managers support is released | NotFoundError | InputErrors