Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: merge latest changes to v2 #118

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
matrix:
args: ["build", "lint", "test"]
os: [ubuntu-latest]
go-version: [1.21.0]
go-version: [1.23.1]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout
Expand Down
10 changes: 3 additions & 7 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ run:
# Default: 1m
timeout: 3m


# This file contains only configs which differ from defaults.
# All possible options can be found here https://github.com/golangci/golangci-lint/blob/master/.golangci.reference.yml
linters-settings:
Expand Down Expand Up @@ -96,7 +95,6 @@ linters-settings:
# Default: 40
statements: 50


gci:
sections:
- standard # Standard section: captures all standard packages.
Expand Down Expand Up @@ -130,7 +128,7 @@ linters-settings:
# Default: true
skipRecvDeref: false

gomnd:
mnd:
# List of function patterns to exclude from analysis.
# Values always ignored: `time.Date`,
# `strconv.FormatInt`, `strconv.FormatUint`, `strconv.FormatFloat`,
Expand Down Expand Up @@ -179,7 +177,7 @@ linters-settings:
nolintlint:
# Exclude following linters from requiring an explanation.
# Default: []
allow-no-explanation: [ funlen, gocognit, lll ]
allow-no-explanation: [funlen, gocognit, lll]
# Enable to require an explanation of nonzero length after each nolint directive.
# Default: false
require-explanation: true
Expand Down Expand Up @@ -221,9 +219,7 @@ linters:
- durationcheck # checks for two durations multiplied together
- errname # checks that sentinel errors are prefixed with the Err and error types are suffixed with the Error
- errorlint # finds code that will cause problems with the error wrapping scheme introduced in Go 1.13
- execinquery # checks query string in Query function which reads your Go src files and warning it finds
- exhaustive # checks exhaustiveness of enum switch statements
- exportloopref # checks for pointers to enclosing loop variables
- forbidigo # forbids identifiers
- funlen # tool for detection of long functions
- gci # controls golang package import order and makes it always deterministic
Expand All @@ -236,7 +232,7 @@ linters:
- godot # checks if comments end in a period
- goheader # checks is file header matches to pattern
- goimports # in addition to fixing imports, goimports also formats your code in the same style as gofmt
- gomnd # detects magic numbers
- mnd # detects magic numbers
# - gomoddirectives # manages the use of 'replace', 'retract', and 'excludes' directives in go.mod
# - gomodguard # allow and block lists linter for direct Go module dependencies. This is different from depguard where there are different block types for example version constraints and module recommendations
- goprintffuncname # checks that printf-like functions are named with f at the end
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ generate: |
go generate ./...

tidy: |
go mod tidy
go mod tidy
8 changes: 8 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type BaseApp struct {

// svr is the server for the baseapp.
svr *server.Server

// chain is the eth client for the baseapp.
chain eth.Client
}

// New creates a new baseapp.
Expand Down Expand Up @@ -86,3 +89,8 @@ func (b *BaseApp) Stop() {
b.svr.Stop()
}
}

// Chain returns the eth client for the baseapp.
func (b *BaseApp) Chain() eth.Client {
return b.chain
}
2 changes: 1 addition & 1 deletion baseapp/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewManager(
// TODO: read pool configs from the config file.

// Setup the producer worker pool.
jobCount := uint16(m.jobRegistry.Count())
jobCount := uint16(m.jobRegistry.Count()) //nolint:gosec // safe to convert.
m.producerCfg = &worker.PoolConfig{
Name: producerName,
PrometheusPrefix: producerPromName,
Expand Down
73 changes: 56 additions & 17 deletions client/eth/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,18 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect
cfg.HealthCheckInterval = defaultHealthCheckInterval
}

cache, err := lru.NewWithEvict(
len(cfg.EthHTTPURLs), func(_ string, v *HealthCheckedClient) {
defer v.Close()
// The timeout is added so that any in progress
// requests have a chance to complete before we close.
time.Sleep(cfg.DefaultTimeout)
})
if err != nil {
return nil, err
var (
cache *lru.Cache[string, *HealthCheckedClient]
wsCache *lru.Cache[string, *HealthCheckedClient]
err error
)

// The LRU cache needs at least one URL provided for HTTP.
if len(cfg.EthHTTPURLs) == 0 {
return nil, fmt.Errorf("ConnectionPool: missing URL for HTTP clients")
}
wsCache, err := lru.NewWithEvict(

cache, err = lru.NewWithEvict(
len(cfg.EthHTTPURLs), func(_ string, v *HealthCheckedClient) {
defer v.Close()
// The timeout is added so that any in progress
Expand All @@ -57,6 +58,21 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect
return nil, err
}

if len(cfg.EthWSURLs) == 0 {
logger.Warn("ConnectionPool: missing URL for WS clients")
} else {
wsCache, err = lru.NewWithEvict(
len(cfg.EthWSURLs), func(_ string, v *HealthCheckedClient) {
defer v.Close()
// The timeout is added so that any in progress
// requests have a chance to complete before we close.
time.Sleep(cfg.DefaultTimeout)
})
if err != nil {
return nil, err
}
}

return &ConnectionPoolImpl{
cache: cache,
wsCache: wsCache,
Expand All @@ -68,6 +84,11 @@ func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (Connect
func (c *ConnectionPoolImpl) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()

if c.cache == nil {
return nil
}

for _, client := range c.cache.Keys() {
if err := c.removeClient(client); err != nil {
return err
Expand All @@ -81,13 +102,21 @@ func (c *ConnectionPoolImpl) Dial(string) error {
}

func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error {
// NOTE: Check the cache for the HTTP URL is not needed because it
// is guaranteed to be non-nil when a ConnectionPoolImpl is created.
for _, url := range c.config.EthHTTPURLs {
client := NewHealthCheckedClient(c.config.HealthCheckInterval, c.logger)
if err := client.DialContextWithTimeout(ctx, url, c.config.DefaultTimeout); err != nil {
return err
}
c.cache.Add(url, client)
}

// Check is needed because the WS URL is optional.
if c.wsCache == nil {
return nil
}

for _, url := range c.config.EthWSURLs {
client := NewHealthCheckedClient(c.config.HealthCheckInterval, c.logger)
if err := client.DialContextWithTimeout(ctx, url, c.config.DefaultTimeout); err != nil {
Expand All @@ -98,22 +127,32 @@ func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error {
return nil
}

// NOTE: this function assumes the cache is non-nil
// because it is guaranteed to be non-nil when a ConnectionPoolImpl is created.
func (c *ConnectionPoolImpl) GetHTTP() (Client, bool) {
c.mutex.Lock()
defer c.mutex.Unlock()
retry:
_, client, ok := c.cache.GetOldest()
if !client.Health() {
goto retry
}
return client, ok

return c.getClientFrom(c.cache)
}

func (c *ConnectionPoolImpl) GetWS() (Client, bool) {
c.mutex.Lock()
defer c.mutex.Unlock()

// Because the WS URL is optional, we need to check if it's nil.
if c.wsCache == nil {
return nil, false
}
return c.getClientFrom(c.wsCache)
}

// NOTE: this function assumes the lock is held and cache is non-nil.
func (c *ConnectionPoolImpl) getClientFrom(
cache *lru.Cache[string, *HealthCheckedClient],
) (Client, bool) {
retry:
_, client, ok := c.wsCache.GetOldest()
_, client, ok := cache.GetOldest()
if !client.Health() {
goto retry
}
Expand Down
134 changes: 134 additions & 0 deletions client/eth/connection_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package eth_test

import (
"bytes"
"io"
"os"
"testing"

"github.com/berachain/offchain-sdk/v2/client/eth"
"github.com/berachain/offchain-sdk/v2/log"
"github.com/stretchr/testify/require"
)

var (
HTTPURL = os.Getenv("ETH_HTTP_URL")
WSURL = os.Getenv("ETH_WS_URL")
)

/******************************* HELPER FUNCTIONS ***************************************/

// NOTE: requires chain rpc url at env var `ETH_HTTP_URL` and `ETH_WS_URL`.
func checkEnv(t *testing.T) {
ethHTTPRPC := os.Getenv("ETH_HTTP_URL")
ethWSRPC := os.Getenv("ETH_WS_URL")
if ethHTTPRPC == "" || ethWSRPC == "" {
t.Skipf("Skipping test: no eth rpc url provided")
}
}

// initConnectionPool initializes a new connection pool.
func initConnectionPool(
cfg eth.ConnectionPoolConfig, writer io.Writer,
) (eth.ConnectionPool, error) {
logger := log.NewLogger(writer, "test-runner")
return eth.NewConnectionPoolImpl(cfg, logger)
}

// Use Init function as a setup function for the tests.
// It means each test will have to call Init function to set up the test.
func Init(
cfg eth.ConnectionPoolConfig, writer io.Writer, t *testing.T,
) (eth.ConnectionPool, error) {
checkEnv(t)
return initConnectionPool(cfg, writer)
}

/******************************* TEST CASES ***************************************/

// TestNewConnectionPoolImpl_MissingURLs tests the case when the URLs are missing.
func TestNewConnectionPoolImpl_MissingURLs(t *testing.T) {
cfg := eth.ConnectionPoolConfig{}
var logBuffer bytes.Buffer

_, err := Init(cfg, &logBuffer, t)
require.ErrorContains(t, err, "ConnectionPool: missing URL for HTTP clients")
}

// TestNewConnectionPoolImpl_MissingWSURLs tests the case when the WS URLs are missing.
func TestNewConnectionPoolImpl_MissingWSURLs(t *testing.T) {
cfg := eth.ConnectionPoolConfig{
EthHTTPURLs: []string{HTTPURL},
}
var logBuffer bytes.Buffer
pool, err := Init(cfg, &logBuffer, t)

require.NoError(t, err)
require.NotNil(t, pool)
require.Contains(t, logBuffer.String(), "ConnectionPool: missing URL for WS clients")
}

// TestNewConnectionPoolImpl tests the case when the URLs are provided.
// It should the expected behavior.
func TestNewConnectionPoolImpl(t *testing.T) {
cfg := eth.ConnectionPoolConfig{
EthHTTPURLs: []string{HTTPURL},
EthWSURLs: []string{WSURL},
}
var logBuffer bytes.Buffer
pool, err := Init(cfg, &logBuffer, t)

require.NoError(t, err)
require.NotNil(t, pool)
require.Empty(t, logBuffer.String())
}

// TestGetHTTP tests the retrieval of the HTTP client when it
// has been set and the connection has been established.
func TestGetHTTP(t *testing.T) {
cfg := eth.ConnectionPoolConfig{
EthHTTPURLs: []string{HTTPURL},
}
var logBuffer bytes.Buffer
pool, _ := Init(cfg, &logBuffer, t)
err := pool.Dial("")
require.NoError(t, err)

client, ok := pool.GetHTTP()
require.True(t, ok)
require.NotNil(t, client)
}

// TestGetWS tests the retrieval of the HTTP client when it
// has been set and the connection has been established.
func TestGetWS(t *testing.T) {
cfg := eth.ConnectionPoolConfig{
EthHTTPURLs: []string{HTTPURL},
EthWSURLs: []string{WSURL},
}
var logBuffer bytes.Buffer
pool, _ := Init(cfg, &logBuffer, t)
err := pool.Dial("")

require.NoError(t, err)

client, ok := pool.GetWS()
require.True(t, ok)
require.NotNil(t, client)
}

// TestGetWS_WhenItIsNotSet tests the retrieval of the WS client when
// no WS URLs have been provided.
func TestGetWS_WhenItIsNotSet(t *testing.T) {
cfg := eth.ConnectionPoolConfig{
EthHTTPURLs: []string{HTTPURL},
}
var logBuffer bytes.Buffer
pool, _ := Init(cfg, &logBuffer, t)
err := pool.Dial("")
require.NoError(t, err)

client, ok := pool.GetWS()
require.False(t, ok)
require.Nil(t, client)
}
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func StartCmd[C any](app coreapp.App[C], defaultAppHome string) *cobra.Command {
}

// StartCmdWithOptions runs the service passed in.
func StartCmdWithOptions[C any](
func StartCmdWithOptions[C any]( //nolint:gocognit // TODO: refactor
app coreapp.App[C], defaultAppHome string, _ StartCmdOptions,
) *cobra.Command {
cmd := &cobra.Command{
Expand Down
4 changes: 3 additions & 1 deletion core/transactor/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (t *TxrV2) retrieveBatch(ctx context.Context) types.Requests {
}

// Get at most txsRemaining tx requests from the queue.
msgIDs, txReqs, err := t.requests.ReceiveMany(int32(txsRemaining))
msgIDs, txReqs, err := t.requests.ReceiveMany(
int32(txsRemaining), //nolint:gosec // safe to convert.
)
if err != nil {
t.logger.Error("failed to receive tx request", "err", err)
continue
Expand Down
4 changes: 2 additions & 2 deletions core/transactor/sender/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
)

var (
multiplier = big.NewInt(11500) //nolint:gomnd // its okay.
quotient = big.NewInt(10000) //nolint:gomnd // its okay.
multiplier = big.NewInt(11500) //nolint:mnd // its okay.
quotient = big.NewInt(10000) //nolint:mnd // its okay.
)

// BumpGas bumps the gas on a tx by a 15% increase.
Expand Down
2 changes: 1 addition & 1 deletion core/transactor/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (t *TxrV2) Execute(context.Context, any) (any, error) {
"🧠 system status",
"waiting-tx", acquired, "in-flight-tx", inFlight, "pending-requests", t.requests.Len(),
)
return nil, nil //nolint:nilnil // its okay.
return 1, nil
}

// IntervalTime implements job.Polling.
Expand Down
Loading
Loading