diff --git a/.changeset/add_support_for_block_pruning.md b/.changeset/add_support_for_block_pruning.md new file mode 100644 index 0000000..95fe1ad --- /dev/null +++ b/.changeset/add_support_for_block_pruning.md @@ -0,0 +1,7 @@ +--- +default: minor +--- + +# Add support for block pruning + +The chain manager can now automatically delete blocks after a configurable number of confirmations. Note that this does not apply retroactively. diff --git a/chain/db.go b/chain/db.go index 9f1eda0..466acfa 100644 --- a/chain/db.go +++ b/chain/db.go @@ -13,43 +13,32 @@ import ( ) type supplementedBlock struct { - Block types.Block + Header *types.BlockHeader + Block *types.Block Supplement *consensus.V1BlockSupplement } func (sb supplementedBlock) EncodeTo(e *types.Encoder) { - e.WriteUint8(2) - (types.V2Block)(sb.Block).EncodeTo(e) - e.WriteBool(sb.Supplement != nil) - if sb.Supplement != nil { - sb.Supplement.EncodeTo(e) - } + e.WriteUint8(3) + types.EncodePtr(e, sb.Header) + types.EncodePtr(e, (*types.V2Block)(sb.Block)) + types.EncodePtr(e, sb.Supplement) } func (sb *supplementedBlock) DecodeFrom(d *types.Decoder) { - if v := d.ReadUint8(); v != 2 { - d.SetErr(fmt.Errorf("incompatible version (%d)", v)) - } - (*types.V2Block)(&sb.Block).DecodeFrom(d) - if d.ReadBool() { - sb.Supplement = new(consensus.V1BlockSupplement) - sb.Supplement.DecodeFrom(d) - } -} - -// helper type for decoding just the header information from a block -type supplementedHeader struct { - ParentID types.BlockID - Timestamp time.Time -} - -func (sh *supplementedHeader) DecodeFrom(d *types.Decoder) { - if v := d.ReadUint8(); v != 2 { + switch v := d.ReadUint8(); v { + case 2: + sb.Header = nil + sb.Block = new(types.Block) + (*types.V2Block)(sb.Block).DecodeFrom(d) + types.DecodePtr(d, &sb.Supplement) + case 3: + types.DecodePtr(d, &sb.Header) + types.DecodePtrCast[types.V2Block](d, &sb.Block) + types.DecodePtr(d, &sb.Supplement) + default: d.SetErr(fmt.Errorf("incompatible version (%d)", v)) } - 
sh.ParentID.DecodeFrom(d)
-	_ = d.ReadUint64() // nonce
-	sh.Timestamp = d.ReadTime()
 }
 
 type versionedState struct {
@@ -304,21 +293,62 @@ func (db *DBStore) putState(cs consensus.State) {
 	db.bucket(bStates).put(cs.Index.ID[:], versionedState{cs})
 }
 
-func (db *DBStore) getBlock(id types.BlockID) (b types.Block, bs *consensus.V1BlockSupplement, _ bool) {
+func (db *DBStore) getBlock(id types.BlockID) (bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement, _ bool) {
 	var sb supplementedBlock
-	ok := db.bucket(bBlocks).get(id[:], &sb)
-	return sb.Block, sb.Supplement, ok
+	if !db.bucket(bBlocks).get(id[:], &sb) { return types.BlockHeader{}, nil, nil, false } // not found: sb is zero, so sb.Block.Header() below would panic
+	if sb.Header == nil {
+		sb.Header = new(types.BlockHeader)
+		*sb.Header = sb.Block.Header()
+	}
+	return *sb.Header, sb.Block, sb.Supplement, true
 }
 
-func (db *DBStore) putBlock(b types.Block, bs *consensus.V1BlockSupplement) {
-	id := b.ID()
-	db.bucket(bBlocks).put(id[:], supplementedBlock{b, bs})
+func (db *DBStore) putBlock(bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement) {
+	id := bh.ID()
+	db.bucket(bBlocks).put(id[:], supplementedBlock{&bh, b, bs})
 }
 
-func (db *DBStore) getBlockHeader(id types.BlockID) (parentID types.BlockID, timestamp time.Time, _ bool) {
-	var sh supplementedHeader
-	ok := db.bucket(bBlocks).get(id[:], &sh)
-	return sh.ParentID, sh.Timestamp, ok
+func (db *DBStore) getAncestorInfo(id types.BlockID) (parentID types.BlockID, timestamp time.Time, ok bool) {
+	ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) {
+		v := d.ReadUint8()
+		if v != 2 && v != 3 {
+			d.SetErr(fmt.Errorf("incompatible version (%d)", v))
+		}
+		// kinda cursed; don't worry about it
+		if v == 3 {
+			if !d.ReadBool() {
+				d.ReadBool()
+			}
+		}
+		parentID.DecodeFrom(d)
+		_ = d.ReadUint64() // nonce
+		timestamp = d.ReadTime()
+	}))
+	return
+}
+
+func (db *DBStore) getBlockHeader(id types.BlockID) (bh types.BlockHeader, ok bool) {
+	ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) {
+		v := d.ReadUint8()
+		if v != 2 && v != 3 {
+
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
+			return
+		}
+		if v == 3 {
+			var bhp *types.BlockHeader
+			types.DecodePtr(d, &bhp)
+			if bhp != nil {
+				bh = *bhp; return // copy out: DecodePtr allocates a fresh header rather than filling bh in place
+			} else if !d.ReadBool() {
+				d.SetErr(errors.New("neither header nor block present"))
+				return
+			}
+		}
+		var b types.Block
+		(*types.V2Block)(&b).DecodeFrom(d)
+		bh = b.Header()
+	}))
+	return
 }
 
 func (db *DBStore) treeKey(row, col uint64) []byte {
@@ -628,9 +658,9 @@ func (db *DBStore) AncestorTimestamp(id types.BlockID) (t time.Time, ok bool) {
 			}
 			break
 		}
-		ancestorID, _, _ = db.getBlockHeader(ancestorID)
+		ancestorID, _, _ = db.getAncestorInfo(ancestorID)
 	}
-	_, t, ok = db.getBlockHeader(ancestorID)
+	_, t, ok = db.getAncestorInfo(ancestorID)
 	return
 }
@@ -646,12 +676,23 @@ func (db *DBStore) AddState(cs consensus.State) {
 
 // Block implements Store.
 func (db *DBStore) Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool) {
-	return db.getBlock(id)
+	_, b, bs, ok := db.getBlock(id)
+	if !ok || b == nil {
+		return types.Block{}, nil, false
+	}
+	return *b, bs, ok
 }
 
 // AddBlock implements Store.
 func (db *DBStore) AddBlock(b types.Block, bs *consensus.V1BlockSupplement) {
-	db.putBlock(b, bs)
+	db.putBlock(b.Header(), &b, bs)
+}
+
+// PruneBlock implements Store.
+func (db *DBStore) PruneBlock(id types.BlockID) { + if bh, _, _, ok := db.getBlock(id); ok { + db.putBlock(bh, nil, nil) + } } func (db *DBStore) shouldFlush() bool { @@ -743,7 +784,7 @@ func NewDBStore(db DB, n *consensus.Network, genesisBlock types.Block) (_ *DBSto dbs.putState(genesisState) bs := consensus.V1BlockSupplement{Transactions: make([]consensus.V1TransactionSupplement, len(genesisBlock.Transactions))} cs, cau := consensus.ApplyBlock(genesisState, genesisBlock, bs, time.Time{}) - dbs.putBlock(genesisBlock, &bs) + dbs.putBlock(genesisBlock.Header(), &genesisBlock, &bs) dbs.putState(cs) dbs.ApplyBlock(cs, cau) if err := dbs.Flush(); err != nil { diff --git a/chain/manager.go b/chain/manager.go index 679b281..53e47da 100644 --- a/chain/manager.go +++ b/chain/manager.go @@ -45,6 +45,7 @@ type Store interface { Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool) AddBlock(b types.Block, bs *consensus.V1BlockSupplement) + PruneBlock(id types.BlockID) State(id types.BlockID) (consensus.State, bool) AddState(cs consensus.State) AncestorTimestamp(id types.BlockID) (time.Time, bool) @@ -74,12 +75,15 @@ func blockAndChild(s Store, id types.BlockID) (types.Block, *consensus.V1BlockSu // A Manager tracks multiple blockchains and identifies the best valid // chain. 
type Manager struct { - log *zap.Logger store Store tipState consensus.State onReorg map[[16]byte]func(types.ChainIndex) invalidBlocks map[types.BlockID]error + // configuration options + log *zap.Logger + pruneTarget uint64 + txpool struct { txns []types.Transaction v2txns []types.V2Transaction @@ -314,6 +318,12 @@ func (m *Manager) applyTip(index types.ChainIndex) error { m.store.ApplyBlock(cs, cau) m.applyPoolUpdate(cau, cs) m.tipState = cs + + if m.pruneTarget != 0 && cs.Index.Height > m.pruneTarget { + if index, ok := m.store.BestIndex(cs.Index.Height - m.pruneTarget); ok { + m.store.PruneBlock(index.ID) + } + } return nil } diff --git a/chain/options.go b/chain/options.go index f7fa10a..05d5f6d 100644 --- a/chain/options.go +++ b/chain/options.go @@ -11,3 +11,10 @@ func WithLog(l *zap.Logger) ManagerOption { m.log = l } } + +// WithPruneTarget sets the target number of blocks to store. +func WithPruneTarget(n uint64) ManagerOption { + return func(m *Manager) { + m.pruneTarget = n + } +} diff --git a/miner.go b/miner.go index db4358a..e028d9d 100644 --- a/miner.go +++ b/miner.go @@ -10,15 +10,17 @@ import ( // FindBlockNonce attempts to find a nonce for b that meets the PoW target. 
func FindBlockNonce(cs consensus.State, b *types.Block, timeout time.Duration) bool { - b.Nonce = 0 + bh := b.Header() + bh.Nonce = 0 factor := cs.NonceFactor() startBlock := time.Now() - for b.ID().CmpWork(cs.ChildTarget) < 0 { - b.Nonce += factor + for bh.ID().CmpWork(cs.ChildTarget) < 0 { + bh.Nonce += factor if time.Since(startBlock) > timeout { return false } } + b.Nonce = bh.Nonce return true } diff --git a/rhp/v4/rpc_test.go b/rhp/v4/rpc_test.go index a869635..e264b1b 100644 --- a/rhp/v4/rpc_test.go +++ b/rhp/v4/rpc_test.go @@ -74,7 +74,7 @@ func startTestNode(tb testing.TB, n *consensus.Network, genesis types.Block) (*c } tb.Cleanup(func() { syncerListener.Close() }) - s := syncer.New(syncerListener, cm, testutil.NewMemPeerStore(), gateway.Header{ + s := syncer.New(syncerListener, cm, testutil.NewEphemeralPeerStore(), gateway.Header{ GenesisID: genesis.ID(), UniqueID: gateway.GenerateUniqueID(), NetAddress: "localhost:1234", diff --git a/syncer/peer.go b/syncer/peer.go index 45a6d81..de98c86 100644 --- a/syncer/peer.go +++ b/syncer/peer.go @@ -182,8 +182,8 @@ func (p *Peer) SendCheckpoint(index types.ChainIndex, timeout time.Duration) (ty } // RelayV2Header relays a v2 block header to the peer. -func (p *Peer) RelayV2Header(h types.BlockHeader, timeout time.Duration) error { - return p.callRPC(&gateway.RPCRelayV2Header{Header: h}, timeout) +func (p *Peer) RelayV2Header(bh types.BlockHeader, timeout time.Duration) error { + return p.callRPC(&gateway.RPCRelayV2Header{Header: bh}, timeout) } // RelayV2BlockOutline relays a v2 block outline to the peer. 
diff --git a/syncer/syncer.go b/syncer/syncer.go index f1ac29b..6778d79 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -533,9 +533,9 @@ func (s *Syncer) peerLoop(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, s.config.ConnectTimeout) if _, err := s.Connect(ctx, p); err != nil { - log.Debug("connected to peer", zap.String("peer", p)) - } else { log.Debug("failed to connect to peer", zap.String("peer", p), zap.Error(err)) + } else { + log.Debug("connected to peer", zap.String("peer", p)) } cancel() lastTried[p] = time.Now() @@ -723,10 +723,12 @@ func (s *Syncer) Connect(ctx context.Context, addr string) (*Peer, error) { } // BroadcastHeader broadcasts a header to all peers. -func (s *Syncer) BroadcastHeader(h types.BlockHeader) { s.relayHeader(h, nil) } +func (s *Syncer) BroadcastHeader(bh types.BlockHeader) { s.relayHeader(bh, nil) } // BroadcastV2Header broadcasts a v2 header to all peers. -func (s *Syncer) BroadcastV2Header(h types.BlockHeader) { s.relayV2Header(h, nil) } +func (s *Syncer) BroadcastV2Header(bh types.BlockHeader) { + s.relayV2Header(bh, nil) +} // BroadcastV2BlockOutline broadcasts a v2 block outline to all peers. 
func (s *Syncer) BroadcastV2BlockOutline(b gateway.V2BlockOutline) { s.relayV2BlockOutline(b, nil) }
diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go
index d3551a0..eb5d673 100644
--- a/syncer/syncer_test.go
+++ b/syncer/syncer_test.go
@@ -42,7 +42,7 @@ func TestSyncer(t *testing.T) {
 	}
 	defer l2.Close()
 
-	s1 := syncer.New(l1, cm1, testutil.NewMemPeerStore(), gateway.Header{
+	s1 := syncer.New(l1, cm1, testutil.NewEphemeralPeerStore(), gateway.Header{
 		GenesisID:  genesis.ID(),
 		UniqueID:   gateway.GenerateUniqueID(),
 		NetAddress: l1.Addr().String(),
@@ -50,7 +50,7 @@ func TestSyncer(t *testing.T) {
 	defer s1.Close()
 	go s1.Run(context.Background())
 
-	s2 := syncer.New(l2, cm2, testutil.NewMemPeerStore(), gateway.Header{
+	s2 := syncer.New(l2, cm2, testutil.NewEphemeralPeerStore(), gateway.Header{
 		GenesisID:  genesis.ID(),
 		UniqueID:   gateway.GenerateUniqueID(),
 		NetAddress: l2.Addr().String(),
diff --git a/testutil/syncer.go b/testutil/syncer.go
index 1aba4f6..caa621a 100644
--- a/testutil/syncer.go
+++ b/testutil/syncer.go
@@ -7,15 +7,15 @@ import (
 	"go.sia.tech/coreutils/syncer"
 )
 
-// A MemPeerStore is an in-memory implementation of a PeerStore.
-type MemPeerStore struct {
+// An EphemeralPeerStore is an in-memory implementation of a PeerStore.
+type EphemeralPeerStore struct {
 	mu    sync.Mutex
 	peers map[string]syncer.PeerInfo
 }
 
 // AddPeer adds a peer to the store. If the peer already exists, nil should
 // be returned.
-func (ps *MemPeerStore) AddPeer(addr string) error {
+func (ps *EphemeralPeerStore) AddPeer(addr string) error {
 	ps.mu.Lock()
 	defer ps.mu.Unlock()
 	if _, ok := ps.peers[addr]; ok {
@@ -26,7 +26,7 @@ func (ps *MemPeerStore) AddPeer(addr string) error {
 }
 
 // Peers returns the set of known peers.
-func (ps *MemPeerStore) Peers() ([]syncer.PeerInfo, error) { +func (ps *EphemeralPeerStore) Peers() ([]syncer.PeerInfo, error) { ps.mu.Lock() defer ps.mu.Unlock() var peers []syncer.PeerInfo @@ -38,7 +38,7 @@ func (ps *MemPeerStore) Peers() ([]syncer.PeerInfo, error) { // PeerInfo returns the metadata for the specified peer or ErrPeerNotFound // if the peer wasn't found in the store. -func (ps *MemPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) { +func (ps *EphemeralPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) { ps.mu.Lock() defer ps.mu.Unlock() p, ok := ps.peers[addr] @@ -50,7 +50,7 @@ func (ps *MemPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) { // UpdatePeerInfo updates the metadata for the specified peer. If the peer // is not found, the error should be ErrPeerNotFound. -func (ps *MemPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error { +func (ps *EphemeralPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error { ps.mu.Lock() defer ps.mu.Unlock() p := ps.peers[addr] @@ -61,18 +61,18 @@ func (ps *MemPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) e // Ban temporarily bans one or more IPs. The addr should either be a single // IP with port (e.g. 1.2.3.4:5678) or a CIDR subnet (e.g. 1.2.3.4/16). -func (ps *MemPeerStore) Ban(addr string, duration time.Duration, reason string) error { +func (ps *EphemeralPeerStore) Ban(addr string, duration time.Duration, reason string) error { return nil } // Banned returns false -func (ps *MemPeerStore) Banned(addr string) (bool, error) { return false, nil } +func (ps *EphemeralPeerStore) Banned(addr string) (bool, error) { return false, nil } -var _ syncer.PeerStore = (*MemPeerStore)(nil) +var _ syncer.PeerStore = (*EphemeralPeerStore)(nil) -// NewMemPeerStore returns a new MemPeerStore. -func NewMemPeerStore() *MemPeerStore { - return &MemPeerStore{ +// NewEphemeralPeerStore returns a new EphemeralPeerStore. 
+func NewEphemeralPeerStore() *EphemeralPeerStore { + return &EphemeralPeerStore{ peers: make(map[string]syncer.PeerInfo), } }