Skip to content

Commit

Permalink
Temporarily bypass cache store
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Oct 31, 2024
1 parent 0a54194 commit da42f1d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 19 deletions.
2 changes: 1 addition & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func getCacheConfig() *CacheConfig {
return &CacheConfig{
Engine: FreeCache,
Freecache: &FreeCacheConfig{
Size: 100 * 1024 * 1024, // Default size 100MB.
Size: 10 * 1024 * 1024, // Default size 10MB.
},
Redis: &RedisConfig{
Addr: "localhost:6379",
Expand Down
2 changes: 1 addition & 1 deletion config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ peers: # list of bitcoin node peers to connect to
cache:
engine: freecache # cache engine - freecache/redis
freecache: # freecache configuration
size: 1000000 # size of cache
size: 10000000 # size of cache
redis:
addr: "localhost:6379"
password: ""
Expand Down
21 changes: 10 additions & 11 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,32 +403,33 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() {
ticker := time.NewTicker(p.processStatusUpdatesInterval)
p.waitGroup.Add(1)

ctx := p.ctx

go func() {
defer p.waitGroup.Done()

statusUpdatesMap := map[chainhash.Hash]store.UpdateStatus{}

for {
select {
case <-p.ctx.Done():
return
case statusUpdate := <-p.storageStatusUpdateCh:
// Ensure no duplicate statuses
actualUpdateStatusMap, err := p.updateStatusMap(statusUpdate)
if err != nil {
p.logger.Error("failed to update status", slog.String("err", err.Error()))
return
}
updateStatusMap(statusUpdatesMap, statusUpdate)

if len(actualUpdateStatusMap) >= p.processStatusUpdatesBatchSize {
p.checkAndUpdate(p.ctx, actualUpdateStatusMap)
if len(statusUpdatesMap) >= p.processStatusUpdatesBatchSize {
p.checkAndUpdate(ctx, statusUpdatesMap)
statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{}

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
ticker.Reset(p.processStatusUpdatesInterval)
}
case <-ticker.C:
statusUpdatesMap := p.getStatusUpdateMap()
if len(statusUpdatesMap) > 0 {
p.checkAndUpdate(p.ctx, statusUpdatesMap)
p.checkAndUpdate(ctx, statusUpdatesMap)
statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{}

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
Expand Down Expand Up @@ -462,8 +463,6 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap map[cha
if err != nil {
p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error()))
}

_ = p.cacheStore.Del(CacheStatusUpdateKey)
}

func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, doubleSpendUpdates []store.UpdateStatus) error {
Expand Down
28 changes: 22 additions & 6 deletions internal/metamorph/processor_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package metamorph
import (
"encoding/json"
"errors"
"github.com/bitcoin-sv/arc/internal/metamorph/store"

"github.com/libsv/go-p2p/chaincfg/chainhash"

"github.com/bitcoin-sv/arc/internal/metamorph/store"
)

//lint:file-ignore U1000 Ignore all unused code, functions are temporarily not used

const CacheStatusUpdateKey = "status-updates"

var (
Expand All @@ -18,7 +22,19 @@ func (p *Processor) GetProcessorMapSize() int {
return p.responseProcessor.getMapLen()
}

func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainhash.Hash]store.UpdateStatus, error) {
func updateStatusMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus, statusUpdate store.UpdateStatus) {
foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash]

if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) {
if len(statusUpdate.CompetingTxs) > 0 {
statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, foundStatusUpdate.CompetingTxs)
}

statusUpdatesMap[statusUpdate.Hash] = statusUpdate
}
}

func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainhash.Hash]store.UpdateStatus, error) { //nolint:unused
statusUpdatesMap := p.getStatusUpdateMap()

foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash]
Expand All @@ -39,7 +55,7 @@ func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainh
return statusUpdatesMap, nil
}

func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus) error {
func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus) error { //nolint:unused
bytes, err := serializeStatusMap(statusUpdatesMap)
if err != nil {
return err
Expand All @@ -52,7 +68,7 @@ func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store
return nil
}

func (p *Processor) getStatusUpdateMap() map[chainhash.Hash]store.UpdateStatus {
func (p *Processor) getStatusUpdateMap() map[chainhash.Hash]store.UpdateStatus { //nolint:unused
existingMap, err := p.cacheStore.Get(CacheStatusUpdateKey)

if err == nil {
Expand Down Expand Up @@ -120,7 +136,7 @@ func mergeUnique(arr1, arr2 []string) []string {
return uniqueSlice
}

func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) ([]byte, error) {
func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) ([]byte, error) { //nolint:unused
serializeMap := make(map[string]store.UpdateStatus)
for k, v := range updateStatusMap {
serializeMap[k.String()] = v
Expand All @@ -133,7 +149,7 @@ func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) (
return bytes, nil
}

func deserializeStatusMap(data []byte) (map[chainhash.Hash]store.UpdateStatus, error) {
func deserializeStatusMap(data []byte) (map[chainhash.Hash]store.UpdateStatus, error) { //nolint:unused
serializeMap := make(map[string]store.UpdateStatus)
updateStatusMap := make(map[chainhash.Hash]store.UpdateStatus)

Expand Down

0 comments on commit da42f1d

Please sign in to comment.