Skip to content

Commit

Permalink
change architecture so that we have only 1 subscription and then we f…
Browse files Browse the repository at this point in the history
…an out the header to all goroutines; use SafeEVMHeader
  • Loading branch information
Tofel committed Apr 25, 2024
1 parent e3de908 commit 3a4979a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rs/zerolog"
"github.com/smartcontractkit/seth"

"github.com/smartcontractkit/chainlink-testing-framework/blockchain"
"github.com/smartcontractkit/chainlink-testing-framework/networks"
"github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum"
eth_contracts "github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum"
Expand Down Expand Up @@ -2359,7 +2360,7 @@ func NewKeeperConsumerBenchmarkkUpkeepObserver(

// ReceiveHeader will query the latest Keeper round and check to see whether upkeep was performed, it returns
// true when observation has finished.
func (o *KeeperConsumerBenchmarkUpkeepObserver) ReceiveHeader(receivedHeader *types.Header) (bool, error) {
func (o *KeeperConsumerBenchmarkUpkeepObserver) ReceiveHeader(receivedHeader *blockchain.SafeEVMHeader) (bool, error) {
if receivedHeader.Number.Uint64() <= o.lastBlockNum { // Uncle / reorg we won't count
return false, nil
}
Expand Down
88 changes: 70 additions & 18 deletions integration-tests/testsetups/keeper_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package testsetups

import (
"context"
"errors"
"fmt"
"math"
"math/big"
Expand All @@ -17,14 +18,14 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/slack-go/slack"
"github.com/smartcontractkit/seth"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/smartcontractkit/chainlink-testing-framework/blockchain"
"github.com/smartcontractkit/chainlink-testing-framework/k8s/environment"
"github.com/smartcontractkit/chainlink-testing-framework/logging"
reportModel "github.com/smartcontractkit/chainlink-testing-framework/testreporters"
Expand Down Expand Up @@ -259,14 +260,69 @@ func (k *KeeperBenchmarkTest) Run() {
var startedObservations = atomic.Int32{}
var finishedObservations = atomic.Int32{}

// We create as many channels as listening goroutines (1 per upkeep). In the background we will be fanning out
// headers that we get from a single channel connected to EVM node to all upkeep-specific channels.
headerCh := make(chan *blockchain.SafeEVMHeader, 10)
sub, err := k.chainClient.Client.Client().EthSubscribe(context.Background(), headerCh, "newHeads")
require.NoError(k.t, err, "Subscribing to new headers for upkeep observation shouldn't fail")

totalNumberOfChannels := 0
for rIndex := range k.keeperRegistries {
totalNumberOfChannels += len(k.upkeepIDs[rIndex])
}

contractChannels := make([]chan *blockchain.SafeEVMHeader, totalNumberOfChannels)
for idx := 0; idx < totalNumberOfChannels; idx++ {
contractChannels[idx] = make(chan *blockchain.SafeEVMHeader, 10) // Buffered just in case processing is slow
}

// signals all goroutines to stop when subscription error occurs
stopAllGoroutinesCh := make(chan struct{})

// this goroutine fans out headers to goroutines in the background
// and exists when all goroutines are done or when an error occurs
go func() {
defer func() {
// close all fanning out channels at the very end
for _, ch := range contractChannels {
close(ch)
}
k.log.Debug().Msg("Closed header distribution channels")
}()
for {
select {
case header := <-headerCh:
k.log.Debug().Int64("Number", header.Number.Int64()).Msg("Fanning out new header")
for _, ch := range contractChannels {
ch <- header
}
// we don't really care if it was a success or an error, we just want to exit
// if it was an error, we will have an error in the main goroutine
case <-errCtx.Done():
k.log.Debug().Msg("All goroutines finished.")
sub.Unsubscribe()
return
case err := <-sub.Err():
// no need to unsubscribe, subscripion errored
k.log.Err(err).Msg("header subscription failed. Exiting")
// close channel to signal all goroutines they should exit
close(stopAllGoroutinesCh)
return
}
}
}()

currentChannelIndex := 0
for rIndex := range k.keeperRegistries {
for index, upkeepID := range k.upkeepIDs[rIndex] {
chIndex := currentChannelIndex
currentChannelIndex++
upkeepIDCopy := upkeepID
registryIndex := rIndex
upkeepIndex := int64(index)
errgroup.Go(func() error {
startedObservations.Add(1)
k.log.Info().Str("UpkeepID", upkeepIDCopy.String()).Msg("Starting upkeep observation")
k.log.Info().Int("Channel index", chIndex).Str("UpkeepID", upkeepIDCopy.String()).Msg("Starting upkeep observation")

confirmer := contracts.NewKeeperConsumerBenchmarkkUpkeepObserver(
k.keeperConsumerContracts[registryIndex],
Expand All @@ -281,39 +337,32 @@ func (k *KeeperBenchmarkTest) Run() {
)

k.log.Debug().Str("UpkeepID", upkeepIDCopy.String()).Msg("Subscribing to new headers for upkeep observation")
// hard to say what should be the buffer size, we want it big, but too big as we might run out of memory
// but on L2s with super fast block times, we might run into `subscription queue overflow` errors, when this
// buffer is too small
headerCh := make(chan *types.Header, 5000)
sub, err := k.chainClient.Client.SubscribeNewHead(context.Background(), headerCh)
if err != nil {
return err
}

for {
select {
case subscriptionErr := <-sub.Err(): // header listening failed for the upkeep, exit
return errors.Wrapf(subscriptionErr, "listening for new headers for upkeep %s failed. Exiting", upkeepIDCopy.String())
case <-errCtx.Done(): //one of goroutines errored, shut down gracefully
case <-stopAllGoroutinesCh: // header listening failed, exit
return errors.New("header distribution channel closed")
case <-errCtx.Done(): //one of goroutines errored, shut down gracefully, no need to return error
k.log.Error().Err(errCtx.Err()).Str("UpkeepID", upkeepIDCopy.String()).Msg("Stopping obervations due to error in one of the goroutines")
sub.Unsubscribe()
return nil
case header := <-headerCh: // new block, check if upkeep was performed
case header := <-contractChannels[chIndex]: // new block, check if upkeep was performed
k.log.Debug().Interface("Header number", header.Number).Str("UpkeepID", upkeepIDCopy.String()).Msg("Started processing new header")
finished, headerErr := confirmer.ReceiveHeader(header)
if headerErr != nil {
return headerErr
}
if finished { // observations should be completed as we are beyond block range

if finished { // observations should be completed as we are beyond block range, if there are not there's a bug in test code
finishedObservations.Add(1)
k.log.Info().Str("Done/Total", fmt.Sprintf("%d/%d", finishedObservations.Load(), startedObservations.Load())).Str("UpkeepID", upkeepIDCopy.String()).Msg("Upkeep observation completed")

sub.Unsubscribe()
if confirmer.Complete() {
confirmer.LogDetails()
return nil
}
return fmt.Errorf("confimer has finished, but without completing observation, this should never happen. UpkdeepID: %s", upkeepIDCopy.String())
return fmt.Errorf("confimer has finished, but without completing observation, this should never happen. Review your code. UpkdeepID: %s", upkeepIDCopy.String())
}
k.log.Debug().Interface("Header number", header.Number).Str("UpkeepID", upkeepIDCopy.String()).Msg("Finished processing new header")
}
}
})
Expand All @@ -324,6 +373,9 @@ func (k *KeeperBenchmarkTest) Run() {
k.t.Fatalf("errored when waiting for upkeeps: %v", err)
}

// Close header distribution channel once all observations are done
close(stopAllGoroutinesCh)

// Main test loop
k.observeUpkeepEvents()

Expand Down

0 comments on commit 3a4979a

Please sign in to comment.