Skip to content

Commit

Permalink
bugfixes: small tidy ups and bug fixes (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
msf authored Jun 10, 2024
1 parent 5ec4bfa commit 3ff81f1
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 55 deletions.
22 changes: 11 additions & 11 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
httpClient.CheckRetry = retryablehttp.DefaultRetryPolicy
httpClient.Backoff = retryablehttp.LinearJitterBackoff
return &client{
log: log,
log: log.With("module", "duneapi"),
httpClient: httpClient,
cfg: cfg,
compressor: comp,
Expand All @@ -63,12 +63,8 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
// SendBlock sends a block to DuneAPI
// TODO: support batching multiple blocks in a single request
func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error {
start := time.Now()
buffer := c.bufPool.Get().(*bytes.Buffer)
defer func() {
c.bufPool.Put(buffer)
c.log.Info("SendBlock", "payloadLength", len(payload.Payload), "duration", time.Since(start))
}()
defer c.bufPool.Put(buffer)

request, err := c.buildRequest(payload, buffer)
if err != nil {
Expand All @@ -82,15 +78,14 @@ func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (Bl

if c.cfg.DisableCompression {
request.Payload = payload.Payload
request.ContentType = "application/x-ndjson"
} else {
buffer.Reset()
c.compressor.Reset(buffer)
_, err := c.compressor.Write(payload.Payload)
if err != nil {
return request, err
}
request.ContentType = "application/zstd"
request.ContentEncoding = "application/zstd"
request.Payload = buffer.Bytes()
}
request.BlockNumber = payload.BlockNumber
Expand All @@ -111,24 +106,29 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
"blockNumber", request.BlockNumber,
"error", err,
"statusCode", responseStatus,
"payloadSize", len(request.Payload),
"duration", time.Since(start),
)
} else {
c.log.Info("BLOCK INGESTED",
c.log.Info("BLOCK SENT",
"blockNumber", request.BlockNumber,
"response", response.String(),
"payloadSize", len(request.Payload),
"duration", time.Since(start),
)
}
}()

url := fmt.Sprintf("%s/beta/blockchain/%s/ingest", c.cfg.URL, c.cfg.BlockchainName)
url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
req, err := retryablehttp.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(request.Payload))
if err != nil {
return err
}
req.Header.Set("Content-Type", request.ContentType)
if request.ContentEncoding != "" {
req.Header.Set("Content-Encoding", request.ContentEncoding)
}
req.Header.Set("Content-Type", "application/x-ndjson")
req.Header.Set("x-idempotency-key", request.IdempotencyKey)
req.Header.Set("x-dune-evm-stack", request.EVMStack)
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
Expand Down
20 changes: 12 additions & 8 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package duneapi

import (
"fmt"
"sort"

"github.com/duneanalytics/blockchain-ingester/models"
)
Expand All @@ -27,18 +28,21 @@ type BlockchainIngestResponse struct {

type IngestedTableInfo struct {
Name string `json:"name"`
Rows int `json:"rows"`
Bytes int `json:"bytes"`
Rows int `json:"rows_written"`
Bytes int `json:"bytes_written"`
}

func (b *BlockchainIngestResponse) String() string {
return fmt.Sprintf("Ingested: %+v", b.Tables)
sort.Slice(b.Tables, func(i, j int) bool {
return b.Tables[i].Name < b.Tables[j].Name
})
return fmt.Sprintf("%+v", b.Tables)
}

type BlockchainIngestRequest struct {
BlockNumber int64
ContentType string
EVMStack string
IdempotencyKey string
Payload []byte
BlockNumber int64
ContentEncoding string
EVMStack string
IdempotencyKey string
Payload []byte
}
7 changes: 5 additions & 2 deletions client/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"net/http"
"sync"
"time"

"github.com/duneanalytics/blockchain-ingester/lib/hexutils"
"github.com/duneanalytics/blockchain-ingester/models"
Expand All @@ -21,7 +22,8 @@ type BlockchainClient interface {
}

const (
MaxRetries = 10
MaxRetries = 10
DefaultRequestTimeout = 30 * time.Second
)

type rpcClient struct {
Expand All @@ -37,6 +39,7 @@ func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis
client.Logger = log
client.CheckRetry = retryablehttp.DefaultRetryPolicy
client.Backoff = retryablehttp.LinearJitterBackoff
client.HTTPClient.Timeout = DefaultRequestTimeout
rpc := &rpcClient{
client: client,
cfg: cfg,
Expand Down Expand Up @@ -81,7 +84,7 @@ func (c *rpcClient) LatestBlockNumber() (int64, error) {

// getResponseBody sends a request to the server and returns the response body
func (c *rpcClient) getResponseBody(
ctx context.Context, method string, params interface{}, output *bytes.Buffer,
ctx context.Context, method string, params []interface{}, output *bytes.Buffer,
) error {
reqData := map[string]interface{}{
"jsonrpc": "2.0",
Expand Down
4 changes: 2 additions & 2 deletions client/jsonrpc/opstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type OpStackClient struct {
var _ BlockchainClient = &OpStackClient{}

func NewOpStackClient(log *slog.Logger, cfg Config) (*OpStackClient, error) {
rpcClient, err := NewClient(log, cfg)
rpcClient, err := NewClient(log.With("module", "jsonrpc"), cfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
// copy the responses in order
var buffer bytes.Buffer
for _, res := range results {
buffer.Grow(res.Len() + 1)
buffer.Grow(res.Len())
buffer.ReadFrom(res)
}
return models.RPCBlock{
Expand Down
5 changes: 3 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ func main() {
rpcClient,
duneClient,
ingester.Config{
PollInterval: cfg.PollInterval,
MaxBatchSize: 1,
MaxBatchSize: 1,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
},
)

Expand Down
17 changes: 9 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type DuneClient struct {
APIKey string `long:"dune-api-key" env:"DUNE_API_KEY" description:"API key for DuneAPI"`
URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com/api"`
URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com"`
}

func (d DuneClient) HasError() error {
Expand All @@ -32,13 +32,14 @@ func (r RPCClient) HasError() error {
}

type Config struct {
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
EnableCompression bool `long:"enable-compression" env:"ENABLE_COMPRESSION" description:"enable compression when pushing payload to Dune"` // nolint:lll
Dune DuneClient
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"500ms"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
EnableCompression bool `long:"enable-compression" env:"ENABLE_COMPRESSION" description:"enable compression when pushing payload to Dune"` // nolint:lll
Dune DuneClient
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
21 changes: 16 additions & 5 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@ type Ingester interface {
Info() Info
}

const defaultMaxBatchSize = 1
const (
defaultMaxBatchSize = 1
defaultPollInterval = 1 * time.Second
defaultReportProgressInterval = 30 * time.Second
)

type Config struct {
MaxBatchSize int
PollInterval time.Duration
MaxBatchSize int
PollInterval time.Duration
ReportProgressInterval time.Duration
}

type Info struct {
Expand All @@ -61,7 +66,7 @@ type ingester struct {

func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, cfg Config) Ingester {
ing := &ingester{
log: log,
log: log.With("module", "ingester"),
node: node,
dune: dune,
cfg: cfg,
Expand All @@ -73,9 +78,15 @@ func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.Blockchai
if ing.cfg.MaxBatchSize == 0 {
ing.cfg.MaxBatchSize = defaultMaxBatchSize
}
if ing.cfg.PollInterval == 0 {
ing.cfg.PollInterval = defaultPollInterval
}
if ing.cfg.ReportProgressInterval == 0 {
ing.cfg.ReportProgressInterval = defaultReportProgressInterval
}
return ing
}

func (i *ingester) Info() Info {
return Info{}
return i.info
}
Binary file added ingester/main
Binary file not shown.
46 changes: 29 additions & 17 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,33 @@ func (i *ingester) ConsumeBlocks(
dontStop := endBlockNumber <= startBlockNumber
latestBlockNumber := i.tryUpdateLatestBlockNumber()

waitForBlock := func(blockNumber, latestBlockNumber int64) int64 {
waitForBlock := func(ctx context.Context, blockNumber, latestBlockNumber int64) int64 {
for blockNumber > latestBlockNumber {
select {
case <-ctx.Done():
return latestBlockNumber
default:
case <-time.After(i.cfg.PollInterval):
}
i.log.Info(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval),
i.log.Debug(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval),
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
)
time.Sleep(i.cfg.PollInterval)
latestBlockNumber = i.tryUpdateLatestBlockNumber()
}
return latestBlockNumber
}

for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
latestBlockNumber = waitForBlock(blockNumber, latestBlockNumber)
latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber)
startTime := time.Now()

block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
return err
}

i.log.Error("Failed to get block by number, continuing..",
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
Expand All @@ -90,10 +94,6 @@ func (i *ingester) ConsumeBlocks(
Error: err,
})

if errors.Is(err, context.Canceled) {
return err
}

// TODO: should I sleep (backoff) here?
continue
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func (i *ingester) tryUpdateLatestBlockNumber() int64 {
}

func (i *ingester) ReportProgress(ctx context.Context) error {
timer := time.NewTicker(20 * time.Second)
timer := time.NewTicker(i.cfg.ReportProgressInterval)
defer timer.Stop()

previousTime := time.Now()
Expand All @@ -173,20 +173,32 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
case tNow := <-timer.C:
latest := atomic.LoadInt64(&i.info.LatestBlockNumber)
lastIngested := atomic.LoadInt64(&i.info.IngestedBlockNumber)
lastConsumed := atomic.LoadInt64(&i.info.ConsumedBlockNumber)

blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds()
newDistance := latest - lastIngested
fallingBehind := newDistance > (previousDistance + 1) // TODO: make is more stable

i.log.Info("Info",
rpcErrors := len(i.info.RPCErrors)
duneErrors := len(i.info.DuneErrors)
fields := []interface{}{
"blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec),
"latestBlockNumber", latest,
"ingestedBlockNumber", lastIngested,
"consumedBlockNumber", lastConsumed,
"distanceFromLatest", latest-lastIngested,
"FallingBehind", fallingBehind,
"blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec),
)
}
if fallingBehind {
fields = append(fields, "fallingBehind", fallingBehind)
}
if newDistance > 1 {
fields = append(fields, "distanceFromLatest", newDistance)
}
if rpcErrors > 0 {
fields = append(fields, "rpcErrors", rpcErrors)
}
if duneErrors > 0 {
fields = append(fields, "duneErrors", duneErrors)
}

i.log.Info("ProgressReport", fields...)
previousIngested = lastIngested
previousDistance = newDistance
previousTime = tNow
Expand Down

0 comments on commit 3ff81f1

Please sign in to comment.