diff --git a/api/server_test.go b/api/server_test.go index e6c412d7d91..dd7fc06c814 100644 --- a/api/server_test.go +++ b/api/server_test.go @@ -13,7 +13,7 @@ import ( "go.k6.io/k6/api/common" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/testutils/minirunner" @@ -71,7 +71,7 @@ func TestWithEngine(t *testing.T) { Runner: &minirunner.MiniRunner{}, } - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go index 6dc06f74854..460fb0e4635 100644 --- a/api/v1/group_routes_test.go +++ b/api/v1/group_routes_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/testutils/minirunner" @@ -50,7 +50,7 @@ func TestGetGroups(t *testing.T) { assert.NoError(t, err) testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0}) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/api/v1/metric_routes_test.go b/api/v1/metric_routes_test.go index 131f16278f7..5612d5f3523 100644 --- a/api/v1/metric_routes_test.go +++ b/api/v1/metric_routes_test.go @@ -11,7 +11,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils/minirunner" "go.k6.io/k6/metrics" @@ -23,7 +23,7 @@ func TestGetMetrics(t *testing.T) { testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time) require.NoError(t, err) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) @@ -82,7 +82,7 @@ func TestGetMetric(t *testing.T) { testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time) require.NoError(t, err) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index 8829755c901..4b9e06b150e 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -16,7 +16,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/types" @@ -136,7 +136,7 @@ func TestSetupData(t *testing.T) { TeardownTimeout: types.NullDurationFrom(5 * time.Second), }, runner) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git 
a/api/v1/status_routes.go b/api/v1/status_routes.go index f9d444dae16..dc18a822660 100644 --- a/api/v1/status_routes.go +++ b/api/v1/status_routes.go @@ -7,7 +7,7 @@ import ( "net/http" "go.k6.io/k6/api/common" - "go.k6.io/k6/lib" + "go.k6.io/k6/execution" "go.k6.io/k6/lib/executor" ) @@ -24,9 +24,7 @@ func handleGetStatus(rw http.ResponseWriter, r *http.Request) { _, _ = rw.Write(data) } -func getFirstExternallyControlledExecutor( - execScheduler lib.ExecutionScheduler, -) (*executor.ExternallyControlled, error) { +func getFirstExternallyControlledExecutor(execScheduler *execution.Scheduler) (*executor.ExternallyControlled, error) { executors := execScheduler.GetExecutors() for _, s := range executors { if mex, ok := s.(*executor.ExternallyControlled); ok { diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index 03ce1f2d4f5..bb31003ed2b 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -14,7 +14,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils/minirunner" ) @@ -23,7 +23,7 @@ func TestGetStatus(t *testing.T) { t.Parallel() testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) @@ -110,7 +110,7 @@ func TestPatchStatus(t *testing.T) { require.NoError(t, err) testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{}) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/cmd/run.go b/cmd/run.go index c2060c86527..98f24af36a0 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -20,9 +20,9 @@ import ( "go.k6.io/k6/api" "go.k6.io/k6/cmd/state" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/execution" "go.k6.io/k6/js/common" "go.k6.io/k6/lib" "go.k6.io/k6/lib/consts" @@ -72,7 +72,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { logger := testRunState.Logger // Create a local execution scheduler wrapping the runner. 
logger.Debug("Initializing the execution scheduler...") - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) if err != nil { return err } @@ -332,7 +332,7 @@ a commandline interface for interacting with it.`, return runCmd } -func reportUsage(execScheduler *local.ExecutionScheduler) error { +func reportUsage(execScheduler *execution.Scheduler) error { execState := execScheduler.GetState() executorConfigs := execScheduler.GetExecutorConfigs() diff --git a/core/engine.go b/core/engine.go index 5e31a567a4f..aa37d4bf4d1 100644 --- a/core/engine.go +++ b/core/engine.go @@ -10,6 +10,7 @@ import ( "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/metrics" "go.k6.io/k6/metrics/engine" @@ -33,7 +34,7 @@ type Engine struct { // TODO: completely remove the engine and use all of these separately, in a // much more composable and testable manner - ExecutionScheduler lib.ExecutionScheduler + ExecutionScheduler *execution.Scheduler MetricsEngine *engine.MetricsEngine OutputManager *output.Manager @@ -53,7 +54,7 @@ type Engine struct { } // NewEngine instantiates a new Engine, without doing any heavy initialization. -func NewEngine(testState *lib.TestRunState, ex lib.ExecutionScheduler, outputs []output.Output) (*Engine, error) { +func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []output.Output) (*Engine, error) { if ex == nil { return nil, errors.New("missing ExecutionScheduler instance") } diff --git a/core/engine_test.go b/core/engine_test.go index d06962dd242..30e3a8984e1 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -14,8 +14,8 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" - "go.k6.io/k6/core/local" "go.k6.io/k6/errext" + "go.k6.io/k6/execution" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/executor" @@ -82,7 +82,7 @@ func newTestEngineWithTestPreInitState( //nolint:golint testRunState := getTestRunState(t, piState, newOpts, runner) - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) engine, err := NewEngine(testRunState, execScheduler, outputs) @@ -927,7 +927,7 @@ func TestVuInitException(t *testing.T) { testState := getTestRunState(t, piState, opts, runner) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := NewEngine(testState, execScheduler, nil) require.NoError(t, err) @@ -1315,7 +1315,7 @@ func TestActiveVUsCount(t *testing.T) { require.NoError(t, err) testState := getTestRunState(t, piState, opts, runner) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := NewEngine(testState, execScheduler, []output.Output{mockOutput}) require.NoError(t, err) diff --git a/core/local/util_test.go b/core/local/util_test.go deleted file mode 100644 index 55426902c19..00000000000 --- a/core/local/util_test.go +++ /dev/null @@ -1,266 +0,0 @@ -package local - -//TODO: translate this test to the new paradigm -/* -func TestProcessStages(t *testing.T) { - type checkpoint struct { - D time.Duration - Keep bool - VUs null.Int - } - testdata := map[string]struct { - Start int64 - Stages []lib.Stage - Checkpoints []checkpoint - }{ - "none": { - 0, - []lib.Stage{}, - []checkpoint{ - {0 * time.Second, 
false, null.NewInt(0, false)}, - {10 * time.Second, false, null.NewInt(0, false)}, - {24 * time.Hour, false, null.NewInt(0, false)}, - }, - }, - "one": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Second, true, null.NewInt(0, false)}, - {10 * time.Second, true, null.NewInt(0, false)}, - {11 * time.Second, false, null.NewInt(0, false)}, - }, - }, - "one/start": { - 5, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(5, false)}, - {1 * time.Second, true, null.NewInt(5, false)}, - {10 * time.Second, true, null.NewInt(5, false)}, - {11 * time.Second, false, null.NewInt(5, false)}, - }, - }, - "one/targeted": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second), Target: null.IntFrom(100)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - {1 * time.Second, true, null.IntFrom(10)}, - {2 * time.Second, true, null.IntFrom(20)}, - {3 * time.Second, true, null.IntFrom(30)}, - {4 * time.Second, true, null.IntFrom(40)}, - {5 * time.Second, true, null.IntFrom(50)}, - {6 * time.Second, true, null.IntFrom(60)}, - {7 * time.Second, true, null.IntFrom(70)}, - {8 * time.Second, true, null.IntFrom(80)}, - {9 * time.Second, true, null.IntFrom(90)}, - {10 * time.Second, true, null.IntFrom(100)}, - {11 * time.Second, false, null.IntFrom(100)}, - }, - }, - "one/targeted/start": { - 50, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second), Target: null.IntFrom(100)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(50)}, - {1 * time.Second, true, null.IntFrom(55)}, - {2 * time.Second, true, null.IntFrom(60)}, - {3 * time.Second, true, null.IntFrom(65)}, - {4 * time.Second, true, null.IntFrom(70)}, - {5 * time.Second, true, null.IntFrom(75)}, - {6 * time.Second, true, null.IntFrom(80)}, - {7 * time.Second, true, null.IntFrom(85)}, - {8 * time.Second, true, null.IntFrom(90)}, - {9 * time.Second, true, null.IntFrom(95)}, - {10 * time.Second, true, null.IntFrom(100)}, - {11 * time.Second, false, null.IntFrom(100)}, - }, - }, - "two": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Second, true, null.NewInt(0, false)}, - {11 * time.Second, false, null.NewInt(0, false)}, - }, - }, - "two/start": { - 5, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(5, false)}, - {1 * time.Second, true, null.NewInt(5, false)}, - {11 * time.Second, false, null.NewInt(5, false)}, - }, - }, - "two/targeted": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(100)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(0)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - {1 * time.Second, true, null.IntFrom(20)}, - {2 * time.Second, true, null.IntFrom(40)}, - {3 * time.Second, true, null.IntFrom(60)}, - {4 * time.Second, true, null.IntFrom(80)}, - {5 * time.Second, true, null.IntFrom(100)}, - {6 * time.Second, true, null.IntFrom(80)}, - {7 * time.Second, true, null.IntFrom(60)}, - {8 * time.Second, true, null.IntFrom(40)}, - {9 * time.Second, true, null.IntFrom(20)}, - {10 * time.Second, true, 
null.IntFrom(0)}, - {11 * time.Second, false, null.IntFrom(0)}, - }, - }, - "three": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(10 * time.Second)}, - {Duration: types.NullDurationFrom(15 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Second, true, null.NewInt(0, false)}, - {15 * time.Second, true, null.NewInt(0, false)}, - {30 * time.Second, true, null.NewInt(0, false)}, - {31 * time.Second, false, null.NewInt(0, false)}, - }, - }, - "three/targeted": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(50)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(100)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(0)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - {1 * time.Second, true, null.IntFrom(10)}, - {2 * time.Second, true, null.IntFrom(20)}, - {3 * time.Second, true, null.IntFrom(30)}, - {4 * time.Second, true, null.IntFrom(40)}, - {5 * time.Second, true, null.IntFrom(50)}, - {6 * time.Second, true, null.IntFrom(60)}, - {7 * time.Second, true, null.IntFrom(70)}, - {8 * time.Second, true, null.IntFrom(80)}, - {9 * time.Second, true, null.IntFrom(90)}, - {10 * time.Second, true, null.IntFrom(100)}, - {11 * time.Second, true, null.IntFrom(80)}, - {12 * time.Second, true, null.IntFrom(60)}, - {13 * time.Second, true, null.IntFrom(40)}, - {14 * time.Second, true, null.IntFrom(20)}, - {15 * time.Second, true, null.IntFrom(0)}, - {16 * time.Second, false, null.IntFrom(0)}, - }, - }, - "mix": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(20)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(10)}, - {Duration: types.NullDurationFrom(2 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(20)}, - {Duration: types.NullDurationFrom(2 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(10)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - - {1 * time.Second, true, null.IntFrom(4)}, - {2 * time.Second, true, null.IntFrom(8)}, - {3 * time.Second, true, null.IntFrom(12)}, - {4 * time.Second, true, null.IntFrom(16)}, - {5 * time.Second, true, null.IntFrom(20)}, - - {6 * time.Second, true, null.IntFrom(18)}, - {7 * time.Second, true, null.IntFrom(16)}, - {8 * time.Second, true, null.IntFrom(14)}, - {9 * time.Second, true, null.IntFrom(12)}, - {10 * time.Second, true, null.IntFrom(10)}, - - {11 * time.Second, true, null.IntFrom(10)}, - {12 * time.Second, true, null.IntFrom(10)}, - - {13 * time.Second, true, null.IntFrom(12)}, - {14 * time.Second, true, null.IntFrom(14)}, - {15 * time.Second, true, null.IntFrom(16)}, - {16 * time.Second, true, null.IntFrom(18)}, - {17 * time.Second, true, null.IntFrom(20)}, - - {18 * time.Second, true, null.IntFrom(20)}, - {19 * time.Second, true, null.IntFrom(20)}, - - {20 * time.Second, true, null.IntFrom(18)}, - {21 * time.Second, true, null.IntFrom(16)}, - {22 * time.Second, true, null.IntFrom(14)}, - {23 * time.Second, true, null.IntFrom(12)}, - {24 * time.Second, true, null.IntFrom(10)}, - }, - }, - "mix/start": { - 5, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(10)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(5, false)}, - - {1 * 
time.Second, true, null.NewInt(5, false)}, - {2 * time.Second, true, null.NewInt(5, false)}, - {3 * time.Second, true, null.NewInt(5, false)}, - {4 * time.Second, true, null.NewInt(5, false)}, - {5 * time.Second, true, null.NewInt(5, false)}, - - {6 * time.Second, true, null.NewInt(6, true)}, - {7 * time.Second, true, null.NewInt(7, true)}, - {8 * time.Second, true, null.NewInt(8, true)}, - {9 * time.Second, true, null.NewInt(9, true)}, - {10 * time.Second, true, null.NewInt(10, true)}, - }, - }, - "infinite": { - 0, - []lib.Stage{{}}, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Minute, true, null.NewInt(0, false)}, - {1 * time.Hour, true, null.NewInt(0, false)}, - {24 * time.Hour, true, null.NewInt(0, false)}, - {365 * 24 * time.Hour, true, null.NewInt(0, false)}, - }, - }, - } - for name, data := range testdata { - t.Run(name, func(t *testing.T) { - for _, ckp := range data.Checkpoints { - t.Run(ckp.D.String(), func(t *testing.T) { - vus, keepRunning := ProcessStages(data.Start, data.Stages, ckp.D) - assert.Equal(t, ckp.VUs, vus) - assert.Equal(t, ckp.Keep, keepRunning) - }) - } - }) - } -} -*/ diff --git a/execution/abort.go b/execution/abort.go new file mode 100644 index 00000000000..08e9bfa8ea2 --- /dev/null +++ b/execution/abort.go @@ -0,0 +1,84 @@ +package execution + +import ( + "context" + "sync" + + "github.com/sirupsen/logrus" +) + +// testAbortKey is the key used to store the abort function for the context of +// an executor. This allows any users of that context or its sub-contexts to +// cancel the whole execution tree, while at the same time providing all of the +// details for why they cancelled it via the attached error. +type testAbortKey struct{} + +type testAbortController struct { + cancel context.CancelFunc + + logger logrus.FieldLogger + lock sync.Mutex // only the first reason will be kept, others will be logged + reason error // see errext package, you can wrap errors to attach exit status, run status, etc. +} + +func (tac *testAbortController) abort(err error) { + tac.lock.Lock() + defer tac.lock.Unlock() + if tac.reason != nil { + tac.logger.Debugf( + "test abort with reason '%s' was attempted when the test was already aborted due to '%s'", + err.Error(), tac.reason.Error(), + ) + return + } + tac.reason = err + tac.cancel() +} + +func (tac *testAbortController) getReason() error { + tac.lock.Lock() + defer tac.lock.Unlock() + return tac.reason +} + +// NewTestRunContext returns a context.Context that can be aborted by calling +// the returned abortTest function or by calling AbortTestRun() with the +// returned context or a sub-context of it. Use this to initialize the context +// that will be passed to the Scheduler, so `execution.test.abort()` and the REST +// API test stopping both work. +func NewTestRunContext( + ctx context.Context, logger logrus.FieldLogger, +) (newCtx context.Context, abortTest func(reason error)) { + ctx, cancel := context.WithCancel(ctx) + + controller := &testAbortController{ + cancel: cancel, + logger: logger, + } + + return context.WithValue(ctx, testAbortKey{}, controller), controller.abort +} + +// AbortTestRun will cancel the test run context with the given reason if the +// provided context is actually a TestRunContext or a child of one.
+func AbortTestRun(ctx context.Context, err error) bool { + if x := ctx.Value(testAbortKey{}); x != nil { + if v, ok := x.(*testAbortController); ok { + v.abort(err) + return true + } + } + return false +} + +// GetCancelReasonIfTestAborted returns the reason the context was cancelled, if +// it was aborted with these functions. It will return nil if ctx is not a +// TestRunContext (or a child of one) or if it was never aborted. +func GetCancelReasonIfTestAborted(ctx context.Context) error { + if x := ctx.Value(testAbortKey{}); x != nil { + if v, ok := x.(*testAbortController); ok { + return v.getReason() + } + } + return nil +} diff --git a/execution/pkg.go b/execution/pkg.go new file mode 100644 index 00000000000..5cf88bf880e --- /dev/null +++ b/execution/pkg.go @@ -0,0 +1,8 @@ +// Package execution contains most of the components that schedule, execute and +// control individual k6 tests. +package execution + +// TODO: move ExecutionSegment and ESS here + +// TODO: move the executor interfaces here and their implementations in a sub-folder +// TODO: move the execution state here diff --git a/core/local/local.go b/execution/scheduler.go similarity index 81% rename from core/local/local.go rename to execution/scheduler.go index 61c9b1e59a1..0f5d42a729f 100644 --- a/core/local/local.go +++ b/execution/scheduler.go @@ -1,4 +1,4 @@ -package local +package execution import ( "context" @@ -11,13 +11,14 @@ import ( "go.k6.io/k6/errext" "go.k6.io/k6/lib" - "go.k6.io/k6/lib/executor" "go.k6.io/k6/metrics" "go.k6.io/k6/ui/pb" ) -// ExecutionScheduler is the local implementation of lib.ExecutionScheduler -type ExecutionScheduler struct { +// A Scheduler is in charge of most of the test execution - initializing VUs and +// executors, running setup() and teardown(), and actually starting the +// executors for the different scenarios at the appropriate times. +type Scheduler struct { initProgress *pb.ProgressBar executorConfigs []lib.ExecutorConfig // sorted by (startTime, ID) executors []lib.Executor // sorted by (startTime, ID), excludes executors with no work @@ -31,14 +32,11 @@ type ExecutionScheduler struct { stopVUsEmission, vusEmissionStopped chan struct{} } -// Check to see if we implement the lib.ExecutionScheduler interface -var _ lib.ExecutionScheduler = &ExecutionScheduler{} - -// NewExecutionScheduler creates and returns a new local lib.ExecutionScheduler -// instance, without initializing it beyond the bare minimum. Specifically, it -// creates the needed executor instances and a lot of state placeholders, but it -// doesn't initialize the executors and it doesn't initialize or run VUs. -func NewExecutionScheduler(trs *lib.TestRunState) (*ExecutionScheduler, error) { +// NewScheduler creates and returns a new Scheduler instance, without +// initializing it beyond the bare minimum. Specifically, it creates the needed +// executor instances and a lot of state placeholders, but it doesn't initialize +// the executors and it doesn't initialize or run VUs.
+func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) { options := trs.Options et, err := lib.NewExecutionTuple(options.ExecutionSegment, options.ExecutionSegmentSequence) if err != nil { @@ -78,7 +76,7 @@ func NewExecutionScheduler(trs *lib.TestRunState) (*ExecutionScheduler, error) { } } - return &ExecutionScheduler{ + return &Scheduler{ initProgress: pb.New(pb.WithConstLeft("Init")), executors: executors, executorConfigs: executorConfigs, @@ -93,48 +91,48 @@ func NewExecutionScheduler(trs *lib.TestRunState) (*ExecutionScheduler, error) { } // GetRunner returns the wrapped lib.Runner instance. -func (e *ExecutionScheduler) GetRunner() lib.Runner { // TODO: remove +func (e *Scheduler) GetRunner() lib.Runner { // TODO: remove return e.state.Test.Runner } -// GetState returns a pointer to the execution state struct for the local -// execution scheduler. It's guaranteed to be initialized and present, though -// see the documentation in lib/execution.go for caveats about its usage. The -// most important one is that none of the methods beyond the pause-related ones +// GetState returns a pointer to the execution state struct for the execution +// scheduler. It's guaranteed to be initialized and present, though see the +// documentation in lib/execution.go for caveats about its usage. The most +// important one is that none of the methods beyond the pause-related ones // should be used for synchronization. -func (e *ExecutionScheduler) GetState() *lib.ExecutionState { +func (e *Scheduler) GetState() *lib.ExecutionState { return e.state } // GetExecutors returns the slice of configured executor instances which // have work, sorted by their (startTime, name) in an ascending order. -func (e *ExecutionScheduler) GetExecutors() []lib.Executor { +func (e *Scheduler) GetExecutors() []lib.Executor { return e.executors } // GetExecutorConfigs returns the slice of all executor configs, sorted by // their (startTime, name) in an ascending order. -func (e *ExecutionScheduler) GetExecutorConfigs() []lib.ExecutorConfig { +func (e *Scheduler) GetExecutorConfigs() []lib.ExecutorConfig { return e.executorConfigs } // GetInitProgressBar returns the progress bar associated with the Init // function. After the Init is done, it is "hijacked" to display real-time // execution statistics as a text bar. -func (e *ExecutionScheduler) GetInitProgressBar() *pb.ProgressBar { +func (e *Scheduler) GetInitProgressBar() *pb.ProgressBar { return e.initProgress } // GetExecutionPlan is a helper method so users of the local execution scheduler // don't have to calculate the execution plan again. -func (e *ExecutionScheduler) GetExecutionPlan() []lib.ExecutionStep { +func (e *Scheduler) GetExecutionPlan() []lib.ExecutionStep { return e.executionPlan } // initVU is a helper method that's used to both initialize the planned VUs // in the Init() method, and also passed to executors so they can initialize // any unplanned VUs themselves. -func (e *ExecutionScheduler) initVU( +func (e *Scheduler) initVU( ctx context.Context, samplesOut chan<- metrics.SampleContainer, logger logrus.FieldLogger, ) (lib.InitializedVU, error) { // Get the VU IDs here, so that the VUs are (mostly) ordered by their @@ -151,7 +149,7 @@ func (e *ExecutionScheduler) initVU( // getRunStats is a helper function that can be used as the execution // scheduler's progressbar substitute (i.e. hijack). 
-func (e *ExecutionScheduler) getRunStats() string { +func (e *Scheduler) getRunStats() string { status := "running" if e.state.IsPaused() { status = "paused" @@ -169,7 +167,7 @@ func (e *ExecutionScheduler) getRunStats() string { ) } -func (e *ExecutionScheduler) initVUsConcurrently( +func (e *Scheduler) initVUsConcurrently( ctx context.Context, samplesOut chan<- metrics.SampleContainer, count uint64, concurrency int, logger logrus.FieldLogger, ) chan error { @@ -206,7 +204,7 @@ func (e *ExecutionScheduler) initVUsConcurrently( return doneInits } -func (e *ExecutionScheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) { +func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) { e.state.Test.Logger.Debug("Starting emission of VUs and VUsMax metrics...") tags := e.state.Test.RunTags @@ -259,7 +257,7 @@ func (e *ExecutionScheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- me // Init concurrently initializes all of the planned VUs and then sequentially // initializes all of the configured executors. -func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) { +func (e *Scheduler) Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) { e.emitVUsAndVUsMax(ctx, samplesOut) defer func() { if err != nil { @@ -268,7 +266,7 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics } }() - logger := e.state.Test.Logger.WithField("phase", "local-execution-scheduler-init") + logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init") vusToInitialize := lib.GetMaxPlannedVUs(e.executionPlan) logger.WithFields(logrus.Fields{ "neededVUs": vusToInitialize, @@ -342,7 +340,7 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics // executor, each time in a new goroutine. It is responsible for waiting out the // configured startTime for the specific executor and then running its Run() // method. -func (e *ExecutionScheduler) runExecutor( +func (e *Scheduler) runExecutor( runCtx context.Context, runResults chan<- error, engineOut chan<- metrics.SampleContainer, executor lib.Executor, ) { executorConfig := executor.GetConfig() @@ -389,18 +387,18 @@ func (e *ExecutionScheduler) runExecutor( runResults <- err } -// Run the ExecutionScheduler, funneling all generated metric samples through the supplied +// Run the Scheduler, funneling all generated metric samples through the supplied // out channel. // //nolint:funlen -func (e *ExecutionScheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metrics.SampleContainer) error { +func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metrics.SampleContainer) error { defer func() { close(e.stopVUsEmission) <-e.vusEmissionStopped }() executorsCount := len(e.executors) - logger := e.state.Test.Logger.WithField("phase", "local-execution-scheduler-run") + logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run") e.initProgress.Modify(pb.WithConstLeft("Run")) var interrupted bool defer func() { @@ -452,7 +450,7 @@ func (e *ExecutionScheduler) Run(globalCtx, runCtx context.Context, engineOut ch // this context effectively stopping all executions. // // This is for addressing test.abort(). 
- execCtx := executor.Context(runSubCtx) + execCtx, _ := NewTestRunContext(runSubCtx, logger) for _, exec := range e.executors { go e.runExecutor(execCtx, runResults, engineOut, exec) } @@ -480,17 +478,33 @@ func (e *ExecutionScheduler) Run(globalCtx, runCtx context.Context, engineOut ch return err } } - if err := executor.CancelReason(execCtx); err != nil && errext.IsInterruptError(err) { + if err := GetCancelReasonIfTestAborted(execCtx); err != nil && errext.IsInterruptError(err) { interrupted = true return err } return firstErr } -// SetPaused pauses a test, if called with true. And if called with false, tries -// to start/resume it. See the lib.ExecutionScheduler interface documentation of -// the methods for the various caveats about its usage. -func (e *ExecutionScheduler) SetPaused(pause bool) error { +// SetPaused pauses the test, or starts/resumes it. To check if a test is paused, +// use GetState().IsPaused(). +// +// Currently, any executor, so any test, can be started in a paused state. This +// will cause k6 to initialize all needed VUs, but it won't actually start the +// test. Later, the test can be started for real by resuming/unpausing it from +// the REST API. +// +// After a test is actually started, it may become impossible to pause it again. +// That is signaled by having SetPaused(true) return an error. The likely cause +// is that some of the executors for the test don't support pausing after the +// test has been started. +// +// IMPORTANT: Currently only the externally controlled executor can be paused +// and resumed multiple times in the middle of the test execution! Even then, +// "pausing" is a bit misleading, since k6 won't pause in the middle of the +// currently executing iterations. It will allow the currently in-progress +// iterations to finish, and it just won't start any new ones nor will it +// increment the value returned by GetCurrentTestRunDuration(). +func (e *Scheduler) SetPaused(pause bool) error { if !e.state.HasStarted() && e.state.IsPaused() { if pause { return fmt.Errorf("execution is already paused") diff --git a/core/local/k6execution_test.go b/execution/scheduler_ext_exec_test.go similarity index 95% rename from core/local/k6execution_test.go rename to execution/scheduler_ext_exec_test.go index c97fd15ab32..2ae4c10b703 100644 --- a/core/local/k6execution_test.go +++ b/execution/scheduler_ext_exec_test.go @@ -1,4 +1,4 @@ -package local +package execution_test import ( "encoding/json" @@ -18,6 +18,8 @@ import ( "go.k6.io/k6/metrics" ) +// TODO: rewrite and/or move these as integration tests to reduce boilerplate +// and improve reliability?
func TestExecutionInfoVUSharing(t *testing.T) { t.Parallel() script := []byte(` @@ -81,7 +83,7 @@ func TestExecutionInfoVUSharing(t *testing.T) { ) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() type vuStat struct { @@ -194,7 +196,7 @@ func TestExecutionInfoScenarioIter(t *testing.T) { ) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() errCh := make(chan error, 1) @@ -276,7 +278,7 @@ func TestSharedIterationsStable(t *testing.T) { ) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() errCh := make(chan error, 1) @@ -409,7 +411,7 @@ func TestExecutionInfoAll(t *testing.T) { }, nil) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() errCh := make(chan error, 1) diff --git a/core/local/local_test.go b/execution/scheduler_ext_test.go similarity index 86% rename from core/local/local_test.go rename to execution/scheduler_ext_test.go index dcb0ddf4a72..43dfc9b69ac 100644 --- a/core/local/local_test.go +++ b/execution/scheduler_ext_test.go @@ -1,4 +1,4 @@ -package local +package execution_test import ( "context" @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" + "go.k6.io/k6/execution" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/executor" @@ -56,9 +57,9 @@ func getTestRunState( } } -func newTestExecutionScheduler( +func newTestScheduler( t *testing.T, runner lib.Runner, logger *logrus.Logger, opts lib.Options, -) (ctx context.Context, cancel func(), execScheduler *ExecutionScheduler, samples chan metrics.SampleContainer) { +) (ctx context.Context, cancel func(), execScheduler *execution.Scheduler, samples chan metrics.SampleContainer) { if runner == nil { runner = &minirunner.MiniRunner{} } @@ -73,7 +74,7 @@ func newTestExecutionScheduler( testRunState.Logger = logger } - execScheduler, err = NewExecutionScheduler(testRunState) + execScheduler, err = execution.NewScheduler(testRunState) require.NoError(t, err) samples = make(chan metrics.SampleContainer, newOpts.MetricSamplesBufferSize.Int64) @@ -92,9 +93,9 @@ func newTestExecutionScheduler( return ctx, cancel, execScheduler, samples } -func TestExecutionSchedulerRun(t *testing.T) { +func TestSchedulerRun(t *testing.T) { t.Parallel() - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, nil, nil, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, nil, nil, lib.Options{}) defer cancel() err := make(chan error, 1) @@ -102,7 +103,7 @@ func TestExecutionSchedulerRun(t *testing.T) { assert.NoError(t, <-err) } -func TestExecutionSchedulerRunNonDefault(t *testing.T) { +func TestSchedulerRunNonDefault(t *testing.T) { t.Parallel() testCases := []struct { @@ -136,7 +137,7 @@ func TestExecutionSchedulerRunNonDefault(t *testing.T) { testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := 
NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -165,7 +166,7 @@ func TestExecutionSchedulerRunNonDefault(t *testing.T) { } } -func TestExecutionSchedulerRunEnv(t *testing.T) { +func TestSchedulerRunEnv(t *testing.T) { t.Parallel() scriptTemplate := ` @@ -253,7 +254,7 @@ func TestExecutionSchedulerRunEnv(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -280,7 +281,7 @@ func TestExecutionSchedulerRunEnv(t *testing.T) { } } -func TestExecutionSchedulerSystemTags(t *testing.T) { +func TestSchedulerSystemTags(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) sr := tb.Replacer.Replace @@ -322,7 +323,7 @@ func TestExecutionSchedulerSystemTags(t *testing.T) { }))) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -372,7 +373,7 @@ func TestExecutionSchedulerSystemTags(t *testing.T) { } } -func TestExecutionSchedulerRunCustomTags(t *testing.T) { +func TestSchedulerRunCustomTags(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) sr := tb.Replacer.Replace @@ -453,7 +454,7 @@ func TestExecutionSchedulerRunCustomTags(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -496,7 +497,7 @@ func TestExecutionSchedulerRunCustomTags(t *testing.T) { // Ensure that custom executor settings are unique per executor and // that there's no "crossover"/"pollution" between executors. // Also test that custom tags are properly set on checks and groups metrics. 
-func TestExecutionSchedulerRunCustomConfigNoCrossover(t *testing.T) { +func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) @@ -617,7 +618,7 @@ func TestExecutionSchedulerRunCustomConfigNoCrossover(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -686,7 +687,7 @@ func TestExecutionSchedulerRunCustomConfigNoCrossover(t *testing.T) { require.Equal(t, 8, gotSampleTags, "received wrong amount of samples with expected tags") } -func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { +func TestSchedulerSetupTeardownRun(t *testing.T) { t.Parallel() t.Run("Normal", func(t *testing.T) { t.Parallel() @@ -702,7 +703,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{}) err := make(chan error, 1) go func() { err <- execScheduler.Run(ctx, ctx, samples) }() @@ -718,7 +719,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return nil, errors.New("setup error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{}) defer cancel() assert.EqualError(t, execScheduler.Run(ctx, ctx, samples), "setup error") }) @@ -732,7 +733,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return errors.New("teardown error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ NoSetup: null.BoolFrom(true), VUs: null.IntFrom(1), Iterations: null.IntFrom(1), @@ -751,7 +752,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return errors.New("teardown error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }) @@ -769,7 +770,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return errors.New("teardown error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ NoTeardown: null.BoolFrom(true), VUs: null.IntFrom(1), Iterations: null.IntFrom(1), @@ -779,7 +780,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { }) } -func TestExecutionSchedulerStages(t *testing.T) { +func TestSchedulerStages(t *testing.T) { t.Parallel() testdata := map[string]struct { Duration time.Duration @@ -815,7 +816,7 @@ func TestExecutionSchedulerStages(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ VUs: null.IntFrom(1), Stages: data.Stages, }) @@ -826,7 +827,7 @@ func TestExecutionSchedulerStages(t *testing.T) { } } -func TestExecutionSchedulerEndTime(t *testing.T) { +func 
TestSchedulerEndTime(t *testing.T) { t.Parallel() runner := &minirunner.MiniRunner{ Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error { @@ -834,7 +835,7 @@ func TestExecutionSchedulerEndTime(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ VUs: null.IntFrom(10), Duration: types.NullDurationFrom(1 * time.Second), }) @@ -851,7 +852,7 @@ func TestExecutionSchedulerEndTime(t *testing.T) { assert.True(t, runTime < 10*time.Second, "took more than 10 seconds") } -func TestExecutionSchedulerRuntimeErrors(t *testing.T) { +func TestSchedulerRuntimeErrors(t *testing.T) { t.Parallel() runner := &minirunner.MiniRunner{ Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error { @@ -864,7 +865,7 @@ func TestExecutionSchedulerRuntimeErrors(t *testing.T) { }, } logger, hook := logtest.NewNullLogger() - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() endTime, isFinal := lib.GetEndOffset(execScheduler.GetExecutionPlan()) @@ -883,7 +884,7 @@ func TestExecutionSchedulerRuntimeErrors(t *testing.T) { } } -func TestExecutionSchedulerEndErrors(t *testing.T) { +func TestSchedulerEndErrors(t *testing.T) { t.Parallel() exec := executor.NewConstantVUsConfig("we_need_hard_stop") @@ -901,7 +902,7 @@ func TestExecutionSchedulerEndErrors(t *testing.T) { }, } logger, hook := logtest.NewNullLogger() - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() endTime, isFinal := lib.GetEndOffset(execScheduler.GetExecutionPlan()) @@ -917,7 +918,7 @@ func TestExecutionSchedulerEndErrors(t *testing.T) { assert.Empty(t, hook.Entries) } -func TestExecutionSchedulerEndIterations(t *testing.T) { +func TestSchedulerEndIterations(t *testing.T) { t.Parallel() registry := metrics.NewRegistry() metric := registry.MustNewMetric("test_metric", metrics.Counter) @@ -954,7 +955,7 @@ func TestExecutionSchedulerEndIterations(t *testing.T) { defer cancel() testRunState := getTestRunState(t, getTestPreInitState(t), runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) samples := make(chan metrics.SampleContainer, 300) @@ -972,7 +973,7 @@ func TestExecutionSchedulerEndIterations(t *testing.T) { } } -func TestExecutionSchedulerIsRunning(t *testing.T) { +func TestSchedulerIsRunning(t *testing.T) { t.Parallel() runner := &minirunner.MiniRunner{ Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error { @@ -980,7 +981,7 @@ func TestExecutionSchedulerIsRunning(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, _ := newTestExecutionScheduler(t, runner, nil, lib.Options{}) + ctx, cancel, execScheduler, _ := newTestScheduler(t, runner, nil, lib.Options{}) state := execScheduler.GetState() err := make(chan error) @@ -995,7 +996,7 @@ func TestExecutionSchedulerIsRunning(t *testing.T) { assert.NoError(t, <-err) } -// TestDNSResolver checks the DNS resolution behavior at the ExecutionScheduler level. 
+// TestDNSResolver checks the DNS resolution behavior at the Scheduler level. func TestDNSResolver(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) @@ -1072,7 +1073,7 @@ func TestDNSResolver(t *testing.T) { mr := mockresolver.New(nil, net.LookupIP) runner.ActualResolver = mr.LookupIPAll - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, tc.opts) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, tc.opts) defer cancel() mr.Set("myhost", sr("HTTPBIN_IP")) @@ -1160,7 +1161,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, options, runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -1280,86 +1281,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { } } -// Just a lib.PausableExecutor implementation that can return an error -type pausableExecutor struct { - lib.Executor - err error -} - -func (p pausableExecutor) SetPaused(bool) error { - return p.err -} - -func TestSetPaused(t *testing.T) { - t.Parallel() - t.Run("second pause is an error", func(t *testing.T) { - t.Parallel() - testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) - require.NoError(t, err) - sched.executors = []lib.Executor{pausableExecutor{err: nil}} - - require.NoError(t, sched.SetPaused(true)) - err = sched.SetPaused(true) - require.Error(t, err) - require.Contains(t, err.Error(), "execution is already paused") - }) - - t.Run("unpause at the start is an error", func(t *testing.T) { - t.Parallel() - testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) - require.NoError(t, err) - sched.executors = []lib.Executor{pausableExecutor{err: nil}} - err = sched.SetPaused(false) - require.Error(t, err) - require.Contains(t, err.Error(), "execution wasn't paused") - }) - - t.Run("second unpause is an error", func(t *testing.T) { - t.Parallel() - testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) - require.NoError(t, err) - sched.executors = []lib.Executor{pausableExecutor{err: nil}} - require.NoError(t, sched.SetPaused(true)) - require.NoError(t, sched.SetPaused(false)) - err = sched.SetPaused(false) - require.Error(t, err) - require.Contains(t, err.Error(), "execution wasn't paused") - }) - - t.Run("an error on pausing is propagated", func(t *testing.T) { - t.Parallel() - testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) - require.NoError(t, err) - expectedErr := errors.New("testing pausable executor error") - sched.executors = []lib.Executor{pausableExecutor{err: expectedErr}} - err = sched.SetPaused(true) - require.Error(t, err) - require.Equal(t, err, expectedErr) - }) - - t.Run("can't pause unpausable executor", func(t *testing.T) { - t.Parallel() - runner := &minirunner.MiniRunner{} - options, err := executor.DeriveScenariosFromShortcuts(lib.Options{ - Iterations: null.IntFrom(2), - VUs: null.IntFrom(1), - }.Apply(runner.GetOptions()), nil) - require.NoError(t, err) - - testRunState := getTestRunState(t, 
getTestPreInitState(t), options, runner) - sched, err := NewExecutionScheduler(testRunState) - require.NoError(t, err) - err = sched.SetPaused(true) - require.Error(t, err) - require.Contains(t, err.Error(), "doesn't support pause and resume operations after its start") - }) -} - -func TestNewExecutionSchedulerHasWork(t *testing.T) { +func TestNewSchedulerHasWork(t *testing.T) { t.Parallel() script := []byte(` import http from 'k6/http'; @@ -1406,9 +1328,13 @@ func TestNewExecutionSchedulerHasWork(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) - assert.Len(t, execScheduler.executors, 2) - assert.Len(t, execScheduler.executorConfigs, 3) + assert.Len(t, execScheduler.GetExecutors(), 2) + assert.Len(t, execScheduler.GetExecutorConfigs(), 3) + + err = execScheduler.SetPaused(true) + require.Error(t, err) + require.Contains(t, err.Error(), "doesn't support pause and resume operations after its start") } diff --git a/execution/scheduler_int_test.go b/execution/scheduler_int_test.go new file mode 100644 index 00000000000..c0451ad7e63 --- /dev/null +++ b/execution/scheduler_int_test.go @@ -0,0 +1,91 @@ +package execution + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" + "go.k6.io/k6/lib" + "go.k6.io/k6/lib/testutils" + "go.k6.io/k6/lib/testutils/minirunner" + "go.k6.io/k6/metrics" +) + +func getBogusTestRunState(tb testing.TB) *lib.TestRunState { + reg := metrics.NewRegistry() + piState := &lib.TestPreInitState{ + Logger: testutils.NewLogger(tb), + RuntimeOptions: lib.RuntimeOptions{}, + Registry: reg, + BuiltinMetrics: metrics.RegisterBuiltinMetrics(reg), + } + + return &lib.TestRunState{ + TestPreInitState: piState, + Options: lib.Options{}, + Runner: &minirunner.MiniRunner{}, + RunTags: piState.Registry.RootTagSet(), + } +} + +// Just a lib.PausableExecutor implementation that can return an error +type pausableExecutor struct { + lib.Executor + err error +} + +func (p pausableExecutor) SetPaused(bool) error { + return p.err +} + +func TestSetPaused(t *testing.T) { + t.Parallel() + t.Run("second pause is an error", func(t *testing.T) { + t.Parallel() + testRunState := getBogusTestRunState(t) + sched, err := NewScheduler(testRunState) + require.NoError(t, err) + sched.executors = []lib.Executor{pausableExecutor{err: nil}} + + require.NoError(t, sched.SetPaused(true)) + err = sched.SetPaused(true) + require.Error(t, err) + require.Contains(t, err.Error(), "execution is already paused") + }) + + t.Run("unpause at the start is an error", func(t *testing.T) { + t.Parallel() + testRunState := getBogusTestRunState(t) + sched, err := NewScheduler(testRunState) + require.NoError(t, err) + sched.executors = []lib.Executor{pausableExecutor{err: nil}} + err = sched.SetPaused(false) + require.Error(t, err) + require.Contains(t, err.Error(), "execution wasn't paused") + }) + + t.Run("second unpause is an error", func(t *testing.T) { + t.Parallel() + testRunState := getBogusTestRunState(t) + sched, err := NewScheduler(testRunState) + require.NoError(t, err) + sched.executors = []lib.Executor{pausableExecutor{err: nil}} + require.NoError(t, sched.SetPaused(true)) + require.NoError(t, sched.SetPaused(false)) + err = sched.SetPaused(false) + require.Error(t, err) + require.Contains(t, err.Error(), "execution wasn't paused") + }) + + t.Run("an error on pausing is propagated", 
func(t *testing.T) { + t.Parallel() + testRunState := getBogusTestRunState(t) + sched, err := NewScheduler(testRunState) + require.NoError(t, err) + expectedErr := errors.New("testing pausable executor error") + sched.executors = []lib.Executor{pausableExecutor{err: expectedErr}} + err = sched.SetPaused(true) + require.Error(t, err) + require.Equal(t, err, expectedErr) + }) +} diff --git a/js/runner_test.go b/js/runner_test.go index c794ac3a1c0..fa800b87b40 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -34,8 +34,8 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" "go.k6.io/k6/errext" + "go.k6.io/k6/execution" "go.k6.io/k6/js/modules/k6" k6http "go.k6.io/k6/js/modules/k6/http" k6metrics "go.k6.io/k6/js/modules/k6/metrics" @@ -384,7 +384,7 @@ func TestDataIsolation(t *testing.T) { RunTags: runner.preInitState.Registry.RootTagSet().WithTagsFromMap(options.RunTags), } - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) mockOutput := mockoutput.New() @@ -2670,7 +2670,7 @@ func TestExecutionInfo(t *testing.T) { Runner: r, } - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx = lib.WithExecutionState(ctx, execScheduler.GetState()) diff --git a/lib/execution.go b/lib/execution.go index 2d21eae6864..239041e660d 100644 --- a/lib/execution.go +++ b/lib/execution.go @@ -9,62 +9,8 @@ import ( "time" "github.com/sirupsen/logrus" - - "go.k6.io/k6/metrics" ) -// An ExecutionScheduler is in charge of initializing executors and using them -// to initialize and schedule VUs created by a wrapped Runner. It decouples how -// a swarm of VUs is controlled from the details of how or even where they're -// scheduled. -// -// The core/local execution scheduler schedules VUs on the local machine, but -// the same interface may be implemented to control a test running on a cluster -// or in the cloud. -// -// TODO: flesh out the interface after actually having more than one -// implementation... -type ExecutionScheduler interface { - // Returns the wrapped runner. May return nil if not applicable, eg. - // if we're remote controlling a test running on another machine. - GetRunner() Runner - - // Return the ExecutionState instance from which different statistics for the - // current state of the runner could be retrieved. - GetState() *ExecutionState - - // Return the instances of the configured executors - GetExecutors() []Executor - - // Init initializes all executors, including all of their needed VUs. - Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) error - - // Run the ExecutionScheduler, funneling the generated metric samples - // through the supplied out channel. - Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) error - - // Pause a test, or start/resume it. To check if a test is paused, use - // GetState().IsPaused(). - // - // Currently, any executor, so any test, can be started in a paused state. - // This will cause k6 to initialize all needed VUs, but it won't actually - // start the test. Later, the test can be started for real by - // resuming/unpausing it from the REST API. - // - // After a test is actually started, it may become impossible to pause it - // again. That is denoted by having SetPaused(true) return an error. 
The - // likely cause is that some of the executors for the test don't support - // pausing after the test has been started. - // - // IMPORTANT: Currently only the externally controlled executor can be - // paused and resumed multiple times in the middle of the test execution! - // Even then, "pausing" is a bit misleading, since k6 won't pause in the - // middle of the currently executing iterations. It will allow the currently - // in progress iterations to finish, and it just won't start any new ones - // nor will it increment the value returned by GetCurrentTestRunDuration(). - SetPaused(paused bool) error -} - // MaxTimeToWaitForPlannedVU specifies the maximum allowable time for an executor // to wait for a planned VU to be retrieved from the ExecutionState.PlannedVUs // buffer. If it's exceeded, k6 will emit a warning log message, since it either @@ -82,8 +28,9 @@ const MaxTimeToWaitForPlannedVU = 400 * time.Millisecond // MaxTimeToWaitForPlannedVU before we actually return an error. const MaxRetriesGetPlannedVU = 5 -// ExecutionStatus is similar to RunStatus, but more fine grained and concerns -// only local execution. +// ExecutionStatus is used to mark the possible states of a test run at any +// given time in its execution, from its start to its finish. +// //go:generate enumer -type=ExecutionStatus -trimprefix ExecutionStatus -output execution_status_gen.go type ExecutionStatus uint32 @@ -103,13 +50,13 @@ const ( ) // ExecutionState contains a few different things: -// - Some convenience items, that are needed by all executors, like the +// - Some convenience items, that are needed by all executors, like the // execution segment and the unique VU ID generator. By keeping those here, // we can just pass the ExecutionState to the different executors, instead of // individually passing them each item. -// - Mutable counters that different executors modify and other parts of +// - Mutable counters that different executors modify and other parts of // k6 can read, e.g. for the vus and vus_max metrics k6 emits every second. -// - Pausing controls and statistics. +// - Pausing controls and statistics. // // The counters and timestamps here are primarily meant to be used for // information extraction and avoidance of ID collisions. Using many of the @@ -498,9 +445,10 @@ func (es *ExecutionState) Resume() error { // // And, since tests won't be paused most of the time, it's // probably better to check for that like this: -// if executionState.IsPaused() { -// <-executionState.ResumeNotify() -// } +// +// if executionState.IsPaused() { +// <-executionState.ResumeNotify() +// } func (es *ExecutionState) ResumeNotify() <-chan struct{} { es.pauseStateLock.RLock() defer es.pauseStateLock.RUnlock() diff --git a/lib/executor/helpers.go b/lib/executor/helpers.go index 13b5df97cdb..5ab401c1d5d 100644 --- a/lib/executor/helpers.go +++ b/lib/executor/helpers.go @@ -10,6 +10,7 @@ import ( "github.com/sirupsen/logrus" "go.k6.io/k6/errext" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/types" "go.k6.io/k6/ui/pb" @@ -56,56 +57,12 @@ func validateStages(stages []Stage) []error { return errors } -// cancelKey is the key used to store the cancel function for the context of an -// executor. This is a work around to avoid excessive changes for the ability of -// nested functions to cancel the passed context. 
-type cancelKey struct{} - -type cancelExec struct { - cancel context.CancelFunc - reason error -} - -// Context returns context.Context that can be cancelled by calling -// CancelExecutorContext. Use this to initialize context that will be passed to -// executors. -// -// This allows executors to globally halt any executions that uses this context. -// Example use case is when a script calls test.abort(). -func Context(ctx context.Context) context.Context { - ctx, cancel := context.WithCancel(ctx) - return context.WithValue(ctx, cancelKey{}, &cancelExec{cancel: cancel}) -} - -// cancelExecutorContext cancels executor context found in ctx, ctx can be a -// child of a context that was created with Context function. -func cancelExecutorContext(ctx context.Context, err error) { - if x := ctx.Value(cancelKey{}); x != nil { - if v, ok := x.(*cancelExec); ok { - v.reason = err - v.cancel() - } - } -} - -// CancelReason returns a reason the executor context was cancelled. This will -// return nil if ctx is not an executor context(ctx or any of its parents was -// never created by Context function). -func CancelReason(ctx context.Context) error { - if x := ctx.Value(cancelKey{}); x != nil { - if v, ok := x.(*cancelExec); ok { - return v.reason - } - } - return nil -} - // handleInterrupt returns true if err is InterruptError and if so it // cancels the executor context passed with ctx. func handleInterrupt(ctx context.Context, err error) bool { if err != nil { if errext.IsInterruptError(err) { - cancelExecutorContext(ctx, err) + execution.AbortTestRun(ctx, err) return true } }
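
Note for reviewers (not part of the patch): the sketch below shows one way the relocated API could be wired together after this change, based only on the signatures added above. The runTest wrapper, the channel buffer size, and the sample-draining goroutine are illustrative assumptions; in the real code, cmd/run.go does this wiring with its own output manager and REST API hookup.

package example

import (
	"context"
	"fmt"

	"github.com/sirupsen/logrus"

	"go.k6.io/k6/execution"
	"go.k6.io/k6/lib"
	"go.k6.io/k6/metrics"
)

// runTest is a hypothetical caller; trs is assumed to be a fully populated
// test run state (options, runner, logger, metrics registry).
func runTest(globalCtx context.Context, trs *lib.TestRunState, logger logrus.FieldLogger) error {
	// execution.NewScheduler replaces the old local.NewExecutionScheduler.
	sched, err := execution.NewScheduler(trs)
	if err != nil {
		return err
	}

	// Wrap the run context so `test.abort()` and the REST API can cancel the
	// whole execution tree with an attached reason.
	runCtx, abortTest := execution.NewTestRunContext(globalCtx, logger)
	_ = abortTest // would be handed to whatever is allowed to stop the test

	samples := make(chan metrics.SampleContainer, 100)
	go func() { // drain samples; a real caller forwards these to the outputs
		for range samples {
		}
	}()

	if err := sched.Init(runCtx, samples); err != nil {
		return err
	}
	if err := sched.Run(globalCtx, runCtx, samples); err != nil {
		// If the run was aborted, the stored reason carries the details
		// (exit code, run status, etc. via the errext wrappers).
		if reason := execution.GetCancelReasonIfTestAborted(runCtx); reason != nil {
			return fmt.Errorf("test run aborted: %w", reason)
		}
		return err
	}
	return nil
}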