From cbb9a06a9f62af8d5a803158db42467498f67023 Mon Sep 17 00:00:00 2001 From: Artem Date: Wed, 8 Nov 2023 16:06:12 +0100 Subject: [PATCH] Upgrade: indexer-sdk --- Makefile | 3 ++ cmd/rpc_tester/blocks.yml | 9 +++++ cmd/rpc_tester/printer.go | 2 +- go.mod | 18 +++++----- go.sum | 36 +++++++++---------- pkg/grpc/client.go | 26 ++++++++++++-- pkg/grpc/server.go | 68 +++++++++-------------------------- pkg/indexer/indexer.go | 59 ++++++++++++------------------ pkg/indexer/messages.go | 30 ++-------------- pkg/indexer/status_checker.go | 13 +++---- 10 files changed, 112 insertions(+), 152 deletions(-) create mode 100644 cmd/rpc_tester/blocks.yml diff --git a/Makefile b/Makefile index 074c2da..389cba4 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,9 @@ starknet_id: loot_survivor: cd cmd/rpc_tester && go run . -c ../../cmd/rpc_tester/loot_survivor.yml +blocks: + cd cmd/rpc_tester && go run . -c ../../cmd/rpc_tester/blocks.yml + build-proto: protoc \ -I=${GOPATH}/src \ diff --git a/cmd/rpc_tester/blocks.yml b/cmd/rpc_tester/blocks.yml new file mode 100644 index 0000000..f167a4a --- /dev/null +++ b/cmd/rpc_tester/blocks.yml @@ -0,0 +1,9 @@ +version: 0.0.1 + +log_level: ${LOG_LEVEL:-info} + +grpc: + server_address: ${GRPC_BIND:-127.0.0.1:7779} + subscriptions: + blocks: + head: true diff --git a/cmd/rpc_tester/printer.go b/cmd/rpc_tester/printer.go index eb32ab4..bbcb244 100644 --- a/cmd/rpc_tester/printer.go +++ b/cmd/rpc_tester/printer.go @@ -13,7 +13,7 @@ import ( // Printer - type Printer struct { - *printer.Printer + printer.Printer eventCounters map[string]*atomic.Uint64 diff --git a/go.mod b/go.mod index e03a5dc..3f0a168 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,9 @@ go 1.20 require ( github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582 - github.com/dipdup-io/workerpool v0.0.3 + github.com/dipdup-io/workerpool v0.0.4 github.com/dipdup-net/go-lib v0.3.3 - github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65 + github.com/dipdup-net/indexer-sdk v0.0.4 github.com/go-testfixtures/testfixtures/v3 v3.9.0 github.com/goccy/go-json v0.10.2 github.com/karlseguin/ccache/v2 v2.0.8 @@ -18,7 +18,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/uptrace/bun v1.1.14 go.uber.org/mock v0.2.0 - google.golang.org/grpc v1.57.0 + google.golang.org/grpc v1.57.1 google.golang.org/protobuf v1.31.0 ) @@ -36,7 +36,7 @@ require ( github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/distribution v2.8.2+incompatible // indirect - github.com/docker/docker v24.0.5+incompatible // indirect + github.com/docker/docker v24.0.7+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect @@ -97,12 +97,12 @@ require ( github.com/wealdtech/go-merkletree v1.0.1-0.20230205101955-ec7a95ea11ca // indirect go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect - golang.org/x/crypto v0.12.0 // indirect - golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect golang.org/x/mod v0.12.0 // indirect - golang.org/x/net v0.14.0 // indirect - golang.org/x/sys v0.11.0 // indirect - golang.org/x/text v0.12.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.12.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect diff --git a/go.sum b/go.sum index e2043fd..a6101dd 100644 --- a/go.sum +++ b/go.sum @@ -36,16 +36,16 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw= github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582 h1:Mo5MDCO9Bqdj62Jrn1at/f44KOOzWCPF+ohu5jqlb4o= github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582/go.mod h1:Pbi1et2OC3ZWhzv/76nDg5C0/v4Mrj7YWkZPXcZFys0= -github.com/dipdup-io/workerpool v0.0.3 h1:+cnO0/J0e4UiJ0EBEDpvuhriSDVHlsPminGRU2Il+ZI= -github.com/dipdup-io/workerpool v0.0.3/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA= +github.com/dipdup-io/workerpool v0.0.4 h1:m58fuFY3VIPRc+trWpjw2Lsm4FvIgtjP/4VRe79r+/s= +github.com/dipdup-io/workerpool v0.0.4/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA= github.com/dipdup-net/go-lib v0.3.3 h1:vTUI+sT4L+x+eiMf712Cg8EtlqUCMiN6M3vcNaPlCw8= github.com/dipdup-net/go-lib v0.3.3/go.mod h1:oBDOSsM/F8fEnmuDnaJ6QA/cHH4lne49ASbsh8WXDe4= -github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65 h1:jZ53iH1UhoBTtIj4pRUPz826EFIwr7KYUR7EkwokwPM= -github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65/go.mod h1:sZEbnuguFw8kxgD3iNLLHDq1DUDrXI6KcghNMcmNCS4= +github.com/dipdup-net/indexer-sdk v0.0.4 h1:mhTW3f4U6oc05UjxSiffOV+HIi4vQkDgOq1MbJXia8U= +github.com/dipdup-net/indexer-sdk v0.0.4/go.mod h1:n1oBIm5MPY1WxLS9tQfTWr+Ytrwv6ThCZF7TASsJslg= github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8= github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v24.0.5+incompatible h1:WmgcE4fxyI6EEXxBRxsHnZXrO1pQ3smi0k/jho4HLeY= -github.com/docker/docker v24.0.5+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM= +github.com/docker/docker v24.0.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= @@ -292,11 +292,11 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= -golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= -golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea h1:vLCWI/yYrdEHyN2JzIzPO3aaQJHQdp89IZBA/+azVC4= -golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= +golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc h1:mCRnTeVUjcrhlRmO0VK8a6k6Rrf6TF9htwo2pJVSjIU= +golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -320,8 +320,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= -golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= -golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -354,8 +354,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -365,8 +365,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -396,8 +396,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577/go. google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/grpc v1.57.1 h1:upNTNqv0ES+2ZOOqACwVtS3Il8M12/+Hz41RCPzAjQg= +google.golang.org/grpc v1.57.1/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go index 9db0dfa..d495c4b 100644 --- a/pkg/grpc/client.go +++ b/pkg/grpc/client.go @@ -80,6 +80,15 @@ func (client *Client) Input(name string) (*modules.Input, error) { return nil, errors.Wrap(modules.ErrUnknownInput, name) } +// MustInput - +func (client *Client) MustInput(name string) *modules.Input { + input, err := client.Input(name) + if err != nil { + panic(err) + } + return input +} + // Output - func (client *Client) Output(name string) (*modules.Output, error) { if name != OutputMessages { @@ -88,9 +97,22 @@ func (client *Client) Output(name string) (*modules.Output, error) { return client.output, nil } -// AttachTo - -func (client *Client) AttachTo(name string, input *modules.Input) error { +// MustOutput - +func (client *Client) MustOutput(name string) *modules.Output { output, err := client.Output(name) + if err != nil { + panic(err) + } + return output +} + +// AttachTo - +func (client *Client) AttachTo(outputModule modules.Module, outputName, inputName string) error { + output, err := outputModule.Output(outputName) + if err != nil { + return err + } + input, err := client.Input(inputName) if err != nil { return err } diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go index a0a1f91..a52f5ac 100644 --- a/pkg/grpc/server.go +++ b/pkg/grpc/server.go @@ -2,7 +2,6 @@ package grpc import ( "context" - "sync" "time" "github.com/dipdup-io/starknet-indexer/internal/storage" @@ -12,8 +11,6 @@ import ( "github.com/dipdup-io/starknet-indexer/pkg/indexer" "github.com/dipdup-net/indexer-sdk/pkg/modules" grpcSDK "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc" - "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -24,16 +21,13 @@ const ( // Server - type Server struct { - *grpcSDK.Server + GRPC *grpcSDK.Server + modules.BaseModule pb.UnimplementedIndexerServiceServer db postgres.Storage - input *modules.Input subscriptions *grpcSDK.Subscriptions[*subscriptions.Message, *pb.Subscription] - log zerolog.Logger - - wg *sync.WaitGroup } // NewServer - @@ -46,47 +40,46 @@ func NewServer( return nil, err } - return &Server{ - Server: server, + s := &Server{ + GRPC: server, db: db, - input: modules.NewInput(InputBlocks), + BaseModule: modules.New("layer1_grpc_server"), subscriptions: grpcSDK.NewSubscriptions[*subscriptions.Message, *pb.Subscription](), - log: log.With().Str("module", "grpc_server").Logger(), + } + s.CreateInput(InputBlocks) - wg: new(sync.WaitGroup), - }, nil + return s, nil } // Start - func (module *Server) Start(ctx context.Context) { - pb.RegisterIndexerServiceServer(module.Server.Server(), module) + pb.RegisterIndexerServiceServer(module.GRPC.Server(), module) - module.Server.Start(ctx) + module.GRPC.Start(ctx) - module.wg.Add(1) - go module.listen(ctx) + module.G.GoCtx(ctx, module.listen) } func (module *Server) listen(ctx context.Context) { - defer module.wg.Done() - ticker := time.NewTicker(time.Second * 15) defer ticker.Stop() + input := module.MustInput(InputBlocks) + for { select { case <-ctx.Done(): return case <-ticker.C: - case msg, ok := <-module.input.Listen(): + case msg, ok := <-input.Listen(): if !ok { return } if message, ok := msg.(*indexer.IndexerMessage); ok { module.blockHandler(ctx, message) } else { - module.log.Warn().Msgf("unknown message type: %T", msg) + module.Log.Warn().Msgf("unknown message type: %T", msg) } } } @@ -280,34 +273,7 @@ func (module *Server) notifyAboutAddress(address *storage.Address) { // Close - func (module *Server) Close() error { - module.wg.Wait() - - if err := module.input.Close(); err != nil { - return err - } - - return module.Server.Close() -} - -// Input - -func (module *Server) Input(name string) (*modules.Input, error) { - if name != InputBlocks { - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } - return module.input, nil -} - -// Output - -func (module *Server) Output(name string) (*modules.Output, error) { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) -} - -// AttachTo - -func (module *Server) AttachTo(name string, input *modules.Input) error { - return errors.Wrap(modules.ErrUnknownOutput, name) -} + module.G.Wait() -// Name - -func (module *Server) Name() string { - return "layer1_grpc_server" + return module.GRPC.Close() } diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 6fd23e0..956c3df 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -21,7 +21,6 @@ import ( "github.com/dipdup-net/indexer-sdk/pkg/modules" sdk "github.com/dipdup-net/indexer-sdk/pkg/storage" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -32,9 +31,10 @@ const ( // Indexer - type Indexer struct { - cfg config.Config - outputs map[string]*modules.Output - queue map[uint64]receiver.Result + modules.BaseModule + + cfg config.Config + queue map[uint64]receiver.Result address models.IAddress blocks models.IBlock @@ -61,10 +61,7 @@ type Indexer struct { rollbackRerun chan struct{} rollbackWait *sync.WaitGroup - log zerolog.Logger - txWriteMutex *sync.Mutex - wg *sync.WaitGroup } // New - creates new indexer entity @@ -73,10 +70,8 @@ func New( storage postgres.Storage, ) *Indexer { indexer := &Indexer{ - cfg: cfg, - outputs: map[string]*modules.Output{ - OutputBlocks: modules.NewOutput(OutputBlocks), - }, + BaseModule: modules.New("indexer"), + cfg: cfg, queue: make(map[uint64]receiver.Result), stateRepo: storage.State, address: storage.Address, @@ -94,13 +89,12 @@ func New( cache: cache.New(storage.Address, storage.Class, storage.Proxy), receiver: receiver.NewReceiver(cfg), rollbackManager: storage.RollbackManager, - log: log.With().Str("module", "indexer").Logger(), rollback: make(chan struct{}, 1), rollbackRerun: make(chan struct{}, 1), txWriteMutex: new(sync.Mutex), rollbackWait: new(sync.WaitGroup), - wg: new(sync.WaitGroup), } + indexer.CreateOutput(OutputBlocks) indexer.idGenerator = generator.NewIdGenerator(storage.Address, storage.Class, indexer.cache, indexer.state.Current()) indexer.store = store.New( @@ -129,9 +123,9 @@ func New( // Start - func (indexer *Indexer) Start(ctx context.Context) { - indexer.log.Info().Msg("starting indexer...") + indexer.Log.Info().Msg("starting indexer...") if err := indexer.init(ctx); err != nil { - indexer.log.Err(err).Msg("state initializing error") + indexer.Log.Err(err).Msg("state initializing error") return } @@ -140,15 +134,12 @@ func (indexer *Indexer) Start(ctx context.Context) { indexer.statusChecker.Start(ctx) if err := indexer.fixClassAbi(ctx); err != nil { - indexer.log.Err(err).Msg("recovering class abi error") + indexer.Log.Err(err).Msg("recovering class abi error") return } - indexer.wg.Add(1) - go indexer.saveBlocks(ctx) - - indexer.wg.Add(1) - go indexer.sync(ctx) + indexer.G.GoCtx(ctx, indexer.saveBlocks) + indexer.G.GoCtx(ctx, indexer.sync) } // Name - @@ -161,8 +152,8 @@ func (indexer *Indexer) Name() string { // Close - func (indexer *Indexer) Close() error { - indexer.wg.Wait() - indexer.log.Info().Msgf("closing...") + indexer.G.Wait() + indexer.Log.Info().Msgf("closing...") if err := indexer.statusChecker.Close(); err != nil { return err @@ -227,7 +218,7 @@ func (indexer *Indexer) getNewBlocks(ctx context.Context) error { } for head > indexer.state.Height() { - indexer.log.Info(). + indexer.Log.Info(). Uint64("indexer_block", indexer.state.Height()). Uint64("node_block", head). Msg("syncing...") @@ -271,15 +262,13 @@ func (indexer *Indexer) getNewBlocks(ctx context.Context) error { } } - indexer.log.Info().Uint64("height", indexer.state.Height()).Msg("synced") + indexer.Log.Info().Uint64("height", indexer.state.Height()).Msg("synced") return nil } func (indexer *Indexer) sync(ctx context.Context) { - defer indexer.wg.Done() - if err := indexer.getNewBlocks(ctx); err != nil { - indexer.log.Err(err).Msg("getNewBlocks") + indexer.Log.Err(err).Msg("getNewBlocks") } ticker := time.NewTicker(time.Second * 30) @@ -293,19 +282,17 @@ func (indexer *Indexer) sync(ctx context.Context) { return case <-ticker.C: if err := indexer.getNewBlocks(ctx); err != nil { - indexer.log.Err(err).Msg("getNewBlocks") + indexer.Log.Err(err).Msg("getNewBlocks") } case <-indexer.rollbackRerun: if err := indexer.getNewBlocks(ctx); err != nil { - indexer.log.Err(err).Msg("getNewBlocks") + indexer.Log.Err(err).Msg("getNewBlocks") } } } } func (indexer *Indexer) saveBlocks(ctx context.Context) { - defer indexer.wg.Done() - var zeroBlock bool for { @@ -319,7 +306,7 @@ func (indexer *Indexer) saveBlocks(ctx context.Context) { if indexer.state.Height() == 0 && !zeroBlock { if data, ok := indexer.queue[0]; ok { if err := indexer.handleBlock(ctx, data); err != nil { - indexer.log.Err(err).Msg("handle block") + indexer.Log.Err(err).Msg("handle block") } zeroBlock = true } else { @@ -339,7 +326,7 @@ func (indexer *Indexer) saveBlocks(ctx context.Context) { if errors.Is(err, context.Canceled) { return } - indexer.log.Err(err).Stack().Msg("handle reorg") + indexer.Log.Err(err).Stack().Msg("handle reorg") time.Sleep(time.Second * 3) } @@ -351,7 +338,7 @@ func (indexer *Indexer) saveBlocks(ctx context.Context) { if errors.Is(err, context.Canceled) { return } - indexer.log.Err(err).Stack().Msg("handle block") + indexer.Log.Err(err).Stack().Msg("handle block") time.Sleep(time.Second * 3) } if next%25 == 0 { @@ -435,7 +422,7 @@ func (indexer *Indexer) handleBlock(ctx context.Context, result receiver.Result) delete(indexer.queue, result.Block.BlockNumber) - l := indexer.log.Info(). + l := indexer.Log.Info(). Uint64("height", result.Block.BlockNumber). Int("tx_count", parseResult.Block.TxCount). Time("block_time", parseResult.Block.Time). diff --git a/pkg/indexer/messages.go b/pkg/indexer/messages.go index fb9bc81..d356e50 100644 --- a/pkg/indexer/messages.go +++ b/pkg/indexer/messages.go @@ -2,8 +2,6 @@ package indexer import ( "github.com/dipdup-io/starknet-indexer/internal/storage" - "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" ) // topics @@ -18,30 +16,6 @@ type IndexerMessage struct { Tokens []*storage.Token } -// Input - -func (indexer *Indexer) Input(name string) (*modules.Input, error) { - return nil, errors.Wrap(modules.ErrUnknownInput, name) -} - -// Output - -func (indexer *Indexer) Output(name string) (*modules.Output, error) { - output, ok := indexer.outputs[name] - if !ok { - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } - return output, nil -} - -// AttachTo - -func (indexer *Indexer) AttachTo(name string, input *modules.Input) error { - output, err := indexer.Output(name) - if err != nil { - return err - } - output.Attach(input) - return nil -} - func (indexer *Indexer) notifyAllAboutBlock( blocks storage.Block, addresses map[string]*storage.Address, @@ -53,7 +27,9 @@ func (indexer *Indexer) notifyAllAboutBlock( newTokens = append(newTokens, token) } } - indexer.outputs[OutputBlocks].Push(&IndexerMessage{ + + output := indexer.MustOutput(OutputBlocks) + output.Push(&IndexerMessage{ Block: &blocks, Addresses: addresses, Tokens: newTokens, diff --git a/pkg/indexer/status_checker.go b/pkg/indexer/status_checker.go index f90dd39..e23e3c5 100644 --- a/pkg/indexer/status_checker.go +++ b/pkg/indexer/status_checker.go @@ -2,13 +2,13 @@ package indexer import ( "context" - "sync" "time" "github.com/dipdup-io/starknet-go-api/pkg/encoding" "github.com/dipdup-io/starknet-indexer/internal/storage" "github.com/dipdup-io/starknet-indexer/internal/storage/postgres" "github.com/dipdup-io/starknet-indexer/pkg/indexer/receiver" + "github.com/dipdup-io/workerpool" sdk "github.com/dipdup-net/indexer-sdk/pkg/storage" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -68,7 +68,7 @@ type statusChecker struct { transactable sdk.Transactable receiver *receiver.Receiver log zerolog.Logger - wg *sync.WaitGroup + g workerpool.Group } func newStatusChecker( @@ -92,19 +92,16 @@ func newStatusChecker( l1Handlers: l1Handlers, transactable: transactable, log: log.With().Str("module", "status_checker").Logger(), - wg: new(sync.WaitGroup), + g: workerpool.NewGroup(), } } // Start - func (checker *statusChecker) Start(ctx context.Context) { - checker.wg.Add(1) - go checker.start(ctx) + checker.g.GoCtx(ctx, checker.start) } func (checker *statusChecker) start(ctx context.Context) { - defer checker.wg.Done() - if err := checker.init(ctx); err != nil { checker.log.Err(err).Msg("checker init") return @@ -131,7 +128,7 @@ func (checker *statusChecker) start(ctx context.Context) { // Close - func (checker *statusChecker) Close() error { - checker.wg.Wait() + checker.g.Wait() return nil }