Skip to content

Commit

Permalink
feat: send webhook on planned upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
MattKetmo committed Nov 20, 2023
1 parent 15d345d commit 121f316
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 8 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ GLOBAL OPTIONS:
--denom value denom used in metrics label (eg. atom or uatom)
--denom-exponent value denom exponent (eg. 6 for atom, 1 for uatom) (default: 0)
--validator value [ --validator value ] validator address(es) to track (use :my-label to add a custom label in metrics & ouput)
--webhook-endpoint value endpoint where to send upgrade webhooks
--x-gov value version of the gov module to use (v1|v1beta1) (default: "v1beta1")
--help, -h show help
--version, -v print the version
Expand Down
4 changes: 4 additions & 0 deletions pkg/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ var Flags = []cli.Flag{
Name: "validator",
Usage: "validator address(es) to track (use :my-label to add a custom label in metrics & ouput)",
},
&cli.StringFlag{
Name: "webhook-url",
Usage: "endpoint where to send upgrade webhooks",
},
&cli.StringFlag{
Name: "x-gov",
Usage: "version of the gov module to use (v1|v1beta1)",
Expand Down
29 changes: 22 additions & 7 deletions pkg/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"fmt"
"net/url"
"os"
"os/signal"
"syscall"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/kilnfi/cosmos-validator-watcher/pkg/metrics"
"github.com/kilnfi/cosmos-validator-watcher/pkg/rpc"
"github.com/kilnfi/cosmos-validator-watcher/pkg/watcher"
"github.com/kilnfi/cosmos-validator-watcher/pkg/webhook"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
Expand All @@ -40,6 +42,7 @@ func RunFunc(cCtx *cli.Context) error {
denom = cCtx.String("denom")
denomExpon = cCtx.Uint("denom-exponent")
validators = cCtx.StringSlice("validator")
webhookURL = cCtx.String("webhook-url")
xGov = cCtx.String("x-gov")
)

Expand Down Expand Up @@ -86,12 +89,6 @@ func RunFunc(cCtx *cli.Context) error {
errg.Go(func() error {
return statusWatcher.Start(ctx)
})
// Register watchers on nodes events
for _, node := range pool.Nodes {
node.OnStart(blockWatcher.OnNodeStart)
node.OnStatus(statusWatcher.OnNodeStatus)
node.OnEvent(rpc.EventNewBlock, blockWatcher.OnNewBlock)
}

//
// Pool watchers
Expand Down Expand Up @@ -121,13 +118,31 @@ func RunFunc(cCtx *cli.Context) error {
log.Warn().Msgf("unknown gov module version: %s", xGov)
}
}
upgradeWatcher := watcher.NewUpgradeWatcher(metrics, pool, watcher.UpgradeWatcherOptions{
var wh *webhook.Webhook
if webhookURL != "" {
whURL, err := url.Parse(webhookURL)
if err != nil {
return fmt.Errorf("failed to parse webhook endpoint: %w", err)
}
wh = webhook.New(*whURL)
}
upgradeWatcher := watcher.NewUpgradeWatcher(metrics, pool, wh, watcher.UpgradeWatcherOptions{
CheckPendingProposals: !noGov,
})
errg.Go(func() error {
return upgradeWatcher.Start(ctx)
})

//
// Register watchers on nodes events
//
for _, node := range pool.Nodes {
node.OnStart(blockWatcher.OnNodeStart)
node.OnStatus(statusWatcher.OnNodeStatus)
node.OnEvent(rpc.EventNewBlock, blockWatcher.OnNewBlock)
node.OnEvent(rpc.EventNewBlock, upgradeWatcher.OnNewBlock)
}

//
// Start Pool
//
Expand Down
63 changes: 62 additions & 1 deletion pkg/watcher/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,37 @@ import (
"fmt"
"time"

ctypes "github.com/cometbft/cometbft/rpc/core/types"
comettypes "github.com/cometbft/cometbft/types"
"github.com/cosmos/cosmos-sdk/client"
gov "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1"
"github.com/cosmos/cosmos-sdk/x/upgrade/types"
upgrade "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/gogo/protobuf/codec"
"github.com/kilnfi/cosmos-validator-watcher/pkg/metrics"
"github.com/kilnfi/cosmos-validator-watcher/pkg/rpc"
"github.com/kilnfi/cosmos-validator-watcher/pkg/webhook"
"github.com/rs/zerolog/log"
)

type UpgradeWatcher struct {
metrics *metrics.Metrics
pool *rpc.Pool
webhook *webhook.Webhook
options UpgradeWatcherOptions

nextUpgradePlan *upgrade.Plan
}

type UpgradeWatcherOptions struct {
CheckPendingProposals bool
}

func NewUpgradeWatcher(metrics *metrics.Metrics, pool *rpc.Pool, options UpgradeWatcherOptions) *UpgradeWatcher {
func NewUpgradeWatcher(metrics *metrics.Metrics, pool *rpc.Pool, webhook *webhook.Webhook, options UpgradeWatcherOptions) *UpgradeWatcher {
return &UpgradeWatcher{
metrics: metrics,
pool: pool,
webhook: webhook,
options: options,
}
}
Expand All @@ -52,6 +59,55 @@ func (w *UpgradeWatcher) Start(ctx context.Context) error {
}
}

func (w *UpgradeWatcher) OnNewBlock(ctx context.Context, node *rpc.Node, evt *ctypes.ResultEvent) error {
// Ignore is webhook is not configured
if w.webhook == nil {
return nil
}

// Ignore if no upgrade plan
if w.nextUpgradePlan == nil {
return nil
}

// Ignore blocks if node is catching up
if !node.IsSynced() {
return nil
}

blockEvent := evt.Data.(comettypes.EventDataNewBlock)
block := blockEvent.Block

// Ignore if upgrade plan is for a future block
if block.Header.Height < w.nextUpgradePlan.Height {
return nil
}

// Upgrade plan is for this block
go w.triggerWebhook(ctx, node.ChainID(), *w.nextUpgradePlan)
w.nextUpgradePlan = nil

return nil
}

func (w *UpgradeWatcher) triggerWebhook(ctx context.Context, chainID string, plan upgrade.Plan) {
msg := struct {
Type string
Block int64
ChainID string
Version string
}{
Type: "upgrade",
Block: plan.Height,
ChainID: chainID,
Version: plan.Name,
}

if err := w.webhook.Send(ctx, msg); err != nil {
log.Error().Err(err).Msg("failed to send upgrade webhook")
}
}

func (w *UpgradeWatcher) fetchUpgrade(ctx context.Context, node *rpc.Node) error {
clientCtx := (client.Context{}).WithClient(node.Client)
queryClient := upgrade.NewQueryClient(clientCtx)
Expand Down Expand Up @@ -117,10 +173,15 @@ func (w *UpgradeWatcher) checkUpgradeProposals(ctx context.Context, node *rpc.No
}

func (w *UpgradeWatcher) handleUpgradePlan(chainID string, plan *upgrade.Plan) {
w.nextUpgradePlan = &upgrade.Plan{
Name: "v43",
Height: 17935608,
}
if plan == nil {
w.metrics.UpgradePlan.Reset()
return
}

w.nextUpgradePlan = plan
w.metrics.UpgradePlan.WithLabelValues(chainID, plan.Name).Set(float64(plan.Height))
}
1 change: 1 addition & 0 deletions pkg/watcher/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestUpgradeWatcher(t *testing.T) {
watcher := NewUpgradeWatcher(
metrics.New("cosmos_validator_watcher"),
nil,
nil,
UpgradeWatcherOptions{},
)

Expand Down
70 changes: 70 additions & 0 deletions pkg/webhook/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package webhook

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"github.com/avast/retry-go/v4"
"github.com/rs/zerolog/log"
)

type Webhook struct {
endpoint url.URL
client *http.Client
}

func New(endpoint url.URL) *Webhook {
return &Webhook{
endpoint: endpoint,
client: &http.Client{},
}
}

func (w *Webhook) Send(ctx context.Context, message interface{}) error {
body, err := json.Marshal(message)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}

log.Info().Msgf("sending webhook: %s", body)

req, err := http.NewRequestWithContext(ctx, "POST", w.endpoint.String(), bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

req.Header.Set("Content-Type", "application/json")

retryOpts := []retry.Option{
retry.Context(ctx),
retry.Delay(1 * time.Second),
retry.Attempts(3),
retry.OnRetry(func(_ uint, err error) {
log.Warn().Err(err).Msgf("retrying webhook on %s", w.endpoint.String())
}),
}

return retry.Do(func() error {
return w.postRequest(ctx, req)
}, retryOpts...)
}

func (w *Webhook) postRequest(ctx context.Context, req *http.Request) error {
resp, err := w.client.Do(req)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()

// Check if response is not 4xx or 5xx
if resp.StatusCode >= 400 {
return fmt.Errorf("unexpected response status: %s", resp.Status)
}

return nil
}

0 comments on commit 121f316

Please sign in to comment.