diff --git a/.gitignore b/.gitignore index caa271342..a6049937a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ - # Redis redis-cache @@ -26,3 +25,7 @@ sqs.log # Vim swap files .*.swp .*.swo + +# Delve debug +__debug_bin* +debug.test* diff --git a/Makefile b/Makefile index 1c5f28064..20341d2c5 100644 --- a/Makefile +++ b/Makefile @@ -139,6 +139,9 @@ build-reproducible-arm64: go.sum load-test-ui: $(DOCKER) compose -f locust/docker-compose.yml up --scale worker=4 +debug: + dlv --build-flags="-ldflags='-X github.com/osmosis-labs/sqs/version=${VERSION}'" debug app/*.go -- --config ./config.json + profile: go tool pprof -http=:8080 http://localhost:9092/debug/pprof/profile?seconds=60 diff --git a/docs/architecture/passthrough.md b/docs/architecture/passthrough.md index e0619a8bb..1b0574c3c 100644 --- a/docs/architecture/passthrough.md +++ b/docs/architecture/passthrough.md @@ -43,4 +43,4 @@ Unstaking value from delegator undelegating chain query. #### Unclaimed Rewards -Unclaimed rewards from concentrated liquidity positions. +Unclaimed rewards from concentrated liquidity positions and unclaimed rewards from staking rewards. diff --git a/domain/mocks/passthrough_grpc_client_mock.go b/domain/mocks/passthrough_grpc_client_mock.go index a2fe0f0dd..535e64af6 100644 --- a/domain/mocks/passthrough_grpc_client_mock.go +++ b/domain/mocks/passthrough_grpc_client_mock.go @@ -15,6 +15,7 @@ type PassthroughGRPCClientMock struct { MockDelegatorDelegationsCb func(ctx context.Context, address string) (sdk.Coins, error) MockDelegatorUnbondingDelegationsCb func(ctx context.Context, address string) (sdk.Coins, error) MockUserPositionsBalancesCb func(ctx context.Context, address string) (sdk.Coins, sdk.Coins, error) + MockDelegationRewardsCb func(ctx context.Context, address string) (sdk.Coins, error) } // AccountLockedCoins implements passthroughdomain.PassthroughGRPCClient. @@ -71,4 +72,13 @@ func (p *PassthroughGRPCClientMock) AccountUnlockingCoins(ctx context.Context, a return nil, errors.New("MockAccountLockedCoinsCb is not implemented") } +// DelegationRewards implements passthroughdomain.PassthroughGRPCClient. +func (p *PassthroughGRPCClientMock) DelegationRewards(ctx context.Context, address string) (sdk.Coins, error) { + if p.MockDelegationRewardsCb != nil { + return p.MockDelegationRewardsCb(ctx, address) + } + + return nil, errors.New("MockDelegationRewardsCb is not implemented") +} + var _ passthroughdomain.PassthroughGRPCClient = &PassthroughGRPCClientMock{} diff --git a/domain/passthrough/passthrough_grpc_client.go b/domain/passthrough/passthrough_grpc_client.go index 391caa659..831eaf5b6 100644 --- a/domain/passthrough/passthrough_grpc_client.go +++ b/domain/passthrough/passthrough_grpc_client.go @@ -6,6 +6,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" query "github.com/cosmos/cosmos-sdk/types/query" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" + distribution "github.com/cosmos/cosmos-sdk/x/distribution/types" staking "github.com/cosmos/cosmos-sdk/x/staking/types" concentratedLiquidity "github.com/osmosis-labs/osmosis/v25/x/concentrated-liquidity/client/queryproto" lockup "github.com/osmosis-labs/osmosis/v25/x/lockup/types" @@ -34,6 +35,9 @@ type PassthroughGRPCClient interface { // UserPositionsBalances returns the user concentrated positions balances of the user with the given address. // The first return is the pooled balance. The second return is the reward balance. UserPositionsBalances(ctx context.Context, address string) (sdk.Coins, sdk.Coins, error) + + // DelegationTotalRewards returns the total unclaimed staking rewards accrued of the user with the given address. + DelegationRewards(ctx context.Context, address string) (sdk.Coins, error) } type PassthroughFetchFn func(context.Context, string) (sdk.Coins, error) @@ -48,6 +52,7 @@ type passthroughGRPCClient struct { stakingQueryClient staking.QueryClient lockupQueryClient lockup.QueryClient concentratedLiquidityQueryClient concentratedLiquidity.QueryClient + distributionClient distribution.QueryClient } const ( @@ -72,6 +77,7 @@ func NewPassthroughGRPCClient(grpcURI string) (PassthroughGRPCClient, error) { stakingQueryClient: staking.NewQueryClient(grpcClient), lockupQueryClient: lockup.NewQueryClient(grpcClient), concentratedLiquidityQueryClient: concentratedLiquidity.NewQueryClient(grpcClient), + distributionClient: distribution.NewQueryClient(grpcClient), }, nil } @@ -168,6 +174,23 @@ func (p *passthroughGRPCClient) UserPositionsBalances(ctx context.Context, addre return pooledCoins, rewardCoins, nil } +func (p *passthroughGRPCClient) DelegationRewards(ctx context.Context, address string) (sdk.Coins, error) { + response, err := p.distributionClient.DelegationTotalRewards( + ctx, + &distribution.QueryDelegationTotalRewardsRequest{DelegatorAddress: address}, + ) + if err != nil { + return nil, err + } + + var rewardCoins = sdk.Coins{} + for _, v := range response.GetTotal() { + rewardCoins = append(rewardCoins, sdk.Coin{Denom: v.Denom, Amount: v.Amount.TruncateInt()}) + } + + return rewardCoins, nil +} + func paginateRequest(ctx context.Context, fetchCoinsFn func(ctx context.Context, pageRequest *query.PageRequest) (*query.PageResponse, sdk.Coins, error)) (sdk.Coins, error) { var ( isFirstRequest = true diff --git a/passthrough/usecase/passthrough_usecase.go b/passthrough/usecase/passthrough_usecase.go index 780e43a27..8f1a5cb5f 100644 --- a/passthrough/usecase/passthrough_usecase.go +++ b/passthrough/usecase/passthrough_usecase.go @@ -100,6 +100,11 @@ const ( // 2. Concentrated positions pooledBalancedNumJobs = 2 + // Number of unclaimed rewards jobs to fetch concurrently. + // 1. Unclaimed rewards from concentrated positions + // 2. Unclaimed rewards from staking rewards + unclaimedRewardsNumJobs = 2 + // locked + unlocking numInLocksQueries = 2 ) @@ -131,7 +136,7 @@ func (p *passthroughUseCase) GetPortfolioAssets(ctx context.Context, address str defer close(pooledBalancesChan) // Channel to fetch unclaimed rewards concurrently. - unclaimedRewardsChan := make(chan coinsResult) + unclaimedRewardsChan := make(chan coinsResult, unclaimedRewardsNumJobs) defer close(unclaimedRewardsChan) go func() { @@ -168,6 +173,17 @@ func (p *passthroughUseCase) GetPortfolioAssets(ctx context.Context, address str } }() + go func() { + // Fetch unclaimed staking rewards concurrently + unclaimedStakingRewards, err := p.passthroughGRPCClient.DelegationRewards(ctx, address) + + // Send unclaimed rewards to the unclaimed rewards channel + unclaimedRewardsChan <- coinsResult{ + coins: unclaimedStakingRewards, + err: err, + } + }() + // Aggregate poold coins callback getPooledCoins := func(ctx context.Context, address string) (sdk.Coins, error) { pooledCoins := sdk.Coins{} @@ -202,8 +218,29 @@ func (p *passthroughUseCase) GetPortfolioAssets(ctx context.Context, address str // Callback to fetch unclaimed rewards concurrently. getUnclaimedRewards := func(ctx context.Context, address string) (sdk.Coins, error) { - unclaimedRewardsResult := <-unclaimedRewardsChan - return unclaimedRewardsResult.coins, unclaimedRewardsResult.err + unclaimedCoins := sdk.Coins{} + + var finalErr error + for i := 0; i < unclaimedRewardsNumJobs; i++ { + unclaimedRewardsResult := <-unclaimedRewardsChan + + if unclaimedRewardsResult.err != nil { + // Rather than returning the error, log it and continue + finalErr = unclaimedRewardsResult.err + + // Ensure that coins are valid to be added and avoid panic. + if len(unclaimedRewardsResult.coins) > 0 && !unclaimedRewardsResult.coins.IsAnyNil() { + unclaimedCoins = unclaimedCoins.Add(unclaimedRewardsResult.coins...) + } + + continue + } + + unclaimedCoins = unclaimedCoins.Add(unclaimedRewardsResult.coins...) + } + + // Return error and best-effort result + return unclaimedCoins, finalErr } // Fetch jobs to fetch the portfolio assets concurrently in separate gorooutines. diff --git a/passthrough/usecase/passthrough_usecase_test.go b/passthrough/usecase/passthrough_usecase_test.go index 06c9185f4..78fb3c148 100644 --- a/passthrough/usecase/passthrough_usecase_test.go +++ b/passthrough/usecase/passthrough_usecase_test.go @@ -166,6 +166,10 @@ func (s *PassthroughUseCaseTestSuite) TestGetPotrfolioAssets_HappyPath() { // Return error to test the silent error handling. return sdk.NewCoins(wbtcCoin), sdk.NewCoins(invalidCoin), miscError }, + MockDelegationRewardsCb: func(ctx context.Context, address string) (sdk.Coins, error) { + // Return error to test the silent error handling. + return sdk.NewCoins(osmoCoin), miscError + }, } // Initialize pools use case mock @@ -213,11 +217,11 @@ func (s *PassthroughUseCaseTestSuite) TestGetPotrfolioAssets_HappyPath() { IsBestEffort: true, }, usecase.UnclaimedRewardsAssetsCategoryName: { - Capitalization: zero, + Capitalization: osmoCapitalization, IsBestEffort: true, }, usecase.TotalAssetsCategoryName: { - Capitalization: osmoCapitalization.Add(osmoCapitalization).Add(osmoCapitalization).Add(atomCapitalization).Add(wbtcCapitalization), + Capitalization: osmoCapitalization.Add(osmoCapitalization).Add(osmoCapitalization).Add(atomCapitalization).Add(wbtcCapitalization).Add(osmoCapitalization), AccountCoinsResult: []passthroughdomain.AccountCoinsResult{ { Coin: atomCoin, @@ -232,8 +236,8 @@ func (s *PassthroughUseCaseTestSuite) TestGetPotrfolioAssets_HappyPath() { CapitalizationValue: zero, }, { - Coin: osmoCoin.Add(osmoCoin).Add(osmoCoin), - CapitalizationValue: osmoCapitalization.Add(osmoCapitalization).Add(osmoCapitalization), + Coin: osmoCoin.Add(osmoCoin).Add(osmoCoin).Add(osmoCoin), + CapitalizationValue: osmoCapitalization.Add(osmoCapitalization).Add(osmoCapitalization).Add(osmoCapitalization), }, }, IsBestEffort: true,