From f278c020eafd3dffd68417eaf7c6065ccedaf813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Thu, 15 Aug 2024 09:21:40 +0200 Subject: [PATCH 1/9] Refactor multi rate broadcaster --- .../app/utxos/broadcast/broadcast.go | 15 +++- .../broadcaster/multi_rate_broadcaster.go | 45 ++++++----- internal/broadcaster/rate_broadcaster.go | 78 ++++++++++++------- 3 files changed, 81 insertions(+), 57 deletions(-) diff --git a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go index 7e97f3789..0e0d71082 100644 --- a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go +++ b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go @@ -129,18 +129,25 @@ var Cmd = &cobra.Command{ counter++ } - rateBroadcaster, err := broadcaster.NewMultiKeyRateBroadcaster(logger, client, ks, wocClient, isTestnet, opts...) - if err != nil { - return fmt.Errorf("failed to create rate broadcaster: %v", err) + rbs := make([]broadcaster.RateBroadcaster, 0, len(keySets)) + for keyName, ks := range keySets { + rb, err := broadcaster.NewRateBroadcaster(logger, client, ks, wocClient, isTestnet, keyName, rateTxsPerSecond, limit, opts...) + if err != nil { + return err + } + + rbs = append(rbs, rb) } + rateBroadcaster := broadcaster.NewMultiKeyRateBroadcaster(logger, rbs) + doneChan := make(chan error) // Channel to signal the completion of Start signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt) // Listen for Ctrl+C go func() { // Start the broadcasting process - err := rateBroadcaster.Start(rateTxsPerSecond, limit) + err := rateBroadcaster.Start() logger.Info("Starting broadcaster", slog.Int("rate [txs/s]", rateTxsPerSecond), slog.Int("batch size", batchSize)) doneChan <- err // Send the completion or error signal }() diff --git a/internal/broadcaster/multi_rate_broadcaster.go b/internal/broadcaster/multi_rate_broadcaster.go index c20de6d95..9ac568c52 100644 --- a/internal/broadcaster/multi_rate_broadcaster.go +++ b/internal/broadcaster/multi_rate_broadcaster.go @@ -2,15 +2,25 @@ package broadcaster import ( "context" - "github.com/bitcoin-sv/arc/pkg/keyset" "log/slog" "math" "sync" "time" ) +type RateBroadcaster interface { + Start() error + Wait() + Shutdown() + GetLimit() int64 + GetTxCount() int64 + GetConnectionCount() int64 + GetUtxoSetLen() int + GetKeyName() string +} + type MultiKeyRateBroadcaster struct { - rbs []*RateBroadcaster + rbs []RateBroadcaster logger *slog.Logger target int64 cancelAll context.CancelFunc @@ -18,16 +28,7 @@ type MultiKeyRateBroadcaster struct { wg sync.WaitGroup } -func NewMultiKeyRateBroadcaster(logger *slog.Logger, client ArcClient, keySets []*keyset.KeySet, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (*MultiKeyRateBroadcaster, error) { - rbs := make([]*RateBroadcaster, 0, len(keySets)) - for _, ks := range keySets { - rb, err := NewRateBroadcaster(logger, client, ks, utxoClient, isTestnet, opts...) - if err != nil { - return nil, err - } - - rbs = append(rbs, rb) - } +func NewMultiKeyRateBroadcaster(logger *slog.Logger, rbs []RateBroadcaster) *MultiKeyRateBroadcaster { mrb := &MultiKeyRateBroadcaster{ rbs: rbs, @@ -38,23 +39,21 @@ func NewMultiKeyRateBroadcaster(logger *slog.Logger, client ArcClient, keySets [ mrb.cancelAll = cancelAll mrb.ctx = ctx - return mrb, nil + return mrb } -func (mrb *MultiKeyRateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { +func (mrb *MultiKeyRateBroadcaster) Start() error { mrb.logStats() mrb.target = 0 for _, rb := range mrb.rbs { - err := rb.Start(rateTxsPerSecond, limit) + err := rb.Start() if err != nil { return err } - - mrb.target += limit } for _, rb := range mrb.rbs { - rb.wg.Wait() + rb.Wait() } return nil @@ -81,20 +80,20 @@ func (mrb *MultiKeyRateBroadcaster) logStats() { case <-logStatsTicker.C: totalTxsCount := int64(0) totalConnectionCount := int64(0) - totalUtxoSetLength := 0 - + var logArgs []slog.Attr for _, rb := range mrb.rbs { totalTxsCount += rb.GetTxCount() totalConnectionCount += rb.GetConnectionCount() - totalUtxoSetLength += rb.GetUtxoSetLen() - + mrb.target += rb.GetLimit() + logArgs = append(logArgs, slog.Int(rb.GetKeyName(), rb.GetUtxoSetLen())) } + mrb.logger.Info("stats", slog.Int64("txs", totalTxsCount), slog.Int64("target", mrb.target), slog.Float64("percentage", roundFloat(float64(totalTxsCount)/float64(mrb.target)*100, 2)), slog.Int64("connections", totalConnectionCount), - slog.Int("utxos", totalUtxoSetLength), + "utxos", logArgs, ) case <-mrb.ctx.Done(): return diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index 4f72acab1..47938f43f 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -17,37 +17,43 @@ import ( "github.com/libsv/go-bt/v2/unlocker" ) -type RateBroadcaster struct { +type UTXORateBroadcaster struct { Broadcaster - totalTxs int64 - connectionCount int64 - shutdown chan struct{} - utxoCh chan *bt.UTXO - wg sync.WaitGroup - satoshiMap sync.Map - ks *keyset.KeySet + totalTxs int64 + connectionCount int64 + shutdown chan struct{} + utxoCh chan *bt.UTXO + wg sync.WaitGroup + satoshiMap sync.Map + ks *keyset.KeySet + keyName string + rateTxsPerSecond int + limit int64 } -func NewRateBroadcaster(logger *slog.Logger, client ArcClient, ks *keyset.KeySet, utxoClient UtxoClient, isTestnet bool, opts ...func(p *Broadcaster)) (*RateBroadcaster, error) { +func NewRateBroadcaster(logger *slog.Logger, client ArcClient, ks *keyset.KeySet, utxoClient UtxoClient, isTestnet bool, keyName string, rateTxsPerSecond int, limit int64, opts ...func(p *Broadcaster)) (*UTXORateBroadcaster, error) { b, err := NewBroadcaster(logger.With(slog.String("address", ks.Address(!isTestnet))), client, utxoClient, isTestnet, opts...) if err != nil { return nil, err } - rb := &RateBroadcaster{ - Broadcaster: b, - shutdown: make(chan struct{}, 1), - utxoCh: nil, - wg: sync.WaitGroup{}, - satoshiMap: sync.Map{}, - ks: ks, - totalTxs: 0, - connectionCount: 0, + rb := &UTXORateBroadcaster{ + Broadcaster: b, + shutdown: make(chan struct{}, 1), + utxoCh: nil, + wg: sync.WaitGroup{}, + satoshiMap: sync.Map{}, + ks: ks, + totalTxs: 0, + connectionCount: 0, + keyName: keyName, + rateTxsPerSecond: rateTxsPerSecond, + limit: limit, } return rb, nil } -func (b *RateBroadcaster) calculateFeeSat(tx *bt.Tx) uint64 { +func (b *UTXORateBroadcaster) calculateFeeSat(tx *bt.Tx) uint64 { size, err := tx.EstimateSizeWithTypes() if err != nil { return 0 @@ -70,7 +76,7 @@ func (b *RateBroadcaster) calculateFeeSat(tx *bt.Tx) uint64 { return txFees } -func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { +func (b *UTXORateBroadcaster) Start() error { b.wg.Add(1) go func() { defer b.wg.Done() @@ -98,10 +104,10 @@ func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { return fmt.Errorf("failed to get utxos: %v", err) } - submitBatchesPerSecond := float64(rateTxsPerSecond) / float64(b.batchSize) + submitBatchesPerSecond := float64(b.rateTxsPerSecond) / float64(b.batchSize) if submitBatchesPerSecond > millisecondsPerSecond { - return fmt.Errorf("submission rate %d [txs/s] and batch size %d [txs] result in submission frequency %.2f greater than 1000 [/s]", rateTxsPerSecond, b.batchSize, submitBatchesPerSecond) + return fmt.Errorf("submission rate %d [txs/s] and batch size %d [txs] result in submission frequency %.2f greater than 1000 [/s]", b.rateTxsPerSecond, b.batchSize, submitBatchesPerSecond) } if len(utxoSet) < b.batchSize { @@ -138,8 +144,8 @@ func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { continue } - if limit > 0 && atomic.LoadInt64(&b.totalTxs) >= limit { - b.logger.Info("limit reached", slog.Int64("total", atomic.LoadInt64(&b.totalTxs)), slog.Int64("limit", limit)) + if b.limit > 0 && atomic.LoadInt64(&b.totalTxs) >= b.limit { + b.logger.Info("limit reached", slog.Int64("total", atomic.LoadInt64(&b.totalTxs)), slog.Int64("limit", b.limit)) b.shutdown <- struct{}{} } @@ -154,7 +160,7 @@ func (b *RateBroadcaster) Start(rateTxsPerSecond int, limit int64) error { return nil } -func (b *RateBroadcaster) createSelfPayingTxs() ([]*bt.Tx, error) { +func (b *UTXORateBroadcaster) createSelfPayingTxs() ([]*bt.Tx, error) { txs := make([]*bt.Tx, 0, b.batchSize) utxoLoop: @@ -202,7 +208,7 @@ utxoLoop: return txs, nil } -func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, errCh chan error, waitForStatus metamorph_api.Status) { +func (b *UTXORateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, errCh chan error, waitForStatus metamorph_api.Status) { b.wg.Add(1) go func() { defer b.wg.Done() @@ -265,20 +271,32 @@ func (b *RateBroadcaster) broadcastBatchAsync(txs []*bt.Tx, errCh chan error, wa }() } -func (b *RateBroadcaster) Shutdown() { +func (b *UTXORateBroadcaster) Shutdown() { b.cancelAll() b.wg.Wait() } -func (b *RateBroadcaster) GetTxCount() int64 { +func (b *UTXORateBroadcaster) Wait() { + b.wg.Wait() +} + +func (b *UTXORateBroadcaster) GetLimit() int64 { + return b.limit +} + +func (b *UTXORateBroadcaster) GetTxCount() int64 { return atomic.LoadInt64(&b.totalTxs) } -func (b *RateBroadcaster) GetConnectionCount() int64 { +func (b *UTXORateBroadcaster) GetConnectionCount() int64 { return atomic.LoadInt64(&b.connectionCount) } -func (b *RateBroadcaster) GetUtxoSetLen() int { +func (b *UTXORateBroadcaster) GetUtxoSetLen() int { return len(b.utxoCh) } + +func (b *UTXORateBroadcaster) GetKeyName() string { + return b.keyName +} From 997c67fef8af7597247a508c5a4c6848205dffa4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Thu, 15 Aug 2024 09:22:31 +0200 Subject: [PATCH 2/9] Return error if no utxos with sufficient funds left --- internal/broadcaster/rate_broadcaster.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index 47938f43f..cdffaeb52 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -173,18 +173,22 @@ utxoLoop: err := tx.FromUTXOs(utxo) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to add input: %v", err) } fee := b.calculateFeeSat(tx) if utxo.Satoshis <= fee { + if len(b.utxoCh) == 0 { + return nil, errors.New("no utxos with sufficient funds left") + } + continue } - - err = tx.PayTo(b.ks.Script, utxo.Satoshis-fee) + amount := utxo.Satoshis - fee + err = tx.PayTo(b.ks.Script, amount) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to pay transaction %d: %v", amount, err) } // Todo: Add OP_RETURN with text "ARC testing" so that WoC can tag it @@ -192,7 +196,7 @@ utxoLoop: unlockerGetter := unlocker.Getter{PrivateKey: b.ks.PrivateKey} err = tx.FillAllInputs(context.Background(), &unlockerGetter) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to fill input transactions: %v", err) } b.satoshiMap.Store(tx.TxID(), tx.Outputs[0].Satoshis) From 5a9897666c905c148d64032ef0186b491ed734b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Thu, 15 Aug 2024 09:30:25 +0200 Subject: [PATCH 3/9] Display results ordered by key name --- cmd/broadcaster-cli/app/keyset/address/address.go | 5 ++++- cmd/broadcaster-cli/app/keyset/balance/balance.go | 4 +++- cmd/broadcaster-cli/app/keyset/utxos/table.go | 7 ++++++- cmd/broadcaster-cli/helper/functions.go | 13 +++++++++++++ 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/cmd/broadcaster-cli/app/keyset/address/address.go b/cmd/broadcaster-cli/app/keyset/address/address.go index 96864bb03..f5e6c2850 100644 --- a/cmd/broadcaster-cli/app/keyset/address/address.go +++ b/cmd/broadcaster-cli/app/keyset/address/address.go @@ -24,7 +24,10 @@ var ( return err } - for name, keySet := range keySets { + names := helper.GetOrderedKeys(keySets) + + for _, name := range names { + keySet := keySets[name] logger.Info("address", slog.String("name", name), slog.String("address", keySet.Address(!isTestnet)), slog.String("key", keySet.GetMaster().String())) } diff --git a/cmd/broadcaster-cli/app/keyset/balance/balance.go b/cmd/broadcaster-cli/app/keyset/balance/balance.go index 9a660d825..6cc7c3641 100644 --- a/cmd/broadcaster-cli/app/keyset/balance/balance.go +++ b/cmd/broadcaster-cli/app/keyset/balance/balance.go @@ -34,8 +34,10 @@ var Cmd = &cobra.Command{ return err } - for name, keySet := range keySets { + names := helper.GetOrderedKeys(keySets) + for _, name := range names { + keySet := keySets[name] if wocApiKey == "" { time.Sleep(500 * time.Millisecond) } diff --git a/cmd/broadcaster-cli/app/keyset/utxos/table.go b/cmd/broadcaster-cli/app/keyset/utxos/table.go index e68033be2..33b4e0b43 100644 --- a/cmd/broadcaster-cli/app/keyset/utxos/table.go +++ b/cmd/broadcaster-cli/app/keyset/utxos/table.go @@ -3,6 +3,7 @@ package utxos import ( "context" "errors" + "github.com/bitcoin-sv/arc/cmd/broadcaster-cli/helper" "log/slog" "sort" "strconv" @@ -25,7 +26,11 @@ func getUtxosTable(ctx context.Context, logger *slog.Logger, keySets map[string] columns := make([][]row, len(keySets)) maxRowNr := 0 counter := 0 - for name, ks := range keySets { + names := helper.GetOrderedKeys(keySets) + + for _, name := range names { + + ks := keySets[name] utxos, err := wocClient.GetUTXOsWithRetries(ctx, ks.Script, ks.Address(!isTestnet), 1*time.Second, 5) if err != nil { if errors.Is(err, context.Canceled) { diff --git a/cmd/broadcaster-cli/helper/functions.go b/cmd/broadcaster-cli/helper/functions.go index 11517c78c..add9cf665 100644 --- a/cmd/broadcaster-cli/helper/functions.go +++ b/cmd/broadcaster-cli/helper/functions.go @@ -4,6 +4,7 @@ import ( "fmt" "log/slog" "os" + "sort" "strconv" "strings" @@ -204,3 +205,15 @@ func GetKeySets() (map[string]*keyset.KeySet, error) { return GetKeySetsFor(keys, selectedKeys) } + +func GetOrderedKeys[T any](keysMap map[string]T) []string { + + var keys []string + + for key := range keysMap { + keys = append(keys, key) + } + + sort.Strings(keys) + return keys +} From e39bf14608819e2b4137162e90af3049d7355c8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Thu, 15 Aug 2024 11:31:10 +0200 Subject: [PATCH 4/9] Use logger instead of fmt printer --- cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go index 0e0d71082..034cc28e2 100644 --- a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go +++ b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go @@ -155,19 +155,14 @@ var Cmd = &cobra.Command{ select { case <-signalChan: // If an interrupt signal is received - fmt.Println("Shutdown signal received. Shutting down the rate broadcaster.") + logger.Info("Shutdown signal received. Shutting down the rate broadcaster.") case err := <-doneChan: - // Or wait for the normal completion - if err != nil { - fmt.Printf("Error during broadcasting: %v\n", err) - } else { - fmt.Println("Broadcasting completed successfully.") - } + logger.Error("Error during broadcasting", slog.String("err", err.Error())) } // Shutdown the broadcaster in all cases rateBroadcaster.Shutdown() - fmt.Println("Broadcaster shutdown complete.") + logger.Info("Broadcasting shutdown complete") return nil }, } From 18b56f7bddef48a9cd63d0409fe342a005e495c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Thu, 15 Aug 2024 11:31:25 +0200 Subject: [PATCH 5/9] Add unit tests --- internal/broadcaster/broadcaster_mocks.go | 3 + .../mocks/rate_broadcaster_mock.go | 327 ++++++++++++++++++ .../broadcaster/multi_rate_broadcaster.go | 32 +- .../multi_rate_broadcaster_test.go | 77 +++++ 4 files changed, 429 insertions(+), 10 deletions(-) create mode 100644 internal/broadcaster/mocks/rate_broadcaster_mock.go create mode 100644 internal/broadcaster/multi_rate_broadcaster_test.go diff --git a/internal/broadcaster/broadcaster_mocks.go b/internal/broadcaster/broadcaster_mocks.go index 3d339f387..60a822f7a 100644 --- a/internal/broadcaster/broadcaster_mocks.go +++ b/internal/broadcaster/broadcaster_mocks.go @@ -8,3 +8,6 @@ package broadcaster // from mutli_utxo_consolidator.go //go:generate moq -pkg mocks -out ./mocks/consolidator_mock.go . Consolidator + +// from multi_rate_broadcaster.go +//go:generate moq -pkg mocks -out ./mocks/rate_broadcaster_mock.go . RateBroadcaster diff --git a/internal/broadcaster/mocks/rate_broadcaster_mock.go b/internal/broadcaster/mocks/rate_broadcaster_mock.go new file mode 100644 index 000000000..84eff6928 --- /dev/null +++ b/internal/broadcaster/mocks/rate_broadcaster_mock.go @@ -0,0 +1,327 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package mocks + +import ( + "github.com/bitcoin-sv/arc/internal/broadcaster" + "sync" +) + +// Ensure, that RateBroadcasterMock does implement broadcaster.RateBroadcaster. +// If this is not the case, regenerate this file with moq. +var _ broadcaster.RateBroadcaster = &RateBroadcasterMock{} + +// RateBroadcasterMock is a mock implementation of broadcaster.RateBroadcaster. +// +// func TestSomethingThatUsesRateBroadcaster(t *testing.T) { +// +// // make and configure a mocked broadcaster.RateBroadcaster +// mockedRateBroadcaster := &RateBroadcasterMock{ +// GetConnectionCountFunc: func() int64 { +// panic("mock out the GetConnectionCount method") +// }, +// GetKeyNameFunc: func() string { +// panic("mock out the GetKeyName method") +// }, +// GetLimitFunc: func() int64 { +// panic("mock out the GetLimit method") +// }, +// GetTxCountFunc: func() int64 { +// panic("mock out the GetTxCount method") +// }, +// GetUtxoSetLenFunc: func() int { +// panic("mock out the GetUtxoSetLen method") +// }, +// ShutdownFunc: func() { +// panic("mock out the Shutdown method") +// }, +// StartFunc: func() error { +// panic("mock out the Start method") +// }, +// WaitFunc: func() { +// panic("mock out the Wait method") +// }, +// } +// +// // use mockedRateBroadcaster in code that requires broadcaster.RateBroadcaster +// // and then make assertions. +// +// } +type RateBroadcasterMock struct { + // GetConnectionCountFunc mocks the GetConnectionCount method. + GetConnectionCountFunc func() int64 + + // GetKeyNameFunc mocks the GetKeyName method. + GetKeyNameFunc func() string + + // GetLimitFunc mocks the GetLimit method. + GetLimitFunc func() int64 + + // GetTxCountFunc mocks the GetTxCount method. + GetTxCountFunc func() int64 + + // GetUtxoSetLenFunc mocks the GetUtxoSetLen method. + GetUtxoSetLenFunc func() int + + // ShutdownFunc mocks the Shutdown method. + ShutdownFunc func() + + // StartFunc mocks the Start method. + StartFunc func() error + + // WaitFunc mocks the Wait method. + WaitFunc func() + + // calls tracks calls to the methods. + calls struct { + // GetConnectionCount holds details about calls to the GetConnectionCount method. + GetConnectionCount []struct { + } + // GetKeyName holds details about calls to the GetKeyName method. + GetKeyName []struct { + } + // GetLimit holds details about calls to the GetLimit method. + GetLimit []struct { + } + // GetTxCount holds details about calls to the GetTxCount method. + GetTxCount []struct { + } + // GetUtxoSetLen holds details about calls to the GetUtxoSetLen method. + GetUtxoSetLen []struct { + } + // Shutdown holds details about calls to the Shutdown method. + Shutdown []struct { + } + // Start holds details about calls to the Start method. + Start []struct { + } + // Wait holds details about calls to the Wait method. + Wait []struct { + } + } + lockGetConnectionCount sync.RWMutex + lockGetKeyName sync.RWMutex + lockGetLimit sync.RWMutex + lockGetTxCount sync.RWMutex + lockGetUtxoSetLen sync.RWMutex + lockShutdown sync.RWMutex + lockStart sync.RWMutex + lockWait sync.RWMutex +} + +// GetConnectionCount calls GetConnectionCountFunc. +func (mock *RateBroadcasterMock) GetConnectionCount() int64 { + if mock.GetConnectionCountFunc == nil { + panic("RateBroadcasterMock.GetConnectionCountFunc: method is nil but RateBroadcaster.GetConnectionCount was just called") + } + callInfo := struct { + }{} + mock.lockGetConnectionCount.Lock() + mock.calls.GetConnectionCount = append(mock.calls.GetConnectionCount, callInfo) + mock.lockGetConnectionCount.Unlock() + return mock.GetConnectionCountFunc() +} + +// GetConnectionCountCalls gets all the calls that were made to GetConnectionCount. +// Check the length with: +// +// len(mockedRateBroadcaster.GetConnectionCountCalls()) +func (mock *RateBroadcasterMock) GetConnectionCountCalls() []struct { +} { + var calls []struct { + } + mock.lockGetConnectionCount.RLock() + calls = mock.calls.GetConnectionCount + mock.lockGetConnectionCount.RUnlock() + return calls +} + +// GetKeyName calls GetKeyNameFunc. +func (mock *RateBroadcasterMock) GetKeyName() string { + if mock.GetKeyNameFunc == nil { + panic("RateBroadcasterMock.GetKeyNameFunc: method is nil but RateBroadcaster.GetKeyName was just called") + } + callInfo := struct { + }{} + mock.lockGetKeyName.Lock() + mock.calls.GetKeyName = append(mock.calls.GetKeyName, callInfo) + mock.lockGetKeyName.Unlock() + return mock.GetKeyNameFunc() +} + +// GetKeyNameCalls gets all the calls that were made to GetKeyName. +// Check the length with: +// +// len(mockedRateBroadcaster.GetKeyNameCalls()) +func (mock *RateBroadcasterMock) GetKeyNameCalls() []struct { +} { + var calls []struct { + } + mock.lockGetKeyName.RLock() + calls = mock.calls.GetKeyName + mock.lockGetKeyName.RUnlock() + return calls +} + +// GetLimit calls GetLimitFunc. +func (mock *RateBroadcasterMock) GetLimit() int64 { + if mock.GetLimitFunc == nil { + panic("RateBroadcasterMock.GetLimitFunc: method is nil but RateBroadcaster.GetLimit was just called") + } + callInfo := struct { + }{} + mock.lockGetLimit.Lock() + mock.calls.GetLimit = append(mock.calls.GetLimit, callInfo) + mock.lockGetLimit.Unlock() + return mock.GetLimitFunc() +} + +// GetLimitCalls gets all the calls that were made to GetLimit. +// Check the length with: +// +// len(mockedRateBroadcaster.GetLimitCalls()) +func (mock *RateBroadcasterMock) GetLimitCalls() []struct { +} { + var calls []struct { + } + mock.lockGetLimit.RLock() + calls = mock.calls.GetLimit + mock.lockGetLimit.RUnlock() + return calls +} + +// GetTxCount calls GetTxCountFunc. +func (mock *RateBroadcasterMock) GetTxCount() int64 { + if mock.GetTxCountFunc == nil { + panic("RateBroadcasterMock.GetTxCountFunc: method is nil but RateBroadcaster.GetTxCount was just called") + } + callInfo := struct { + }{} + mock.lockGetTxCount.Lock() + mock.calls.GetTxCount = append(mock.calls.GetTxCount, callInfo) + mock.lockGetTxCount.Unlock() + return mock.GetTxCountFunc() +} + +// GetTxCountCalls gets all the calls that were made to GetTxCount. +// Check the length with: +// +// len(mockedRateBroadcaster.GetTxCountCalls()) +func (mock *RateBroadcasterMock) GetTxCountCalls() []struct { +} { + var calls []struct { + } + mock.lockGetTxCount.RLock() + calls = mock.calls.GetTxCount + mock.lockGetTxCount.RUnlock() + return calls +} + +// GetUtxoSetLen calls GetUtxoSetLenFunc. +func (mock *RateBroadcasterMock) GetUtxoSetLen() int { + if mock.GetUtxoSetLenFunc == nil { + panic("RateBroadcasterMock.GetUtxoSetLenFunc: method is nil but RateBroadcaster.GetUtxoSetLen was just called") + } + callInfo := struct { + }{} + mock.lockGetUtxoSetLen.Lock() + mock.calls.GetUtxoSetLen = append(mock.calls.GetUtxoSetLen, callInfo) + mock.lockGetUtxoSetLen.Unlock() + return mock.GetUtxoSetLenFunc() +} + +// GetUtxoSetLenCalls gets all the calls that were made to GetUtxoSetLen. +// Check the length with: +// +// len(mockedRateBroadcaster.GetUtxoSetLenCalls()) +func (mock *RateBroadcasterMock) GetUtxoSetLenCalls() []struct { +} { + var calls []struct { + } + mock.lockGetUtxoSetLen.RLock() + calls = mock.calls.GetUtxoSetLen + mock.lockGetUtxoSetLen.RUnlock() + return calls +} + +// Shutdown calls ShutdownFunc. +func (mock *RateBroadcasterMock) Shutdown() { + if mock.ShutdownFunc == nil { + panic("RateBroadcasterMock.ShutdownFunc: method is nil but RateBroadcaster.Shutdown was just called") + } + callInfo := struct { + }{} + mock.lockShutdown.Lock() + mock.calls.Shutdown = append(mock.calls.Shutdown, callInfo) + mock.lockShutdown.Unlock() + mock.ShutdownFunc() +} + +// ShutdownCalls gets all the calls that were made to Shutdown. +// Check the length with: +// +// len(mockedRateBroadcaster.ShutdownCalls()) +func (mock *RateBroadcasterMock) ShutdownCalls() []struct { +} { + var calls []struct { + } + mock.lockShutdown.RLock() + calls = mock.calls.Shutdown + mock.lockShutdown.RUnlock() + return calls +} + +// Start calls StartFunc. +func (mock *RateBroadcasterMock) Start() error { + if mock.StartFunc == nil { + panic("RateBroadcasterMock.StartFunc: method is nil but RateBroadcaster.Start was just called") + } + callInfo := struct { + }{} + mock.lockStart.Lock() + mock.calls.Start = append(mock.calls.Start, callInfo) + mock.lockStart.Unlock() + return mock.StartFunc() +} + +// StartCalls gets all the calls that were made to Start. +// Check the length with: +// +// len(mockedRateBroadcaster.StartCalls()) +func (mock *RateBroadcasterMock) StartCalls() []struct { +} { + var calls []struct { + } + mock.lockStart.RLock() + calls = mock.calls.Start + mock.lockStart.RUnlock() + return calls +} + +// Wait calls WaitFunc. +func (mock *RateBroadcasterMock) Wait() { + if mock.WaitFunc == nil { + panic("RateBroadcasterMock.WaitFunc: method is nil but RateBroadcaster.Wait was just called") + } + callInfo := struct { + }{} + mock.lockWait.Lock() + mock.calls.Wait = append(mock.calls.Wait, callInfo) + mock.lockWait.Unlock() + mock.WaitFunc() +} + +// WaitCalls gets all the calls that were made to Wait. +// Check the length with: +// +// len(mockedRateBroadcaster.WaitCalls()) +func (mock *RateBroadcasterMock) WaitCalls() []struct { +} { + var calls []struct { + } + mock.lockWait.RLock() + calls = mock.calls.Wait + mock.lockWait.RUnlock() + return calls +} diff --git a/internal/broadcaster/multi_rate_broadcaster.go b/internal/broadcaster/multi_rate_broadcaster.go index 9ac568c52..97806af05 100644 --- a/internal/broadcaster/multi_rate_broadcaster.go +++ b/internal/broadcaster/multi_rate_broadcaster.go @@ -20,19 +20,31 @@ type RateBroadcaster interface { } type MultiKeyRateBroadcaster struct { - rbs []RateBroadcaster - logger *slog.Logger - target int64 - cancelAll context.CancelFunc - ctx context.Context - wg sync.WaitGroup + rbs []RateBroadcaster + logger *slog.Logger + target int64 + cancelAll context.CancelFunc + ctx context.Context + wg sync.WaitGroup + logInterval time.Duration } -func NewMultiKeyRateBroadcaster(logger *slog.Logger, rbs []RateBroadcaster) *MultiKeyRateBroadcaster { +func WithLogInterval(d time.Duration) func(*MultiKeyRateBroadcaster) { + return func(p *MultiKeyRateBroadcaster) { + p.logInterval = d + } +} + +func NewMultiKeyRateBroadcaster(logger *slog.Logger, rbs []RateBroadcaster, opts ...func(client *MultiKeyRateBroadcaster)) *MultiKeyRateBroadcaster { mrb := &MultiKeyRateBroadcaster{ - rbs: rbs, - logger: logger, + rbs: rbs, + logger: logger, + logInterval: 2 * time.Second, + } + + for _, opt := range opts { + opt(mrb) } ctx, cancelAll := context.WithCancel(context.Background()) @@ -71,7 +83,7 @@ func (mrb *MultiKeyRateBroadcaster) Shutdown() { func (mrb *MultiKeyRateBroadcaster) logStats() { mrb.wg.Add(1) - logStatsTicker := time.NewTicker(2 * time.Second) + logStatsTicker := time.NewTicker(mrb.logInterval) go func() { defer mrb.wg.Done() diff --git a/internal/broadcaster/multi_rate_broadcaster_test.go b/internal/broadcaster/multi_rate_broadcaster_test.go new file mode 100644 index 000000000..300dfddd0 --- /dev/null +++ b/internal/broadcaster/multi_rate_broadcaster_test.go @@ -0,0 +1,77 @@ +package broadcaster_test + +import ( + "errors" + "github.com/stretchr/testify/require" + "log/slog" + "os" + "testing" + "time" + + "github.com/bitcoin-sv/arc/internal/broadcaster" + "github.com/bitcoin-sv/arc/internal/broadcaster/mocks" +) + +func TestMultiRateBroadcasterStart(t *testing.T) { + + tt := []struct { + name string + startErr error + + expectedErrorStr string + }{ + { + name: "start and shutdown", + }, + { + name: "start and shutdown", + startErr: errors.New("failed to start"), + + expectedErrorStr: "failed to start", + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + cs := []broadcaster.RateBroadcaster{ + &mocks.RateBroadcasterMock{ + StartFunc: func() error { return nil }, + WaitFunc: func() {}, + ShutdownFunc: func() {}, + GetTxCountFunc: func() int64 { return 5 }, + GetConnectionCountFunc: func() int64 { return 2 }, + GetLimitFunc: func() int64 { return 100 }, + GetKeyNameFunc: func() string { return "key-01" }, + GetUtxoSetLenFunc: func() int { return 1000 }, + }, + &mocks.RateBroadcasterMock{ + StartFunc: func() error { return tc.startErr }, + WaitFunc: func() {}, + ShutdownFunc: func() {}, + GetTxCountFunc: func() int64 { return 10 }, + GetConnectionCountFunc: func() int64 { return 1 }, + GetLimitFunc: func() int64 { return 200 }, + GetKeyNameFunc: func() string { return "key-02" }, + GetUtxoSetLenFunc: func() int { return 1000 }, + }, + } + + mcs := broadcaster.NewMultiKeyRateBroadcaster(logger, cs, broadcaster.WithLogInterval(20*time.Millisecond)) + err := mcs.Start() + + if tc.expectedErrorStr != "" || err != nil { + require.ErrorContains(t, err, tc.expectedErrorStr) + return + } else { + require.NoError(t, err) + } + + time.Sleep(50 * time.Millisecond) + mcs.Shutdown() + }) + } + +} From aa3e3445fbc8f1a11d9832ddad26df80ecb89bee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Thu, 15 Aug 2024 11:45:44 +0200 Subject: [PATCH 6/9] Target is atomic --- internal/broadcaster/multi_rate_broadcaster.go | 12 +++++++----- internal/broadcaster/multi_rate_broadcaster_test.go | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/internal/broadcaster/multi_rate_broadcaster.go b/internal/broadcaster/multi_rate_broadcaster.go index 97806af05..2c7667a0d 100644 --- a/internal/broadcaster/multi_rate_broadcaster.go +++ b/internal/broadcaster/multi_rate_broadcaster.go @@ -5,6 +5,7 @@ import ( "log/slog" "math" "sync" + "sync/atomic" "time" ) @@ -41,6 +42,7 @@ func NewMultiKeyRateBroadcaster(logger *slog.Logger, rbs []RateBroadcaster, opts rbs: rbs, logger: logger, logInterval: 2 * time.Second, + target: 0, } for _, opt := range opts { @@ -56,9 +58,10 @@ func NewMultiKeyRateBroadcaster(logger *slog.Logger, rbs []RateBroadcaster, opts func (mrb *MultiKeyRateBroadcaster) Start() error { mrb.logStats() - mrb.target = 0 for _, rb := range mrb.rbs { err := rb.Start() + + atomic.AddInt64(&mrb.target, rb.GetLimit()) if err != nil { return err } @@ -96,14 +99,13 @@ func (mrb *MultiKeyRateBroadcaster) logStats() { for _, rb := range mrb.rbs { totalTxsCount += rb.GetTxCount() totalConnectionCount += rb.GetConnectionCount() - mrb.target += rb.GetLimit() logArgs = append(logArgs, slog.Int(rb.GetKeyName(), rb.GetUtxoSetLen())) } - + target := atomic.LoadInt64(&mrb.target) mrb.logger.Info("stats", slog.Int64("txs", totalTxsCount), - slog.Int64("target", mrb.target), - slog.Float64("percentage", roundFloat(float64(totalTxsCount)/float64(mrb.target)*100, 2)), + slog.Int64("target", target), + slog.Float64("percentage", roundFloat(float64(totalTxsCount)/float64(target)*100, 2)), slog.Int64("connections", totalConnectionCount), "utxos", logArgs, ) diff --git a/internal/broadcaster/multi_rate_broadcaster_test.go b/internal/broadcaster/multi_rate_broadcaster_test.go index 300dfddd0..6376e78d5 100644 --- a/internal/broadcaster/multi_rate_broadcaster_test.go +++ b/internal/broadcaster/multi_rate_broadcaster_test.go @@ -24,7 +24,7 @@ func TestMultiRateBroadcasterStart(t *testing.T) { name: "start and shutdown", }, { - name: "start and shutdown", + name: "error - failed to start", startErr: errors.New("failed to start"), expectedErrorStr: "failed to start", @@ -61,6 +61,7 @@ func TestMultiRateBroadcasterStart(t *testing.T) { mcs := broadcaster.NewMultiKeyRateBroadcaster(logger, cs, broadcaster.WithLogInterval(20*time.Millisecond)) err := mcs.Start() + defer mcs.Shutdown() if tc.expectedErrorStr != "" || err != nil { require.ErrorContains(t, err, tc.expectedErrorStr) @@ -70,7 +71,6 @@ func TestMultiRateBroadcasterStart(t *testing.T) { } time.Sleep(50 * time.Millisecond) - mcs.Shutdown() }) } From f46ec9758715e767b630fc61f190b371ad3e7b53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Thu, 15 Aug 2024 11:54:11 +0200 Subject: [PATCH 7/9] Log after reading config --- cmd/broadcaster-cli/app/root.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/broadcaster-cli/app/root.go b/cmd/broadcaster-cli/app/root.go index 544a7ecdb..65dc127e9 100644 --- a/cmd/broadcaster-cli/app/root.go +++ b/cmd/broadcaster-cli/app/root.go @@ -62,16 +62,16 @@ func init() { viper.SetConfigName("broadcaster-cli") } - if viper.ConfigFileUsed() != "" { - logger.Info("Config file used", slog.String("filename", viper.ConfigFileUsed())) - } - err = viper.ReadInConfig() if err != nil { logger.Error("failed to read config file", slog.String("err", err.Error())) os.Exit(1) } + if viper.ConfigFileUsed() != "" { + logger.Info("Config file used", slog.String("filename", viper.ConfigFileUsed())) + } + RootCmd.AddCommand(keyset.Cmd) RootCmd.AddCommand(utxos.Cmd) } From 96eadde65734e52b0e0b9dc7ee464effdc151946 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Thu, 15 Aug 2024 12:17:31 +0200 Subject: [PATCH 8/9] Use logger instead of fmt printer --- cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go index 034cc28e2..3a03ac622 100644 --- a/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go +++ b/cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go @@ -157,7 +157,9 @@ var Cmd = &cobra.Command{ // If an interrupt signal is received logger.Info("Shutdown signal received. Shutting down the rate broadcaster.") case err := <-doneChan: - logger.Error("Error during broadcasting", slog.String("err", err.Error())) + if err != nil { + logger.Error("Error during broadcasting", slog.String("err", err.Error())) + } } // Shutdown the broadcaster in all cases From 8ac83566657eb30ef036e2f6ca4a223bb7538670 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Thu, 15 Aug 2024 12:20:53 +0200 Subject: [PATCH 9/9] Log total utxo set length instead of per rate broadcaster --- internal/broadcaster/multi_rate_broadcaster.go | 9 +++++---- internal/broadcaster/multi_rate_broadcaster_test.go | 2 -- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/internal/broadcaster/multi_rate_broadcaster.go b/internal/broadcaster/multi_rate_broadcaster.go index 2c7667a0d..117450ac0 100644 --- a/internal/broadcaster/multi_rate_broadcaster.go +++ b/internal/broadcaster/multi_rate_broadcaster.go @@ -95,19 +95,20 @@ func (mrb *MultiKeyRateBroadcaster) logStats() { case <-logStatsTicker.C: totalTxsCount := int64(0) totalConnectionCount := int64(0) - var logArgs []slog.Attr + totalUtxoSetLength := 0 + for _, rb := range mrb.rbs { totalTxsCount += rb.GetTxCount() totalConnectionCount += rb.GetConnectionCount() - logArgs = append(logArgs, slog.Int(rb.GetKeyName(), rb.GetUtxoSetLen())) + totalUtxoSetLength += rb.GetUtxoSetLen() } target := atomic.LoadInt64(&mrb.target) mrb.logger.Info("stats", slog.Int64("txs", totalTxsCount), slog.Int64("target", target), - slog.Float64("percentage", roundFloat(float64(totalTxsCount)/float64(target)*100, 2)), + slog.Float64("percentage", roundFloat(float64(totalTxsCount)/float64(mrb.target)*100, 2)), slog.Int64("connections", totalConnectionCount), - "utxos", logArgs, + slog.Int("utxos", totalUtxoSetLength), ) case <-mrb.ctx.Done(): return diff --git a/internal/broadcaster/multi_rate_broadcaster_test.go b/internal/broadcaster/multi_rate_broadcaster_test.go index 6376e78d5..29ced82e1 100644 --- a/internal/broadcaster/multi_rate_broadcaster_test.go +++ b/internal/broadcaster/multi_rate_broadcaster_test.go @@ -44,7 +44,6 @@ func TestMultiRateBroadcasterStart(t *testing.T) { GetTxCountFunc: func() int64 { return 5 }, GetConnectionCountFunc: func() int64 { return 2 }, GetLimitFunc: func() int64 { return 100 }, - GetKeyNameFunc: func() string { return "key-01" }, GetUtxoSetLenFunc: func() int { return 1000 }, }, &mocks.RateBroadcasterMock{ @@ -54,7 +53,6 @@ func TestMultiRateBroadcasterStart(t *testing.T) { GetTxCountFunc: func() int64 { return 10 }, GetConnectionCountFunc: func() int64 { return 1 }, GetLimitFunc: func() int64 { return 200 }, - GetKeyNameFunc: func() string { return "key-02" }, GetUtxoSetLenFunc: func() int { return 1000 }, }, }