Skip to content

Commit

Permalink
metamorphic: sync with all affected objects when closing DB
Browse files Browse the repository at this point in the history
The `closeOp` on a db now syncs with all objects associated with that
DB. This prevents the test from executing e.g. an iterator operation
in parallel with the db close operation.

Note that we only populate the field in `computeDerivedFields`, which
is now called by the generator (in addition to the parser).
  • Loading branch information
RaduBerinde committed Jan 11, 2024
1 parent cbc5fd0 commit 0b0f62a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 33 deletions.
28 changes: 11 additions & 17 deletions metamorphic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ func generate(rng *rand.Rand, count uint64, cfg OpConfig, km *keyManager) []op {
}

g.dbClose()

computeDerivedFields(g.ops)
return g.ops
}

Expand Down Expand Up @@ -507,7 +509,7 @@ func (g *generator) removeBatchFromGenerator(batchID objID) {
for _, id := range iters.sorted() {
g.liveIters.remove(id)
delete(g.iters, id)
g.add(&closeOp{objID: id, derivedDBID: g.objDB[batchID]})
g.add(&closeOp{objID: id})
}
}

Expand All @@ -519,7 +521,7 @@ func (g *generator) batchAbort() {
batchID := g.liveBatches.rand(g.rng)
g.removeBatchFromGenerator(batchID)

g.add(&closeOp{objID: batchID, derivedDBID: g.objDB[batchID]})
g.add(&closeOp{objID: batchID})
}

func (g *generator) batchCommit() {
Expand Down Expand Up @@ -551,7 +553,7 @@ func (g *generator) batchCommit() {
dbID: dbID,
batchID: batchID,
})
g.add(&closeOp{objID: batchID, derivedDBID: dbID})
g.add(&closeOp{objID: batchID})

}

Expand All @@ -566,9 +568,8 @@ func (g *generator) dbClose() {
}
for len(g.liveBatches) > 0 {
batchID := g.liveBatches[0]
dbID := g.objDB[batchID]
g.removeBatchFromGenerator(batchID)
g.add(&closeOp{objID: batchID, derivedDBID: dbID})
g.add(&closeOp{objID: batchID})
}
for len(g.dbs) > 0 {
db := g.dbs[0]
Expand Down Expand Up @@ -650,9 +651,8 @@ func (g *generator) dbRestart() {
// Close the batches.
for len(g.liveBatches) > 0 {
batchID := g.liveBatches[0]
dbID := g.objDB[batchID]
g.removeBatchFromGenerator(batchID)
g.add(&closeOp{objID: batchID, derivedDBID: dbID})
g.add(&closeOp{objID: batchID})
}
if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
Expand Down Expand Up @@ -846,14 +846,9 @@ func (g *generator) iterClose(iterID objID) {
if readerIters, ok := g.iters[iterID]; ok {
delete(g.iters, iterID)
delete(readerIters, iterID)
//lint:ignore SA9003 - readability
} else {
// NB: the DB object does not track its open iterators because it never
// closes.
}

readerID := g.iterReaderID[iterID]
g.add(&closeOp{objID: iterID, derivedDBID: g.objDB[readerID]})
g.add(&closeOp{objID: iterID})
}

func (g *generator) iterSetBounds(iterID objID) {
Expand Down Expand Up @@ -1273,10 +1268,10 @@ func (g *generator) snapshotClose() {
for _, id := range iters.sorted() {
g.liveIters.remove(id)
delete(g.iters, id)
g.add(&closeOp{objID: id, derivedDBID: g.objDB[snapID]})
g.add(&closeOp{objID: id})
}

g.add(&closeOp{objID: snapID, derivedDBID: g.objDB[snapID]})
g.add(&closeOp{objID: snapID})
}

func (g *generator) writerApply() {
Expand Down Expand Up @@ -1328,8 +1323,7 @@ func (g *generator) writerApply() {
batchID: batchID,
})
g.add(&closeOp{
objID: batchID,
derivedDBID: dbID,
objID: batchID,
})
}

Expand Down
27 changes: 13 additions & 14 deletions metamorphic/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,12 @@ func (o *checkpointOp) diagramKeyRanges() []pebble.KeyRange {

// closeOp models a {Batch,Iterator,Snapshot}.Close operation.
type closeOp struct {
objID objID
derivedDBID objID
objID objID

// affectedObjects is the list of additional objects that are affected by this
// operation, and which syncObjs() must return so that we don't perform the
// close in parallel with other operations to affected objects.
affectedObjects []objID
}

func (o *closeOp) run(t *Test, h historyRecorder) {
Expand All @@ -214,17 +218,7 @@ func (o *closeOp) run(t *Test, h historyRecorder) {
func (o *closeOp) String() string { return fmt.Sprintf("%s.Close()", o.objID) }
func (o *closeOp) receiver() objID { return o.objID }
func (o *closeOp) syncObjs() objIDSlice {
// Synchronize on the database so that we don't close the database before
// all its iterators, snapshots and batches are closed.
// TODO(jackson): It would be nice to relax this so that Close calls can
// execute in parallel.
if o.objID.tag() == dbTag {
return nil
}
if o.derivedDBID != 0 {
return []objID{o.derivedDBID}
}
return nil
return o.affectedObjects
}

func (o *closeOp) keys() []*[]byte { return nil }
Expand Down Expand Up @@ -1734,6 +1728,11 @@ func (o *dbRatchetFormatMajorVersionOp) diagramKeyRanges() []pebble.KeyRange { r

type dbRestartOp struct {
dbID objID

// affectedObjects is the list of additional objects that are affected by this
// operation, and which syncObjs() must return so that we don't perform the
// restart in parallel with other operations to affected objects.
affectedObjects []objID
}

func (o *dbRestartOp) run(t *Test, h historyRecorder) {
Expand All @@ -1747,7 +1746,7 @@ func (o *dbRestartOp) run(t *Test, h historyRecorder) {

func (o *dbRestartOp) String() string { return fmt.Sprintf("%s.Restart()", o.dbID) }
func (o *dbRestartOp) receiver() objID { return o.dbID }
func (o *dbRestartOp) syncObjs() objIDSlice { return nil }
func (o *dbRestartOp) syncObjs() objIDSlice { return o.affectedObjects }

func (o *dbRestartOp) keys() []*[]byte { return nil }
func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
Expand Down
25 changes: 23 additions & 2 deletions metamorphic/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"golang.org/x/exp/slices"
)

type methodInfo struct {
Expand Down Expand Up @@ -583,9 +584,29 @@ func computeDerivedFields(ops []op) {
case *batchCommitOp:
v.dbID = objToDB[v.batchID]
case *closeOp:
if derivedDBID, ok := objToDB[v.objID]; ok && v.objID.tag() != dbTag {
v.derivedDBID = derivedDBID
if v.objID.tag() == dbTag {
// Find all objects that use this db.
v.affectedObjects = nil
for obj, db := range objToDB {
if db == v.objID {
v.affectedObjects = append(v.affectedObjects, obj)
}
}
// Sort so the output is deterministic.
slices.Sort(v.affectedObjects)
} else if dbID, ok := objToDB[v.objID]; ok {
v.affectedObjects = objIDSlice{dbID}
}
case *dbRestartOp:
// Find all objects that use this db.
v.affectedObjects = nil
for obj, db := range objToDB {
if db == v.dbID {
v.affectedObjects = append(v.affectedObjects, obj)
}
}
// Sort so the output is deterministic.
slices.Sort(v.affectedObjects)
case *ingestOp:
v.derivedDBIDs = make([]objID, len(v.batchIDs))
for i := range v.batchIDs {
Expand Down

0 comments on commit 0b0f62a

Please sign in to comment.