diff --git a/app/app.go b/app/app.go index 2492ccf..5fe93d8 100644 --- a/app/app.go +++ b/app/app.go @@ -3,22 +3,21 @@ package app import ( "encoding/json" "fmt" + "gorm.io/driver/mysql" + "gorm.io/gorm" "github.com/bnb-chain/greenfield-challenger/attest" - "github.com/bnb-chain/greenfield-challenger/config" "github.com/bnb-chain/greenfield-challenger/db/dao" "github.com/bnb-chain/greenfield-challenger/db/model" "github.com/bnb-chain/greenfield-challenger/executor" - "github.com/bnb-chain/greenfield-challenger/logging" + "github.com/bnb-chain/greenfield-challenger/metrics" "github.com/bnb-chain/greenfield-challenger/monitor" "github.com/bnb-chain/greenfield-challenger/submitter" "github.com/bnb-chain/greenfield-challenger/verifier" "github.com/bnb-chain/greenfield-challenger/vote" "github.com/bnb-chain/greenfield-challenger/wiper" "github.com/spf13/viper" - "gorm.io/driver/mysql" - "gorm.io/gorm" ) type App struct { @@ -30,6 +29,7 @@ type App struct { voteCollator *vote.VoteCollator txSubmitter *submitter.TxSubmitter attestMonitor *attest.AttestMonitor + metricService *metrics.MetricService dbWiper *wiper.DBWiper } @@ -44,7 +44,7 @@ func NewApp(cfg *config.Config) *App { db, err := gorm.Open(mysql.Open(dbPath), &gorm.Config{}) - //only for debug purpose + // only for debug purpose db = db.Debug() if err != nil { @@ -58,12 +58,13 @@ func NewApp(cfg *config.Config) *App { dbConfig.SetMaxIdleConns(cfg.DBConfig.MaxIdleConns) dbConfig.SetMaxOpenConns(cfg.DBConfig.MaxOpenConns) - if cfg.DBConfig.DebugMode { - err = ResetDB(db, &model.Block{}, &model.Event{}, &model.Vote{}) - if err != nil { - logging.Logger.Errorf("reset db error, err=%+v", err.Error()) - } - } + // For clearing database during debugging + //if cfg.DBConfig.DebugMode { + // err = ResetDB(db, &model.Block{}, &model.Event{}, &model.Vote{}) + // if err != nil { + // logging.Logger.Errorf("reset db error, err=%+v", err.Error()) + // } + //} model.InitBlockTable(db) model.InitEventTable(db) @@ -76,23 +77,25 
@@ func NewApp(cfg *config.Config) *App { executor := executor.NewExecutor(cfg) + metricService := metrics.NewMetricService(cfg) + monitorDataHandler := monitor.NewDataHandler(daoManager) - monitor := monitor.NewMonitor(executor, monitorDataHandler) + monitor := monitor.NewMonitor(executor, monitorDataHandler, metricService) verifierDataHandler := verifier.NewDataHandler(daoManager) - hashVerifier := verifier.NewHashVerifier(cfg, executor, cfg.GreenfieldConfig.DeduplicationInterval, verifierDataHandler) + hashVerifier := verifier.NewHashVerifier(cfg, executor, cfg.GreenfieldConfig.DeduplicationInterval, verifierDataHandler, metricService) signer := vote.NewVoteSigner(executor.BlsPrivKey) voteDataHandler := vote.NewDataHandler(daoManager, executor) - voteCollector := vote.NewVoteCollector(cfg, executor, voteDataHandler) - voteBroadcaster := vote.NewVoteBroadcaster(cfg, signer, executor, voteDataHandler) - voteCollator := vote.NewVoteCollator(cfg, signer, executor, voteDataHandler) + voteCollector := vote.NewVoteCollector(cfg, executor, voteDataHandler, metricService) + voteBroadcaster := vote.NewVoteBroadcaster(cfg, signer, executor, voteDataHandler, metricService) + voteCollator := vote.NewVoteCollator(cfg, signer, executor, voteDataHandler, metricService) txDataHandler := submitter.NewDataHandler(daoManager, executor) - txSubmitter := submitter.NewTxSubmitter(cfg, executor, txDataHandler) + txSubmitter := submitter.NewTxSubmitter(cfg, executor, txDataHandler, metricService) attestDataHandler := attest.NewDataHandler(daoManager) - attestMonitor := attest.NewAttestMonitor(executor, attestDataHandler) + attestMonitor := attest.NewAttestMonitor(executor, attestDataHandler, metricService) dbWiper := wiper.NewDBWiper(daoManager) @@ -105,6 +108,7 @@ func NewApp(cfg *config.Config) *App { voteCollator: voteCollator, attestMonitor: attestMonitor, txSubmitter: txSubmitter, + metricService: metricService, dbWiper: dbWiper, } } @@ -119,6 +123,7 @@ func (a *App) Start() { go 
a.voteBroadcaster.BroadcastVotesLoop() go a.voteCollator.CollateVotesLoop() go a.attestMonitor.UpdateAttestedChallengeIdLoop() + go a.metricService.Start() a.txSubmitter.SubmitTransactionLoop() } diff --git a/attest/attest_monitor.go b/attest/attest_monitor.go index 948b803..4a7be73 100644 --- a/attest/attest_monitor.go +++ b/attest/attest_monitor.go @@ -4,6 +4,8 @@ import ( "sync" "time" + "github.com/bnb-chain/greenfield-challenger/metrics" + "github.com/bnb-chain/greenfield-challenger/db/model" "github.com/bnb-chain/greenfield-challenger/executor" "github.com/bnb-chain/greenfield-challenger/logging" @@ -14,14 +16,16 @@ type AttestMonitor struct { mtx sync.RWMutex attestedChallengeIds map[uint64]bool // used to save the last attested challenge id dataProvider DataProvider + metricService *metrics.MetricService } -func NewAttestMonitor(executor *executor.Executor, dataProvider DataProvider) *AttestMonitor { +func NewAttestMonitor(executor *executor.Executor, dataProvider DataProvider, metricService *metrics.MetricService) *AttestMonitor { return &AttestMonitor{ executor: executor, mtx: sync.RWMutex{}, attestedChallengeIds: make(map[uint64]bool, 0), dataProvider: dataProvider, + metricService: metricService, } } @@ -78,4 +82,5 @@ func (a *AttestMonitor) updateEventStatus(challengeId uint64) { if err != nil { logging.Logger.Errorf("update attested event status error, err=%s", err.Error()) } + a.metricService.IncAttestedChallenges() } diff --git a/config/config.go b/config/config.go index 5b3cfed..aee0241 100644 --- a/config/config.go +++ b/config/config.go @@ -1,10 +1,11 @@ package config import ( - "cosmossdk.io/math" "encoding/json" "fmt" "os" + + "cosmossdk.io/math" ) type Config struct { @@ -12,6 +13,7 @@ type Config struct { LogConfig LogConfig `json:"log_config"` AlertConfig AlertConfig `json:"alert_config"` DBConfig DBConfig `json:"db_config"` + MetricsConfig MetricsConfig `json:"metrics_config"` } type GreenfieldConfig struct { @@ -127,10 +129,21 @@ func 
(cfg *DBConfig) Validate() { } } +type MetricsConfig struct { + Port uint16 `json:"port"` +} + +func (cfg *MetricsConfig) Validate() { + if cfg.Port == 0 { + panic("port should be within (0, 65535]") + } +} + func (cfg *Config) Validate() { cfg.LogConfig.Validate() cfg.DBConfig.Validate() cfg.GreenfieldConfig.Validate() + cfg.MetricsConfig.Validate() } func ParseConfigFromJson(content string) *Config { diff --git a/executor/executor.go b/executor/executor.go index e21d2fe..659faf6 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -321,11 +321,11 @@ func (e *Executor) GetStorageProviderEndpoint(address string) (string, error) { return "", err } res, err := client.GetStorageProviderInfo(context.Background(), spAddr) - logging.Logger.Infof("response res.endpoint %s", res.Endpoint) if err != nil { logging.Logger.Errorf("executor failed to query storage provider %s, err=%+v", address, err.Error()) return "", err } + logging.Logger.Infof("response res.endpoint %s", res.Endpoint) return res.Endpoint, nil } diff --git a/go.mod b/go.mod index 8d43259..b7cd4f3 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/ory/dockertest v3.3.5+incompatible github.com/panjf2000/ants/v2 v2.7.3 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.15.0 github.com/prysmaticlabs/prysm v0.0.0-20220124113610-e26cde5e091b github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 @@ -121,7 +122,6 @@ require ( github.com/pelletier/go-toml/v2 v2.0.7 // indirect github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.15.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect diff --git a/logging/log.go b/logging/log.go index 83a63df..049ead0 100644 --- a/logging/log.go +++ b/logging/log.go @@ -10,7 +10,7 @@ 
import ( var ( // Logger instance for quick declarative logging levels - Logger = logging.MustGetLogger("inscription-challenger") + Logger = logging.MustGetLogger("greenfield-challenger") // log levels that are available levels = map[string]logging.Level{ "CRITICAL": logging.CRITICAL, diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 0000000..4515021 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,339 @@ +package metrics + +import ( + "fmt" + "net/http" + "time" + + "github.com/bnb-chain/greenfield-challenger/config" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const ( + // Monitor + MetricGnfdSavedBlock = "gnfd_saved_block" + MetricGnfdSavedBlockCount = "gnfd_saved_block_count" + MetricGnfdSavedEvent = "gnfd_saved_event" + MetricGnfdSavedEventCount = "gnfd_saved_event_count" + + // Verifier + MetricVerifiedChallenges = "verified_challenges" + MetricVerifiedChallengeFailed = "challenge_failed" + MetricVerifiedChallengeSuccess = "challenge_success" + MetricHeartbeatEvents = "heartbeat_events" + MetricHashVerifierErr = "hash_verifier_error_count" + MetricSpAPIErr = "hash_verifier_sp_api_error" + MetricHashVerifierDuration = "hash_verifier_duration" + + // Vote Broadcaster + MetricBroadcastedChallenges = "broadcasted_challenges" + MetricBroadcasterDuration = "broadcaster_duration" + MetricBroadcasterErr = "broadcaster_error_count" + + // Vote Collector + MetricsVoteCollectorErr = "vote_collector_error_count" + MetricsVotesCollected = "votes_collected" + + // Vote Collator + MetricCollatedChallenges = "collated_challenges" + MetricCollatorDuration = "collator_duration" + MetricCollatorErr = "collator_error_count" + + // Tx Submitter + MetricSubmittedChallenges = "submitted_challenges" + MetricSubmitterDuration = "submitter_duration" + MetricSubmitterErr = "submitter_error_count" + + // Attest Monitor + MetricAttestedCount = "attested_count" +) + +type MetricService 
struct { + MetricsMap map[string]prometheus.Metric + cfg *config.Config +} + +func NewMetricService(config *config.Config) *MetricService { + ms := make(map[string]prometheus.Metric, 0) + + // Monitor + gnfdSavedBlockMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: MetricGnfdSavedBlock, + Help: "Saved block height for Greenfield in database", + }) + ms[MetricGnfdSavedBlock] = gnfdSavedBlockMetric + prometheus.MustRegister(gnfdSavedBlockMetric) + + gnfdSavedBlockCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricGnfdSavedBlockCount, + Help: "Saved block count for Greenfield in database", + }) + ms[MetricGnfdSavedBlockCount] = gnfdSavedBlockCountMetric + prometheus.MustRegister(gnfdSavedBlockCountMetric) + + gnfdSavedEventMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: MetricGnfdSavedEvent, + Help: "Saved event challengeId in database", + }) + ms[MetricGnfdSavedEvent] = gnfdSavedEventMetric + prometheus.MustRegister(gnfdSavedEventMetric) + + gnfdSavedEventCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricGnfdSavedEventCount, + Help: "Saved gnfd event count in database", + }) + ms[MetricGnfdSavedEventCount] = gnfdSavedEventCountMetric + prometheus.MustRegister(gnfdSavedEventCountMetric) + + // Hash Verifier + verifiedChallengesMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricVerifiedChallenges, + Help: "Verified challenge count", + }) + ms[MetricVerifiedChallenges] = verifiedChallengesMetric + prometheus.MustRegister(verifiedChallengesMetric) + + hashVerifierDurationMetric := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: MetricHashVerifierDuration, + Help: "Duration of the hash verifier process for each challenge ID", + }) + ms[MetricHashVerifierDuration] = hashVerifierDurationMetric + prometheus.MustRegister(hashVerifierDurationMetric) + + challengeFailedMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricVerifiedChallengeFailed, + Help: "Failed 
challenges in database", + }) + ms[MetricVerifiedChallengeFailed] = challengeFailedMetric + prometheus.MustRegister(challengeFailedMetric) + + challengeSuccessMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricVerifiedChallengeSuccess, + Help: "Succeeded challenges in database", + }) + ms[MetricVerifiedChallengeSuccess] = challengeSuccessMetric + prometheus.MustRegister(challengeSuccessMetric) + + heartbeatEventsMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricHeartbeatEvents, + Help: "Heartbeat challenges", + }) + ms[MetricHeartbeatEvents] = heartbeatEventsMetric + prometheus.MustRegister(heartbeatEventsMetric) + + hashVerifierErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricHashVerifierErr, + Help: "Hash verifier error count", + }) + ms[MetricHashVerifierErr] = hashVerifierErrCountMetric + prometheus.MustRegister(hashVerifierErrCountMetric) + + hashVerifierSpApiErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricSpAPIErr, + Help: "Hash verifier SP API error count", + }) + ms[MetricSpAPIErr] = hashVerifierSpApiErrCountMetric + prometheus.MustRegister(hashVerifierSpApiErrCountMetric) + + // Broadcaster + broadcasterErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricBroadcasterErr, + Help: "Broadcaster error count", + }) + ms[MetricBroadcasterErr] = broadcasterErrCountMetric + prometheus.MustRegister(broadcasterErrCountMetric) + + broadcastedChallengesMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricBroadcastedChallenges, + Help: "Broadcasted challenge count", + }) + ms[MetricBroadcastedChallenges] = broadcastedChallengesMetric + prometheus.MustRegister(broadcastedChallengesMetric) + + broadcastedDurationMetric := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: MetricBroadcasterDuration, + Help: "Broadcaster duration for 1 challenge", + }) + ms[MetricBroadcasterDuration] = broadcastedDurationMetric + 
prometheus.MustRegister(broadcastedDurationMetric) + + // Vote Collector + voteCollectorErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricsVoteCollectorErr, + Help: "Vote Collector error count", + }) + ms[MetricsVoteCollectorErr] = voteCollectorErrCountMetric + prometheus.MustRegister(voteCollectorErrCountMetric) + + votesCollectedMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricsVotesCollected, + Help: "Votes collected count", + }) + ms[MetricsVotesCollected] = votesCollectedMetric + prometheus.MustRegister(votesCollectedMetric) + + // Collator + collatorErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricCollatorErr, + Help: "Collator error count", + }) + ms[MetricCollatorErr] = collatorErrCountMetric + prometheus.MustRegister(collatorErrCountMetric) + + collatedChallengesMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricCollatedChallenges, + Help: "Collated challenge count", + }) + ms[MetricCollatedChallenges] = collatedChallengesMetric + prometheus.MustRegister(collatedChallengesMetric) + + collatedDurationMetric := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: MetricCollatorDuration, + Help: "Collator duration for 1 challenge", + }) + ms[MetricCollatorDuration] = collatedDurationMetric + prometheus.MustRegister(collatedDurationMetric) + + // Submitter + submitterErrCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricSubmitterErr, + Help: "Submitter error count", + }) + ms[MetricSubmitterErr] = submitterErrCountMetric + prometheus.MustRegister(submitterErrCountMetric) + + submitterChallengesMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricSubmittedChallenges, + Help: "Submitted challenge count", + }) + ms[MetricSubmittedChallenges] = submitterChallengesMetric + prometheus.MustRegister(submitterChallengesMetric) + + submitterDurationMetric := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: 
MetricSubmitterDuration, + Help: "Submitter duration for 1 challengeID", + }) + ms[MetricSubmitterDuration] = submitterDurationMetric + prometheus.MustRegister(submitterDurationMetric) + + // Attest Monitor + challengeAttestedCountMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricAttestedCount, + Help: "Attested challenges count", + }) + ms[MetricAttestedCount] = challengeAttestedCountMetric + prometheus.MustRegister(challengeAttestedCountMetric) + + return &MetricService{ + MetricsMap: ms, + cfg: config, + } +} + +func (m *MetricService) Start() { + http.Handle("/metrics", promhttp.Handler()) + err := http.ListenAndServe(fmt.Sprintf(":%d", m.cfg.MetricsConfig.Port), nil) + if err != nil { + panic(err) + } +} + +// Monitor +func (m *MetricService) SetGnfdSavedBlock(height uint64) { + m.MetricsMap[MetricGnfdSavedBlock].(prometheus.Gauge).Set(float64(height)) +} + +func (m *MetricService) IncGnfdSavedBlockCount() { + m.MetricsMap[MetricGnfdSavedBlockCount].(prometheus.Counter).Inc() +} + +func (m *MetricService) SetGnfdSavedEvent(challengeId uint64) { + m.MetricsMap[MetricGnfdSavedEvent].(prometheus.Gauge).Set(float64(challengeId)) +} + +func (m *MetricService) IncGnfdSavedEventCount() { + m.MetricsMap[MetricGnfdSavedEventCount].(prometheus.Counter).Inc() +} + +// Hash Verifier +func (m *MetricService) IncVerifiedChallenges() { + m.MetricsMap[MetricVerifiedChallenges].(prometheus.Counter).Inc() +} + +func (m *MetricService) SetHashVerifierDuration(duration time.Duration) { + m.MetricsMap[MetricHashVerifierDuration].(prometheus.Histogram).Observe(duration.Seconds()) +} + +func (m *MetricService) IncChallengeFailed() { + m.MetricsMap[MetricVerifiedChallengeFailed].(prometheus.Counter).Inc() +} + +func (m *MetricService) IncChallengeSuccess() { + m.MetricsMap[MetricVerifiedChallengeSuccess].(prometheus.Counter).Inc() +} + +func (m *MetricService) IncHeartbeatEvents() { + m.MetricsMap[MetricHeartbeatEvents].(prometheus.Counter).Inc() +} + +func (m 
*MetricService) IncHashVerifierErr() { + m.MetricsMap[MetricHashVerifierErr].(prometheus.Counter).Inc() +} + +func (m *MetricService) IncHashVerifierSpApiErr() { + m.MetricsMap[MetricSpAPIErr].(prometheus.Counter).Inc() +} + +// Broadcaster +func (m *MetricService) IncBroadcastedChallenges() { + m.MetricsMap[MetricBroadcastedChallenges].(prometheus.Counter).Inc() +} + +func (m *MetricService) SetBroadcasterDuration(duration time.Duration) { + m.MetricsMap[MetricBroadcasterDuration].(prometheus.Histogram).Observe(duration.Seconds()) +} + +func (m *MetricService) IncBroadcasterErr() { + m.MetricsMap[MetricBroadcasterErr].(prometheus.Counter).Inc() +} + +// Vote Collector +func (m *MetricService) IncVoteCollectorErr() { + m.MetricsMap[MetricsVoteCollectorErr].(prometheus.Counter).Inc() +} + +func (m *MetricService) IncVotesCollected() { + m.MetricsMap[MetricsVotesCollected].(prometheus.Counter).Inc() +} + +// Collator +func (m *MetricService) IncCollatedChallenges() { + m.MetricsMap[MetricCollatedChallenges].(prometheus.Counter).Inc() +} + +func (m *MetricService) SetCollatorDuration(duration time.Duration) { + m.MetricsMap[MetricCollatorDuration].(prometheus.Histogram).Observe(duration.Seconds()) +} + +func (m *MetricService) IncCollatorErr() { + m.MetricsMap[MetricCollatorErr].(prometheus.Counter).Inc() +} + +// Submitter +func (m *MetricService) IncSubmittedChallenges() { + m.MetricsMap[MetricSubmittedChallenges].(prometheus.Counter).Inc() +} + +func (m *MetricService) SetSubmitterDuration(duration time.Duration) { + m.MetricsMap[MetricSubmitterDuration].(prometheus.Histogram).Observe(duration.Seconds()) +} + +func (m *MetricService) IncSubmitterErr() { + m.MetricsMap[MetricSubmitterErr].(prometheus.Counter).Inc() +} + +// Attest Monitor +func (m *MetricService) IncAttestedChallenges() { + m.MetricsMap[MetricAttestedCount].(prometheus.Counter).Inc() +} diff --git a/monitor/monitor.go b/monitor/monitor.go index 5efa1bb..002f111 100644 --- a/monitor/monitor.go +++ 
b/monitor/monitor.go @@ -5,6 +5,8 @@ import ( "strings" "time" + "github.com/bnb-chain/greenfield-challenger/metrics" + sdkmath "cosmossdk.io/math" "github.com/bnb-chain/greenfield-challenger/common" "github.com/bnb-chain/greenfield-challenger/db/model" @@ -18,14 +20,16 @@ import ( ) type Monitor struct { - executor *executor.Executor - dataProvider DataProvider + executor *executor.Executor + dataProvider DataProvider + metricService *metrics.MetricService } -func NewMonitor(executor *executor.Executor, dataProvider DataProvider) *Monitor { +func NewMonitor(executor *executor.Executor, dataProvider DataProvider, metricService *metrics.MetricService) *Monitor { return &Monitor{ - executor: executor, - dataProvider: dataProvider, + executor: executor, + dataProvider: dataProvider, + metricService: metricService, } } @@ -154,7 +158,11 @@ func (m *Monitor) monitorChallengeEvents(block *tmtypes.Block, blockResults *cty err = m.dataProvider.SaveBlockAndEvents(b, events) for _, event := range events { logging.Logger.Debugf("monitor event saved for challengeId: %d %s", event.ChallengeId, time.Now().Format("15:04:05.000000")) + m.metricService.SetGnfdSavedEvent(event.ChallengeId) + m.metricService.IncGnfdSavedEventCount() } + m.metricService.SetGnfdSavedBlock(b.Height) + m.metricService.IncGnfdSavedBlockCount() if err != nil { return err } diff --git a/submitter/tx_submitter.go b/submitter/tx_submitter.go index 9371f96..63aa095 100644 --- a/submitter/tx_submitter.go +++ b/submitter/tx_submitter.go @@ -1,24 +1,24 @@ package submitter import ( + "cosmossdk.io/math" "encoding/hex" "fmt" - "github.com/cosmos/cosmos-sdk/types/tx" - lru "github.com/hashicorp/golang-lru" "time" - "github.com/willf/bitset" - - "cosmossdk.io/math" "github.com/bnb-chain/greenfield-challenger/common" "github.com/bnb-chain/greenfield-challenger/config" "github.com/bnb-chain/greenfield-challenger/db/model" "github.com/bnb-chain/greenfield-challenger/executor" 
"github.com/bnb-chain/greenfield-challenger/logging" + "github.com/bnb-chain/greenfield-challenger/metrics" "github.com/bnb-chain/greenfield-challenger/vote" "github.com/bnb-chain/greenfield/sdk/types" challengetypes "github.com/bnb-chain/greenfield/x/challenge/types" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/tx" + lru "github.com/hashicorp/golang-lru" + "github.com/willf/bitset" ) type TxSubmitter struct { @@ -27,9 +27,10 @@ type TxSubmitter struct { cachedEventHash *lru.Cache feeAmount sdk.Coins DataProvider + metricService *metrics.MetricService } -func NewTxSubmitter(cfg *config.Config, executor *executor.Executor, submitterDataProvider DataProvider) *TxSubmitter { +func NewTxSubmitter(cfg *config.Config, executor *executor.Executor, submitterDataProvider DataProvider, metricService *metrics.MetricService) *TxSubmitter { feeAmount, ok := math.NewIntFromString(cfg.GreenfieldConfig.FeeAmount) if !ok { logging.Logger.Errorf("error converting fee_amount to math.Int, fee_amount: ", cfg.GreenfieldConfig.FeeAmount) @@ -46,6 +47,7 @@ func NewTxSubmitter(cfg *config.Config, executor *executor.Executor, submitterDa feeAmount: feeCoins, cachedEventHash: lruCache, DataProvider: submitterDataProvider, + metricService: metricService, } } @@ -59,6 +61,7 @@ func (s *TxSubmitter) SubmitTransactionLoop() { currentHeight := s.executor.GetCachedBlockHeight() events, err := s.FetchEventsForSubmit(currentHeight) if err != nil { + s.metricService.IncSubmitterErr() logging.Logger.Errorf("tx submitter failed to fetch events for submitting", err) continue } @@ -74,7 +77,7 @@ func (s *TxSubmitter) SubmitTransactionLoop() { } err = s.submitForSingleEvent(event, attestPeriodEnd) if err != nil { - logging.Logger.Errorf("tx submitter err", err) + logging.Logger.Errorf("tx submitter ran into an error while trying to attest, err=%+v", err.Error()) continue } time.Sleep(TxSubmitInterval) @@ -110,6 +113,7 @@ func (s *TxSubmitter) submitForSingleEvent(event 
*model.Event, attestPeriodEnd u // Calculate event hash and use it to fetch votes and validator bitset aggregatedSignature, valBitSet, err := s.getSignatureAndBitSet(event) if err != nil { + s.metricService.IncSubmitterErr() return err } return s.submitTransactionLoop(event, attestPeriodEnd, aggregatedSignature, valBitSet) @@ -148,6 +152,7 @@ func (s *TxSubmitter) getSignatureAndBitSet(event *model.Event) ([]byte, *bitset // submitTransaction creates and submits the transaction. func (s *TxSubmitter) submitTransactionLoop(event *model.Event, attestPeriodEnd uint64, aggregatedSignature []byte, valBitSet *bitset.BitSet) error { + startTime := time.Now() submittedAttempts := 0 for { if time.Now().Unix() > int64(attestPeriodEnd) { @@ -178,12 +183,24 @@ func (s *TxSubmitter) submitTransactionLoop(event *model.Event, attestPeriodEnd // Submit transaction attestRes, err := s.executor.AttestChallenge(s.executor.GetAddr(), event.ChallengerAddress, event.SpOperatorAddress, event.ChallengeId, math.NewUintFromString(event.ObjectId), voteResult, valBitSet.Bytes(), aggregatedSignature, txOpts) if err != nil || !attestRes { + s.metricService.IncSubmitterErr() + logging.Logger.Errorf("submitter failed for challengeId: %d, attempts: %d, err=%+v", event.ChallengeId, submittedAttempts, err) submittedAttempts++ time.Sleep(TxSubmitInterval) continue } // Update event status to include in Attest Monitor err = s.DataProvider.UpdateEventStatus(event.ChallengeId, model.Submitted) + if err != nil { + s.metricService.IncSubmitterErr() + logging.Logger.Errorf("submitter succeeded in attesting but failed to update database, err=%+v", err.Error()) + continue + } + + elapsedTime := time.Since(startTime) + s.metricService.SetSubmitterDuration(elapsedTime) + s.metricService.IncSubmittedChallenges() + logging.Logger.Infof("submitter metrics increased for challengeId %d, elapsed time %+v", event.ChallengeId, elapsedTime) return err } } diff --git a/verifier/hash_verifier.go 
b/verifier/hash_verifier.go index 413431e..0e9746f 100644 --- a/verifier/hash_verifier.go +++ b/verifier/hash_verifier.go @@ -16,6 +16,7 @@ import ( "github.com/bnb-chain/greenfield-challenger/db/model" "github.com/bnb-chain/greenfield-challenger/executor" "github.com/bnb-chain/greenfield-challenger/logging" + "github.com/bnb-chain/greenfield-challenger/metrics" "github.com/bnb-chain/greenfield-common/go/hash" "github.com/bnb-chain/greenfield-go-sdk/types" "github.com/panjf2000/ants/v2" @@ -29,10 +30,11 @@ type Verifier struct { mtx sync.RWMutex dataProvider DataProvider limiterSemaphore *semaphore.Weighted + metricService *metrics.MetricService } func NewHashVerifier(cfg *config.Config, executor *executor.Executor, - deduplicationInterval uint64, dataProvider DataProvider, + deduplicationInterval uint64, dataProvider DataProvider, metricService *metrics.MetricService, ) *Verifier { limiterSemaphore := semaphore.NewWeighted(20) return &Verifier{ @@ -42,6 +44,7 @@ func NewHashVerifier(cfg *config.Config, executor *executor.Executor, mtx: sync.RWMutex{}, dataProvider: dataProvider, limiterSemaphore: limiterSemaphore, + metricService: metricService, } } @@ -70,7 +73,11 @@ func (v *Verifier) verifyHash(pool *ants.Pool) error { // Read unprocessed event from db with lowest challengeId currentHeight := v.executor.GetCachedBlockHeight() events, err := v.dataProvider.FetchEventsForVerification(currentHeight) - + if err != nil { + v.metricService.IncHashVerifierErr() + logging.Logger.Errorf("verifier failed to retrieve the earliest events from db to begin verification, err=%+v", err.Error()) + return err + } // TODO: Remove after debugging fetchedEvents := []uint64{} for _, v := range events { @@ -78,10 +85,6 @@ func (v *Verifier) verifyHash(pool *ants.Pool) error { } logging.Logger.Infof("verifier fetched these events for verification: %+v", fetchedEvents) - if err != nil { - logging.Logger.Errorf("verifier failed to retrieve the earliest events from db to begin 
verification, err=%+v", err.Error()) - return err - } if len(events) == 0 { time.Sleep(common.RetryInterval) return nil @@ -129,6 +132,7 @@ func (v *Verifier) verifyHash(pool *ants.Pool) error { } func (v *Verifier) verifyForSingleEvent(event *model.Event) error { + startTime := time.Now() logging.Logger.Infof("verifier started for challengeId: %d %s", event.ChallengeId, time.Now().Format("15:04:05.000000")) currentHeight := v.executor.GetCachedBlockHeight() if err := v.preCheck(event, currentHeight); err != nil { @@ -165,8 +169,10 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error { return challengeResErr }, retry.Context(context.Background()), common.RtyAttem, common.RtyDelay, common.RtyErr) if challengeResErr != nil { + v.metricService.IncHashVerifierSpApiErr() err = v.dataProvider.UpdateEventStatusVerifyResult(event.ChallengeId, model.Verified, model.HashMismatched) if err != nil { + v.metricService.IncHashVerifierErr() logging.Logger.Errorf("error updating event status for challengeId: %d", event.ChallengeId) } return err @@ -192,12 +198,16 @@ func (v *Verifier) verifyForSingleEvent(event *model.Event) error { logging.Logger.Infof("SpRootHash after replacing: %s for challengeId: %d", hex.EncodeToString(spRootHash), event.ChallengeId) // Update database after comparing err = v.compareHashAndUpdate(event.ChallengeId, chainRootHash, spRootHash) - logging.Logger.Infof("verifier completed time for challengeId: %d %s", event.ChallengeId, time.Now().Format("15:04:05.000000")) if err != nil { logging.Logger.Errorf("failed to update event status, challenge id: %d, err: %s", event.ChallengeId, err) + v.metricService.IncHashVerifierErr() return err } + // Log duration + elapsedTime := time.Since(startTime) + v.metricService.SetHashVerifierDuration(elapsedTime) + logging.Logger.Infof("verifier completed time for challengeId: %d %s", event.ChallengeId, time.Now().Format("15:04:05.000000")) return nil } @@ -243,10 +253,23 @@ func (v *Verifier) 
computeRootHash(segmentIndex uint32, pieceData []byte, checks } func (v *Verifier) compareHashAndUpdate(challengeId uint64, chainRootHash []byte, spRootHash []byte) error { - // TODO: Revert this if debugging if bytes.Equal(chainRootHash, spRootHash) { - //return v.dataProvider.UpdateEventStatusVerifyResult(challengeId, model.Verified, model.HashMismatched) - return v.dataProvider.UpdateEventStatusVerifyResult(challengeId, model.Verified, model.HashMatched) + // TODO: Revert this if debugging + err := v.dataProvider.UpdateEventStatusVerifyResult(challengeId, model.Verified, model.HashMatched) + if err != nil { + return err + } + // update metrics if no err + v.metricService.IncVerifiedChallenges() + v.metricService.IncChallengeFailed() + return err + } + err := v.dataProvider.UpdateEventStatusVerifyResult(challengeId, model.Verified, model.HashMismatched) + if err != nil { + return err } - return v.dataProvider.UpdateEventStatusVerifyResult(challengeId, model.Verified, model.HashMismatched) + // update metrics if no err + v.metricService.IncVerifiedChallenges() + v.metricService.IncChallengeSuccess() + return err } diff --git a/verifier/hash_verifier_test.go b/verifier/hash_verifier_test.go index 076bf21..1f8379a 100644 --- a/verifier/hash_verifier_test.go +++ b/verifier/hash_verifier_test.go @@ -12,7 +12,7 @@ import ( ) func TestHashing(t *testing.T) { - verifier := NewHashVerifier(nil, nil, 100, nil) + verifier := NewHashVerifier(nil, nil, 100, nil, nil) hashesStr := []string{"test1", "test2", "test3", "test4", "test5", "test6", "test7"} checksums := make([][]byte, 7) diff --git a/vote/data_provider.go b/vote/data_provider.go index 09e70f7..e719342 100644 --- a/vote/data_provider.go +++ b/vote/data_provider.go @@ -8,7 +8,7 @@ import ( ) type DataProvider interface { - FetchEventsForSelfVote(currentHeight uint64) ([]*model.Event, error) + FetchEventsForSelfVote(currentHeight uint64) ([]*model.Event, uint64, error) FetchEventsForCollate(currentHeight uint64) 
([]*model.Event, error) FetchVotesForCollate(eventHash string) ([]*model.Vote, error) UpdateEventStatus(challengeId uint64, status model.EventStatus) error @@ -30,27 +30,31 @@ func NewDataHandler(daoManager *dao.DaoManager, executor *executor.Executor) *Da } } -func (h *DataHandler) FetchEventsForSelfVote(currentHeight uint64) ([]*model.Event, error) { +func (h *DataHandler) FetchEventsForSelfVote(currentHeight uint64) ([]*model.Event, uint64, error) { events, err := h.daoManager.GetUnexpiredEventsByStatus(currentHeight, model.Verified) if err != nil { logging.Logger.Errorf("failed to fetch events for self vote, err=%+v", err.Error()) - return nil, err + return nil, 0, err } heartbeatInterval, err := h.executor.QueryChallengeHeartbeatInterval() logging.Logger.Infof("heartbeat interval is %d", heartbeatInterval) if err != nil { logging.Logger.Errorf("error querying heartbeat interval, err=%+v", err.Error()) - return nil, err + return nil, 0, err } result := make([]*model.Event, 0) + heartbeatEventCount := 0 for _, e := range events { - if e.VerifyResult == model.HashMismatched || e.ChallengeId%heartbeatInterval == 0 { + if e.VerifyResult == model.HashMismatched { result = append(result, e) + } else if e.ChallengeId%heartbeatInterval == 0 { + result = append(result, e) + heartbeatEventCount++ } // it means if a challenge cannot be handled correctly, it will be skipped h.lastIdForSelfVote = e.ChallengeId } - return result, nil + return result, uint64(heartbeatEventCount), nil } func (h *DataHandler) FetchEventsForCollate(currentHeight uint64) ([]*model.Event, error) { diff --git a/vote/vote_broadcaster.go b/vote/vote_broadcaster.go index d5fc01b..ad93e6a 100644 --- a/vote/vote_broadcaster.go +++ b/vote/vote_broadcaster.go @@ -2,9 +2,12 @@ package vote import ( "fmt" + lru "github.com/hashicorp/golang-lru" "strings" "time" + "github.com/bnb-chain/greenfield-challenger/metrics" + "github.com/bnb-chain/greenfield-challenger/common" 
"github.com/bnb-chain/greenfield-challenger/config" "github.com/bnb-chain/greenfield-challenger/db/model" @@ -18,32 +21,34 @@ type VoteBroadcaster struct { signer *VoteSigner executor *executor.Executor blsPublicKey []byte - cachedLocalVote map[uint64]*votepool.Vote + cachedLocalVote *lru.Cache dataProvider DataProvider + metricService *metrics.MetricService } func NewVoteBroadcaster(cfg *config.Config, signer *VoteSigner, - executor *executor.Executor, broadcasterDataProvider DataProvider, + executor *executor.Executor, broadcasterDataProvider DataProvider, metricService *metrics.MetricService, ) *VoteBroadcaster { + cacheSize := 1000 + lruCache, _ := lru.New(cacheSize) + return &VoteBroadcaster{ config: cfg, signer: signer, executor: executor, dataProvider: broadcasterDataProvider, - cachedLocalVote: nil, + cachedLocalVote: lruCache, blsPublicKey: executor.BlsPubKey, + metricService: metricService, } } func (p *VoteBroadcaster) BroadcastVotesLoop() { - // Event lasts for 300 blocks, 2x for redundancy - p.cachedLocalVote = make(map[uint64]*votepool.Vote, common.CacheSize) - broadcastLoopCount := 0 for { currentHeight := p.executor.GetCachedBlockHeight() - // Ask about this function - events, err := p.dataProvider.FetchEventsForSelfVote(currentHeight) + events, heartbeatEventCount, err := p.dataProvider.FetchEventsForSelfVote(currentHeight) if err != nil { + p.metricService.IncBroadcasterErr() logging.Logger.Errorf("vote processor failed to fetch unexpired events to collate votes, err=%+v", err.Error()) continue } @@ -51,46 +56,51 @@ func (p *VoteBroadcaster) BroadcastVotesLoop() { time.Sleep(RetryInterval) continue } + if heartbeatEventCount != 0 { + for i := uint64(0); i < heartbeatEventCount; i++ { + p.metricService.IncHeartbeatEvents() + } + } for _, event := range events { - localVote := p.cachedLocalVote[event.ChallengeId] + localVote, found := p.cachedLocalVote.Get(event.ChallengeId) - if localVote == nil { + if !found { localVote, err = 
p.constructVoteAndSign(event) if err != nil { if strings.Contains(err.Error(), "Duplicate") { logging.Logger.Errorf("[non-blocking error] broadcaster was trying to save a duplicated vote after clearing cache for challengeId: %d, err=%+v", event.ChallengeId, err.Error()) } else { + p.metricService.IncBroadcasterErr() logging.Logger.Errorf("broadcaster ran into error trying to construct vote for challengeId: %d, err=%+v", event.ChallengeId, err.Error()) continue } } - p.cachedLocalVote[event.ChallengeId] = localVote + p.cachedLocalVote.Add(event.ChallengeId, localVote) + // Incrementing this before broadcasting to prevent the same challengeID from being incremented multiple times + // does not mean that it has been successfully broadcasted, check error metrics for broadcast errors. + p.metricService.IncBroadcastedChallenges() + logging.Logger.Infof("broadcaster metrics increased for challengeId %d", event.ChallengeId) } - err = p.broadcastForSingleEvent(localVote, event) + err = p.broadcastForSingleEvent(localVote.(*votepool.Vote), event) if err != nil { + p.metricService.IncBroadcasterErr() continue } time.Sleep(50 * time.Millisecond) } - broadcastLoopCount++ - if broadcastLoopCount == common.CacheClearIterations { - // Clear cachedLocalVote every N loops, preCheck cannot catch events expired in between iterations - p.cachedLocalVote = make(map[uint64]*votepool.Vote, common.CacheSize) - broadcastLoopCount = 0 - } - time.Sleep(RetryInterval) } } func (p *VoteBroadcaster) broadcastForSingleEvent(localVote *votepool.Vote, event *model.Event) error { + startTime := time.Now() err := p.preCheck(event) if err != nil { if err.Error() == common.ErrEventExpired.Error() { - delete(p.cachedLocalVote, event.ChallengeId) + p.cachedLocalVote.Remove(event.ChallengeId) return err } return err @@ -102,6 +112,10 @@ func (p *VoteBroadcaster) broadcastForSingleEvent(localVote *votepool.Vote, even return fmt.Errorf("failed to broadcast vote for challengeId: %d", event.ChallengeId) } 
logging.Logger.Infof("vote broadcasted for challengeId: %d, height: %d", event.ChallengeId, event.Height) + + // Metrics + elapsedTime := time.Since(startTime) + p.metricService.SetBroadcasterDuration(elapsedTime) return nil } diff --git a/vote/vote_collator.go b/vote/vote_collator.go index b6bd0b0..09e4911 100644 --- a/vote/vote_collator.go +++ b/vote/vote_collator.go @@ -10,26 +10,29 @@ import ( "github.com/bnb-chain/greenfield-challenger/db/model" "github.com/bnb-chain/greenfield-challenger/executor" "github.com/bnb-chain/greenfield-challenger/logging" + "github.com/bnb-chain/greenfield-challenger/metrics" tmtypes "github.com/cometbft/cometbft/types" ) type VoteCollator struct { - config *config.Config - signer *VoteSigner - executor *executor.Executor - blsPublicKey []byte - dataProvider DataProvider + config *config.Config + signer *VoteSigner + executor *executor.Executor + blsPublicKey []byte + dataProvider DataProvider + metricService *metrics.MetricService } func NewVoteCollator(cfg *config.Config, signer *VoteSigner, - executor *executor.Executor, collatorDataProvider DataProvider, + executor *executor.Executor, collatorDataProvider DataProvider, metricService *metrics.MetricService, ) *VoteCollator { return &VoteCollator{ - config: cfg, - signer: signer, - executor: executor, - dataProvider: collatorDataProvider, - blsPublicKey: executor.BlsPubKey, + config: cfg, + signer: signer, + executor: executor, + dataProvider: collatorDataProvider, + blsPublicKey: executor.BlsPubKey, + metricService: metricService, } } @@ -39,6 +42,7 @@ func (p *VoteCollator) CollateVotesLoop() { events, err := p.dataProvider.FetchEventsForCollate(currentHeight) logging.Logger.Infof("vote processor fetched %d events for collate", len(events)) if err != nil { + p.metricService.IncCollatorErr() logging.Logger.Errorf("vote processor failed to fetch unexpired events to collate votes, err=%+v", err.Error()) time.Sleep(RetryInterval) continue @@ -65,14 +69,22 @@ func (p *VoteCollator) 
collateForSingleEvent(event *model.Event) error { if err != nil { return err } + startTime := time.Now() + err = p.prepareEnoughValidVotesForEvent(event) if err != nil { return err } err = p.dataProvider.UpdateEventStatus(event.ChallengeId, model.EnoughVotesCollected) if err != nil { + p.metricService.IncCollatorErr() return err } + + elapsedTime := time.Since(startTime) + p.metricService.SetCollatorDuration(elapsedTime) + p.metricService.IncCollatedChallenges() + logging.Logger.Infof("collator metrics increased for challengeId %d, elapsed time %+v", event.ChallengeId, elapsedTime) logging.Logger.Infof("collator completed time for challengeId: %d %s", event.ChallengeId, time.Now().Format("15:04:05.000000")) return nil } @@ -81,6 +93,7 @@ func (p *VoteCollator) collateForSingleEvent(event *model.Event) error { func (p *VoteCollator) prepareEnoughValidVotesForEvent(event *model.Event) error { validators, err := p.executor.QueryCachedLatestValidators() if err != nil { + p.metricService.IncCollatorErr() return err } if len(validators) == 1 { @@ -112,6 +125,7 @@ func (p *VoteCollator) queryMoreThanTwoThirdVotesForEvent(event *model.Event, va eventHash := CalculateEventHash(event, p.config.GreenfieldConfig.ChainIdString) queriedVotes, err := p.dataProvider.FetchVotesForCollate(hex.EncodeToString(eventHash)) if err != nil { + p.metricService.IncCollatorErr() logging.Logger.Errorf("failed to query votes for event %d, err=%+v", event.ChallengeId, err.Error()) return err } diff --git a/vote/vote_collector.go b/vote/vote_collector.go index c243e92..4445454 100644 --- a/vote/vote_collector.go +++ b/vote/vote_collector.go @@ -9,23 +9,26 @@ import ( "github.com/bnb-chain/greenfield-challenger/config" "github.com/bnb-chain/greenfield-challenger/executor" "github.com/bnb-chain/greenfield-challenger/logging" + "github.com/bnb-chain/greenfield-challenger/metrics" tmtypes "github.com/cometbft/cometbft/types" "github.com/cometbft/cometbft/votepool" ) type VoteCollector struct { -
config *config.Config - executor *executor.Executor - mtx sync.RWMutex - dataProvider DataProvider + config *config.Config + executor *executor.Executor + mtx sync.RWMutex + dataProvider DataProvider + metricService *metrics.MetricService } -func NewVoteCollector(cfg *config.Config, executor *executor.Executor, collectorDataProvider DataProvider) *VoteCollector { +func NewVoteCollector(cfg *config.Config, executor *executor.Executor, collectorDataProvider DataProvider, metricService *metrics.MetricService) *VoteCollector { return &VoteCollector{ - config: cfg, - executor: executor, - mtx: sync.RWMutex{}, - dataProvider: collectorDataProvider, + config: cfg, + executor: executor, + mtx: sync.RWMutex{}, + dataProvider: collectorDataProvider, + metricService: metricService, } } @@ -43,10 +46,14 @@ func (p *VoteCollector) collectVotes() error { eventType := votepool.DataAvailabilityChallengeEvent queriedVotes, err := p.executor.QueryVotes(eventType) if err != nil { + p.metricService.IncVoteCollectorErr() logging.Logger.Errorf("vote collector failed to query votes, err=%+v", err.Error()) return err } logging.Logger.Infof("number of votes collected: %d", len(queriedVotes)) + for range queriedVotes { + p.metricService.IncVotesCollected() + } if len(queriedVotes) == 0 { time.Sleep(RetryInterval) @@ -55,6 +62,7 @@ func (p *VoteCollector) collectVotes() error { validators, err := p.executor.QueryCachedLatestValidators() if err != nil { + p.metricService.IncVoteCollectorErr() logging.Logger.Errorf("vote collector ran into error querying validators, err=%+v", err.Error()) return err } @@ -62,6 +70,7 @@ func (p *VoteCollector) collectVotes() error { for _, v := range queriedVotes { exists, err := p.dataProvider.IsVoteExists(hex.EncodeToString(v.EventHash), hex.EncodeToString(v.PubKey)) if err != nil { + p.metricService.IncVoteCollectorErr() logging.Logger.Errorf("vote collector ran into an error while checking if vote exists, err=%+v", err.Error()) continue } @@ -81,6 +90,7 @@ 
func (p *VoteCollector) collectVotes() error { err = p.dataProvider.SaveVote(EntityToDto(v, uint64(0))) if err != nil { + p.metricService.IncVoteCollectorErr() return err } logging.Logger.Infof("vote saved: %s", hex.EncodeToString(v.Signature))