Skip to content

Commit

Permalink
fix: sidecar no longer checks LastUpdated (#800)
Browse files Browse the repository at this point in the history
  • Loading branch information
aljo242 authored Oct 22, 2024
1 parent 422229b commit f4be9c5
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 29 deletions.
1 change: 1 addition & 0 deletions cmd/connect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"

//nolint: gosec
_ "net/http/pprof"
"os"
Expand Down
78 changes: 53 additions & 25 deletions oracle/market_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,61 @@ package oracle
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"time"

"go.uber.org/zap"

mmtypes "github.com/skip-mev/connect/v2/x/marketmap/types"
)

// IsMarketMapValidUpdated checks if the given MarketMapResponse is an update to the existing MarketMap.
// - returns an error if the market map is fully invalid or the response is invalid
// - returns false if the market map is not updated
// - returns true and the new market map if the new market map is updated and valid.
func (o *OracleImpl) IsMarketMapValidUpdated(resp *mmtypes.MarketMapResponse) (mmtypes.MarketMap, bool, error) {
if resp == nil {
return mmtypes.MarketMap{}, false, fmt.Errorf("nil response")
}

// TODO: restore LastUpdated check when on-chain logic is fixed

// check equality of the response and our current market map
if o.marketMap.Equal(resp.MarketMap) {
o.logger.Info("market map has not changed")
return mmtypes.MarketMap{}, false, nil
}

// if the value has changed, check for a Valid subset
validSubset, err := resp.MarketMap.GetValidSubset()
if err != nil {
o.logger.Error("failed to validate market map", zap.Error(err))
return mmtypes.MarketMap{}, false, fmt.Errorf("failed to validate market map: %w", err)
}

// Detect removed markets and surface info about the removals
var removedMarkets []string
for t := range resp.MarketMap.Markets {
if _, in := validSubset.Markets[t]; !in {
removedMarkets = append(removedMarkets, t)
}
}
if len(validSubset.Markets) == 0 || len(validSubset.Markets) != len(resp.MarketMap.Markets) {
o.logger.Warn("invalid market map update has caused some markets to be removed")
o.logger.Info("markets removed from invalid market map", zap.String("markets", strings.Join(removedMarkets, " ")))
}

// Update the oracle with the latest market map iff the market map has changed.
if o.marketMap.Equal(validSubset) {
o.logger.Debug("market map has not changed")
return mmtypes.MarketMap{}, false, nil
}

return validSubset, true, nil
}

// listenForMarketMapUpdates is a goroutine that listens for market map updates and
// updates the orchestrated providers with the new market map. This method assumes a market map provider is present,
// so callers of this method must nil check the provider first.
Expand Down Expand Up @@ -43,38 +91,18 @@ func (o *OracleImpl) listenForMarketMapUpdates(ctx context.Context) {
continue
}

if o.lastUpdated != 0 && o.lastUpdated == result.Value.LastUpdated {
o.logger.Debug("skipping market map update on no lastUpdated change", zap.Uint64("lastUpdated", o.lastUpdated))
continue
}

validSubset, err := result.Value.MarketMap.GetValidSubset()
newMarketMap, isUpdated, err := o.IsMarketMapValidUpdated(result.Value)
if err != nil {
o.logger.Error("failed to validate market map", zap.Error(err))
o.logger.Error("failed to check new market map", zap.Error(err))
continue
}

// Detect removed markets and surface info about the removals
var removedMarkets []string
for t := range result.Value.MarketMap.Markets {
if _, in := validSubset.Markets[t]; !in {
removedMarkets = append(removedMarkets, t)
}
}
if len(validSubset.Markets) == 0 || len(validSubset.Markets) != len(result.Value.MarketMap.Markets) {
o.logger.Warn("invalid market map update has caused some markets to be removed")
o.logger.Info("markets removed from invalid market map", zap.String("markets", strings.Join(removedMarkets, " ")))
}

// Update the oracle with the latest market map iff the market map has changed.
updated := validSubset
if o.marketMap.Equal(updated) {
o.logger.Debug("market map has not changed")
if !isUpdated {
continue
}

o.logger.Info("updating oracle with new market map")
if err := o.UpdateMarketMap(updated); err != nil {
if err := o.UpdateMarketMap(newMarketMap); err != nil {
o.logger.Error("failed to update oracle with new market map", zap.Error(err))
continue
}
Expand All @@ -87,7 +115,7 @@ func (o *OracleImpl) listenForMarketMapUpdates(ctx context.Context) {
}

o.logger.Info("updated oracle with new market map")
o.logger.Debug("updated oracle with new market map", zap.Any("market_map", updated))
o.logger.Debug("updated oracle with new market map", zap.Any("market_map", newMarketMap))
}
}
}
Expand Down
199 changes: 199 additions & 0 deletions oracle/market_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,106 @@ import (
"github.com/stretchr/testify/require"

"github.com/skip-mev/connect/v2/oracle"
connecttypes "github.com/skip-mev/connect/v2/pkg/types"
oraclefactory "github.com/skip-mev/connect/v2/providers/factories/oracle"
"github.com/skip-mev/connect/v2/providers/providertest"
mmclienttypes "github.com/skip-mev/connect/v2/service/clients/marketmap/types"
mmtypes "github.com/skip-mev/connect/v2/x/marketmap/types"
)

var (
btcusdt = mmtypes.Market{
Ticker: mmtypes.Ticker{
CurrencyPair: connecttypes.CurrencyPair{
Base: "BITCOIN",
Quote: "USDT",
},
Decimals: 8,
MinProviderCount: 1,
},
ProviderConfigs: []mmtypes.ProviderConfig{
{
Name: "kucoin",
OffChainTicker: "btc-usdt",
},
},
}

usdtusd = mmtypes.Market{
Ticker: mmtypes.Ticker{
CurrencyPair: connecttypes.CurrencyPair{
Base: "USDT",
Quote: "USD",
},
Decimals: 8,
MinProviderCount: 1,
},
ProviderConfigs: []mmtypes.ProviderConfig{
{
Name: "kucoin",
OffChainTicker: "usdt-usd",
},
},
}

usdcusd = mmtypes.Market{
Ticker: mmtypes.Ticker{
CurrencyPair: connecttypes.CurrencyPair{
Base: "USDC",
Quote: "USD",
},
Decimals: 8,
MinProviderCount: 1,
},
ProviderConfigs: []mmtypes.ProviderConfig{
{
Name: "kucoin",
OffChainTicker: "usdc-usd",
},
},
}

ethusdt = mmtypes.Market{
Ticker: mmtypes.Ticker{
CurrencyPair: connecttypes.CurrencyPair{
Base: "ETHEREUM",
Quote: "USDT",
},
Decimals: 8,
MinProviderCount: 1,
},
ProviderConfigs: []mmtypes.ProviderConfig{
{
Name: "kucoin",
OffChainTicker: "eth-usdt",
// include a normalize pair
NormalizeByPair: &usdcusd.Ticker.CurrencyPair,
},
},
}

marketsMap = map[string]mmtypes.Market{
btcusdt.Ticker.String(): btcusdt,
usdcusd.Ticker.String(): usdcusd,
usdtusd.Ticker.String(): usdtusd,
ethusdt.Ticker.String(): ethusdt,
}

// invalid because we are excluding the usdcusd pair which
// is used as a normalization in ethusdt.
marketsMapInvalid = map[string]mmtypes.Market{
btcusdt.Ticker.String(): btcusdt,
usdtusd.Ticker.String(): usdtusd,
ethusdt.Ticker.String(): ethusdt,
}

// remove the ethusdt which was requiring a normalization pair that wasn't in the map.
marketsMapValidSubset = map[string]mmtypes.Market{
btcusdt.Ticker.String(): btcusdt,
usdtusd.Ticker.String(): usdtusd,
}
)

func TestListenForMarketMapUpdates(t *testing.T) {
t.Run("mapper has no chain IDs to fetch should not update the oracle", func(t *testing.T) {
handler, factory := marketMapperFactory(t, nil)
Expand Down Expand Up @@ -339,3 +434,107 @@ func TestListenForMarketMapUpdates(t *testing.T) {
o.Stop()
})
}

func TestOracleImpl_IsMarketMapValidUpdated(t *testing.T) {
tests := []struct {
name string
resp *mmtypes.MarketMapResponse
initialMarketMap mmtypes.MarketMap
lastUpdated uint64
wantMM mmtypes.MarketMap
wantUpdated bool
wantErr bool
}{
{
name: "error on nil response",
wantErr: true,
},
{
name: "do nothing on empty response - no initial state",
resp: &mmtypes.MarketMapResponse{},
wantErr: false,
wantUpdated: false,
},
{
name: "response is empty - initial state - update to empty",
initialMarketMap: mmtypes.MarketMap{
Markets: marketsMap,
},
resp: &mmtypes.MarketMapResponse{},
wantErr: false,
wantUpdated: true,
wantMM: mmtypes.MarketMap{
Markets: map[string]mmtypes.Market{},
},
},
{
name: "response is equal - initial state - do nothing",
initialMarketMap: mmtypes.MarketMap{
Markets: marketsMap,
},
resp: &mmtypes.MarketMapResponse{
MarketMap: mmtypes.MarketMap{
Markets: marketsMap,
},
},
wantErr: false,
wantUpdated: false,
},
{
name: "response is invalid - initial state - update to valid",
initialMarketMap: mmtypes.MarketMap{
Markets: marketsMap,
},
resp: &mmtypes.MarketMapResponse{
MarketMap: mmtypes.MarketMap{
Markets: marketsMapInvalid,
},
},
wantErr: false,
wantUpdated: true,
wantMM: mmtypes.MarketMap{
Markets: marketsMapValidSubset,
},
},
{
name: "response is invalid - no initial state - update to valid",
resp: &mmtypes.MarketMapResponse{
MarketMap: mmtypes.MarketMap{
Markets: marketsMapInvalid,
},
},
wantErr: false,
wantUpdated: true,
wantMM: mmtypes.MarketMap{
Markets: marketsMapValidSubset,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
o, err := providertest.NewTestingOracle(ctx, []string{},
oracle.WithLastUpdated(tt.lastUpdated),
oracle.WithMarketMap(tt.initialMarketMap),
)
require.NoError(t, err)
gotMM, isUpdated, err := o.Oracle.IsMarketMapValidUpdated(tt.resp)

if tt.wantErr {
require.Error(t, err)
require.False(t, isUpdated)
return
}

require.NoError(t, err)

if tt.wantUpdated {
require.Equal(t, tt.wantMM, gotMM)
require.Equal(t, tt.wantUpdated, isUpdated)
return
}

require.Equal(t, tt.wantUpdated, isUpdated)
})
}
}
7 changes: 7 additions & 0 deletions oracle/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ func WithMarketMap(marketMap mmtypes.MarketMap) Option {
}
}

// WithLastUpdated sets the last updated for the oracle.
func WithLastUpdated(lastUpdated uint64) Option {
return func(m *OracleImpl) {
m.lastUpdated = lastUpdated
}
}

// WithPriceAPIQueryHandlerFactory sets the Price API query handler factory for the oracle.
// Specifically, this is what is utilized to construct price providers that are API based.
func WithPriceAPIQueryHandlerFactory(factory types.PriceAPIQueryHandlerFactory) Option {
Expand Down
13 changes: 9 additions & 4 deletions providers/providertest/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (o *TestingOracle) UpdateMarketMap(mm mmtypes.MarketMap) error {
return o.Oracle.UpdateMarketMap(mm)
}

func NewTestingOracle(ctx context.Context, providerNames ...string) (TestingOracle, error) {
func NewTestingOracle(ctx context.Context, providerNames []string, extraOpts ...oracle.Option) (TestingOracle, error) {
logCfg := log.NewDefaultConfig()
logCfg.StdOutLogLevel = "debug"
logCfg.FileOutLogLevel = "debug"
Expand All @@ -54,13 +54,18 @@ func NewTestingOracle(ctx context.Context, providerNames ...string) (TestingOrac
return TestingOracle{}, fmt.Errorf("failed to create oracle config: %w", err)
}

orc, err := oracle.New(
cfg,
agg,
opts := []oracle.Option{
oracle.WithLogger(logger),
oracle.WithPriceAPIQueryHandlerFactory(oraclefactory.APIQueryHandlerFactory),
oracle.WithPriceWebSocketQueryHandlerFactory(oraclefactory.WebSocketQueryHandlerFactory),
oracle.WithMarketMapperFactory(oraclefactory.MarketMapProviderFactory),
}
opts = append(opts, extraOpts...)

orc, err := oracle.New(
cfg,
agg,
opts...,
)
if err != nil {
return TestingOracle{}, fmt.Errorf("failed to create oracle: %w", err)
Expand Down

0 comments on commit f4be9c5

Please sign in to comment.