Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
WIP Diego/ora 1175 adapt wasm and reputers to new data structures and…
Browse files Browse the repository at this point in the history
… align (#105)

Signed-off-by: Diego C <[email protected]>
Co-authored-by: Michael <[email protected]>
  • Loading branch information
xmariachi and RedBird96 authored Apr 12, 2024
1 parent 58b139d commit 7d7c485
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 158 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ cmd/node/node
data
data-head
data-worker
data-worker-2
main.py
.DS_Store
189 changes: 134 additions & 55 deletions cmd/node/appchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"time"

cosmossdk_io_math "cosmossdk.io/math"
alloraMath "github.com/allora-network/allora-chain/math"
"github.com/allora-network/allora-chain/x/emissions/types"
"github.com/allora-network/b7s/models/blockless"
"github.com/allora-network/b7s/node/aggregate"
Expand Down Expand Up @@ -152,6 +155,9 @@ func registerWithBlockchain(appchain *AppChain) {

// Parse topics into b7sTopicIds as numerical ids. Reputers and worker use different schema.
b7sTopicIds := parseTopicIds(appchain, appchain.Config.TopicIds)
// Print the array entries as a comma-separated value list
topicsList := strings.Join(strings.Fields(fmt.Sprint(b7sTopicIds)), ", ")
appchain.Logger.Info().Str("topicsList", topicsList).Msg("Topics list")

appchain.Logger.Info().Str("Address", appchain.ReputerAddress).Msg("Node address")
// Check if address is already registered in a topic, getting all topics already reg'd
Expand All @@ -167,6 +173,7 @@ func registerWithBlockchain(appchain *AppChain) {
appchain.Logger.Info().Str("Worker", appchain.ReputerAddress).Msg("Current Address")
if len(res.TopicIds) > 0 {
appchain.Logger.Debug().Msg("Worker already registered for some topics, checking...")
// Check if libp2p key is already registered - if not, register it
var topicsToRegister []uint64
var topicsToDeRegister []uint64
// Calculate topics to deregister
Expand Down Expand Up @@ -318,144 +325,216 @@ func (ap *AppChain) SendDataWithRetry(ctx context.Context, req sdktypes.Msg, Max
func (ap *AppChain) SendWorkerModeData(ctx context.Context, topicId uint64, results aggregate.Results) {
// Aggregate the inferences from all peers/workers
var inferences []*types.Inference
var forecasts []*types.Forecast

var forecasterValues []*types.Forecast
var nonce *types.Nonce
for _, result := range results {
for _, peer := range result.Peers {
ap.Logger.Debug().Any("peer", peer.String())

// Get Peer $allo address
ap.Logger.Debug().Str("worker peer", peer.String())

// Get Peer's $allo address
res, err := ap.QueryClient.GetWorkerAddressByP2PKey(ctx, &types.QueryWorkerAddressByP2PKeyRequest{
Libp2PKey: peer.String(),
})
if err != nil {
ap.Logger.Warn().Err(err).Str("peer", peer.String()).Msg("error getting peer address from chain, worker not registered? Ignoring peer.")
ap.Logger.Warn().Err(err).Str("peer", peer.String()).Msg("error getting worker peer address from chain, worker not registered? Ignoring peer.")
continue
}

var value InferenceForeacstResponse
ap.Logger.Debug().Str("worker address", res.Address).Msgf("%+v", result.Result)
// Parse the result from the worker to get the inference and forecasts
var value InferenceForecastResponse
err = json.Unmarshal([]byte(result.Result.Stdout), &value)
if err != nil {
ap.Logger.Warn().Err(err).Str("peer", peer.String()).Msg("error extracting value as number from stdout, ignoring inference.")
continue
}

// Get first Nonce
if nonce == nil {
// Parse the value.nonce as str from the result as int64
nonceInt64, err := strconv.ParseInt(value.Nonce, 10, 64)
if err != nil {
ap.Logger.Warn().Err(err).Str("peer", peer.String()).Msg("error extracting nonce as number from stdout, ignoring inference.")
continue
}
nonce = &types.Nonce{Nonce: nonceInt64}
}

infererValue := alloraMath.MustNewDecFromString(value.InfererValue)
inference := &types.Inference{
TopicId: topicId,
Worker: res.Address,
Value: value.InfererValue,
Value: infererValue,
Proof: value.Signature,
}
// Create array with one inference only to be infererValue
inferences = append(inferences, inference)

// Aggregate forecasts
var forecasterValues []*types.Forecast
var forecasterVal []*types.ForecastElement
for _, val := range value.ForecasterValues {
forecasterVal = append(forecasterVal, &types.ForecastElement{
Inferer: val.Node,
Value: val.Value,
Inferer: val.Worker,
Value: alloraMath.MustNewDecFromString(val.Value),
Proof: value.Signature,
})
}
forecasts = append(forecasts, &types.Forecast{
forecasterValues = append(forecasterValues, &types.Forecast{
TopicId: topicId,
Forecaster: res.Address,
ForecastElements: forecasterVal,
})
// Make 1 request per worker
req := &types.MsgInsertBulkWorkerPayload{
Sender: ap.ReputerAddress,
Nonce: nonce,
TopicId: topicId,
Inferences: inferences,
Forecasts: forecasterValues,
}
go func() {
_, _ = ap.SendDataWithRetry(ctx, req, 5, 0, 2)
}()
}
}

reqInf := &types.MsgProcessInferences{
// Make 1 request per worker
req := &types.MsgInsertBulkWorkerPayload{
Sender: ap.ReputerAddress,
Nonce: nonce,
TopicId: topicId,
Inferences: inferences,
Forecasts: forecasterValues,
}

reqFor := &types.MsgProcessForecasts{
Sender: ap.ReputerAddress,
Forecasts: forecasts,
}
go func() {
_, _ = ap.SendDataWithRetry(ctx, reqInf, 5, 0, 2)
}()
go func() {
_, _ = ap.SendDataWithRetry(ctx, reqFor, 5, 0, 2)
_, _ = ap.SendDataWithRetry(ctx, req, 5, 0, 2)
}()
}

// Sending Losses to the AppChain
func (ap *AppChain) SendReputerModeData(ctx context.Context, topicId uint64, results aggregate.Results) {
// Aggregate the forecast from reputer leader
var valueBundles []*types.ReputerValueBundle
var nonce *types.Nonce

for _, result := range results {
for _, peer := range result.Peers {
ap.Logger.Debug().Any("peer", peer)
if len(result.Peers) > 0 {
peer := result.Peers[0]

// Get Peer $allo address
res, err := ap.QueryClient.GetWorkerAddressByP2PKey(ctx, &types.QueryWorkerAddressByP2PKeyRequest{
res, err := ap.QueryClient.GetReputerAddressByP2PKey(ctx, &types.QueryReputerAddressByP2PKeyRequest{
Libp2PKey: peer.String(),
})
if err != nil {
ap.Logger.Warn().Err(err).Str("peer", peer.String()).Msg("error getting peer address from chain, worker not registered? Ignoring peer.")
ap.Logger.Warn().Err(err).Str("peer", peer.String()).Msg("error getting reputer peer address from chain, worker not registered? Ignoring peer.")
continue
} else {
// Print the address of the reputer
ap.Logger.Info().Str("Reputer Address", res.Address).Msg("Reputer Address")
}

var value LossResponse
err = json.Unmarshal([]byte(result.Result.Stdout), &value)
var responseValue LossResponse
err = json.Unmarshal([]byte(result.Result.Stdout), &responseValue)
if err != nil {
ap.Logger.Warn().Err(err).Str("peer", peer.String()).Msg("error extracting value as number from stdout, ignoring loss.")
continue
ap.Logger.Error().Err(err).Msg("error extracting loss object from stdout, ignoring loss.")
} else {
ap.Logger.Info().Msg("Response parsed successfully.")
}
// Now get the string of the value, unescape it and unmarshall into ValueBundle
// Unmarshal the "value" field from the LossResponse struct
var nestedValueBundle ValueBundle
err = json.Unmarshal([]byte(responseValue.Value), &nestedValueBundle)
if err != nil {
ap.Logger.Error().Err(err).Msg("Error unmarshalling nested JSON:")
return
}

// Get first Nonce only - they're all the same
if nonce == nil {
// Parse the value.nonce as str from the result as int64
nonceInt64, err := strconv.ParseInt(responseValue.Nonce, 10, 64)
if err != nil {
ap.Logger.Warn().Err(err).Str("peer", peer.String()).Msg("error extracting nonce as number from stdout, ignoring inference.")
continue
}
nonce = &types.Nonce{Nonce: nonceInt64}
}

var (
inferVal []*types.WorkerAttributedValue
forcastsVal []*types.WorkerAttributedValue
outInferVal []*types.WorkerAttributedValue
inInferVal []*types.WorkerAttributedValue
inferVal []*types.WorkerAttributedValue
forecastsVal []*types.WorkerAttributedValue
outInferVal []*types.WithheldWorkerAttributedValue
outForecastVal []*types.WithheldWorkerAttributedValue
inInferVal []*types.WorkerAttributedValue
)

for _, inf := range value.InferrerInferences {
for _, inf := range nestedValueBundle.InferrerValues {
inferVal = append(inferVal, &types.WorkerAttributedValue{
Worker: inf.Node,
Value: inf.Value,
Worker: inf.Worker,
Value: alloraMath.MustNewDecFromString(inf.Value),
})
}
for _, inf := range nestedValueBundle.ForecasterValues {
forecastsVal = append(forecastsVal, &types.WorkerAttributedValue{
Worker: inf.Worker,
Value: alloraMath.MustNewDecFromString(inf.Value),
})
}
for _, inf := range value.ForecasterInferences {
forcastsVal = append(forcastsVal, &types.WorkerAttributedValue{
Worker: inf.Node,
Value: inf.Value,
for _, inf := range nestedValueBundle.OneOutInfererValues {
outInferVal = append(outInferVal, &types.WithheldWorkerAttributedValue{
Worker: inf.Worker,
Value: alloraMath.MustNewDecFromString(inf.Value),
})
}
for _, inf := range value.OneOutNetworkInferences {
outInferVal = append(outInferVal, &types.WorkerAttributedValue{
Worker: inf.Node,
Value: inf.Value,
for _, inf := range nestedValueBundle.OneOutForecasterValues {
outForecastVal = append(outForecastVal, &types.WithheldWorkerAttributedValue{
Worker: inf.Worker,
Value: alloraMath.MustNewDecFromString(inf.Value),
})
}
for _, inf := range value.OneInNetworkInferences {
for _, inf := range nestedValueBundle.OneInForecasterValues {
inInferVal = append(inInferVal, &types.WorkerAttributedValue{
Worker: inf.Node,
Value: inf.Value,
Worker: inf.Worker,
Value: alloraMath.MustNewDecFromString(inf.Value),
})
}

valueBundle := &types.ReputerValueBundle{
Reputer: res.Address,
ValueBundle: &types.ValueBundle{
TopicId: topicId,
NaiveValue: value.NaiveNetworkInference,
CombinedValue: value.NetworkInference,
InfererValues: inferVal,
ForecasterValues: forcastsVal,
OneOutValues: outInferVal,
OneInNaiveValues: inInferVal,
TopicId: topicId,
CombinedValue: alloraMath.MustNewDecFromString(nestedValueBundle.CombinedValue),
NaiveValue: alloraMath.MustNewDecFromString(nestedValueBundle.NaiveValue),
InfererValues: inferVal,
ForecasterValues: forecastsVal,
OneOutInfererValues: outInferVal,
OneOutForecasterValues: outForecastVal,
OneInForecasterValues: inInferVal,
},
}
// Print the valueBundle to be added
ap.Logger.Info().Interface("valueBundle", valueBundle).Msg("valueBundle to append")
valueBundles = append(valueBundles, valueBundle)
} else {
ap.Logger.Warn().Msg("No peers in the result, ignoring")
}
}

req := &types.MsgSetLosses{
// Make 1 request per worker
req := &types.MsgInsertBulkReputerPayload{
Sender: ap.ReputerAddress,
Nonce: nonce,
TopicId: topicId,
ReputerValueBundles: valueBundles,
}
// Print req as JSON to the log
reqJSON, err := json.Marshal(req)
if err != nil {
ap.Logger.Error().Err(err).Msg("Error marshaling MsgInsertBulkReputerPayload to print Msg as JSON")
} else {
ap.Logger.Info().Str("req_json", string(reqJSON)).Msg("Sending Reputer Mode Data")
}

_, _ = ap.SendDataWithRetry(ctx, req, 5, 0, 2)
}
2 changes: 1 addition & 1 deletion cmd/node/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func sendResultsToChain(log zerolog.Logger, appChainClient *AppChain, res node.C
return
}
stdout := aggregate.Aggregate(res.Data)[0].Result.Stdout
log.Info().Str("", stdout).Msg("WASM function stdout result")
log.Info().Str("stdout", stdout).Msg("WASM function stdout result")

log.Debug().Str("Topic", res.Topic).Str("worker mode", appChainClient.Config.WorkerMode).Msg("Found topic ID")

Expand Down
2 changes: 1 addition & 1 deletion cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (e *AlloraExecutor) ExecuteFunction(requestID string, req execute.Request)
} else {
// Add the signature to the stdout object
stdout["signature"] = sig
stdout["nonce"] = nonceBytes
stdout["nonce"] = nonce
// Marshal the stdout map back into a JSON string
stdoutBytes, err := json.Marshal(stdout)
if err != nil {
Expand Down
31 changes: 20 additions & 11 deletions cmd/node/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,31 @@ type AppChainConfig struct {
}

type NodeValue struct {
Node string `json:"node,omitempty"`
Value float64 `json:"value,omitempty"`
Worker string `json:"worker,omitempty"`
Value string `json:"value,omitempty"`
}

type InferenceForeacstResponse struct {
InfererValue float64 `json:"node,omitempty"`
ForecasterValues []NodeValue `json:"node,omitempty"`
type InferenceForecastResponse struct {
InfererValue string `json:"infererValue,omitempty"`
ForecasterValues []NodeValue `json:"forecasterValue,omitempty"`
Nonce string `json:"nonce,omitempty"`
Signature string `json:"signature,omitempty"`
}

type ValueBundle struct {
CombinedValue string `json:"combinedValue,omitempty"`
NaiveValue string `json:"naiveValue,omitempty"`
InferrerValues []NodeValue `json:"infererValues,omitempty"`
ForecasterValues []NodeValue `json:"forecasterValues,omitempty"`
OneOutInfererValues []NodeValue `json:"oneOutInfererValues,omitempty"`
OneOutForecasterValues []NodeValue `json:"oneOutForecasterValues,omitempty"`
OneInForecasterValues []NodeValue `json:"oneInForecasterValues,omitempty"`
}

type LossResponse struct {
NetworkInference float64 `json:"networkInference,omitempty"`
NaiveNetworkInference float64 `json:"naiveNetworkInference,omitempty"`
InferrerInferences []NodeValue `json:"inferrerInferences,omitempty"`
ForecasterInferences []NodeValue `json:"forecasterInferences,omitempty"`
OneOutNetworkInferences []NodeValue `json:"oneOutNetworkInferences,omitempty"`
OneInNetworkInferences []NodeValue `json:"oneInNetworkInferences,omitempty"`
Value string `json:"value,omitempty"`
Nonce string `json:"nonce,omitempty"`
Signature string `json:"signature,omitempty"`
}

const (
Expand Down
Loading

0 comments on commit 7d7c485

Please sign in to comment.