Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disable HandleTransactionAnnouncement #529

Closed
wants to merge 66 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
daa8471
disable HandleTransactionAnnouncement
shotasilagadzetaal Aug 1, 2024
3fe065d
add buffer to chan
shotasilagadzetaal Aug 9, 2024
3109d50
revert
shotasilagadzetaal Aug 9, 2024
bd9aff3
remove handler
shotasilagadzetaal Aug 9, 2024
56fdd42
not requested
shotasilagadzetaal Aug 9, 2024
80fdd43
remove test
shotasilagadzetaal Aug 9, 2024
78f4495
change expected status
shotasilagadzetaal Aug 9, 2024
f86c786
not requested
shotasilagadzetaal Aug 9, 2024
f213e35
fix test'
shotasilagadzetaal Aug 12, 2024
7aeb52a
fix test
shotasilagadzetaal Aug 12, 2024
f57b226
fix test
shotasilagadzetaal Aug 12, 2024
87084dd
fix test
shotasilagadzetaal Aug 12, 2024
9b9ed3f
fix test
shotasilagadzetaal Aug 12, 2024
6f3f467
fix test
shotasilagadzetaal Aug 12, 2024
f81dd40
fix test
shotasilagadzetaal Aug 12, 2024
25e9ede
fix test
shotasilagadzetaal Aug 12, 2024
1cf6098
fix test
shotasilagadzetaal Aug 12, 2024
c976396
fix test
shotasilagadzetaal Aug 12, 2024
5a04485
fix test
shotasilagadzetaal Aug 12, 2024
b2c5c0a
fix test
shotasilagadzetaal Aug 12, 2024
9f37b42
fix test
shotasilagadzetaal Aug 12, 2024
217f41d
fix test
shotasilagadzetaal Aug 12, 2024
3ecc27e
fix test
shotasilagadzetaal Aug 12, 2024
cd9db53
fix test
shotasilagadzetaal Aug 12, 2024
a1d17f9
fix test
shotasilagadzetaal Aug 12, 2024
d8eb773
fix test
shotasilagadzetaal Aug 12, 2024
b9d974a
fix test
shotasilagadzetaal Aug 12, 2024
a6dab6c
fix test
shotasilagadzetaal Aug 12, 2024
8820273
fix test
shotasilagadzetaal Aug 12, 2024
f51f4ec
fix test
shotasilagadzetaal Aug 12, 2024
87413df
fix test
shotasilagadzetaal Aug 12, 2024
07db225
fix test
shotasilagadzetaal Aug 12, 2024
f0e8c29
fix test
shotasilagadzetaal Aug 12, 2024
50c9987
fix test
shotasilagadzetaal Aug 12, 2024
e0464e5
fix test
shotasilagadzetaal Aug 12, 2024
1a6cc93
fix test
shotasilagadzetaal Aug 12, 2024
bc8368c
fix test
shotasilagadzetaal Aug 12, 2024
0c81a11
fix test
shotasilagadzetaal Aug 12, 2024
bf4e845
fix test
shotasilagadzetaal Aug 12, 2024
5bba1c8
fix test
shotasilagadzetaal Aug 12, 2024
2753989
add requesting transaction after 2 seconds from announcement
shotasilagadzetaal Aug 13, 2024
f2f11cd
fix conflict
shotasilagadzetaal Aug 13, 2024
79a8234
remove test files
shotasilagadzetaal Aug 13, 2024
30fb6cd
fix import
shotasilagadzetaal Aug 13, 2024
3f16d93
add logs
shotasilagadzetaal Aug 13, 2024
ff44acc
fix log
shotasilagadzetaal Aug 13, 2024
06cbe81
fix log
shotasilagadzetaal Aug 13, 2024
17f0632
fix log
shotasilagadzetaal Aug 13, 2024
409f1be
fix requesting
shotasilagadzetaal Aug 13, 2024
de86379
fix requesting
shotasilagadzetaal Aug 13, 2024
9f16312
fix requesting
shotasilagadzetaal Aug 13, 2024
fb5608d
remove test
shotasilagadzetaal Aug 13, 2024
6a762eb
simply loop
shotasilagadzetaal Aug 13, 2024
d2d59ed
add test
shotasilagadzetaal Aug 13, 2024
a30ca52
add test
shotasilagadzetaal Aug 13, 2024
d7017a5
add test
shotasilagadzetaal Aug 13, 2024
5a3edeb
add test
shotasilagadzetaal Aug 13, 2024
cc279d5
add test
shotasilagadzetaal Aug 13, 2024
3af7bef
add test
shotasilagadzetaal Aug 13, 2024
1980a83
Merge branch 'main' into disable-handle-transaction-announce
shotasilagadzetaal Aug 15, 2024
84d76e5
fix stylish error
shotasilagadzetaal Aug 15, 2024
7194db1
add log
shotasilagadzetaal Aug 15, 2024
025fc1e
add log
shotasilagadzetaal Aug 15, 2024
98541bb
address comments
shotasilagadzetaal Aug 15, 2024
310aa12
address comments
shotasilagadzetaal Aug 15, 2024
312ae08
fix compilation
shotasilagadzetaal Aug 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package cmd
import (
"context"
"fmt"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_core"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection"
"log/slog"
"net"
"net/http"
Expand All @@ -14,6 +11,10 @@ import (
"strconv"
"time"

"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_core"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection"

"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/metamorph"
Expand Down
9 changes: 0 additions & 9 deletions internal/metamorph/peer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@ func (m *PeerHandler) HandleTransactionSent(msg *wire.MsgTx, peer p2p.PeerI) err

// HandleTransactionAnnouncement is a message sent to the PeerHandler when a transaction INV message is received from a peer.
func (m *PeerHandler) HandleTransactionAnnouncement(msg *wire.InvVect, peer p2p.PeerI) error {
select {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comment why we do not handle INV messages

case m.messageCh <- &PeerTxMessage{
Hash: &msg.Hash,
Status: metamorph_api.Status_SEEN_ON_NETWORK,
Peer: peer.String(),
}:
default: // Ensure that writing to channel is non-blocking
}

return nil
}

Expand Down
25 changes: 0 additions & 25 deletions internal/metamorph/peer_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,6 @@ func TestPeerHandler(t *testing.T) {
}
})

t.Run("HandleTransactionAnnouncement", func(t *testing.T) {
hash, err := chainhash.NewHashFromStr("1234")
require.NoError(t, err)

msgInv := wire.NewInvVect(wire.InvTypeBlock, hash)
require.NoError(t, err)

expectedMsg := &metamorph.PeerTxMessage{
Hash: &msgInv.Hash,
Status: metamorph_api.Status_SEEN_ON_NETWORK,
Peer: "mock_peer",
}

go func() {
_ = peerHandler.HandleTransactionAnnouncement(msgInv, peer)
}()

select {
case msg := <-messageCh:
assert.Equal(t, expectedMsg, msg)
case <-time.After(time.Second):
t.Fatal("test timed out or error while executing goroutine")
}
})

t.Run("HandleTransactionRejection", func(t *testing.T) {
msgReject := wire.NewMsgReject("command", wire.RejectMalformed, "malformed")

Expand Down
61 changes: 56 additions & 5 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ type Processor struct {

processMinedInterval time.Duration
processMinedBatchSize int

orderedDnnouncedTransactions []AnnouncedTransaction
announcedTransactionsLock sync.Mutex
}

type AnnouncedTransaction struct {
second uint64
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
second uint64
timestamp uint64

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not actually timestamp, it's simply number of seconds.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the number of seconds since the Unix epoch. It is used to determine if an object is older than a specific moment in time. Seems like a timestamp to me

hash *chainhash.Hash
}

type Option func(f *Processor)
Expand Down Expand Up @@ -120,8 +128,9 @@ func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, statusMessageChan
maxRetries: maxRetriesDefault,
minimumHealthyConnections: minimumHealthyConnectionsDefault,

responseProcessor: NewResponseProcessor(),
statusMessageCh: statusMessageChannel,
responseProcessor: NewResponseProcessor(),
statusMessageCh: statusMessageChannel,
orderedDnnouncedTransactions: make([]AnnouncedTransaction, 0),

processExpiredTxsInterval: unseenTransactionRebroadcastingInterval,
processSeenOnNetworkTxsInterval: seenOnNetworkTransactionRequestingInterval,
Expand Down Expand Up @@ -197,6 +206,7 @@ func (p *Processor) Start() error {
p.StartProcessExpiredTransactions()
p.StartRequestingSeenOnNetworkTxs()
p.StartProcessStatusUpdatesInStorage()
p.StartCheckingTransactionsInNetwork()
p.StartProcessMinedCallbacks()
err = p.StartCollectStats()
if err != nil {
Expand Down Expand Up @@ -384,6 +394,38 @@ func (p *Processor) StartSendStatusUpdate() {
}()
}

func (p *Processor) StartCheckingTransactionsInNetwork() {
p.waitGroup.Add(1)
ticker := time.NewTicker(100 * time.Millisecond)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open question: Is it necessary to lock the announcedTransactions collection every 100 milliseconds? A better approach might be to increase the interval to 0.5-1 second and decrease the delta between the current time and the transaction announcement time. WDYT?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly not sure what we need here exactly. Or in other words what's the best time interval to wait before requesting transaction.

go func() {
defer p.waitGroup.Done()
for {
select {
case <-p.ctx.Done():
return

case <-ticker.C:
p.announcedTransactionsLock.Lock()
for k := 0; k < len(p.orderedDnnouncedTransactions); k++ {
if p.orderedDnnouncedTransactions[k].second < uint64(p.now().Unix())-1 {
p.logger.Info("requested transaction", slog.String("hash", p.orderedDnnouncedTransactions[k].hash.String()))
p.pm.RequestTransaction((p.orderedDnnouncedTransactions[k].hash))
if k == len(p.orderedDnnouncedTransactions)-1 {
p.orderedDnnouncedTransactions = []AnnouncedTransaction{}
break
}
} else {
p.orderedDnnouncedTransactions = p.orderedDnnouncedTransactions[k:]
break
}
}
p.announcedTransactionsLock.Unlock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reset the ticker at the end

ticker.Reset(100 * time.Millisecond)
}
}
}()
}

func (p *Processor) StartProcessStatusUpdatesInStorage() {
ticker := time.NewTicker(p.processStatusUpdatesInterval)
p.waitGroup.Add(1)
Expand Down Expand Up @@ -678,9 +720,6 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques
Status: metamorph_api.Status_STORED,
})

// Send GETDATA to peers to see if they have it
p.pm.RequestTransaction(req.Data.Hash)

// Announce transaction to network peers
p.logger.Debug("announcing transaction", slog.String("hash", req.Data.Hash.String()))
peers := p.pm.AnnounceTransaction(req.Data.Hash, nil)
Expand All @@ -689,6 +728,9 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques
return
}

// will be requesting transaction after ~2 seconds to get SEEN_ON_NETWORK status
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This operation may be blocking. Can we move it to the end of the processing? Does it make sense?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean blocking? It's simple mutex lock nothing complicated here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we lock the same collection in StartCheckingTransactionsInNetwork

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry still confused. Yes we lock it in StartCheckingTransactionsInNetwork too but why is it a problem?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's assume we have heavy traffic and receive a lot of transactions.

  1. StartCheckingTransactionsInNetwork locks the announcedTransaction collection and starts sending messages to the node.
  2. Many transactions are sent to ARC and announced to the network.
  3. The transactions are waiting for the announcedTransaction collection to be released.

In this case, isn't it a problem that we are delaying the processing of transactions (status updates, responses to the client) until the transactions can be added to the collection?

p.DeferRequestTransaction(req.Data.Hash)

// update status in response
statusResponse.UpdateStatus(StatusAndError{
Status: metamorph_api.Status_ANNOUNCED_TO_NETWORK,
Expand All @@ -704,6 +746,15 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques
p.responseProcessor.Add(statusResponse)
}

func (p *Processor) DeferRequestTransaction(txHash *chainhash.Hash) {
p.announcedTransactionsLock.Lock()
p.orderedDnnouncedTransactions = append(p.orderedDnnouncedTransactions, AnnouncedTransaction{
second: uint64(p.now().Unix()),
hash: txHash,
})
p.announcedTransactionsLock.Unlock()
}

func (p *Processor) ProcessTransactions(sReq []*store.StoreData) {
// store in database
err := p.store.SetBulk(p.ctx, sReq)
Expand Down
46 changes: 45 additions & 1 deletion internal/metamorph/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestProcessTransaction(t *testing.T) {
expectedResponseMapItems: 0,
expectedSetCalls: 1,
expectedAnnounceCalls: 1,
expectedRequestCalls: 1,
expectedRequestCalls: 0,
},
{
name: "record found",
Expand Down Expand Up @@ -943,6 +943,50 @@ func TestProcessorHealth(t *testing.T) {
}
}

func TestStartCheckingTransactionsInNetwork(t *testing.T) {
metamorphStore := &storeMocks.MetamorphStoreMock{
GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) {
return &store.StoreData{Hash: testdata.TX2Hash}, nil
},
SetUnlockedByNameFunc: func(ctx context.Context, lockedBy string) (int64, error) { return 0, nil },
GetUnminedFunc: func(ctx context.Context, since time.Time, limit int64, offset int64) ([]*store.StoreData, error) {
return nil, nil
},
IncrementRetriesFunc: func(ctx context.Context, hash *chainhash.Hash) error {
return nil
},
}
pm := &mocks.PeerManagerMock{
RequestTransactionFunc: func(txHash *chainhash.Hash) p2p.PeerI {
return nil
},
ShutdownFunc: func() {},
}

publisher := &mocks.MessageQueueClientMock{
PublishFunc: func(topic string, hash []byte) error {
return nil
},
}

processor, err := metamorph.NewProcessor(metamorphStore, pm, nil, metamorph.WithMessageQueueClient(publisher), metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20), metamorph.WithMaxRetries(10), metamorph.WithNow(func() time.Time {
return time.Date(2033, 1, 1, 1, 0, 0, 0, time.UTC)
}))
require.NoError(t, err)
defer processor.Shutdown()

processor.StartCheckingTransactionsInNetwork()

require.Equal(t, 0, processor.GetProcessorMapSize())

processor.DeferRequestTransaction(testdata.TX2Hash)

time.Sleep(1 * time.Second)
require.Equal(t, 0, len(pm.RequestTransactionCalls()))
time.Sleep(2 * time.Second)
require.Equal(t, 1, len(pm.RequestTransactionCalls()))
}

func TestStart(t *testing.T) {
tt := []struct {
name string
Expand Down
Loading