Skip to content

Commit

Permalink
Merge pull request #118 from berachain/v2-dev
Browse files Browse the repository at this point in the history
feat: merge latest changes to v2
  • Loading branch information
hunter-bera authored Sep 25, 2024
2 parents 6fda76c + 66e63c0 commit b1c8d80
Show file tree
Hide file tree
Showing 19 changed files with 540 additions and 663 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
matrix:
args: ["build", "lint", "test"]
os: [ubuntu-latest]
go-version: [1.21.0]
go-version: [1.23.1]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout
Expand Down
10 changes: 3 additions & 7 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ run:
# Default: 1m
timeout: 3m


# This file contains only configs which differ from defaults.
# All possible options can be found here https://github.com/golangci/golangci-lint/blob/master/.golangci.reference.yml
linters-settings:
Expand Down Expand Up @@ -96,7 +95,6 @@ linters-settings:
# Default: 40
statements: 50


gci:
sections:
- standard # Standard section: captures all standard packages.
Expand Down Expand Up @@ -130,7 +128,7 @@ linters-settings:
# Default: true
skipRecvDeref: false

gomnd:
mnd:
# List of function patterns to exclude from analysis.
# Values always ignored: `time.Date`,
# `strconv.FormatInt`, `strconv.FormatUint`, `strconv.FormatFloat`,
Expand Down Expand Up @@ -179,7 +177,7 @@ linters-settings:
nolintlint:
# Exclude following linters from requiring an explanation.
# Default: []
allow-no-explanation: [ funlen, gocognit, lll ]
allow-no-explanation: [funlen, gocognit, lll]
# Enable to require an explanation of nonzero length after each nolint directive.
# Default: false
require-explanation: true
Expand Down Expand Up @@ -221,9 +219,7 @@ linters:
- durationcheck # checks for two durations multiplied together
- errname # checks that sentinel errors are prefixed with the Err and error types are suffixed with the Error
- errorlint # finds code that will cause problems with the error wrapping scheme introduced in Go 1.13
- execinquery # checks query string in Query function which reads your Go src files and warning it finds
- exhaustive # checks exhaustiveness of enum switch statements
- exportloopref # checks for pointers to enclosing loop variables
- forbidigo # forbids identifiers
- funlen # tool for detection of long functions
- gci # controls golang package import order and makes it always deterministic
Expand All @@ -236,7 +232,7 @@ linters:
- godot # checks if comments end in a period
- goheader # checks is file header matches to pattern
- goimports # in addition to fixing imports, goimports also formats your code in the same style as gofmt
- gomnd # detects magic numbers
- mnd # detects magic numbers
# - gomoddirectives # manages the use of 'replace', 'retract', and 'excludes' directives in go.mod
# - gomodguard # allow and block lists linter for direct Go module dependencies. This is different from depguard where there are different block types for example version constraints and module recommendations
- goprintffuncname # checks that printf-like functions are named with f at the end
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ generate: |
go generate ./...

tidy: |
go mod tidy
go mod tidy
8 changes: 8 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type BaseApp struct {

// svr is the server for the baseapp.
svr *server.Server

// chain is the eth client for the baseapp.
chain eth.Client
}

// New creates a new baseapp.
Expand Down Expand Up @@ -86,3 +89,8 @@ func (b *BaseApp) Stop() {
b.svr.Stop()
}
}

// Chain returns the eth client for the baseapp.
func (b *BaseApp) Chain() eth.Client {
return b.chain
}
2 changes: 1 addition & 1 deletion baseapp/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewManager(
// TODO: read pool configs from the config file.

// Setup the producer worker pool.
jobCount := uint16(m.jobRegistry.Count())
jobCount := uint16(m.jobRegistry.Count()) //nolint:gosec // safe to convert.
m.producerCfg = &worker.PoolConfig{
Name: producerName,
PrometheusPrefix: producerPromName,
Expand Down
73 changes: 56 additions & 17 deletions client/eth/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,18 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect
cfg.HealthCheckInterval = defaultHealthCheckInterval
}

cache, err := lru.NewWithEvict(
len(cfg.EthHTTPURLs), func(_ string, v *HealthCheckedClient) {
defer v.Close()
// The timeout is added so that any in progress
// requests have a chance to complete before we close.
time.Sleep(cfg.DefaultTimeout)
})
if err != nil {
return nil, err
var (
cache *lru.Cache[string, *HealthCheckedClient]
wsCache *lru.Cache[string, *HealthCheckedClient]
err error
)

// The LRU cache needs at least one URL provided for HTTP.
if len(cfg.EthHTTPURLs) == 0 {
return nil, fmt.Errorf("ConnectionPool: missing URL for HTTP clients")
}
wsCache, err := lru.NewWithEvict(

cache, err = lru.NewWithEvict(
len(cfg.EthHTTPURLs), func(_ string, v *HealthCheckedClient) {
defer v.Close()
// The timeout is added so that any in progress
Expand All @@ -57,6 +58,21 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect
return nil, err
}

if len(cfg.EthWSURLs) == 0 {
logger.Warn("ConnectionPool: missing URL for WS clients")
} else {
wsCache, err = lru.NewWithEvict(
len(cfg.EthWSURLs), func(_ string, v *HealthCheckedClient) {
defer v.Close()
// The timeout is added so that any in progress
// requests have a chance to complete before we close.
time.Sleep(cfg.DefaultTimeout)
})
if err != nil {
return nil, err
}
}

return &ConnectionPoolImpl{
cache: cache,
wsCache: wsCache,
Expand All @@ -68,6 +84,11 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect
func (c *ConnectionPoolImpl) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()

if c.cache == nil {
return nil
}

for _, client := range c.cache.Keys() {
if err := c.removeClient(client); err != nil {
return err
Expand All @@ -81,13 +102,21 @@ func (c *ConnectionPoolImpl) Dial(string) error {
}

func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error {
// NOTE: Check the cache for the HTTP URL is not needed because it
// is guaranteed to be non-nil when a ConnectionPoolImpl is created.
for _, url := range c.config.EthHTTPURLs {
client := NewHealthCheckedClient(c.config.HealthCheckInterval, c.logger)
if err := client.DialContextWithTimeout(ctx, url, c.config.DefaultTimeout); err != nil {
return err
}
c.cache.Add(url, client)
}

// Check is needed because the WS URL is optional.
if c.wsCache == nil {
return nil
}

for _, url := range c.config.EthWSURLs {
client := NewHealthCheckedClient(c.config.HealthCheckInterval, c.logger)
if err := client.DialContextWithTimeout(ctx, url, c.config.DefaultTimeout); err != nil {
Expand All @@ -98,22 +127,32 @@ func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error {
return nil
}

// NOTE: this function assumes the cache is non-nil
// because it is guaranteed to be non-nil when a ConnectionPoolImpl is created.
func (c *ConnectionPoolImpl) GetHTTP() (Client, bool) {
c.mutex.Lock()
defer c.mutex.Unlock()
retry:
_, client, ok := c.cache.GetOldest()
if !client.Health() {
goto retry
}
return client, ok

return c.getClientFrom(c.cache)
}

func (c *ConnectionPoolImpl) GetWS() (Client, bool) {
c.mutex.Lock()
defer c.mutex.Unlock()

// Because the WS URL is optional, we need to check if it's nil.
if c.wsCache == nil {
return nil, false
}
return c.getClientFrom(c.wsCache)
}

// NOTE: this function assumes the lock is held and cache is non-nil.
func (c *ConnectionPoolImpl) getClientFrom(
cache *lru.Cache[string, *HealthCheckedClient],
) (Client, bool) {
retry:
_, client, ok := c.wsCache.GetOldest()
_, client, ok := cache.GetOldest()
if !client.Health() {
goto retry
}
Expand Down
134 changes: 134 additions & 0 deletions client/eth/connection_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package eth_test

import (
"bytes"
"io"
"os"
"testing"

"github.com/berachain/offchain-sdk/v2/client/eth"
"github.com/berachain/offchain-sdk/v2/log"
"github.com/stretchr/testify/require"
)

var (
HTTPURL = os.Getenv("ETH_HTTP_URL")
WSURL = os.Getenv("ETH_WS_URL")
)

/******************************* HELPER FUNCTIONS ***************************************/

// NOTE: requires chain rpc url at env var `ETH_HTTP_URL` and `ETH_WS_URL`.
func checkEnv(t *testing.T) {
ethHTTPRPC := os.Getenv("ETH_HTTP_URL")
ethWSRPC := os.Getenv("ETH_WS_URL")
if ethHTTPRPC == "" || ethWSRPC == "" {
t.Skipf("Skipping test: no eth rpc url provided")
}
}

// initConnectionPool initializes a new connection pool.
func initConnectionPool(
cfg eth.ConnectionPoolConfig, writer io.Writer,
) (eth.ConnectionPool, error) {
logger := log.NewLogger(writer, "test-runner")
return eth.NewConnectionPoolImpl(cfg, logger)
}

// Use Init function as a setup function for the tests.
// It means each test will have to call Init function to set up the test.
func Init(
cfg eth.ConnectionPoolConfig, writer io.Writer, t *testing.T,
) (eth.ConnectionPool, error) {
checkEnv(t)
return initConnectionPool(cfg, writer)
}

/******************************* TEST CASES ***************************************/

// TestNewConnectionPoolImpl_MissingURLs tests the case when the URLs are missing.
func TestNewConnectionPoolImpl_MissingURLs(t *testing.T) {
cfg := eth.ConnectionPoolConfig{}
var logBuffer bytes.Buffer

_, err := Init(cfg, &logBuffer, t)
require.ErrorContains(t, err, "ConnectionPool: missing URL for HTTP clients")
}

// TestNewConnectionPoolImpl_MissingWSURLs tests the case when the WS URLs are missing.
func TestNewConnectionPoolImpl_MissingWSURLs(t *testing.T) {
cfg := eth.ConnectionPoolConfig{
EthHTTPURLs: []string{HTTPURL},
}
var logBuffer bytes.Buffer
pool, err := Init(cfg, &logBuffer, t)

require.NoError(t, err)
require.NotNil(t, pool)
require.Contains(t, logBuffer.String(), "ConnectionPool: missing URL for WS clients")
}

// TestNewConnectionPoolImpl tests the case when the URLs are provided.
// It should the expected behavior.
func TestNewConnectionPoolImpl(t *testing.T) {
cfg := eth.ConnectionPoolConfig{
EthHTTPURLs: []string{HTTPURL},
EthWSURLs: []string{WSURL},
}
var logBuffer bytes.Buffer
pool, err := Init(cfg, &logBuffer, t)

require.NoError(t, err)
require.NotNil(t, pool)
require.Empty(t, logBuffer.String())
}

// TestGetHTTP tests the retrieval of the HTTP client when it
// has been set and the connection has been established.
func TestGetHTTP(t *testing.T) {
cfg := eth.ConnectionPoolConfig{
EthHTTPURLs: []string{HTTPURL},
}
var logBuffer bytes.Buffer
pool, _ := Init(cfg, &logBuffer, t)
err := pool.Dial("")
require.NoError(t, err)

client, ok := pool.GetHTTP()
require.True(t, ok)
require.NotNil(t, client)
}

// TestGetWS tests the retrieval of the HTTP client when it
// has been set and the connection has been established.
func TestGetWS(t *testing.T) {
cfg := eth.ConnectionPoolConfig{
EthHTTPURLs: []string{HTTPURL},
EthWSURLs: []string{WSURL},
}
var logBuffer bytes.Buffer
pool, _ := Init(cfg, &logBuffer, t)
err := pool.Dial("")

require.NoError(t, err)

client, ok := pool.GetWS()
require.True(t, ok)
require.NotNil(t, client)
}

// TestGetWS_WhenItIsNotSet tests the retrieval of the WS client when
// no WS URLs have been provided.
func TestGetWS_WhenItIsNotSet(t *testing.T) {
cfg := eth.ConnectionPoolConfig{
EthHTTPURLs: []string{HTTPURL},
}
var logBuffer bytes.Buffer
pool, _ := Init(cfg, &logBuffer, t)
err := pool.Dial("")
require.NoError(t, err)

client, ok := pool.GetWS()
require.False(t, ok)
require.Nil(t, client)
}
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func StartCmd[C any](app coreapp.App[C], defaultAppHome string) *cobra.Command {
}

// StartCmdWithOptions runs the service passed in.
func StartCmdWithOptions[C any](
func StartCmdWithOptions[C any]( //nolint:gocognit // TODO: refactor
app coreapp.App[C], defaultAppHome string, _ StartCmdOptions,
) *cobra.Command {
cmd := &cobra.Command{
Expand Down
4 changes: 3 additions & 1 deletion core/transactor/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (t *TxrV2) retrieveBatch(ctx context.Context) types.Requests {
}

// Get at most txsRemaining tx requests from the queue.
msgIDs, txReqs, err := t.requests.ReceiveMany(int32(txsRemaining))
msgIDs, txReqs, err := t.requests.ReceiveMany(
int32(txsRemaining), //nolint:gosec // safe to convert.
)
if err != nil {
t.logger.Error("failed to receive tx request", "err", err)
continue
Expand Down
4 changes: 2 additions & 2 deletions core/transactor/sender/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
)

var (
multiplier = big.NewInt(11500) //nolint:gomnd // its okay.
quotient = big.NewInt(10000) //nolint:gomnd // its okay.
multiplier = big.NewInt(11500) //nolint:mnd // its okay.
quotient = big.NewInt(10000) //nolint:mnd // its okay.
)

// BumpGas bumps the gas on a tx by a 15% increase.
Expand Down
2 changes: 1 addition & 1 deletion core/transactor/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (t *TxrV2) Execute(context.Context, any) (any, error) {
"🧠 system status",
"waiting-tx", acquired, "in-flight-tx", inFlight, "pending-requests", t.requests.Len(),
)
return nil, nil //nolint:nilnil // its okay.
return 1, nil
}

// IntervalTime implements job.Polling.
Expand Down
Loading

0 comments on commit b1c8d80

Please sign in to comment.