Skip to content

Commit

Permalink
WS URL can be optional when LogBroadcaster is disabled (#14364)
Browse files Browse the repository at this point in the history
* WS URL can be optional

* add changeset

* change

* make WSURL optional

* fix test, and enforce SubscribeFilterLogs to fail when ws url not provided

* add comments

* update changeset

* update dial logic and make ws optional not required

* fix test

* fix

* fix lint

* address comments

* update comments

* fix test

* add check when both ws and http missing

* add test and add restrictions

* add comment

* revert outdated change

* remove extra line

* fix test

* revert changes from rpc client

* unintended change

* remove unused

* update verification logic

* add test fix

* modify unit test to cover logbroadcaster enabled false

* update doc

* udpate changeset

* address PR comments

* address pr comments

* update invalid toml config

* fix test

* ws required for primary nodes when logbroadcaster enabled

* minor

* Dmytro's comments

* fix nil ptr, more fix to come

* fix make

* refactor function sig

* fix test

* fix

* make ws pointer

* fix

* fix make

* address comments

* fix lint

* fix make

* fix make

* fix make

* fix rpc disconnect with optional ws url

---------

Co-authored-by: Dmytro Haidashenko <[email protected]>
  • Loading branch information
huangzhen1997 and dhaidashenko authored Oct 3, 2024
1 parent 659b75a commit 5d96be5
Show file tree
Hide file tree
Showing 13 changed files with 157 additions and 97 deletions.
8 changes: 8 additions & 0 deletions .changeset/silly-lies-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"chainlink": minor
---

Make websocket URL `WSURL` for `EVM.Nodes` optional, and apply logic so that:
* If WS URL was not provided, SubscribeFilterLogs should fail with an explicit error
* If WS URL was not provided LogBroadcaster should be disabled
#nops
17 changes: 11 additions & 6 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ type node[
services.StateMachine
lfcLog logger.Logger
name string
id int32
id int
chainID CHAIN_ID
nodePoolCfg NodeConfig
chainCfg ChainConfig
order int32
chainFamily string

ws url.URL
ws *url.URL
http *url.URL

rpc RPC
Expand All @@ -121,10 +121,10 @@ func NewNode[
nodeCfg NodeConfig,
chainCfg ChainConfig,
lggr logger.Logger,
wsuri url.URL,
wsuri *url.URL,
httpuri *url.URL,
name string,
id int32,
id int,
chainID CHAIN_ID,
nodeOrder int32,
rpc RPC,
Expand All @@ -136,8 +136,10 @@ func NewNode[
n.chainID = chainID
n.nodePoolCfg = nodeCfg
n.chainCfg = chainCfg
n.ws = wsuri
n.order = nodeOrder
if wsuri != nil {
n.ws = wsuri
}
if httpuri != nil {
n.http = httpuri
}
Expand All @@ -157,7 +159,10 @@ func NewNode[
}

func (n *node[CHAIN_ID, HEAD, RPC]) String() string {
s := fmt.Sprintf("(%s)%s:%s", Primary.String(), n.name, n.ws.String())
s := fmt.Sprintf("(%s)%s", Primary.String(), n.name)
if n.ws != nil {
s = s + fmt.Sprintf(":%s", n.ws.String())
}
if n.http != nil {
s = s + fmt.Sprintf(":%s", n.http.String())
}
Expand Down
4 changes: 2 additions & 2 deletions common/client/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ type testNodeOpts struct {
config testNodeConfig
chainConfig clientMocks.ChainConfig
lggr logger.Logger
wsuri url.URL
wsuri *url.URL
httpuri *url.URL
name string
id int32
id int
chainID types.ID
nodeOrder int32
rpc *mockNodeClient[types.ID, Head]
Expand Down
18 changes: 12 additions & 6 deletions core/chains/evm/client/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,21 @@ func NewClientConfigs(
func parseNodeConfigs(nodeCfgs []NodeConfig) ([]*toml.Node, error) {
nodes := make([]*toml.Node, len(nodeCfgs))
for i, nodeCfg := range nodeCfgs {
if nodeCfg.WSURL == nil || nodeCfg.HTTPURL == nil {
return nil, fmt.Errorf("node config [%d]: missing WS or HTTP URL", i)
var wsURL, httpURL *commonconfig.URL
// wsUrl requirement will be checked in EVMConfig validation
if nodeCfg.WSURL != nil {
wsURL = commonconfig.MustParseURL(*nodeCfg.WSURL)
}
wsUrl := commonconfig.MustParseURL(*nodeCfg.WSURL)
httpUrl := commonconfig.MustParseURL(*nodeCfg.HTTPURL)

if nodeCfg.HTTPURL == nil {
return nil, fmt.Errorf("node config [%d]: missing HTTP URL", i)
}

httpURL = commonconfig.MustParseURL(*nodeCfg.HTTPURL)
node := &toml.Node{
Name: nodeCfg.Name,
WSURL: wsUrl,
HTTPURL: httpUrl,
WSURL: wsURL,
HTTPURL: httpURL,
SendOnly: nodeCfg.SendOnly,
Order: nodeCfg.Order,
}
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/config_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ func TestNodeConfigs(t *testing.T) {
require.Len(t, tomlNodes, len(nodeConfigs))
})

t.Run("parsing missing ws url fails", func(t *testing.T) {
t.Run("ws can be optional", func(t *testing.T) {
nodeConfigs := []client.NodeConfig{
{
Name: ptr("foo1"),
HTTPURL: ptr("http://foo1.test"),
},
}
_, err := client.ParseTestNodeConfigs(nodeConfigs)
require.Error(t, err)
require.Nil(t, err)
})

t.Run("parsing missing http url fails", func(t *testing.T) {
Expand Down
11 changes: 7 additions & 4 deletions core/chains/evm/client/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,25 @@ import (
)

func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, clientErrors evmconfig.ClientErrors, lggr logger.Logger, chainID *big.Int, nodes []*toml.Node, chainType chaintype.ChainType) Client {
var empty url.URL
var primaries []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]
var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient]
largePayloadRPCTimeout, defaultRPCTimeout := getRPCTimeouts(chainType)
for i, node := range nodes {
var ws *url.URL
if node.WSURL != nil {
ws = (*url.URL)(node.WSURL)
}
if node.SendOnly != nil && *node.SendOnly {
rpc := NewRPCClient(lggr, empty, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID,
rpc := NewRPCClient(lggr, nil, (*url.URL)(node.HTTPURL), *node.Name, i, chainID,
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL),
*node.Name, chainID, rpc)
sendonlys = append(sendonlys, sendonly)
} else {
rpc := NewRPCClient(lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i),
rpc := NewRPCClient(lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i,
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
primaryNode := commonclient.NewNode(cfg, chainCfg,
lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, *node.Order,
lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i, chainID, *node.Order,
rpc, "EVM")
primaries = append(primaries, primaryNode)
}
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func NewChainClientWithTestNode(
rpcUrl string,
rpcHTTPURL *url.URL,
sendonlyRPCURLs []url.URL,
id int32,
id int,
chainID *big.Int,
) (Client, error) {
parsed, err := url.ParseRequestURI(rpcUrl)
Expand All @@ -148,10 +148,10 @@ func NewChainClientWithTestNode(
}

lggr := logger.Test(t)
rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n}

var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient]
Expand All @@ -160,7 +160,7 @@ func NewChainClientWithTestNode(
return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String())
}
var empty url.URL
rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, &empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
s := commonclient.NewSendOnlyNode[*big.Int, RPCClient](
lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc)
sendonlys = append(sendonlys, s)
Expand Down Expand Up @@ -206,7 +206,7 @@ func NewChainClientWithMockedRpc(
parsed, _ := url.ParseRequestURI("ws://test")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n}
clientErrors := NewTestClientErrors()
c := NewChainClient(lggr, selectionMode, leaseDuration, noNewHeadsThreshold, primaries, nil, chainID, chainType, &clientErrors, 0)
Expand Down
74 changes: 47 additions & 27 deletions core/chains/evm/client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type rawclient struct {
type rpcClient struct {
rpcLog logger.SugaredLogger
name string
id int32
id int
chainID *big.Int
tier commonclient.NodeTier
largePayloadRpcTimeout time.Duration
Expand All @@ -126,7 +126,7 @@ type rpcClient struct {
newHeadsPollInterval time.Duration
chainType chaintype.ChainType

ws rawclient
ws *rawclient
http *rawclient

stateMu sync.RWMutex // protects state* fields
Expand Down Expand Up @@ -154,10 +154,10 @@ type rpcClient struct {
// NewRPCCLient returns a new *rpcClient as commonclient.RPC
func NewRPCClient(
lggr logger.Logger,
wsuri url.URL,
wsuri *url.URL,
httpuri *url.URL,
name string,
id int32,
id int,
chainID *big.Int,
tier commonclient.NodeTier,
finalizedBlockPollInterval time.Duration,
Expand All @@ -175,9 +175,11 @@ func NewRPCClient(
r.id = id
r.chainID = chainID
r.tier = tier
r.ws.uri = wsuri
r.finalizedBlockPollInterval = finalizedBlockPollInterval
r.newHeadsPollInterval = newHeadsPollInterval
if wsuri != nil {
r.ws = &rawclient{uri: *wsuri}
}
if httpuri != nil {
r.http = &rawclient{uri: *httpuri}
}
Expand All @@ -199,30 +201,33 @@ func (r *rpcClient) Dial(callerCtx context.Context) error {
ctx, cancel := r.makeQueryCtx(callerCtx, r.rpcTimeout)
defer cancel()

promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc()
lggr := r.rpcLog.With("wsuri", r.ws.uri.Redacted())
if r.http != nil {
lggr = lggr.With("httpuri", r.http.uri.Redacted())
if r.ws == nil && r.http == nil {
return errors.New("cannot dial rpc client when both ws and http info are missing")
}
lggr.Debugw("RPC dial: evmclient.Client#dial")

wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "")
if err != nil {
promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc()
return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted()))
}
promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc()
lggr := r.rpcLog
if r.ws != nil {
lggr = lggr.With("wsuri", r.ws.uri.Redacted())
wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "")
if err != nil {
promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc()
return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted()))
}

r.ws.rpc = wsrpc
r.ws.geth = ethclient.NewClient(wsrpc)
r.ws.rpc = wsrpc
r.ws.geth = ethclient.NewClient(wsrpc)
}

if r.http != nil {
lggr = lggr.With("httpuri", r.http.uri.Redacted())
if err := r.DialHTTP(); err != nil {
return err
}
}

lggr.Debugw("RPC dial: evmclient.Client#dial")
promEVMPoolRPCNodeDialsSuccess.WithLabelValues(r.chainID.String(), r.name).Inc()

return nil
}

Expand All @@ -231,7 +236,7 @@ func (r *rpcClient) Dial(callerCtx context.Context) error {
// It can only return error if the URL is malformed.
func (r *rpcClient) DialHTTP() error {
promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc()
lggr := r.rpcLog.With("httpuri", r.ws.uri.Redacted())
lggr := r.rpcLog.With("httpuri", r.http.uri.Redacted())
lggr.Debugw("RPC dial: evmclient.Client#dial")

var httprpc *rpc.Client
Expand All @@ -251,7 +256,7 @@ func (r *rpcClient) DialHTTP() error {

func (r *rpcClient) Close() {
defer func() {
if r.ws.rpc != nil {
if r.ws != nil && r.ws.rpc != nil {
r.ws.rpc.Close()
}
}()
Expand All @@ -270,7 +275,10 @@ func (r *rpcClient) cancelInflightRequests() {
}

func (r *rpcClient) String() string {
s := fmt.Sprintf("(%s)%s:%s", r.tier.String(), r.name, r.ws.uri.Redacted())
s := fmt.Sprintf("(%s)%s", r.tier.String(), r.name)
if r.ws != nil {
s = s + fmt.Sprintf(":%s", r.ws.uri.Redacted())
}
if r.http != nil {
s = s + fmt.Sprintf(":%s", r.http.uri.Redacted())
}
Expand Down Expand Up @@ -336,7 +344,7 @@ func (r *rpcClient) registerSub(sub ethereum.Subscription, stopInFLightCh chan s
// DisconnectAll disconnects all clients connected to the rpcClient
func (r *rpcClient) DisconnectAll() {
r.stateMu.Lock()
if r.ws.rpc != nil {
if r.ws != nil && r.ws.rpc != nil {
r.ws.rpc.Close()
}
r.cancelInflightRequests()
Expand Down Expand Up @@ -497,7 +505,6 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp
defer cancel()
args := []interface{}{"newHeads"}
lggr := r.newRqLggr().With("args", args)

if r.newHeadsPollInterval > 0 {
interval := r.newHeadsPollInterval
timeout := interval
Expand Down Expand Up @@ -529,6 +536,10 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp
return &poller, nil
}

if ws == nil {
return nil, errors.New("SubscribeNewHead is not allowed without ws url")
}

lggr.Debug("RPC call: evmclient.Client#EthSubscribe")
start := time.Now()
defer func() {
Expand Down Expand Up @@ -557,7 +568,6 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp
func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.Head, sub commontypes.Subscription, err error) {
ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout)
defer cancel()

args := []interface{}{rpcSubscriptionMethodNewHeads}
start := time.Now()
lggr := r.newRqLggr().With("args", args)
Expand All @@ -580,6 +590,10 @@ func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.H
return channel, &poller, nil
}

if ws == nil {
return nil, nil, errors.New("SubscribeNewHead is not allowed without ws url")
}

lggr.Debug("RPC call: evmclient.Client#EthSubscribe")
defer func() {
duration := time.Since(start)
Expand Down Expand Up @@ -1286,6 +1300,9 @@ func (r *rpcClient) ClientVersion(ctx context.Context) (version string, err erro
func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (_ ethereum.Subscription, err error) {
ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout)
defer cancel()
if ws == nil {
return nil, errors.New("SubscribeFilterLogs is not allowed without ws url")
}
lggr := r.newRqLggr().With("q", q)

lggr.Debug("RPC call: evmclient.Client#SubscribeFilterLogs")
Expand Down Expand Up @@ -1390,18 +1407,21 @@ func (r *rpcClient) wrapHTTP(err error) error {
}

// makeLiveQueryCtxAndSafeGetClients wraps makeQueryCtx
func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient) {
func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, ws *rawclient, http *rawclient) {
ctx, cancel, _, ws, http = r.acquireQueryCtx(parentCtx, timeout)
return
}

func (r *rpcClient) acquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc,
chStopInFlight chan struct{}, ws rawclient, http *rawclient) {
chStopInFlight chan struct{}, ws *rawclient, http *rawclient) {
// Need to wrap in mutex because state transition can cancel and replace the
// context
r.stateMu.RLock()
chStopInFlight = r.chStopInFlight
ws = r.ws
if r.ws != nil {
cp := *r.ws
ws = &cp
}
if r.http != nil {
cp := *r.http
http = &cp
Expand Down
Loading

0 comments on commit 5d96be5

Please sign in to comment.