Skip to content

Commit

Permalink
ledger: introduce expired stake cache (#6014)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Jun 7, 2024
1 parent 0c02f9b commit f851f50
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 33 deletions.
33 changes: 25 additions & 8 deletions ledger/acctonline.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,19 @@ type onlineAccounts struct {

// disableCache (de)activates the LRU cache use in onlineAccounts
disableCache bool

// cache for expired online circulation stake since the underlying query is quite heavy
expiredCirculationCache *expiredCirculationCache
}

// initialize initializes the accountUpdates structure
func (ao *onlineAccounts) initialize(cfg config.Local) {
ao.accountsReadCond = sync.NewCond(ao.accountsMu.RLocker())
ao.acctLookback = cfg.MaxAcctLookback
ao.disableCache = cfg.DisableLedgerLRUCache
// 2 pages * 256 entries look large enough to handle
// both early and late votes, and well as a current and previous stateproof periods
ao.expiredCirculationCache = makeExpiredCirculationCache(256)
}

// loadFromDisk is the 2nd level initialization, and is required before the onlineAccounts becomes functional
Expand Down Expand Up @@ -549,7 +555,7 @@ func (ao *onlineAccounts) onlineCirculation(rnd basics.Round, voteRnd basics.Rou
if rnd == 0 {
return totalStake, nil
}
expiredStake, err := ao.ExpiredOnlineCirculation(rnd, voteRnd)
expiredStake, err := ao.expiredOnlineCirculation(rnd, voteRnd)
if err != nil {
return basics.MicroAlgos{}, err
}
Expand Down Expand Up @@ -874,7 +880,7 @@ func (ao *onlineAccounts) TopOnlineAccounts(rnd basics.Round, voteRnd basics.Rou
for uint64(len(candidates)) < n+uint64(len(modifiedAccounts)) {
var accts map[basics.Address]*ledgercore.OnlineAccount
start := time.Now()
ledgerAccountsonlinetopCount.Inc(nil)
ledgerAccountsOnlineTopCount.Inc(nil)
err = ao.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) {
ar, err := tx.MakeAccountsReader()
if err != nil {
Expand All @@ -888,7 +894,7 @@ func (ao *onlineAccounts) TopOnlineAccounts(rnd basics.Round, voteRnd basics.Rou
dbRound, err = ar.AccountsRound()
return
})
ledgerAccountsonlinetopMicros.AddMicrosecondsSince(start, nil)
ledgerAccountsOnlineTopMicros.AddMicrosecondsSince(start, nil)
if err != nil {
return nil, basics.MicroAlgos{}, err
}
Expand Down Expand Up @@ -965,7 +971,7 @@ func (ao *onlineAccounts) TopOnlineAccounts(rnd basics.Round, voteRnd basics.Rou

// If set, return total online stake minus all future expired stake by voteRnd
if params.ExcludeExpiredCirculation {
expiredStake, err := ao.ExpiredOnlineCirculation(rnd, voteRnd)
expiredStake, err := ao.expiredOnlineCirculation(rnd, voteRnd)
if err != nil {
return nil, basics.MicroAlgos{}, err
}
Expand Down Expand Up @@ -1027,6 +1033,9 @@ func (ao *onlineAccounts) onlineAcctsExpiredByRound(rnd, voteRnd basics.Round) (
rewardsParams := config.Consensus[roundParams.CurrentProtocol]
rewardsLevel := roundParams.RewardsLevel

start := time.Now()
ledgerAccountExpiredByRoundCount.Inc(nil)

// Step 1: get all online accounts from DB for rnd
// Not unlocking ao.accountsMu yet, to stay consistent with Step 2
var dbRound basics.Round
Expand All @@ -1042,6 +1051,7 @@ func (ao *onlineAccounts) onlineAcctsExpiredByRound(rnd, voteRnd basics.Round) (
dbRound, err = ar.AccountsRound()
return err
})
ledgerAccountsExpiredByRoundMicros.AddMicrosecondsSince(start, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1086,9 +1096,13 @@ func (ao *onlineAccounts) onlineAcctsExpiredByRound(rnd, voteRnd basics.Round) (
return expiredAccounts, nil
}

// ExpiredOnlineCirculation returns the total online stake for accounts with participation keys registered
// expiredOnlineCirculation returns the total online stake for accounts with participation keys registered
// at round `rnd` that are expired by round `voteRnd`.
func (ao *onlineAccounts) ExpiredOnlineCirculation(rnd, voteRnd basics.Round) (basics.MicroAlgos, error) {
func (ao *onlineAccounts) expiredOnlineCirculation(rnd, voteRnd basics.Round) (basics.MicroAlgos, error) {
if expiredStake, ok := ao.expiredCirculationCache.get(rnd, voteRnd); ok {
return expiredStake, nil
}

expiredAccounts, err := ao.onlineAcctsExpiredByRound(rnd, voteRnd)
if err != nil {
return basics.MicroAlgos{}, err
Expand All @@ -1101,8 +1115,11 @@ func (ao *onlineAccounts) ExpiredOnlineCirculation(rnd, voteRnd basics.Round) (b
return basics.MicroAlgos{}, fmt.Errorf("ExpiredOnlineCirculation: overflow totaling expired stake")
}
}
ao.expiredCirculationCache.put(rnd, voteRnd, expiredStake)
return expiredStake, nil
}

var ledgerAccountsonlinetopCount = metrics.NewCounter("ledger_accountsonlinetop_count", "calls")
var ledgerAccountsonlinetopMicros = metrics.NewCounter("ledger_accountsonlinetop_micros", "µs spent")
var ledgerAccountsOnlineTopCount = metrics.NewCounter("ledger_accountsonlinetop_count", "calls")
var ledgerAccountsOnlineTopMicros = metrics.NewCounter("ledger_accountsonlinetop_micros", "µs spent")
var ledgerAccountExpiredByRoundCount = metrics.NewCounter("ledger_accountsexpired_count", "calls")
var ledgerAccountsExpiredByRoundMicros = metrics.NewCounter("ledger_accountsexpired_micros", "µs spent")
41 changes: 19 additions & 22 deletions ledger/acctonline_expired_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type onlineAcctModel interface {

LookupAgreement(rnd basics.Round, addr basics.Address) onlineAcctModelAcct
OnlineCirculation(rnd basics.Round, voteRnd basics.Round) basics.MicroAlgos
ExpiredOnlineCirculation(rnd, voteRnd basics.Round) basics.MicroAlgos
expiredOnlineCirculation(rnd, voteRnd basics.Round) basics.MicroAlgos
}

// mapOnlineAcctModel provides a reference implementation for tracking online accounts used
Expand Down Expand Up @@ -133,7 +133,7 @@ func (m *mapOnlineAcctModel) OnlineCirculation(rnd basics.Round, voteRnd basics.
return m.sumAcctStake(accts)
}

func (m *mapOnlineAcctModel) ExpiredOnlineCirculation(rnd, voteRnd basics.Round) basics.MicroAlgos {
func (m *mapOnlineAcctModel) expiredOnlineCirculation(rnd, voteRnd basics.Round) basics.MicroAlgos {
accts := m.onlineAcctsExpiredByRound(rnd, voteRnd)
return m.sumAcctStake(accts)
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func (m *doubleLedgerAcctModel) OnlineCirculation(rnd basics.Round, voteRnd basi
// has already subtracted the expired stake. So to get the total, add
// it back in by querying ExpiredOnlineCirculation.
if m.params.ExcludeExpiredCirculation {
expiredStake := m.ExpiredOnlineCirculation(rnd, rnd+320)
expiredStake := m.expiredOnlineCirculation(rnd, rnd+320)
valStake = m.ops.Add(valStake, expiredStake)
}

Expand All @@ -404,20 +404,26 @@ func (l *Ledger) OnlineTotalStake(rnd basics.Round) (basics.MicroAlgos, error) {
return totalStake, err
}

// ExpiredOnlineCirculation is a wrapper to call onlineAccounts.ExpiredOnlineCirculation safely.
func (l *Ledger) ExpiredOnlineCirculation(rnd, voteRnd basics.Round) (basics.MicroAlgos, error) {
// expiredOnlineCirculation is a wrapper to call onlineAccounts.expiredOnlineCirculation safely.
func (l *Ledger) expiredOnlineCirculation(rnd, voteRnd basics.Round) (basics.MicroAlgos, error) {
l.trackerMu.RLock()
defer l.trackerMu.RUnlock()
return l.acctsOnline.ExpiredOnlineCirculation(rnd, voteRnd)
return l.acctsOnline.expiredOnlineCirculation(rnd, voteRnd)
}

// ExpiredOnlineCirculation returns the total expired stake at rnd this model produced, while
// expiredOnlineCirculation returns the total expired stake at rnd this model produced, while
// also asserting that the validator and generator Ledgers both agree.
func (m *doubleLedgerAcctModel) ExpiredOnlineCirculation(rnd, voteRnd basics.Round) basics.MicroAlgos {
valStake, err := m.dl.validator.ExpiredOnlineCirculation(rnd, voteRnd)
func (m *doubleLedgerAcctModel) expiredOnlineCirculation(rnd, voteRnd basics.Round) basics.MicroAlgos {
valStake, err := m.dl.validator.expiredOnlineCirculation(rnd, voteRnd)
require.NoError(m.t, err)
genStake, err := m.dl.generator.ExpiredOnlineCirculation(rnd, voteRnd)
valCachedStake, has := m.dl.validator.acctsOnline.expiredCirculationCache.get(rnd, voteRnd)
require.True(m.t, has)
require.Equal(m.t, valStake, valCachedStake)
genStake, err := m.dl.generator.expiredOnlineCirculation(rnd, voteRnd)
require.NoError(m.t, err)
genCachedStake, has := m.dl.generator.acctsOnline.expiredCirculationCache.get(rnd, voteRnd)
require.True(m.t, has)
require.Equal(m.t, genStake, genCachedStake)
require.Equal(m.t, valStake, genStake)
return valStake
}
Expand Down Expand Up @@ -483,7 +489,7 @@ func testOnlineAcctModelSimple(t *testing.T, m onlineAcctModel) {
a.Equal(basics.MicroAlgos{Raw: 43_210_000}, onlineStake)

// expired stake is acct 2 + acct 4
expiredStake := m.ExpiredOnlineCirculation(680, 1000)
expiredStake := m.expiredOnlineCirculation(680, 1000)
a.Equal(basics.MicroAlgos{Raw: 22_110_000}, expiredStake)
}

Expand Down Expand Up @@ -519,23 +525,14 @@ type goOfflineAction struct{ addr basics.Address }

func (a goOfflineAction) apply(t *testing.T, m onlineAcctModel) { m.goOffline(a.addr) }

type updateStakeAction struct {
addr basics.Address
stake uint64
}

func (a updateStakeAction) apply(t *testing.T, m onlineAcctModel) {
m.updateStake(a.addr, basics.MicroAlgos{Raw: a.stake})
}

type checkOnlineStakeAction struct {
rnd, voteRnd basics.Round
online, expired uint64
}

func (a checkOnlineStakeAction) apply(t *testing.T, m onlineAcctModel) {
onlineStake := m.OnlineCirculation(a.rnd, a.voteRnd)
expiredStake := m.ExpiredOnlineCirculation(a.rnd, a.voteRnd)
expiredStake := m.expiredOnlineCirculation(a.rnd, a.voteRnd)
require.Equal(t, basics.MicroAlgos{Raw: a.online}, onlineStake, "round %d, cur %d", a.rnd, m.currentRound())
require.Equal(t, basics.MicroAlgos{Raw: a.expired}, expiredStake, "rnd %d voteRnd %d, cur %d", a.rnd, a.voteRnd, m.currentRound())
}
Expand Down Expand Up @@ -681,7 +678,7 @@ func BenchmarkExpiredOnlineCirculation(b *testing.B) {
// query expired circulation across the available range (last 320 rounds, from ~680 to ~1000)
startRnd := m.currentRound() - 320
offset := basics.Round(i % 320)
_, err := m.dl.validator.ExpiredOnlineCirculation(startRnd+offset, startRnd+offset+320)
_, err := m.dl.validator.expiredOnlineCirculation(startRnd+offset, startRnd+offset+320)
require.NoError(b, err)
//total, err := m.dl.validator.OnlineTotalStake(startRnd + offset)
//b.Log("expired circulation", startRnd+offset, startRnd+offset+320, "returned", expiredStake, "total", total)
Expand Down
6 changes: 3 additions & 3 deletions ledger/acctonline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1939,7 +1939,7 @@ func TestAcctOnline_ExpiredOnlineCirculation(t *testing.T) {
initialOnlineStake, err := oa.onlineCirculation(0, basics.Round(oa.maxBalLookback()))
a.NoError(err)
a.Equal(totalStake, initialOnlineStake)
initialExpired, err := oa.ExpiredOnlineCirculation(0, 1000)
initialExpired, err := oa.expiredOnlineCirculation(0, 1000)
a.NoError(err)
a.Equal(basics.MicroAlgos{Raw: 0}, initialExpired)

Expand Down Expand Up @@ -2146,10 +2146,10 @@ func TestAcctOnline_ExpiredOnlineCirculation(t *testing.T) {
a.Fail("unknown db seed")
}
a.Equal(targetVoteRnd, rnd+basics.Round(params.MaxBalLookback))
_, err := oa.ExpiredOnlineCirculation(rnd, targetVoteRnd)
_, err := oa.expiredOnlineCirculation(rnd, targetVoteRnd)
a.Error(err)
a.Contains(err.Error(), fmt.Sprintf("round %d too high", rnd))
expiredStake, err := oa.ExpiredOnlineCirculation(rnd-1, targetVoteRnd)
expiredStake, err := oa.expiredOnlineCirculation(rnd-1, targetVoteRnd)
a.NoError(err)
a.Equal(expectedExpiredStake, expiredStake)

Expand Down
66 changes: 66 additions & 0 deletions ledger/acctonlineexp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package ledger

import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-deadlock"
)

type expiredCirculationCache struct {
cur map[expiredCirculationKey]basics.MicroAlgos
prev map[expiredCirculationKey]basics.MicroAlgos

maxSize int
mu deadlock.RWMutex
}

type expiredCirculationKey struct {
rnd basics.Round
voteRnd basics.Round
}

func makeExpiredCirculationCache(maxSize int) *expiredCirculationCache {
return &expiredCirculationCache{
cur: make(map[expiredCirculationKey]basics.MicroAlgos),
maxSize: maxSize,
}
}

func (c *expiredCirculationCache) get(rnd basics.Round, voteRnd basics.Round) (basics.MicroAlgos, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
if stake, ok := c.cur[expiredCirculationKey{rnd: rnd, voteRnd: voteRnd}]; ok {
return stake, true
}
if stake, ok := c.prev[expiredCirculationKey{rnd: rnd, voteRnd: voteRnd}]; ok {
return stake, true
}

return basics.MicroAlgos{}, false
}

func (c *expiredCirculationCache) put(rnd basics.Round, voteRnd basics.Round, expiredStake basics.MicroAlgos) {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.cur) >= c.maxSize {
c.prev = c.cur
c.cur = make(map[expiredCirculationKey]basics.MicroAlgos)

}
c.cur[expiredCirculationKey{rnd: rnd, voteRnd: voteRnd}] = expiredStake
}
69 changes: 69 additions & 0 deletions ledger/acctonlineexp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package ledger

import (
"testing"

"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/stretchr/testify/require"
)

func TestAcctOnline_ExpiredCirculationCacheBasic(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

cache := makeExpiredCirculationCache(1)

expStake1 := basics.MicroAlgos{Raw: 123}
cache.put(1, 2, expStake1)
stake, ok := cache.get(1, 2)
require.True(t, ok)
require.Equal(t, expStake1, stake)

stake, ok = cache.get(3, 4)
require.False(t, ok)
require.Equal(t, basics.MicroAlgos{}, stake)

expStake2 := basics.MicroAlgos{Raw: 345}
cache.put(3, 4, expStake2)

stake, ok = cache.get(3, 4)
require.True(t, ok)
require.Equal(t, expStake2, stake)

// ensure the old entry is still there
stake, ok = cache.get(1, 2)
require.True(t, ok)
require.Equal(t, expStake1, stake)

// add one more, should evict the first and keep the second
expStake3 := basics.MicroAlgos{Raw: 567}
cache.put(5, 6, expStake3)
stake, ok = cache.get(5, 6)
require.True(t, ok)
require.Equal(t, expStake3, stake)

stake, ok = cache.get(3, 4)
require.True(t, ok)
require.Equal(t, expStake2, stake)

stake, ok = cache.get(1, 2)
require.False(t, ok)
require.Equal(t, basics.MicroAlgos{}, stake)
}

0 comments on commit f851f50

Please sign in to comment.