Skip to content

Commit

Permalink
Fetching data from Subsquid
Browse files Browse the repository at this point in the history
  • Loading branch information
k-karuna committed Dec 12, 2024
1 parent 933979f commit 45bf456
Show file tree
Hide file tree
Showing 10 changed files with 513 additions and 85 deletions.
3 changes: 2 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ POSTGRES_DB=starknet
POSTGRES_PASSWORD=<TYPE_SOMETHING_STRONG> # REQUIRED
STARKNET_NODE_URL=<URL_HERE> # REQUIRED if INDEXER_DATASOURCE=node
NODE_APIKEY=<API_KEY_FROM_NODE_PROVIDER> # REQUIRED if your node provider has api key. It's api key.
NODE_HEADER_APIKEY=<HEADER_NAME> # REQUIRED if your node provider has api key. It's header name.
NODE_HEADER_APIKEY=<HEADER_NAME> # REQUIRED if your node provider has api key. It's header name.
STARKNET_SUBSQUID_URL=<URL_HERE> # REQUIRED if INDEXER_DATASOURCE=subsquid
3 changes: 3 additions & 0 deletions build/dipdup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ datasources:
fallback:
url: ${STARKNET_FALLBACK_NODE_URL}
rps: ${STARKNET_FALLBACK_NODE_RPS:-1}
subsquid:
url: ${STARKNET_SUBSQUID_URL}
rps: ${STARKNET_SUBSQUID_RPS:-5}

database:
kind: postgres
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/rs/zerolog v1.30.0
github.com/shopspring/decimal v1.3.1
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.9.0
github.com/uptrace/bun v1.1.14
go.uber.org/mock v0.2.0
google.golang.org/grpc v1.58.3
Expand All @@ -31,6 +31,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/moby/sys/user v0.1.0 // indirect
github.com/opus-domini/fast-shot v1.1.4 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
golang.org/x/sync v0.3.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0-rc4 h1:oOxKUJWnFC4YGHCCMNql1x4YaDfYBTS5Y4x/Cgeo1E0=
github.com/opencontainers/image-spec v1.1.0-rc4/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8=
github.com/opus-domini/fast-shot v1.1.4 h1:xWTO/4JEILjZM/rP6mwiWe/jZyE9+L1G9sC4BsoynAk=
github.com/opus-domini/fast-shot v1.1.4/go.mod h1:BOr2JXHQJhOnYsxyCvFbgBP3BuYCjgh2YfzWKweEL0A=
github.com/paulmach/orb v0.10.0 h1:guVYVqzxHE/CQ1KpfGO077TR0ATHSNjp4s6XGLn3W9s=
github.com/paulmach/orb v0.10.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
Expand Down Expand Up @@ -281,6 +283,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/testcontainers/testcontainers-go v0.22.0 h1:hOK4NzNu82VZcKEB1aP9LO1xYssVFMvlfeuDW9JMmV0=
github.com/testcontainers/testcontainers-go v0.22.0/go.mod h1:k0YiPa26xJCRUbUkYqy5rY6NGvSbVCeUBXCvucscBR4=
github.com/testcontainers/testcontainers-go/modules/postgres v0.22.0 h1:OHVaqu9MRGMSlro9AD5UCfj8XiHwQdhB9thE4vINq+E=
Expand Down
12 changes: 6 additions & 6 deletions pkg/indexer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package config

// Config - configuration structure for indexer
type Config struct {
Name string `yaml:"name" validate:"omitempty"`
StartLevel uint64 `yaml:"start_level" validate:"omitempty"`
ThreadsCount int `yaml:"threads_count" validate:"omitempty,min=1"`
Timeout uint64 `yaml:"timeout" validate:"omitempty"`
Name string `yaml:"name" validate:"omitempty"`
StartLevel uint64 `yaml:"start_level" validate:"omitempty"`
ThreadsCount int `yaml:"threads_count" validate:"omitempty,min=1"`
Timeout uint64 `yaml:"timeout" validate:"omitempty"`
ClassInterfacesDir string `yaml:"class_interfaces_dir" validate:"required,dir"`
BridgedTokensFile string `yaml:"bridged_tokens_file" validate:"required,file"`
Datasource string `yaml:"datasource" validate:"required,oneof=sequencer node"`
BridgedTokensFile string `yaml:"bridged_tokens_file" validate:"required,file"`
Datasource string `yaml:"datasource" validate:"required,oneof=sequencer node subsquid"`
}
195 changes: 127 additions & 68 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,125 @@ type Indexer struct {
statusChecker *statusChecker
rollbackManager models.Rollback

blocksFetcher IBlocksFetcher

rollback chan struct{}
rollbackRerun chan struct{}
rollbackWait *sync.WaitGroup

txWriteMutex *sync.Mutex
}

type IBlocksFetcher interface {
getNew(ctx context.Context) error
}

type BlocksFetcher struct {
indexer *Indexer
}
type SqdBlocksFetcher struct {
indexer *Indexer
}

func (f *BlocksFetcher) getNew(ctx context.Context) error {
head, err := f.indexer.receiver.Head(ctx)
if err != nil {
return err
}

if head < f.indexer.state.Height() {
log.Warn().
Uint64("indexer_height", f.indexer.state.Height()).
Uint64("node_height", head).
Msg("rollback detected by block height")
if err := f.indexer.makeRollback(ctx, head); err != nil {
return errors.Wrap(err, "makeRollback")
}
}

for head > f.indexer.state.Height() {
f.indexer.Log.Info().
Uint64("indexer_block", f.indexer.state.Height()).
Uint64("node_block", head).
Msg("syncing...")

startLevel := f.indexer.cfg.StartLevel
if startLevel < f.indexer.state.Height() {
startLevel = f.indexer.state.Height()
if f.indexer.state.Height() > 0 {
startLevel += 1
}
}

for height := startLevel; height <= head; height++ {
select {
case <-ctx.Done():
return nil
case <-f.indexer.rollback:
log.Info().Msg("stop receiving blocks")
return nil
default:
if f.indexer.checkQueue(ctx) {
return nil
}
f.indexer.receiver.AddTask(height)
}
}

time.Sleep(5 * time.Second)

for head, err = f.indexer.receiver.Head(ctx); err != nil; {
select {
case <-ctx.Done():
return nil
case <-f.indexer.rollback:
log.Info().Msg("stop receiving blocks")
return nil
default:
log.Err(err).Msg("receive head error")
return err
}
}
}

f.indexer.Log.Info().Uint64("height", f.indexer.state.Height()).Msg("synced")
return nil
}

func (f *SqdBlocksFetcher) getNew(ctx context.Context) error {
head, err := f.indexer.receiver.Head(ctx)
if err != nil {
return err
}

if head < f.indexer.state.Height() {
log.Warn().
Uint64("indexer_height", f.indexer.state.Height()).
Uint64("node_height", head).
Msg("rollback detected by block height")
if err := f.indexer.makeRollback(ctx, head); err != nil {
return errors.Wrap(err, "makeRollback")
}
}

f.indexer.Log.Info().
Uint64("indexer_block", f.indexer.state.Height()).
Uint64("node_block", head).
Msg("syncing...")

startLevel := f.indexer.cfg.StartLevel
if startLevel < f.indexer.state.Height() {
startLevel = f.indexer.state.Height()
if f.indexer.state.Height() > 0 {
startLevel += 1
}
}
f.indexer.receiver.GetSqdData(ctx, startLevel)

f.indexer.Log.Info().Uint64("height", f.indexer.state.Height()).Msg("synced")
return nil
}

// New - creates new indexer entity
func New(
cfg config.Config,
Expand Down Expand Up @@ -93,6 +205,18 @@ func New(
txWriteMutex: new(sync.Mutex),
rollbackWait: new(sync.WaitGroup),
}

switch cfg.Datasource {
case "subsquid":
indexer.blocksFetcher = &SqdBlocksFetcher{
indexer: indexer,
}
default:
indexer.blocksFetcher = &BlocksFetcher{
indexer: indexer,
}
}

rcvr, err := receiver.NewReceiver(cfg, datasource)
if err != nil {
return nil, err
Expand Down Expand Up @@ -201,73 +325,8 @@ func (indexer *Indexer) checkQueue(ctx context.Context) bool {
return false
}

func (indexer *Indexer) getNewBlocks(ctx context.Context) error {
head, err := indexer.receiver.Head(ctx)
if err != nil {
return err
}

if head < indexer.state.Height() {
log.Warn().
Uint64("indexer_height", indexer.state.Height()).
Uint64("node_height", head).
Msg("rollback detected by block height")
if err := indexer.makeRollback(ctx, head); err != nil {
return errors.Wrap(err, "makeRollback")
}
}

for head > indexer.state.Height() {
indexer.Log.Info().
Uint64("indexer_block", indexer.state.Height()).
Uint64("node_block", head).
Msg("syncing...")

startLevel := indexer.cfg.StartLevel
if startLevel < indexer.state.Height() {
startLevel = indexer.state.Height()
if indexer.state.Height() > 0 {
startLevel += 1
}
}

for height := startLevel; height <= head; height++ {
select {
case <-ctx.Done():
return nil
case <-indexer.rollback:
log.Info().Msg("stop receiving blocks")
return nil
default:
if indexer.checkQueue(ctx) {
return nil
}
indexer.receiver.AddTask(height)
}
}

time.Sleep(5 * time.Second)

for head, err = indexer.receiver.Head(ctx); err != nil; {
select {
case <-ctx.Done():
return nil
case <-indexer.rollback:
log.Info().Msg("stop receiving blocks")
return nil
default:
log.Err(err).Msg("receive head error")
return err
}
}
}

indexer.Log.Info().Uint64("height", indexer.state.Height()).Msg("synced")
return nil
}

func (indexer *Indexer) sync(ctx context.Context) {
if err := indexer.getNewBlocks(ctx); err != nil {
if err := indexer.blocksFetcher.getNew(ctx); err != nil {
indexer.Log.Err(err).Msg("getNewBlocks")
}

Expand All @@ -281,11 +340,11 @@ func (indexer *Indexer) sync(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
if err := indexer.getNewBlocks(ctx); err != nil {
if err := indexer.blocksFetcher.getNew(ctx); err != nil {
indexer.Log.Err(err).Msg("getNewBlocks")
}
case <-indexer.rollbackRerun:
if err := indexer.getNewBlocks(ctx); err != nil {
if err := indexer.blocksFetcher.getNew(ctx); err != nil {
indexer.Log.Err(err).Msg("getNewBlocks")
}
}
Expand Down
Loading

0 comments on commit 45bf456

Please sign in to comment.