Add support for block pruning #116

Merged · 4 commits · Dec 7, 2024 · Changes from 3 commits
17 changes: 17 additions & 0 deletions .changeset/add_support_for_block_pruning.md
@@ -0,0 +1,17 @@
---
default: minor
---

# Add support for block pruning

#116 by @lukechampine

Been a long time coming. 😅

The strategy here is quite naive, but I think it will be serviceable. Basically, when we apply a block `N`, we delete block `N-P`. `P` is therefore the "prune target," i.e. the maximum number of blocks you want to store.
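
To see the new knob in context, here is a minimal sketch of enabling pruning from an embedding node. It assumes the existing `coreutils.OpenBoltChainDB`, `chain.Mainnet`, `chain.NewDBStore`, and `chain.NewManager` helpers; adjust to however your node actually wires these up:

```go
package main

import (
	"log"

	"go.sia.tech/coreutils"
	"go.sia.tech/coreutils/chain"
)

func main() {
	// Open the bolt-backed consensus database used by coreutils nodes.
	bdb, err := coreutils.OpenBoltChainDB("consensus.db")
	if err != nil {
		log.Fatal(err)
	}
	defer bdb.Close()

	network, genesisBlock := chain.Mainnet()
	store, tipState, err := chain.NewDBStore(bdb, network, genesisBlock)
	if err != nil {
		log.Fatal(err)
	}

	// P = 1000: whenever block N is applied to the best chain, block N-1000
	// is pruned from the store.
	cm := chain.NewManager(store, tipState, chain.WithPruneTarget(1000))
	_ = cm
}
```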

In practice, this isn't exhaustive: it only deletes blocks from the best chain. It also won't dramatically shrink the size of an existing database. I think this is acceptable, because pruning is most important during the initial sync, and during the initial sync, you'll only be receiving blocks from one chain at a time. Also, we don't want to make pruning *too* easy; after all, we need a good percentage of nodes to be storing the full chain, so that others can sync to them.

I tested this out locally with a prune target of 1000, and after syncing 400,000 blocks, my `consensus.db` was around 18 GB. This is disappointing; it should be much smaller. With some investigation, I found that the Bolt database was only storing ~5 GB of data (most of which was the accumulator tree, which we can't prune until after v2). I think this is a combination of (a) Bolt growing the DB capacity aggressively in response to writes, and (b) Bolt never shrinking it. So it's possible that we could reduce this number by tweaking our DB batching parameters. Alternatively, we could provide a tool that copies the DB to a new file. Not the most user-friendly, but again, I think I'm okay with that for now.
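
For the "copy the DB to a new file" idea, a rough sketch of such a tool (not part of this PR) could lean on bbolt's built-in `Compact` helper; the file names here are placeholders:

```go
package main

import (
	"log"

	bolt "go.etcd.io/bbolt"
)

func main() {
	// Open the bloated database read-only so the node's data can't be touched.
	src, err := bolt.Open("consensus.db", 0o600, &bolt.Options{ReadOnly: true})
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	// The fresh file only grows to the size of the live data, dropping the
	// unused capacity that Bolt never returns to the filesystem.
	dst, err := bolt.Open("consensus-compacted.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer dst.Close()

	// Copy every bucket and key; a txMaxSize of 0 does it in one transaction.
	if err := bolt.Compact(dst, src, 0); err != nil {
		log.Fatal(err)
	}
}
```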

Depends on https://github.com/SiaFoundation/core/pull/228
125 changes: 83 additions & 42 deletions chain/db.go
@@ -13,43 +13,32 @@ import (
)

type supplementedBlock struct {
Block types.Block
Header *types.BlockHeader
Block *types.Block
Supplement *consensus.V1BlockSupplement
}

func (sb supplementedBlock) EncodeTo(e *types.Encoder) {
e.WriteUint8(2)
(types.V2Block)(sb.Block).EncodeTo(e)
e.WriteBool(sb.Supplement != nil)
if sb.Supplement != nil {
sb.Supplement.EncodeTo(e)
}
e.WriteUint8(3)
types.EncodePtr(e, sb.Header)
types.EncodePtr(e, (*types.V2Block)(sb.Block))
types.EncodePtr(e, sb.Supplement)
}

func (sb *supplementedBlock) DecodeFrom(d *types.Decoder) {
if v := d.ReadUint8(); v != 2 {
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
}
(*types.V2Block)(&sb.Block).DecodeFrom(d)
if d.ReadBool() {
sb.Supplement = new(consensus.V1BlockSupplement)
sb.Supplement.DecodeFrom(d)
}
}

// helper type for decoding just the header information from a block
type supplementedHeader struct {
ParentID types.BlockID
Timestamp time.Time
}

func (sh *supplementedHeader) DecodeFrom(d *types.Decoder) {
if v := d.ReadUint8(); v != 2 {
switch v := d.ReadUint8(); v {
case 2:
sb.Header = nil
sb.Block = new(types.Block)
(*types.V2Block)(sb.Block).DecodeFrom(d)
types.DecodePtr(d, &sb.Supplement)
case 3:
types.DecodePtr(d, &sb.Header)
types.DecodePtrCast[types.V2Block](d, &sb.Block)
types.DecodePtr(d, &sb.Supplement)
default:
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
}
sh.ParentID.DecodeFrom(d)
_ = d.ReadUint64() // nonce
sh.Timestamp = d.ReadTime()
}

type versionedState struct {
@@ -304,21 +293,62 @@ func (db *DBStore) putState(cs consensus.State) {
db.bucket(bStates).put(cs.Index.ID[:], versionedState{cs})
}

func (db *DBStore) getBlock(id types.BlockID) (b types.Block, bs *consensus.V1BlockSupplement, _ bool) {
func (db *DBStore) getBlock(id types.BlockID) (bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement, _ bool) {
var sb supplementedBlock
ok := db.bucket(bBlocks).get(id[:], &sb)
return sb.Block, sb.Supplement, ok
if sb.Header == nil {
sb.Header = new(types.BlockHeader)
*sb.Header = sb.Block.Header()
}
return *sb.Header, sb.Block, sb.Supplement, ok
}

func (db *DBStore) putBlock(b types.Block, bs *consensus.V1BlockSupplement) {
id := b.ID()
db.bucket(bBlocks).put(id[:], supplementedBlock{b, bs})
func (db *DBStore) putBlock(bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement) {
id := bh.ID()
db.bucket(bBlocks).put(id[:], supplementedBlock{&bh, b, bs})
}

func (db *DBStore) getBlockHeader(id types.BlockID) (parentID types.BlockID, timestamp time.Time, _ bool) {
var sh supplementedHeader
ok := db.bucket(bBlocks).get(id[:], &sh)
return sh.ParentID, sh.Timestamp, ok
func (db *DBStore) getAncestorInfo(id types.BlockID) (parentID types.BlockID, timestamp time.Time, ok bool) {
ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) {
v := d.ReadUint8()
if v != 2 && v != 3 {
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
}
// kinda cursed; don't worry about it
if v == 3 {
if !d.ReadBool() {
d.ReadBool()
}
}
parentID.DecodeFrom(d)
_ = d.ReadUint64() // nonce
timestamp = d.ReadTime()
}))
return
}

func (db *DBStore) getBlockHeader(id types.BlockID) (bh types.BlockHeader, ok bool) {
ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) {
v := d.ReadUint8()
if v != 2 && v != 3 {
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
return
}
if v == 3 {
bhp := &bh
types.DecodePtr(d, &bhp)
if bhp != nil {
return
} else if !d.ReadBool() {
d.SetErr(errors.New("neither header nor block present"))
return
}
}
var b types.Block
(*types.V2Block)(&b).DecodeFrom(d)
bh = b.Header()
}))
return
}

func (db *DBStore) treeKey(row, col uint64) []byte {
@@ -628,9 +658,9 @@ func (db *DBStore) AncestorTimestamp(id types.BlockID) (t time.Time, ok bool) {
}
break
}
ancestorID, _, _ = db.getBlockHeader(ancestorID)
ancestorID, _, _ = db.getAncestorInfo(ancestorID)
}
_, t, ok = db.getBlockHeader(ancestorID)
_, t, ok = db.getAncestorInfo(ancestorID)
return
}

@@ -646,12 +676,23 @@ func (db *DBStore) AddState(cs consensus.State) {

// Block implements Store.
func (db *DBStore) Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool) {
return db.getBlock(id)
_, b, bs, ok := db.getBlock(id)
if !ok || b == nil {
return types.Block{}, nil, false
}
return *b, bs, ok
}

// AddBlock implements Store.
func (db *DBStore) AddBlock(b types.Block, bs *consensus.V1BlockSupplement) {
db.putBlock(b, bs)
db.putBlock(b.Header(), &b, bs)
}

// PruneBlock implements Store.
func (db *DBStore) PruneBlock(id types.BlockID) {
if bh, _, _, ok := db.getBlock(id); ok {
db.putBlock(bh, nil, nil)
}
}

func (db *DBStore) shouldFlush() bool {
@@ -743,7 +784,7 @@ func NewDBStore(db DB, n *consensus.Network, genesisBlock types.Block) (_ *DBSto
dbs.putState(genesisState)
bs := consensus.V1BlockSupplement{Transactions: make([]consensus.V1TransactionSupplement, len(genesisBlock.Transactions))}
cs, cau := consensus.ApplyBlock(genesisState, genesisBlock, bs, time.Time{})
dbs.putBlock(genesisBlock, &bs)
dbs.putBlock(genesisBlock.Header(), &genesisBlock, &bs)
dbs.putState(cs)
dbs.ApplyBlock(cs, cau)
if err := dbs.Flush(); err != nil {
12 changes: 11 additions & 1 deletion chain/manager.go
@@ -45,6 +45,7 @@ type Store interface {

Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool)
AddBlock(b types.Block, bs *consensus.V1BlockSupplement)
PruneBlock(id types.BlockID)
State(id types.BlockID) (consensus.State, bool)
AddState(cs consensus.State)
AncestorTimestamp(id types.BlockID) (time.Time, bool)
@@ -74,12 +75,15 @@ func blockAndChild(s Store, id types.BlockID) (types.Block, *consensus.V1BlockSu
// A Manager tracks multiple blockchains and identifies the best valid
// chain.
type Manager struct {
log *zap.Logger
store Store
tipState consensus.State
onReorg map[[16]byte]func(types.ChainIndex)
invalidBlocks map[types.BlockID]error

// configuration options
log *zap.Logger
pruneTarget uint64

txpool struct {
txns []types.Transaction
v2txns []types.V2Transaction
@@ -314,6 +318,12 @@ func (m *Manager) applyTip(index types.ChainIndex) error {
m.store.ApplyBlock(cs, cau)
m.applyPoolUpdate(cau, cs)
m.tipState = cs

if m.pruneTarget != 0 && cs.Index.Height > m.pruneTarget {
if index, ok := m.store.BestIndex(cs.Index.Height - m.pruneTarget); ok {
m.store.PruneBlock(index.ID)
}
}
return nil
}

7 changes: 7 additions & 0 deletions chain/options.go
@@ -11,3 +11,10 @@ func WithLog(l *zap.Logger) ManagerOption {
m.log = l
}
}

// WithPruneTarget sets the target number of blocks to store.
func WithPruneTarget(n uint64) ManagerOption {
return func(m *Manager) {
m.pruneTarget = n
}
}
8 changes: 5 additions & 3 deletions miner.go
@@ -10,15 +10,17 @@ import (

// FindBlockNonce attempts to find a nonce for b that meets the PoW target.
func FindBlockNonce(cs consensus.State, b *types.Block, timeout time.Duration) bool {
b.Nonce = 0
bh := b.Header()
bh.Nonce = 0
factor := cs.NonceFactor()
startBlock := time.Now()
for b.ID().CmpWork(cs.ChildTarget) < 0 {
b.Nonce += factor
for bh.ID().CmpWork(cs.ChildTarget) < 0 {
bh.Nonce += factor
if time.Since(startBlock) > timeout {
return false
}
}
b.Nonce = bh.Nonce
return true
}

2 changes: 1 addition & 1 deletion rhp/v4/rpc_test.go
@@ -74,7 +74,7 @@ func startTestNode(tb testing.TB, n *consensus.Network, genesis types.Block) (*c
}
tb.Cleanup(func() { syncerListener.Close() })

s := syncer.New(syncerListener, cm, testutil.NewMemPeerStore(), gateway.Header{
s := syncer.New(syncerListener, cm, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: "localhost:1234",
4 changes: 2 additions & 2 deletions syncer/peer.go
@@ -182,8 +182,8 @@ func (p *Peer) SendCheckpoint(index types.ChainIndex, timeout time.Duration) (ty
}

// RelayV2Header relays a v2 block header to the peer.
func (p *Peer) RelayV2Header(h types.BlockHeader, timeout time.Duration) error {
return p.callRPC(&gateway.RPCRelayV2Header{Header: h}, timeout)
func (p *Peer) RelayV2Header(bh types.BlockHeader, timeout time.Duration) error {
return p.callRPC(&gateway.RPCRelayV2Header{Header: bh}, timeout)
}

// RelayV2BlockOutline relays a v2 block outline to the peer.
10 changes: 6 additions & 4 deletions syncer/syncer.go
@@ -533,9 +533,9 @@ func (s *Syncer) peerLoop(ctx context.Context) error {

ctx, cancel := context.WithTimeout(ctx, s.config.ConnectTimeout)
if _, err := s.Connect(ctx, p); err != nil {
log.Debug("connected to peer", zap.String("peer", p))
} else {
log.Debug("failed to connect to peer", zap.String("peer", p), zap.Error(err))
} else {
log.Debug("connected to peer", zap.String("peer", p))
}
cancel()
lastTried[p] = time.Now()
@@ -723,10 +723,12 @@ func (s *Syncer) Connect(ctx context.Context, addr string) (*Peer, error) {
}

// BroadcastHeader broadcasts a header to all peers.
func (s *Syncer) BroadcastHeader(h types.BlockHeader) { s.relayHeader(h, nil) }
func (s *Syncer) BroadcastHeader(bh types.BlockHeader) { s.relayHeader(bh, nil) }

// BroadcastV2Header broadcasts a v2 header to all peers.
func (s *Syncer) BroadcastV2Header(h types.BlockHeader) { s.relayV2Header(h, nil) }
func (s *Syncer) BroadcastV2Header(bh types.BlockHeader) {
s.relayV2Header(bh, nil)
}

// BroadcastV2BlockOutline broadcasts a v2 block outline to all peers.
func (s *Syncer) BroadcastV2BlockOutline(b gateway.V2BlockOutline) { s.relayV2BlockOutline(b, nil) }
4 changes: 2 additions & 2 deletions syncer/syncer_test.go
@@ -42,15 +42,15 @@ func TestSyncer(t *testing.T) {
}
defer l2.Close()

s1 := syncer.New(l1, cm1, testutil.NewMemPeerStore(), gateway.Header{
s1 := syncer.New(l1, cm1, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l1.Addr().String(),
}, syncer.WithLogger(log.Named("syncer1")))
defer s1.Close()
go s1.Run(context.Background())

s2 := syncer.New(l2, cm2, testutil.NewMemPeerStore(), gateway.Header{
s2 := syncer.New(l2, cm2, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l2.Addr().String(),