diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 47240ef16..2eb3a7ac7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -14,7 +14,7 @@ jobs: build: strategy: matrix: - image: [{file: "connect.e2e.Dockerfile", name: "connect-simapp"}, {file: "slinky.sidecar.prod.Dockerfile", name: "slinky-sidecar"}, {file: "connect.sidecar.prod.Dockerfile", name: "connect-sidecar"}, {file: "connect.local.Dockerfile", name: "connect-testapp"}, {file: "connect.sidecar.e2e.Dockerfile", name: "connect-e2e-sidecar"}] + image: [{file: "connect.e2e.Dockerfile", name: "connect-simapp"}, {file: "slinky.sidecar.prod.Dockerfile", name: "slinky-sidecar"}, {file: "connect.sidecar.prod.Dockerfile", name: "connect-sidecar"}, {file: "connect.local.Dockerfile", name: "connect-testapp"}] runs-on: ubuntu-latest-m permissions: contents: read @@ -102,13 +102,13 @@ jobs: - name: Update the image tags uses: skip-mev/gitops-actions/update-values@main with: - service: "slinky" + service: "connect" app_id: ${{ vars.DEPLOYER_APP_ID }} app_private_key: ${{ secrets.DEPLOYER_PRIVATE_KEY }} - manifests_repo: "slinky-manifests" + manifests_repo: "slinky-sensitive-manifests" values_file_name: "values-dev.yaml" modified_values: | { ".chain.image.tag": "${{ fromJSON(steps.matrix_output.outputs.result).image['connect-testapp'] }}", - ".sidecar.image.tag": "${{ fromJSON(steps.matrix_output.outputs.result).image['connect-e2e-sidecar'] }}", + ".sidecar.image.tag": "${{ fromJSON(steps.matrix_output.outputs.result).image['connect-sidecar'] }}", } diff --git a/Makefile b/Makefile index 36695cc7c..a101d9a6d 100644 --- a/Makefile +++ b/Makefile @@ -89,7 +89,7 @@ install: tidy docker-build: @echo "Building E2E Docker image..." @DOCKER_BUILDKIT=1 $(DOCKER) build -t skip-mev/connect-e2e -f contrib/images/connect.e2e.Dockerfile . - @DOCKER_BUILDKIT=1 $(DOCKER) build -t skip-mev/connect-e2e-oracle -f contrib/images/connect.sidecar.dev.Dockerfile . + @DOCKER_BUILDKIT=1 $(DOCKER) build -t skip-mev/connect-e2e-oracle -f contrib/images/connect.sidecar.prod.Dockerfile . .PHONY: docker-build diff --git a/contrib/images/connect.sidecar.e2e.Dockerfile b/contrib/images/connect.sidecar.e2e.Dockerfile deleted file mode 100644 index 9a8d1bc12..000000000 --- a/contrib/images/connect.sidecar.e2e.Dockerfile +++ /dev/null @@ -1,20 +0,0 @@ -FROM ghcr.io/skip-mev/connect-dev-base AS builder - -WORKDIR /src/connect - -COPY go.mod . - -RUN go mod download - -COPY . . - -RUN make build - -FROM ubuntu:rolling -EXPOSE 8080 8002 - -COPY --from=builder /src/connect/build/* /usr/local/bin/ -RUN apt-get update && apt-get install -y ca-certificates - -WORKDIR /usr/local/bin/ -ENTRYPOINT [ "connect" ] diff --git a/oracle/config/api.go b/oracle/config/api.go index c24ddc687..8b8e5bf83 100644 --- a/oracle/config/api.go +++ b/oracle/config/api.go @@ -40,6 +40,11 @@ type APIConfig struct { // Name is the name of the provider that corresponds to this config. Name string `json:"name"` + + // MaxBlockHeightAge is the oldest an update from an on-chain data source can be without having its + // block height incremented. In the case where a data source has exceeded this limit and the block + // height is not increasing, price reporting will be skipped until the block height increases. + MaxBlockHeightAge time.Duration `json:"maxBlockHeightAge"` } // Endpoint holds all data necessary for an API provider to connect to a given endpoint @@ -123,5 +128,9 @@ func (c *APIConfig) ValidateBasic() error { } } + if c.MaxBlockHeightAge < 0 { + return fmt.Errorf("max_block_height_age cannot be negative") + } + return nil } diff --git a/oracle/config/api_test.go b/oracle/config/api_test.go index 619ce66ce..28c6d82da 100644 --- a/oracle/config/api_test.go +++ b/oracle/config/api_test.go @@ -135,6 +135,36 @@ func TestAPIConfig(t *testing.T) { }, expectedErr: false, }, + { + name: "good config with max_block_height_age", + config: config.APIConfig{ + Enabled: true, + Timeout: time.Second, + Interval: time.Second, + ReconnectTimeout: time.Second, + MaxQueries: 1, + Name: "test", + Endpoints: []config.Endpoint{{URL: "http://test.com"}}, + BatchSize: 1, + MaxBlockHeightAge: 10 * time.Second, + }, + expectedErr: false, + }, + { + name: "bad config with negative max_block_height_age", + config: config.APIConfig{ + Enabled: true, + Timeout: time.Second, + Interval: time.Second, + ReconnectTimeout: time.Second, + MaxQueries: 1, + Name: "test", + Endpoints: []config.Endpoint{{URL: "http://test.com"}}, + BatchSize: 1, + MaxBlockHeightAge: -10 * time.Second, + }, + expectedErr: true, + }, { name: "bad config with invalid endpoint (no url)", config: config.APIConfig{ diff --git a/providers/apis/defi/ethmulticlient/multi_client.go b/providers/apis/defi/ethmulticlient/multi_client.go index 95a5e47ce..752a84ff7 100644 --- a/providers/apis/defi/ethmulticlient/multi_client.go +++ b/providers/apis/defi/ethmulticlient/multi_client.go @@ -6,6 +6,8 @@ import ( "fmt" "sync" + "github.com/skip-mev/connect/v2/providers/apis/defi/types" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" "go.uber.org/zap" @@ -23,6 +25,8 @@ type MultiRPCClient struct { // underlying clients clients []EVMClient + + blockAgeChecker types.BlockAgeChecker } // NewMultiRPCClient returns a new MultiRPCClient. @@ -32,9 +36,10 @@ func NewMultiRPCClient( clients []EVMClient, ) EVMClient { return &MultiRPCClient{ - logger: logger, - clients: clients, - api: api, + logger: logger, + clients: clients, + api: api, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), } } @@ -81,12 +86,20 @@ func NewMultiRPCClientFromEndpoints( } return &MultiRPCClient{ - logger: logger.With(zap.String("multi_client", api.Name)), - api: api, - clients: clients, + logger: logger.With(zap.String("multi_client", api.Name)), + api: api, + clients: clients, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), }, nil } +// define a result struct that go routines will populate and append to a slice when they complete their request. +type result struct { + height uint64 + results []rpc.BatchElem + err error +} + // BatchCallContext injects a call to eth_blockNumber, and makes batch calls to the underlying EVMClients. // It returns the response that has the greatest height from the eth_blockNumber call. An error is returned // only when no client was able to successfully provide a height or errored when sending the BatchCall. @@ -95,15 +108,9 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. m.logger.Debug("BatchCallContext called with 0 elems") return nil } - // define a result struct that go routines will populate and append to a slice when they complete their request. - type result struct { - height uint64 - results []rpc.BatchElem - } + results := make([]result, len(m.clients)) - // error slice to capture errors go routines encounter. - errs := make([]error, len(m.clients)) wg := new(sync.WaitGroup) // this is the index of where we will have an eth_blockNumber call. blockNumReqIndex := len(batchElems) @@ -124,7 +131,8 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. // if there was an error, or if the block_num request didn't have result / errored // we log the error and append to error slice. if err != nil || req[blockNumReqIndex].Result == "" || req[blockNumReqIndex].Error != nil { - errs[i] = fmt.Errorf("endpoint request failed: %w, %w", err, req[blockNumReqIndex].Error) + resultErr := fmt.Errorf("endpoint request failed: %w, %w", err, req[blockNumReqIndex].Error) + results[i] = result{0, nil, resultErr} m.logger.Debug( "endpoint request failed", zap.Error(err), @@ -138,7 +146,8 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. // try to get the block number. r, ok := req[blockNumReqIndex].Result.(*string) if !ok { - errs[i] = fmt.Errorf("result from eth_blockNumber was not a string") + resultErr := fmt.Errorf("result from eth_blockNumber was not a string") + results[i] = result{0, nil, resultErr} m.logger.Debug( "result from eth_blockNumber was not a string", zap.String("url", url), @@ -149,7 +158,8 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. // decode the new height height, err := hexutil.DecodeUint64(*r) if err != nil { // if we can't decode the height, log an error. - errs[i] = fmt.Errorf("could not decode hex eth height: %w", err) + resultErr := fmt.Errorf("could not decode hex eth height: %w", err) + results[i] = result{0, nil, resultErr} m.logger.Debug( "could not decode hex eth height", zap.String("url", url), @@ -163,17 +173,31 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. zap.String("url", url), ) // append the results, minus the appended eth_blockNumber request. - results[i] = result{height, req[:blockNumReqIndex]} + results[i] = result{height, req[:blockNumReqIndex], nil} }(clientIdx) } wg.Wait() + filtered, err := m.filterResponses(results) + if err != nil { + return fmt.Errorf("error filtering responses: %w", err) + } + + // copy the results from the results that had the largest height. + copy(batchElems, filtered) + return nil +} + +// filterAccountsResponses chooses the rpc response with the highest block number. +func (m *MultiRPCClient) filterResponses(responses []result) ([]rpc.BatchElem, error) { // see which of the results had the largest height, and store the index of that result. var ( maxHeight uint64 maxHeightIndex int + errs = make([]error, len(responses)) ) - for i, res := range results { + for i, res := range responses { + errs[i] = res.err if res.height > maxHeight { maxHeight = res.height maxHeightIndex = i @@ -183,12 +207,17 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc. if maxHeight == 0 { err := errors.Join(errs...) if err != nil { - return err + return nil, err } // this should never happen... but who knows. maybe something terrible happened. - return errors.New("no errors were encountered, however no go routine was able to report a height") + return nil, errors.New("no errors were encountered, however no go routine was able to report a height") + } - // copy the results from the results that had the largest height. - copy(batchElems, results[maxHeightIndex].results) - return nil + + // check the block height + if valid := m.blockAgeChecker.IsHeightValid(maxHeight); !valid { + return nil, fmt.Errorf("height %d is stale and older than %d", maxHeight, m.api.MaxBlockHeightAge) + } + + return responses[maxHeightIndex].results, nil } diff --git a/providers/apis/defi/osmosis/client.go b/providers/apis/defi/osmosis/client.go index c1b0e7b6d..a63312e02 100644 --- a/providers/apis/defi/osmosis/client.go +++ b/providers/apis/defi/osmosis/client.go @@ -12,6 +12,7 @@ import ( "github.com/skip-mev/connect/v2/oracle/config" "github.com/skip-mev/connect/v2/pkg/http" + "github.com/skip-mev/connect/v2/providers/apis/defi/types" "github.com/skip-mev/connect/v2/providers/base/api/metrics" ) @@ -124,6 +125,8 @@ type MultiClientImpl struct { apiMetrics metrics.APIMetrics clients []Client + + blockAgeChecker types.BlockAgeChecker } // NewMultiClient creates a new Client. @@ -150,10 +153,11 @@ func NewMultiClient( } return &MultiClientImpl{ - logger: logger, - api: api, - apiMetrics: apiMetrics, - clients: clients, + logger: logger, + api: api, + apiMetrics: apiMetrics, + clients: clients, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), }, nil } @@ -190,10 +194,11 @@ func NewMultiClientFromEndpoints( } return &MultiClientImpl{ - logger: logger, - api: api, - apiMetrics: apiMetrics, - clients: clients, + logger: logger, + api: api, + apiMetrics: apiMetrics, + clients: clients, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), }, nil } @@ -225,11 +230,11 @@ func (mc *MultiClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAss wg.Wait() - return filterSpotPriceResponses(resps) + return mc.filterSpotPriceResponses(resps) } // filterSpotPriceResponses chooses the response with the highest block height. -func filterSpotPriceResponses(responses []WrappedSpotPriceResponse) (WrappedSpotPriceResponse, error) { +func (mc *MultiClientImpl) filterSpotPriceResponses(responses []WrappedSpotPriceResponse) (WrappedSpotPriceResponse, error) { if len(responses) == 0 { return WrappedSpotPriceResponse{}, fmt.Errorf("no responses found") } @@ -244,5 +249,10 @@ func filterSpotPriceResponses(responses []WrappedSpotPriceResponse) (WrappedSpot } } + // check the block height + if valid := mc.blockAgeChecker.IsHeightValid(highestHeight); !valid { + return WrappedSpotPriceResponse{}, fmt.Errorf("height %d is stale and older than %d", highestHeight, mc.api.MaxBlockHeightAge) + } + return responses[highestHeightIndex], nil } diff --git a/providers/apis/defi/osmosis/types.go b/providers/apis/defi/osmosis/types.go index a0f5824dd..dbcd85b6d 100644 --- a/providers/apis/defi/osmosis/types.go +++ b/providers/apis/defi/osmosis/types.go @@ -127,6 +127,7 @@ var DefaultAPIConfig = config.APIConfig{ URL: "https://osmosis-api.polkachu.com", }, }, + MaxBlockHeightAge: 30 * time.Second, } type SpotPriceResponse struct { diff --git a/providers/apis/defi/raydium/multi_client.go b/providers/apis/defi/raydium/multi_client.go index 781eae8d9..d43e0530e 100644 --- a/providers/apis/defi/raydium/multi_client.go +++ b/providers/apis/defi/raydium/multi_client.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "github.com/skip-mev/connect/v2/oracle/config" + "github.com/skip-mev/connect/v2/providers/apis/defi/types" "github.com/skip-mev/connect/v2/providers/base/api/metrics" ) @@ -23,6 +24,8 @@ type MultiJSONRPCClient struct { // underlying clients clients []SolanaJSONRPCClient + + blockAgeChecker types.BlockAgeChecker } // NewMultiJSONRPCClient returns a new MultiJSONRPCClient. @@ -33,10 +36,11 @@ func NewMultiJSONRPCClient( clients []SolanaJSONRPCClient, ) SolanaJSONRPCClient { return &MultiJSONRPCClient{ - logger: logger, - api: api, - apiMetrics: apiMetrics, - clients: clients, + logger: logger, + api: api, + apiMetrics: apiMetrics, + clients: clients, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), } } @@ -80,10 +84,11 @@ func NewMultiJSONRPCClientFromEndpoints( } return &MultiJSONRPCClient{ - logger: logger.With(zap.String("multi_client", Name)), - api: api, - apiMetrics: apiMetrics, - clients: clients, + logger: logger.With(zap.String("multi_client", Name)), + api: api, + apiMetrics: apiMetrics, + clients: clients, + blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge), }, nil } @@ -138,11 +143,11 @@ func (c *MultiJSONRPCClient) GetMultipleAccountsWithOpts( } // filter the responses - return filterAccountsResponses(responses) + return c.filterAccountsResponses(responses) } // filterAccountsResponses chooses the rpc response with the highest slot number. -func filterAccountsResponses(responses []*rpc.GetMultipleAccountsResult) (*rpc.GetMultipleAccountsResult, error) { +func (c *MultiJSONRPCClient) filterAccountsResponses(responses []*rpc.GetMultipleAccountsResult) (*rpc.GetMultipleAccountsResult, error) { var ( maxSlot uint64 maxResp *rpc.GetMultipleAccountsResult @@ -159,5 +164,10 @@ func filterAccountsResponses(responses []*rpc.GetMultipleAccountsResult) (*rpc.G } } + // check the block height (slot) + if valid := c.blockAgeChecker.IsHeightValid(maxSlot); !valid { + return nil, fmt.Errorf("height %d is stale and older than %d", maxSlot, c.api.MaxBlockHeightAge) + } + return maxResp, nil } diff --git a/providers/apis/defi/raydium/types.go b/providers/apis/defi/raydium/types.go index 812718c12..7c7da451d 100644 --- a/providers/apis/defi/raydium/types.go +++ b/providers/apis/defi/raydium/types.go @@ -150,4 +150,5 @@ var DefaultAPIConfig = config.APIConfig{ URL: "https://api.mainnet-beta.solana.com", }, }, + MaxBlockHeightAge: 30 * time.Second, } diff --git a/providers/apis/defi/types/block_age.go b/providers/apis/defi/types/block_age.go new file mode 100644 index 000000000..f998fdc94 --- /dev/null +++ b/providers/apis/defi/types/block_age.go @@ -0,0 +1,42 @@ +package types + +import "time" + +// BlockAgeChecker is a utility type to check if incoming block heights are validly updating. +// If the block heights are not increasing and the time since the last update has exceeded +// a configurable duration, this type will report that the updates are invalid. +type BlockAgeChecker struct { + lastHeight uint64 + lastTimeStamp time.Time + maxAge time.Duration +} + +// NewBlockAgeChecker returns a zeroed BlockAgeChecker using the provided maxAge. +func NewBlockAgeChecker(maxAge time.Duration) BlockAgeChecker { + return BlockAgeChecker{ + lastHeight: 0, + lastTimeStamp: time.Now(), + maxAge: maxAge, + } +} + +// IsHeightValid returns true if: +// - the new height is greater than the last height OR +// - the time past the last block height update is less than the configured max age +// returns false if: +// - the time is past the configured max age. +func (bc *BlockAgeChecker) IsHeightValid(newHeight uint64) bool { + now := time.Now() + + if newHeight > bc.lastHeight { + bc.lastHeight = newHeight + bc.lastTimeStamp = now + return true + } + + if now.Sub(bc.lastTimeStamp) > bc.maxAge { + return false + } + + return true +} diff --git a/providers/apis/defi/types/block_age_test.go b/providers/apis/defi/types/block_age_test.go new file mode 100644 index 000000000..2dd03935a --- /dev/null +++ b/providers/apis/defi/types/block_age_test.go @@ -0,0 +1,66 @@ +package types_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/skip-mev/connect/v2/providers/apis/defi/types" +) + +func TestBlockAgeChecker_IsHeightValid(t *testing.T) { + tests := []struct { + name string + lastHeight uint64 + waitTime time.Duration + maxAge time.Duration + newHeight uint64 + isValid bool + }{ + { + name: "valid 0s no timeout", + lastHeight: 0, + waitTime: 0, + maxAge: 10 * time.Minute, + newHeight: 0, + isValid: true, + }, + { + name: "valid new height no timeout", + lastHeight: 0, + waitTime: 0, + maxAge: 10 * time.Minute, + newHeight: 0, + isValid: true, + }, + { + name: "invalid 0s due to timeout", + lastHeight: 0, + waitTime: 10 * time.Millisecond, + maxAge: 1 * time.Millisecond, + newHeight: 0, + isValid: false, + }, + { + name: "valid timeout but block height increase", + lastHeight: 0, + waitTime: 10 * time.Millisecond, + maxAge: 1 * time.Millisecond, + newHeight: 1, + isValid: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bc := types.NewBlockAgeChecker(tt.maxAge) + + got := bc.IsHeightValid(tt.lastHeight) + require.True(t, got) + time.Sleep(tt.waitTime) + + got = bc.IsHeightValid(tt.newHeight) + require.Equal(t, tt.isValid, got) + }) + } +} diff --git a/providers/apis/defi/uniswapv3/fetcher.go b/providers/apis/defi/uniswapv3/fetcher.go index 3fe270277..8124fe56c 100644 --- a/providers/apis/defi/uniswapv3/fetcher.go +++ b/providers/apis/defi/uniswapv3/fetcher.go @@ -24,7 +24,7 @@ import ( var _ types.PriceAPIFetcher = (*PriceFetcher)(nil) -// UniswapV3PriceFetcher is the Uniswap V3 price fetcher. This fetcher is responsible for +// PriceFetcher is the Uniswap V3 price fetcher. This fetcher is responsible for // querying Uniswap V3 pool contracts and returning the price of a given ticker. The price is // derived from the slot 0 data of the pool contract. // diff --git a/providers/apis/defi/uniswapv3/utils.go b/providers/apis/defi/uniswapv3/utils.go index a359839a5..720234973 100644 --- a/providers/apis/defi/uniswapv3/utils.go +++ b/providers/apis/defi/uniswapv3/utils.go @@ -92,26 +92,28 @@ var ( // DefaultETHAPIConfig is the default configuration for the Uniswap API. Specifically this is for // Ethereum mainnet. DefaultETHAPIConfig = config.APIConfig{ - Name: fmt.Sprintf("%s%s%s", BaseName, NameSeparator, constants.ETHEREUM), - Atomic: true, - Enabled: true, - Timeout: 1000 * time.Millisecond, - Interval: 2000 * time.Millisecond, - ReconnectTimeout: 2000 * time.Millisecond, - MaxQueries: 1, - Endpoints: []config.Endpoint{{URL: ETH_URL}}, + Name: fmt.Sprintf("%s%s%s", BaseName, NameSeparator, constants.ETHEREUM), + Atomic: true, + Enabled: true, + Timeout: 1000 * time.Millisecond, + Interval: 2000 * time.Millisecond, + ReconnectTimeout: 2000 * time.Millisecond, + MaxQueries: 1, + Endpoints: []config.Endpoint{{URL: ETH_URL}}, + MaxBlockHeightAge: 30 * time.Second, } // DefaultBaseAPIConfig is the default configuration for the Uniswap API. Specifically this is for // Base mainnet. DefaultBaseAPIConfig = config.APIConfig{ - Name: fmt.Sprintf("%s%s%s", BaseName, NameSeparator, constants.BASE), - Atomic: true, - Enabled: true, - Timeout: 1000 * time.Millisecond, - Interval: 2000 * time.Millisecond, - ReconnectTimeout: 2000 * time.Millisecond, - MaxQueries: 1, - Endpoints: []config.Endpoint{{URL: BASE_URL}}, + Name: fmt.Sprintf("%s%s%s", BaseName, NameSeparator, constants.BASE), + Atomic: true, + Enabled: true, + Timeout: 1000 * time.Millisecond, + Interval: 2000 * time.Millisecond, + ReconnectTimeout: 2000 * time.Millisecond, + MaxQueries: 1, + Endpoints: []config.Endpoint{{URL: BASE_URL}}, + MaxBlockHeightAge: 30 * time.Second, } )