diff --git a/libs/iavl/mutable_tree.go b/libs/iavl/mutable_tree.go index 7f9a9c47d9..6a4961cb2e 100644 --- a/libs/iavl/mutable_tree.go +++ b/libs/iavl/mutable_tree.go @@ -906,7 +906,7 @@ func (tree *MutableTree) deleteVersion(batch dbm.Batch, version int64, versions return err } - tree.ndb.deleteVersion(batch, version, true) + tree.ndb.deleteVersion(batch, version, true, false) return nil } diff --git a/libs/iavl/mutable_tree_map.go b/libs/iavl/mutable_tree_map.go index 9953125794..44926203ee 100644 --- a/libs/iavl/mutable_tree_map.go +++ b/libs/iavl/mutable_tree_map.go @@ -26,6 +26,7 @@ func (tm *TreeMap) addNewTree(tree *MutableTree) { defer tm.mtx.Unlock() if _, ok := tm.mutableTreeSavedMap[tree.GetModuleName()]; !ok { tm.mutableTreeSavedMap[tree.GetModuleName()] = tree + tree.ndb.cleanPruningInDB() go tree.commitSchedule() if EnablePruningHistoryState { go tree.pruningSchedule() diff --git a/libs/iavl/mutable_tree_oec.go b/libs/iavl/mutable_tree_oec.go index e47bb42167..a4d669f8eb 100644 --- a/libs/iavl/mutable_tree_oec.go +++ b/libs/iavl/mutable_tree_oec.go @@ -275,9 +275,10 @@ func (tree *MutableTree) pruningSchedule() { for event := range tree.pruneCh { if event.version >= 0 { trc := trace.NewTracer("pruningSchedule") + noBatch := IavlCommitAsyncNoBatch batch := tree.ndb.NewBatch() trc.Pin("deleteVersion") - tree.ndb.deleteVersion(batch, event.version, true) + tree.ndb.deleteVersion(batch, event.version, true, noBatch) trc.Pin("Commit") if err := tree.ndb.Commit(batch); err != nil { panic(err) diff --git a/libs/iavl/nodedb.go b/libs/iavl/nodedb.go index 0b6148e540..04f8df6121 100644 --- a/libs/iavl/nodedb.go +++ b/libs/iavl/nodedb.go @@ -31,6 +31,8 @@ const ( // Using semantic versioning: https://semver.org/ defaultStorageVersionValue = "1.0.0" fastStorageVersionValue = "1.1.0" + + pruningVersionKey = "pruning_version" ) var ( @@ -629,7 +631,7 @@ func (ndb *nodeDB) deleteOrphans(batch dbm.Batch, version int64) { if predecessor < fromVersion || fromVersion == toVersion { ndb.log(IavlDebug, "DELETE", "predecessor", predecessor, "fromVersion", fromVersion, "toVersion", toVersion, "hash", hash) batch.Delete(ndb.nodeKey(hash)) - ndb.syncUnCacheNode(hash) + ndb.uncacheNode(hash) ndb.state.increaseDeletedCount() } else { ndb.log(IavlDebug, "MOVE", "predecessor", predecessor, "fromVersion", fromVersion, "toVersion", toVersion, "hash", hash) @@ -638,6 +640,50 @@ func (ndb *nodeDB) deleteOrphans(batch dbm.Batch, version int64) { }) } +func (ndb *nodeDB) deleteOrphansFromDB(version int64) { + // Will be zero if there is no previous version. + predecessor := ndb.getPreviousVersion(version) + + // Traverse orphans with a lifetime ending at the version specified. + // TODO optimize. + ndb.traverseOrphansVersion(version, func(key, hash []byte) { + var fromVersion, toVersion int64 + var err error + + // See comment on `orphanKeyFmt`. Note that here, `version` and + // `toVersion` are always equal. + orphanKeyFormat.Scan(key, &toVersion, &fromVersion) + + // If there is no predecessor, or the predecessor is earlier than the + // beginning of the lifetime (ie: negative lifetime), or the lifetime + // spans a single version and that version is the one being deleted, we + // can delete the orphan. Otherwise, we shorten its lifetime, by + // moving its endpoint to the previous version. + if predecessor < fromVersion || fromVersion == toVersion { + ndb.log(IavlDebug, "DELETE", "predecessor", predecessor, "fromVersion", fromVersion, "toVersion", toVersion, "hash", hash) + err = ndb.db.Delete(ndb.nodeKey(hash)) + if err != nil { + panic(err) + } + ndb.uncacheNode(hash) + ndb.state.increaseDeletedCount() + } else { + ndb.log(IavlDebug, "MOVE", "predecessor", predecessor, "fromVersion", fromVersion, "toVersion", toVersion, "hash", hash) + err = ndb.saveOrphanToDB(hash, fromVersion, predecessor) + if err != nil { + panic(err) + } + } + + // Delete orphan key and reverse-lookup key. + // we can't delete origin orphanKey before delete node or update orphanKey. + err = ndb.db.Delete(key) + if err != nil { + panic(err) + } + }) +} + func (ndb *nodeDB) nodeKey(hash []byte) []byte { return nodeKeyFormat.KeyBytes(hash) } @@ -710,11 +756,18 @@ func (ndb *nodeDB) getPreviousVersion(version int64) int64 { } // deleteRoot deletes the root entry from disk, but not the node it points to. -func (ndb *nodeDB) deleteRoot(batch dbm.Batch, version int64, checkLatestVersion bool) { +func (ndb *nodeDB) deleteRoot(batch dbm.Batch, version int64, checkLatestVersion bool, writeToDB bool) { if checkLatestVersion && version == ndb.getLatestVersion() { panic("Tried to delete latest version") } - batch.Delete(ndb.rootKey(version)) + if !writeToDB { + batch.Delete(ndb.rootKey(version)) + } else { + err := ndb.db.Delete(ndb.rootKey(version)) + if err != nil { + panic(err) + } + } } func (ndb *nodeDB) traverseOrphans(fn func(k, v []byte)) { diff --git a/libs/iavl/nodedb_oec.go b/libs/iavl/nodedb_oec.go index 4da8e52724..b322c20f2d 100644 --- a/libs/iavl/nodedb_oec.go +++ b/libs/iavl/nodedb_oec.go @@ -5,6 +5,7 @@ import ( "container/list" "encoding/binary" "fmt" + "strconv" cmap "github.com/orcaman/concurrent-map" @@ -269,13 +270,69 @@ func (ndb *nodeDB) DeleteVersion(batch dbm.Batch, version int64, checkLatestVers return err } - ndb.deleteVersion(batch, version, checkLatestVersion) + ndb.deleteVersion(batch, version, checkLatestVersion, false) return nil } -func (ndb *nodeDB) deleteVersion(batch dbm.Batch, version int64, checkLatestVersion bool) { +func (ndb *nodeDB) deleteVersion(batch dbm.Batch, version int64, checkLatestVersion bool, writeToDB bool) { + if !writeToDB { + ndb.deleteOrphans(batch, version) + ndb.deleteRoot(batch, version, checkLatestVersion, writeToDB) + return + } + ndb.setPruningRoot(version, checkLatestVersion) + ndb.deleteRoot(batch, version, checkLatestVersion, writeToDB) + ndb.deleteOrphansFromDB(version) + ndb.deletePruningRoot() +} + +func (ndb *nodeDB) cleanPruningInDB() { + version, exist := ndb.getPruningRoot() + if !exist { + return + } + ndb.log(IavlErr, "start cleanPruningInDB", "name", ndb.name, "version", version) + ndb.deleteRoot(nil, version, false, true) + batch := ndb.db.NewBatch() + defer batch.Close() ndb.deleteOrphans(batch, version) - ndb.deleteRoot(batch, version, checkLatestVersion) + if err := batch.Write(); err != nil { + panic(err) + } + ndb.deletePruningRoot() + ndb.log(IavlErr, "cleanPruningInDB is done", "name", ndb.name, "version", version) +} + +func (ndb *nodeDB) setPruningRoot(version int64, checkLatestVersion bool) { + if checkLatestVersion && version == ndb.getLatestVersion() { + panic("Tried to delete latest version") + } + err := ndb.db.Set(metadataKeyFormat.Key([]byte(pruningVersionKey)), []byte(strconv.FormatInt(version, 10))) + if err != nil { + panic(err) + } +} + +func (ndb *nodeDB) deletePruningRoot() { + err := ndb.db.Delete(metadataKeyFormat.Key([]byte(pruningVersionKey))) + if err != nil { + panic(err) + } +} + +func (ndb *nodeDB) getPruningRoot() (int64, bool) { + bz, err := ndb.db.Get(metadataKeyFormat.Key([]byte(pruningVersionKey))) + if err != nil { + panic(err) + } + if len(bz) == 0 { + return 0, false + } + v, err := strconv.ParseInt(string(bz), 10, 64) + if err != nil { + panic(err) + } + return v, true } func (ndb *nodeDB) checkoutVersionReaders(version int64) error {