diff --git a/.vscode/launch.json b/.vscode/launch.json index 2ea008fe3..3fc456d93 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,12 +11,6 @@ "request": "launch", "mode": "debug", "program": "${workspaceFolder}/cmd/api/main.go", - "args": [ - "-f", - "config.yml", - "-f", - "config.dev.yml" - ], "envFile": "${workspaceFolder}/.env" }, { @@ -25,12 +19,6 @@ "request": "launch", "mode": "debug", "program": "${workspaceFolder}/cmd/indexer/main.go", - "args": [ - "-f", - "config.yml", - "-f", - "config.dev.yml" - ], "envFile": "${workspaceFolder}/.env" }, { @@ -39,12 +27,6 @@ "request": "launch", "mode": "debug", "program": "${workspaceFolder}/scripts/migration/main.go", - "args": [ - "-f", - "config.yml", - "-f", - "config.dev.yml" - ], "envFile": "${workspaceFolder}/.env" } ] diff --git a/cmd/indexer/indexer/create.go b/cmd/indexer/indexer/create.go index 5c19c36e1..8b7c96c79 100644 --- a/cmd/indexer/indexer/create.go +++ b/cmd/indexer/indexer/create.go @@ -6,11 +6,12 @@ import ( "github.com/baking-bad/bcdhub/internal/bcd/tezerrors" "github.com/baking-bad/bcdhub/internal/config" + "github.com/dipdup-io/workerpool" "github.com/rs/zerolog/log" ) // CreateIndexers - -func CreateIndexers(ctx context.Context, cfg config.Config) ([]Indexer, error) { +func CreateIndexers(ctx context.Context, cfg config.Config, g workerpool.Group) ([]Indexer, error) { if err := tezerrors.LoadErrorDescriptions(); err != nil { return nil, err } @@ -27,7 +28,7 @@ func CreateIndexers(ctx context.Context, cfg config.Config) ([]Indexer, error) { defer wg.Done() if indexerCfg.Periodic != nil { - periodicIndexer, err := NewPeriodicIndexer(ctx, network, cfg, indexerCfg) + periodicIndexer, err := NewPeriodicIndexer(ctx, network, cfg, indexerCfg, g) if err != nil { log.Err(err).Msg("NewPeriodicIndexer") return diff --git a/cmd/indexer/indexer/indexer.go b/cmd/indexer/indexer/indexer.go index c2ca7d62a..6eba127f6 100644 --- a/cmd/indexer/indexer/indexer.go +++ b/cmd/indexer/indexer/indexer.go @@ -22,6 +22,7 @@ import ( "github.com/baking-bad/bcdhub/internal/postgres" "github.com/baking-bad/bcdhub/internal/postgres/core" "github.com/baking-bad/bcdhub/internal/rollback" + "github.com/dipdup-io/workerpool" "github.com/go-pg/pg/v10" "github.com/pkg/errors" ) @@ -45,6 +46,8 @@ type BlockchainIndexer struct { isPeriodic bool indicesInit sync.Once + + g workerpool.Group } // NewBlockchainIndexer - @@ -69,6 +72,7 @@ func NewBlockchainIndexer(ctx context.Context, cfg config.Config, network string Network: networkType, isPeriodic: indexerConfig.Periodic != nil, refreshTimer: make(chan struct{}, 10), + g: workerpool.NewGroup(), } if err := bi.init(ctx, bi.Context.StorageDB); err != nil { @@ -80,6 +84,8 @@ func NewBlockchainIndexer(ctx context.Context, cfg config.Config, network string // Close - func (bi *BlockchainIndexer) Close() error { + bi.g.Wait() + close(bi.refreshTimer) if err := bi.receiver.Close(); err != nil { return nil @@ -127,14 +133,11 @@ func (bi *BlockchainIndexer) init(ctx context.Context, db *core.Postgres) error } // Start - -func (bi *BlockchainIndexer) Start(ctx context.Context, wg *sync.WaitGroup) { - defer wg.Done() - +func (bi *BlockchainIndexer) Start(ctx context.Context) { localSentry := helpers.GetLocalSentry() helpers.SetLocalTagSentry(localSentry, "network", bi.Network.String()) - wg.Add(1) - go bi.indexBlock(ctx, wg) + bi.g.GoCtx(ctx, bi.indexBlock) bi.receiver.Start(ctx) @@ -194,9 +197,7 @@ func (bi *BlockchainIndexer) setUpdateTicker(seconds int) { bi.refreshTimer <- struct{}{} } -func (bi *BlockchainIndexer) indexBlock(ctx context.Context, wg *sync.WaitGroup) { - defer wg.Done() - +func (bi *BlockchainIndexer) indexBlock(ctx context.Context) { ticker := time.NewTicker(time.Millisecond) defer ticker.Stop() @@ -578,5 +579,6 @@ func (bi *BlockchainIndexer) reinit(ctx context.Context, cfg config.Config, inde logger.Info().Str("network", bi.Context.Network.String()).Msg("Creating indexer object...") bi.receiver = NewReceiver(bi.Context.RPC, 20, indexerConfig.ReceiverThreads) + bi.refreshTimer = make(chan struct{}, 10) return bi.init(ctx, bi.Context.StorageDB) } diff --git a/cmd/indexer/indexer/interface.go b/cmd/indexer/indexer/interface.go index 196f5a5b0..563816058 100644 --- a/cmd/indexer/indexer/interface.go +++ b/cmd/indexer/indexer/interface.go @@ -2,14 +2,13 @@ package indexer import ( "context" - "sync" "github.com/baking-bad/bcdhub/internal/noderpc" ) // Indexer - type Indexer interface { - Start(ctx context.Context, wg *sync.WaitGroup) + Start(ctx context.Context) Index(ctx context.Context, head noderpc.Header) error Rollback(ctx context.Context) error Close() error diff --git a/cmd/indexer/indexer/periodic.go b/cmd/indexer/indexer/periodic.go index c22dac25f..115cbfa00 100644 --- a/cmd/indexer/indexer/periodic.go +++ b/cmd/indexer/indexer/periodic.go @@ -3,13 +3,14 @@ package indexer import ( "context" "errors" - "sync" "time" "github.com/baking-bad/bcdhub/internal/config" + "github.com/baking-bad/bcdhub/internal/logger" "github.com/baking-bad/bcdhub/internal/models/types" "github.com/baking-bad/bcdhub/internal/noderpc" "github.com/baking-bad/bcdhub/internal/periodic" + "github.com/dipdup-io/workerpool" ) // PeriodicIndexer - @@ -21,12 +22,17 @@ type PeriodicIndexer struct { indexerCfg config.IndexerConfig worker *periodic.Worker - - wg *sync.WaitGroup + g workerpool.Group } // NewPeriodicIndexer - -func NewPeriodicIndexer(ctx context.Context, network string, cfg config.Config, indexerCfg config.IndexerConfig) (*PeriodicIndexer, error) { +func NewPeriodicIndexer( + ctx context.Context, + network string, + cfg config.Config, + indexerCfg config.IndexerConfig, + g workerpool.Group, +) (*PeriodicIndexer, error) { if indexerCfg.Periodic == nil { return nil, errors.New("not periodic") } @@ -34,7 +40,7 @@ func NewPeriodicIndexer(ctx context.Context, network string, cfg config.Config, p := &PeriodicIndexer{ cfg: cfg, indexerCfg: indexerCfg, - wg: new(sync.WaitGroup), + g: g, } worker, err := periodic.New(*indexerCfg.Periodic, types.NewNetwork(network), p.handleUrlChanged) @@ -60,12 +66,10 @@ func NewPeriodicIndexer(ctx context.Context, network string, cfg config.Config, } // Start - -func (p *PeriodicIndexer) Start(ctx context.Context, wg *sync.WaitGroup) { - +func (p *PeriodicIndexer) Start(ctx context.Context) { indexerCtx, indexerCancel := context.WithCancel(ctx) p.indexerCancel = indexerCancel - - p.indexer.Start(indexerCtx, p.wg) + p.indexer.Start(indexerCtx) } // Close - @@ -87,8 +91,8 @@ func (p *PeriodicIndexer) Rollback(ctx context.Context) error { } func (p *PeriodicIndexer) handleUrlChanged(ctx context.Context, network, url string) error { + logger.Warning().Str("network", network).Str("url", url).Msg("cancelling indexer due to URL changing...") p.indexerCancel() - p.wg.Wait() if err := p.indexer.Close(); err != nil { return err @@ -102,7 +106,7 @@ func (p *PeriodicIndexer) handleUrlChanged(ctx context.Context, network, url str indexerCtx, indexerCancel := context.WithCancel(ctx) p.indexerCancel = indexerCancel - p.indexer.Start(indexerCtx, p.wg) + p.g.GoCtx(indexerCtx, p.indexer.Start) return nil } diff --git a/cmd/indexer/main.go b/cmd/indexer/main.go index 22d67ec41..ba63f2677 100644 --- a/cmd/indexer/main.go +++ b/cmd/indexer/main.go @@ -4,13 +4,13 @@ import ( "context" "os" "os/signal" - "sync" "syscall" "github.com/baking-bad/bcdhub/cmd/indexer/indexer" "github.com/baking-bad/bcdhub/internal/config" "github.com/baking-bad/bcdhub/internal/helpers" "github.com/baking-bad/bcdhub/internal/logger" + "github.com/dipdup-io/workerpool" "github.com/pyroscope-io/client/pyroscope" ) @@ -56,10 +56,10 @@ func main() { } } - var wg sync.WaitGroup + g := workerpool.NewGroup() ctx, cancel := context.WithCancel(context.Background()) - indexers, err := indexer.CreateIndexers(ctx, cfg) + indexers, err := indexer.CreateIndexers(ctx, cfg, g) if err != nil { cancel() logger.Err(err) @@ -71,14 +71,13 @@ func main() { signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) for i := range indexers { - wg.Add(1) - go indexers[i].Start(ctx, &wg) + g.GoCtx(ctx, indexers[i].Start) } <-sigChan cancel() - wg.Wait() + g.Wait() for i := range indexers { if err := indexers[i].Close(); err != nil { panic(err) diff --git a/go.mod b/go.mod index 61a2375f9..095b9c515 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( github.com/aws/aws-sdk-go v1.44.92 github.com/btcsuite/btcutil v1.0.2 - github.com/dipdup-io/workerpool v0.0.3 + github.com/dipdup-io/workerpool v0.0.4 github.com/ebellocchia/go-base58 v0.1.0 github.com/fatih/color v1.13.0 github.com/getsentry/sentry-go v0.13.0 diff --git a/go.sum b/go.sum index f8da635fc..2a77d5134 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,8 @@ github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dipdup-io/workerpool v0.0.3 h1:+cnO0/J0e4UiJ0EBEDpvuhriSDVHlsPminGRU2Il+ZI= -github.com/dipdup-io/workerpool v0.0.3/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA= +github.com/dipdup-io/workerpool v0.0.4 h1:m58fuFY3VIPRc+trWpjw2Lsm4FvIgtjP/4VRe79r+/s= +github.com/dipdup-io/workerpool v0.0.4/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA= github.com/ebellocchia/go-base58 v0.1.0 h1:0w/ODEfZnOPW5KW0QY/Xpb1fxba/BxQJMUa5iYzpljk= github.com/ebellocchia/go-base58 v0.1.0/go.mod h1:RHE/6C6Ru6YAH9Tc+A9eHQ6ZKEooLC0jw+YLnpt3CAU= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= diff --git a/internal/models/consts.go b/internal/models/consts.go index 508033fc1..6811575c9 100644 --- a/internal/models/consts.go +++ b/internal/models/consts.go @@ -27,7 +27,7 @@ const ( DocProtocol = "protocols" DocScripts = "scripts" DocTicketUpdates = "ticket_updates" - DocSmartRollups = "smart_rollups" + DocSmartRollups = "smart_rollup" ) // AllDocuments - returns all document names diff --git a/internal/models/smart_rollup/model.go b/internal/models/smart_rollup/model.go index f49c5be9f..951764298 100644 --- a/internal/models/smart_rollup/model.go +++ b/internal/models/smart_rollup/model.go @@ -33,7 +33,7 @@ func (sr *SmartRollup) GetID() int64 { // GetIndex - func (SmartRollup) GetIndex() string { - return "contracts" + return "smart_rollup" } // Save - diff --git a/internal/periodic/worker.go b/internal/periodic/worker.go index f0a4014db..b51e94512 100644 --- a/internal/periodic/worker.go +++ b/internal/periodic/worker.go @@ -64,6 +64,8 @@ func (w *Worker) Start(ctx context.Context) { logger.Error().Err(err).Str("network", w.network.String()).Msg("failed to run cron function") return } + + logger.Info().Time("cron", w.cron.Entries()[0].Schedule.Next(time.Now())).Msg("cron") w.cron.Start() } @@ -82,6 +84,7 @@ func (w *Worker) handleScheduleEvent(ctx context.Context) func() { logger.Error().Err(err).Str("network", w.network.String()).Msg("failed to receive periodic network info") } if changed { + logger.Info().Msg("rpc url changed") return } @@ -98,6 +101,7 @@ func (w *Worker) handleScheduleEvent(ctx context.Context) func() { logger.Error().Err(err).Str("network", w.network.String()).Msg("failed to receive periodic network info") } if changed { + logger.Info().Msg("rpc url changed") return } } @@ -122,12 +126,13 @@ func (w *Worker) checkNetwork(ctx context.Context) (bool, error) { } if w.currentUrl != data.RPCURL { + w.currentUrl = data.RPCURL + if w.currentUrl != "" { - if err := w.handler(ctx, w.network.String(), w.currentUrl); err != nil { + if err := w.handler(ctx, w.network.String(), data.RPCURL); err != nil { logger.Error().Err(err).Str("network", w.network.String()).Msg("failed to apply new rpc url") } } - w.currentUrl = data.RPCURL logger.Info().Str("network", parts[0]).Str("url", w.currentUrl).Msg("new url was found") return true, nil