diff --git a/config/defaults.go b/config/defaults.go index 05d958323..7df86dbac 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -189,7 +189,7 @@ func getCacheConfig() *CacheConfig { return &CacheConfig{ Engine: FreeCache, Freecache: &FreeCacheConfig{ - Size: 100 * 1024 * 1024, // Default size 100MB. + Size: 10 * 1024 * 1024, // Default size 10MB. }, Redis: &RedisConfig{ Addr: "localhost:6379", diff --git a/config/example_config.yaml b/config/example_config.yaml index fb6bdd771..2decb40f3 100644 --- a/config/example_config.yaml +++ b/config/example_config.yaml @@ -35,7 +35,7 @@ peers: # list of bitcoin node peers to connect to cache: engine: freecache # cache engine - freecache/redis freecache: # freecache configuration - size: 1000000 # size of cache + size: 10000000 # size of cache redis: addr: "localhost:6379" password: "" diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 736933c35..3474840c1 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -403,32 +403,33 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { ticker := time.NewTicker(p.processStatusUpdatesInterval) p.waitGroup.Add(1) + ctx := p.ctx + go func() { defer p.waitGroup.Done() + statusUpdatesMap := map[chainhash.Hash]store.UpdateStatus{} + for { select { case <-p.ctx.Done(): return case statusUpdate := <-p.storageStatusUpdateCh: // Ensure no duplicate statuses - actualUpdateStatusMap, err := p.updateStatusMap(statusUpdate) - if err != nil { - p.logger.Error("failed to update status", slog.String("err", err.Error())) - return - } + updateStatusMap(statusUpdatesMap, statusUpdate) - if len(actualUpdateStatusMap) >= p.processStatusUpdatesBatchSize { - p.checkAndUpdate(p.ctx, actualUpdateStatusMap) + if len(statusUpdatesMap) >= p.processStatusUpdatesBatchSize { + p.checkAndUpdate(ctx, statusUpdatesMap) + statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{} // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. ticker.Reset(p.processStatusUpdatesInterval) } case <-ticker.C: - statusUpdatesMap := p.getStatusUpdateMap() if len(statusUpdatesMap) > 0 { - p.checkAndUpdate(p.ctx, statusUpdatesMap) + p.checkAndUpdate(ctx, statusUpdatesMap) + statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{} // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. @@ -462,8 +463,6 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap map[cha if err != nil { p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error())) } - - _ = p.cacheStore.Del(CacheStatusUpdateKey) } func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, doubleSpendUpdates []store.UpdateStatus) error { diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 89c62eed8..d50f1ba80 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -3,10 +3,14 @@ package metamorph import ( "encoding/json" "errors" - "github.com/bitcoin-sv/arc/internal/metamorph/store" + "github.com/libsv/go-p2p/chaincfg/chainhash" + + "github.com/bitcoin-sv/arc/internal/metamorph/store" ) +//lint:file-ignore U1000 Ignore all unused code, functions are temporarily not used + const CacheStatusUpdateKey = "status-updates" var ( @@ -18,7 +22,19 @@ func (p *Processor) GetProcessorMapSize() int { return p.responseProcessor.getMapLen() } -func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainhash.Hash]store.UpdateStatus, error) { +func updateStatusMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus, statusUpdate store.UpdateStatus) { + foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash] + + if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) { + if len(statusUpdate.CompetingTxs) > 0 { + statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, foundStatusUpdate.CompetingTxs) + } + + statusUpdatesMap[statusUpdate.Hash] = statusUpdate + } +} + +func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainhash.Hash]store.UpdateStatus, error) { //nolint:unused statusUpdatesMap := p.getStatusUpdateMap() foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash] @@ -39,7 +55,7 @@ func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainh return statusUpdatesMap, nil } -func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus) error { +func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus) error { //nolint:unused bytes, err := serializeStatusMap(statusUpdatesMap) if err != nil { return err @@ -52,7 +68,7 @@ func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store return nil } -func (p *Processor) getStatusUpdateMap() map[chainhash.Hash]store.UpdateStatus { +func (p *Processor) getStatusUpdateMap() map[chainhash.Hash]store.UpdateStatus { //nolint:unused existingMap, err := p.cacheStore.Get(CacheStatusUpdateKey) if err == nil { @@ -120,7 +136,7 @@ func mergeUnique(arr1, arr2 []string) []string { return uniqueSlice } -func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) ([]byte, error) { +func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) ([]byte, error) { //nolint:unused serializeMap := make(map[string]store.UpdateStatus) for k, v := range updateStatusMap { serializeMap[k.String()] = v @@ -133,7 +149,7 @@ func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) ( return bytes, nil } -func deserializeStatusMap(data []byte) (map[chainhash.Hash]store.UpdateStatus, error) { +func deserializeStatusMap(data []byte) (map[chainhash.Hash]store.UpdateStatus, error) { //nolint:unused serializeMap := make(map[string]store.UpdateStatus) updateStatusMap := make(map[chainhash.Hash]store.UpdateStatus)