Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
Diego/ora 1465 devnet no valid bundles (#122)
Browse files Browse the repository at this point in the history
* Do not send empty inferences or forecasts if not received, and
nonempty.
* Update chain changes in protos
* Refactors
  • Loading branch information
xmariachi authored May 23, 2024
1 parent 783aaef commit b9a71a0
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 60 deletions.
30 changes: 15 additions & 15 deletions cmd/node/appchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ func NewAppChain(config AppChainConfig, log zerolog.Logger) (*AppChain, error) {
}

appchain := &AppChain{
ReputerAddress: address,
ReputerAccount: account,
Logger: log,
Client: client,
QueryClient: queryClient,
Config: config,
Address: address,
Account: account,
Logger: log,
Client: client,
QueryClient: queryClient,
Config: config,
}

if config.NodeRole == blockless.WorkerNode {
Expand Down Expand Up @@ -162,7 +162,7 @@ func isReputerRegistered(appchain *AppChain, topicId uint64) (bool, error) {

res, err := appchain.QueryClient.IsReputerRegisteredInTopicId(ctx, &types.QueryIsReputerRegisteredInTopicIdRequest{
TopicId: topicId,
Address: appchain.ReputerAddress,
Address: appchain.Address,
})

if err != nil {
Expand All @@ -177,7 +177,7 @@ func isWorkerRegistered(appchain *AppChain, topicId uint64) (bool, error) {

res, err := appchain.QueryClient.IsWorkerRegisteredInTopicId(ctx, &types.QueryIsWorkerRegisteredInTopicIdRequest{
TopicId: topicId,
Address: appchain.ReputerAddress,
Address: appchain.Address,
})

if err != nil {
Expand Down Expand Up @@ -223,11 +223,11 @@ func registerWithBlockchain(appchain *AppChain) {
if !is_registered {
// register the wroker in the topic
msg := &types.MsgRegister{
Sender: appchain.ReputerAddress,
Sender: appchain.Address,
LibP2PKey: appchain.Config.LibP2PKey,
MultiAddress: appchain.Config.MultiAddress,
TopicId: topicId,
Owner: appchain.ReputerAddress,
Owner: appchain.Address,
IsReputer: isReputer,
}
res, err := appchain.SendDataWithRetry(ctx, msg, NUM_REGISTRATION_RETRIES,
Expand All @@ -240,8 +240,8 @@ func registerWithBlockchain(appchain *AppChain) {
var initstake = appchain.Config.InitialStake
if initstake > 0 {
msg := &types.MsgAddStake{
Sender: appchain.ReputerAddress,
Amount: cosmossdk_io_math.NewUint(initstake),
Sender: appchain.Address,
Amount: cosmossdk_io_math.NewInt(initstake),
TopicId: topicId,
}
res, err := appchain.SendDataWithRetry(ctx, msg, NUM_STAKING_RETRIES,
Expand All @@ -266,7 +266,7 @@ func (ap *AppChain) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, Max
var txResp *cosmosclient.Response
var err error
for retryCount := 0; retryCount <= MaxRetries; retryCount++ {
txResponse, err := ap.Client.BroadcastTx(ctx, ap.ReputerAccount, req)
txResponse, err := ap.Client.BroadcastTx(ctx, ap.Account, req)
txResp = &txResponse
if err == nil {
ap.Logger.Info().Str("Tx Hash:", txResp.TxHash).Msg("Success: " + SuccessMsg)
Expand Down Expand Up @@ -340,7 +340,7 @@ func (ap *AppChain) SendWorkerModeData(ctx context.Context, topicId uint64, resu

// Make 1 request per worker
req := &types.MsgInsertBulkWorkerPayload{
Sender: ap.ReputerAddress,
Sender: ap.Address,
Nonce: nonce,
TopicId: topicId,
WorkerDataBundles: WorkerDataBundles,
Expand Down Expand Up @@ -425,7 +425,7 @@ func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, res

// Make 1 request per worker
req := &types.MsgInsertBulkReputerPayload{
Sender: ap.ReputerAddress,
Sender: ap.Address,
ReputerRequestNonce: &types.ReputerRequestNonce{
ReputerNonce: nonceCurrent,
WorkerNonce: nonceEval,
Expand Down
2 changes: 1 addition & 1 deletion cmd/node/appchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (ap *AppChainTestSuit) TestSendDataWithRetry() {
}
src := make([]byte, 0)
src, _ = req.WorkerDataBundles[0].InferenceForecastsBundle.XXX_Marshal(src, true)
sig, pk, err := ap.app.Client.Context().Keyring.Sign(ap.app.ReputerAccount.Name, src, signing.SignMode_SIGN_MODE_DIRECT)
sig, pk, err := ap.app.Client.Context().Keyring.Sign(ap.app.Account.Name, src, signing.SignMode_SIGN_MODE_DIRECT)
pkStr := hex.EncodeToString(pk.Bytes())
if err != nil {
fmt.Println("Error signing the nonce: ", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/node/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func parseFlags() *alloraCfg {
pflag.StringVarP(&cfg.AppChainConfig.NodeRPCAddress, "allora-node-rpc-address", "", "http://localhost:26657", "The address for the client to connect to a node.")
pflag.StringSliceVar(&cfg.AppChainConfig.TopicIds, "allora-chain-topic-id", nil, "The topic id for the topic that the node will subscribe to.")
pflag.Uint64Var(&cfg.AppChainConfig.ReconnectSeconds, "allora-chain-reconnect-seconds", 60, "If connection to Allora Appchain breaks, it will attempt to reconnect with this interval. O means no reconnection.")
pflag.Uint64Var(&cfg.AppChainConfig.InitialStake, "allora-chain-initial-stake", 0, "Upon registering on a new topic, amount of stake to use.")
pflag.Int64Var(&cfg.AppChainConfig.InitialStake, "allora-chain-initial-stake", 0, "Upon registering on a new topic, amount of stake to use.")
pflag.StringVarP(&cfg.AppChainConfig.WorkerMode, "allora-chain-worker-mode", "", WorkerModeWorker, "Worker mode of an Allora Network node.")
pflag.StringVar(&cfg.AppChainConfig.Gas, "allora-chain-gas", "auto", "Max gas on Allora client.")
pflag.Float64Var(&cfg.AppChainConfig.GasAdjustment, "allora-chain-gas-adjustment", 0.1, "Gas adjustment on Allora client.")
Expand Down
73 changes: 39 additions & 34 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,49 +136,54 @@ func (e *AlloraExecutor) ExecuteFunction(requestID string, req execute.Request)
// If appchain is null or SubmitTx is false, do not sign the nonce
if e.appChain != nil && e.appChain.Client != nil {
// Get the account from the appchain
accountName := e.appChain.ReputerAccount.Name
accountName := e.appChain.Account.Name
var responseValue InferenceForecastResponse
err = json.Unmarshal([]byte(result.Result.Stdout), &responseValue)
if err != nil {
fmt.Println("Error serializing InferenceForecastResponse proto message: ", err)
} else {
// Build inference
infererValue := alloraMath.MustNewDecFromString(responseValue.InfererValue)
inference := &types.Inference{
TopicId: topicId,
Inferer: e.appChain.ReputerAddress,
Value: infererValue,
BlockHeight: alloraBlockHeightCurrent,
// Define an empty bundle
inferenceForecastsBundle := &types.InferenceForecastBundle{}
// Build inference if existent
if responseValue.InfererValue != "" {
infererValue := alloraMath.MustNewDecFromString(responseValue.InfererValue)
inference := &types.Inference{
TopicId: topicId,
Inferer: e.appChain.Address,
Value: infererValue,
BlockHeight: alloraBlockHeightCurrent,
}
inferenceForecastsBundle.Inference = inference
}
// Build Forecast
var forecasterElements []*types.ForecastElement
for _, val := range responseValue.ForecasterValues {
decVal := alloraMath.MustNewDecFromString(val.Value)
if !topicAllowsNegative {
decVal, err = alloraMath.Log10(decVal)
if err != nil {
fmt.Println("Error Log10 forecasterElements: ", err)
return result, err
if len(responseValue.ForecasterValues) > 0 {
var forecasterElements []*types.ForecastElement
for _, val := range responseValue.ForecasterValues {
decVal := alloraMath.MustNewDecFromString(val.Value)
if !topicAllowsNegative {
decVal, err = alloraMath.Log10(decVal)
if err != nil {
fmt.Println("Error Log10 forecasterElements: ", err)
return result, err
}
}
forecasterElements = append(forecasterElements, &types.ForecastElement{
Inferer: val.Worker,
Value: decVal,
})
}

forecasterElements = append(forecasterElements, &types.ForecastElement{
Inferer: val.Worker,
Value: decVal,
})
}

forecasterValues := &types.Forecast{
TopicId: topicId,
BlockHeight: alloraBlockHeightCurrent,
Forecaster: e.appChain.ReputerAddress,
ForecastElements: forecasterElements,
if len(forecasterElements) > 0 {
forecasterValues := &types.Forecast{
TopicId: topicId,
BlockHeight: alloraBlockHeightCurrent,
Forecaster: e.appChain.Address,
ForecastElements: forecasterElements,
}
inferenceForecastsBundle.Forecast = forecasterValues
}
}

inferenceForecastsBundle := &types.InferenceForecastBundle{
Inference: inference,
Forecast: forecasterValues,
}
// Marshall and sign the bundle
protoBytesIn := make([]byte, 0) // Create a byte slice with initial length 0 and capacity greater than 0
protoBytesIn, err := inferenceForecastsBundle.XXX_Marshal(protoBytesIn, true)
Expand All @@ -194,7 +199,7 @@ func (e *AlloraExecutor) ExecuteFunction(requestID string, req execute.Request)
}
// Create workerDataBundle with signature
workerDataBundle := &types.WorkerDataBundle{
Worker: e.appChain.ReputerAddress,
Worker: e.appChain.Address,
InferenceForecastsBundle: inferenceForecastsBundle,
InferencesForecastsBundleSignature: sig,
Pubkey: pkStr,
Expand Down Expand Up @@ -354,7 +359,7 @@ func (e *AlloraExecutor) ExecuteFunction(requestID string, req execute.Request)
newValueBundle := &types.ValueBundle{
TopicId: topicId,
ReputerRequestNonce: reputerRequestNonce,
Reputer: e.appChain.ReputerAddress,
Reputer: e.appChain.Address,
CombinedValue: combinedValue,
NaiveValue: naiveValue,
InfererValues: inferVal,
Expand All @@ -366,7 +371,7 @@ func (e *AlloraExecutor) ExecuteFunction(requestID string, req execute.Request)

// Marshall and sign the bundle
// Get the account from the appchain
accountName := e.appChain.ReputerAccount.Name
accountName := e.appChain.Account.Name
protoBytesIn := make([]byte, 0)
protoBytesIn, err := newValueBundle.XXX_Marshal(protoBytesIn, true)
if err != nil {
Expand Down
15 changes: 7 additions & 8 deletions cmd/node/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ type alloraCfg struct {
}

type AppChain struct {
ReputerAddress string
ReputerAccount cosmosaccount.Account
Client *cosmosclient.Client
QueryClient types.QueryClient
WorkersAddress map[string]string
Config AppChainConfig
Logger zerolog.Logger
Address string
Account cosmosaccount.Account
Client *cosmosclient.Client
QueryClient types.QueryClient
Config AppChainConfig
Logger zerolog.Logger
}

type AppChainConfig struct {
Expand All @@ -38,7 +37,7 @@ type AppChainConfig struct {
TopicIds []string
NodeRole blockless.NodeRole
ReconnectSeconds uint64 // seconds to wait for reconnection
InitialStake uint64 // uallo to initially stake upon registration on a new topi
InitialStake int64 // uallo to initially stake upon registration on a new topi
WorkerMode string // Allora Network worker mode to use
Gas string // gas to use for the allora client
GasAdjustment float64 // gas adjustment to use for the allora client
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.2

require (
cosmossdk.io/math v1.3.0
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240520155716-eb79d5859957
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240523180314-287ef36181fe
github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831
github.com/cockroachdb/pebble v1.1.0
github.com/cosmos/cosmos-sdk v0.50.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240520155716-eb79d5859957 h1:9xJ3KMib5e4p+SXtl8Z0zLMNcJcijBMqVQKXIkLjUjk=
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240520155716-eb79d5859957/go.mod h1:7UrL7qr/wLTnBBfTGZHHui9tjDfx89FvDj22YD2TVow=
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240523180314-287ef36181fe h1:fbf7tNwCdhoYjUSMFSbRp3TCRJPcG6NLyWA0Q2Py/vk=
github.com/allora-network/allora-chain v0.1.0-dev.a53b6d4.0.20240523180314-287ef36181fe/go.mod h1:7UrL7qr/wLTnBBfTGZHHui9tjDfx89FvDj22YD2TVow=
github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831 h1:4s9e1sjeHlqG4SWoV29vcf/WGX9KeATx1V38X4k2f+I=
github.com/allora-network/b7s v0.0.2-0.20240418175046-eca9bfd68831/go.mod h1:rJJrdC5Y83LEDFxo/iJp3JJpi8I6TJncOTigMWk8ieE=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
Expand Down

0 comments on commit b9a71a0

Please sign in to comment.