Skip to content

Commit

Permalink
Refactor to single channel
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidNix committed Sep 11, 2023
1 parent 0244933 commit f98f7ce
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 28 deletions.
33 changes: 10 additions & 23 deletions privval/load_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,28 @@ import (
type RemoteSignerLoadBalancer struct {
logger cometlog.Logger
listeners []SignerListener
avail chan SignerListener // Available listeners that are ready to accept requests.
}

func NewRemoteSignerLoadBalancer(logger cometlog.Logger, listeners []SignerListener) *RemoteSignerLoadBalancer {
ch := make(chan SignerListener, len(listeners))
for i := range listeners {
ch <- listeners[i]
}
return &RemoteSignerLoadBalancer{
logger: logger,
listeners: listeners,
avail: ch,
}
}

// SendRequest sends a request to the first available listener.
func (lb *RemoteSignerLoadBalancer) SendRequest(request privvalproto.Message) (*privvalproto.Message, error) {
reqCh := make(chan privvalproto.Message)
resCh := make(chan signerListenerEndpointResponse)
lis := <-lb.avail
defer func() { lb.avail <- lis }()

for _, listener := range lb.listeners {
go lb.send(listener, reqCh, resCh)
}
reqCh <- request
res := <-resCh
close(reqCh)
return res.res, res.err
lb.logger.Debug("Sent request to listener", "address", lis.address)
return lis.SendRequest(request)
}

func (lb *RemoteSignerLoadBalancer) Start() error {
Expand All @@ -50,17 +51,3 @@ func (lb *RemoteSignerLoadBalancer) Stop() error {
}
return err
}

type signerListenerEndpointResponse struct {
res *privvalproto.Message
err error
}

func (lb *RemoteSignerLoadBalancer) send(listener SignerListener, reqCh <-chan privvalproto.Message, resCh chan<- signerListenerEndpointResponse) {
for req := range reqCh {
var res signerListenerEndpointResponse
lb.logger.Debug("Sent request to listener", "address", listener.address)
res.res, res.err = listener.SendRequest(req)
resCh <- res
}
}
5 changes: 0 additions & 5 deletions privval/signer_listener_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,6 @@ func (sl *SignerListenerEndpoint) SendRequest(request privvalproto.Message) (*pr
sl.instanceMtx.Lock()
defer sl.instanceMtx.Unlock()

return sl.SendRequestLocked(request)
}

// SendRequest ensures there is a connection, sends a request and waits for a response
func (sl *SignerListenerEndpoint) SendRequestLocked(request privvalproto.Message) (*privvalproto.Message, error) {
err := sl.ensureConnection(sl.timeoutAccept)
if err != nil {
return nil, err
Expand Down

0 comments on commit f98f7ce

Please sign in to comment.