diff --git a/interactors/errors.go b/interactors/errors.go index b5c52912..8faebebb 100644 --- a/interactors/errors.go +++ b/interactors/errors.go @@ -34,3 +34,6 @@ var ErrTxWithSameNonceAndGasPriceAlreadySent = errors.New("transaction with the // ErrGapNonce signals that a gap nonce between the lowest nonce of the transactions from the cache and the blockchain nonce has been detected var ErrGapNonce = errors.New("gap nonce detected") + +// ErrWorkerClosed signals that the worker is closed +var ErrWorkerClosed = errors.New("worker closed") diff --git a/interactors/interface.go b/interactors/interface.go index 560ae1f8..33c59a83 100644 --- a/interactors/interface.go +++ b/interactors/interface.go @@ -4,6 +4,7 @@ import ( "context" "github.com/multiversx/mx-chain-core-go/data/transaction" + "github.com/multiversx/mx-sdk-go/core" "github.com/multiversx/mx-sdk-go/data" ) @@ -40,6 +41,14 @@ type AddressNonceHandler interface { IsInterfaceNil() bool } +// AddressNonceHandlerV3 defines the component able to handler address nonces +type AddressNonceHandlerV3 interface { + ApplyNonceAndGasPrice(ctx context.Context, tx ...*transaction.FrontendTransaction) error + SendTransaction(ctx context.Context, tx *transaction.FrontendTransaction) (string, error) + IsInterfaceNil() bool + Close() +} + // TransactionNonceHandlerV1 defines the component able to manage transaction nonces type TransactionNonceHandlerV1 interface { GetNonce(ctx context.Context, address core.AddressHandler) (uint64, error) @@ -49,10 +58,18 @@ type TransactionNonceHandlerV1 interface { IsInterfaceNil() bool } -// TransactionNonceHandlerV2 defines the component able to apply nonce for a given frontend transaction +// TransactionNonceHandlerV2 defines the component able to apply nonce for a given frontend transaction. type TransactionNonceHandlerV2 interface { ApplyNonceAndGasPrice(ctx context.Context, address core.AddressHandler, tx *transaction.FrontendTransaction) error SendTransaction(ctx context.Context, tx *transaction.FrontendTransaction) (string, error) Close() error IsInterfaceNil() bool } + +// TransactionNonceHandlerV3 defines the component able to apply nonce for a given frontend transaction. +type TransactionNonceHandlerV3 interface { + ApplyNonceAndGasPrice(ctx context.Context, tx ...*transaction.FrontendTransaction) error + SendTransactions(ctx context.Context, txs ...*transaction.FrontendTransaction) ([]string, error) + Close() + IsInterfaceNil() bool +} diff --git a/interactors/nonceHandlerV1/addressNonceHandler.go b/interactors/nonceHandlerV1/addressNonceHandler.go index f087412e..5f62ab00 100644 --- a/interactors/nonceHandlerV1/addressNonceHandler.go +++ b/interactors/nonceHandlerV1/addressNonceHandler.go @@ -7,6 +7,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/data/transaction" + sdkCore "github.com/multiversx/mx-sdk-go/core" "github.com/multiversx/mx-sdk-go/interactors" ) diff --git a/interactors/nonceHandlerV1/nonceTransactionsHandler.go b/interactors/nonceHandlerV1/nonceTransactionsHandler.go index 1d397193..3ccfc4e8 100644 --- a/interactors/nonceHandlerV1/nonceTransactionsHandler.go +++ b/interactors/nonceHandlerV1/nonceTransactionsHandler.go @@ -9,6 +9,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/transaction" logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-sdk-go/core" "github.com/multiversx/mx-sdk-go/data" "github.com/multiversx/mx-sdk-go/interactors" @@ -111,14 +112,12 @@ func (nth *nonceTransactionsHandlerV1) SendTransaction(ctx context.Context, tx * } func (nth *nonceTransactionsHandlerV1) resendTransactionsLoop(ctx context.Context, intervalToResend time.Duration) { - timer := time.NewTimer(intervalToResend) - defer timer.Stop() + ticker := time.NewTicker(intervalToResend) + defer ticker.Stop() for { - timer.Reset(intervalToResend) - select { - case <-timer.C: + case <-ticker.C: nth.resendTransactions(ctx) case <-ctx.Done(): log.Debug("finishing nonceTransactionsHandlerV1.resendTransactionsLoop...") diff --git a/interactors/nonceHandlerV2/addressNonceHandler.go b/interactors/nonceHandlerV2/addressNonceHandler.go index 5b69ee37..f9ba7181 100644 --- a/interactors/nonceHandlerV2/addressNonceHandler.go +++ b/interactors/nonceHandlerV2/addressNonceHandler.go @@ -7,6 +7,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/transaction" + sdkCore "github.com/multiversx/mx-sdk-go/core" "github.com/multiversx/mx-sdk-go/interactors" ) diff --git a/interactors/nonceHandlerV2/nonceTransactionsHandler.go b/interactors/nonceHandlerV2/nonceTransactionsHandler.go index 297fc78d..1f488430 100644 --- a/interactors/nonceHandlerV2/nonceTransactionsHandler.go +++ b/interactors/nonceHandlerV2/nonceTransactionsHandler.go @@ -9,6 +9,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/transaction" logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-sdk-go/core" "github.com/multiversx/mx-sdk-go/data" "github.com/multiversx/mx-sdk-go/interactors" @@ -149,14 +150,12 @@ func (nth *nonceTransactionsHandlerV2) SendTransaction(ctx context.Context, tx * } func (nth *nonceTransactionsHandlerV2) resendTransactionsLoop(ctx context.Context) { - timer := time.NewTimer(nth.intervalToResend) - defer timer.Stop() + ticker := time.NewTicker(nth.intervalToResend) + defer ticker.Stop() for { - timer.Reset(nth.intervalToResend) - select { - case <-timer.C: + case <-ticker.C: nth.resendTransactions(ctx) case <-ctx.Done(): log.Debug("finishing nonceTransactionsHandlerV2.resendTransactionsLoop...") diff --git a/interactors/nonceHandlerV3/addressNonceHandler.go b/interactors/nonceHandlerV3/addressNonceHandler.go new file mode 100644 index 00000000..521e2ad5 --- /dev/null +++ b/interactors/nonceHandlerV3/addressNonceHandler.go @@ -0,0 +1,148 @@ +package nonceHandlerV3 + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-core-go/data/transaction" + + sdkCore "github.com/multiversx/mx-sdk-go/core" + "github.com/multiversx/mx-sdk-go/interactors" + "github.com/multiversx/mx-sdk-go/interactors/nonceHandlerV3/workers" +) + +// addressNonceHandler is the handler used for one address. It is able to handle the current +// nonce as max(current_stored_nonce, account_nonce). After each call of the getNonce function +// the current_stored_nonce is incremented. This will prevent "nonce too low in transaction" +// errors on the node interceptor. To prevent the "nonce too high in transaction" error, +// a retrial mechanism is implemented. This struct is able to store all sent transactions, +// having a function that sweeps the map in order to resend a transaction or remove them +// because they were executed. This struct is concurrent safe. +type addressNonceHandler struct { + mut sync.RWMutex + address sdkCore.AddressHandler + proxy interactors.Proxy + nonce int64 + gasPrice uint64 + transactionWorker *workers.TransactionWorker + cancelFunc func() +} + +// NewAddressNonceHandlerV3 returns a new instance of a addressNonceHandler +func NewAddressNonceHandlerV3(proxy interactors.Proxy, address sdkCore.AddressHandler, intervalToSend time.Duration) (*addressNonceHandler, error) { + if check.IfNil(proxy) { + return nil, interactors.ErrNilProxy + } + if check.IfNil(address) { + return nil, interactors.ErrNilAddress + } + + ctx, cancelFunc := context.WithCancel(context.Background()) + + anh := &addressNonceHandler{ + mut: sync.RWMutex{}, + address: address, + nonce: -1, + proxy: proxy, + transactionWorker: workers.NewTransactionWorker(ctx, proxy, intervalToSend), + cancelFunc: cancelFunc, + } + + return anh, nil +} + +// ApplyNonceAndGasPrice will apply the computed nonce to the given FrontendTransaction +func (anh *addressNonceHandler) ApplyNonceAndGasPrice(ctx context.Context, txs ...*transaction.FrontendTransaction) error { + for _, tx := range txs { + nonce, err := anh.computeNonce(ctx) + if err != nil { + //anh.mut.Unlock() + return fmt.Errorf("failed to fetch nonce: %w", err) + } + tx.Nonce = uint64(nonce) + + anh.applyGasPriceIfRequired(ctx, tx) + } + + return nil +} + +// SendTransaction will save and propagate a transaction to the network +func (anh *addressNonceHandler) SendTransaction(ctx context.Context, tx *transaction.FrontendTransaction) (string, error) { + ch := anh.transactionWorker.AddTransaction(tx) + + select { + case response := <-ch: + anh.adaptNonceBasedOnResponse(response) + + return response.TxHash, response.Error + + case <-ctx.Done(): + return "", ctx.Err() + } +} + +func (anh *addressNonceHandler) adaptNonceBasedOnResponse(response *workers.TransactionResponse) { + anh.mut.Lock() + defer anh.mut.Unlock() + + // if the response did not contain any errors, increase the cached nonce. + if response.Error == nil { + anh.nonce++ + return + } + // we invalidate the cache if there was an error sending the transaction. + anh.nonce = -1 +} + +// IsInterfaceNil returns true if there is no value under the interface +func (anh *addressNonceHandler) IsInterfaceNil() bool { + return anh == nil +} + +// Close will cancel all related processes.. +func (anh *addressNonceHandler) Close() { + anh.cancelFunc() +} + +func (anh *addressNonceHandler) applyGasPriceIfRequired(ctx context.Context, tx *transaction.FrontendTransaction) { + anh.mut.RLock() + gasPrice := anh.gasPrice + anh.mut.RUnlock() + + if gasPrice == 0 { + networkConfig, err := anh.proxy.GetNetworkConfig(ctx) + + if err != nil { + log.Error("%w: while fetching network config", err) + } + + gasPrice = networkConfig.MinGasPrice + } + anh.mut.Lock() + defer anh.mut.Unlock() + anh.gasPrice = gasPrice + tx.GasPrice = core.MaxUint64(gasPrice, tx.GasPrice) +} + +func (anh *addressNonceHandler) computeNonce(ctx context.Context) (int64, error) { + // if it is the first time applying nonces to this address, or if the cache was invalidated it will try to fetch + // the nonce from the chain. + anh.mut.Lock() + defer anh.mut.Unlock() + + if anh.nonce == -1 { + account, err := anh.proxy.GetAccount(ctx, anh.address) + if err != nil { + return -1, fmt.Errorf("failed to fetch nonce: %w", err) + } + anh.nonce = int64(account.Nonce) + } else { + anh.nonce++ + } + return anh.nonce, nil +} diff --git a/interactors/nonceHandlerV3/nonceTransactionsHandler.go b/interactors/nonceHandlerV3/nonceTransactionsHandler.go new file mode 100644 index 00000000..3c485d66 --- /dev/null +++ b/interactors/nonceHandlerV3/nonceTransactionsHandler.go @@ -0,0 +1,195 @@ +package nonceHandlerV3 + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-core-go/data/transaction" + logger "github.com/multiversx/mx-chain-logger-go" + "golang.org/x/sync/errgroup" + + "github.com/multiversx/mx-sdk-go/core" + "github.com/multiversx/mx-sdk-go/data" + "github.com/multiversx/mx-sdk-go/interactors" +) + +const minimumIntervalToResend = 1 * time.Millisecond + +var log = logger.GetOrCreate("mx-sdk-go/interactors/nonceHandlerV3") + +// ArgsNonceTransactionsHandlerV3 is the argument DTO for a nonce workers handler component +type ArgsNonceTransactionsHandlerV3 struct { + Proxy interactors.Proxy + IntervalToSend time.Duration +} + +// nonceTransactionsHandlerV3 is the handler used for an unlimited number of addresses. +// It basically contains a map of addressNonceHandler, creating new entries on the first +// access of a provided address. This struct delegates all the operations on the right +// instance of addressNonceHandler. It also starts a go routine that will periodically +// try to resend "stuck workers" and to clean the inner state. The recommended resend +// interval is 1 minute. The Close method should be called whenever the current instance of +// nonceTransactionsHandlerV3 should be terminated and collected by the GC. +// This struct is concurrent safe. +type nonceTransactionsHandlerV3 struct { + proxy interactors.Proxy + mutHandlers sync.RWMutex + handlers map[string]interactors.AddressNonceHandlerV3 + intervalToSend time.Duration +} + +// NewNonceTransactionHandlerV3 will create a new instance of the nonceTransactionsHandlerV3. It requires a Proxy implementation +// and an interval at which the workers sent are rechecked and eventually, resent. +func NewNonceTransactionHandlerV3(args ArgsNonceTransactionsHandlerV3) (*nonceTransactionsHandlerV3, error) { + if check.IfNil(args.Proxy) { + return nil, interactors.ErrNilProxy + } + if args.IntervalToSend < minimumIntervalToResend { + return nil, fmt.Errorf("%w for intervalToSend in NewNonceTransactionHandlerV2", interactors.ErrInvalidValue) + } + + nth := &nonceTransactionsHandlerV3{ + proxy: args.Proxy, + handlers: make(map[string]interactors.AddressNonceHandlerV3), + intervalToSend: args.IntervalToSend, + } + + return nth, nil +} + +// ApplyNonceAndGasPrice will apply the nonce to the given frontend transaction +func (nth *nonceTransactionsHandlerV3) ApplyNonceAndGasPrice(ctx context.Context, tx ...*transaction.FrontendTransaction) error { + if tx == nil { + return interactors.ErrNilTransaction + } + + mapAddressTransactions := nth.filterTransactionsBySenderAddress(tx) + + for addressRawString, transactions := range mapAddressTransactions { + address, err := data.NewAddressFromBech32String(addressRawString) + if err != nil { + return err + } + anh, err := nth.getOrCreateAddressNonceHandler(address) + if err != nil { + return err + } + + err = anh.ApplyNonceAndGasPrice(ctx, transactions...) + if err != nil { + return err + } + } + + return nil +} + +func (nth *nonceTransactionsHandlerV3) getOrCreateAddressNonceHandler(address core.AddressHandler) (interactors.AddressNonceHandlerV3, error) { + anh := nth.getAddressNonceHandler(address) + if !check.IfNil(anh) { + return anh, nil + } + + return nth.createAddressNonceHandler(address) +} + +func (nth *nonceTransactionsHandlerV3) getAddressNonceHandler(address core.AddressHandler) interactors.AddressNonceHandlerV3 { + nth.mutHandlers.RLock() + defer nth.mutHandlers.RUnlock() + + anh, found := nth.handlers[string(address.AddressBytes())] + if found { + return anh + } + return nil +} + +func (nth *nonceTransactionsHandlerV3) createAddressNonceHandler(address core.AddressHandler) (interactors.AddressNonceHandlerV3, error) { + nth.mutHandlers.Lock() + defer nth.mutHandlers.Unlock() + + addressAsString := string(address.AddressBytes()) + anh, found := nth.handlers[addressAsString] + if found { + return anh, nil + } + + anh, err := NewAddressNonceHandlerV3(nth.proxy, address, nth.intervalToSend) + if err != nil { + return nil, err + } + nth.handlers[addressAsString] = anh + + return anh, nil +} + +func (nth *nonceTransactionsHandlerV3) filterTransactionsBySenderAddress(transactions []*transaction.FrontendTransaction) map[string][]*transaction.FrontendTransaction { + filterMap := make(map[string][]*transaction.FrontendTransaction) + for _, tx := range transactions { + transactionsPerAddress, ok := filterMap[tx.Sender] + if !ok { + transactionsPerAddress = make([]*transaction.FrontendTransaction, 0) + } + filterMap[tx.Sender] = append(transactionsPerAddress, tx) + } + + return filterMap +} + +// SendTransactions will store and send the provided transaction +func (nth *nonceTransactionsHandlerV3) SendTransactions(ctx context.Context, txs ...*transaction.FrontendTransaction) ([]string, error) { + g, ctx := errgroup.WithContext(ctx) + sentHashes := make([]string, len(txs)) + for i, tx := range txs { + if tx == nil { + return nil, interactors.ErrNilTransaction + } + + // Work with a full copy of the provided transaction so the provided one can change without affecting this component. + // Abnormal and unpredictable behaviors due to the resending mechanism are prevented this way + txCopy := *tx + + addrAsBech32 := txCopy.Sender + address, err := data.NewAddressFromBech32String(addrAsBech32) + if err != nil { + return nil, fmt.Errorf("%w while creating address handler for string %s", err, addrAsBech32) + } + + anh, err := nth.getOrCreateAddressNonceHandler(address) + if err != nil { + return nil, err + } + + idx := i + g.Go(func() error { + sentHash, errSend := anh.SendTransaction(ctx, &txCopy) + if errSend != nil { + return fmt.Errorf("%w while sending transaction for address %s", errSend, addrAsBech32) + } + + sentHashes[idx] = sentHash + return nil + }) + } + + err := g.Wait() + + return sentHashes, err +} + +// Close will cancel all related processes. +func (nth *nonceTransactionsHandlerV3) Close() { + nth.mutHandlers.RLock() + defer nth.mutHandlers.RUnlock() + for _, handler := range nth.handlers { + handler.Close() + } +} + +// IsInterfaceNil returns true if there is no value under the interface +func (nth *nonceTransactionsHandlerV3) IsInterfaceNil() bool { + return nth == nil +} diff --git a/interactors/nonceHandlerV3/nonceTransactionsHandler_test.go b/interactors/nonceHandlerV3/nonceTransactionsHandler_test.go new file mode 100644 index 00000000..e96742d5 --- /dev/null +++ b/interactors/nonceHandlerV3/nonceTransactionsHandler_test.go @@ -0,0 +1,384 @@ +package nonceHandlerV3 + +import ( + "context" + "errors" + "sort" + "strconv" + "sync" + "testing" + "time" + + "github.com/multiversx/mx-chain-core-go/data/transaction" + "github.com/stretchr/testify/require" + + "github.com/multiversx/mx-sdk-go/core" + "github.com/multiversx/mx-sdk-go/data" + "github.com/multiversx/mx-sdk-go/testsCommon" +) + +var testAddressAsBech32String = "erd1zptg3eu7uw0qvzhnu009lwxupcn6ntjxptj5gaxt8curhxjqr9tsqpsnht" + +func TestSendTransactionsOneByOne(t *testing.T) { + t.Parallel() + + var getAccountCalled bool + + // Since the endpoint to send workers for the nonce-management-service has the same definition as the one + // in the gateway, we can create a proxy instance that points towards the nonce-management-service instead. + // The nonce-management-service will then, in turn send the workers to the gateway. + transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(&getAccountCalled)) + require.NoError(t, err, "failed to create transaction handler") + + var txs []*transaction.FrontendTransaction + + for i := 0; i < 100; i++ { + tx := &transaction.FrontendTransaction{ + Sender: testAddressAsBech32String, + Receiver: testAddressAsBech32String, + GasLimit: 50000, + ChainID: "T", + Value: "5000000000000000000", + Nonce: uint64(i), + GasPrice: 1000000000, + Version: 2, + } + txs = append(txs, tx) + } + + err = transactionHandler.ApplyNonceAndGasPrice(context.Background(), txs...) + require.NoError(t, err, "failed to apply nonce") + require.True(t, getAccountCalled, "get account was not called") + + var wg sync.WaitGroup + for _, tt := range txs { + wg.Add(1) + go func(tt *transaction.FrontendTransaction) { + defer wg.Done() + h, err := transactionHandler.SendTransactions(context.Background(), tt) + require.NoError(t, err, "failed to send transaction") + require.Equal(t, []string{strconv.FormatUint(tt.Nonce, 10)}, h) + }(tt) + } + wg.Wait() +} + +func TestSendTransactionsBulk(t *testing.T) { + t.Parallel() + + var getAccountCalled bool + + // Since the endpoint to send workers for the nonce-management-service has the same definition as the one + // in the gateway, we can create a proxy instance that points towards the nonce-management-service instead. + // The nonce-management-service will then, in turn send the workers to the gateway. + transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(&getAccountCalled)) + require.NoError(t, err, "failed to create transaction handler") + + var txs []*transaction.FrontendTransaction + + for i := 0; i < 100; i++ { + tx := &transaction.FrontendTransaction{ + Sender: testAddressAsBech32String, + Receiver: testAddressAsBech32String, + GasLimit: 50000, + ChainID: "T", + Value: "5000000000000000000", + Nonce: uint64(i), + GasPrice: 1000000000, + Version: 2, + } + txs = append(txs, tx) + } + + err = transactionHandler.ApplyNonceAndGasPrice(context.Background(), txs...) + require.NoError(t, err, "failed to apply nonce") + require.True(t, getAccountCalled, "get account was not called") + + txHashes, err := transactionHandler.SendTransactions(context.Background(), txs...) + require.NoError(t, err, "failed to send transactions as bulk") + require.Equal(t, mockedStrings(100), txHashes) +} + +func TestSendTransactionsCloseInstant(t *testing.T) { + t.Parallel() + + var getAccountCalled bool + + // Since the endpoint to send workers for the nonce-management-service has the same definition as the one + // in the gateway, we can create a proxy instance that points towards the nonce-management-service instead. + // The nonce-management-service will then, in turn send the workers to the gateway. + transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(&getAccountCalled)) + require.NoError(t, err, "failed to create transaction handler") + + var txs []*transaction.FrontendTransaction + + // Create 1k transactions. + for i := 0; i < 100; i++ { + tx := &transaction.FrontendTransaction{ + Sender: testAddressAsBech32String, + Receiver: testAddressAsBech32String, + GasLimit: 50000, + ChainID: "T", + Value: "5000000000000000000", + Nonce: uint64(i), + GasPrice: 1000000000, + Version: 2, + } + txs = append(txs, tx) + } + + // Apply nonce to them in a bulk. + err = transactionHandler.ApplyNonceAndGasPrice(context.Background(), txs...) + require.NoError(t, err, "failed to apply nonce") + + // We only do this once, we check if the bool has been modified. + require.True(t, getAccountCalled, "get account was not called") + + var wg sync.WaitGroup + wg.Add(1) + go func() { + // make sure that the Close function is called before the send function + time.Sleep(time.Second) + + hashes, errSend := transactionHandler.SendTransactions(context.Background(), txs...) + + var counter int + // Since the close is almost instant there should be none or very few transactions that have been processed. + for _, h := range hashes { + if h != "" { + counter++ + } + } + + require.Equal(t, 0, counter) + require.NotNil(t, errSend) + wg.Done() + }() + + // Close the processes related to the transaction handler. + transactionHandler.Close() + + wg.Wait() + require.NoError(t, err, "failed to send transactions as bulk") +} + +func TestSendTransactionsCloseDelay(t *testing.T) { + t.Parallel() + + var getAccountCalled bool + + // Create another proxyStub that adds some delay when sending transactions. + mockArgs := ArgsNonceTransactionsHandlerV3{ + Proxy: &testsCommon.ProxyStub{ + SendTransactionCalled: func(tx *transaction.FrontendTransaction) (string, error) { + // Presume this operation is taking roughly 100 ms. Meaning 10 operations / second. + time.Sleep(100 * time.Millisecond) + return strconv.FormatUint(tx.Nonce, 10), nil + }, + GetAccountCalled: func(address core.AddressHandler) (*data.Account, error) { + getAccountCalled = true + return &data.Account{}, nil + }, + }, + IntervalToSend: time.Second * 5, + } + + // Since the endpoint to send workers for the nonce-management-service has the same definition as the one + // in the gateway, we can create a proxy instance that points towards the nonce-management-service instead. + // The nonce-management-service will then, in turn send the workers to the gateway. + transactionHandler, err := NewNonceTransactionHandlerV3(mockArgs) + require.NoError(t, err, "failed to create transaction handler") + + var txs []*transaction.FrontendTransaction + + // Create 1k transactions. + for i := 0; i < 100; i++ { + tx := &transaction.FrontendTransaction{ + Sender: testAddressAsBech32String, + Receiver: testAddressAsBech32String, + GasLimit: 50000, + ChainID: "T", + Value: "5000000000000000000", + Nonce: uint64(i), + GasPrice: 1000000000, + Version: 2, + } + txs = append(txs, tx) + } + + // Apply nonce to them in a bulk. + err = transactionHandler.ApplyNonceAndGasPrice(context.Background(), txs...) + require.NoError(t, err, "failed to apply nonce") + + // We only do this once, we check if the bool has been modified. + require.True(t, getAccountCalled, "get account was not called") + + var wg sync.WaitGroup + wg.Add(1) + go func() { + hashes, errSend := transactionHandler.SendTransactions(context.Background(), txs...) + + // Since the close is not instant. There should be some hashes that have been processed. + require.NotEmpty(t, hashes, "no transaction should be processed") + require.Equal(t, "context canceled while sending transaction for address erd1zptg3eu7uw0qvzhnu009lwxupcn6ntjxptj5gaxt8curhxjqr9tsqpsnht", errSend.Error()) + wg.Done() + }() + + // Close the processes related to the transaction handler with a delay. + time.AfterFunc(2*time.Second, func() { + transactionHandler.Close() + }) + + wg.Wait() + require.NoError(t, err, "failed to send transactions as bulk") +} + +func TestApplyNonceAndGasPriceConcurrently(t *testing.T) { + t.Parallel() + + var getAccountCalled bool + + transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(&getAccountCalled)) + require.NoError(t, err, "failed to create transaction handler") + + var txs []*transaction.FrontendTransaction + + for i := 0; i < 100; i++ { + tx := &transaction.FrontendTransaction{ + Sender: testAddressAsBech32String, + Receiver: testAddressAsBech32String, + GasLimit: 50000, + ChainID: "T", + Value: "5000000000000000000", + Nonce: uint64(i), + GasPrice: 1000000000, + Version: 2, + } + txs = append(txs, tx) + } + + var wg sync.WaitGroup + + // we apply the nonce on the initial transaction list in batches of 20. in order to test that the nonce handler is + // able to do it concurrently providing unique nonces for every transaction. + indices := []int{0, 19, 39, 59, 79, 99} + for i := 0; i < len(indices)-1; i++ { + wg.Add(1) + beginIdx := indices[i] + endIdx := indices[i+1] + go func() { + defer wg.Done() + err := transactionHandler.ApplyNonceAndGasPrice(context.Background(), txs[beginIdx:endIdx]...) + require.Nil(t, err, "error should be nil") + }() + } + wg.Wait() + + // since we applied the nonces concurrently, the slice won't have all of them in order. therefore we sort them + // before comparing them to the expected output. + sort.SliceStable(txs, func(i, j int) bool { + return txs[i].Nonce < txs[j].Nonce + }) + mockedNonces := mockedStrings(100) + for idx := range txs { + mockNonce, _ := strconv.ParseUint(mockedNonces[idx], 10, 64) + require.Equal(t, mockNonce, txs[idx].Nonce) + } +} + +func TestSendDuplicateNonces(t *testing.T) { + t.Parallel() + + var getAccountCalled bool + + transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(&getAccountCalled)) + require.NoError(t, err, "failed to create transaction handler") + + tx := &transaction.FrontendTransaction{ + Sender: testAddressAsBech32String, + Receiver: testAddressAsBech32String, + GasLimit: 50000, + ChainID: "T", + Value: "5000000000000000000", + Nonce: 0, + GasPrice: 1000000000, + Version: 2, + } + + wg := sync.WaitGroup{} + var errCount int + var sentCount int + + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + hashes, sendErr := transactionHandler.SendTransactions(context.Background(), tx) + if sendErr != nil { + errCount++ + } + + if hashes[0] != "" { + sentCount++ + } + }() + } + wg.Wait() + + require.Equal(t, 1, errCount) + require.Equal(t, 1, sentCount) +} + +func TestSendDuplicateNoncesBatch(t *testing.T) { + t.Parallel() + + var getAccountCalled bool + + transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(&getAccountCalled)) + require.NoError(t, err, "failed to create transaction handler") + + txs := make([]*transaction.FrontendTransaction, 0) + for i := 0; i < 100; i++ { + tx := &transaction.FrontendTransaction{ + Sender: testAddressAsBech32String, + Receiver: testAddressAsBech32String, + GasLimit: 50000, + ChainID: "T", + Value: "5000000000000000000", + Nonce: 0, + GasPrice: 1000000000, + Version: 2, + } + txs = append(txs, tx) + } + + hashes, err := transactionHandler.SendTransactions(context.Background(), txs...) + for _, h := range hashes { + require.Equal(t, "", h, "a transaction has been sent") + } + require.Error(t, errors.New("transaction with nonce: 0 has already been scheduled to send while sending transaction for address erd1zptg3eu7uw0qvzhnu009lwxupcn6ntjxptj5gaxt8curhxjqr9tsqpsnht"), err) +} + +func createMockArgsNonceTransactionsHandlerV3(getAccountCalled *bool) ArgsNonceTransactionsHandlerV3 { + return ArgsNonceTransactionsHandlerV3{ + Proxy: &testsCommon.ProxyStub{ + SendTransactionCalled: func(tx *transaction.FrontendTransaction) (string, error) { + return strconv.FormatUint(tx.Nonce, 10), nil + }, + GetAccountCalled: func(address core.AddressHandler) (*data.Account, error) { + *getAccountCalled = true + return &data.Account{}, nil + }, + }, + IntervalToSend: time.Millisecond * 1, + } +} + +func mockedStrings(index int) []string { + mock := make([]string, index) + for i := 0; i < index; i++ { + mock[i] = strconv.Itoa(i) + } + + return mock +} diff --git a/interactors/nonceHandlerV3/workers/transactionWorker.go b/interactors/nonceHandlerV3/workers/transactionWorker.go new file mode 100644 index 00000000..b9b2d237 --- /dev/null +++ b/interactors/nonceHandlerV3/workers/transactionWorker.go @@ -0,0 +1,186 @@ +package workers + +import ( + "container/heap" + "context" + "fmt" + "sync" + "time" + + "github.com/multiversx/mx-chain-core-go/data/transaction" + logger "github.com/multiversx/mx-chain-logger-go" + + "github.com/multiversx/mx-sdk-go/interactors" +) + +var log = logger.GetOrCreate("mx-sdk-go/interactors/workers/transactionWorker") + +// TransactionResponse wraps the results provided by the endpoint which will send the transaction in a struct. +type TransactionResponse struct { + TxHash string + Error error +} + +// TransactionQueueItem is a wrapper struct on the transaction itself that is used to encapsulate transactions in +// the priority queue. +type TransactionQueueItem struct { + tx *transaction.FrontendTransaction + index int +} + +// A transactionQueue implements heap.Interface and holds Items. Acts like a priority queue. +type transactionQueue []*TransactionQueueItem + +// Push required by the heap.Interface +func (tq *transactionQueue) Push(x any) { + n := len(*tq) + item := x.(*TransactionQueueItem) + item.index = n + *tq = append(*tq, item) +} + +// Pop required by the heap.Interface +func (tq *transactionQueue) Pop() any { + old := *tq + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *tq = old[0 : n-1] + return item +} + +// Len required by sort.Interface +func (tq transactionQueue) Len() int { + return len(tq) +} + +// Swap required by the sort.Interface +func (tq transactionQueue) Swap(a, b int) { + tq[a], tq[b] = tq[b], tq[a] + tq[a].index = a + tq[b].index = b +} + +// Less required by the sort.Interface +// Meaning that in the heap, the transaction with the lowest nonce has priority. +func (tq transactionQueue) Less(a, b int) bool { + return tq[a].tx.Nonce < tq[b].tx.Nonce +} + +// TransactionWorker handles all transaction stored inside a priority queue. The priority is given by the nonce, meaning +// that transactions with lower nonce will be sent first. +type TransactionWorker struct { + mu sync.Mutex + tq transactionQueue + + workerClosed bool + proxy interactors.Proxy + responsesChannels map[uint64]chan *TransactionResponse +} + +// NewTransactionWorker creates a new instance of TransactionWorker. +func NewTransactionWorker(context context.Context, proxy interactors.Proxy, intervalToSend time.Duration) *TransactionWorker { + tw := &TransactionWorker{ + mu: sync.Mutex{}, + tq: make(transactionQueue, 0), + proxy: proxy, + responsesChannels: make(map[uint64]chan *TransactionResponse), + } + heap.Init(&tw.tq) + + tw.start(context, intervalToSend) + return tw +} + +// AddTransaction will add a transaction to the priority queue (heap) and will create a channel where the promised result +// will be broadcast on. +func (tw *TransactionWorker) AddTransaction(transaction *transaction.FrontendTransaction) <-chan *TransactionResponse { + tw.mu.Lock() + defer tw.mu.Unlock() + + r := make(chan *TransactionResponse, 1) + if tw.workerClosed { + r <- &TransactionResponse{TxHash: "", Error: interactors.ErrWorkerClosed} + return r + } + + // check if a tx with the same nonce is currently being sent. + if _, ok := tw.responsesChannels[transaction.Nonce]; ok { + r <- &TransactionResponse{TxHash: "", Error: fmt.Errorf("transaction with nonce:"+ + " %d has already been scheduled to send", transaction.Nonce)} + return r + } + + tw.responsesChannels[transaction.Nonce] = r + heap.Push(&tw.tq, &TransactionQueueItem{tx: transaction}) + return r +} + +// start will spawn a goroutine tasked with iterating all the transactions inside the priority queue. The priority is +// given by the nonce, meaning that transaction with lower nonce will be sent first. +// All these transactions are send with an interval between them. +func (tw *TransactionWorker) start(ctx context.Context, intervalToSend time.Duration) { + ticker := time.NewTicker(intervalToSend) + + go func() { + for { + select { + case <-ctx.Done(): + log.Info("context cancelled - transaction worker has stopped") + tw.closeAllChannels(ctx) + return + case <-ticker.C: + tw.processNextTransaction(ctx) + } + } + }() +} + +func (tw *TransactionWorker) processNextTransaction(ctx context.Context) { + tx := tw.nextTransaction() + if tx == nil { + return + } + + // Retrieve channel where the response will be broadcast on. + r := tw.retrieveChannel(tx.Nonce) + + // Send the transaction and forward the response on the channel promised. + txHash, err := tw.proxy.SendTransaction(ctx, tx) + r <- &TransactionResponse{TxHash: txHash, Error: err} +} + +// nextTransaction will return the transaction stored in the priority queue (heap) with the lowest nonce. +// If there aren't any transaction, the result will be nil. +func (tw *TransactionWorker) nextTransaction() *transaction.FrontendTransaction { + tw.mu.Lock() + defer tw.mu.Unlock() + if tw.tq.Len() == 0 { + return nil + } + + nextTransaction := heap.Pop(&tw.tq) + return nextTransaction.(*TransactionQueueItem).tx +} + +func (tw *TransactionWorker) closeAllChannels(ctx context.Context) { + tw.mu.Lock() + defer tw.mu.Unlock() + for _, ch := range tw.responsesChannels { + ch <- &TransactionResponse{TxHash: "", Error: ctx.Err()} + } + tw.workerClosed = true +} + +func (tw *TransactionWorker) retrieveChannel(nonce uint64) chan *TransactionResponse { + // We retrieve the channel where we will send the response. + // Everytime a transaction is added to the queue, such a channel is created and placed in a map. + // After retrieving it, delete the entry from the map that stores all of them. + tw.mu.Lock() + r := tw.responsesChannels[nonce] + delete(tw.responsesChannels, nonce) + tw.mu.Unlock() + + return r +} diff --git a/interactors/nonceHandlerV3/workers/transactionWorker_test.go b/interactors/nonceHandlerV3/workers/transactionWorker_test.go new file mode 100644 index 00000000..2821b61c --- /dev/null +++ b/interactors/nonceHandlerV3/workers/transactionWorker_test.go @@ -0,0 +1,76 @@ +package workers + +import ( + "context" + "strconv" + "sync" + "testing" + "time" + + "github.com/multiversx/mx-chain-core-go/data/transaction" + "github.com/stretchr/testify/require" + + "github.com/multiversx/mx-sdk-go/testsCommon" +) + +func TestTransactionWorker_AddTransaction(t *testing.T) { + t.Parallel() + sortedNonces := []uint64{1, 7, 8, 10, 13, 91, 99} + proxy := &testsCommon.ProxyStub{ + SendTransactionCalled: func(tx *transaction.FrontendTransaction) (string, error) { + return strconv.FormatUint(tx.Nonce, 10), nil + }, + } + + w := NewTransactionWorker(context.Background(), proxy, 2*time.Second) + + responseChannels := make([]<-chan *TransactionResponse, 7) + + // We add roughly at the same time un-ordered transactions. + responseChannels[5] = w.AddTransaction(&transaction.FrontendTransaction{Nonce: 91}) + responseChannels[0] = w.AddTransaction(&transaction.FrontendTransaction{Nonce: 1}) + responseChannels[4] = w.AddTransaction(&transaction.FrontendTransaction{Nonce: 13}) + responseChannels[3] = w.AddTransaction(&transaction.FrontendTransaction{Nonce: 10}) + responseChannels[6] = w.AddTransaction(&transaction.FrontendTransaction{Nonce: 99}) + responseChannels[2] = w.AddTransaction(&transaction.FrontendTransaction{Nonce: 8}) + responseChannels[1] = w.AddTransaction(&transaction.FrontendTransaction{Nonce: 7}) + + // Verify that the results come in ordered. + for i, n := range sortedNonces { + require.Equal(t, &TransactionResponse{TxHash: strconv.FormatUint(n, 10), Error: nil}, <-responseChannels[i]) + } +} + +func TestTransactionWorker_AddTransactionWithLowerNonceAfter(t *testing.T) { + t.Parallel() + nonces := []uint64{10, 11, 9} + proxy := &testsCommon.ProxyStub{ + SendTransactionCalled: func(tx *transaction.FrontendTransaction) (string, error) { + return strconv.FormatUint(tx.Nonce, 10), nil + }, + } + + w := NewTransactionWorker(context.Background(), proxy, 1*time.Second) + + // We add two ordered by nonce transactions roughly at the same time. + r1 := w.AddTransaction(&transaction.FrontendTransaction{Nonce: nonces[0]}) + r2 := w.AddTransaction(&transaction.FrontendTransaction{Nonce: nonces[1]}) + + // We add another transaction with a lower nonce after a while + var wg sync.WaitGroup + r3 := make(<-chan *TransactionResponse, 1) + wg.Add(1) + time.AfterFunc(2*time.Second, func() { + r3 = w.AddTransaction(&transaction.FrontendTransaction{Nonce: nonces[2]}) + wg.Done() + }) + + // Verify that the transactions have been processed in the right order. + require.Equal(t, &TransactionResponse{TxHash: strconv.FormatUint(nonces[0], 10), Error: nil}, <-r1) + require.Equal(t, &TransactionResponse{TxHash: strconv.FormatUint(nonces[1], 10), Error: nil}, <-r2) + + // Wait for the scheduled transaction to finish. After that we verify that the transaction it has been processed. + // Even though the nonce was lower than the first two. + wg.Wait() + require.Equal(t, &TransactionResponse{TxHash: strconv.FormatUint(nonces[2], 10), Error: nil}, <-r3) +}