Skip to content

Commit

Permalink
metamorphic: limit aggregate injected I/O latency
Browse files Browse the repository at this point in the history
Limit the amount of I/O latency injected into any individual run of the
metamorphic test to improve worst-case test runtimes.

Informs #3681.
Informs #3733.
  • Loading branch information
jbowens committed Jul 10, 2024
1 parent 1d55ad5 commit 3b1077d
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 9 deletions.
3 changes: 2 additions & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2214,7 +2214,8 @@ func TestDeterminism(t *testing.T) {
if cmdArg.Key == "io-latency" {
prevMkfs := mkfs
mkfs = func() vfs.FS {
return errorfs.Wrap(prevMkfs(), errorfs.RandomLatency(errorfs.Randomly(p, 0), mean, 0))
return errorfs.Wrap(prevMkfs(), errorfs.RandomLatency(
errorfs.Randomly(p, 0), mean, 0 /* seed */, 0 /* no limit */))
}
} else if cmdArg.Key == "step-latency" {
beforeStep = func() {
Expand Down
2 changes: 1 addition & 1 deletion error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestDBCompactionCrash(t *testing.T) {
// really slow. See https://github.com/golang/go/issues/61042,
// https://github.com/golang/go/issues/44343.
if runtime.GOOS != "windows" {
fs = errorfs.Wrap(fs, errorfs.RandomLatency(nil, 20*time.Microsecond, seed))
fs = errorfs.Wrap(fs, errorfs.RandomLatency(nil, 20*time.Microsecond, seed, 0 /* no limit */))
}
rng := rand.New(rand.NewSource(seed))
maxConcurrentCompactions := rng.Intn(3) + 2
Expand Down
7 changes: 5 additions & 2 deletions metamorphic/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,13 +492,16 @@ func RunOnce(t TestingT, runDir string, seed uint64, historyPath string, rOpts .
} else {
opts.Cleaner = base.ArchiveCleaner{}
}
// Wrap the filesystem with a VFS that will inject random latency if
// the test options require it.
// Wrap the filesystem with a VFS that will inject random latency if the
// test options require it. We cap the overlal injected latency to ten
// seconds to avoid excessive test run times when paired with small target
// file sizes, block sizes, etc.
if testOpts.ioLatencyProbability > 0 {
opts.FS = errorfs.Wrap(opts.FS, errorfs.RandomLatency(
errorfs.Randomly(testOpts.ioLatencyProbability, testOpts.ioLatencySeed),
testOpts.ioLatencyMean,
testOpts.ioLatencySeed,
10*time.Second,
))
}

Expand Down
3 changes: 2 additions & 1 deletion metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ func TestMetricsWAmpDisableWAL(t *testing.T) {
// Metrics.WAL.BytesWritten metric is always nondecreasing.
// It's a regression test for issue #3505.
func TestMetricsWALBytesWrittenMonotonicity(t *testing.T) {
fs := errorfs.Wrap(vfs.NewMem(), errorfs.RandomLatency(nil, 100*time.Microsecond, time.Now().UnixNano()))
fs := errorfs.Wrap(vfs.NewMem(), errorfs.RandomLatency(
nil, 100*time.Microsecond, time.Now().UnixNano(), 0 /* no limit */))
d, err := Open("", &Options{
FS: fs,
// Use a tiny memtable size so that we get frequent flushes. While a
Expand Down
25 changes: 22 additions & 3 deletions vfs/errorfs/latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/errors"
Expand All @@ -19,7 +20,10 @@ import (
// amount of latency injected follows an exponential distribution with the
// provided mean. Latency injected is derived from the provided seed and is
// deterministic with respect to each file's path.
func RandomLatency(pred Predicate, mean time.Duration, seed int64) Injector {
//
// If limit is nonzero, total latency injected over the lifetime of the Injector
// is capped to limit.
func RandomLatency(pred Predicate, mean time.Duration, seed int64, limit time.Duration) Injector {
rl := &randomLatency{
predicate: pred,
mean: mean,
Expand Down Expand Up @@ -47,13 +51,18 @@ func parseRandomLatency(p *Parser, s *dsl.Scanner) Injector {
if tok.Kind != token.RPAREN {
panic(errors.Errorf("errorfs: unexpected token %s; expected %s", tok.String(), token.RPAREN))
}
return RandomLatency(pred, dur, seed)
return RandomLatency(pred, dur, seed, 0 /* no limit */)
}

type randomLatency struct {
predicate Predicate
// p defines the probability of an error being injected.
// mean is the mean duration injected each operation.
mean time.Duration
// limit configures a limit on total latency injected over the lifetime of
// the Injector.
limit time.Duration
// agg is the aggregate latency injected over the lifetime of the Injector.
agg atomic.Int64
keyedPrng
}

Expand All @@ -75,6 +84,16 @@ func (rl *randomLatency) MaybeError(op Op) error {
// enough that causes a test timeout.
dur = time.Duration(min(prng.ExpFloat64(), 20.0) * float64(rl.mean))
})

if v := time.Duration(rl.agg.Add(int64(dur))); v-dur > rl.limit {
// We'd already exceeded the limit before adding dur. Don't inject
// anything.
return nil
} else if v > rl.limit {
// We're about to exceed the limit. Cap the duration.
dur -= v - rl.limit
}

time.Sleep(dur)
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion wal/failover_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,8 @@ func TestFailoverManager_Quiesce(t *testing.T) {
memFS := vfs.NewMem()
require.NoError(t, memFS.MkdirAll("primary", os.ModePerm))
require.NoError(t, memFS.MkdirAll("secondary", os.ModePerm))
fs := errorfs.Wrap(memFS, errorfs.RandomLatency(errorfs.Randomly(0.50, seed), 10*time.Millisecond, seed))
fs := errorfs.Wrap(memFS, errorfs.RandomLatency(
errorfs.Randomly(0.50, seed), 10*time.Millisecond, seed, 0 /* no limit */))

var m failoverManager
require.NoError(t, m.init(Options{
Expand Down

0 comments on commit 3b1077d

Please sign in to comment.