From 08b17cb2af84f72128ee3f8ea359c54e36931904 Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Tue, 17 Sep 2024 15:20:18 -0500 Subject: [PATCH 1/2] match checksum to separate mutex and add dc cancel tracking --- client/core/core.go | 75 +++++++++++++++++++++++++--------- client/core/core_test.go | 87 ++++++++++++++++++++++++---------------- client/core/trade.go | 49 ++++++++++++++++------ 3 files changed, 146 insertions(+), 65 deletions(-) diff --git a/client/core/core.go b/client/core/core.go index b67d5c7ae3..67e2653e05 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -173,6 +173,10 @@ type dexConnection struct { // processed by a dex server. inFlightOrders map[uint64]*InFlightOrder + // A map linking cancel order IDs to trade order IDs. + cancelsMtx sync.RWMutex + cancels map[order.OrderID]order.OrderID + blindCancelsMtx sync.Mutex blindCancels map[order.OrderID]order.Preimage @@ -253,6 +257,25 @@ func (dc *dexConnection) bondAssets() (map[uint32]*BondAsset, uint64) { return bondAssets, cfg.BondExpiry } +func (dc *dexConnection) registerCancelLink(cid, oid order.OrderID) { + dc.cancelsMtx.Lock() + dc.cancels[cid] = oid + dc.cancelsMtx.Unlock() +} + +func (dc *dexConnection) deleteCancelLink(cid order.OrderID) { + dc.cancelsMtx.Lock() + delete(dc.cancels, cid) + dc.cancelsMtx.Unlock() +} + +func (dc *dexConnection) cancelTradeID(cid order.OrderID) (order.OrderID, bool) { + dc.cancelsMtx.RLock() + defer dc.cancelsMtx.RUnlock() + oid, found := dc.cancels[cid] + return oid, found +} + // marketConfig is the market's configuration, as returned by the server in the // 'config' response. func (dc *dexConnection) marketConfig(mktID string) *msgjson.Market { @@ -584,10 +607,12 @@ func (dc *dexConnection) findOrder(oid order.OrderID) (tracker *trackedTrade, pr if tracker, found := dc.trades[oid]; found { return tracker, tracker.preImg, false } - // Search the cancel order IDs. - for _, tracker := range dc.trades { - if tracker.cancel != nil && tracker.cancel.ID() == oid { - return tracker, tracker.cancel.preImg, true + + if tid, found := dc.cancelTradeID(oid); found { + if tracker, found := dc.trades[tid]; found { + return tracker, tracker.preImg, true + } else { + dc.log.Errorf("Did not find trade for cancel order ID %s", oid) } } return @@ -8087,6 +8112,7 @@ func (c *Core) newDEXConnection(acctInfo *db.AccountInfo, flag connectDEXFlag) ( ticker: newDexTicker(defaultTickInterval), // updated when server config obtained books: make(map[string]*bookie), trades: make(map[order.OrderID]*trackedTrade), + cancels: make(map[order.OrderID]order.OrderID), inFlightOrders: make(map[uint64]*InFlightOrder), blindCancels: make(map[order.OrderID]order.Preimage), apiVer: -1, @@ -8899,7 +8925,7 @@ func handlePreimageRequest(c *Core, dc *dexConnection, msg *msgjson.Message) err } if len(req.Commitment) != order.CommitmentSize { - return fmt.Errorf("received preimage request for %v with no corresponding order submission response.", oid) + return fmt.Errorf("received preimage request for %s with no corresponding order submission response", oid) } // See if we recognize that commitment, and if we do, just wait for the @@ -8992,15 +9018,14 @@ func acceptCsum(tracker *trackedTrade, isCancel bool, commitChecksum dex.Bytes) // Do not allow csum to be changed once it has been committed to // (initialized to something other than `nil`) because it is probably a // malicious behavior by the server. - tracker.mtx.Lock() - defer tracker.mtx.Unlock() - + tracker.csumMtx.Lock() + defer tracker.csumMtx.Unlock() if isCancel { - if tracker.cancel.csum == nil { - tracker.cancel.csum = commitChecksum + if tracker.cancelCsum == nil { + tracker.cancelCsum = commitChecksum return true } - return bytes.Equal(commitChecksum, tracker.cancel.csum) + return bytes.Equal(commitChecksum, tracker.cancelCsum) } if tracker.csum == nil { tracker.csum = commitChecksum @@ -10721,6 +10746,15 @@ func (c *Core) checkEpochResolution(host string, mktID string) { } currentEpoch := dc.marketEpoch(mktID, time.Now()) lastEpoch := currentEpoch - 1 + + // Short path if we're already resolved. + dc.epochMtx.RLock() + resolvedEpoch := dc.resolvedEpoch[mktID] + dc.epochMtx.RUnlock() + if lastEpoch == resolvedEpoch { + return + } + ts, inFlights := dc.marketTrades(mktID) for _, ord := range inFlights { if ord.Epoch == lastEpoch { @@ -10728,18 +10762,23 @@ func (c *Core) checkEpochResolution(host string, mktID string) { } } for _, t := range ts { + // Is this order from the last epoch and still not booked or executed? if t.epochIdx() == lastEpoch && t.status() == order.OrderStatusEpoch { return } - if t.cancel != nil && t.cancelEpochIdx() == lastEpoch { - t.mtx.RLock() - matched := t.cancel.matches.taker != nil - t.mtx.RUnlock() - if !matched { - return - } + // Does this order have an in-flight cancel order that is not yet + // resolved? + t.mtx.RLock() + unresolvedCancel := t.cancel != nil && t.cancelEpochIdx() == lastEpoch && t.cancel.matches.taker == nil + t.mtx.RUnlock() + if unresolvedCancel { + return } } + + // We don't have any unresolved orders or cancel orders from the last epoch. + // Just make sure that not other thread has resolved the epoch and then send + // the notification. dc.epochMtx.Lock() sendUpdate := lastEpoch > dc.resolvedEpoch[mktID] dc.resolvedEpoch[mktID] = lastEpoch diff --git a/client/core/core_test.go b/client/core/core_test.go index 8f6fa9c5b0..f1590a73b7 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -271,6 +271,7 @@ func testDexConnection(ctx context.Context, crypter *tCrypter) (*dexConnection, }, notify: func(Notification) {}, trades: make(map[order.OrderID]*trackedTrade), + cancels: make(map[order.OrderID]order.OrderID), inFlightOrders: make(map[uint64]*InFlightOrder), epoch: map[string]uint64{tDcrBtcMktName: 0}, resolvedEpoch: map[string]uint64{tDcrBtcMktName: 0}, @@ -3792,9 +3793,9 @@ func TestHandlePreimageRequest(t *testing.T) { // resetCsum resets csum for further preimage request since multiple // testing scenarios use the same tracker object. resetCsum := func(tracker *trackedTrade) { - tracker.mtx.Lock() + tracker.csumMtx.Lock() tracker.csum = nil - tracker.mtx.Unlock() + tracker.csumMtx.Unlock() } rig.dc.trades[oid] = tracker @@ -3923,15 +3924,17 @@ func TestHandlePreimageRequest(t *testing.T) { t.Fatal("no order note from preimage request handling") } - tracker.mtx.RLock() - if !bytes.Equal(commitCSum, tracker.csum) { + tracker.csumMtx.RLock() + csum := tracker.csum + tracker.csumMtx.RUnlock() + if !bytes.Equal(commitCSum, csum) { t.Fatalf( "handlePreimageRequest must initialize tracker csum, exp: %s, got: %s", commitCSum, - tracker.csum, + csum, ) } - tracker.mtx.RUnlock() + }) t.Run("more than one preimage request for order (different csums)", func(t *testing.T) { rig := newTestRig() @@ -3997,14 +4000,16 @@ func TestHandlePreimageRequest(t *testing.T) { } tracker.mtx.RLock() - if !bytes.Equal(firstCSum, tracker.csum) { + csum := tracker.csum + tracker.mtx.RUnlock() + if !bytes.Equal(firstCSum, csum) { t.Fatalf( "[handlePreimageRequest] csum was changed, exp: %s, got: %s", firstCSum, - tracker.csum, + csum, ) } - tracker.mtx.RUnlock() + }) t.Run("more than one preimage request for order (same csum)", func(t *testing.T) { rig := newTestRig() @@ -4066,14 +4071,15 @@ func TestHandlePreimageRequest(t *testing.T) { } tracker.mtx.RLock() - if !bytes.Equal(csum, tracker.csum) { + checkSum := tracker.csum + tracker.mtx.RUnlock() + if !bytes.Equal(csum, checkSum) { t.Fatalf( "[handlePreimageRequest] csum was changed, exp: %s, got: %s", csum, - tracker.csum, + checkSum, ) } - tracker.mtx.RUnlock() }) t.Run("csum for cancel order", func(t *testing.T) { rig := newTestRig() @@ -4104,7 +4110,8 @@ func TestHandlePreimageRequest(t *testing.T) { epochLen: mkt.EpochLen, }, } - oid := tracker.cancel.ID() + oid := tracker.ID() + cid := tracker.cancel.ID() // Test the new path with rig.core.sentCommits. readyCommitment := func(commit order.Commitment) chan struct{} { @@ -4119,7 +4126,7 @@ func TestHandlePreimageRequest(t *testing.T) { commitCSum := dex.Bytes{2, 3, 5, 7, 11, 13} commitSig := readyCommitment(commit) payload := &msgjson.PreimageRequest{ - OrderID: oid[:], + OrderID: cid[:], Commitment: commit[:], CommitChecksum: commitCSum, } @@ -4127,7 +4134,8 @@ func TestHandlePreimageRequest(t *testing.T) { notes := rig.core.NotificationFeed() - rig.dc.trades[order.OrderID{}] = tracker + rig.dc.trades[oid] = tracker + rig.dc.registerCancelLink(cid, oid) err := handlePreimageRequest(rig.core, rig.dc, reqCommit) if err != nil { t.Fatalf("handlePreimageRequest error: %v", err) @@ -4147,14 +4155,16 @@ func TestHandlePreimageRequest(t *testing.T) { } tracker.mtx.RLock() - if !bytes.Equal(commitCSum, tracker.cancel.csum) { + cancelCsum := tracker.cancelCsum + tracker.mtx.RUnlock() + if !bytes.Equal(commitCSum, cancelCsum) { t.Fatalf( "handlePreimageRequest must initialize tracker cancel csum, exp: %s, got: %s", commitCSum, - tracker.cancel.csum, + cancelCsum, ) } - tracker.mtx.RUnlock() + }) t.Run("more than one preimage request for cancel order (different csums)", func(t *testing.T) { rig := newTestRig() @@ -4171,9 +4181,9 @@ func TestHandlePreimageRequest(t *testing.T) { db: rig.db, dc: rig.dc, metaData: &db.OrderMetaData{}, + // Simulate first preimage request by initializing csum here. + cancelCsum: firstCSum, cancel: &trackedCancel{ - // Simulate first preimage request by initializing csum here. - csum: firstCSum, CancelOrder: order.CancelOrder{ P: order.Prefix{ AccountID: rig.dc.acct.ID(), @@ -4188,7 +4198,8 @@ func TestHandlePreimageRequest(t *testing.T) { epochLen: mkt.EpochLen, }, } - oid := tracker.cancel.ID() + oid := tracker.ID() + cid := tracker.cancel.ID() // Test the new path with rig.core.sentCommits. readyCommitment := func(commit order.Commitment) chan struct{} { @@ -4203,7 +4214,7 @@ func TestHandlePreimageRequest(t *testing.T) { secondCSum := dex.Bytes{2, 3, 5, 7, 11, 14} commitSig := readyCommitment(commit) payload := &msgjson.PreimageRequest{ - OrderID: oid[:], + OrderID: cid[:], Commitment: commit[:], CommitChecksum: secondCSum, } @@ -4214,7 +4225,8 @@ func TestHandlePreimageRequest(t *testing.T) { rig.ws.sendMsgErrChan = make(chan *msgjson.Error, 1) defer func() { rig.ws.sendMsgErrChan = nil }() - rig.dc.trades[order.OrderID{}] = tracker + rig.dc.trades[oid] = tracker + rig.dc.registerCancelLink(cid, oid) err := handlePreimageRequest(rig.core, rig.dc, reqCommit) if err != nil { t.Fatalf("handlePreimageRequest error: %v", err) @@ -4233,14 +4245,15 @@ func TestHandlePreimageRequest(t *testing.T) { t.Fatal("no msgjson.Error sent from preimage request handling") } tracker.mtx.RLock() - if !bytes.Equal(firstCSum, tracker.cancel.csum) { + cancelCsum := tracker.cancelCsum + tracker.mtx.RUnlock() + if !bytes.Equal(firstCSum, cancelCsum) { t.Fatalf( "[handlePreimageRequest] cancel csum was changed, exp: %s, got: %s", firstCSum, - tracker.cancel.csum, + cancelCsum, ) } - tracker.mtx.RUnlock() }) t.Run("more than one preimage request for cancel order (same csum)", func(t *testing.T) { rig := newTestRig() @@ -4257,9 +4270,9 @@ func TestHandlePreimageRequest(t *testing.T) { db: rig.db, dc: rig.dc, metaData: &db.OrderMetaData{}, + // Simulate first preimage request by initializing csum here. + cancelCsum: csum, cancel: &trackedCancel{ - // Simulate first preimage request by initializing csum here. - csum: csum, CancelOrder: order.CancelOrder{ P: order.Prefix{ AccountID: rig.dc.acct.ID(), @@ -4274,7 +4287,8 @@ func TestHandlePreimageRequest(t *testing.T) { epochLen: mkt.EpochLen, }, } - oid := tracker.cancel.ID() + oid := tracker.ID() + cid := tracker.cancel.ID() // Test the new path with rig.core.sentCommits. readyCommitment := func(commit order.Commitment) chan struct{} { @@ -4288,7 +4302,7 @@ func TestHandlePreimageRequest(t *testing.T) { commit := preImg.Commit() commitSig := readyCommitment(commit) payload := &msgjson.PreimageRequest{ - OrderID: oid[:], + OrderID: cid[:], Commitment: commit[:], CommitChecksum: csum, } @@ -4296,7 +4310,8 @@ func TestHandlePreimageRequest(t *testing.T) { notes := rig.core.NotificationFeed() - rig.dc.trades[order.OrderID{}] = tracker + rig.dc.trades[oid] = tracker + rig.dc.registerCancelLink(cid, oid) err := handlePreimageRequest(rig.core, rig.dc, reqCommit) if err != nil { t.Fatalf("handlePreimageRequest error: %v", err) @@ -4316,14 +4331,15 @@ func TestHandlePreimageRequest(t *testing.T) { } tracker.mtx.RLock() - if !bytes.Equal(csum, tracker.cancel.csum) { + cancelCsum := tracker.cancelCsum + tracker.mtx.RUnlock() + if !bytes.Equal(csum, cancelCsum) { t.Fatalf( "[handlePreimageRequest] cancel csum was changed, exp: %s, got: %s", csum, - tracker.cancel.csum, + cancelCsum, ) } - tracker.mtx.RUnlock() }) } @@ -4391,6 +4407,7 @@ func TestHandleRevokeOrderMsg(t *testing.T) { tracker.cancel = &trackedCancel{CancelOrder: *co} coid := co.ID() rig.dc.trades[oid] = tracker + rig.dc.registerCancelLink(coid, oid) orderNotes, feedDone := orderNoteFeed(tCore) defer feedDone() @@ -5016,6 +5033,7 @@ func TestTradeTracking(t *testing.T) { } tracker.cancel = &trackedCancel{CancelOrder: *co, epochLen: mkt.EpochLen} coid := co.ID() + rig.dc.registerCancelLink(coid, tracker.ID()) m1 := &msgjson.Match{ OrderID: loid[:], MatchID: mid[:], @@ -7057,6 +7075,7 @@ func TestHandleNomatch(t *testing.T) { standingTracker.cancel = &trackedCancel{ CancelOrder: *cancelOrder, } + dc.registerCancelLink(cancelOID, standingOID) // 4. Market order. loWillBeMarket, dbOrder, preImgL, _ := makeLimitOrder(dc, true, dcrBtcLotSize*100, dcrBtcRateStep) diff --git a/client/core/trade.go b/client/core/trade.go index ef1fc0ccb9..9f868cf71e 100644 --- a/client/core/trade.go +++ b/client/core/trade.go @@ -214,7 +214,6 @@ func (m *matchTracker) token() string { type trackedCancel struct { order.CancelOrder preImg order.Preimage - csum dex.Bytes // the commitment checksum provided in the preimage request epochLen uint64 matches struct { maker *msgjson.Match @@ -279,14 +278,17 @@ type trackedTrade struct { options map[string]string // metaData.Options (immutable) for Redeem and Swap redemptionReserves uint64 // metaData.RedemptionReserves (immutable) refundReserves uint64 // metaData.RefundReserves (immutable) + preImg order.Preimage + + csumMtx sync.RWMutex + csum dex.Bytes // the commitment checksum provided in the preimage request + cancelCsum dex.Bytes // mtx protects all read-write fields of the trackedTrade and the // matchTrackers in the matches map. mtx sync.RWMutex metaData *db.OrderMetaData wallets *walletSet - preImg order.Preimage - csum dex.Bytes // the commitment checksum provided in the preimage request coins map[string]asset.Coin coinsLocked bool change asset.Coin @@ -592,14 +594,18 @@ func (t *trackedTrade) cancelEpochIdx() uint64 { return uint64(t.cancel.Prefix().ServerTime.UnixMilli()) / epochLen } -func (t *trackedTrade) verifyCSum(csum dex.Bytes, epochIdx uint64) error { +func (t *trackedTrade) verifyCSum(vsum dex.Bytes, epochIdx uint64) error { t.mtx.RLock() defer t.mtx.RUnlock() + t.csumMtx.RLock() + csum, cancelCsum := t.csum, t.cancelCsum + t.csumMtx.RUnlock() + // First check the trade's recorded csum, if it is in this epoch. - if epochIdx == t.epochIdx() && !bytes.Equal(csum, t.csum) { + if epochIdx == t.epochIdx() && !bytes.Equal(vsum, csum) { return fmt.Errorf("checksum %s != trade order preimage request checksum %s for trade order %v", - csum, t.csum, t.ID()) + csum, csum, t.ID()) } if t.cancel == nil { @@ -607,9 +613,9 @@ func (t *trackedTrade) verifyCSum(csum dex.Bytes, epochIdx uint64) error { } // Check the linked cancel order if it is for this epoch. - if epochIdx == t.cancelEpochIdx() && !bytes.Equal(csum, t.cancel.csum) { + if epochIdx == t.cancelEpochIdx() && !bytes.Equal(vsum, cancelCsum) { return fmt.Errorf("checksum %s != cancel order preimage request checksum %s for cancel order %v", - csum, t.cancel.csum, t.cancel.ID()) + vsum, cancelCsum, t.cancel.ID()) } return nil // includes not in epoch @@ -731,19 +737,36 @@ func (t *trackedTrade) token() string { return (t.ID().String()) } +// clearCancel clears the unmatched cancel and deletes the cancel checksum and +// link to the trade in the dexConnection. clearCancel must be called with the +// trackedTrade.mtx locked. +func (t *trackedTrade) clearCancel() { + if t.cancel != nil { + t.dc.deleteCancelLink(t.cancel.ID()) + t.cancel = nil + } + t.csumMtx.Lock() + t.cancelCsum = nil + t.csumMtx.Unlock() +} + // cancelTrade sets the cancellation data with the order and its preimage. // cancelTrade must be called with the mtx write-locked. func (t *trackedTrade) cancelTrade(co *order.CancelOrder, preImg order.Preimage, epochLen uint64) error { + t.clearCancel() t.cancel = &trackedCancel{ CancelOrder: *co, preImg: preImg, epochLen: epochLen, } - err := t.db.LinkOrder(t.ID(), co.ID()) + cid := co.ID() + oid := t.ID() + t.dc.registerCancelLink(cid, oid) + err := t.db.LinkOrder(oid, cid) if err != nil { - return fmt.Errorf("error linking cancel order %s for trade %s: %w", co.ID(), t.ID(), err) + return fmt.Errorf("error linking cancel order %s for trade %s: %w", cid, oid, err) } - t.metaData.LinkedOrder = co.ID() + t.metaData.LinkedOrder = cid return nil } @@ -766,7 +789,7 @@ func (t *trackedTrade) nomatch(oid order.OrderID) (assetMap, error) { t.dc.log.Errorf("DB error unlinking cancel order %s for trade %s: %v", oid, t.ID(), err) } // Clearing the trackedCancel allows this order to be canceled again. - t.cancel = nil + t.clearCancel() t.metaData.LinkedOrder = order.OrderID{} subject, details := t.formatDetails(TopicMissedCancel, makeOrderToken(t.token())) @@ -1182,7 +1205,7 @@ func (t *trackedTrade) deleteCancelOrder() { t.dc.log.Errorf("Error updating status in db for cancel order %v to revoked: %v", cid, err) } // Unlink the cancel order from the trade. - t.cancel = nil + t.clearCancel() t.metaData.LinkedOrder = order.OrderID{} // NOTE: caller may wish to update the trades's DB entry } From 68d580a123195c8f5c433856e9a226d0994f2f58 Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Wed, 25 Sep 2024 12:38:13 -0500 Subject: [PATCH 2/2] use correct mtx in tests --- client/core/core_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/client/core/core_test.go b/client/core/core_test.go index f1590a73b7..eaa991afe0 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -3999,9 +3999,9 @@ func TestHandlePreimageRequest(t *testing.T) { t.Fatal("no msgjson.Error sent from preimage request handling") } - tracker.mtx.RLock() + tracker.csumMtx.RLock() csum := tracker.csum - tracker.mtx.RUnlock() + tracker.csumMtx.RUnlock() if !bytes.Equal(firstCSum, csum) { t.Fatalf( "[handlePreimageRequest] csum was changed, exp: %s, got: %s", @@ -4070,9 +4070,9 @@ func TestHandlePreimageRequest(t *testing.T) { t.Fatal("no order note from preimage request handling") } - tracker.mtx.RLock() + tracker.csumMtx.RLock() checkSum := tracker.csum - tracker.mtx.RUnlock() + tracker.csumMtx.RUnlock() if !bytes.Equal(csum, checkSum) { t.Fatalf( "[handlePreimageRequest] csum was changed, exp: %s, got: %s", @@ -4154,9 +4154,9 @@ func TestHandlePreimageRequest(t *testing.T) { t.Fatal("no order note from preimage request handling") } - tracker.mtx.RLock() + tracker.csumMtx.RLock() cancelCsum := tracker.cancelCsum - tracker.mtx.RUnlock() + tracker.csumMtx.RUnlock() if !bytes.Equal(commitCSum, cancelCsum) { t.Fatalf( "handlePreimageRequest must initialize tracker cancel csum, exp: %s, got: %s", @@ -4244,9 +4244,9 @@ func TestHandlePreimageRequest(t *testing.T) { case <-time.After(time.Second): t.Fatal("no msgjson.Error sent from preimage request handling") } - tracker.mtx.RLock() + tracker.csumMtx.RLock() cancelCsum := tracker.cancelCsum - tracker.mtx.RUnlock() + tracker.csumMtx.RUnlock() if !bytes.Equal(firstCSum, cancelCsum) { t.Fatalf( "[handlePreimageRequest] cancel csum was changed, exp: %s, got: %s", @@ -4330,9 +4330,9 @@ func TestHandlePreimageRequest(t *testing.T) { t.Fatal("no order note from preimage request handling") } - tracker.mtx.RLock() + tracker.csumMtx.RLock() cancelCsum := tracker.cancelCsum - tracker.mtx.RUnlock() + tracker.csumMtx.RUnlock() if !bytes.Equal(csum, cancelCsum) { t.Fatalf( "[handlePreimageRequest] cancel csum was changed, exp: %s, got: %s",