From e9428c36227f44178f8ad9328c586030b64d8579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Wed, 30 Oct 2024 16:54:02 +0100 Subject: [PATCH] Temporarily bypass cache store --- config/defaults.go | 2 +- internal/metamorph/processor.go | 21 ++++++++++----------- internal/metamorph/processor_helpers.go | 16 +++++++++++++++- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/config/defaults.go b/config/defaults.go index 05d958323..f97ba1e50 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -189,7 +189,7 @@ func getCacheConfig() *CacheConfig { return &CacheConfig{ Engine: FreeCache, Freecache: &FreeCacheConfig{ - Size: 100 * 1024 * 1024, // Default size 100MB. + Size: 1 * 1024 * 1024, // Default size 1MB. }, Redis: &RedisConfig{ Addr: "localhost:6379", diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 736933c35..3474840c1 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -403,32 +403,33 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { ticker := time.NewTicker(p.processStatusUpdatesInterval) p.waitGroup.Add(1) + ctx := p.ctx + go func() { defer p.waitGroup.Done() + statusUpdatesMap := map[chainhash.Hash]store.UpdateStatus{} + for { select { case <-p.ctx.Done(): return case statusUpdate := <-p.storageStatusUpdateCh: // Ensure no duplicate statuses - actualUpdateStatusMap, err := p.updateStatusMap(statusUpdate) - if err != nil { - p.logger.Error("failed to update status", slog.String("err", err.Error())) - return - } + updateStatusMap(statusUpdatesMap, statusUpdate) - if len(actualUpdateStatusMap) >= p.processStatusUpdatesBatchSize { - p.checkAndUpdate(p.ctx, actualUpdateStatusMap) + if len(statusUpdatesMap) >= p.processStatusUpdatesBatchSize { + p.checkAndUpdate(ctx, statusUpdatesMap) + statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{} // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. ticker.Reset(p.processStatusUpdatesInterval) } case <-ticker.C: - statusUpdatesMap := p.getStatusUpdateMap() if len(statusUpdatesMap) > 0 { - p.checkAndUpdate(p.ctx, statusUpdatesMap) + p.checkAndUpdate(ctx, statusUpdatesMap) + statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{} // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. // This prevents unnecessary immediate updates and maintains the intended time interval between batches. @@ -462,8 +463,6 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap map[cha if err != nil { p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error())) } - - _ = p.cacheStore.Del(CacheStatusUpdateKey) } func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, doubleSpendUpdates []store.UpdateStatus) error { diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 89c62eed8..e1c5bb1c4 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -3,8 +3,10 @@ package metamorph import ( "encoding/json" "errors" - "github.com/bitcoin-sv/arc/internal/metamorph/store" + "github.com/libsv/go-p2p/chaincfg/chainhash" + + "github.com/bitcoin-sv/arc/internal/metamorph/store" ) const CacheStatusUpdateKey = "status-updates" @@ -18,6 +20,18 @@ func (p *Processor) GetProcessorMapSize() int { return p.responseProcessor.getMapLen() } +func updateStatusMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus, statusUpdate store.UpdateStatus) { + foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash] + + if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) { + if len(statusUpdate.CompetingTxs) > 0 { + statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, foundStatusUpdate.CompetingTxs) + } + + statusUpdatesMap[statusUpdate.Hash] = statusUpdate + } +} + func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainhash.Hash]store.UpdateStatus, error) { statusUpdatesMap := p.getStatusUpdateMap()