[ORA-1625] Check Actor Balance Before Registration (#127)
Check that the user has a sufficient balance to register before wasting their transaction fees on a failed registration.
relyt29 authored Jun 15, 2024
1 parent 261effa commit 1da9db5
Showing 5 changed files with 78 additions and 42 deletions.
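
At its core, the commit fetches the emissions module's RegistrationFee and the account's bank balance in the default bond denom, and only proceeds with MsgRegister when the fee fits inside the balance. Below is a minimal Go sketch of that check, assuming the two query clients are already constructed; the function name and standalone wiring are illustrative only, and the actual helper added by this commit is hasBalanceForRegistration in cmd/node/appchain.go.

package main

import (
	"context"
	"fmt"

	chainParams "github.com/allora-network/allora-chain/app/params"
	emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
	banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
)

// canAffordRegistration (hypothetical name) reports whether addr holds at
// least the emissions module's RegistrationFee in the default bond denom.
func canAffordRegistration(
	ctx context.Context,
	emissionsClient emissionstypes.QueryClient,
	bankClient banktypes.QueryClient,
	addr string,
) (bool, error) {
	// The registration fee lives in the emissions module params.
	params, err := emissionsClient.Params(ctx, &emissionstypes.QueryParamsRequest{})
	if err != nil {
		return false, fmt.Errorf("query emissions params: %w", err)
	}

	// The balance is read from the bank module in the chain's bond denom.
	bal, err := bankClient.Balance(ctx, &banktypes.QueryBalanceRequest{
		Address: addr,
		Denom:   chainParams.DefaultBondDenom,
	})
	if err != nil {
		return false, fmt.Errorf("query bank balance: %w", err)
	}

	// Register only when the fee is covered by the available balance.
	return params.Params.RegistrationFee.LTE(bal.Balance.Amount), nil
}

The registration loop in appchain.go calls this check per topic and skips the topic with a logged error when the balance falls short, instead of broadcasting a transaction that would fail on-chain.
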
93 changes: 65 additions & 28 deletions cmd/node/appchain.go
@@ -13,10 +13,12 @@ import (
"time"

cosmossdk_io_math "cosmossdk.io/math"
"github.com/allora-network/allora-chain/x/emissions/types"
chainParams "github.com/allora-network/allora-chain/app/params"
emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
"github.com/allora-network/b7s/models/blockless"
"github.com/allora-network/b7s/node/aggregate"
sdktypes "github.com/cosmos/cosmos-sdk/types"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
"github.com/ignite/cli/v28/ignite/pkg/cosmosaccount"
"github.com/ignite/cli/v28/ignite/pkg/cosmosclient"
"github.com/rs/zerolog"
@@ -118,20 +120,20 @@ func NewAppChain(config AppChainConfig, log zerolog.Logger) (*AppChain, error) {
}

// Create query client
queryClient := types.NewQueryClient(client.Context())
queryClient := emissionstypes.NewQueryClient(client.Context())

// this is terrible, no isConnected as part of this code path
if client.Context().ChainID == "" {
return nil, nil
}

appchain := &AppChain{
Address: address,
Account: account,
Logger: log,
Client: client,
QueryClient: queryClient,
Config: config,
Address: address,
Account: account,
Logger: log,
Client: client,
EmissionsQueryClient: queryClient,
Config: config,
}

if config.NodeRole == blockless.WorkerNode {
@@ -160,7 +162,7 @@ func parseTopicIds(appchain *AppChain, topicIds []string) []uint64 {
func isReputerRegistered(appchain *AppChain, topicId uint64) (bool, error) {
ctx := context.Background()

res, err := appchain.QueryClient.IsReputerRegisteredInTopicId(ctx, &types.QueryIsReputerRegisteredInTopicIdRequest{
res, err := appchain.EmissionsQueryClient.IsReputerRegisteredInTopicId(ctx, &emissionstypes.QueryIsReputerRegisteredInTopicIdRequest{
TopicId: topicId,
Address: appchain.Address,
})
@@ -175,7 +177,7 @@ func isReputerRegistered(appchain *AppChain, topicId uint64) (bool, error) {
func isWorkerRegistered(appchain *AppChain, topicId uint64) (bool, error) {
ctx := context.Background()

res, err := appchain.QueryClient.IsWorkerRegisteredInTopicId(ctx, &types.QueryIsWorkerRegisteredInTopicIdRequest{
res, err := appchain.EmissionsQueryClient.IsWorkerRegisteredInTopicId(ctx, &emissionstypes.QueryIsWorkerRegisteredInTopicIdRequest{
TopicId: topicId,
Address: appchain.Address,
})
@@ -187,6 +189,21 @@ func isWorkerRegistered(appchain *AppChain, topicId uint64) (bool, error) {
return res.IsRegistered, nil
}

func hasBalanceForRegistration(
ctx context.Context,
appchain *AppChain,
registrationFee cosmossdk_io_math.Int,
) (bool, error) {
resp, err := appchain.BankQueryClient.Balance(ctx, &banktypes.QueryBalanceRequest{
Address: appchain.Address,
Denom: chainParams.DefaultBondDenom,
})
if err != nil {
return false, err
}
return registrationFee.LTE(resp.Balance.Amount), nil
}

/// Registration
func registerWithBlockchain(appchain *AppChain) {
ctx := context.Background()
@@ -206,6 +223,11 @@ func registerWithBlockchain(appchain *AppChain) {
// Print the array entries as a comma-separated value list
topicsList := strings.Join(strings.Fields(fmt.Sprint(b7sTopicIds)), ", ")
appchain.Logger.Info().Str("topicsList", topicsList).Msg("Topics list")
moduleParams, err := appchain.EmissionsQueryClient.Params(ctx, &emissionstypes.QueryParamsRequest{})
if err != nil {
appchain.Logger.Error().Err(err).Msg("could not get chain params")
return
}

// Iterate each topic
for _, topicId := range b7sTopicIds {
@@ -221,8 +243,23 @@ func registerWithBlockchain(appchain *AppChain) {
continue
}
if !is_registered {
hasBalance, err := hasBalanceForRegistration(ctx, appchain, moduleParams.Params.RegistrationFee)
if err != nil {
appchain.Logger.Error().Err(err).
Uint64("topic", topicId).
Str("addr", appchain.Address).
Msg("could not check if the node has enough balance to register, skipping.")
continue
}
if !hasBalance {
appchain.Logger.Error().
Uint64("topic", topicId).
Str("addr", appchain.Address).
Msg("node does not have enough balance to register, skipping.")
continue
}
// register the worker in the topic
msg := &types.MsgRegister{
msg := &emissionstypes.MsgRegister{
Sender: appchain.Address,
LibP2PKey: appchain.Config.LibP2PKey,
MultiAddress: appchain.Config.MultiAddress,
@@ -239,7 +276,7 @@ func registerWithBlockchain(appchain *AppChain) {
if isReputer {
var initstake = appchain.Config.InitialStake
if initstake > 0 {
msg := &types.MsgAddStake{
msg := &emissionstypes.MsgAddStake{
Sender: appchain.Address,
Amount: cosmossdk_io_math.NewInt(initstake),
TopicId: topicId,
@@ -287,14 +324,14 @@ func (ap *AppChain) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, Max
// Sending Inferences/Forecasts to the AppChain
func (ap *AppChain) SendWorkerModeData(ctx context.Context, topicId uint64, results aggregate.Results) {
// Aggregate the inferences from all peers/workers
WorkerDataBundles := make([]*types.WorkerDataBundle, 0)
var nonce *types.Nonce
WorkerDataBundles := make([]*emissionstypes.WorkerDataBundle, 0)
var nonce *emissionstypes.Nonce
for _, result := range results {
for _, peer := range result.Peers {
ap.Logger.Debug().Str("worker peer", peer.String())

// Get Peer's $allo address
res, err := ap.QueryClient.GetWorkerAddressByP2PKey(ctx, &types.QueryWorkerAddressByP2PKeyRequest{
res, err := ap.EmissionsQueryClient.GetWorkerAddressByP2PKey(ctx, &emissionstypes.QueryWorkerAddressByP2PKeyRequest{
Libp2PKey: peer.String(),
})
if err != nil {
@@ -311,7 +348,7 @@ func (ap *AppChain) SendWorkerModeData(ctx context.Context, topicId uint64, resu
continue
}
if nonce == nil {
nonce = &types.Nonce{BlockHeight: value.BlockHeight}
nonce = &emissionstypes.Nonce{BlockHeight: value.BlockHeight}
}
// Here reputer leader can choose to validate data further to ensure set is correct and act accordingly
if value.WorkerDataBundle == nil {
@@ -339,7 +376,7 @@ func (ap *AppChain) SendWorkerModeData(ctx context.Context, topicId uint64, resu
}

// Make 1 request per worker
req := &types.MsgInsertBulkWorkerPayload{
req := &emissionstypes.MsgInsertBulkWorkerPayload{
Sender: ap.Address,
Nonce: nonce,
TopicId: topicId,
@@ -367,7 +404,7 @@ const MAX_NUMBER_STAKE_QUERIES_PER_REQUEST = uint64(3)
// Get the stake of each reputer in the given topic
func (ap *AppChain) getStakePerReputer(ctx context.Context, topicId uint64, reputerAddrs []*string) (map[string]cosmossdk_io_math.Int, error) {
maxReputers := DEFAULT_MAX_REPUTERS_FOR_STAKE_QUERY
params, err := ap.QueryClient.Params(ctx, &types.QueryParamsRequest{})
params, err := ap.EmissionsQueryClient.Params(ctx, &emissionstypes.QueryParamsRequest{})
if err != nil {
ap.Logger.Error().Err(err).Uint64("topic", topicId).Msg("could not get chain params")
}
@@ -394,7 +431,7 @@ func (ap *AppChain) getStakePerReputer(ctx context.Context, topicId uint64, repu
}
addresses = append(addresses, *addr)
}
res, err := ap.QueryClient.GetMultiReputerStakeInTopic(ctx, &types.QueryMultiReputerStakeInTopicRequest{
res, err := ap.EmissionsQueryClient.GetMultiReputerStakeInTopic(ctx, &emissionstypes.QueryMultiReputerStakeInTopicRequest{
TopicId: topicId,
Addresses: addresses,
})
@@ -487,11 +524,11 @@ func (ap *AppChain) getStakeWeightedBlockHeights(
// Sending Losses to the AppChain
func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, results aggregate.Results) {
// Aggregate the forecast from reputer leader
var valueBundles []*types.ReputerValueBundle
var valueBundles []*emissionstypes.ReputerValueBundle
var reputerAddrs []*string
var reputerAddrSet = make(map[string]bool) // Prevents duplicate reputer addresses from being counted in vote tally
var nonceCurrent *types.Nonce
var nonceEval *types.Nonce
var nonceCurrent *emissionstypes.Nonce
var nonceEval *emissionstypes.Nonce
var blockCurrentToReputer = make(map[int64][]string) // map blockHeight to addresses of reputers who sent data for current block height
var blockEvalToReputer = make(map[int64][]string) // map blockHeight to addresses of reputers who sent data for eval block height

@@ -501,7 +538,7 @@ func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, res
ap.Logger.Debug().Str("worker peer", peer.String())

// Get Peer $allo address
res, err := ap.QueryClient.GetReputerAddressByP2PKey(ctx, &types.QueryReputerAddressByP2PKeyRequest{
res, err := ap.EmissionsQueryClient.GetReputerAddressByP2PKey(ctx, &emissionstypes.QueryReputerAddressByP2PKeyRequest{
Libp2PKey: peer.String(),
})
if err != nil {
@@ -566,11 +603,11 @@ func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, res
ap.Logger.Error().Int64("blockCurrentHeight", blockCurrentHeight).Int64("blockEvalHeight", blockEvalHeight).Msg("blockCurrentHeight < blockEvalHeight, not sending data to the chain")
return
}
nonceCurrent = &types.Nonce{BlockHeight: blockCurrentHeight}
nonceEval = &types.Nonce{BlockHeight: blockEvalHeight}
nonceCurrent = &emissionstypes.Nonce{BlockHeight: blockCurrentHeight}
nonceEval = &emissionstypes.Nonce{BlockHeight: blockEvalHeight}

// Remove those bundles that do not come from the current block height
var valueBundlesFiltered []*types.ReputerValueBundle
var valueBundlesFiltered []*emissionstypes.ReputerValueBundle

for _, valueBundle := range valueBundles {
if valueBundle.ValueBundle.ReputerRequestNonce.ReputerNonce.BlockHeight == blockCurrentHeight && valueBundle.ValueBundle.ReputerRequestNonce.WorkerNonce.BlockHeight == blockEvalHeight {
@@ -590,9 +627,9 @@ func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, res
}

// Make 1 request per worker
req := &types.MsgInsertBulkReputerPayload{
req := &emissionstypes.MsgInsertBulkReputerPayload{
Sender: ap.Address,
ReputerRequestNonce: &types.ReputerRequestNonce{
ReputerRequestNonce: &emissionstypes.ReputerRequestNonce{
ReputerNonce: nonceCurrent,
WorkerNonce: nonceEval,
},
1 change: 0 additions & 1 deletion cmd/node/execute.go
@@ -50,7 +50,6 @@ func sendResultsToChain(log zerolog.Logger, appChainClient *AppChain, res node.C

log.Debug().Str("Topic", res.Topic).Str("worker mode", appChainClient.Config.WorkerMode).Msg("Found topic ID")

// TODO: We can move this context to the AppChain struct (previous context was breaking the tx broadcast response)
reqCtx := context.Background()
if appChainClient.Config.WorkerMode == WorkerModeWorker { // for inference or forecast
topicId, err := strconv.ParseUint(res.Topic, 10, 64)
20 changes: 11 additions & 9 deletions cmd/node/types.go
@@ -1,9 +1,10 @@
package main

import (
"github.com/allora-network/allora-chain/x/emissions/types"
emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
"github.com/allora-network/b7s/config"
"github.com/allora-network/b7s/models/blockless"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
"github.com/ignite/cli/v28/ignite/pkg/cosmosaccount"
"github.com/ignite/cli/v28/ignite/pkg/cosmosclient"
"github.com/rs/zerolog"
@@ -15,12 +16,13 @@ type alloraCfg struct {
}

type AppChain struct {
Address string
Account cosmosaccount.Account
Client *cosmosclient.Client
QueryClient types.QueryClient
Config AppChainConfig
Logger zerolog.Logger
Address string
Account cosmosaccount.Account
Client *cosmosclient.Client
EmissionsQueryClient emissionstypes.QueryClient
BankQueryClient banktypes.QueryClient
Config AppChainConfig
Logger zerolog.Logger
}

type AppChainConfig struct {
@@ -55,7 +57,7 @@ type InferenceForecastResponse struct {
}

type WorkerDataResponse struct {
*types.WorkerDataBundle
*emissionstypes.WorkerDataBundle
BlockHeight int64 `json:"blockHeight,omitempty"`
TopicId int64 `json:"topicId,omitempty"`
}
@@ -74,7 +76,7 @@ type ValueBundle struct {

// Wrapper around the ReputerValueBundle to include the block height and topic id for the leader
type ReputerDataResponse struct {
*types.ReputerValueBundle
*emissionstypes.ReputerValueBundle
BlockHeight int64 `json:"blockHeight,omitempty"`
BlockHeightEval int64 `json:"blockHeightEval,omitempty"`
TopicId int64 `json:"topicId,omitempty"`
3 changes: 1 addition & 2 deletions docker/Dockerfile_worker
@@ -16,8 +16,7 @@ ENV DEBIAN_FRONTEND=noninteractive \
APP_PATH=/app

## curl, unzip other utilities
#! libssl-dev - BLS_RUNTIME dependency # - temporary use libssl 1.1 TODO: Should use fresher libssl
#! gh - to downaload release from priv repo
#! gh - to download release from priv repo
RUN apt update && \
apt -y dist-upgrade && \
apt install -y --no-install-recommends \
3 changes: 1 addition & 2 deletions docker/Dockerfile_worker_py3.9
@@ -16,8 +16,7 @@ ENV DEBIAN_FRONTEND=noninteractive \
APP_PATH=/app

## curl, unzip other utilities
#! libssl-dev - BLS_RUNTIME dependency # - temporary use libssl 1.1 TODO: Should use fresher libssl
#! gh - to downaload release from priv repo
#! gh - to download release from priv repo
RUN apt update && \
apt -y dist-upgrade && \
apt install -y --no-install-recommends \
