diff --git a/README.md b/README.md index e2d77d5..a6ea272 100644 --- a/README.md +++ b/README.md @@ -114,7 +114,7 @@ To build the image for the head: docker build --pull -f docker/Dockerfile_head -t allora-inference-base:dev-head --build-arg "GH_TOKEN=${YOUR_GH_TOKEN}" --build-arg "BLS_EXTENSION_VER=${BLS_EXTENSION_VERSION}" . ``` -Then to build the image for the worker: +To build the image for the worker: ``` docker build --pull -f docker/Dockerfile_worker -t allora-inference-base:dev-worker --build-arg "GH_TOKEN=${YOUR_GH_TOKEN}" --build-arg "BLS_EXTENSION_VER=${BLS_EXTENSION_VERSION}" . diff --git a/cmd/node/appchain.go b/cmd/node/appchain.go index 4d9d05c..242cc8e 100644 --- a/cmd/node/appchain.go +++ b/cmd/node/appchain.go @@ -358,12 +358,142 @@ func (ap *AppChain) SendWorkerModeData(ctx context.Context, topicId uint64, resu }() } +// Can only look up the topic stakes of this many reputers at a time +const DEFAULT_MAX_REPUTERS_FOR_STAKE_QUERY = uint64(100) + +// Only this number times MaxLimit (whose default is given above) of reputer stakes can be gathered at once +const MAX_NUMBER_STAKE_QUERIES_PER_REQUEST = uint64(3) + +// Get the stake of each reputer in the given topic +func (ap *AppChain) getStakePerReputer(ctx context.Context, topicId uint64, reputerAddrs []*string) (map[string]cosmossdk_io_math.Int, error) { + maxReputers := DEFAULT_MAX_REPUTERS_FOR_STAKE_QUERY + params, err := ap.QueryClient.Params(ctx, &types.QueryParamsRequest{}) + if err != nil { + ap.Logger.Error().Err(err).Uint64("topic", topicId).Msg("could not get chain params") + } + if err == nil { + maxReputers = params.Params.MaxLimit + } + + numberRequestsForStake := MAX_NUMBER_STAKE_QUERIES_PER_REQUEST + var stakesPerReputer = make(map[string]cosmossdk_io_math.Int) // This will be populated with each request/loop below + for i := uint64(0); i < numberRequestsForStake; i++ { + // Dereference only the needed reputer addresses to get the actual strings + addresses := make([]string, 0) + start := i * maxReputers + end := (i + 1) * maxReputers + if end > uint64(len(reputerAddrs)) { + end = uint64(len(reputerAddrs)) + } + if start >= end { + break + } + for _, addr := range reputerAddrs[start:end] { + if addr == nil { + return nil, fmt.Errorf("nil address in reputerAddrs") + } + addresses = append(addresses, *addr) + } + res, err := ap.QueryClient.GetMultiReputerStakeInTopic(ctx, &types.QueryMultiReputerStakeInTopicRequest{ + TopicId: topicId, + Addresses: addresses, + }) + if err != nil { + ap.Logger.Error().Err(err).Uint64("topic", topicId).Msg("could not get reputer stakes from the chain") + return nil, err + } + + // Create a map of reputer addresses to their stakes + for _, stake := range res.Amounts { + stakesPerReputer[stake.Reputer] = stake.Amount + } + } + + return stakesPerReputer, err +} + +func (ap *AppChain) argmaxBlockByStake( + blockToReputer *map[int64][]string, + stakesPerReputer map[string]cosmossdk_io_math.Int, +) int64 { + // Find the current block height with the highest voting power + firstIter := true + highestVotingPower := cosmossdk_io_math.ZeroInt() + blockOfMaxPower := int64(-1) + for block, reputersWhoVotedForBlock := range *blockToReputer { + // Calc voting power of this candidate block by total voting reputer stake + blockVotingPower := cosmossdk_io_math.ZeroInt() + for _, reputerAddr := range reputersWhoVotedForBlock { + blockVotingPower = blockVotingPower.Add(stakesPerReputer[reputerAddr]) + } + + // Decide if voting power exceeds that of current front-runner + if firstIter || blockVotingPower.GT(highestVotingPower) { + blockOfMaxPower = block + } + + firstIter = false + } + + return blockOfMaxPower +} + +func (ap *AppChain) argmaxBlockByCount( + blockToReputer *map[int64][]string, +) int64 { + // Find the current block height with the highest voting power + firstIter := true + highestVotingPower := cosmossdk_io_math.ZeroInt() + blockOfMaxPower := int64(-1) + for block, reputersWhoVotedForBlock := range *blockToReputer { + // Calc voting power of this candidate block by total reputer count + blockVotingPower := cosmossdk_io_math.NewInt(int64(len(reputersWhoVotedForBlock))) + + // Decide if voting power exceeds that of current front-runner + if firstIter || blockVotingPower.GT(highestVotingPower) { + blockOfMaxPower = block + } + + firstIter = false + } + + return blockOfMaxPower +} + +// Take stake-weighted vote of what the reputer leader thinks the current and eval block heights should be +func (ap *AppChain) getStakeWeightedBlockHeights( + ctx context.Context, + topicId uint64, + blockCurrentToReputer, blockEvalToReputer *map[int64][]string, + reputerAddrs []*string, +) (int64, int64, error) { + useWeightedVote := true + stakesPerReputer, err := ap.getStakePerReputer(ctx, topicId, reputerAddrs) + if err != nil { + ap.Logger.Error().Err(err).Uint64("topic", topicId).Msg("error getting reputer stakes from the chain => using unweighted vote") + // This removes a strict requirement for the reputer leader to have the correct stake + // at the cost of potentially allowing sybil attacks, though Blockless itself somewhat mitigates this + useWeightedVote = false + } + + // Find the current and ev block height with the highest voting power + if useWeightedVote { + return ap.argmaxBlockByStake(blockCurrentToReputer, stakesPerReputer), ap.argmaxBlockByStake(blockEvalToReputer, stakesPerReputer), nil + } else { + return ap.argmaxBlockByCount(blockCurrentToReputer), ap.argmaxBlockByCount(blockEvalToReputer), nil + } +} + // Sending Losses to the AppChain func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, results aggregate.Results) { // Aggregate the forecast from reputer leader var valueBundles []*types.ReputerValueBundle + var reputerAddrs []*string + var reputerAddrSet = make(map[string]bool) // Prevents duplicate reputer addresses from being counted in vote tally var nonceCurrent *types.Nonce var nonceEval *types.Nonce + var blockCurrentToReputer = make(map[int64][]string) // map blockHeight to addresses of reputers who sent data for current block height + var blockEvalToReputer = make(map[int64][]string) // map blockHeight to addresses of reputers who sent data for eval block height for _, result := range results { if len(result.Peers) > 0 { @@ -382,46 +512,82 @@ func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, res ap.Logger.Info().Str("Reputer Address", res.Address).Msg("Reputer Address") } - // Parse the result from the reputer to get the inference and forecasts - // Parse the result from the worker to get the inference and forecasts - var value ReputerDataResponse - err = json.Unmarshal([]byte(result.Result.Stdout), &value) - if err != nil { - ap.Logger.Warn().Err(err).Str("peer", peer.String()).Msg("error extracting WorkerDataBundle from stdout, ignoring bundle.") - continue - } - if nonceCurrent == nil { - nonceCurrent = &types.Nonce{BlockHeight: value.BlockHeight} - } - if nonceEval == nil { - nonceEval = &types.Nonce{BlockHeight: value.BlockHeightEval} - } + if _, ok := reputerAddrSet[res.Address]; !ok { + reputerAddrSet[res.Address] = true - // Here reputer leader can choose to validate data further to ensure set is correct and act accordingly - if value.ReputerValueBundle == nil { - ap.Logger.Warn().Str("peer", peer.String()).Msg("ReputerValueBundle is nil from stdout, ignoring bundle.") - continue - } - if value.ReputerValueBundle.ValueBundle == nil { - ap.Logger.Warn().Str("peer", peer.String()).Msg("ValueBundle is nil from stdout, ignoring bundle.") - continue - } - if value.ReputerValueBundle.ValueBundle.TopicId != topicId { - ap.Logger.Warn().Str("peer", peer.String()).Msg("ReputerValueBundle topicId does not match with request topicId, ignoring bundle.") - continue - } - // Append the WorkerDataBundle (only) to the WorkerDataBundles slice - valueBundles = append(valueBundles, value.ReputerValueBundle) + // Parse the result from the reputer to get the losses + // Parse the result from the worker to get the inferences and forecasts + var value ReputerDataResponse + err = json.Unmarshal([]byte(result.Result.Stdout), &value) + if err != nil { + ap.Logger.Warn().Err(err).Str("peer", peer.String()).Str("Value", result.Result.Stdout).Msg("error extracting ReputerDataResponse from stdout, ignoring bundle.") + continue + } + // Here reputer leader can choose to validate data further to ensure set is correct and act accordingly + if value.ReputerValueBundle == nil { + ap.Logger.Warn().Str("peer", peer.String()).Msg("ReputerValueBundle is nil from stdout, ignoring bundle.") + continue + } + if value.ReputerValueBundle.ValueBundle == nil { + ap.Logger.Warn().Str("peer", peer.String()).Msg("ValueBundle is nil from stdout, ignoring bundle.") + continue + } + if value.ReputerValueBundle.ValueBundle.TopicId != topicId { + ap.Logger.Warn().Str("peer", peer.String()).Msg("ReputerValueBundle topicId does not match with request topicId, ignoring bundle.") + continue + } + // Append the WorkerDataBundle (only) to the WorkerDataBundles slice + valueBundles = append(valueBundles, value.ReputerValueBundle) + reputerAddrs = append(reputerAddrs, &res.Address) + blockCurrentToReputer[value.BlockHeight] = append(blockCurrentToReputer[value.BlockHeight], res.Address) + blockEvalToReputer[value.BlockHeightEval] = append(blockEvalToReputer[value.BlockHeightEval], res.Address) + } } else { ap.Logger.Warn().Msg("No peers in the result, ignoring") } } - if nonceCurrent == nil || nonceEval == nil { - ap.Logger.Error().Uint64("topic", topicId).Msg("No valid ReputerDataBundles with nonces found, not sending data to the chain") + if len(reputerAddrs) == 0 { + ap.Logger.Warn().Msg("No reputer addresses found, not sending data to the chain") + return + } + + blockCurrentHeight, blockEvalHeight, err := ap.getStakeWeightedBlockHeights(ctx, topicId, &blockCurrentToReputer, &blockEvalToReputer, reputerAddrs) + if err != nil { + ap.Logger.Error().Err(err).Msg("could not get stake-weighted block heights, not sending data to the chain") return } + if blockCurrentHeight == -1 || blockEvalHeight == -1 { + ap.Logger.Error().Msg("could not get stake-weighted block heights, not sending data to the chain") + return + } + if blockCurrentHeight < blockEvalHeight { + ap.Logger.Error().Int64("blockCurrentHeight", blockCurrentHeight).Int64("blockEvalHeight", blockEvalHeight).Msg("blockCurrentHeight < blockEvalHeight, not sending data to the chain") + return + } + nonceCurrent = &types.Nonce{BlockHeight: blockCurrentHeight} + nonceEval = &types.Nonce{BlockHeight: blockEvalHeight} + + // Remove those bundles that do not come from the current block height + var valueBundlesFiltered []*types.ReputerValueBundle + + for _, valueBundle := range valueBundles { + if valueBundle.ValueBundle.ReputerRequestNonce.ReputerNonce.BlockHeight == blockCurrentHeight && valueBundle.ValueBundle.ReputerRequestNonce.WorkerNonce.BlockHeight == blockEvalHeight { + ap.Logger.Debug(). + Str("reputer", valueBundle.ValueBundle.Reputer). + Str("nonce reputer", strconv.FormatInt(valueBundle.ValueBundle.ReputerRequestNonce.ReputerNonce.BlockHeight, 10)). + Str("nonce worker", strconv.FormatInt(valueBundle.ValueBundle.ReputerRequestNonce.WorkerNonce.BlockHeight, 10)). + Msg("Valid nonce, adding to valueBundlesFiltered") + valueBundlesFiltered = append(valueBundlesFiltered, valueBundle) + } else { + ap.Logger.Warn(). + Str("reputer", valueBundle.ValueBundle.Reputer). + Str("nonce reputer", strconv.FormatInt(valueBundle.ValueBundle.ReputerRequestNonce.ReputerNonce.BlockHeight, 10)). + Str("nonce worker", strconv.FormatInt(valueBundle.ValueBundle.ReputerRequestNonce.WorkerNonce.BlockHeight, 10)). + Msg("Rejected Bundle, non-matching nonces.") + } + } // Make 1 request per worker req := &types.MsgInsertBulkReputerPayload{ diff --git a/docker/Dockerfile_worker b/docker/Dockerfile_worker index 44962f5..080696d 100644 --- a/docker/Dockerfile_worker +++ b/docker/Dockerfile_worker @@ -71,9 +71,9 @@ RUN if [ -n $BLS_EXTENSION_VER]; then \ COPY --from=builder /src/dist/allora-node /usr/local/bin/allora-node COPY --from=builder /src/dist/allora-keys /usr/local/bin/allora-keys -# Smoke test -RUN /app/runtime/bls-runtime --help && \ - /app/runtime/extensions/allora-inference-extension --help +# # Smoke test +# RUN /app/runtime/bls-runtime --help && \ +# /app/runtime/extensions/allora-inference-extension --help RUN groupadd -g 1001 ${USERNAME} \ && useradd -m -d ${APP_PATH} -u 1001 -g 1001 ${USERNAME} \ diff --git a/go.mod b/go.mod index 88816bd..fb6f59e 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22.2 require ( cosmossdk.io/math v1.3.0 - github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240523180314-287ef36181fe + github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240526151956-574b2f09d09e github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831 github.com/cockroachdb/pebble v1.1.0 github.com/cosmos/cosmos-sdk v0.50.5 diff --git a/go.sum b/go.sum index 39dace3..422efa6 100644 --- a/go.sum +++ b/go.sum @@ -57,10 +57,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240520155716-eb79d5859957 h1:9xJ3KMib5e4p+SXtl8Z0zLMNcJcijBMqVQKXIkLjUjk= -github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240520155716-eb79d5859957/go.mod h1:7UrL7qr/wLTnBBfTGZHHui9tjDfx89FvDj22YD2TVow= -github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240523180314-287ef36181fe h1:fbf7tNwCdhoYjUSMFSbRp3TCRJPcG6NLyWA0Q2Py/vk= -github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240523180314-287ef36181fe/go.mod h1:7UrL7qr/wLTnBBfTGZHHui9tjDfx89FvDj22YD2TVow= +github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240526151956-574b2f09d09e h1:8SQlhMZDDPm51KqDiGu5GEU/ZhYA+ZAgWOegfrxmiOI= +github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240526151956-574b2f09d09e/go.mod h1:7UrL7qr/wLTnBBfTGZHHui9tjDfx89FvDj22YD2TVow= github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831 h1:4s9e1sjeHlqG4SWoV29vcf/WGX9KeATx1V38X4k2f+I= github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831/go.mod h1:rJJrdC5Y83LEDFxo/iJp3JJpi8I6TJncOTigMWk8ieE= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=