From 286013bfde9f78efa99db8a91d154b50a68fcef2 Mon Sep 17 00:00:00 2001 From: Artem Poltorzhitskiy Date: Sun, 16 Jun 2024 01:30:43 +0200 Subject: [PATCH] Fix: skip transaction if used class is undeclared (#34) --- cmd/indexer/main.go | 23 ++-- go.mod | 2 +- go.sum | 8 +- pkg/indexer/indexer.go | 4 + pkg/indexer/parser/resolver/class.go | 7 + pkg/indexer/parser/resolver/state_updates.go | 1 + pkg/indexer/parser/version/v0/fee.go | 3 + pkg/indexer/parser/version/v0/internal_tx.go | 6 +- pkg/indexer/receiver/receiver.go | 130 ++++++++++++------- 9 files changed, 119 insertions(+), 65 deletions(-) diff --git a/cmd/indexer/main.go b/cmd/indexer/main.go index 1115c35..32de763 100644 --- a/cmd/indexer/main.go +++ b/cmd/indexer/main.go @@ -6,6 +6,7 @@ import ( "os/signal" "strconv" "syscall" + "time" "github.com/dipdup-io/starknet-indexer/internal/starknet" "github.com/dipdup-io/starknet-indexer/internal/storage" @@ -92,14 +93,20 @@ func main() { } if cfg.Hasura != nil { - if err := hasura.Create(ctx, hasura.GenerateArgs{ - Config: cfg.Hasura, - DatabaseConfig: cfg.Database, - Models: storage.ModelsAny, - Views: views, - }); err != nil { - log.Panic().Err(err).Msg("hasura initialization") - return + var hasuraInitialized bool + for !hasuraInitialized { + log.Info().Msg("hasura initialization...") + if err := hasura.Create(ctx, hasura.GenerateArgs{ + Config: cfg.Hasura, + DatabaseConfig: cfg.Database, + Models: storage.ModelsAny, + Views: views, + }); err != nil { + log.Err(err).Msg("hasura initialization. waiting 5 seconds and trying again.") + time.Sleep(time.Second * 5) + } else { + hasuraInitialized = true + } } } diff --git a/go.mod b/go.mod index eff88df..a141f92 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/dipdup-io/starknet-indexer go 1.22 require ( - github.com/dipdup-io/starknet-go-api v0.0.0-20240517123614-8bb965043f0b + github.com/dipdup-io/starknet-go-api v0.0.0-20240612124116-f145b22af8d4 github.com/dipdup-io/workerpool v0.0.4 github.com/dipdup-net/go-lib v0.3.3 github.com/dipdup-net/indexer-sdk v0.0.4 diff --git a/go.sum b/go.sum index 82ba8b9..9f2505a 100644 --- a/go.sum +++ b/go.sum @@ -39,12 +39,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw= github.com/denisenkom/go-mssqldb v0.12.3/go.mod h1:k0mtMFOnU+AihqFxPMiF05rtiDrorD1Vrm1KEz5hxDo= -github.com/dipdup-io/starknet-go-api v0.0.0-20240428115333-9db5c96f6b81 h1:jKSCsoLYm4uuO4tu/0pcBthvixgXII4KQQCVmcoL3Cc= -github.com/dipdup-io/starknet-go-api v0.0.0-20240428115333-9db5c96f6b81/go.mod h1:y3KGLFQtwzUBcT0X2LMj6CxocUimr/A9XYg+j0KIRDE= -github.com/dipdup-io/starknet-go-api v0.0.0-20240428184801-e7a7f9b05da9 h1:gLnNH4kZZXjHBG3WVmwi91giiGHPol0VszqOdalNMXA= -github.com/dipdup-io/starknet-go-api v0.0.0-20240428184801-e7a7f9b05da9/go.mod h1:y3KGLFQtwzUBcT0X2LMj6CxocUimr/A9XYg+j0KIRDE= -github.com/dipdup-io/starknet-go-api v0.0.0-20240517123614-8bb965043f0b h1:mIonvNL2IM9/2kyGvSuqV1ut0yL1W4kpPP5ojTyA3SY= -github.com/dipdup-io/starknet-go-api v0.0.0-20240517123614-8bb965043f0b/go.mod h1:y3KGLFQtwzUBcT0X2LMj6CxocUimr/A9XYg+j0KIRDE= +github.com/dipdup-io/starknet-go-api v0.0.0-20240612124116-f145b22af8d4 h1:W7L06zVr/eEVlpuUJEtHiKc8v4fzj23WQt19oJH3hKw= +github.com/dipdup-io/starknet-go-api v0.0.0-20240612124116-f145b22af8d4/go.mod h1:y3KGLFQtwzUBcT0X2LMj6CxocUimr/A9XYg+j0KIRDE= github.com/dipdup-io/workerpool v0.0.4 h1:m58fuFY3VIPRc+trWpjw2Lsm4FvIgtjP/4VRe79r+/s= github.com/dipdup-io/workerpool v0.0.4/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA= github.com/dipdup-net/go-lib v0.3.3 h1:vTUI+sT4L+x+eiMf712Cg8EtlqUCMiN6M3vcNaPlCw8= diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index a4bdfa3..956e990 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -8,6 +8,7 @@ import ( "time" "github.com/dipdup-io/starknet-go-api/pkg/data" + api "github.com/dipdup-io/starknet-go-api/pkg/rpc" "github.com/dipdup-io/starknet-indexer/internal/starknet" models "github.com/dipdup-io/starknet-indexer/internal/storage" "github.com/dipdup-io/starknet-indexer/internal/storage/postgres" @@ -492,6 +493,9 @@ func (indexer *Indexer) fixClassAbi(ctx context.Context) error { hash := data.NewFeltFromBytes(classes[i].Hash) class, err := indexer.receiver.GetClass(ctx, hash.String()) if err != nil { + if e, ok := err.(*api.Error); ok && e.Code == 28 { + continue + } return tx.HandleError(ctx, err) } classes[i].Abi = models.Bytes(class.RawAbi) diff --git a/pkg/indexer/parser/resolver/class.go b/pkg/indexer/parser/resolver/class.go index b44b177..fff9baf 100644 --- a/pkg/indexer/parser/resolver/class.go +++ b/pkg/indexer/parser/resolver/class.go @@ -5,15 +5,22 @@ import ( "github.com/dipdup-io/starknet-go-api/pkg/data" "github.com/dipdup-io/starknet-go-api/pkg/encoding" + api "github.com/dipdup-io/starknet-go-api/pkg/rpc" "github.com/dipdup-io/starknet-indexer/internal/starknet" "github.com/dipdup-io/starknet-indexer/internal/storage" + "github.com/pkg/errors" "github.com/rs/zerolog/log" ) +var ErrUndeclaredClass = errors.New("undeclared class") + // ReceiveClass - func (resolver *Resolver) ReceiveClass(ctx context.Context, class *storage.Class) error { rawClass, err := resolver.receiver.GetClass(ctx, encoding.EncodeHex(class.Hash)) if err != nil { + if e, ok := err.(*api.Error); ok && e.Code == 28 { + return ErrUndeclaredClass + } return err } diff --git a/pkg/indexer/parser/resolver/state_updates.go b/pkg/indexer/parser/resolver/state_updates.go index 9d00edc..9fe919c 100644 --- a/pkg/indexer/parser/resolver/state_updates.go +++ b/pkg/indexer/parser/resolver/state_updates.go @@ -122,6 +122,7 @@ func (resolver *Resolver) parseDeployedContracts(ctx context.Context, block *sto if class, ok := addrs[h]; ok && addresses[i].ClassID == nil { addresses[i].ClassID = &class.ID resolver.addAddress(&addresses[i]) + resolver.cache.SetAddress(ctx, addresses[i]) delete(addrs, h) } } diff --git a/pkg/indexer/parser/version/v0/fee.go b/pkg/indexer/parser/version/v0/fee.go index 885417e..7c50e56 100644 --- a/pkg/indexer/parser/version/v0/fee.go +++ b/pkg/indexer/parser/version/v0/fee.go @@ -186,6 +186,9 @@ func (parser FeeParser) ParseInvocation(ctx context.Context, txCtx data.TxContex } if err := parser.resolver.ReceiveClass(ctx, &tx.Class); err != nil { + if errors.Is(err, resolver.ErrUndeclaredClass) { + return nil, nil + } return nil, err } tx.Class.Height = tx.Height diff --git a/pkg/indexer/parser/version/v0/internal_tx.go b/pkg/indexer/parser/version/v0/internal_tx.go index dc541ff..85c9e6d 100644 --- a/pkg/indexer/parser/version/v0/internal_tx.go +++ b/pkg/indexer/parser/version/v0/internal_tx.go @@ -130,14 +130,14 @@ func (parser InternalTxParser) Parse(ctx context.Context, txCtx parserData.TxCon } if err := parser.Resolver.ReceiveClass(ctx, &tx.Class); err != nil { + if errors.Is(err, resolver.ErrUndeclaredClass) { + return tx, nil + } return tx, err } tx.Class.Height = tx.Height } - // txHash := encoding.EncodeHex(tx.Hash) - // log.Info().Msg(txHash) - if len(tx.Selector) > 0 && internal.Selector != "0x0" { var ( _, has = contractAbi.GetByTypeAndSelector(internal.EntrypointType, encoding.EncodeHex(tx.Selector)) diff --git a/pkg/indexer/receiver/receiver.go b/pkg/indexer/receiver/receiver.go index 15fe4c2..e67f31d 100644 --- a/pkg/indexer/receiver/receiver.go +++ b/pkg/indexer/receiver/receiver.go @@ -92,66 +92,102 @@ func (r *Receiver) worker(ctx context.Context, height uint64) { blockId := starknetData.BlockID{ Number: &height, } - var result Result - for { - select { - case <-ctx.Done(): - return - default: - } - - response, err := r.api.GetBlock(ctx, blockId) - if err != nil { - if errors.Is(err, context.Canceled) { + var ( + result Result + wg sync.WaitGroup + mx = new(sync.Mutex) + ) + + wg.Add(1) + go func(mx *sync.Mutex, wg *sync.WaitGroup) { + defer wg.Done() + + for { + select { + case <-ctx.Done(): return + default: } - r.log.Err(err).Msg("get block request") - time.Sleep(time.Second) - continue - } - result.Block = response - break - } - for { - select { - case <-ctx.Done(): - return - default: + response, err := r.api.GetBlock(ctx, blockId) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + r.log.Err(err).Uint64("height", height).Msg("get block request") + time.Sleep(time.Second) + continue + } + mx.Lock() + { + result.Block = response + } + mx.Unlock() + break } + }(mx, &wg) + + wg.Add(1) + go func(mx *sync.Mutex, wg *sync.WaitGroup) { + defer wg.Done() - response, err := r.api.TraceBlock(ctx, blockId) - if err != nil { - if errors.Is(err, context.Canceled) { + for { + select { + case <-ctx.Done(): return + default: } - r.log.Err(err).Msg("get block traces request") - time.Sleep(time.Second) - continue - } - result.Traces = response - break - } - for { - select { - case <-ctx.Done(): - return - default: + response, err := r.api.TraceBlock(ctx, blockId) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + r.log.Err(err).Uint64("height", height).Msg("get block traces request") + time.Sleep(time.Second) + continue + } + + mx.Lock() + { + result.Traces = response + } + mx.Unlock() + break } + }(mx, &wg) + + wg.Add(1) + go func(mx *sync.Mutex, wg *sync.WaitGroup) { + defer wg.Done() - response, err := r.getStateUpdate(ctx, blockId) - if err != nil { - if errors.Is(err, context.Canceled) { + for { + select { + case <-ctx.Done(): return + default: + } + + response, err := r.getStateUpdate(ctx, blockId) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + r.log.Err(err).Uint64("height", height).Msg("state update request") + time.Sleep(time.Second) + continue } - r.log.Err(err).Msg("state update request") - time.Sleep(time.Second) - continue + + mx.Lock() + { + result.StateUpdate = response + } + mx.Unlock() + break } - result.StateUpdate = response - break - } + }(mx, &wg) + + wg.Wait() r.log.Info().Uint64("height", height).Int64("ms", time.Since(start).Milliseconds()).Msg("received block data") r.result <- result