Skip to content

Commit

Permalink
Add DLQ for reprocessing of failed blocks (#61)
Browse files Browse the repository at this point in the history
Adds a DLQ for block gaps and failed blocks to be reprocessed as per
internal design doc.

Incidental bug fixes:
- Fixes a memory leak in `collectedBlocks` that would have existed when
`SkipFailedBlocks` was enabled
- Makes an assertion conditional so it only works when we aren't
skipping failed blocks. If we skip the last block that would have gone
into a batch then we were panicking.
- Fixes a rare hang during an attempted clean shutdown if the context
closes when we are writing to the channel
  • Loading branch information
adammilnesmith authored Jul 12, 2024
1 parent 67dbd31 commit e491b73
Show file tree
Hide file tree
Showing 14 changed files with 736 additions and 54 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ go.work
go.work.sum

# Binary
indexer
indexer
bin

.idea
75 changes: 75 additions & 0 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type BlockchainIngester interface {
// PostProgressReport sends a progress report to DuneAPI
PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error

GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error)

// - API to discover the latest block number ingested
// this can also provide "next block ranges" to push to DuneAPI
// - log/metrics on catching up/falling behind, distance from tip of chain
Expand Down Expand Up @@ -366,3 +368,76 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex
}
return progress, nil
}

func (c *client) GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error) {
var response BlockchainGapsResponse
var err error
var responseStatus string
start := time.Now()

// Log response
defer func() {
if err != nil {
c.log.Error("Getting block gaps failed",
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
)
} else {
c.log.Info("Got block gaps",
"blockGaps", response.String(),
"duration", time.Since(start),
)
}
}()

url := fmt.Sprintf("%s/api/beta/blockchain/%s/gaps", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // nil: empty body
if err != nil {
return nil, err
}
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
bs, _ := io.ReadAll(resp.Body)
responseBody := string(bs)
// We mutate the global err here because we have deferred a log message where we check for non-nil err
err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, responseBody)
return nil, err
}

err = json.Unmarshal(responseBody, &response)
if err != nil {
return nil, err
}

gaps := &models.BlockchainGaps{
Gaps: mapSlice(response.Gaps, func(gap BlockGap) models.BlockGap {
return models.BlockGap{
FirstMissing: gap.FirstMissing,
LastMissing: gap.LastMissing,
}
}),
}
return gaps, nil
}

func mapSlice[T any, U any](slice []T, mapper func(T) U) []U {
result := make([]U, len(slice))
for i, v := range slice {
result[i] = mapper(v)
}
return result
}
14 changes: 14 additions & 0 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,17 @@ type BlockchainError struct {
Error string `json:"error"`
Source string `json:"source"`
}

type BlockchainGapsResponse struct {
Gaps []BlockGap `json:"gaps"`
}

// BlockGap declares an inclusive range of missing block numbers
type BlockGap struct {
FirstMissing int64 `json:"first_missing"`
LastMissing int64 `json:"last_missing"`
}

func (b *BlockchainGapsResponse) String() string {
return fmt.Sprintf("%+v", *b)
}
44 changes: 37 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/config"
"github.com/duneanalytics/blockchain-ingester/ingester"
"github.com/duneanalytics/blockchain-ingester/lib/dlq"
"github.com/duneanalytics/blockchain-ingester/models"
)

Expand Down Expand Up @@ -59,6 +60,19 @@ func main() {
}
defer duneClient.Close()

// Create an extra Dune API client for DLQ processing since it is not thread-safe yet
duneClientDLQ, err := duneapi.New(logger, duneapi.Config{
APIKey: cfg.Dune.APIKey,
URL: cfg.Dune.URL,
BlockchainName: cfg.BlockchainName,
Stack: cfg.RPCStack,
DisableCompression: cfg.DisableCompression,
})
if err != nil {
stdlog.Fatal(err)
}
defer duneClientDLQ.Close()

var wg stdsync.WaitGroup
var rpcClient jsonrpc.BlockchainClient

Expand Down Expand Up @@ -101,21 +115,37 @@ func main() {
startBlockNumber = cfg.BlockHeight
}

dlqBlockNumbers := dlq.NewDLQWithDelay[int64](dlq.RetryDelayLinear(cfg.DLQRetryInterval))

if !cfg.DisableGapsQuery {
blockGaps, err := duneClient.GetBlockGaps(ctx)
if err != nil {
stdlog.Fatal(err)
} else {
ingester.AddBlockGaps(dlqBlockNumbers, blockGaps.Gaps)
}
}

maxCount := int64(0) // 0 means ingest until cancelled
ingester := ingester.New(
logger,
rpcClient,
duneClient,
duneClientDLQ,
ingester.Config{
MaxConcurrentRequests: cfg.RPCConcurrency,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks,
MaxConcurrentRequests: cfg.RPCConcurrency,
MaxConcurrentRequestsDLQ: cfg.DLQConcurrency,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
PollDLQInterval: cfg.PollDLQInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks,
DLQOnly: cfg.DLQOnly,
},
progress,
dlqBlockNumbers,
)

wg.Add(1)
Expand Down
11 changes: 8 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,20 @@ func (r RPCClient) HasError() error {
}

type Config struct {
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
DisableCompression bool `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"` // nolint:lll
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
DisableCompression bool `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"` // nolint:lll
DisableGapsQuery bool `long:"disable-gaps-query" env:"DISABLE_GAPS_QUERY" description:"disable gaps query used to populate the initial DLQ"` // nolint:lll
DLQOnly bool `long:"dlq-only" env:"DLQ_ONLY" description:"Runs just the DLQ processing on its own"` // nolint:lll
Dune DuneClient
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
PollDLQInterval time.Duration `long:"dlq-poll-interval" env:"DLQ_POLL_INTERVAL" description:"Interval to poll the dlq" default:"300ms"` // nolint:lll
DLQRetryInterval time.Duration `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in DLQ " default:"1m"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"25"` // nolint:lll
DLQConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/duneanalytics/blockchain-ingester
go 1.22.2

require (
github.com/emirpasic/gods v1.18.1
github.com/go-errors/errors v1.5.1
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/jessevdk/go-flags v1.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
Expand Down
69 changes: 44 additions & 25 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"log/slog"
"time"

"github.com/duneanalytics/blockchain-ingester/lib/dlq"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/models"
Expand All @@ -24,67 +26,84 @@ type Ingester interface {
// It can safely be run concurrently.
FetchBlockLoop(context.Context, chan int64, chan models.RPCBlock) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// SendBlocks consumes RPCBlocks from the channel, reorders them, and sends batches to DuneAPI in an endless loop
// it will block until:
// - the context is cancelled
// - channel is closed
// - a fatal error occurs
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error

// This is just a placeholder for now
Info() Info
// ProduceBlockNumbersDLQ sends block numbers from the DLQ to outChan.
// It will run continuously until the context is cancelled.
// When the DLQ does not return an eligible next block, it waits for PollDLQInterval before trying again
ProduceBlockNumbersDLQ(ctx context.Context, outChan chan dlq.Item[int64]) error

// FetchBlockLoopDLQ fetches blocks sent on the channel and sends them on the other channel.
// It will run continuously until the context is cancelled, or the channel is closed.
// It can safely be run concurrently.
FetchBlockLoopDLQ(ctx context.Context,
blockNumbers <-chan dlq.Item[int64],
blocks chan<- dlq.Item[models.RPCBlock],
) error

// SendBlocksDLQ pushes one RPCBlock at a time to DuneAPI in the order they are received in
SendBlocksDLQ(ctx context.Context, blocks <-chan dlq.Item[models.RPCBlock]) error

Close() error
}

const (
defaultMaxBatchSize = 5
defaultReportProgressInterval = 30 * time.Second
)

type Config struct {
MaxConcurrentRequests int
PollInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
MaxConcurrentRequests int
MaxConcurrentRequestsDLQ int
PollInterval time.Duration
PollDLQInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
DLQOnly bool
}

type ingester struct {
log *slog.Logger
node jsonrpc.BlockchainClient
dune duneapi.BlockchainIngester
cfg Config
info Info
log *slog.Logger
node jsonrpc.BlockchainClient
dune duneapi.BlockchainIngester
duneDLQ duneapi.BlockchainIngester
cfg Config
info Info
dlq *dlq.DLQ[int64]
}

func New(
log *slog.Logger,
node jsonrpc.BlockchainClient,
dune duneapi.BlockchainIngester,
duneDLQ duneapi.BlockchainIngester,
cfg Config,
progress *models.BlockchainIndexProgress,
dlq *dlq.DLQ[int64],
) Ingester {
info := NewInfo(cfg.BlockchainName, cfg.Stack.String())
if progress != nil {
info.LatestBlockNumber = progress.LatestBlockNumber
info.IngestedBlockNumber = progress.LastIngestedBlockNumber
}
ing := &ingester{
log: log.With("module", "ingester"),
node: node,
dune: dune,
cfg: cfg,
info: info,
log: log.With("module", "ingester"),
node: node,
dune: dune,
duneDLQ: duneDLQ,
cfg: cfg,
info: info,
dlq: dlq,
}
if ing.cfg.ReportProgressInterval == 0 {
ing.cfg.ReportProgressInterval = defaultReportProgressInterval
}
return ing
}

func (i *ingester) Info() Info {
return i.info
}
Loading

0 comments on commit e491b73

Please sign in to comment.