Skip to content

Commit

Permalink
Revert "Save and load block data using IPFS (#374)"
Browse files Browse the repository at this point in the history
This reverts commit 8da1644.
  • Loading branch information
evan-forbes committed Aug 24, 2021
1 parent dbf5418 commit cb6f83f
Show file tree
Hide file tree
Showing 23 changed files with 130 additions and 279 deletions.
13 changes: 3 additions & 10 deletions blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package v0

import (
"context"
"fmt"
"reflect"
"time"
Expand Down Expand Up @@ -179,10 +178,7 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {

block, err := bcR.store.LoadBlock(context.TODO(), msg.Height)
if err != nil {
panic(err)
}
block := bcR.store.LoadBlock(msg.Height)
if block != nil {
bl, err := block.ToProto()
if err != nil {
Expand Down Expand Up @@ -422,14 +418,11 @@ FOR_LOOP:
bcR.pool.PopRequest()

// TODO: batch saves so we dont persist to disk every block
err := bcR.store.SaveBlock(context.TODO(), first, firstParts, second.LastCommit)
if err != nil {
// an error is only returned if something with the local IPFS blockstore is seriously wrong
panic(err)
}
bcR.store.SaveBlock(first, firstParts, second.LastCommit)

// TODO: same thing for app - but we would need a way to get the hash
// without persisting the state.
var err error
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
Expand Down
20 changes: 6 additions & 14 deletions blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package v0

import (
"context"
"crypto/sha256"
"fmt"
"os"
"sort"
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -69,9 +69,10 @@ func newBlockchainReactor(
panic(fmt.Errorf("error start app: %w", err))
}

blockDB := memdb.NewDB()
stateDB := memdb.NewDB()
stateStore := sm.NewStore(stateDB)
blockStore := store.MockBlockStore(nil)
blockStore := store.NewBlockStore(blockDB, mdutils.Mock())

state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
if err != nil {
Expand Down Expand Up @@ -99,10 +100,7 @@ func newBlockchainReactor(
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock, err := blockStore.LoadBlock(context.TODO(), blockHeight-1)
if err != nil {
panic(err)
}
lastBlock := blockStore.LoadBlock(blockHeight - 1)

vote, err := types.MakeVote(
lastBlock.Header.Height,
Expand All @@ -129,10 +127,7 @@ func newBlockchainReactor(
panic(fmt.Errorf("error apply block: %w", err))
}

err := blockStore.SaveBlock(context.TODO(), thisBlock, thisParts, lastCommit)
if err != nil {
panic(err)
}
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}

bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
Expand Down Expand Up @@ -189,10 +184,7 @@ func TestNoBlockResponse(t *testing.T) {
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())

for _, tt := range tests {
block, err := reactorPairs[1].reactor.store.LoadBlock(context.TODO(), tt.height)
if err != nil {
panic(err)
}
block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
if tt.existent {
assert.True(t, block != nil)
} else {
Expand Down
9 changes: 3 additions & 6 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import (
"testing"
"time"

"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-merkledag"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -59,9 +57,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
app.InitChain(abci.RequestInitChain{Validators: vals})

blockDB := memdb.NewDB()
bs := ipfs.MockBlockStore()
dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger())
dag := mdutils.Mock()
blockStore := store.NewBlockStore(blockDB, dag)

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
Expand Down
17 changes: 7 additions & 10 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ import (
"time"

"github.com/go-kit/kit/log/term"
"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -359,17 +356,18 @@ func subscribeToVoter(cs *State, addr []byte) <-chan tmpubsub.Message {

func newState(state sm.State, pv types.PrivValidator, app abci.Application, ipfsDagAPI format.DAGService) *State {
config := cfg.ResetTestRoot("consensus_state_test")
return newStateWithConfig(config, state, pv, app)
return newStateWithConfig(config, state, pv, app, ipfsDagAPI)
}

func newStateWithConfig(
thisConfig *cfg.Config,
state sm.State,
pv types.PrivValidator,
app abci.Application,
ipfsDagAPI format.DAGService,
) *State {
blockDB := memdb.NewDB()
return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB)
return newStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB, ipfsDagAPI)
}

func newStateWithConfigAndBlockStore(
Expand All @@ -378,11 +376,10 @@ func newStateWithConfigAndBlockStore(
pv types.PrivValidator,
app abci.Application,
blockDB dbm.DB,
dag format.DAGService,
) *State {
// Get BlockStore
bs := ipfs.MockBlockStore()
dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger())
blockStore := store.NewBlockStore(blockDB, dag)

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
Expand Down Expand Up @@ -711,7 +708,7 @@ func randConsensusNet(
vals := types.TM2PB.ValidatorUpdates(state.Validators)
app.InitChain(abci.RequestInitChain{Validators: vals})

css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB)
css[i] = newStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB, mdutils.Mock())
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
Expand Down Expand Up @@ -774,7 +771,7 @@ func randConsensusNetWithPeers(
app.InitChain(abci.RequestInitChain{Validators: vals})
// sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above

css[i] = newStateWithConfig(thisConfig, state, privVal, app)
css[i] = newStateWithConfig(thisConfig, state, privVal, app, mdutils.Mock())
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
Expand Down
11 changes: 6 additions & 5 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -29,7 +30,7 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {

config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())
assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
Expand All @@ -49,7 +50,7 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {

config.Consensus.CreateEmptyBlocksInterval = ensureTimeout
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())

assertMempool(cs.txNotifier).EnableTxsAvailable()

Expand All @@ -67,7 +68,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) {

config.Consensus.CreateEmptyBlocks = false
state, privVals := randGenesisState(1, false, 10)
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs := newStateWithConfig(config, state, privVals[0], NewCounterApplication(), mdutils.Mock())

assertMempool(cs.txNotifier).EnableTxsAvailable()
height, round := cs.Height, cs.Round
Expand Down Expand Up @@ -117,7 +118,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
blockDB := memdb.NewDB()
stateStore := sm.NewStore(blockDB)

cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB)
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB, mdutils.Mock())
err := stateStore.Save(state)
require.NoError(t, err)
newBlockHeaderCh := subscribe(cs.eventBus, types.EventQueryNewBlockHeader)
Expand All @@ -143,7 +144,7 @@ func TestMempoolRmBadTx(t *testing.T) {
blockDB := memdb.NewDB()

stateStore := sm.NewStore(blockDB)
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB)
cs := newStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB, mdutils.Mock())
err := stateStore.Save(state)
require.NoError(t, err)

Expand Down
4 changes: 3 additions & 1 deletion consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,10 @@ func TestReactorWithEvidence(t *testing.T) {
// duplicate code from:
// css[i] = newStateWithConfig(thisConfig, state, privVals[i], app)

blockDB := memdb.NewDB()
dag := mdutils.Mock()
blockStore := store.MockBlockStore(nil)
blockStore := store.NewBlockStore(blockDB, dag)

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
Expand Down
11 changes: 3 additions & 8 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,7 @@ func (h *Handshaker) replayBlocks(
}
for i := firstBlock; i <= finalBlock; i++ {
h.logger.Info("Applying block", "height", i)
block, err := h.store.LoadBlock(context.TODO(), i)
if err != nil {
return nil, err
}
block := h.store.LoadBlock(i)
// Extra check to ensure the app was not changed in a way it shouldn't have.
if len(appHash) > 0 {
assertAppHashEqualsOneFromBlock(appHash, block)
Expand Down Expand Up @@ -495,17 +492,15 @@ func (h *Handshaker) replayBlocks(

// ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
block, err := h.store.LoadBlock(context.TODO(), height)
if err != nil {
return sm.State{}, err
}
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)

// Use stubs for both mempool and evidence pool since no transactions nor
// evidence are needed here - block already exists.
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{})
blockExec.SetEventBus(h.eventBus)

var err error
state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block)
if err != nil {
return sm.State{}, err
Expand Down
2 changes: 1 addition & 1 deletion consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
tmos.Exit(err.Error())
}
dag := mdutils.Mock()
blockStore := store.MockBlockStore(blockStoreDB)
blockStore := store.NewBlockStore(blockStoreDB, dag)

// Get State
stateDB, err := badgerdb.NewDB("state", config.DBDir())
Expand Down
30 changes: 11 additions & 19 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
privValidator,
kvstore.NewApplication(),
blockDB,
mdutils.Mock(),
)
cs.SetLogger(logger)

Expand Down Expand Up @@ -173,6 +174,7 @@ LOOP:
privValidator,
kvstore.NewApplication(),
blockDB,
mdutils.Mock(),
)
cs.SetLogger(logger)

Expand Down Expand Up @@ -546,9 +548,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
sim.Chain = make([]*types.Block, 0)
sim.Commits = make([]*types.Commit, 0)
for i := 1; i <= numBlocks; i++ {
blck, err := css[0].blockStore.LoadBlock(context.TODO(), int64(i))
require.NoError(t, err)
sim.Chain = append(sim.Chain, blck)
sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i)))
sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i)))
}
}
Expand Down Expand Up @@ -1195,15 +1195,13 @@ func newMockBlockStore(config *cfg.Config, params tmproto.ConsensusParams) *mock
return &mockBlockStore{config, params, nil, nil, 0, mdutils.Mock()}
}

func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) }
func (bs *mockBlockStore) Base() int64 { return bs.base }
func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 }
func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) }
func (bs *mockBlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) {
return bs.chain[height-1], nil
}
func (bs *mockBlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) {
return bs.chain[int64(len(bs.chain))-1], nil
func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) }
func (bs *mockBlockStore) Base() int64 { return bs.base }
func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 }
func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) }
func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] }
func (bs *mockBlockStore) LoadBlockByHash(hash []byte) *types.Block {
return bs.chain[int64(len(bs.chain))-1]
}
func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
block := bs.chain[height-1]
Expand All @@ -1213,13 +1211,7 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
}
}
func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
func (bs *mockBlockStore) SaveBlock(
ctx context.Context,
block *types.Block,
blockParts *types.PartSet,
seenCommit *types.Commit,
) error {
return nil
func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
}
func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {
return bs.commits[height-1]
Expand Down
6 changes: 1 addition & 5 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package consensus

import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -1545,10 +1544,7 @@ func (cs *State) finalizeCommit(height int64) {
// but may differ from the LastCommit included in the next block
precommits := cs.Votes.Precommits(cs.CommitRound)
seenCommit := precommits.MakeCommit()
err := cs.blockStore.SaveBlock(context.TODO(), block, blockParts, seenCommit)
if err != nil {
panic(err)
}
cs.blockStore.SaveBlock(block, blockParts, seenCommit)
} else {
// Happens during replay if we already saved the block but didn't commit
cs.Logger.Info("Calling finalizeCommit on already stored block", "height", block.Height)
Expand Down
2 changes: 1 addition & 1 deletion consensus/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
t.Error(err)
}
dag := mdutils.Mock()
blockStore := store.MockBlockStore(blockStoreDB)
blockStore := store.NewBlockStore(blockStoreDB, dag)

proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app))
proxyApp.SetLogger(logger.With("module", "proxy"))
Expand Down
Loading

0 comments on commit cb6f83f

Please sign in to comment.