Skip to content

Commit

Permalink
ADR-006 stage 2 (#153)
Browse files Browse the repository at this point in the history
* ADR-006 stage 2

* add telemetry

* Update domain/telemetry.go

* lint metrics
  • Loading branch information
p0mvn authored Apr 8, 2024
1 parent b83975b commit 7f25cef
Show file tree
Hide file tree
Showing 50 changed files with 1,055 additions and 1,414 deletions.
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,26 @@ run-docker:
docker run -d --name sqs -p 9092:9092 -p 26657:26657 -v /root/sqs/config-testnet.json/:/osmosis/config.json --net host osmolabs/sqs:local "--config /osmosis/config.json"
docker logs -f sqs

# Note: we migrated away from Redis.
# This is left in case we require more data in the near future
# prompting the need for Redis.
redis-start:
docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 -v ./redis-cache/:/data redis/redis-stack:7.2.0-v3

# Note: we migrated away from Redis.
# This is left in case we require more data in the near future
# prompting the need for Redis.
redis-stop:
docker container rm -f redis-stack

osmosis-start:
docker run -d --name osmosis -p 26657:26657 -p 9090:9090 -p 1317:1317 -p 9091:9091 -p 6060:6060 -v $(HOME)/.osmosisd/:/osmosis/.osmosisd/ --net host osmolabs/osmosis-dev:sqs-out-v0.2 "start"

osmosis-stop:
docker container rm -f osmosis

all-stop: osmosis-stop redis-stop
all-stop: osmosis-stop

all-start: redis-start osmosis-start run
all-start: osmosis-start run

lint:
@echo "--> Running linter"
Expand Down
17 changes: 0 additions & 17 deletions app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/osmosis-labs/sqs/chaininfo/client"
"github.com/osmosis-labs/sqs/domain"
sqslog "github.com/osmosis-labs/sqs/log"
"github.com/redis/go-redis/v9"
"github.com/spf13/viper"
_ "github.com/swaggo/echo-swagger"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -66,12 +65,6 @@ func main() {
}
}()

redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%s", config.StorageHost, config.StoragePort),
Password: "", // no password set
DB: 0, // use default DB
})

if config.OTEL.DSN != "" {
otelConfig := config.OTEL
err = sentry.Init(sentry.ClientOptions{
Expand All @@ -93,12 +86,6 @@ func main() {
initOTELTracer()
}

redisStatus := redisClient.Ping(context.Background())
_, err = redisStatus.Result()
if err != nil {
panic(err)
}

chainClient, err := client.NewClient(config.ChainID, config.ChainGRPCGatewayEndpoint)
if err != nil {
panic(err)
Expand Down Expand Up @@ -130,10 +117,6 @@ func main() {
<-exitChan
cancel() // Trigger shutdown

if err := redisClient.Close(); err != nil {
log.Fatal(err)
}

err := sidecarQueryServer.Shutdown(ctx)
if err != nil {
log.Fatal(err)
Expand Down
97 changes: 22 additions & 75 deletions app/sidecar_query_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,21 @@ package main

import (
"context"
"fmt"
"net"
"net/http"
"time"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/labstack/echo/v4"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"

ingestrpcdelivry "github.com/osmosis-labs/sqs/ingest/delivery/grpc"
ingestusecase "github.com/osmosis-labs/sqs/ingest/usecase"

chaininforepo "github.com/osmosis-labs/sqs/chaininfo/repository"
chaininfousecase "github.com/osmosis-labs/sqs/chaininfo/usecase"
poolsHttpDelivery "github.com/osmosis-labs/sqs/pools/delivery/http"
poolsUseCase "github.com/osmosis-labs/sqs/pools/usecase"
"github.com/osmosis-labs/sqs/sqsdomain/repository"
redisrepo "github.com/osmosis-labs/sqs/sqsdomain/repository/redis"
chaininforedisrepo "github.com/osmosis-labs/sqs/sqsdomain/repository/redis/chaininfo"
poolsredisrepo "github.com/osmosis-labs/sqs/sqsdomain/repository/redis/pools"
routerredisrepo "github.com/osmosis-labs/sqs/sqsdomain/repository/redis/router"
routerrepo "github.com/osmosis-labs/sqs/router/repository"
tokenshttpdelivery "github.com/osmosis-labs/sqs/tokens/delivery/http"
tokensUseCase "github.com/osmosis-labs/sqs/tokens/usecase"

Expand All @@ -42,50 +36,31 @@ import (
// It encapsulates all logic for ingesting chain data into the server
// and exposes endpoints for querying formatter and processed data from frontend.
type SideCarQueryServer interface {
GetTxManager() repository.TxManager
GetChainInfoRepository() chaininforedisrepo.ChainInfoRepository
GetRouterRepository() routerredisrepo.RouterRepository
GetRouterRepository() routerrepo.RouterRepository
GetTokensUseCase() mvc.TokensUsecase
GetLogger() log.Logger
Shutdown(context.Context) error
Start(context.Context) error
}

type sideCarQueryServer struct {
txManager repository.TxManager
poolsRepository poolsredisrepo.PoolsRepository
chainInfoRepository chaininforedisrepo.ChainInfoRepository
routerRepository routerredisrepo.RouterRepository
tokensUseCase mvc.TokensUsecase
e *echo.Echo
sqsAddress string
logger log.Logger
routerRepository routerrepo.RouterRepository
tokensUseCase mvc.TokensUsecase
e *echo.Echo
sqsAddress string
logger log.Logger
}

// GetTokensUseCase implements SideCarQueryServer.
func (sqs *sideCarQueryServer) GetTokensUseCase() mvc.TokensUsecase {
return sqs.tokensUseCase
}

// GetPoolsRepository implements SideCarQueryServer.
func (sqs *sideCarQueryServer) GetPoolsRepository() poolsredisrepo.PoolsRepository {
return sqs.poolsRepository
}

func (sqs *sideCarQueryServer) GetChainInfoRepository() chaininforedisrepo.ChainInfoRepository {
return sqs.chainInfoRepository
}

// GetRouterRepository implements SideCarQueryServer.
func (sqs *sideCarQueryServer) GetRouterRepository() routerredisrepo.RouterRepository {
func (sqs *sideCarQueryServer) GetRouterRepository() routerrepo.RouterRepository {
return sqs.routerRepository
}

// GetTxManager implements SideCarQueryServer.
func (sqs *sideCarQueryServer) GetTxManager() repository.TxManager {
return sqs.txManager
}

// GetLogger implements SideCarQueryServer.
func (sqs *sideCarQueryServer) GetLogger() log.Logger {
return sqs.logger
Expand Down Expand Up @@ -116,42 +91,16 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo
e.Use(middleware.InstrumentMiddleware)
e.Use(middleware.TraceWithParamsMiddleware("sqs"))

// Use context for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())

defer func() {
cancel() // Trigger shutdown
}()

// Create redis client and ensure that it is up.
redisAddress := fmt.Sprintf("%s:%s", config.StorageHost, config.StoragePort)
logger.Info("Pinging redis", zap.String("redis_address", redisAddress))
redisClient := redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: "", // no password set
DB: 0, // use default DB
})
redisStatus := redisClient.Ping(ctx)
_, err := redisStatus.Result()
if err != nil {
return nil, err
}

// Creare repository manager
redisTxManager := redisrepo.NewTxManager(redisClient)

// Initialize pools repository, usecase and HTTP handler
poolsRepository := poolsredisrepo.New(appCodec, redisTxManager)
timeoutContext := time.Duration(config.ServerTimeoutDurationSecs) * time.Second
poolsUseCase := poolsUseCase.NewPoolsUsecase(timeoutContext, poolsRepository, redisTxManager, config.Pools, config.ChainGRPCGatewayEndpoint)
poolsUseCase := poolsUseCase.NewPoolsUsecase(config.Pools, config.ChainGRPCGatewayEndpoint)

// Initialize router repository, usecase
routerRepository := routerredisrepo.New(redisTxManager, 0)
routerUsecase := routerUseCase.NewRouterUsecase(timeoutContext, routerRepository, poolsUseCase, *config.Router, poolsUseCase.GetCosmWasmPoolConfig(), logger, cache.New(), cache.New())
routerRepository := routerrepo.New()
routerUsecase := routerUseCase.NewRouterUsecase(routerRepository, poolsUseCase, *config.Router, poolsUseCase.GetCosmWasmPoolConfig(), logger, cache.New(), cache.New())

// Initialize system handler
chainInfoRepository := chaininforedisrepo.New(redisTxManager)
chainInfoUseCase := chaininfousecase.NewChainInfoUsecase(timeoutContext, chainInfoRepository, redisTxManager)
chainInfoRepository := chaininforepo.New()
chainInfoUseCase := chaininfousecase.NewChainInfoUsecase(chainInfoRepository)

// Compute token metadata from chain denom.
tokenMetadataByChainDenom, err := tokensUseCase.GetTokensFromChainRegistry(config.ChainRegistryAssetsFileURL)
Expand All @@ -160,11 +109,11 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo
}

// Initialized tokens usecase
tokensUseCase := tokensUseCase.NewTokensUsecase(timeoutContext, tokenMetadataByChainDenom)
tokensUseCase := tokensUseCase.NewTokensUsecase(tokenMetadataByChainDenom)

// HTTP handlers
poolsHttpDelivery.NewPoolsHandler(e, poolsUseCase)
systemhttpdelivery.NewSystemHandler(e, redisAddress, config, logger, chainInfoUseCase)
systemhttpdelivery.NewSystemHandler(e, config, logger, chainInfoUseCase)
if err := tokenshttpdelivery.NewTokensHandler(e, *config.Pricing, tokensUseCase, routerUsecase, logger); err != nil {
return nil, err
}
Expand All @@ -174,7 +123,7 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo
grpcIngesterConfig := config.GRPCIngester
if grpcIngesterConfig.Enabeld {
// Initialize ingest handler and usecase
ingestUseCase, err := ingestusecase.NewIngestUsecase(logger)
ingestUseCase, err := ingestusecase.NewIngestUsecase(poolsUseCase, routerUsecase, chainInfoUseCase, tokensUseCase, appCodec, *config.Pricing, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -206,12 +155,10 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo
}()

return &sideCarQueryServer{
txManager: redisTxManager,
chainInfoRepository: chainInfoRepository,
routerRepository: routerRepository,
tokensUseCase: tokensUseCase,
logger: logger,
e: e,
sqsAddress: config.ServerAddress,
routerRepository: routerRepository,
tokensUseCase: tokensUseCase,
logger: logger,
e: e,
sqsAddress: config.ServerAddress,
}, nil
}
3 changes: 1 addition & 2 deletions app/sqs_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ var DefaultConfig = domain.Config{
StorageHost: "localhost",
StoragePort: "6379",

ServerAddress: ":9092",
ServerTimeoutDurationSecs: 2,
ServerAddress: ":9092",

LoggerFilename: "sqs.log",
LoggerIsProduction: true,
Expand Down
45 changes: 45 additions & 0 deletions chaininfo/repository/chaininfo_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package chaininforepo

import (
"sync"
)

// ChainInfoRepository represents the contract for a repository handling chain information
type ChainInfoRepository interface {
// StoreLatestHeight stores the latest blockchain height
StoreLatestHeight(height uint64)

// GetLatestHeight retrieves the latest blockchain height
GetLatestHeight() uint64
}

var _ ChainInfoRepository = &chainInfoRepo{}

type chainInfoRepo struct {
latestHeight uint64
mu sync.RWMutex
}

// New creates a new repository for chain information
func New() ChainInfoRepository {
return &chainInfoRepo{
latestHeight: 0,
mu: sync.RWMutex{},
}
}

// StoreLatestHeight stores the latest blockchain height into store
func (r *chainInfoRepo) StoreLatestHeight(height uint64) {
r.mu.Lock()
defer r.mu.Unlock()

r.latestHeight = height
}

// GetLatestHeight retrieves the latest blockchain height store.
func (r *chainInfoRepo) GetLatestHeight() uint64 {
r.mu.RLock()
defer r.mu.RUnlock()

return r.latestHeight
}
24 changes: 24 additions & 0 deletions chaininfo/repository/chaininfo_repository_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package chaininforepo_test

import (
"testing"

chaininforepo "github.com/osmosis-labs/sqs/chaininfo/repository"
"github.com/stretchr/testify/require"
)

// TestStoreAndGetLatestHeight tests storing the latest blockchain height
func TestStoreAndGetLatestHeight(t *testing.T) {
repo := chaininforepo.New()
height := uint64(100)

repo.StoreLatestHeight(height)

require.Equal(t, height, repo.GetLatestHeight())

// change height
height = 200
repo.StoreLatestHeight(height)

require.Equal(t, height, repo.GetLatestHeight())
}
29 changes: 11 additions & 18 deletions chaininfo/usecase/chain_info_usecase.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
package usecase

import (
"context"
"sync"
"time"

chaininforepo "github.com/osmosis-labs/sqs/chaininfo/repository"
"github.com/osmosis-labs/sqs/domain"
"github.com/osmosis-labs/sqs/sqsdomain/repository"
chaininforedisrepo "github.com/osmosis-labs/sqs/sqsdomain/repository/redis/chaininfo"

"github.com/osmosis-labs/sqs/domain/mvc"
)

type chainInfoUseCase struct {
contextTimeout time.Duration
chainInfoRepository chaininforedisrepo.ChainInfoRepository
redisRepositoryManager repository.TxManager
chainInfoRepository chaininforepo.ChainInfoRepository

// N.B. sometimes the node gets stuck and does not make progress.
// However, it returns 200 OK for the status endpoint and claims to be not catching up.
Expand All @@ -33,24 +29,16 @@ const MaxAllowedHeightUpdateTimeDeltaSecs = 30

var _ mvc.ChainInfoUsecase = &chainInfoUseCase{}

func NewChainInfoUsecase(timeout time.Duration, chainInfoRepository chaininforedisrepo.ChainInfoRepository, redisRepositoryManager repository.TxManager) mvc.ChainInfoUsecase {
func NewChainInfoUsecase(chainInfoRepository chaininforepo.ChainInfoRepository) mvc.ChainInfoUsecase {
return &chainInfoUseCase{
contextTimeout: timeout,
chainInfoRepository: chainInfoRepository,
redisRepositoryManager: redisRepositoryManager,
chainInfoRepository: chainInfoRepository,

lastSeenMx: sync.Mutex{},
}
}

func (p *chainInfoUseCase) GetLatestHeight(ctx context.Context) (uint64, error) {
ctx, cancel := context.WithTimeout(ctx, p.contextTimeout)
defer cancel()

latestHeight, err := p.chainInfoRepository.GetLatestHeight(ctx)
if err != nil {
return 0, err
}
func (p *chainInfoUseCase) GetLatestHeight() (uint64, error) {
latestHeight := p.chainInfoRepository.GetLatestHeight()

p.lastSeenMx.Lock()
defer p.lastSeenMx.Unlock()
Expand All @@ -77,3 +65,8 @@ func (p *chainInfoUseCase) GetLatestHeight(ctx context.Context) (uint64, error)

return latestHeight, nil
}

// StoreLatestHeight implements mvc.ChainInfoUsecase.
func (p *chainInfoUseCase) StoreLatestHeight(height uint64) {
p.chainInfoRepository.StoreLatestHeight(height)
}
Loading

0 comments on commit 7f25cef

Please sign in to comment.