diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..b8a5089 --- /dev/null +++ b/.env.example @@ -0,0 +1,10 @@ +INDEXER_BRIDGED_TOKENS_FILE=mainnet.json # full list of files you can find in repo ./build/bridged_tokens/ +INDEXER_CLASS_INTERFACES_DIR=./interfaces/ # REQUIRED +HASURA_HOST=hasura +HASURA_POSTGRES_HOST=db +LOG_LEVEL=info +CACHE_ENABLED=false +POSTGRES_PORT=5432 +POSTGRES_HOST=db +POSTGRES_DB=starknet +POSTGRES_PASSWORD= # REQUIRED \ No newline at end of file diff --git a/Makefile b/Makefile index 074c2da..389cba4 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,9 @@ starknet_id: loot_survivor: cd cmd/rpc_tester && go run . -c ../../cmd/rpc_tester/loot_survivor.yml +blocks: + cd cmd/rpc_tester && go run . -c ../../cmd/rpc_tester/blocks.yml + build-proto: protoc \ -I=${GOPATH}/src \ diff --git a/build/Dockerfile b/build/Dockerfile index b4bf63c..2b407f5 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -1,7 +1,7 @@ # --------------------------------------------------------------------- # The first stage container, for building the application # --------------------------------------------------------------------- -FROM golang:1.20-alpine as builder +FROM golang:1.21.2-alpine as builder ENV CGO_ENABLED=0 ENV GO111MODULE=on @@ -10,17 +10,17 @@ ENV GOOS=linux RUN apk --no-cache add ca-certificates RUN apk add --update git musl-dev gcc build-base -RUN mkdir -p $GOPATH/src/github.com/dipdup-io/straknet-indexer/ +RUN mkdir -p $GOPATH/src/github.com/dipdup-io/starknet-indexer/ -COPY ./go.* $GOPATH/src/github.com/dipdup-io/straknet-indexer/ -WORKDIR $GOPATH/src/github.com/dipdup-io/straknet-indexer +COPY ./go.* $GOPATH/src/github.com/dipdup-io/starknet-indexer/ +WORKDIR $GOPATH/src/github.com/dipdup-io/starknet-indexer RUN go mod download COPY cmd/indexer cmd/indexer COPY internal internal COPY pkg pkg -WORKDIR $GOPATH/src/github.com/dipdup-io/straknet-indexer/cmd/indexer/ +WORKDIR $GOPATH/src/github.com/dipdup-io/starknet-indexer/cmd/indexer/ RUN go build -a -o /go/bin/indexer . # --------------------------------------------------------------------- @@ -28,7 +28,7 @@ RUN go build -a -o /go/bin/indexer . # --------------------------------------------------------------------- FROM scratch -WORKDIR /app/straknet-indexer/ +WORKDIR /app/starknet-indexer/ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ COPY --from=builder /go/bin/indexer /go/bin/indexer diff --git a/cmd/rpc_tester/blocks.yml b/cmd/rpc_tester/blocks.yml new file mode 100644 index 0000000..f167a4a --- /dev/null +++ b/cmd/rpc_tester/blocks.yml @@ -0,0 +1,9 @@ +version: 0.0.1 + +log_level: ${LOG_LEVEL:-info} + +grpc: + server_address: ${GRPC_BIND:-127.0.0.1:7779} + subscriptions: + blocks: + head: true diff --git a/cmd/rpc_tester/printer.go b/cmd/rpc_tester/printer.go index eb32ab4..bbcb244 100644 --- a/cmd/rpc_tester/printer.go +++ b/cmd/rpc_tester/printer.go @@ -13,7 +13,7 @@ import ( // Printer - type Printer struct { - *printer.Printer + printer.Printer eventCounters map[string]*atomic.Uint64 diff --git a/docker-compose.yml b/docker-compose.yml index 45d4be0..20832c6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: "3.6" services: indexer: - image: ghcr.io/dipdup-io/starknet-indexer:master + image: ghcr.io/dipdup-io/starknet-indexer:${TAG:-master} build: dockerfile: build/Dockerfile context: . @@ -16,7 +16,7 @@ services: depends_on: - db - hasura - logging: &straknet-dipdup-logging + logging: &starknet-indexer-logging options: max-size: 10m max-file: "5" @@ -27,21 +27,21 @@ services: image: postgres:15 restart: always volumes: - - db:/var/lib/postgres/data + - db:/var/lib/postgresql/data - /etc/postgresql/postgresql.conf:/etc/postgresql/postgresql.conf ports: - - 127.0.0.1:5432:5432 + - 127.0.0.1:${POSTGRES_PORT:-5432}:5432 environment: - POSTGRES_HOST=${POSTGRES_HOST:-db} - POSTGRES_USER=${POSTGRES_USER:-dipdup} - POSTGRES_DB=${POSTGRES_DB:-starknet} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-changeme} healthcheck: - test: ["CMD-SHELL", "pg_isready -U dipdup -d starknet"] + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-dipdup} -d ${POSTGRES_DB:-starknet}"] interval: 10s timeout: 5s retries: 5 - logging: *straknet-dipdup-logging + logging: *starknet-indexer-logging command: - "postgres" - "-c" @@ -59,7 +59,7 @@ services: - HASURA_GRAPHQL_ENABLED_LOG_TYPES=startup, http-log, webhook-log, websocket-log, query-log - HASURA_GRAPHQL_ADMIN_SECRET=${ADMIN_SECRET:-changeme} - HASURA_GRAPHQL_UNAUTHORIZED_ROLE=user - logging: *straknet-dipdup-logging + logging: *starknet-indexer-logging volumes: db: \ No newline at end of file diff --git a/go.mod b/go.mod index f9bfeba..12e6fd7 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,12 @@ module github.com/dipdup-io/starknet-indexer -go 1.20 +go 1.21 require ( github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582 - github.com/dipdup-io/workerpool v0.0.3 + github.com/dipdup-io/workerpool v0.0.4 github.com/dipdup-net/go-lib v0.3.3 - github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65 + github.com/dipdup-net/indexer-sdk v0.0.4 github.com/go-testfixtures/testfixtures/v3 v3.9.0 github.com/goccy/go-json v0.10.2 github.com/karlseguin/ccache/v2 v2.0.8 @@ -18,7 +18,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/uptrace/bun v1.1.14 go.uber.org/mock v0.2.0 - google.golang.org/grpc v1.57.0 + google.golang.org/grpc v1.57.1 google.golang.org/protobuf v1.31.0 ) @@ -36,7 +36,7 @@ require ( github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/distribution v2.8.2+incompatible // indirect - github.com/docker/docker v24.0.5+incompatible // indirect + github.com/docker/docker v24.0.7+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect @@ -98,7 +98,7 @@ require ( go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect golang.org/x/crypto v0.14.0 // indirect - golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect + golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect diff --git a/go.sum b/go.sum index 0d25339..a6101dd 100644 --- a/go.sum +++ b/go.sum @@ -36,16 +36,16 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw= github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582 h1:Mo5MDCO9Bqdj62Jrn1at/f44KOOzWCPF+ohu5jqlb4o= github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582/go.mod h1:Pbi1et2OC3ZWhzv/76nDg5C0/v4Mrj7YWkZPXcZFys0= -github.com/dipdup-io/workerpool v0.0.3 h1:+cnO0/J0e4UiJ0EBEDpvuhriSDVHlsPminGRU2Il+ZI= -github.com/dipdup-io/workerpool v0.0.3/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA= +github.com/dipdup-io/workerpool v0.0.4 h1:m58fuFY3VIPRc+trWpjw2Lsm4FvIgtjP/4VRe79r+/s= +github.com/dipdup-io/workerpool v0.0.4/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA= github.com/dipdup-net/go-lib v0.3.3 h1:vTUI+sT4L+x+eiMf712Cg8EtlqUCMiN6M3vcNaPlCw8= github.com/dipdup-net/go-lib v0.3.3/go.mod h1:oBDOSsM/F8fEnmuDnaJ6QA/cHH4lne49ASbsh8WXDe4= -github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65 h1:jZ53iH1UhoBTtIj4pRUPz826EFIwr7KYUR7EkwokwPM= -github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65/go.mod h1:sZEbnuguFw8kxgD3iNLLHDq1DUDrXI6KcghNMcmNCS4= +github.com/dipdup-net/indexer-sdk v0.0.4 h1:mhTW3f4U6oc05UjxSiffOV+HIi4vQkDgOq1MbJXia8U= +github.com/dipdup-net/indexer-sdk v0.0.4/go.mod h1:n1oBIm5MPY1WxLS9tQfTWr+Ytrwv6ThCZF7TASsJslg= github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8= github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v24.0.5+incompatible h1:WmgcE4fxyI6EEXxBRxsHnZXrO1pQ3smi0k/jho4HLeY= -github.com/docker/docker v24.0.5+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM= +github.com/docker/docker v24.0.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= @@ -295,8 +295,8 @@ golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea h1:vLCWI/yYrdEHyN2JzIzPO3aaQJHQdp89IZBA/+azVC4= -golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= +golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc h1:mCRnTeVUjcrhlRmO0VK8a6k6Rrf6TF9htwo2pJVSjIU= +golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -396,8 +396,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577/go. google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/grpc v1.57.1 h1:upNTNqv0ES+2ZOOqACwVtS3Il8M12/+Hz41RCPzAjQg= +google.golang.org/grpc v1.57.1/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go index 9db0dfa..45d70c2 100644 --- a/pkg/grpc/client.go +++ b/pkg/grpc/client.go @@ -2,7 +2,6 @@ package grpc import ( "context" - "sync" "time" "github.com/dipdup-io/starknet-indexer/pkg/grpc/pb" @@ -10,13 +9,12 @@ import ( "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc" grpcSDK "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc" generalPB "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc/pb" - "github.com/pkg/errors" "github.com/rs/zerolog/log" ) -// outputs names const ( OutputMessages = "messages" + ModuleName = "layer1_grpc_client" ) // Stream - @@ -37,74 +35,43 @@ func NewStream(stream *grpcSDK.Stream[pb.Subscription], request *pb.SubscribeReq // Client - type Client struct { - grpc *grpcSDK.Client - - output *modules.Output + modules.BaseModule + grpc *grpcSDK.Client streams map[uint64]*Stream service pb.IndexerServiceClient reconnect chan uint64 - - wg *sync.WaitGroup } // NewClient - func NewClient(cfg ClientConfig) *Client { - return &Client{ - grpc: grpcSDK.NewClient(cfg.ServerAddress), - output: modules.NewOutput(OutputMessages), - streams: make(map[uint64]*Stream), - reconnect: make(chan uint64, 16), - wg: new(sync.WaitGroup), + client := &Client{ + BaseModule: modules.New(ModuleName), + grpc: grpcSDK.NewClient(cfg.ServerAddress), + streams: make(map[uint64]*Stream), + reconnect: make(chan uint64, 16), } + client.CreateOutput(OutputMessages) + return client } // NewClientWithServerAddress - func NewClientWithServerAddress(address string) *Client { - return &Client{ - grpc: grpcSDK.NewClient(address), - output: modules.NewOutput(OutputMessages), - streams: make(map[uint64]*Stream), - reconnect: make(chan uint64, 16), - wg: new(sync.WaitGroup), - } -} - -// Name - -func (client *Client) Name() string { - return "layer1_grpc_client" -} - -// Input - -func (client *Client) Input(name string) (*modules.Input, error) { - return nil, errors.Wrap(modules.ErrUnknownInput, name) -} - -// Output - -func (client *Client) Output(name string) (*modules.Output, error) { - if name != OutputMessages { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) - } - return client.output, nil -} - -// AttachTo - -func (client *Client) AttachTo(name string, input *modules.Input) error { - output, err := client.Output(name) - if err != nil { - return err + client := &Client{ + BaseModule: modules.New(ModuleName), + grpc: grpcSDK.NewClient(address), + streams: make(map[uint64]*Stream), + reconnect: make(chan uint64, 16), } - output.Attach(input) - return nil + client.CreateOutput(OutputMessages) + return client } // Start - func (client *Client) Start(ctx context.Context) { client.grpc.Start(ctx) client.service = pb.NewIndexerServiceClient(client.grpc.Connection()) - - client.wg.Add(1) - go client.reconnectThread(ctx) + client.G.GoCtx(ctx, client.reconnectThread) } // Connect - @@ -114,7 +81,7 @@ func (client *Client) Connect(ctx context.Context, opts ...grpcSDK.ConnectOption // Close - closes client func (client *Client) Close() error { - client.wg.Wait() + client.G.Wait() for id, stream := range client.streams { unsubscribeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -144,8 +111,6 @@ func (client *Client) Reconnect() <-chan uint64 { } func (client *Client) reconnectThread(ctx context.Context) { - defer client.wg.Done() - for { select { case <-ctx.Done(): @@ -171,8 +136,9 @@ func (client *Client) subscribe(ctx context.Context, req *pb.SubscribeRequest) ( } grpcStream := grpc.NewStream[pb.Subscription](stream) - client.wg.Add(1) - go client.handleMessage(ctx, grpcStream) + client.G.GoCtx(ctx, func(ctx context.Context) { + client.handleMessage(ctx, grpcStream) + }) id, err := grpcStream.Subscribe(ctx) return id, grpcStream, err @@ -199,8 +165,6 @@ func (client *Client) sendToOutput(name string, data any) error { } func (client *Client) handleMessage(ctx context.Context, stream *grpcSDK.Stream[pb.Subscription]) { - defer client.wg.Done() - for { select { case <-stream.Context().Done(): diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go index a0a1f91..87bc35d 100644 --- a/pkg/grpc/server.go +++ b/pkg/grpc/server.go @@ -2,7 +2,6 @@ package grpc import ( "context" - "sync" "time" "github.com/dipdup-io/starknet-indexer/internal/storage" @@ -12,8 +11,6 @@ import ( "github.com/dipdup-io/starknet-indexer/pkg/indexer" "github.com/dipdup-net/indexer-sdk/pkg/modules" grpcSDK "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc" - "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -24,16 +21,13 @@ const ( // Server - type Server struct { - *grpcSDK.Server + modules.BaseModule pb.UnimplementedIndexerServiceServer - db postgres.Storage + GRPC *grpcSDK.Server + db postgres.Storage - input *modules.Input subscriptions *grpcSDK.Subscriptions[*subscriptions.Message, *pb.Subscription] - log zerolog.Logger - - wg *sync.WaitGroup } // NewServer - @@ -46,47 +40,46 @@ func NewServer( return nil, err } - return &Server{ - Server: server, + s := &Server{ + GRPC: server, db: db, - input: modules.NewInput(InputBlocks), + BaseModule: modules.New("layer1_grpc_server"), subscriptions: grpcSDK.NewSubscriptions[*subscriptions.Message, *pb.Subscription](), - log: log.With().Str("module", "grpc_server").Logger(), + } + s.CreateInput(InputBlocks) - wg: new(sync.WaitGroup), - }, nil + return s, nil } // Start - func (module *Server) Start(ctx context.Context) { - pb.RegisterIndexerServiceServer(module.Server.Server(), module) + pb.RegisterIndexerServiceServer(module.GRPC.Server(), module) - module.Server.Start(ctx) + module.GRPC.Start(ctx) - module.wg.Add(1) - go module.listen(ctx) + module.G.GoCtx(ctx, module.listen) } func (module *Server) listen(ctx context.Context) { - defer module.wg.Done() - ticker := time.NewTicker(time.Second * 15) defer ticker.Stop() + input := module.MustInput(InputBlocks) + for { select { case <-ctx.Done(): return case <-ticker.C: - case msg, ok := <-module.input.Listen(): + case msg, ok := <-input.Listen(): if !ok { return } if message, ok := msg.(*indexer.IndexerMessage); ok { module.blockHandler(ctx, message) } else { - module.log.Warn().Msgf("unknown message type: %T", msg) + module.Log.Warn().Msgf("unknown message type: %T", msg) } } } @@ -280,34 +273,7 @@ func (module *Server) notifyAboutAddress(address *storage.Address) { // Close - func (module *Server) Close() error { - module.wg.Wait() - - if err := module.input.Close(); err != nil { - return err - } - - return module.Server.Close() -} - -// Input - -func (module *Server) Input(name string) (*modules.Input, error) { - if name != InputBlocks { - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } - return module.input, nil -} - -// Output - -func (module *Server) Output(name string) (*modules.Output, error) { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) -} - -// AttachTo - -func (module *Server) AttachTo(name string, input *modules.Input) error { - return errors.Wrap(modules.ErrUnknownOutput, name) -} + module.G.Wait() -// Name - -func (module *Server) Name() string { - return "layer1_grpc_server" + return module.GRPC.Close() } diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 6fd23e0..956c3df 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -21,7 +21,6 @@ import ( "github.com/dipdup-net/indexer-sdk/pkg/modules" sdk "github.com/dipdup-net/indexer-sdk/pkg/storage" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -32,9 +31,10 @@ const ( // Indexer - type Indexer struct { - cfg config.Config - outputs map[string]*modules.Output - queue map[uint64]receiver.Result + modules.BaseModule + + cfg config.Config + queue map[uint64]receiver.Result address models.IAddress blocks models.IBlock @@ -61,10 +61,7 @@ type Indexer struct { rollbackRerun chan struct{} rollbackWait *sync.WaitGroup - log zerolog.Logger - txWriteMutex *sync.Mutex - wg *sync.WaitGroup } // New - creates new indexer entity @@ -73,10 +70,8 @@ func New( storage postgres.Storage, ) *Indexer { indexer := &Indexer{ - cfg: cfg, - outputs: map[string]*modules.Output{ - OutputBlocks: modules.NewOutput(OutputBlocks), - }, + BaseModule: modules.New("indexer"), + cfg: cfg, queue: make(map[uint64]receiver.Result), stateRepo: storage.State, address: storage.Address, @@ -94,13 +89,12 @@ func New( cache: cache.New(storage.Address, storage.Class, storage.Proxy), receiver: receiver.NewReceiver(cfg), rollbackManager: storage.RollbackManager, - log: log.With().Str("module", "indexer").Logger(), rollback: make(chan struct{}, 1), rollbackRerun: make(chan struct{}, 1), txWriteMutex: new(sync.Mutex), rollbackWait: new(sync.WaitGroup), - wg: new(sync.WaitGroup), } + indexer.CreateOutput(OutputBlocks) indexer.idGenerator = generator.NewIdGenerator(storage.Address, storage.Class, indexer.cache, indexer.state.Current()) indexer.store = store.New( @@ -129,9 +123,9 @@ func New( // Start - func (indexer *Indexer) Start(ctx context.Context) { - indexer.log.Info().Msg("starting indexer...") + indexer.Log.Info().Msg("starting indexer...") if err := indexer.init(ctx); err != nil { - indexer.log.Err(err).Msg("state initializing error") + indexer.Log.Err(err).Msg("state initializing error") return } @@ -140,15 +134,12 @@ func (indexer *Indexer) Start(ctx context.Context) { indexer.statusChecker.Start(ctx) if err := indexer.fixClassAbi(ctx); err != nil { - indexer.log.Err(err).Msg("recovering class abi error") + indexer.Log.Err(err).Msg("recovering class abi error") return } - indexer.wg.Add(1) - go indexer.saveBlocks(ctx) - - indexer.wg.Add(1) - go indexer.sync(ctx) + indexer.G.GoCtx(ctx, indexer.saveBlocks) + indexer.G.GoCtx(ctx, indexer.sync) } // Name - @@ -161,8 +152,8 @@ func (indexer *Indexer) Name() string { // Close - func (indexer *Indexer) Close() error { - indexer.wg.Wait() - indexer.log.Info().Msgf("closing...") + indexer.G.Wait() + indexer.Log.Info().Msgf("closing...") if err := indexer.statusChecker.Close(); err != nil { return err @@ -227,7 +218,7 @@ func (indexer *Indexer) getNewBlocks(ctx context.Context) error { } for head > indexer.state.Height() { - indexer.log.Info(). + indexer.Log.Info(). Uint64("indexer_block", indexer.state.Height()). Uint64("node_block", head). Msg("syncing...") @@ -271,15 +262,13 @@ func (indexer *Indexer) getNewBlocks(ctx context.Context) error { } } - indexer.log.Info().Uint64("height", indexer.state.Height()).Msg("synced") + indexer.Log.Info().Uint64("height", indexer.state.Height()).Msg("synced") return nil } func (indexer *Indexer) sync(ctx context.Context) { - defer indexer.wg.Done() - if err := indexer.getNewBlocks(ctx); err != nil { - indexer.log.Err(err).Msg("getNewBlocks") + indexer.Log.Err(err).Msg("getNewBlocks") } ticker := time.NewTicker(time.Second * 30) @@ -293,19 +282,17 @@ func (indexer *Indexer) sync(ctx context.Context) { return case <-ticker.C: if err := indexer.getNewBlocks(ctx); err != nil { - indexer.log.Err(err).Msg("getNewBlocks") + indexer.Log.Err(err).Msg("getNewBlocks") } case <-indexer.rollbackRerun: if err := indexer.getNewBlocks(ctx); err != nil { - indexer.log.Err(err).Msg("getNewBlocks") + indexer.Log.Err(err).Msg("getNewBlocks") } } } } func (indexer *Indexer) saveBlocks(ctx context.Context) { - defer indexer.wg.Done() - var zeroBlock bool for { @@ -319,7 +306,7 @@ func (indexer *Indexer) saveBlocks(ctx context.Context) { if indexer.state.Height() == 0 && !zeroBlock { if data, ok := indexer.queue[0]; ok { if err := indexer.handleBlock(ctx, data); err != nil { - indexer.log.Err(err).Msg("handle block") + indexer.Log.Err(err).Msg("handle block") } zeroBlock = true } else { @@ -339,7 +326,7 @@ func (indexer *Indexer) saveBlocks(ctx context.Context) { if errors.Is(err, context.Canceled) { return } - indexer.log.Err(err).Stack().Msg("handle reorg") + indexer.Log.Err(err).Stack().Msg("handle reorg") time.Sleep(time.Second * 3) } @@ -351,7 +338,7 @@ func (indexer *Indexer) saveBlocks(ctx context.Context) { if errors.Is(err, context.Canceled) { return } - indexer.log.Err(err).Stack().Msg("handle block") + indexer.Log.Err(err).Stack().Msg("handle block") time.Sleep(time.Second * 3) } if next%25 == 0 { @@ -435,7 +422,7 @@ func (indexer *Indexer) handleBlock(ctx context.Context, result receiver.Result) delete(indexer.queue, result.Block.BlockNumber) - l := indexer.log.Info(). + l := indexer.Log.Info(). Uint64("height", result.Block.BlockNumber). Int("tx_count", parseResult.Block.TxCount). Time("block_time", parseResult.Block.Time). diff --git a/pkg/indexer/messages.go b/pkg/indexer/messages.go index fb9bc81..d356e50 100644 --- a/pkg/indexer/messages.go +++ b/pkg/indexer/messages.go @@ -2,8 +2,6 @@ package indexer import ( "github.com/dipdup-io/starknet-indexer/internal/storage" - "github.com/dipdup-net/indexer-sdk/pkg/modules" - "github.com/pkg/errors" ) // topics @@ -18,30 +16,6 @@ type IndexerMessage struct { Tokens []*storage.Token } -// Input - -func (indexer *Indexer) Input(name string) (*modules.Input, error) { - return nil, errors.Wrap(modules.ErrUnknownInput, name) -} - -// Output - -func (indexer *Indexer) Output(name string) (*modules.Output, error) { - output, ok := indexer.outputs[name] - if !ok { - return nil, errors.Wrap(modules.ErrUnknownInput, name) - } - return output, nil -} - -// AttachTo - -func (indexer *Indexer) AttachTo(name string, input *modules.Input) error { - output, err := indexer.Output(name) - if err != nil { - return err - } - output.Attach(input) - return nil -} - func (indexer *Indexer) notifyAllAboutBlock( blocks storage.Block, addresses map[string]*storage.Address, @@ -53,7 +27,9 @@ func (indexer *Indexer) notifyAllAboutBlock( newTokens = append(newTokens, token) } } - indexer.outputs[OutputBlocks].Push(&IndexerMessage{ + + output := indexer.MustOutput(OutputBlocks) + output.Push(&IndexerMessage{ Block: &blocks, Addresses: addresses, Tokens: newTokens, diff --git a/pkg/indexer/status_checker.go b/pkg/indexer/status_checker.go index f90dd39..e23e3c5 100644 --- a/pkg/indexer/status_checker.go +++ b/pkg/indexer/status_checker.go @@ -2,13 +2,13 @@ package indexer import ( "context" - "sync" "time" "github.com/dipdup-io/starknet-go-api/pkg/encoding" "github.com/dipdup-io/starknet-indexer/internal/storage" "github.com/dipdup-io/starknet-indexer/internal/storage/postgres" "github.com/dipdup-io/starknet-indexer/pkg/indexer/receiver" + "github.com/dipdup-io/workerpool" sdk "github.com/dipdup-net/indexer-sdk/pkg/storage" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -68,7 +68,7 @@ type statusChecker struct { transactable sdk.Transactable receiver *receiver.Receiver log zerolog.Logger - wg *sync.WaitGroup + g workerpool.Group } func newStatusChecker( @@ -92,19 +92,16 @@ func newStatusChecker( l1Handlers: l1Handlers, transactable: transactable, log: log.With().Str("module", "status_checker").Logger(), - wg: new(sync.WaitGroup), + g: workerpool.NewGroup(), } } // Start - func (checker *statusChecker) Start(ctx context.Context) { - checker.wg.Add(1) - go checker.start(ctx) + checker.g.GoCtx(ctx, checker.start) } func (checker *statusChecker) start(ctx context.Context) { - defer checker.wg.Done() - if err := checker.init(ctx); err != nil { checker.log.Err(err).Msg("checker init") return @@ -131,7 +128,7 @@ func (checker *statusChecker) start(ctx context.Context) { // Close - func (checker *statusChecker) Close() error { - checker.wg.Wait() + checker.g.Wait() return nil }