From cb6f83fec887c8ee221bca225cc89ed0293651f3 Mon Sep 17 00:00:00 2001 From: evan-forbes Date: Tue, 24 Aug 2021 17:11:27 -0500 Subject: [PATCH] Revert "Save and load block data using IPFS (#374)" This reverts commit 8da16447f34680639033b531ed10b9ab756a14e0. --- blockchain/v0/reactor.go | 13 ++--- blockchain/v0/reactor_test.go | 20 +++----- consensus/byzantine_test.go | 9 ++-- consensus/common_test.go | 17 +++---- consensus/mempool_test.go | 11 +++-- consensus/reactor_test.go | 4 +- consensus/replay.go | 11 ++--- consensus/replay_file.go | 2 +- consensus/replay_test.go | 30 +++++------- consensus/state.go | 6 +-- consensus/wal_test.go | 2 +- evidence/pool_test.go | 9 ++-- go.mod | 1 - ipfs/mock.go | 7 --- node/node.go | 2 +- node/node_test.go | 3 +- rpc/core/blocks.go | 21 ++------ rpc/core/blocks_test.go | 27 ++++------- rpc/core/tx.go | 10 +--- state/services.go | 8 ++-- store/mock.go | 21 -------- store/store.go | 85 +++++++++++++-------------------- store/store_test.go | 90 +++++++++++------------------------ 23 files changed, 130 insertions(+), 279 deletions(-) delete mode 100644 store/mock.go diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index f30a40a109..55bde3b247 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -1,7 +1,6 @@ package v0 import ( - "context" "fmt" "reflect" "time" @@ -179,10 +178,7 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest, src p2p.Peer) (queued bool) { - block, err := bcR.store.LoadBlock(context.TODO(), msg.Height) - if err != nil { - panic(err) - } + block := bcR.store.LoadBlock(msg.Height) if block != nil { bl, err := block.ToProto() if err != nil { @@ -422,14 +418,11 @@ FOR_LOOP: bcR.pool.PopRequest() // TODO: batch saves so we dont persist to disk every block - err := bcR.store.SaveBlock(context.TODO(), first, firstParts, second.LastCommit) - if err != nil { - // an error is only returned if something with the local IPFS blockstore is seriously wrong - panic(err) - } + bcR.store.SaveBlock(first, firstParts, second.LastCommit) // TODO: same thing for app - but we would need a way to get the hash // without persisting the state. + var err error state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first) if err != nil { // TODO This is bad, are we zombie? diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index cf774e95f0..55155cdbb8 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -1,7 +1,6 @@ package v0 import ( - "context" "crypto/sha256" "fmt" "os" @@ -9,6 +8,7 @@ import ( "testing" "time" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -69,9 +69,10 @@ func newBlockchainReactor( panic(fmt.Errorf("error start app: %w", err)) } + blockDB := memdb.NewDB() stateDB := memdb.NewDB() stateStore := sm.NewStore(stateDB) - blockStore := store.MockBlockStore(nil) + blockStore := store.NewBlockStore(blockDB, mdutils.Mock()) state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) if err != nil { @@ -99,10 +100,7 @@ func newBlockchainReactor( lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil) if blockHeight > 1 { lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) - lastBlock, err := blockStore.LoadBlock(context.TODO(), blockHeight-1) - if err != nil { - panic(err) - } + lastBlock := blockStore.LoadBlock(blockHeight - 1) vote, err := types.MakeVote( lastBlock.Header.Height, @@ -129,10 +127,7 @@ func newBlockchainReactor( panic(fmt.Errorf("error apply block: %w", err)) } - err := blockStore.SaveBlock(context.TODO(), thisBlock, thisParts, lastCommit) - if err != nil { - panic(err) - } + blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) @@ -189,10 +184,7 @@ func TestNoBlockResponse(t *testing.T) { assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height()) for _, tt := range tests { - block, err := reactorPairs[1].reactor.store.LoadBlock(context.TODO(), tt.height) - if err != nil { - panic(err) - } + block := reactorPairs[1].reactor.store.LoadBlock(tt.height) if tt.existent { assert.True(t, block != nil) } else { diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 0934785e37..2c2efa2879 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -9,9 +9,7 @@ import ( "testing" "time" - "github.com/ipfs/go-blockservice" - offline "github.com/ipfs/go-ipfs-exchange-offline" - "github.com/ipfs/go-merkledag" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -59,9 +57,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { app.InitChain(abci.RequestInitChain{Validators: vals}) blockDB := memdb.NewDB() - bs := ipfs.MockBlockStore() - dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger()) + dag := mdutils.Mock() + blockStore := store.NewBlockStore(blockDB, dag) // one for mempool, one for consensus mtx := new(tmsync.Mutex) diff --git a/consensus/common_test.go b/consensus/common_test.go index b00468ccf5..23dfef6bc8 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -14,10 +14,7 @@ import ( "time" "github.com/go-kit/kit/log/term" - "github.com/ipfs/go-blockservice" - offline "github.com/ipfs/go-ipfs-exchange-offline" format "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/require" @@ -359,7 +356,7 @@ func subscribeToVoter(cs *State, addr []byte) <-chan tmpubsub.Message { func newState(state sm.State, pv types.PrivValidator, app abci.Application, ipfsDagAPI format.DAGService) *State { config := cfg.ResetTestRoot("consensus_state_test") - return newStateWithConfig(config, state, pv, app) + return newStateWithConfig(config, state, pv, app, ipfsDagAPI) } func newStateWithConfig( @@ -367,9 +364,10 @@ func newStateWithConfig( state sm.State, pv types.PrivValidator, app abci.Application, + ipfsDagAPI format.DAGService, ) *State { blockDB := memdb.NewDB() - return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB) + return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB, ipfsDagAPI) } func newStateWithConfigAndBlockStore( @@ -378,11 +376,10 @@ func newStateWithConfigAndBlockStore( pv types.PrivValidator, app abci.Application, blockDB dbm.DB, + dag format.DAGService, ) *State { // Get BlockStore - bs := ipfs.MockBlockStore() - dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger()) + blockStore := store.NewBlockStore(blockDB, dag) // one for mempool, one for consensus mtx := new(tmsync.Mutex) @@ -711,7 +708,7 @@ func randConsensusNet( vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) - css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB) + css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB, mdutils.Mock()) css[i].SetTimeoutTicker(tickerFunc()) css[i].SetLogger(logger.With("validator", i, "module", "consensus")) } @@ -774,7 +771,7 @@ func randConsensusNetWithPeers( app.InitChain(abci.RequestInitChain{Validators: vals}) // sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above - css[i] = newStateWithConfig(thisConfig, state, privVal, app) + css[i] = newStateWithConfig(thisConfig, state, privVal, app, mdutils.Mock()) css[i].SetTimeoutTicker(tickerFunc()) css[i].SetLogger(logger.With("validator", i, "module", "consensus")) } diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 1624e2e704..cbbafa0292 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -29,7 +30,7 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) { config.Consensus.CreateEmptyBlocks = false state, privVals := randGenesisState(1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock()) assertMempool(cs.txNotifier).EnableTxsAvailable() height, round := cs.Height, cs.Round newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) @@ -49,7 +50,7 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) { config.Consensus.CreateEmptyBlocksInterval = ensureTimeout state, privVals := randGenesisState(1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock()) assertMempool(cs.txNotifier).EnableTxsAvailable() @@ -67,7 +68,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) { config.Consensus.CreateEmptyBlocks = false state, privVals := randGenesisState(1, false, 10) - cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock()) assertMempool(cs.txNotifier).EnableTxsAvailable() height, round := cs.Height, cs.Round @@ -117,7 +118,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) { blockDB := memdb.NewDB() stateStore := sm.NewStore(blockDB) - cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB) + cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB, mdutils.Mock()) err := stateStore.Save(state) require.NoError(t, err) newBlockHeaderCh := subscribe(cs.eventBus, types.EventQueryNewBlockHeader) @@ -143,7 +144,7 @@ func TestMempoolRmBadTx(t *testing.T) { blockDB := memdb.NewDB() stateStore := sm.NewStore(blockDB) - cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB) + cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB, mdutils.Mock()) err := stateStore.Save(state) require.NoError(t, err) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index a0c668bdb5..a322919436 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -155,8 +155,10 @@ func TestReactorWithEvidence(t *testing.T) { // duplicate code from: // css[i] = newStateWithConfig(thisConfig, state, privVals[i], app) + blockDB := memdb.NewDB() dag := mdutils.Mock() - blockStore := store.MockBlockStore(nil) + blockStore := store.NewBlockStore(blockDB, dag) + // one for mempool, one for consensus mtx := new(tmsync.Mutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) diff --git a/consensus/replay.go b/consensus/replay.go index e01845abf6..1bc77f9e51 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -463,10 +463,7 @@ func (h *Handshaker) replayBlocks( } for i := firstBlock; i <= finalBlock; i++ { h.logger.Info("Applying block", "height", i) - block, err := h.store.LoadBlock(context.TODO(), i) - if err != nil { - return nil, err - } + block := h.store.LoadBlock(i) // Extra check to ensure the app was not changed in a way it shouldn't have. if len(appHash) > 0 { assertAppHashEqualsOneFromBlock(appHash, block) @@ -495,10 +492,7 @@ func (h *Handshaker) replayBlocks( // ApplyBlock on the proxyApp with the last block. func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) { - block, err := h.store.LoadBlock(context.TODO(), height) - if err != nil { - return sm.State{}, err - } + block := h.store.LoadBlock(height) meta := h.store.LoadBlockMeta(height) // Use stubs for both mempool and evidence pool since no transactions nor @@ -506,6 +500,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}) blockExec.SetEventBus(h.eventBus) + var err error state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block) if err != nil { return sm.State{}, err diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 5d5682f41a..92439dea2f 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -291,7 +291,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo tmos.Exit(err.Error()) } dag := mdutils.Mock() - blockStore := store.MockBlockStore(blockStoreDB) + blockStore := store.NewBlockStore(blockStoreDB, dag) // Get State stateDB, err := badgerdb.NewDB("state", config.DBDir()) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index a6888d3c45..8e157eb622 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -79,6 +79,7 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi privValidator, kvstore.NewApplication(), blockDB, + mdutils.Mock(), ) cs.SetLogger(logger) @@ -173,6 +174,7 @@ LOOP: privValidator, kvstore.NewApplication(), blockDB, + mdutils.Mock(), ) cs.SetLogger(logger) @@ -546,9 +548,7 @@ func TestSimulateValidatorsChange(t *testing.T) { sim.Chain = make([]*types.Block, 0) sim.Commits = make([]*types.Commit, 0) for i := 1; i <= numBlocks; i++ { - blck, err := css[0].blockStore.LoadBlock(context.TODO(), int64(i)) - require.NoError(t, err) - sim.Chain = append(sim.Chain, blck) + sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i))) sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i))) } } @@ -1195,15 +1195,13 @@ func newMockBlockStore(config *cfg.Config, params tmproto.ConsensusParams) *mock return &mockBlockStore{config, params, nil, nil, 0, mdutils.Mock()} } -func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) } -func (bs *mockBlockStore) Base() int64 { return bs.base } -func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 } -func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) } -func (bs *mockBlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) { - return bs.chain[height-1], nil -} -func (bs *mockBlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) { - return bs.chain[int64(len(bs.chain))-1], nil +func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) } +func (bs *mockBlockStore) Base() int64 { return bs.base } +func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 } +func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) } +func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] } +func (bs *mockBlockStore) LoadBlockByHash(hash []byte) *types.Block { + return bs.chain[int64(len(bs.chain))-1] } func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { block := bs.chain[height-1] @@ -1213,13 +1211,7 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { } } func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil } -func (bs *mockBlockStore) SaveBlock( - ctx context.Context, - block *types.Block, - blockParts *types.PartSet, - seenCommit *types.Commit, -) error { - return nil +func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { } func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit { return bs.commits[height-1] diff --git a/consensus/state.go b/consensus/state.go index 659ae9d7a0..a1476ba7c6 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -2,7 +2,6 @@ package consensus import ( "bytes" - "context" "errors" "fmt" "io/ioutil" @@ -1545,10 +1544,7 @@ func (cs *State) finalizeCommit(height int64) { // but may differ from the LastCommit included in the next block precommits := cs.Votes.Precommits(cs.CommitRound) seenCommit := precommits.MakeCommit() - err := cs.blockStore.SaveBlock(context.TODO(), block, blockParts, seenCommit) - if err != nil { - panic(err) - } + cs.blockStore.SaveBlock(block, blockParts, seenCommit) } else { // Happens during replay if we already saved the block but didn't commit cs.Logger.Info("Calling finalizeCommit on already stored block", "height", block.Height) diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 75778d3902..08d1d969a5 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -311,7 +311,7 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { t.Error(err) } dag := mdutils.Mock() - blockStore := store.MockBlockStore(blockStoreDB) + blockStore := store.NewBlockStore(blockStoreDB, dag) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app)) proxyApp.SetLogger(logger.With("module", "proxy")) diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 55623c6538..bb1a06baaa 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -1,11 +1,11 @@ package evidence_test import ( - "context" "os" "testing" "time" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -396,7 +396,7 @@ func initializeValidatorState(privVal types.PrivValidator, height int64) sm.Stor // initializeBlockStore creates a block storage and populates it w/ a dummy // block at +height+. func initializeBlockStore(db dbm.DB, state sm.State, valAddr []byte) *store.BlockStore { - blockStore := store.MockBlockStore(db) + blockStore := store.NewBlockStore(db, mdutils.Mock()) for i := int64(1); i <= state.LastBlockHeight; i++ { lastCommit := makeCommit(i-1, valAddr) @@ -408,10 +408,7 @@ func initializeBlockStore(db dbm.DB, state sm.State, valAddr []byte) *store.Bloc partSet := block.MakePartSet(parts) seenCommit := makeCommit(i, valAddr) - err := blockStore.SaveBlock(context.TODO(), block, partSet, seenCommit) - if err != nil { - panic(err) - } + blockStore.SaveBlock(block, partSet, seenCommit) } return blockStore diff --git a/go.mod b/go.mod index 6cf863b7c6..525e0e21e8 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,6 @@ require ( github.com/ipfs/go-ipfs-api v0.2.0 github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-config v0.11.0 - github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipld-format v0.2.0 github.com/ipfs/go-merkledag v0.3.2 diff --git a/ipfs/mock.go b/ipfs/mock.go index 40a8ff36e5..45e66e542e 100644 --- a/ipfs/mock.go +++ b/ipfs/mock.go @@ -3,9 +3,6 @@ package ipfs import ( "context" - ds "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" - blockstore "github.com/ipfs/go-ipfs-blockstore" nilrouting "github.com/ipfs/go-ipfs-routing/none" "github.com/ipfs/go-ipfs/core" coremock "github.com/ipfs/go-ipfs/core/mock" @@ -46,7 +43,3 @@ func MockRouting() routing.Routing { croute, _ := nilrouting.ConstructNilRouting(context.TODO(), nil, nil, nil) return croute } - -func MockBlockStore() blockstore.Blockstore { - return blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) -} diff --git a/node/node.go b/node/node.go index f548ae3e2d..b203bf4793 100644 --- a/node/node.go +++ b/node/node.go @@ -690,7 +690,7 @@ func NewNode(config *cfg.Config, return nil, err } - blockStore := store.NewBlockStore(blockStoreDB, ipfsNode.Blockstore, logger) + blockStore := store.NewBlockStore(blockStoreDB, ipfsNode.DAG) // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, // and replays any blocks as necessary to sync tendermint with the app. diff --git a/node/node_test.go b/node/node_test.go index 59de04dac2..2a4641f465 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -283,7 +284,7 @@ func TestCreateProposalBlock(t *testing.T) { // Make EvidencePool evidenceDB := memdb.NewDB() - blockStore := store.MockBlockStore(nil) + blockStore := store.NewBlockStore(memdb.NewDB(), mdutils.Mock()) evidencePool, err := evidence.NewPool(evidenceDB, stateStore, blockStore) require.NoError(t, err) evidencePool.SetLogger(logger) diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 24236e73f6..20a97fdb7a 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -89,10 +89,7 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) return nil, err } - block, err := env.BlockStore.LoadBlock(ctx.Context(), height) - if err != nil { - return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, err - } + block := env.BlockStore.LoadBlock(height) blockMeta := env.BlockStore.LoadBlockMeta(height) if blockMeta == nil { return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil @@ -103,10 +100,7 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) // BlockByHash gets block by hash. // More: https://docs.tendermint.com/master/rpc/#/Info/block_by_hash func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) { - block, err := env.BlockStore.LoadBlockByHash(ctx.Context(), hash) - if err != nil { - return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, err - } + block := env.BlockStore.LoadBlockByHash(hash) if block == nil { return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil } @@ -151,14 +145,9 @@ func DataAvailabilityHeader(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.Re // TODO: store DAHeader to avoid loading the full block each time // depends on either: - // - https://github.com/celestiaorg/celestia-core/pull/312, or - // - https://github.com/celestiaorg/celestia-core/pull/218 - block, err := env.BlockStore.LoadBlock(ctx.Context(), height) - if err != nil { - return &ctypes.ResultDataAvailabilityHeader{ - DataAvailabilityHeader: types.DataAvailabilityHeader{}, - }, err - } + // - https://github.com/lazyledger/lazyledger-core/pull/312, or + // - https://github.com/lazyledger/lazyledger-core/pull/218 + block := env.BlockStore.LoadBlock(height) _ = block.Hash() dah := block.DataAvailabilityHeader return &ctypes.ResultDataAvailabilityHeader{ diff --git a/rpc/core/blocks_test.go b/rpc/core/blocks_test.go index d4e68f0666..9998969a75 100644 --- a/rpc/core/blocks_test.go +++ b/rpc/core/blocks_test.go @@ -1,7 +1,6 @@ package core import ( - "context" "fmt" "testing" @@ -119,26 +118,16 @@ type mockBlockStore struct { height int64 } -func (mockBlockStore) Base() int64 { return 1 } -func (store mockBlockStore) Height() int64 { return store.height } -func (store mockBlockStore) Size() int64 { return store.height } -func (mockBlockStore) LoadBaseMeta() *types.BlockMeta { return nil } -func (mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { return nil } -func (mockBlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) { - return nil, nil -} -func (mockBlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) { - return nil, nil -} +func (mockBlockStore) Base() int64 { return 1 } +func (store mockBlockStore) Height() int64 { return store.height } +func (store mockBlockStore) Size() int64 { return store.height } +func (mockBlockStore) LoadBaseMeta() *types.BlockMeta { return nil } +func (mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { return nil } +func (mockBlockStore) LoadBlock(height int64) *types.Block { return nil } +func (mockBlockStore) LoadBlockByHash(hash []byte) *types.Block { return nil } func (mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil } func (mockBlockStore) LoadBlockCommit(height int64) *types.Commit { return nil } func (mockBlockStore) LoadSeenCommit(height int64) *types.Commit { return nil } func (mockBlockStore) PruneBlocks(height int64) (uint64, error) { return 0, nil } -func (mockBlockStore) SaveBlock( - ctx context.Context, - block *types.Block, - blockParts *types.PartSet, - seenCommit *types.Commit, -) error { - return nil +func (mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { } diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 817cd56eb1..0d2b6c21bb 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -37,10 +37,7 @@ func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error var proof types.TxProof if prove { - block, err := env.BlockStore.LoadBlock(ctx.Context(), height) - if err != nil { - return nil, err - } + block := env.BlockStore.LoadBlock(height) proof = block.Data.Txs.Proof(int(index)) // XXX: overflow on 32-bit machines } @@ -110,10 +107,7 @@ func TxSearch(ctx *rpctypes.Context, query string, prove bool, pagePtr, perPageP var proof types.TxProof if prove { - block, err := env.BlockStore.LoadBlock(ctx.Context(), r.Height) - if err != nil { - return nil, err - } + block := env.BlockStore.LoadBlock(r.Height) proof = block.Data.Txs.Proof(int(r.Index)) // XXX: overflow on 32-bit machines } diff --git a/state/services.go b/state/services.go index 0ca6a4e935..6dd03fad17 100644 --- a/state/services.go +++ b/state/services.go @@ -1,8 +1,6 @@ package state import ( - "context" - "github.com/celestiaorg/celestia-core/types" ) @@ -22,13 +20,13 @@ type BlockStore interface { LoadBaseMeta() *types.BlockMeta LoadBlockMeta(height int64) *types.BlockMeta - LoadBlock(ctx context.Context, height int64) (*types.Block, error) + LoadBlock(height int64) *types.Block - SaveBlock(ctx context.Context, block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) error + SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) PruneBlocks(height int64) (uint64, error) - LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) + LoadBlockByHash(hash []byte) *types.Block LoadBlockPart(height int64, index int) *types.Part LoadBlockCommit(height int64) *types.Commit diff --git a/store/mock.go b/store/mock.go deleted file mode 100644 index 2c2e7e90cc..0000000000 --- a/store/mock.go +++ /dev/null @@ -1,21 +0,0 @@ -package store - -import ( - "github.com/celestiaorg/celestia-core/ipfs" - dbm "github.com/celestiaorg/celestia-core/libs/db" - "github.com/celestiaorg/celestia-core/libs/db/memdb" - "github.com/celestiaorg/celestia-core/libs/log" -) - -// MockBlockStore returns a mocked blockstore. a nil db will result in a new in memory db -func MockBlockStore(db dbm.DB) *BlockStore { - if db == nil { - db = memdb.NewDB() - } - - return NewBlockStore( - db, - ipfs.MockBlockStore(), - log.NewNopLogger(), - ) -} diff --git a/store/store.go b/store/store.go index 4e9be36cf9..2c8aeab27d 100644 --- a/store/store.go +++ b/store/store.go @@ -1,28 +1,18 @@ package store import ( - "context" "fmt" - "strings" - "strconv" "github.com/gogo/protobuf/proto" - "github.com/ipfs/go-blockservice" - blockstore "github.com/ipfs/go-ipfs-blockstore" - offline "github.com/ipfs/go-ipfs-exchange-offline" - format "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" + ipld "github.com/ipfs/go-ipld-format" - "github.com/celestiaorg/celestia-core/ipfs" dbm "github.com/celestiaorg/celestia-core/libs/db" - "github.com/celestiaorg/celestia-core/libs/log" tmsync "github.com/celestiaorg/celestia-core/libs/sync" "github.com/celestiaorg/celestia-core/p2p/ipld" tmstore "github.com/celestiaorg/celestia-core/proto/tendermint/store" tmproto "github.com/celestiaorg/celestia-core/proto/tendermint/types" "github.com/celestiaorg/celestia-core/types" - "github.com/celestiaorg/rsmt2d" ) /* @@ -54,20 +44,18 @@ type BlockStore struct { base int64 height int64 - dag format.DAGService - logger log.Logger + ipfsDagAPI ipld.DAGService } // NewBlockStore returns a new BlockStore with the given DB, // initialized to the last height that was committed to the DB. -func NewBlockStore(db dbm.DB, bstore blockstore.Blockstore, logger log.Logger) *BlockStore { +func NewBlockStore(db dbm.DB, dagAPI ipld.DAGService) *BlockStore { bs := LoadBlockStoreState(db) return &BlockStore{ - base: bs.Base, - height: bs.Height, - db: db, - dag: merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore))), - logger: logger, + base: bs.Base, + height: bs.Height, + db: db, + ipfsDagAPI: dagAPI, } } @@ -107,43 +95,48 @@ func (bs *BlockStore) LoadBaseMeta() *types.BlockMeta { // LoadBlock returns the block with the given height. // If no block is found for that height, it returns nil. -func (bs *BlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) { - blockMeta := bs.LoadBlockMeta(height) +func (bs *BlockStore) LoadBlock(height int64) *types.Block { + var blockMeta = bs.LoadBlockMeta(height) if blockMeta == nil { - return nil, nil + return nil } - lastCommit := bs.LoadBlockCommit(height - 1) - - data, err := ipld.RetrieveBlockData(ctx, &blockMeta.DAHeader, bs.dag, rsmt2d.NewRSGF8Codec()) - if err != nil { - if strings.Contains(err.Error(), format.ErrNotFound.Error()) { - return nil, fmt.Errorf("failure to retrieve block data from local ipfs store: %w", err) + pbb := new(tmproto.Block) + buf := []byte{} + for i := 0; i < int(blockMeta.BlockID.PartSetHeader.Total); i++ { + part := bs.LoadBlockPart(height, i) + // If the part is missing (e.g. since it has been deleted after we + // loaded the block meta) we consider the whole block to be missing. + if part == nil { + return nil } - bs.logger.Info("failure to retrieve block data", err) - return nil, err + buf = append(buf, part.Bytes...) + } + err := proto.Unmarshal(buf, pbb) + if err != nil { + // NOTE: The existence of meta should imply the existence of the + // block. So, make sure meta is only saved after blocks are saved. + panic(fmt.Sprintf("Error reading block: %v", err)) } - block := types.Block{ - Header: blockMeta.Header, - Data: data, - DataAvailabilityHeader: blockMeta.DAHeader, - LastCommit: lastCommit, + block, err := types.BlockFromProto(pbb) + if err != nil { + panic(fmt.Errorf("error from proto block: %w", err)) } - return &block, nil + return block } // LoadBlockByHash returns the block with the given hash. // If no block is found for that hash, it returns nil. // Panics if it fails to parse height associated with the given hash. -func (bs *BlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) { +func (bs *BlockStore) LoadBlockByHash(hash []byte) *types.Block { bz, err := bs.db.Get(calcBlockHashKey(hash)) if err != nil { panic(err) } if len(bz) == 0 { - return nil, nil + return nil } s := string(bz) @@ -152,7 +145,7 @@ func (bs *BlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types. if err != nil { panic(fmt.Sprintf("failed to extract height from %s: %v", s, err)) } - return bs.LoadBlock(ctx, height) + return bs.LoadBlock(height) } // LoadBlockPart returns the Part at the given index @@ -340,12 +333,7 @@ func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) { // If all the nodes restart after committing a block, // we need this to reload the precommits to catch-up nodes to the // most recent height. Otherwise they'd stall at H-1. -func (bs *BlockStore) SaveBlock( - ctx context.Context, - block *types.Block, - blockParts *types.PartSet, - seenCommit *types.Commit, -) error { +func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { if block == nil { panic("BlockStore can only save a non-nil block") } @@ -369,11 +357,6 @@ func (bs *BlockStore) SaveBlock( bs.saveBlockPart(height, i, part) } - err := ipld.PutBlock(ctx, bs.dag, block, ipfs.MockRouting(), bs.logger) - if err != nil { - return err - } - // Save block meta blockMeta := types.NewBlockMeta(block, blockParts) pbm, err := blockMeta.ToProto() @@ -416,8 +399,6 @@ func (bs *BlockStore) SaveBlock( // Save new BlockStoreState descriptor. This also flushes the database. bs.saveState() - - return nil } func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) { diff --git a/store/store_test.go b/store/store_test.go index 39c89e7e02..c14d507af9 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -2,7 +2,6 @@ package store import ( "bytes" - "context" "crypto/sha256" "fmt" "os" @@ -12,6 +11,7 @@ import ( "time" "github.com/gogo/protobuf/proto" + mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -74,7 +74,7 @@ func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore, cleanupFu if err != nil { panic(fmt.Errorf("error constructing state from genesis file: %w", err)) } - return state, MockBlockStore(blockDB), func() { os.RemoveAll(config.RootDir) } + return state, NewBlockStore(blockDB, mdutils.Mock()), func() { os.RemoveAll(config.RootDir) } } func TestLoadBlockStoreState(t *testing.T) { @@ -106,7 +106,7 @@ func TestNewBlockStore(t *testing.T) { bz, _ := proto.Marshal(&bss) err := db.Set(blockStoreKey, bz) require.NoError(t, err) - bs := MockBlockStore(db) + bs := NewBlockStore(db, mdutils.Mock()) require.Equal(t, int64(100), bs.Base(), "failed to properly parse blockstore") require.Equal(t, int64(10000), bs.Height(), "failed to properly parse blockstore") @@ -124,7 +124,7 @@ func TestNewBlockStore(t *testing.T) { _, _, panicErr := doFn(func() (interface{}, error) { err := db.Set(blockStoreKey, tt.data) require.NoError(t, err) - _ = MockBlockStore(db) + _ = NewBlockStore(db, mdutils.Mock()) return nil, nil }) require.NotNil(t, panicErr, "#%d panicCauser: %q expected a panic", i, tt.data) @@ -133,13 +133,13 @@ func TestNewBlockStore(t *testing.T) { err = db.Set(blockStoreKey, []byte{}) require.NoError(t, err) - bs = MockBlockStore(db) + bs = NewBlockStore(db, mdutils.Mock()) assert.Equal(t, bs.Height(), int64(0), "expecting empty bytes to be unmarshaled alright") } func freshBlockStore() (*BlockStore, dbm.DB) { db := memdb.NewDB() - return MockBlockStore(db), db + return NewBlockStore(db, mdutils.Mock()), db } var ( @@ -172,12 +172,10 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { require.Equal(t, bs.Base(), int64(0), "initially the base should be zero") require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") - ctx := context.TODO() - // check there are no blocks at various heights noBlockHeights := []int64{0, -1, 100, 1000, 2} for i, height := range noBlockHeights { - if g, _ := bs.LoadBlock(ctx, height); g != nil { + if g := bs.LoadBlock(height); g != nil { t.Errorf("#%d: height(%d) got a block; want nil", i, height) } } @@ -186,14 +184,13 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { block := makeBlock(bs.Height()+1, state, new(types.Commit)) validPartSet := block.MakePartSet(2) seenCommit := makeTestCommit(10, tmtime.Now()) - err := bs.SaveBlock(ctx, block, partSet, seenCommit) - require.NoError(t, err) + bs.SaveBlock(block, partSet, seenCommit) require.EqualValues(t, 1, bs.Base(), "expecting the new height to be changed") require.EqualValues(t, block.Header.Height, bs.Height(), "expecting the new height to be changed") incompletePartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 2}) uncontiguousPartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 0}) - _, err = uncontiguousPartSet.AddPart(part2) + _, err := uncontiguousPartSet.AddPart(part2) require.Error(t, err) header1 := types.Header{ @@ -308,20 +305,16 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { bs, db := freshBlockStore() // SaveBlock res, err, panicErr := doFn(func() (interface{}, error) { - err := bs.SaveBlock(ctx, tuple.block, tuple.parts, tuple.seenCommit) + bs.SaveBlock(tuple.block, tuple.parts, tuple.seenCommit) if tuple.block == nil { return nil, nil } - if err != nil { - return nil, err - } if tuple.corruptBlockInDB { err := db.Set(calcBlockMetaKey(tuple.block.Height), []byte("block-bogus")) require.NoError(t, err) } - bBlock, err := bs.LoadBlock(ctx, tuple.block.Height) - require.NoError(t, err) + bBlock := bs.LoadBlock(tuple.block.Height) bBlockMeta := bs.LoadBlockMeta(tuple.block.Height) if tuple.eraseSeenCommitInDB { @@ -388,14 +381,13 @@ func TestLoadBaseMeta(t *testing.T) { stateStore := sm.NewStore(memdb.NewDB()) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) - bs := MockBlockStore(nil) + bs := NewBlockStore(memdb.NewDB(), mdutils.Mock()) for h := int64(1); h <= 10; h++ { block := makeBlock(h, state, new(types.Commit)) partSet := block.MakePartSet(2) seenCommit := makeTestCommit(h, tmtime.Now()) - err := bs.SaveBlock(context.TODO(), block, partSet, seenCommit) - require.NoError(t, err) + bs.SaveBlock(block, partSet, seenCommit) } _, err = bs.PruneBlocks(4) @@ -446,13 +438,11 @@ func TestPruneBlocks(t *testing.T) { state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) db := memdb.NewDB() - bs := MockBlockStore(db) + bs := NewBlockStore(db, mdutils.Mock()) assert.EqualValues(t, 0, bs.Base()) assert.EqualValues(t, 0, bs.Height()) assert.EqualValues(t, 0, bs.Size()) - ctx := context.TODO() - // pruning an empty store should error, even when pruning to 0 _, err = bs.PruneBlocks(1) require.Error(t, err) @@ -465,16 +455,14 @@ func TestPruneBlocks(t *testing.T) { block := makeBlock(h, state, new(types.Commit)) partSet := block.MakePartSet(2) seenCommit := makeTestCommit(h, tmtime.Now()) - err := bs.SaveBlock(ctx, block, partSet, seenCommit) - require.NoError(t, err) + bs.SaveBlock(block, partSet, seenCommit) } assert.EqualValues(t, 1, bs.Base()) assert.EqualValues(t, 1500, bs.Height()) assert.EqualValues(t, 1500, bs.Size()) - prunedBlock, err := bs.LoadBlock(ctx, 1199) - require.NoError(t, err) + prunedBlock := bs.LoadBlock(1199) // Check that basic pruning works pruned, err := bs.PruneBlocks(1200) @@ -488,29 +476,18 @@ func TestPruneBlocks(t *testing.T) { Height: 1500, }, LoadBlockStoreState(db)) - b, err := bs.LoadBlock(ctx, 1200) - require.NotNil(t, b) - require.NoError(t, err) - b, err = bs.LoadBlock(ctx, 1199) - require.NoError(t, err) - require.Nil(t, b) - b, err = bs.LoadBlockByHash(ctx, prunedBlock.Hash()) - require.Nil(t, b) - require.NoError(t, err) - + require.NotNil(t, bs.LoadBlock(1200)) + require.Nil(t, bs.LoadBlock(1199)) + require.Nil(t, bs.LoadBlockByHash(prunedBlock.Hash())) require.Nil(t, bs.LoadBlockCommit(1199)) require.Nil(t, bs.LoadBlockMeta(1199)) require.Nil(t, bs.LoadBlockPart(1199, 1)) for i := int64(1); i < 1200; i++ { - b, err := bs.LoadBlock(ctx, i) - require.Nil(t, b) - require.NoError(t, err) + require.Nil(t, bs.LoadBlock(i)) } for i := int64(1200); i <= 1500; i++ { - b, err := bs.LoadBlock(ctx, i) - require.NotNil(t, b) - require.NoError(t, err) + require.NotNil(t, bs.LoadBlock(i)) } // Pruning below the current base should error @@ -536,15 +513,9 @@ func TestPruneBlocks(t *testing.T) { pruned, err = bs.PruneBlocks(1500) require.NoError(t, err) assert.EqualValues(t, 200, pruned) - b, err = bs.LoadBlock(ctx, 1499) - assert.Nil(t, b) - require.NoError(t, err) - b, err = bs.LoadBlock(ctx, 1500) - assert.NotNil(t, b) - require.NoError(t, err) - b, err = bs.LoadBlock(ctx, 1501) - assert.Nil(t, b) - require.NoError(t, err) + assert.Nil(t, bs.LoadBlock(1499)) + assert.NotNil(t, bs.LoadBlock(1500)) + assert.Nil(t, bs.LoadBlock(1501)) } func TestLoadBlockMeta(t *testing.T) { @@ -590,7 +561,6 @@ func TestLoadBlockMeta(t *testing.T) { } func TestBlockFetchAtHeight(t *testing.T) { - ctx := context.TODO() state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) defer cleanup() require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") @@ -598,12 +568,10 @@ func TestBlockFetchAtHeight(t *testing.T) { partSet := block.MakePartSet(2) seenCommit := makeTestCommit(10, tmtime.Now()) - err := bs.SaveBlock(ctx, block, partSet, seenCommit) - require.NoError(t, err) + bs.SaveBlock(block, partSet, seenCommit) require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed") - blockAtHeight, err := bs.LoadBlock(ctx, bs.Height()) - require.NoError(t, err) + blockAtHeight := bs.LoadBlock(bs.Height()) b1, err := block.ToProto() require.NoError(t, err) b2, err := blockAtHeight.ToProto() @@ -614,11 +582,9 @@ func TestBlockFetchAtHeight(t *testing.T) { require.Equal(t, block.Hash(), blockAtHeight.Hash(), "expecting a successful load of the last saved block") - blockAtHeightPlus1, err := bs.LoadBlock(ctx, bs.Height()+1) - require.NoError(t, err) + blockAtHeightPlus1 := bs.LoadBlock(bs.Height() + 1) require.Nil(t, blockAtHeightPlus1, "expecting an unsuccessful load of Height()+1") - blockAtHeightPlus2, err := bs.LoadBlock(ctx, bs.Height()+2) - require.NoError(t, err) + blockAtHeightPlus2 := bs.LoadBlock(bs.Height() + 2) require.Nil(t, blockAtHeightPlus2, "expecting an unsuccessful load of Height()+2") }