Skip to content

Commit

Permalink
Refactor/broadcaster cli broadcast (#555)
Browse files Browse the repository at this point in the history
Refactor multi rate broadcaster
  • Loading branch information
boecklim authored Aug 15, 2024
1 parent 2c990cc commit 059edf8
Show file tree
Hide file tree
Showing 11 changed files with 548 additions and 80 deletions.
5 changes: 4 additions & 1 deletion cmd/broadcaster-cli/app/keyset/address/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ var (
return err
}

for name, keySet := range keySets {
names := helper.GetOrderedKeys(keySets)

for _, name := range names {
keySet := keySets[name]

logger.Info("address", slog.String("name", name), slog.String("address", keySet.Address(!isTestnet)), slog.String("key", keySet.GetMaster().String()))
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/broadcaster-cli/app/keyset/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ var Cmd = &cobra.Command{
return err
}

for name, keySet := range keySets {
names := helper.GetOrderedKeys(keySets)

for _, name := range names {
keySet := keySets[name]
if wocApiKey == "" {
time.Sleep(500 * time.Millisecond)
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/broadcaster-cli/app/keyset/utxos/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package utxos
import (
"context"
"errors"
"github.com/bitcoin-sv/arc/cmd/broadcaster-cli/helper"
"log/slog"
"sort"
"strconv"
Expand All @@ -25,7 +26,11 @@ func getUtxosTable(ctx context.Context, logger *slog.Logger, keySets map[string]
columns := make([][]row, len(keySets))
maxRowNr := 0
counter := 0
for name, ks := range keySets {
names := helper.GetOrderedKeys(keySets)

for _, name := range names {

ks := keySets[name]
utxos, err := wocClient.GetUTXOsWithRetries(ctx, ks.Script, ks.Address(!isTestnet), 1*time.Second, 5)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand Down
8 changes: 4 additions & 4 deletions cmd/broadcaster-cli/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ func init() {
viper.SetConfigName("broadcaster-cli")
}

if viper.ConfigFileUsed() != "" {
logger.Info("Config file used", slog.String("filename", viper.ConfigFileUsed()))
}

err = viper.ReadInConfig()
if err != nil {
logger.Error("failed to read config file", slog.String("err", err.Error()))
os.Exit(1)
}

if viper.ConfigFileUsed() != "" {
logger.Info("Config file used", slog.String("filename", viper.ConfigFileUsed()))
}

RootCmd.AddCommand(keyset.Cmd)
RootCmd.AddCommand(utxos.Cmd)
}
Expand Down
24 changes: 14 additions & 10 deletions cmd/broadcaster-cli/app/utxos/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,38 +129,42 @@ var Cmd = &cobra.Command{
counter++
}

rateBroadcaster, err := broadcaster.NewMultiKeyRateBroadcaster(logger, client, ks, wocClient, isTestnet, opts...)
if err != nil {
return fmt.Errorf("failed to create rate broadcaster: %v", err)
rbs := make([]broadcaster.RateBroadcaster, 0, len(keySets))
for keyName, ks := range keySets {
rb, err := broadcaster.NewRateBroadcaster(logger, client, ks, wocClient, isTestnet, keyName, rateTxsPerSecond, limit, opts...)
if err != nil {
return err
}

rbs = append(rbs, rb)
}

rateBroadcaster := broadcaster.NewMultiKeyRateBroadcaster(logger, rbs)

doneChan := make(chan error) // Channel to signal the completion of Start
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt) // Listen for Ctrl+C

go func() {
// Start the broadcasting process
err := rateBroadcaster.Start(rateTxsPerSecond, limit)
err := rateBroadcaster.Start()
logger.Info("Starting broadcaster", slog.Int("rate [txs/s]", rateTxsPerSecond), slog.Int("batch size", batchSize))
doneChan <- err // Send the completion or error signal
}()

select {
case <-signalChan:
// If an interrupt signal is received
fmt.Println("Shutdown signal received. Shutting down the rate broadcaster.")
logger.Info("Shutdown signal received. Shutting down the rate broadcaster.")
case err := <-doneChan:
// Or wait for the normal completion
if err != nil {
fmt.Printf("Error during broadcasting: %v\n", err)
} else {
fmt.Println("Broadcasting completed successfully.")
logger.Error("Error during broadcasting", slog.String("err", err.Error()))
}
}

// Shutdown the broadcaster in all cases
rateBroadcaster.Shutdown()
fmt.Println("Broadcaster shutdown complete.")
logger.Info("Broadcasting shutdown complete")
return nil
},
}
Expand Down
13 changes: 13 additions & 0 deletions cmd/broadcaster-cli/helper/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log/slog"
"os"
"sort"
"strconv"
"strings"

Expand Down Expand Up @@ -204,3 +205,15 @@ func GetKeySets() (map[string]*keyset.KeySet, error) {

return GetKeySetsFor(keys, selectedKeys)
}

func GetOrderedKeys[T any](keysMap map[string]T) []string {

var keys []string

for key := range keysMap {
keys = append(keys, key)
}

sort.Strings(keys)
return keys
}
3 changes: 3 additions & 0 deletions internal/broadcaster/broadcaster_mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ package broadcaster

// from mutli_utxo_consolidator.go
//go:generate moq -pkg mocks -out ./mocks/consolidator_mock.go . Consolidator

// from multi_rate_broadcaster.go
//go:generate moq -pkg mocks -out ./mocks/rate_broadcaster_mock.go . RateBroadcaster
Loading

0 comments on commit 059edf8

Please sign in to comment.