From 7f39f9f9a0c2644fc09d850191c25489606028a4 Mon Sep 17 00:00:00 2001 From: Artem Date: Wed, 8 Nov 2023 18:16:42 +0100 Subject: [PATCH] Fix: docker-compose and client module --- .env.example | 10 +++++ build/Dockerfile | 12 +++--- docker-compose.yml | 14 +++---- go.mod | 2 +- pkg/grpc/client.go | 102 ++++++++++----------------------------------- pkg/grpc/server.go | 4 +- 6 files changed, 48 insertions(+), 96 deletions(-) create mode 100644 .env.example diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..b8a5089 --- /dev/null +++ b/.env.example @@ -0,0 +1,10 @@ +INDEXER_BRIDGED_TOKENS_FILE=mainnet.json # full list of files you can find in repo ./build/bridged_tokens/ +INDEXER_CLASS_INTERFACES_DIR=./interfaces/ # REQUIRED +HASURA_HOST=hasura +HASURA_POSTGRES_HOST=db +LOG_LEVEL=info +CACHE_ENABLED=false +POSTGRES_PORT=5432 +POSTGRES_HOST=db +POSTGRES_DB=starknet +POSTGRES_PASSWORD= # REQUIRED \ No newline at end of file diff --git a/build/Dockerfile b/build/Dockerfile index b4bf63c..2b407f5 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -1,7 +1,7 @@ # --------------------------------------------------------------------- # The first stage container, for building the application # --------------------------------------------------------------------- -FROM golang:1.20-alpine as builder +FROM golang:1.21.2-alpine as builder ENV CGO_ENABLED=0 ENV GO111MODULE=on @@ -10,17 +10,17 @@ ENV GOOS=linux RUN apk --no-cache add ca-certificates RUN apk add --update git musl-dev gcc build-base -RUN mkdir -p $GOPATH/src/github.com/dipdup-io/straknet-indexer/ +RUN mkdir -p $GOPATH/src/github.com/dipdup-io/starknet-indexer/ -COPY ./go.* $GOPATH/src/github.com/dipdup-io/straknet-indexer/ -WORKDIR $GOPATH/src/github.com/dipdup-io/straknet-indexer +COPY ./go.* $GOPATH/src/github.com/dipdup-io/starknet-indexer/ +WORKDIR $GOPATH/src/github.com/dipdup-io/starknet-indexer RUN go mod download COPY cmd/indexer cmd/indexer COPY internal internal COPY pkg pkg -WORKDIR $GOPATH/src/github.com/dipdup-io/straknet-indexer/cmd/indexer/ +WORKDIR $GOPATH/src/github.com/dipdup-io/starknet-indexer/cmd/indexer/ RUN go build -a -o /go/bin/indexer . # --------------------------------------------------------------------- @@ -28,7 +28,7 @@ RUN go build -a -o /go/bin/indexer . # --------------------------------------------------------------------- FROM scratch -WORKDIR /app/straknet-indexer/ +WORKDIR /app/starknet-indexer/ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ COPY --from=builder /go/bin/indexer /go/bin/indexer diff --git a/docker-compose.yml b/docker-compose.yml index 45d4be0..20832c6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: "3.6" services: indexer: - image: ghcr.io/dipdup-io/starknet-indexer:master + image: ghcr.io/dipdup-io/starknet-indexer:${TAG:-master} build: dockerfile: build/Dockerfile context: . @@ -16,7 +16,7 @@ services: depends_on: - db - hasura - logging: &straknet-dipdup-logging + logging: &starknet-indexer-logging options: max-size: 10m max-file: "5" @@ -27,21 +27,21 @@ services: image: postgres:15 restart: always volumes: - - db:/var/lib/postgres/data + - db:/var/lib/postgresql/data - /etc/postgresql/postgresql.conf:/etc/postgresql/postgresql.conf ports: - - 127.0.0.1:5432:5432 + - 127.0.0.1:${POSTGRES_PORT:-5432}:5432 environment: - POSTGRES_HOST=${POSTGRES_HOST:-db} - POSTGRES_USER=${POSTGRES_USER:-dipdup} - POSTGRES_DB=${POSTGRES_DB:-starknet} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-changeme} healthcheck: - test: ["CMD-SHELL", "pg_isready -U dipdup -d starknet"] + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-dipdup} -d ${POSTGRES_DB:-starknet}"] interval: 10s timeout: 5s retries: 5 - logging: *straknet-dipdup-logging + logging: *starknet-indexer-logging command: - "postgres" - "-c" @@ -59,7 +59,7 @@ services: - HASURA_GRAPHQL_ENABLED_LOG_TYPES=startup, http-log, webhook-log, websocket-log, query-log - HASURA_GRAPHQL_ADMIN_SECRET=${ADMIN_SECRET:-changeme} - HASURA_GRAPHQL_UNAUTHORIZED_ROLE=user - logging: *straknet-dipdup-logging + logging: *starknet-indexer-logging volumes: db: \ No newline at end of file diff --git a/go.mod b/go.mod index 3f0a168..12e6fd7 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/dipdup-io/starknet-indexer -go 1.20 +go 1.21 require ( github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582 diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go index d495c4b..45d70c2 100644 --- a/pkg/grpc/client.go +++ b/pkg/grpc/client.go @@ -2,7 +2,6 @@ package grpc import ( "context" - "sync" "time" "github.com/dipdup-io/starknet-indexer/pkg/grpc/pb" @@ -10,13 +9,12 @@ import ( "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc" grpcSDK "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc" generalPB "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc/pb" - "github.com/pkg/errors" "github.com/rs/zerolog/log" ) -// outputs names const ( OutputMessages = "messages" + ModuleName = "layer1_grpc_client" ) // Stream - @@ -37,96 +35,43 @@ func NewStream(stream *grpcSDK.Stream[pb.Subscription], request *pb.SubscribeReq // Client - type Client struct { - grpc *grpcSDK.Client - - output *modules.Output + modules.BaseModule + grpc *grpcSDK.Client streams map[uint64]*Stream service pb.IndexerServiceClient reconnect chan uint64 - - wg *sync.WaitGroup } // NewClient - func NewClient(cfg ClientConfig) *Client { - return &Client{ - grpc: grpcSDK.NewClient(cfg.ServerAddress), - output: modules.NewOutput(OutputMessages), - streams: make(map[uint64]*Stream), - reconnect: make(chan uint64, 16), - wg: new(sync.WaitGroup), + client := &Client{ + BaseModule: modules.New(ModuleName), + grpc: grpcSDK.NewClient(cfg.ServerAddress), + streams: make(map[uint64]*Stream), + reconnect: make(chan uint64, 16), } + client.CreateOutput(OutputMessages) + return client } // NewClientWithServerAddress - func NewClientWithServerAddress(address string) *Client { - return &Client{ - grpc: grpcSDK.NewClient(address), - output: modules.NewOutput(OutputMessages), - streams: make(map[uint64]*Stream), - reconnect: make(chan uint64, 16), - wg: new(sync.WaitGroup), - } -} - -// Name - -func (client *Client) Name() string { - return "layer1_grpc_client" -} - -// Input - -func (client *Client) Input(name string) (*modules.Input, error) { - return nil, errors.Wrap(modules.ErrUnknownInput, name) -} - -// MustInput - -func (client *Client) MustInput(name string) *modules.Input { - input, err := client.Input(name) - if err != nil { - panic(err) - } - return input -} - -// Output - -func (client *Client) Output(name string) (*modules.Output, error) { - if name != OutputMessages { - return nil, errors.Wrap(modules.ErrUnknownOutput, name) + client := &Client{ + BaseModule: modules.New(ModuleName), + grpc: grpcSDK.NewClient(address), + streams: make(map[uint64]*Stream), + reconnect: make(chan uint64, 16), } - return client.output, nil -} - -// MustOutput - -func (client *Client) MustOutput(name string) *modules.Output { - output, err := client.Output(name) - if err != nil { - panic(err) - } - return output -} - -// AttachTo - -func (client *Client) AttachTo(outputModule modules.Module, outputName, inputName string) error { - output, err := outputModule.Output(outputName) - if err != nil { - return err - } - input, err := client.Input(inputName) - if err != nil { - return err - } - output.Attach(input) - return nil + client.CreateOutput(OutputMessages) + return client } // Start - func (client *Client) Start(ctx context.Context) { client.grpc.Start(ctx) client.service = pb.NewIndexerServiceClient(client.grpc.Connection()) - - client.wg.Add(1) - go client.reconnectThread(ctx) + client.G.GoCtx(ctx, client.reconnectThread) } // Connect - @@ -136,7 +81,7 @@ func (client *Client) Connect(ctx context.Context, opts ...grpcSDK.ConnectOption // Close - closes client func (client *Client) Close() error { - client.wg.Wait() + client.G.Wait() for id, stream := range client.streams { unsubscribeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -166,8 +111,6 @@ func (client *Client) Reconnect() <-chan uint64 { } func (client *Client) reconnectThread(ctx context.Context) { - defer client.wg.Done() - for { select { case <-ctx.Done(): @@ -193,8 +136,9 @@ func (client *Client) subscribe(ctx context.Context, req *pb.SubscribeRequest) ( } grpcStream := grpc.NewStream[pb.Subscription](stream) - client.wg.Add(1) - go client.handleMessage(ctx, grpcStream) + client.G.GoCtx(ctx, func(ctx context.Context) { + client.handleMessage(ctx, grpcStream) + }) id, err := grpcStream.Subscribe(ctx) return id, grpcStream, err @@ -221,8 +165,6 @@ func (client *Client) sendToOutput(name string, data any) error { } func (client *Client) handleMessage(ctx context.Context, stream *grpcSDK.Stream[pb.Subscription]) { - defer client.wg.Done() - for { select { case <-stream.Context().Done(): diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go index a52f5ac..87bc35d 100644 --- a/pkg/grpc/server.go +++ b/pkg/grpc/server.go @@ -21,11 +21,11 @@ const ( // Server - type Server struct { - GRPC *grpcSDK.Server modules.BaseModule pb.UnimplementedIndexerServiceServer - db postgres.Storage + GRPC *grpcSDK.Server + db postgres.Storage subscriptions *grpcSDK.Subscriptions[*subscriptions.Message, *pb.Subscription] }