Skip to content

Commit

Permalink
Temporarily bypass cache store
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Oct 30, 2024
1 parent da0ed73 commit e9428c3
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
2 changes: 1 addition & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func getCacheConfig() *CacheConfig {
return &CacheConfig{
Engine: FreeCache,
Freecache: &FreeCacheConfig{
Size: 100 * 1024 * 1024, // Default size 100MB.
Size: 1 * 1024 * 1024, // Default size 1MB.
},
Redis: &RedisConfig{
Addr: "localhost:6379",
Expand Down
21 changes: 10 additions & 11 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,32 +403,33 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() {
ticker := time.NewTicker(p.processStatusUpdatesInterval)
p.waitGroup.Add(1)

ctx := p.ctx

go func() {
defer p.waitGroup.Done()

statusUpdatesMap := map[chainhash.Hash]store.UpdateStatus{}

for {
select {
case <-p.ctx.Done():
return
case statusUpdate := <-p.storageStatusUpdateCh:
// Ensure no duplicate statuses
actualUpdateStatusMap, err := p.updateStatusMap(statusUpdate)
if err != nil {
p.logger.Error("failed to update status", slog.String("err", err.Error()))
return
}
updateStatusMap(statusUpdatesMap, statusUpdate)

if len(actualUpdateStatusMap) >= p.processStatusUpdatesBatchSize {
p.checkAndUpdate(p.ctx, actualUpdateStatusMap)
if len(statusUpdatesMap) >= p.processStatusUpdatesBatchSize {
p.checkAndUpdate(ctx, statusUpdatesMap)
statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{}

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
ticker.Reset(p.processStatusUpdatesInterval)
}
case <-ticker.C:
statusUpdatesMap := p.getStatusUpdateMap()
if len(statusUpdatesMap) > 0 {
p.checkAndUpdate(p.ctx, statusUpdatesMap)
p.checkAndUpdate(ctx, statusUpdatesMap)
statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{}

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
Expand Down Expand Up @@ -462,8 +463,6 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap map[cha
if err != nil {
p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error()))
}

_ = p.cacheStore.Del(CacheStatusUpdateKey)
}

func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, doubleSpendUpdates []store.UpdateStatus) error {
Expand Down
16 changes: 15 additions & 1 deletion internal/metamorph/processor_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package metamorph
import (
"encoding/json"
"errors"
"github.com/bitcoin-sv/arc/internal/metamorph/store"

"github.com/libsv/go-p2p/chaincfg/chainhash"

"github.com/bitcoin-sv/arc/internal/metamorph/store"
)

const CacheStatusUpdateKey = "status-updates"
Expand All @@ -18,6 +20,18 @@ func (p *Processor) GetProcessorMapSize() int {
return p.responseProcessor.getMapLen()
}

func updateStatusMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus, statusUpdate store.UpdateStatus) {
foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash]

if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) {
if len(statusUpdate.CompetingTxs) > 0 {
statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, foundStatusUpdate.CompetingTxs)
}

statusUpdatesMap[statusUpdate.Hash] = statusUpdate
}
}

func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainhash.Hash]store.UpdateStatus, error) {

Check failure on line 35 in internal/metamorph/processor_helpers.go

View workflow job for this annotation

GitHub Actions / Static check

func (*Processor).updateStatusMap is unused (U1000)

Check failure on line 35 in internal/metamorph/processor_helpers.go

View workflow job for this annotation

GitHub Actions / Golangci-lint

func `(*Processor).updateStatusMap` is unused (unused)
statusUpdatesMap := p.getStatusUpdateMap()

Expand Down

0 comments on commit e9428c3

Please sign in to comment.