diff --git a/cmd/connect/main.go b/cmd/connect/main.go index 8cb1cf8f2..50e4c68eb 100644 --- a/cmd/connect/main.go +++ b/cmd/connect/main.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + //nolint: gosec _ "net/http/pprof" "os" diff --git a/oracle/market_mapper.go b/oracle/market_mapper.go index 8e1083a7a..525595a02 100644 --- a/oracle/market_mapper.go +++ b/oracle/market_mapper.go @@ -3,13 +3,61 @@ package oracle import ( "context" "encoding/json" + "fmt" "os" "strings" "time" "go.uber.org/zap" + + mmtypes "github.com/skip-mev/connect/v2/x/marketmap/types" ) +// IsMarketMapValidUpdated checks if the given MarketMapResponse is an update to the existing MarketMap. +// - returns an error if the market map is fully invalid or the response is invalid +// - returns false if the market map is not updated +// - returns true and the new market map if the new market map is updated and valid. +func (o *OracleImpl) IsMarketMapValidUpdated(resp *mmtypes.MarketMapResponse) (mmtypes.MarketMap, bool, error) { + if resp == nil { + return mmtypes.MarketMap{}, false, fmt.Errorf("nil response") + } + + // TODO: restore LastUpdated check when on-chain logic is fixed + + // check equality of the response and our current market map + if o.marketMap.Equal(resp.MarketMap) { + o.logger.Info("market map has not changed") + return mmtypes.MarketMap{}, false, nil + } + + // if the value has changed, check for a Valid subset + validSubset, err := resp.MarketMap.GetValidSubset() + if err != nil { + o.logger.Error("failed to validate market map", zap.Error(err)) + return mmtypes.MarketMap{}, false, fmt.Errorf("failed to validate market map: %w", err) + } + + // Detect removed markets and surface info about the removals + var removedMarkets []string + for t := range resp.MarketMap.Markets { + if _, in := validSubset.Markets[t]; !in { + removedMarkets = append(removedMarkets, t) + } + } + if len(validSubset.Markets) == 0 || len(validSubset.Markets) != len(resp.MarketMap.Markets) { + o.logger.Warn("invalid market map update has caused some markets to be removed") + o.logger.Info("markets removed from invalid market map", zap.String("markets", strings.Join(removedMarkets, " "))) + } + + // Update the oracle with the latest market map iff the market map has changed. + if o.marketMap.Equal(validSubset) { + o.logger.Debug("market map has not changed") + return mmtypes.MarketMap{}, false, nil + } + + return validSubset, true, nil +} + // listenForMarketMapUpdates is a goroutine that listens for market map updates and // updates the orchestrated providers with the new market map. This method assumes a market map provider is present, // so callers of this method must nil check the provider first. @@ -43,38 +91,18 @@ func (o *OracleImpl) listenForMarketMapUpdates(ctx context.Context) { continue } - if o.lastUpdated != 0 && o.lastUpdated == result.Value.LastUpdated { - o.logger.Debug("skipping market map update on no lastUpdated change", zap.Uint64("lastUpdated", o.lastUpdated)) - continue - } - - validSubset, err := result.Value.MarketMap.GetValidSubset() + newMarketMap, isUpdated, err := o.IsMarketMapValidUpdated(result.Value) if err != nil { - o.logger.Error("failed to validate market map", zap.Error(err)) + o.logger.Error("failed to check new market map", zap.Error(err)) continue } - // Detect removed markets and surface info about the removals - var removedMarkets []string - for t := range result.Value.MarketMap.Markets { - if _, in := validSubset.Markets[t]; !in { - removedMarkets = append(removedMarkets, t) - } - } - if len(validSubset.Markets) == 0 || len(validSubset.Markets) != len(result.Value.MarketMap.Markets) { - o.logger.Warn("invalid market map update has caused some markets to be removed") - o.logger.Info("markets removed from invalid market map", zap.String("markets", strings.Join(removedMarkets, " "))) - } - - // Update the oracle with the latest market map iff the market map has changed. - updated := validSubset - if o.marketMap.Equal(updated) { - o.logger.Debug("market map has not changed") + if !isUpdated { continue } o.logger.Info("updating oracle with new market map") - if err := o.UpdateMarketMap(updated); err != nil { + if err := o.UpdateMarketMap(newMarketMap); err != nil { o.logger.Error("failed to update oracle with new market map", zap.Error(err)) continue } @@ -87,7 +115,7 @@ func (o *OracleImpl) listenForMarketMapUpdates(ctx context.Context) { } o.logger.Info("updated oracle with new market map") - o.logger.Debug("updated oracle with new market map", zap.Any("market_map", updated)) + o.logger.Debug("updated oracle with new market map", zap.Any("market_map", newMarketMap)) } } } diff --git a/oracle/market_mapper_test.go b/oracle/market_mapper_test.go index 507e138f7..b74af755c 100644 --- a/oracle/market_mapper_test.go +++ b/oracle/market_mapper_test.go @@ -12,11 +12,106 @@ import ( "github.com/stretchr/testify/require" "github.com/skip-mev/connect/v2/oracle" + connecttypes "github.com/skip-mev/connect/v2/pkg/types" oraclefactory "github.com/skip-mev/connect/v2/providers/factories/oracle" + "github.com/skip-mev/connect/v2/providers/providertest" mmclienttypes "github.com/skip-mev/connect/v2/service/clients/marketmap/types" mmtypes "github.com/skip-mev/connect/v2/x/marketmap/types" ) +var ( + btcusdt = mmtypes.Market{ + Ticker: mmtypes.Ticker{ + CurrencyPair: connecttypes.CurrencyPair{ + Base: "BITCOIN", + Quote: "USDT", + }, + Decimals: 8, + MinProviderCount: 1, + }, + ProviderConfigs: []mmtypes.ProviderConfig{ + { + Name: "kucoin", + OffChainTicker: "btc-usdt", + }, + }, + } + + usdtusd = mmtypes.Market{ + Ticker: mmtypes.Ticker{ + CurrencyPair: connecttypes.CurrencyPair{ + Base: "USDT", + Quote: "USD", + }, + Decimals: 8, + MinProviderCount: 1, + }, + ProviderConfigs: []mmtypes.ProviderConfig{ + { + Name: "kucoin", + OffChainTicker: "usdt-usd", + }, + }, + } + + usdcusd = mmtypes.Market{ + Ticker: mmtypes.Ticker{ + CurrencyPair: connecttypes.CurrencyPair{ + Base: "USDC", + Quote: "USD", + }, + Decimals: 8, + MinProviderCount: 1, + }, + ProviderConfigs: []mmtypes.ProviderConfig{ + { + Name: "kucoin", + OffChainTicker: "usdc-usd", + }, + }, + } + + ethusdt = mmtypes.Market{ + Ticker: mmtypes.Ticker{ + CurrencyPair: connecttypes.CurrencyPair{ + Base: "ETHEREUM", + Quote: "USDT", + }, + Decimals: 8, + MinProviderCount: 1, + }, + ProviderConfigs: []mmtypes.ProviderConfig{ + { + Name: "kucoin", + OffChainTicker: "eth-usdt", + // include a normalize pair + NormalizeByPair: &usdcusd.Ticker.CurrencyPair, + }, + }, + } + + marketsMap = map[string]mmtypes.Market{ + btcusdt.Ticker.String(): btcusdt, + usdcusd.Ticker.String(): usdcusd, + usdtusd.Ticker.String(): usdtusd, + ethusdt.Ticker.String(): ethusdt, + } + + // invalid because we are excluding the usdcusd pair which + // is used as a normalization in ethusdt. + marketsMapInvalid = map[string]mmtypes.Market{ + btcusdt.Ticker.String(): btcusdt, + usdtusd.Ticker.String(): usdtusd, + ethusdt.Ticker.String(): ethusdt, + } + + // remove the ethusdt which was requiring a normalization pair that wasn't in the map. + marketsMapValidSubset = map[string]mmtypes.Market{ + btcusdt.Ticker.String(): btcusdt, + usdtusd.Ticker.String(): usdtusd, + } +) + func TestListenForMarketMapUpdates(t *testing.T) { t.Run("mapper has no chain IDs to fetch should not update the oracle", func(t *testing.T) { handler, factory := marketMapperFactory(t, nil) @@ -339,3 +434,107 @@ func TestListenForMarketMapUpdates(t *testing.T) { o.Stop() }) } + +func TestOracleImpl_IsMarketMapValidUpdated(t *testing.T) { + tests := []struct { + name string + resp *mmtypes.MarketMapResponse + initialMarketMap mmtypes.MarketMap + lastUpdated uint64 + wantMM mmtypes.MarketMap + wantUpdated bool + wantErr bool + }{ + { + name: "error on nil response", + wantErr: true, + }, + { + name: "do nothing on empty response - no initial state", + resp: &mmtypes.MarketMapResponse{}, + wantErr: false, + wantUpdated: false, + }, + { + name: "response is empty - initial state - update to empty", + initialMarketMap: mmtypes.MarketMap{ + Markets: marketsMap, + }, + resp: &mmtypes.MarketMapResponse{}, + wantErr: false, + wantUpdated: true, + wantMM: mmtypes.MarketMap{ + Markets: map[string]mmtypes.Market{}, + }, + }, + { + name: "response is equal - initial state - do nothing", + initialMarketMap: mmtypes.MarketMap{ + Markets: marketsMap, + }, + resp: &mmtypes.MarketMapResponse{ + MarketMap: mmtypes.MarketMap{ + Markets: marketsMap, + }, + }, + wantErr: false, + wantUpdated: false, + }, + { + name: "response is invalid - initial state - update to valid", + initialMarketMap: mmtypes.MarketMap{ + Markets: marketsMap, + }, + resp: &mmtypes.MarketMapResponse{ + MarketMap: mmtypes.MarketMap{ + Markets: marketsMapInvalid, + }, + }, + wantErr: false, + wantUpdated: true, + wantMM: mmtypes.MarketMap{ + Markets: marketsMapValidSubset, + }, + }, + { + name: "response is invalid - no initial state - update to valid", + resp: &mmtypes.MarketMapResponse{ + MarketMap: mmtypes.MarketMap{ + Markets: marketsMapInvalid, + }, + }, + wantErr: false, + wantUpdated: true, + wantMM: mmtypes.MarketMap{ + Markets: marketsMapValidSubset, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + o, err := providertest.NewTestingOracle(ctx, []string{}, + oracle.WithLastUpdated(tt.lastUpdated), + oracle.WithMarketMap(tt.initialMarketMap), + ) + require.NoError(t, err) + gotMM, isUpdated, err := o.Oracle.IsMarketMapValidUpdated(tt.resp) + + if tt.wantErr { + require.Error(t, err) + require.False(t, isUpdated) + return + } + + require.NoError(t, err) + + if tt.wantUpdated { + require.Equal(t, tt.wantMM, gotMM) + require.Equal(t, tt.wantUpdated, isUpdated) + return + } + + require.Equal(t, tt.wantUpdated, isUpdated) + }) + } +} diff --git a/oracle/options.go b/oracle/options.go index 2d9cf81a7..32b9b1bfc 100644 --- a/oracle/options.go +++ b/oracle/options.go @@ -34,6 +34,13 @@ func WithMarketMap(marketMap mmtypes.MarketMap) Option { } } +// WithLastUpdated sets the last updated for the oracle. +func WithLastUpdated(lastUpdated uint64) Option { + return func(m *OracleImpl) { + m.lastUpdated = lastUpdated + } +} + // WithPriceAPIQueryHandlerFactory sets the Price API query handler factory for the oracle. // Specifically, this is what is utilized to construct price providers that are API based. func WithPriceAPIQueryHandlerFactory(factory types.PriceAPIQueryHandlerFactory) Option { diff --git a/providers/providertest/provider.go b/providers/providertest/provider.go index 9be1baa49..01dc35166 100644 --- a/providers/providertest/provider.go +++ b/providers/providertest/provider.go @@ -37,7 +37,7 @@ func (o *TestingOracle) UpdateMarketMap(mm mmtypes.MarketMap) error { return o.Oracle.UpdateMarketMap(mm) } -func NewTestingOracle(ctx context.Context, providerNames ...string) (TestingOracle, error) { +func NewTestingOracle(ctx context.Context, providerNames []string, extraOpts ...oracle.Option) (TestingOracle, error) { logCfg := log.NewDefaultConfig() logCfg.StdOutLogLevel = "debug" logCfg.FileOutLogLevel = "debug" @@ -54,13 +54,18 @@ func NewTestingOracle(ctx context.Context, providerNames ...string) (TestingOrac return TestingOracle{}, fmt.Errorf("failed to create oracle config: %w", err) } - orc, err := oracle.New( - cfg, - agg, + opts := []oracle.Option{ oracle.WithLogger(logger), oracle.WithPriceAPIQueryHandlerFactory(oraclefactory.APIQueryHandlerFactory), oracle.WithPriceWebSocketQueryHandlerFactory(oraclefactory.WebSocketQueryHandlerFactory), oracle.WithMarketMapperFactory(oraclefactory.MarketMapProviderFactory), + } + opts = append(opts, extraOpts...) + + orc, err := oracle.New( + cfg, + agg, + opts..., ) if err != nil { return TestingOracle{}, fmt.Errorf("failed to create oracle: %w", err)