Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temporarily bypass cache store #627

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func getCacheConfig() *CacheConfig {
return &CacheConfig{
Engine: FreeCache,
Freecache: &FreeCacheConfig{
Size: 100 * 1024 * 1024, // Default size 100MB.
Size: 10 * 1024 * 1024, // Default size 10MB.
},
Redis: &RedisConfig{
Addr: "localhost:6379",
Expand Down
2 changes: 1 addition & 1 deletion config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ peers: # list of bitcoin node peers to connect to
cache:
engine: freecache # cache engine - freecache/redis
freecache: # freecache configuration
size: 1000000 # size of cache
size: 10000000 # size of cache
redis:
addr: "localhost:6379"
password: ""
Expand Down
21 changes: 10 additions & 11 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,32 +403,33 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() {
ticker := time.NewTicker(p.processStatusUpdatesInterval)
p.waitGroup.Add(1)

ctx := p.ctx

go func() {
defer p.waitGroup.Done()

statusUpdatesMap := map[chainhash.Hash]store.UpdateStatus{}

for {
select {
case <-p.ctx.Done():
return
case statusUpdate := <-p.storageStatusUpdateCh:
// Ensure no duplicate statuses
actualUpdateStatusMap, err := p.updateStatusMap(statusUpdate)
if err != nil {
p.logger.Error("failed to update status", slog.String("err", err.Error()))
return
}
updateStatusMap(statusUpdatesMap, statusUpdate)

if len(actualUpdateStatusMap) >= p.processStatusUpdatesBatchSize {
p.checkAndUpdate(p.ctx, actualUpdateStatusMap)
if len(statusUpdatesMap) >= p.processStatusUpdatesBatchSize {
p.checkAndUpdate(ctx, statusUpdatesMap)
statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{}

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
ticker.Reset(p.processStatusUpdatesInterval)
}
case <-ticker.C:
statusUpdatesMap := p.getStatusUpdateMap()
if len(statusUpdatesMap) > 0 {
p.checkAndUpdate(p.ctx, statusUpdatesMap)
p.checkAndUpdate(ctx, statusUpdatesMap)
statusUpdatesMap = map[chainhash.Hash]store.UpdateStatus{}

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
Expand Down Expand Up @@ -462,8 +463,6 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap map[cha
if err != nil {
p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error()))
}

_ = p.cacheStore.Del(CacheStatusUpdateKey)
}

func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, doubleSpendUpdates []store.UpdateStatus) error {
Expand Down
28 changes: 22 additions & 6 deletions internal/metamorph/processor_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package metamorph
import (
"encoding/json"
"errors"
"github.com/bitcoin-sv/arc/internal/metamorph/store"

"github.com/libsv/go-p2p/chaincfg/chainhash"

"github.com/bitcoin-sv/arc/internal/metamorph/store"
)

//lint:file-ignore U1000 Ignore all unused code, functions are temporarily not used

const CacheStatusUpdateKey = "status-updates"

var (
Expand All @@ -18,7 +22,19 @@ func (p *Processor) GetProcessorMapSize() int {
return p.responseProcessor.getMapLen()
}

func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainhash.Hash]store.UpdateStatus, error) {
func updateStatusMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus, statusUpdate store.UpdateStatus) {
foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash]

if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) {
if len(statusUpdate.CompetingTxs) > 0 {
statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, foundStatusUpdate.CompetingTxs)
}

statusUpdatesMap[statusUpdate.Hash] = statusUpdate
}
}

func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainhash.Hash]store.UpdateStatus, error) { //nolint:unused
statusUpdatesMap := p.getStatusUpdateMap()

foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash]
Expand All @@ -39,7 +55,7 @@ func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (map[chainh
return statusUpdatesMap, nil
}

func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus) error {
func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store.UpdateStatus) error { //nolint:unused
bytes, err := serializeStatusMap(statusUpdatesMap)
if err != nil {
return err
Expand All @@ -52,7 +68,7 @@ func (p *Processor) setStatusUpdateMap(statusUpdatesMap map[chainhash.Hash]store
return nil
}

func (p *Processor) getStatusUpdateMap() map[chainhash.Hash]store.UpdateStatus {
func (p *Processor) getStatusUpdateMap() map[chainhash.Hash]store.UpdateStatus { //nolint:unused
existingMap, err := p.cacheStore.Get(CacheStatusUpdateKey)

if err == nil {
Expand Down Expand Up @@ -120,7 +136,7 @@ func mergeUnique(arr1, arr2 []string) []string {
return uniqueSlice
}

func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) ([]byte, error) {
func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) ([]byte, error) { //nolint:unused
serializeMap := make(map[string]store.UpdateStatus)
for k, v := range updateStatusMap {
serializeMap[k.String()] = v
Expand All @@ -133,7 +149,7 @@ func serializeStatusMap(updateStatusMap map[chainhash.Hash]store.UpdateStatus) (
return bytes, nil
}

func deserializeStatusMap(data []byte) (map[chainhash.Hash]store.UpdateStatus, error) {
func deserializeStatusMap(data []byte) (map[chainhash.Hash]store.UpdateStatus, error) { //nolint:unused
serializeMap := make(map[string]store.UpdateStatus)
updateStatusMap := make(map[chainhash.Hash]store.UpdateStatus)

Expand Down
Loading