Skip to content

Commit

Permalink
Remove rollback code from private data store
Browse files Browse the repository at this point in the history
Now that the private data store can be ahead of block store
the private data store can be ahead of block store (in order to support
reset and rollback), the rollback function can be removed. Also,
this fixes the bug mentioned in FAB-17293

Signed-off-by: manish <[email protected]>
  • Loading branch information
manish-sethi authored and denyeart committed Feb 15, 2020
1 parent f354c81 commit 1893808
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 127 deletions.
1 change: 0 additions & 1 deletion core/ledger/ledgerstorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
}

if err := s.AddBlock(blockAndPvtdata.Block); err != nil {
s.pvtdataStore.Rollback()
return err
}

Expand Down
3 changes: 2 additions & 1 deletion core/ledger/ledgerstorage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,10 @@ func TestAddAfterBlkStoreError(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, uint64(9), pvtStoreCommitHt)

// pvt store should have a pending batch
pvtStorePndingBatch, err := store.pvtdataStore.HasPendingBatch()
assert.NoError(t, err)
assert.False(t, pvtStorePndingBatch)
assert.True(t, pvtStorePndingBatch)
}

func TestPvtStoreAheadOfBlockStore(t *testing.T) {
Expand Down
15 changes: 4 additions & 11 deletions core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,10 @@ type Provider interface {
}

// Store manages the permanent storage of private write sets for a ledger
// Beacsue the pvt data is supposed to be in sync with the blocks in the
// ledger, both should logically happen in an atomic operation. In order
// to accomplish this, an implementation of this store should provide
// support for a two-phase like commit/rollback capability.
// The expected use is such that - first the private data will be given to
// this store (via `Prepare` funtion) and then the block is appended to the block storage.
// Finally, one of the functions `Commit` or `Rollback` is invoked on this store based
// on whether the block was written successfully or not. The store implementation
// is expected to survive a server crash between the call to `Prepare` and `Commit`/`Rollback`
// Finally, the functions `Commit` invoked on this store. The store implementation
// is expected to survive a server crash between the call to `Prepare` and `Commit`.
type Store interface {
// Init initializes the store. This function is expected to be invoked before using the store
Init(btlPolicy pvtdatapolicy.BTLPolicy)
Expand All @@ -51,15 +46,13 @@ type Store interface {
// for which this peer is a member; `ineligible` denotes that the missing private data belong to a
// collection for which this peer is not a member.
// This call does not commit the pvt data and store missing private data. Subsequently, the caller
// is expected to call either `Commit` or `Rollback` function. Return from this should ensure
// is expected to call `Commit` function. Return from this should ensure
// that enough preparation is done such that `Commit` function invoked afterwards can commit the
// data and the store is capable of surviving a crash between this function call and the next
// invoke to the `Commit`
Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtData ledger.TxMissingPvtDataMap) error
// Commit commits the pvt data passed in the previous invoke to the `Prepare` function
Commit() error
// Rollback rolls back the pvt data passed in the previous invoke to the `Prepare` function
Rollback() error
// ProcessCollsEligibilityEnabled notifies the store when the peer becomes eligible to recieve data for an
// existing collection. Parameter 'committingBlk' refers to the block number that contains the corresponding
// collection upgrade transaction and the parameter 'nsCollMap' contains the collections for which the peer
Expand All @@ -86,7 +79,7 @@ type Store interface {
Shutdown()
}

// ErrIllegalCall is to be thrown by a store impl if the store does not expect a call to Prepare/Commit/Rollback/InitLastCommittedBlock
// ErrIllegalCall is to be thrown by a store impl if the store does not expect a call to Prepare/Commit/InitLastCommittedBlock
type ErrIllegalCall struct {
msg string
}
Expand Down
34 changes: 1 addition & 33 deletions core/ledger/pvtdatastorage/store_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *store) Init(btlPolicy pvtdatapolicy.BTLPolicy) {
func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtData ledger.TxMissingPvtDataMap) error {
if s.batchPending {
return &ErrIllegalCall{`A pending batch exists as as result of last invoke to "Prepare" call.
Invoke "Commit" or "Rollback" on the pending batch before invoking "Prepare" function`}
Invoke "Commit" on the pending batch before invoking "Prepare" function`}
}
expectedBlockNum := s.nextBlockNum()
if expectedBlockNum != blockNum {
Expand Down Expand Up @@ -235,38 +235,6 @@ func (s *store) Commit() error {
return nil
}

// Rollback implements the function in the interface `Store`
// This deletes the existing data entries and eligible missing data entries.
// However, this does not delete ineligible missing data entires as the next try
// would have exact same entries and will overwrite those. This also leaves the
// existing expiry entires as is because, most likely they will also get overwritten
// per new data entries. Even if some of the expiry entries does not get overwritten,
// (because of some data may be missing next time), the additional expiry entries are just
// a Noop
func (s *store) Rollback() error {
if !s.batchPending {
return &ErrIllegalCall{"No pending batch to rollback"}
}
blkNum := s.nextBlockNum()
batch := leveldbhelper.NewUpdateBatch()
itr := s.db.GetIterator(datakeyRange(blkNum))
for itr.Next() {
batch.Delete(itr.Key())
}
itr.Release()
itr = s.db.GetIterator(eligibleMissingdatakeyRange(blkNum))
for itr.Next() {
batch.Delete(itr.Key())
}
itr.Release()
batch.Delete(pendingCommitKey)
if err := s.db.WriteBatch(batch, true); err != nil {
return err
}
s.batchPending = false
return nil
}

// CommitPvtDataOfOldBlocks commits the pvtData (i.e., previously missing data) of old blocks.
// The parameter `blocksPvtData` refers a list of old block's pvtdata which are missing in the pvtstore.
// Given a list of old block's pvtData, `CommitPvtDataOfOldBlocks` performs the following four
Expand Down
82 changes: 1 addition & 81 deletions core/ledger/pvtdatastorage/store_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ func TestStoreBasicCommitAndRetrieval(t *testing.T) {
assert.NoError(store.Prepare(1, testData, blk1MissingData))
assert.NoError(store.Commit())

// pvt data with block 2 - rollback
assert.NoError(store.Prepare(2, testData, nil))
assert.NoError(store.Rollback())

// pvt data retrieval for block 0 should return nil
var nilFilter ledger.PvtNsCollFilter
retrievedData, err := store.GetPvtDataByBlockNum(0, nilFilter)
Expand Down Expand Up @@ -773,83 +769,7 @@ func testCollElgEnabled(t *testing.T) {
assert.Equal(expectedMissingPvtDataInfo, missingPvtDataInfo)
}

func TestRollBack(t *testing.T) {
btlPolicy := btltestutil.SampleBTLPolicy(
map[[2]string]uint64{
{"ns-1", "coll-1"}: 0,
{"ns-1", "coll-2"}: 0,
},
)
env := NewTestStoreEnv(t, "TestRollBack", btlPolicy)
defer env.Cleanup()
assert := assert.New(t)
store := env.TestStore
assert.NoError(store.Prepare(0, nil, nil))
assert.NoError(store.Commit())

pvtdata := []*ledger.TxPvtData{
produceSamplePvtdata(t, 0, []string{"ns-1:coll-1", "ns-1:coll-2"}),
produceSamplePvtdata(t, 5, []string{"ns-1:coll-1", "ns-1:coll-2"}),
}
missingData := make(ledger.TxMissingPvtDataMap)
missingData.Add(1, "ns-1", "coll-1", true)
missingData.Add(5, "ns-1", "coll-1", true)
missingData.Add(5, "ns-2", "coll-2", false)

for i := 1; i <= 9; i++ {
assert.NoError(store.Prepare(uint64(i), pvtdata, missingData))
assert.NoError(store.Commit())
}

datakeyTx0 := &dataKey{
nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1"},
txNum: 0,
}
datakeyTx5 := &dataKey{
nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1"},
txNum: 5,
}
eligibleMissingdatakey := &missingDataKey{
nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1"},
isEligible: true,
}

// test store state before preparing for block 10
testPendingBatch(false, assert, store)
testLastCommittedBlockHeight(10, assert, store)

// prepare for block 10 and test store for presence of datakeys and eligibile missingdatakeys
assert.NoError(store.Prepare(10, pvtdata, missingData))
testPendingBatch(true, assert, store)
testLastCommittedBlockHeight(10, assert, store)

datakeyTx0.blkNum = 10
datakeyTx5.blkNum = 10
eligibleMissingdatakey.blkNum = 10
assert.True(testDataKeyExists(t, store, datakeyTx0))
assert.True(testDataKeyExists(t, store, datakeyTx5))
assert.True(testMissingDataKeyExists(t, store, eligibleMissingdatakey))

// rollback last prepared block and test store for absence of datakeys and eligibile missingdatakeys
store.Rollback()
testPendingBatch(false, assert, store)
testLastCommittedBlockHeight(10, assert, store)
assert.False(testDataKeyExists(t, store, datakeyTx0))
assert.False(testDataKeyExists(t, store, datakeyTx5))
assert.False(testMissingDataKeyExists(t, store, eligibleMissingdatakey))

// For previously committed blocks the datakeys and eligibile missingdatakeys should still be present
for i := 1; i <= 9; i++ {
datakeyTx0.blkNum = uint64(i)
datakeyTx5.blkNum = uint64(i)
eligibleMissingdatakey.blkNum = uint64(i)
assert.True(testDataKeyExists(t, store, datakeyTx0))
assert.True(testDataKeyExists(t, store, datakeyTx5))
assert.True(testMissingDataKeyExists(t, store, eligibleMissingdatakey))
}
}

// TODO Add tests for simulating a crash between calls `Prepare` and `Commit`/`Rollback` - [FAB-13099]
// TODO Add tests for simulating a crash between calls `Prepare` and `Commit` - [FAB-13099]

func testEmpty(expectedEmpty bool, assert *assert.Assertions, store Store) {
isEmpty, err := store.IsEmpty()
Expand Down

0 comments on commit 1893808

Please sign in to comment.