Skip to content

Commit

Permalink
Merge pull request #116 from SiaFoundation/prune
Browse files Browse the repository at this point in the history
Add support for block pruning
  • Loading branch information
n8maninger authored Dec 7, 2024
2 parents 51f467c + a349c35 commit ed9bce1
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 67 deletions.
7 changes: 7 additions & 0 deletions .changeset/add_support_for_block_pruning.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
default: minor
---

# Add support for block pruning

The chain manager can now automatically delete blocks after a configurable number of confirmations. Note that this does not apply retroactively.
125 changes: 83 additions & 42 deletions chain/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,32 @@ import (
)

// A supplementedBlock pairs a block with the supplemental data needed to
// validate it. After pruning, only the header is retained: Block and
// Supplement are nil while Header remains set.
type supplementedBlock struct {
	Header     *types.BlockHeader
	Block      *types.Block
	Supplement *consensus.V1BlockSupplement
}

// EncodeTo implements types.EncoderTo. Entries are written in the v3
// format: a version byte, then the header, block, and supplement, each as
// an optional (presence-bool-prefixed) value.
func (sb supplementedBlock) EncodeTo(e *types.Encoder) {
	e.WriteUint8(3) // format version
	types.EncodePtr(e, sb.Header)
	types.EncodePtr(e, (*types.V2Block)(sb.Block))
	types.EncodePtr(e, sb.Supplement)
}

func (sb *supplementedBlock) DecodeFrom(d *types.Decoder) {
if v := d.ReadUint8(); v != 2 {
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
}
(*types.V2Block)(&sb.Block).DecodeFrom(d)
if d.ReadBool() {
sb.Supplement = new(consensus.V1BlockSupplement)
sb.Supplement.DecodeFrom(d)
}
}

// helper type for decoding just the header information from a block
type supplementedHeader struct {
ParentID types.BlockID
Timestamp time.Time
}

func (sh *supplementedHeader) DecodeFrom(d *types.Decoder) {
if v := d.ReadUint8(); v != 2 {
switch v := d.ReadUint8(); v {
case 2:
sb.Header = nil
sb.Block = new(types.Block)
(*types.V2Block)(sb.Block).DecodeFrom(d)
types.DecodePtr(d, &sb.Supplement)
case 3:
types.DecodePtr(d, &sb.Header)
types.DecodePtrCast[types.V2Block](d, &sb.Block)
types.DecodePtr(d, &sb.Supplement)
default:
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
}
sh.ParentID.DecodeFrom(d)
_ = d.ReadUint64() // nonce
sh.Timestamp = d.ReadTime()
}

type versionedState struct {
Expand Down Expand Up @@ -304,21 +293,62 @@ func (db *DBStore) putState(cs consensus.State) {
db.bucket(bStates).put(cs.Index.ID[:], versionedState{cs})
}

func (db *DBStore) getBlock(id types.BlockID) (b types.Block, bs *consensus.V1BlockSupplement, _ bool) {
func (db *DBStore) getBlock(id types.BlockID) (bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement, _ bool) {
var sb supplementedBlock
ok := db.bucket(bBlocks).get(id[:], &sb)
return sb.Block, sb.Supplement, ok
if sb.Header == nil {
sb.Header = new(types.BlockHeader)
*sb.Header = sb.Block.Header()
}
return *sb.Header, sb.Block, sb.Supplement, ok
}

func (db *DBStore) putBlock(b types.Block, bs *consensus.V1BlockSupplement) {
id := b.ID()
db.bucket(bBlocks).put(id[:], supplementedBlock{b, bs})
func (db *DBStore) putBlock(bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement) {
id := bh.ID()
db.bucket(bBlocks).put(id[:], supplementedBlock{&bh, b, bs})
}

func (db *DBStore) getBlockHeader(id types.BlockID) (parentID types.BlockID, timestamp time.Time, _ bool) {
var sh supplementedHeader
ok := db.bucket(bBlocks).get(id[:], &sh)
return sh.ParentID, sh.Timestamp, ok
// getAncestorInfo decodes just the parent ID and timestamp of the block
// stored under id, without decoding the full block or header.
func (db *DBStore) getAncestorInfo(id types.BlockID) (parentID types.BlockID, timestamp time.Time, ok bool) {
	ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) {
		v := d.ReadUint8()
		if v != 2 && v != 3 {
			d.SetErr(fmt.Errorf("incompatible version (%d)", v))
		}
		// This relies on both the header encoding and the block encoding
		// beginning with the same (parentID, nonce, timestamp) prefix, so
		// those fields can be decoded directly whichever one is present.
		// In the v3 format the header and block are each prefixed by a
		// presence bool: consume the header-present bool, and if the
		// header is absent, also consume the block-present bool so the
		// stream is positioned at the prefix.
		//
		// NOTE(review): if an entry contains neither header nor block, the
		// reads below decode whatever bytes follow — presumably such
		// entries are never written; confirm against putBlock callers.
		if v == 3 {
			if !d.ReadBool() {
				d.ReadBool()
			}
		}
		parentID.DecodeFrom(d)
		_ = d.ReadUint64() // nonce
		timestamp = d.ReadTime()
	}))
	return
}

// getBlockHeader returns the header of the block stored under id, decoding
// it from the stored header if present and otherwise deriving it from the
// stored block.
func (db *DBStore) getBlockHeader(id types.BlockID) (bh types.BlockHeader, ok bool) {
	ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) {
		v := d.ReadUint8()
		if v != 2 && v != 3 {
			d.SetErr(fmt.Errorf("incompatible version (%d)", v))
			return
		}
		if v == 3 {
			bhp := &bh
			types.DecodePtr(d, &bhp)
			if bhp != nil {
				// DecodePtr may have pointed bhp at a freshly-allocated
				// header rather than decoding into bh directly; copy the
				// result so bh is populated either way.
				bh = *bhp
				return
			} else if !d.ReadBool() {
				d.SetErr(errors.New("neither header nor block present"))
				return
			}
		}
		// v2 entry, or v3 entry with no stored header: decode the full
		// block and derive the header from it.
		var b types.Block
		(*types.V2Block)(&b).DecodeFrom(d)
		bh = b.Header()
	}))
	return
}

func (db *DBStore) treeKey(row, col uint64) []byte {
Expand Down Expand Up @@ -628,9 +658,9 @@ func (db *DBStore) AncestorTimestamp(id types.BlockID) (t time.Time, ok bool) {
}
break
}
ancestorID, _, _ = db.getBlockHeader(ancestorID)
ancestorID, _, _ = db.getAncestorInfo(ancestorID)
}
_, t, ok = db.getBlockHeader(ancestorID)
_, t, ok = db.getAncestorInfo(ancestorID)
return
}

Expand All @@ -646,12 +676,23 @@ func (db *DBStore) AddState(cs consensus.State) {

// Block implements Store. It reports ok == false if the block is unknown
// or has been pruned down to just its header.
func (db *DBStore) Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool) {
	_, b, bs, ok := db.getBlock(id)
	if !ok || b == nil {
		return types.Block{}, nil, false
	}
	return *b, bs, ok
}

// AddBlock implements Store.
func (db *DBStore) AddBlock(b types.Block, bs *consensus.V1BlockSupplement) {
	db.putBlock(b.Header(), &b, bs)
}

// PruneBlock implements Store. It rewrites the stored entry with only the
// block's header, discarding the block body and its supplement.
func (db *DBStore) PruneBlock(id types.BlockID) {
	bh, _, _, ok := db.getBlock(id)
	if !ok {
		return
	}
	db.putBlock(bh, nil, nil)
}

func (db *DBStore) shouldFlush() bool {
Expand Down Expand Up @@ -743,7 +784,7 @@ func NewDBStore(db DB, n *consensus.Network, genesisBlock types.Block) (_ *DBSto
dbs.putState(genesisState)
bs := consensus.V1BlockSupplement{Transactions: make([]consensus.V1TransactionSupplement, len(genesisBlock.Transactions))}
cs, cau := consensus.ApplyBlock(genesisState, genesisBlock, bs, time.Time{})
dbs.putBlock(genesisBlock, &bs)
dbs.putBlock(genesisBlock.Header(), &genesisBlock, &bs)
dbs.putState(cs)
dbs.ApplyBlock(cs, cau)
if err := dbs.Flush(); err != nil {
Expand Down
12 changes: 11 additions & 1 deletion chain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Store interface {

Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool)
AddBlock(b types.Block, bs *consensus.V1BlockSupplement)
PruneBlock(id types.BlockID)
State(id types.BlockID) (consensus.State, bool)
AddState(cs consensus.State)
AncestorTimestamp(id types.BlockID) (time.Time, bool)
Expand Down Expand Up @@ -74,12 +75,15 @@ func blockAndChild(s Store, id types.BlockID) (types.Block, *consensus.V1BlockSu
// A Manager tracks multiple blockchains and identifies the best valid
// chain.
type Manager struct {
log *zap.Logger
store Store
tipState consensus.State
onReorg map[[16]byte]func(types.ChainIndex)
invalidBlocks map[types.BlockID]error

// configuration options
log *zap.Logger
pruneTarget uint64

txpool struct {
txns []types.Transaction
v2txns []types.V2Transaction
Expand Down Expand Up @@ -314,6 +318,12 @@ func (m *Manager) applyTip(index types.ChainIndex) error {
m.store.ApplyBlock(cs, cau)
m.applyPoolUpdate(cau, cs)
m.tipState = cs

if m.pruneTarget != 0 && cs.Index.Height > m.pruneTarget {
if index, ok := m.store.BestIndex(cs.Index.Height - m.pruneTarget); ok {
m.store.PruneBlock(index.ID)
}
}
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions chain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ func WithLog(l *zap.Logger) ManagerOption {
m.log = l
}
}

// WithPruneTarget sets the target number of blocks to store. When nonzero,
// the manager prunes the body of each block once it is buried more than n
// blocks below the tip (its header is retained). A value of zero — the
// default — disables pruning. Note that pruning does not apply
// retroactively to blocks already buried when the option takes effect.
func WithPruneTarget(n uint64) ManagerOption {
	return func(m *Manager) {
		m.pruneTarget = n
	}
}
8 changes: 5 additions & 3 deletions miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ import (

// FindBlockNonce attempts to find a nonce for b that meets the PoW target.
func FindBlockNonce(cs consensus.State, b *types.Block, timeout time.Duration) bool {
b.Nonce = 0
bh := b.Header()
bh.Nonce = 0
factor := cs.NonceFactor()
startBlock := time.Now()
for b.ID().CmpWork(cs.ChildTarget) < 0 {
b.Nonce += factor
for bh.ID().CmpWork(cs.ChildTarget) < 0 {
bh.Nonce += factor
if time.Since(startBlock) > timeout {
return false
}
}
b.Nonce = bh.Nonce
return true
}

Expand Down
2 changes: 1 addition & 1 deletion rhp/v4/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func startTestNode(tb testing.TB, n *consensus.Network, genesis types.Block) (*c
}
tb.Cleanup(func() { syncerListener.Close() })

s := syncer.New(syncerListener, cm, testutil.NewMemPeerStore(), gateway.Header{
s := syncer.New(syncerListener, cm, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: "localhost:1234",
Expand Down
4 changes: 2 additions & 2 deletions syncer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ func (p *Peer) SendCheckpoint(index types.ChainIndex, timeout time.Duration) (ty
}

// RelayV2Header relays a v2 block header to the peer.
func (p *Peer) RelayV2Header(h types.BlockHeader, timeout time.Duration) error {
return p.callRPC(&gateway.RPCRelayV2Header{Header: h}, timeout)
func (p *Peer) RelayV2Header(bh types.BlockHeader, timeout time.Duration) error {
return p.callRPC(&gateway.RPCRelayV2Header{Header: bh}, timeout)
}

// RelayV2BlockOutline relays a v2 block outline to the peer.
Expand Down
10 changes: 6 additions & 4 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,9 @@ func (s *Syncer) peerLoop(ctx context.Context) error {

ctx, cancel := context.WithTimeout(ctx, s.config.ConnectTimeout)
if _, err := s.Connect(ctx, p); err != nil {
log.Debug("connected to peer", zap.String("peer", p))
} else {
log.Debug("failed to connect to peer", zap.String("peer", p), zap.Error(err))
} else {
log.Debug("connected to peer", zap.String("peer", p))
}
cancel()
lastTried[p] = time.Now()
Expand Down Expand Up @@ -723,10 +723,12 @@ func (s *Syncer) Connect(ctx context.Context, addr string) (*Peer, error) {
}

// BroadcastHeader broadcasts a header to all peers.
func (s *Syncer) BroadcastHeader(bh types.BlockHeader) { s.relayHeader(bh, nil) }

// BroadcastV2Header broadcasts a v2 header to all peers.
func (s *Syncer) BroadcastV2Header(bh types.BlockHeader) {
	s.relayV2Header(bh, nil)
}

// BroadcastV2BlockOutline broadcasts a v2 block outline to all peers.
func (s *Syncer) BroadcastV2BlockOutline(b gateway.V2BlockOutline) { s.relayV2BlockOutline(b, nil) }
Expand Down
4 changes: 2 additions & 2 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ func TestSyncer(t *testing.T) {
}
defer l2.Close()

s1 := syncer.New(l1, cm1, testutil.NewMemPeerStore(), gateway.Header{
s1 := syncer.New(l1, cm1, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l1.Addr().String(),
}, syncer.WithLogger(log.Named("syncer1")))
defer s1.Close()
go s1.Run(context.Background())

s2 := syncer.New(l2, cm2, testutil.NewMemPeerStore(), gateway.Header{
s2 := syncer.New(l2, cm2, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l2.Addr().String(),
Expand Down
24 changes: 12 additions & 12 deletions testutil/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"go.sia.tech/coreutils/syncer"
)

// A MemPeerStore is an in-memory implementation of a PeerStore.
type MemPeerStore struct {
// A EphemeralPeerStore is an in-memory implementation of a PeerStore.
type EphemeralPeerStore struct {
mu sync.Mutex
peers map[string]syncer.PeerInfo
}

// AddPeer adds a peer to the store. If the peer already exists, nil should
// be returned.
func (ps *MemPeerStore) AddPeer(addr string) error {
func (ps *EphemeralPeerStore) AddPeer(addr string) error {
ps.mu.Lock()
defer ps.mu.Unlock()
if _, ok := ps.peers[addr]; ok {
Expand All @@ -26,7 +26,7 @@ func (ps *MemPeerStore) AddPeer(addr string) error {
}

// Peers returns the set of known peers.
func (ps *MemPeerStore) Peers() ([]syncer.PeerInfo, error) {
func (ps *EphemeralPeerStore) Peers() ([]syncer.PeerInfo, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
var peers []syncer.PeerInfo
Expand All @@ -38,7 +38,7 @@ func (ps *MemPeerStore) Peers() ([]syncer.PeerInfo, error) {

// PeerInfo returns the metadata for the specified peer or ErrPeerNotFound
// if the peer wasn't found in the store.
func (ps *MemPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) {
func (ps *EphemeralPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
p, ok := ps.peers[addr]
Expand All @@ -50,7 +50,7 @@ func (ps *MemPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) {

// UpdatePeerInfo updates the metadata for the specified peer. If the peer
// is not found, the error should be ErrPeerNotFound.
func (ps *MemPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error {
func (ps *EphemeralPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error {
ps.mu.Lock()
defer ps.mu.Unlock()
p := ps.peers[addr]
Expand All @@ -61,18 +61,18 @@ func (ps *MemPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) e

// Ban temporarily bans one or more IPs. The addr should either be a single
// IP with port (e.g. 1.2.3.4:5678) or a CIDR subnet (e.g. 1.2.3.4/16).
func (ps *MemPeerStore) Ban(addr string, duration time.Duration, reason string) error {
func (ps *EphemeralPeerStore) Ban(addr string, duration time.Duration, reason string) error {
return nil
}

// Banned returns false
func (ps *MemPeerStore) Banned(addr string) (bool, error) { return false, nil }
func (ps *EphemeralPeerStore) Banned(addr string) (bool, error) { return false, nil }

// compile-time check that EphemeralPeerStore satisfies syncer.PeerStore
var _ syncer.PeerStore = (*EphemeralPeerStore)(nil)

// NewMemPeerStore returns a new MemPeerStore.
func NewMemPeerStore() *MemPeerStore {
return &MemPeerStore{
// NewEphemeralPeerStore returns a new EphemeralPeerStore.
func NewEphemeralPeerStore() *EphemeralPeerStore {
return &EphemeralPeerStore{
peers: make(map[string]syncer.PeerInfo),
}
}

0 comments on commit ed9bce1

Please sign in to comment.