Skip to content

Commit

Permalink
Upgrade: indexer-sdk (#27)
Browse files Browse the repository at this point in the history
* Upgrade: indexer-sdk

* Fix: docker-compose and client module
  • Loading branch information
aopoltorzhicky authored Nov 8, 2023
1 parent 09511de commit 69f96f4
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 211 deletions.
10 changes: 10 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
INDEXER_BRIDGED_TOKENS_FILE=mainnet.json # full list of files you can find in repo ./build/bridged_tokens/
INDEXER_CLASS_INTERFACES_DIR=./interfaces/ # REQUIRED
HASURA_HOST=hasura
HASURA_POSTGRES_HOST=db
LOG_LEVEL=info
CACHE_ENABLED=false
POSTGRES_PORT=5432
POSTGRES_HOST=db
POSTGRES_DB=starknet
POSTGRES_PASSWORD=<TYPE_SOMETHING_STRONG> # REQUIRED
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ starknet_id:
loot_survivor:
cd cmd/rpc_tester && go run . -c ../../cmd/rpc_tester/loot_survivor.yml

blocks:
cd cmd/rpc_tester && go run . -c ../../cmd/rpc_tester/blocks.yml

build-proto:
protoc \
-I=${GOPATH}/src \
Expand Down
12 changes: 6 additions & 6 deletions build/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ---------------------------------------------------------------------
# The first stage container, for building the application
# ---------------------------------------------------------------------
FROM golang:1.20-alpine as builder
FROM golang:1.21.2-alpine as builder

ENV CGO_ENABLED=0
ENV GO111MODULE=on
Expand All @@ -10,25 +10,25 @@ ENV GOOS=linux
RUN apk --no-cache add ca-certificates
RUN apk add --update git musl-dev gcc build-base

RUN mkdir -p $GOPATH/src/github.com/dipdup-io/straknet-indexer/
RUN mkdir -p $GOPATH/src/github.com/dipdup-io/starknet-indexer/

COPY ./go.* $GOPATH/src/github.com/dipdup-io/straknet-indexer/
WORKDIR $GOPATH/src/github.com/dipdup-io/straknet-indexer
COPY ./go.* $GOPATH/src/github.com/dipdup-io/starknet-indexer/
WORKDIR $GOPATH/src/github.com/dipdup-io/starknet-indexer
RUN go mod download

COPY cmd/indexer cmd/indexer
COPY internal internal
COPY pkg pkg

WORKDIR $GOPATH/src/github.com/dipdup-io/straknet-indexer/cmd/indexer/
WORKDIR $GOPATH/src/github.com/dipdup-io/starknet-indexer/cmd/indexer/
RUN go build -a -o /go/bin/indexer .

# ---------------------------------------------------------------------
# The second stage container, for running the application
# ---------------------------------------------------------------------
FROM scratch

WORKDIR /app/straknet-indexer/
WORKDIR /app/starknet-indexer/

COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/bin/indexer /go/bin/indexer
Expand Down
9 changes: 9 additions & 0 deletions cmd/rpc_tester/blocks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: 0.0.1

log_level: ${LOG_LEVEL:-info}

grpc:
server_address: ${GRPC_BIND:-127.0.0.1:7779}
subscriptions:
blocks:
head: true
2 changes: 1 addition & 1 deletion cmd/rpc_tester/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// Printer -
type Printer struct {
*printer.Printer
printer.Printer

eventCounters map[string]*atomic.Uint64

Expand Down
14 changes: 7 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.6"

services:
indexer:
image: ghcr.io/dipdup-io/starknet-indexer:master
image: ghcr.io/dipdup-io/starknet-indexer:${TAG:-master}
build:
dockerfile: build/Dockerfile
context: .
Expand All @@ -16,7 +16,7 @@ services:
depends_on:
- db
- hasura
logging: &straknet-dipdup-logging
logging: &starknet-indexer-logging
options:
max-size: 10m
max-file: "5"
Expand All @@ -27,21 +27,21 @@ services:
image: postgres:15
restart: always
volumes:
- db:/var/lib/postgres/data
- db:/var/lib/postgresql/data
- /etc/postgresql/postgresql.conf:/etc/postgresql/postgresql.conf
ports:
- 127.0.0.1:5432:5432
- 127.0.0.1:${POSTGRES_PORT:-5432}:5432
environment:
- POSTGRES_HOST=${POSTGRES_HOST:-db}
- POSTGRES_USER=${POSTGRES_USER:-dipdup}
- POSTGRES_DB=${POSTGRES_DB:-starknet}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-changeme}
healthcheck:
test: ["CMD-SHELL", "pg_isready -U dipdup -d starknet"]
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-dipdup} -d ${POSTGRES_DB:-starknet}"]
interval: 10s
timeout: 5s
retries: 5
logging: *straknet-dipdup-logging
logging: *starknet-indexer-logging
command:
- "postgres"
- "-c"
Expand All @@ -59,7 +59,7 @@ services:
- HASURA_GRAPHQL_ENABLED_LOG_TYPES=startup, http-log, webhook-log, websocket-log, query-log
- HASURA_GRAPHQL_ADMIN_SECRET=${ADMIN_SECRET:-changeme}
- HASURA_GRAPHQL_UNAUTHORIZED_ROLE=user
logging: *straknet-dipdup-logging
logging: *starknet-indexer-logging

volumes:
db:
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
module github.com/dipdup-io/starknet-indexer

go 1.20
go 1.21

require (
github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582
github.com/dipdup-io/workerpool v0.0.3
github.com/dipdup-io/workerpool v0.0.4
github.com/dipdup-net/go-lib v0.3.3
github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65
github.com/dipdup-net/indexer-sdk v0.0.4
github.com/go-testfixtures/testfixtures/v3 v3.9.0
github.com/goccy/go-json v0.10.2
github.com/karlseguin/ccache/v2 v2.0.8
Expand All @@ -18,7 +18,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/uptrace/bun v1.1.14
go.uber.org/mock v0.2.0
google.golang.org/grpc v1.57.0
google.golang.org/grpc v1.57.1
google.golang.org/protobuf v1.31.0
)

Expand All @@ -36,7 +36,7 @@ require (
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker v24.0.5+incompatible // indirect
github.com/docker/docker v24.0.7+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
Expand Down Expand Up @@ -98,7 +98,7 @@ require (
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw=
github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582 h1:Mo5MDCO9Bqdj62Jrn1at/f44KOOzWCPF+ohu5jqlb4o=
github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582/go.mod h1:Pbi1et2OC3ZWhzv/76nDg5C0/v4Mrj7YWkZPXcZFys0=
github.com/dipdup-io/workerpool v0.0.3 h1:+cnO0/J0e4UiJ0EBEDpvuhriSDVHlsPminGRU2Il+ZI=
github.com/dipdup-io/workerpool v0.0.3/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA=
github.com/dipdup-io/workerpool v0.0.4 h1:m58fuFY3VIPRc+trWpjw2Lsm4FvIgtjP/4VRe79r+/s=
github.com/dipdup-io/workerpool v0.0.4/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA=
github.com/dipdup-net/go-lib v0.3.3 h1:vTUI+sT4L+x+eiMf712Cg8EtlqUCMiN6M3vcNaPlCw8=
github.com/dipdup-net/go-lib v0.3.3/go.mod h1:oBDOSsM/F8fEnmuDnaJ6QA/cHH4lne49ASbsh8WXDe4=
github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65 h1:jZ53iH1UhoBTtIj4pRUPz826EFIwr7KYUR7EkwokwPM=
github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65/go.mod h1:sZEbnuguFw8kxgD3iNLLHDq1DUDrXI6KcghNMcmNCS4=
github.com/dipdup-net/indexer-sdk v0.0.4 h1:mhTW3f4U6oc05UjxSiffOV+HIi4vQkDgOq1MbJXia8U=
github.com/dipdup-net/indexer-sdk v0.0.4/go.mod h1:n1oBIm5MPY1WxLS9tQfTWr+Ytrwv6ThCZF7TASsJslg=
github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8=
github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v24.0.5+incompatible h1:WmgcE4fxyI6EEXxBRxsHnZXrO1pQ3smi0k/jho4HLeY=
github.com/docker/docker v24.0.5+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM=
github.com/docker/docker v24.0.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
Expand Down Expand Up @@ -295,8 +295,8 @@ golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea h1:vLCWI/yYrdEHyN2JzIzPO3aaQJHQdp89IZBA/+azVC4=
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc h1:mCRnTeVUjcrhlRmO0VK8a6k6Rrf6TF9htwo2pJVSjIU=
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down Expand Up @@ -396,8 +396,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577/go.
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/grpc v1.57.1 h1:upNTNqv0ES+2ZOOqACwVtS3Il8M12/+Hz41RCPzAjQg=
google.golang.org/grpc v1.57.1/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
80 changes: 22 additions & 58 deletions pkg/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@ package grpc

import (
"context"
"sync"
"time"

"github.com/dipdup-io/starknet-indexer/pkg/grpc/pb"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
"github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
grpcSDK "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
generalPB "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc/pb"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)

// outputs names
const (
OutputMessages = "messages"
ModuleName = "layer1_grpc_client"
)

// Stream -
Expand All @@ -37,74 +35,43 @@ func NewStream(stream *grpcSDK.Stream[pb.Subscription], request *pb.SubscribeReq

// Client -
type Client struct {
grpc *grpcSDK.Client

output *modules.Output
modules.BaseModule
grpc *grpcSDK.Client
streams map[uint64]*Stream

service pb.IndexerServiceClient
reconnect chan uint64

wg *sync.WaitGroup
}

// NewClient -
func NewClient(cfg ClientConfig) *Client {
return &Client{
grpc: grpcSDK.NewClient(cfg.ServerAddress),
output: modules.NewOutput(OutputMessages),
streams: make(map[uint64]*Stream),
reconnect: make(chan uint64, 16),
wg: new(sync.WaitGroup),
client := &Client{
BaseModule: modules.New(ModuleName),
grpc: grpcSDK.NewClient(cfg.ServerAddress),
streams: make(map[uint64]*Stream),
reconnect: make(chan uint64, 16),
}
client.CreateOutput(OutputMessages)
return client
}

// NewClientWithServerAddress -
func NewClientWithServerAddress(address string) *Client {
return &Client{
grpc: grpcSDK.NewClient(address),
output: modules.NewOutput(OutputMessages),
streams: make(map[uint64]*Stream),
reconnect: make(chan uint64, 16),
wg: new(sync.WaitGroup),
}
}

// Name -
func (client *Client) Name() string {
return "layer1_grpc_client"
}

// Input -
func (client *Client) Input(name string) (*modules.Input, error) {
return nil, errors.Wrap(modules.ErrUnknownInput, name)
}

// Output -
func (client *Client) Output(name string) (*modules.Output, error) {
if name != OutputMessages {
return nil, errors.Wrap(modules.ErrUnknownOutput, name)
}
return client.output, nil
}

// AttachTo -
func (client *Client) AttachTo(name string, input *modules.Input) error {
output, err := client.Output(name)
if err != nil {
return err
client := &Client{
BaseModule: modules.New(ModuleName),
grpc: grpcSDK.NewClient(address),
streams: make(map[uint64]*Stream),
reconnect: make(chan uint64, 16),
}
output.Attach(input)
return nil
client.CreateOutput(OutputMessages)
return client
}

// Start -
func (client *Client) Start(ctx context.Context) {
client.grpc.Start(ctx)
client.service = pb.NewIndexerServiceClient(client.grpc.Connection())

client.wg.Add(1)
go client.reconnectThread(ctx)
client.G.GoCtx(ctx, client.reconnectThread)
}

// Connect -
Expand All @@ -114,7 +81,7 @@ func (client *Client) Connect(ctx context.Context, opts ...grpcSDK.ConnectOption

// Close - closes client
func (client *Client) Close() error {
client.wg.Wait()
client.G.Wait()

for id, stream := range client.streams {
unsubscribeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down Expand Up @@ -144,8 +111,6 @@ func (client *Client) Reconnect() <-chan uint64 {
}

func (client *Client) reconnectThread(ctx context.Context) {
defer client.wg.Done()

for {
select {
case <-ctx.Done():
Expand All @@ -171,8 +136,9 @@ func (client *Client) subscribe(ctx context.Context, req *pb.SubscribeRequest) (
}
grpcStream := grpc.NewStream[pb.Subscription](stream)

client.wg.Add(1)
go client.handleMessage(ctx, grpcStream)
client.G.GoCtx(ctx, func(ctx context.Context) {
client.handleMessage(ctx, grpcStream)
})

id, err := grpcStream.Subscribe(ctx)
return id, grpcStream, err
Expand All @@ -199,8 +165,6 @@ func (client *Client) sendToOutput(name string, data any) error {
}

func (client *Client) handleMessage(ctx context.Context, stream *grpcSDK.Stream[pb.Subscription]) {
defer client.wg.Done()

for {
select {
case <-stream.Context().Done():
Expand Down
Loading

0 comments on commit 69f96f4

Please sign in to comment.