From e9f45a2b1c53956ba7c6ea05cd619220eef7193e Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 7 Dec 2022 19:17:31 +0200 Subject: [PATCH 1/3] Move core/local.ExecutionScheduler to execution.Scheduler We also remove the lib.ExecutionScheduler interface because it only ever had a single implementation and it's unlikely we'll ever need more than that. Distributed execution will be implemented another way and we should not mock the execution scheduler, we should mock the parts it moves (e.g. the Runner and Executors, if needs be). --- api/server_test.go | 4 +- api/v1/group_routes_test.go | 4 +- api/v1/metric_routes_test.go | 6 +- api/v1/setup_teardown_routes_test.go | 4 +- api/v1/status_routes.go | 6 +- api/v1/status_routes_test.go | 6 +- cmd/run.go | 6 +- core/engine.go | 5 +- core/engine_test.go | 8 +- core/local/util_test.go | 266 ------------------ execution/pkg.go | 8 + core/local/local.go => execution/scheduler.go | 87 +++--- .../scheduler_exec_test.go | 12 +- .../scheduler_test.go | 88 +++--- js/runner_test.go | 6 +- lib/execution.go | 72 +---- 16 files changed, 147 insertions(+), 441 deletions(-) delete mode 100644 core/local/util_test.go create mode 100644 execution/pkg.go rename core/local/local.go => execution/scheduler.go (82%) rename core/local/k6execution_test.go => execution/scheduler_exec_test.go (95%) rename core/local/local_test.go => execution/scheduler_test.go (92%) diff --git a/api/server_test.go b/api/server_test.go index e6c412d7d91..dd7fc06c814 100644 --- a/api/server_test.go +++ b/api/server_test.go @@ -13,7 +13,7 @@ import ( "go.k6.io/k6/api/common" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/testutils/minirunner" @@ -71,7 +71,7 @@ func TestWithEngine(t *testing.T) { Runner: &minirunner.MiniRunner{}, } - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go index 6dc06f74854..460fb0e4635 100644 --- a/api/v1/group_routes_test.go +++ b/api/v1/group_routes_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/testutils/minirunner" @@ -50,7 +50,7 @@ func TestGetGroups(t *testing.T) { assert.NoError(t, err) testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0}) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/api/v1/metric_routes_test.go b/api/v1/metric_routes_test.go index 131f16278f7..5612d5f3523 100644 --- a/api/v1/metric_routes_test.go +++ b/api/v1/metric_routes_test.go @@ -11,7 +11,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils/minirunner" "go.k6.io/k6/metrics" @@ -23,7 +23,7 @@ func TestGetMetrics(t *testing.T) { testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time) require.NoError(t, err) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) @@ -82,7 +82,7 @@ func TestGetMetric(t *testing.T) { testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time) require.NoError(t, err) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index 8829755c901..4b9e06b150e 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -16,7 +16,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/types" @@ -136,7 +136,7 @@ func TestSetupData(t *testing.T) { TeardownTimeout: types.NullDurationFrom(5 * time.Second), }, runner) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/api/v1/status_routes.go b/api/v1/status_routes.go index f9d444dae16..dc18a822660 100644 --- a/api/v1/status_routes.go +++ b/api/v1/status_routes.go @@ -7,7 +7,7 @@ import ( "net/http" "go.k6.io/k6/api/common" - "go.k6.io/k6/lib" + "go.k6.io/k6/execution" "go.k6.io/k6/lib/executor" ) @@ -24,9 +24,7 @@ func handleGetStatus(rw http.ResponseWriter, r *http.Request) { _, _ = rw.Write(data) } -func getFirstExternallyControlledExecutor( - execScheduler lib.ExecutionScheduler, -) (*executor.ExternallyControlled, error) { +func getFirstExternallyControlledExecutor(execScheduler *execution.Scheduler) (*executor.ExternallyControlled, error) { executors := execScheduler.GetExecutors() for _, s := range executors { if mex, ok := s.(*executor.ExternallyControlled); ok { diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index 03ce1f2d4f5..bb31003ed2b 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -14,7 +14,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils/minirunner" ) @@ -23,7 +23,7 @@ func TestGetStatus(t *testing.T) { t.Parallel() testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) @@ -110,7 +110,7 @@ func TestPatchStatus(t *testing.T) { require.NoError(t, err) testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{}) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/cmd/run.go b/cmd/run.go index c2060c86527..98f24af36a0 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -20,9 +20,9 @@ import ( "go.k6.io/k6/api" "go.k6.io/k6/cmd/state" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/execution" "go.k6.io/k6/js/common" "go.k6.io/k6/lib" "go.k6.io/k6/lib/consts" @@ -72,7 +72,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { logger := testRunState.Logger // Create a local execution scheduler wrapping the runner. logger.Debug("Initializing the execution scheduler...") - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) if err != nil { return err } @@ -332,7 +332,7 @@ a commandline interface for interacting with it.`, return runCmd } -func reportUsage(execScheduler *local.ExecutionScheduler) error { +func reportUsage(execScheduler *execution.Scheduler) error { execState := execScheduler.GetState() executorConfigs := execScheduler.GetExecutorConfigs() diff --git a/core/engine.go b/core/engine.go index 5e31a567a4f..aa37d4bf4d1 100644 --- a/core/engine.go +++ b/core/engine.go @@ -10,6 +10,7 @@ import ( "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/metrics" "go.k6.io/k6/metrics/engine" @@ -33,7 +34,7 @@ type Engine struct { // TODO: completely remove the engine and use all of these separately, in a // much more composable and testable manner - ExecutionScheduler lib.ExecutionScheduler + ExecutionScheduler *execution.Scheduler MetricsEngine *engine.MetricsEngine OutputManager *output.Manager @@ -53,7 +54,7 @@ type Engine struct { } // NewEngine instantiates a new Engine, without doing any heavy initialization. -func NewEngine(testState *lib.TestRunState, ex lib.ExecutionScheduler, outputs []output.Output) (*Engine, error) { +func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []output.Output) (*Engine, error) { if ex == nil { return nil, errors.New("missing ExecutionScheduler instance") } diff --git a/core/engine_test.go b/core/engine_test.go index d06962dd242..30e3a8984e1 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -14,8 +14,8 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" - "go.k6.io/k6/core/local" "go.k6.io/k6/errext" + "go.k6.io/k6/execution" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/executor" @@ -82,7 +82,7 @@ func newTestEngineWithTestPreInitState( //nolint:golint testRunState := getTestRunState(t, piState, newOpts, runner) - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) engine, err := NewEngine(testRunState, execScheduler, outputs) @@ -927,7 +927,7 @@ func TestVuInitException(t *testing.T) { testState := getTestRunState(t, piState, opts, runner) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := NewEngine(testState, execScheduler, nil) require.NoError(t, err) @@ -1315,7 +1315,7 @@ func TestActiveVUsCount(t *testing.T) { require.NoError(t, err) testState := getTestRunState(t, piState, opts, runner) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := NewEngine(testState, execScheduler, []output.Output{mockOutput}) require.NoError(t, err) diff --git a/core/local/util_test.go b/core/local/util_test.go deleted file mode 100644 index 55426902c19..00000000000 --- a/core/local/util_test.go +++ /dev/null @@ -1,266 +0,0 @@ -package local - -//TODO: translate this test to the new paradigm -/* -func TestProcessStages(t *testing.T) { - type checkpoint struct { - D time.Duration - Keep bool - VUs null.Int - } - testdata := map[string]struct { - Start int64 - Stages []lib.Stage - Checkpoints []checkpoint - }{ - "none": { - 0, - []lib.Stage{}, - []checkpoint{ - {0 * time.Second, false, null.NewInt(0, false)}, - {10 * time.Second, false, null.NewInt(0, false)}, - {24 * time.Hour, false, null.NewInt(0, false)}, - }, - }, - "one": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Second, true, null.NewInt(0, false)}, - {10 * time.Second, true, null.NewInt(0, false)}, - {11 * time.Second, false, null.NewInt(0, false)}, - }, - }, - "one/start": { - 5, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(5, false)}, - {1 * time.Second, true, null.NewInt(5, false)}, - {10 * time.Second, true, null.NewInt(5, false)}, - {11 * time.Second, false, null.NewInt(5, false)}, - }, - }, - "one/targeted": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second), Target: null.IntFrom(100)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - {1 * time.Second, true, null.IntFrom(10)}, - {2 * time.Second, true, null.IntFrom(20)}, - {3 * time.Second, true, null.IntFrom(30)}, - {4 * time.Second, true, null.IntFrom(40)}, - {5 * time.Second, true, null.IntFrom(50)}, - {6 * time.Second, true, null.IntFrom(60)}, - {7 * time.Second, true, null.IntFrom(70)}, - {8 * time.Second, true, null.IntFrom(80)}, - {9 * time.Second, true, null.IntFrom(90)}, - {10 * time.Second, true, null.IntFrom(100)}, - {11 * time.Second, false, null.IntFrom(100)}, - }, - }, - "one/targeted/start": { - 50, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second), Target: null.IntFrom(100)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(50)}, - {1 * time.Second, true, null.IntFrom(55)}, - {2 * time.Second, true, null.IntFrom(60)}, - {3 * time.Second, true, null.IntFrom(65)}, - {4 * time.Second, true, null.IntFrom(70)}, - {5 * time.Second, true, null.IntFrom(75)}, - {6 * time.Second, true, null.IntFrom(80)}, - {7 * time.Second, true, null.IntFrom(85)}, - {8 * time.Second, true, null.IntFrom(90)}, - {9 * time.Second, true, null.IntFrom(95)}, - {10 * time.Second, true, null.IntFrom(100)}, - {11 * time.Second, false, null.IntFrom(100)}, - }, - }, - "two": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Second, true, null.NewInt(0, false)}, - {11 * time.Second, false, null.NewInt(0, false)}, - }, - }, - "two/start": { - 5, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(5, false)}, - {1 * time.Second, true, null.NewInt(5, false)}, - {11 * time.Second, false, null.NewInt(5, false)}, - }, - }, - "two/targeted": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(100)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(0)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - {1 * time.Second, true, null.IntFrom(20)}, - {2 * time.Second, true, null.IntFrom(40)}, - {3 * time.Second, true, null.IntFrom(60)}, - {4 * time.Second, true, null.IntFrom(80)}, - {5 * time.Second, true, null.IntFrom(100)}, - {6 * time.Second, true, null.IntFrom(80)}, - {7 * time.Second, true, null.IntFrom(60)}, - {8 * time.Second, true, null.IntFrom(40)}, - {9 * time.Second, true, null.IntFrom(20)}, - {10 * time.Second, true, null.IntFrom(0)}, - {11 * time.Second, false, null.IntFrom(0)}, - }, - }, - "three": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(10 * time.Second)}, - {Duration: types.NullDurationFrom(15 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Second, true, null.NewInt(0, false)}, - {15 * time.Second, true, null.NewInt(0, false)}, - {30 * time.Second, true, null.NewInt(0, false)}, - {31 * time.Second, false, null.NewInt(0, false)}, - }, - }, - "three/targeted": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(50)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(100)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(0)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - {1 * time.Second, true, null.IntFrom(10)}, - {2 * time.Second, true, null.IntFrom(20)}, - {3 * time.Second, true, null.IntFrom(30)}, - {4 * time.Second, true, null.IntFrom(40)}, - {5 * time.Second, true, null.IntFrom(50)}, - {6 * time.Second, true, null.IntFrom(60)}, - {7 * time.Second, true, null.IntFrom(70)}, - {8 * time.Second, true, null.IntFrom(80)}, - {9 * time.Second, true, null.IntFrom(90)}, - {10 * time.Second, true, null.IntFrom(100)}, - {11 * time.Second, true, null.IntFrom(80)}, - {12 * time.Second, true, null.IntFrom(60)}, - {13 * time.Second, true, null.IntFrom(40)}, - {14 * time.Second, true, null.IntFrom(20)}, - {15 * time.Second, true, null.IntFrom(0)}, - {16 * time.Second, false, null.IntFrom(0)}, - }, - }, - "mix": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(20)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(10)}, - {Duration: types.NullDurationFrom(2 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(20)}, - {Duration: types.NullDurationFrom(2 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(10)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - - {1 * time.Second, true, null.IntFrom(4)}, - {2 * time.Second, true, null.IntFrom(8)}, - {3 * time.Second, true, null.IntFrom(12)}, - {4 * time.Second, true, null.IntFrom(16)}, - {5 * time.Second, true, null.IntFrom(20)}, - - {6 * time.Second, true, null.IntFrom(18)}, - {7 * time.Second, true, null.IntFrom(16)}, - {8 * time.Second, true, null.IntFrom(14)}, - {9 * time.Second, true, null.IntFrom(12)}, - {10 * time.Second, true, null.IntFrom(10)}, - - {11 * time.Second, true, null.IntFrom(10)}, - {12 * time.Second, true, null.IntFrom(10)}, - - {13 * time.Second, true, null.IntFrom(12)}, - {14 * time.Second, true, null.IntFrom(14)}, - {15 * time.Second, true, null.IntFrom(16)}, - {16 * time.Second, true, null.IntFrom(18)}, - {17 * time.Second, true, null.IntFrom(20)}, - - {18 * time.Second, true, null.IntFrom(20)}, - {19 * time.Second, true, null.IntFrom(20)}, - - {20 * time.Second, true, null.IntFrom(18)}, - {21 * time.Second, true, null.IntFrom(16)}, - {22 * time.Second, true, null.IntFrom(14)}, - {23 * time.Second, true, null.IntFrom(12)}, - {24 * time.Second, true, null.IntFrom(10)}, - }, - }, - "mix/start": { - 5, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(10)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(5, false)}, - - {1 * time.Second, true, null.NewInt(5, false)}, - {2 * time.Second, true, null.NewInt(5, false)}, - {3 * time.Second, true, null.NewInt(5, false)}, - {4 * time.Second, true, null.NewInt(5, false)}, - {5 * time.Second, true, null.NewInt(5, false)}, - - {6 * time.Second, true, null.NewInt(6, true)}, - {7 * time.Second, true, null.NewInt(7, true)}, - {8 * time.Second, true, null.NewInt(8, true)}, - {9 * time.Second, true, null.NewInt(9, true)}, - {10 * time.Second, true, null.NewInt(10, true)}, - }, - }, - "infinite": { - 0, - []lib.Stage{{}}, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Minute, true, null.NewInt(0, false)}, - {1 * time.Hour, true, null.NewInt(0, false)}, - {24 * time.Hour, true, null.NewInt(0, false)}, - {365 * 24 * time.Hour, true, null.NewInt(0, false)}, - }, - }, - } - for name, data := range testdata { - t.Run(name, func(t *testing.T) { - for _, ckp := range data.Checkpoints { - t.Run(ckp.D.String(), func(t *testing.T) { - vus, keepRunning := ProcessStages(data.Start, data.Stages, ckp.D) - assert.Equal(t, ckp.VUs, vus) - assert.Equal(t, ckp.Keep, keepRunning) - }) - } - }) - } -} -*/ diff --git a/execution/pkg.go b/execution/pkg.go new file mode 100644 index 00000000000..5cf88bf880e --- /dev/null +++ b/execution/pkg.go @@ -0,0 +1,8 @@ +// Package execution contains most of the components that schedule, execute and +// control individual k6 tests. +package execution + +// TODO: move ExecutionSegment and ESS here + +// TODO: move execotors interfaces here and implementations in a sub-folder +// TODO: move the execution state here diff --git a/core/local/local.go b/execution/scheduler.go similarity index 82% rename from core/local/local.go rename to execution/scheduler.go index 61c9b1e59a1..a2e7214330e 100644 --- a/core/local/local.go +++ b/execution/scheduler.go @@ -1,4 +1,4 @@ -package local +package execution import ( "context" @@ -16,8 +16,10 @@ import ( "go.k6.io/k6/ui/pb" ) -// ExecutionScheduler is the local implementation of lib.ExecutionScheduler -type ExecutionScheduler struct { +// A Scheduler is in charge of most of the test execution - initializing VUs and +// executors, running setup() and teardown(), and actually starting the +// executors for the different scenarios at the appropriate times. +type Scheduler struct { initProgress *pb.ProgressBar executorConfigs []lib.ExecutorConfig // sorted by (startTime, ID) executors []lib.Executor // sorted by (startTime, ID), excludes executors with no work @@ -31,14 +33,11 @@ type ExecutionScheduler struct { stopVUsEmission, vusEmissionStopped chan struct{} } -// Check to see if we implement the lib.ExecutionScheduler interface -var _ lib.ExecutionScheduler = &ExecutionScheduler{} - -// NewExecutionScheduler creates and returns a new local lib.ExecutionScheduler -// instance, without initializing it beyond the bare minimum. Specifically, it -// creates the needed executor instances and a lot of state placeholders, but it -// doesn't initialize the executors and it doesn't initialize or run VUs. -func NewExecutionScheduler(trs *lib.TestRunState) (*ExecutionScheduler, error) { +// NewScheduler creates and returns a new Scheduler instance, without +// initializing it beyond the bare minimum. Specifically, it creates the needed +// executor instances and a lot of state placeholders, but it doesn't initialize +// the executors and it doesn't initialize or run VUs. +func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) { options := trs.Options et, err := lib.NewExecutionTuple(options.ExecutionSegment, options.ExecutionSegmentSequence) if err != nil { @@ -78,7 +77,7 @@ func NewExecutionScheduler(trs *lib.TestRunState) (*ExecutionScheduler, error) { } } - return &ExecutionScheduler{ + return &Scheduler{ initProgress: pb.New(pb.WithConstLeft("Init")), executors: executors, executorConfigs: executorConfigs, @@ -93,48 +92,48 @@ func NewExecutionScheduler(trs *lib.TestRunState) (*ExecutionScheduler, error) { } // GetRunner returns the wrapped lib.Runner instance. -func (e *ExecutionScheduler) GetRunner() lib.Runner { // TODO: remove +func (e *Scheduler) GetRunner() lib.Runner { // TODO: remove return e.state.Test.Runner } -// GetState returns a pointer to the execution state struct for the local -// execution scheduler. It's guaranteed to be initialized and present, though -// see the documentation in lib/execution.go for caveats about its usage. The -// most important one is that none of the methods beyond the pause-related ones +// GetState returns a pointer to the execution state struct for the execution +// scheduler. It's guaranteed to be initialized and present, though see the +// documentation in lib/execution.go for caveats about its usage. The most +// important one is that none of the methods beyond the pause-related ones // should be used for synchronization. -func (e *ExecutionScheduler) GetState() *lib.ExecutionState { +func (e *Scheduler) GetState() *lib.ExecutionState { return e.state } // GetExecutors returns the slice of configured executor instances which // have work, sorted by their (startTime, name) in an ascending order. -func (e *ExecutionScheduler) GetExecutors() []lib.Executor { +func (e *Scheduler) GetExecutors() []lib.Executor { return e.executors } // GetExecutorConfigs returns the slice of all executor configs, sorted by // their (startTime, name) in an ascending order. -func (e *ExecutionScheduler) GetExecutorConfigs() []lib.ExecutorConfig { +func (e *Scheduler) GetExecutorConfigs() []lib.ExecutorConfig { return e.executorConfigs } // GetInitProgressBar returns the progress bar associated with the Init // function. After the Init is done, it is "hijacked" to display real-time // execution statistics as a text bar. -func (e *ExecutionScheduler) GetInitProgressBar() *pb.ProgressBar { +func (e *Scheduler) GetInitProgressBar() *pb.ProgressBar { return e.initProgress } // GetExecutionPlan is a helper method so users of the local execution scheduler // don't have to calculate the execution plan again. -func (e *ExecutionScheduler) GetExecutionPlan() []lib.ExecutionStep { +func (e *Scheduler) GetExecutionPlan() []lib.ExecutionStep { return e.executionPlan } // initVU is a helper method that's used to both initialize the planned VUs // in the Init() method, and also passed to executors so they can initialize // any unplanned VUs themselves. -func (e *ExecutionScheduler) initVU( +func (e *Scheduler) initVU( ctx context.Context, samplesOut chan<- metrics.SampleContainer, logger logrus.FieldLogger, ) (lib.InitializedVU, error) { // Get the VU IDs here, so that the VUs are (mostly) ordered by their @@ -151,7 +150,7 @@ func (e *ExecutionScheduler) initVU( // getRunStats is a helper function that can be used as the execution // scheduler's progressbar substitute (i.e. hijack). -func (e *ExecutionScheduler) getRunStats() string { +func (e *Scheduler) getRunStats() string { status := "running" if e.state.IsPaused() { status = "paused" @@ -169,7 +168,7 @@ func (e *ExecutionScheduler) getRunStats() string { ) } -func (e *ExecutionScheduler) initVUsConcurrently( +func (e *Scheduler) initVUsConcurrently( ctx context.Context, samplesOut chan<- metrics.SampleContainer, count uint64, concurrency int, logger logrus.FieldLogger, ) chan error { @@ -206,7 +205,7 @@ func (e *ExecutionScheduler) initVUsConcurrently( return doneInits } -func (e *ExecutionScheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) { +func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) { e.state.Test.Logger.Debug("Starting emission of VUs and VUsMax metrics...") tags := e.state.Test.RunTags @@ -259,7 +258,7 @@ func (e *ExecutionScheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- me // Init concurrently initializes all of the planned VUs and then sequentially // initializes all of the configured executors. -func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) { +func (e *Scheduler) Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) { e.emitVUsAndVUsMax(ctx, samplesOut) defer func() { if err != nil { @@ -268,7 +267,7 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics } }() - logger := e.state.Test.Logger.WithField("phase", "local-execution-scheduler-init") + logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init") vusToInitialize := lib.GetMaxPlannedVUs(e.executionPlan) logger.WithFields(logrus.Fields{ "neededVUs": vusToInitialize, @@ -342,7 +341,7 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics // executor, each time in a new goroutine. It is responsible for waiting out the // configured startTime for the specific executor and then running its Run() // method. -func (e *ExecutionScheduler) runExecutor( +func (e *Scheduler) runExecutor( runCtx context.Context, runResults chan<- error, engineOut chan<- metrics.SampleContainer, executor lib.Executor, ) { executorConfig := executor.GetConfig() @@ -389,18 +388,18 @@ func (e *ExecutionScheduler) runExecutor( runResults <- err } -// Run the ExecutionScheduler, funneling all generated metric samples through the supplied +// Run the Scheduler, funneling all generated metric samples through the supplied // out channel. // //nolint:funlen -func (e *ExecutionScheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metrics.SampleContainer) error { +func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metrics.SampleContainer) error { defer func() { close(e.stopVUsEmission) <-e.vusEmissionStopped }() executorsCount := len(e.executors) - logger := e.state.Test.Logger.WithField("phase", "local-execution-scheduler-run") + logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run") e.initProgress.Modify(pb.WithConstLeft("Run")) var interrupted bool defer func() { @@ -487,10 +486,26 @@ func (e *ExecutionScheduler) Run(globalCtx, runCtx context.Context, engineOut ch return firstErr } -// SetPaused pauses a test, if called with true. And if called with false, tries -// to start/resume it. See the lib.ExecutionScheduler interface documentation of -// the methods for the various caveats about its usage. -func (e *ExecutionScheduler) SetPaused(pause bool) error { +// SetPaused pauses the test, or start/resumes it. To check if a test is paused, +// use GetState().IsPaused(). +// +// Currently, any executor, so any test, can be started in a paused state. This +// will cause k6 to initialize all needed VUs, but it won't actually start the +// test. Later, the test can be started for real by resuming/unpausing it from +// the REST API. +// +// After a test is actually started, it may become impossible to pause it again. +// That is signaled by having SetPaused(true) return an error. The likely cause +// is that some of the executors for the test don't support pausing after the +// test has been started. +// +// IMPORTANT: Currently only the externally controlled executor can be paused +// and resumed multiple times in the middle of the test execution! Even then, +// "pausing" is a bit misleading, since k6 won't pause in the middle of the +// currently executing iterations. It will allow the currently in-progress +// iterations to finish, and it just won't start any new ones nor will it +// increment the value returned by GetCurrentTestRunDuration(). +func (e *Scheduler) SetPaused(pause bool) error { if !e.state.HasStarted() && e.state.IsPaused() { if pause { return fmt.Errorf("execution is already paused") diff --git a/core/local/k6execution_test.go b/execution/scheduler_exec_test.go similarity index 95% rename from core/local/k6execution_test.go rename to execution/scheduler_exec_test.go index c97fd15ab32..570d20e00d3 100644 --- a/core/local/k6execution_test.go +++ b/execution/scheduler_exec_test.go @@ -1,4 +1,4 @@ -package local +package execution import ( "encoding/json" @@ -18,6 +18,8 @@ import ( "go.k6.io/k6/metrics" ) +// TODO: rewrite and/or move these as integration tests to reduce boilerplate +// and improve reliability? func TestExecutionInfoVUSharing(t *testing.T) { t.Parallel() script := []byte(` @@ -81,7 +83,7 @@ func TestExecutionInfoVUSharing(t *testing.T) { ) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() type vuStat struct { @@ -194,7 +196,7 @@ func TestExecutionInfoScenarioIter(t *testing.T) { ) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() errCh := make(chan error, 1) @@ -276,7 +278,7 @@ func TestSharedIterationsStable(t *testing.T) { ) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() errCh := make(chan error, 1) @@ -409,7 +411,7 @@ func TestExecutionInfoAll(t *testing.T) { }, nil) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() errCh := make(chan error, 1) diff --git a/core/local/local_test.go b/execution/scheduler_test.go similarity index 92% rename from core/local/local_test.go rename to execution/scheduler_test.go index dcb0ddf4a72..761725fbdd4 100644 --- a/core/local/local_test.go +++ b/execution/scheduler_test.go @@ -1,4 +1,4 @@ -package local +package execution import ( "context" @@ -56,9 +56,9 @@ func getTestRunState( } } -func newTestExecutionScheduler( +func newTestScheduler( t *testing.T, runner lib.Runner, logger *logrus.Logger, opts lib.Options, -) (ctx context.Context, cancel func(), execScheduler *ExecutionScheduler, samples chan metrics.SampleContainer) { +) (ctx context.Context, cancel func(), execScheduler *Scheduler, samples chan metrics.SampleContainer) { if runner == nil { runner = &minirunner.MiniRunner{} } @@ -73,7 +73,7 @@ func newTestExecutionScheduler( testRunState.Logger = logger } - execScheduler, err = NewExecutionScheduler(testRunState) + execScheduler, err = NewScheduler(testRunState) require.NoError(t, err) samples = make(chan metrics.SampleContainer, newOpts.MetricSamplesBufferSize.Int64) @@ -92,9 +92,9 @@ func newTestExecutionScheduler( return ctx, cancel, execScheduler, samples } -func TestExecutionSchedulerRun(t *testing.T) { +func TestSchedulerRun(t *testing.T) { t.Parallel() - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, nil, nil, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, nil, nil, lib.Options{}) defer cancel() err := make(chan error, 1) @@ -102,7 +102,7 @@ func TestExecutionSchedulerRun(t *testing.T) { assert.NoError(t, <-err) } -func TestExecutionSchedulerRunNonDefault(t *testing.T) { +func TestSchedulerRunNonDefault(t *testing.T) { t.Parallel() testCases := []struct { @@ -136,7 +136,7 @@ func TestExecutionSchedulerRunNonDefault(t *testing.T) { testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -165,7 +165,7 @@ func TestExecutionSchedulerRunNonDefault(t *testing.T) { } } -func TestExecutionSchedulerRunEnv(t *testing.T) { +func TestSchedulerRunEnv(t *testing.T) { t.Parallel() scriptTemplate := ` @@ -253,7 +253,7 @@ func TestExecutionSchedulerRunEnv(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -280,7 +280,7 @@ func TestExecutionSchedulerRunEnv(t *testing.T) { } } -func TestExecutionSchedulerSystemTags(t *testing.T) { +func TestSchedulerSystemTags(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) sr := tb.Replacer.Replace @@ -322,7 +322,7 @@ func TestExecutionSchedulerSystemTags(t *testing.T) { }))) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -372,7 +372,7 @@ func TestExecutionSchedulerSystemTags(t *testing.T) { } } -func TestExecutionSchedulerRunCustomTags(t *testing.T) { +func TestSchedulerRunCustomTags(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) sr := tb.Replacer.Replace @@ -453,7 +453,7 @@ func TestExecutionSchedulerRunCustomTags(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -496,7 +496,7 @@ func TestExecutionSchedulerRunCustomTags(t *testing.T) { // Ensure that custom executor settings are unique per executor and // that there's no "crossover"/"pollution" between executors. // Also test that custom tags are properly set on checks and groups metrics. -func TestExecutionSchedulerRunCustomConfigNoCrossover(t *testing.T) { +func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) @@ -617,7 +617,7 @@ func TestExecutionSchedulerRunCustomConfigNoCrossover(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -686,7 +686,7 @@ func TestExecutionSchedulerRunCustomConfigNoCrossover(t *testing.T) { require.Equal(t, 8, gotSampleTags, "received wrong amount of samples with expected tags") } -func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { +func TestSchedulerSetupTeardownRun(t *testing.T) { t.Parallel() t.Run("Normal", func(t *testing.T) { t.Parallel() @@ -702,7 +702,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{}) err := make(chan error, 1) go func() { err <- execScheduler.Run(ctx, ctx, samples) }() @@ -718,7 +718,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return nil, errors.New("setup error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{}) defer cancel() assert.EqualError(t, execScheduler.Run(ctx, ctx, samples), "setup error") }) @@ -732,7 +732,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return errors.New("teardown error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ NoSetup: null.BoolFrom(true), VUs: null.IntFrom(1), Iterations: null.IntFrom(1), @@ -751,7 +751,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return errors.New("teardown error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }) @@ -769,7 +769,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return errors.New("teardown error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ NoTeardown: null.BoolFrom(true), VUs: null.IntFrom(1), Iterations: null.IntFrom(1), @@ -779,7 +779,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { }) } -func TestExecutionSchedulerStages(t *testing.T) { +func TestSchedulerStages(t *testing.T) { t.Parallel() testdata := map[string]struct { Duration time.Duration @@ -815,7 +815,7 @@ func TestExecutionSchedulerStages(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ VUs: null.IntFrom(1), Stages: data.Stages, }) @@ -826,7 +826,7 @@ func TestExecutionSchedulerStages(t *testing.T) { } } -func TestExecutionSchedulerEndTime(t *testing.T) { +func TestSchedulerEndTime(t *testing.T) { t.Parallel() runner := &minirunner.MiniRunner{ Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error { @@ -834,7 +834,7 @@ func TestExecutionSchedulerEndTime(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ VUs: null.IntFrom(10), Duration: types.NullDurationFrom(1 * time.Second), }) @@ -851,7 +851,7 @@ func TestExecutionSchedulerEndTime(t *testing.T) { assert.True(t, runTime < 10*time.Second, "took more than 10 seconds") } -func TestExecutionSchedulerRuntimeErrors(t *testing.T) { +func TestSchedulerRuntimeErrors(t *testing.T) { t.Parallel() runner := &minirunner.MiniRunner{ Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error { @@ -864,7 +864,7 @@ func TestExecutionSchedulerRuntimeErrors(t *testing.T) { }, } logger, hook := logtest.NewNullLogger() - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() endTime, isFinal := lib.GetEndOffset(execScheduler.GetExecutionPlan()) @@ -883,7 +883,7 @@ func TestExecutionSchedulerRuntimeErrors(t *testing.T) { } } -func TestExecutionSchedulerEndErrors(t *testing.T) { +func TestSchedulerEndErrors(t *testing.T) { t.Parallel() exec := executor.NewConstantVUsConfig("we_need_hard_stop") @@ -901,7 +901,7 @@ func TestExecutionSchedulerEndErrors(t *testing.T) { }, } logger, hook := logtest.NewNullLogger() - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() endTime, isFinal := lib.GetEndOffset(execScheduler.GetExecutionPlan()) @@ -917,7 +917,7 @@ func TestExecutionSchedulerEndErrors(t *testing.T) { assert.Empty(t, hook.Entries) } -func TestExecutionSchedulerEndIterations(t *testing.T) { +func TestSchedulerEndIterations(t *testing.T) { t.Parallel() registry := metrics.NewRegistry() metric := registry.MustNewMetric("test_metric", metrics.Counter) @@ -954,7 +954,7 @@ func TestExecutionSchedulerEndIterations(t *testing.T) { defer cancel() testRunState := getTestRunState(t, getTestPreInitState(t), runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) samples := make(chan metrics.SampleContainer, 300) @@ -972,7 +972,7 @@ func TestExecutionSchedulerEndIterations(t *testing.T) { } } -func TestExecutionSchedulerIsRunning(t *testing.T) { +func TestSchedulerIsRunning(t *testing.T) { t.Parallel() runner := &minirunner.MiniRunner{ Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error { @@ -980,7 +980,7 @@ func TestExecutionSchedulerIsRunning(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, _ := newTestExecutionScheduler(t, runner, nil, lib.Options{}) + ctx, cancel, execScheduler, _ := newTestScheduler(t, runner, nil, lib.Options{}) state := execScheduler.GetState() err := make(chan error) @@ -995,7 +995,7 @@ func TestExecutionSchedulerIsRunning(t *testing.T) { assert.NoError(t, <-err) } -// TestDNSResolver checks the DNS resolution behavior at the ExecutionScheduler level. +// TestDNSResolver checks the DNS resolution behavior at the Scheduler level. func TestDNSResolver(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) @@ -1072,7 +1072,7 @@ func TestDNSResolver(t *testing.T) { mr := mockresolver.New(nil, net.LookupIP) runner.ActualResolver = mr.LookupIPAll - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, tc.opts) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, tc.opts) defer cancel() mr.Set("myhost", sr("HTTPBIN_IP")) @@ -1160,7 +1160,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, options, runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -1295,7 +1295,7 @@ func TestSetPaused(t *testing.T) { t.Run("second pause is an error", func(t *testing.T) { t.Parallel() testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) + sched, err := NewScheduler(testRunState) require.NoError(t, err) sched.executors = []lib.Executor{pausableExecutor{err: nil}} @@ -1308,7 +1308,7 @@ func TestSetPaused(t *testing.T) { t.Run("unpause at the start is an error", func(t *testing.T) { t.Parallel() testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) + sched, err := NewScheduler(testRunState) require.NoError(t, err) sched.executors = []lib.Executor{pausableExecutor{err: nil}} err = sched.SetPaused(false) @@ -1319,7 +1319,7 @@ func TestSetPaused(t *testing.T) { t.Run("second unpause is an error", func(t *testing.T) { t.Parallel() testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) + sched, err := NewScheduler(testRunState) require.NoError(t, err) sched.executors = []lib.Executor{pausableExecutor{err: nil}} require.NoError(t, sched.SetPaused(true)) @@ -1332,7 +1332,7 @@ func TestSetPaused(t *testing.T) { t.Run("an error on pausing is propagated", func(t *testing.T) { t.Parallel() testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) + sched, err := NewScheduler(testRunState) require.NoError(t, err) expectedErr := errors.New("testing pausable executor error") sched.executors = []lib.Executor{pausableExecutor{err: expectedErr}} @@ -1351,7 +1351,7 @@ func TestSetPaused(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, getTestPreInitState(t), options, runner) - sched, err := NewExecutionScheduler(testRunState) + sched, err := NewScheduler(testRunState) require.NoError(t, err) err = sched.SetPaused(true) require.Error(t, err) @@ -1359,7 +1359,7 @@ func TestSetPaused(t *testing.T) { }) } -func TestNewExecutionSchedulerHasWork(t *testing.T) { +func TestNewSchedulerHasWork(t *testing.T) { t.Parallel() script := []byte(` import http from 'k6/http'; @@ -1406,7 +1406,7 @@ func TestNewExecutionSchedulerHasWork(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) assert.Len(t, execScheduler.executors, 2) diff --git a/js/runner_test.go b/js/runner_test.go index c794ac3a1c0..fa800b87b40 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -34,8 +34,8 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" "go.k6.io/k6/errext" + "go.k6.io/k6/execution" "go.k6.io/k6/js/modules/k6" k6http "go.k6.io/k6/js/modules/k6/http" k6metrics "go.k6.io/k6/js/modules/k6/metrics" @@ -384,7 +384,7 @@ func TestDataIsolation(t *testing.T) { RunTags: runner.preInitState.Registry.RootTagSet().WithTagsFromMap(options.RunTags), } - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) mockOutput := mockoutput.New() @@ -2670,7 +2670,7 @@ func TestExecutionInfo(t *testing.T) { Runner: r, } - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx = lib.WithExecutionState(ctx, execScheduler.GetState()) diff --git a/lib/execution.go b/lib/execution.go index 2d21eae6864..239041e660d 100644 --- a/lib/execution.go +++ b/lib/execution.go @@ -9,62 +9,8 @@ import ( "time" "github.com/sirupsen/logrus" - - "go.k6.io/k6/metrics" ) -// An ExecutionScheduler is in charge of initializing executors and using them -// to initialize and schedule VUs created by a wrapped Runner. It decouples how -// a swarm of VUs is controlled from the details of how or even where they're -// scheduled. -// -// The core/local execution scheduler schedules VUs on the local machine, but -// the same interface may be implemented to control a test running on a cluster -// or in the cloud. -// -// TODO: flesh out the interface after actually having more than one -// implementation... -type ExecutionScheduler interface { - // Returns the wrapped runner. May return nil if not applicable, eg. - // if we're remote controlling a test running on another machine. - GetRunner() Runner - - // Return the ExecutionState instance from which different statistics for the - // current state of the runner could be retrieved. - GetState() *ExecutionState - - // Return the instances of the configured executors - GetExecutors() []Executor - - // Init initializes all executors, including all of their needed VUs. - Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) error - - // Run the ExecutionScheduler, funneling the generated metric samples - // through the supplied out channel. - Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) error - - // Pause a test, or start/resume it. To check if a test is paused, use - // GetState().IsPaused(). - // - // Currently, any executor, so any test, can be started in a paused state. - // This will cause k6 to initialize all needed VUs, but it won't actually - // start the test. Later, the test can be started for real by - // resuming/unpausing it from the REST API. - // - // After a test is actually started, it may become impossible to pause it - // again. That is denoted by having SetPaused(true) return an error. The - // likely cause is that some of the executors for the test don't support - // pausing after the test has been started. - // - // IMPORTANT: Currently only the externally controlled executor can be - // paused and resumed multiple times in the middle of the test execution! - // Even then, "pausing" is a bit misleading, since k6 won't pause in the - // middle of the currently executing iterations. It will allow the currently - // in progress iterations to finish, and it just won't start any new ones - // nor will it increment the value returned by GetCurrentTestRunDuration(). - SetPaused(paused bool) error -} - // MaxTimeToWaitForPlannedVU specifies the maximum allowable time for an executor // to wait for a planned VU to be retrieved from the ExecutionState.PlannedVUs // buffer. If it's exceeded, k6 will emit a warning log message, since it either @@ -82,8 +28,9 @@ const MaxTimeToWaitForPlannedVU = 400 * time.Millisecond // MaxTimeToWaitForPlannedVU before we actually return an error. const MaxRetriesGetPlannedVU = 5 -// ExecutionStatus is similar to RunStatus, but more fine grained and concerns -// only local execution. +// ExecutionStatus is used to mark the possible states of a test run at any +// given time in its execution, from its start to its finish. +// //go:generate enumer -type=ExecutionStatus -trimprefix ExecutionStatus -output execution_status_gen.go type ExecutionStatus uint32 @@ -103,13 +50,13 @@ const ( ) // ExecutionState contains a few different things: -// - Some convenience items, that are needed by all executors, like the +// - Some convenience items, that are needed by all executors, like the // execution segment and the unique VU ID generator. By keeping those here, // we can just pass the ExecutionState to the different executors, instead of // individually passing them each item. -// - Mutable counters that different executors modify and other parts of +// - Mutable counters that different executors modify and other parts of // k6 can read, e.g. for the vus and vus_max metrics k6 emits every second. -// - Pausing controls and statistics. +// - Pausing controls and statistics. // // The counters and timestamps here are primarily meant to be used for // information extraction and avoidance of ID collisions. Using many of the @@ -498,9 +445,10 @@ func (es *ExecutionState) Resume() error { // // And, since tests won't be paused most of the time, it's // probably better to check for that like this: -// if executionState.IsPaused() { -// <-executionState.ResumeNotify() -// } +// +// if executionState.IsPaused() { +// <-executionState.ResumeNotify() +// } func (es *ExecutionState) ResumeNotify() <-chan struct{} { es.pauseStateLock.RLock() defer es.pauseStateLock.RUnlock() From f2e92f7d4caa327d42907d1b80a328b0c9f5de15 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Thu, 8 Dec 2022 00:16:41 +0200 Subject: [PATCH 2/3] Refactor execution.Scheduler tests to prevent import loops --- ...xec_test.go => scheduler_ext_exec_test.go} | 2 +- ...cheduler_test.go => scheduler_ext_test.go} | 110 +++--------------- execution/scheduler_int_test.go | 91 +++++++++++++++ 3 files changed, 110 insertions(+), 93 deletions(-) rename execution/{scheduler_exec_test.go => scheduler_ext_exec_test.go} (99%) rename execution/{scheduler_test.go => scheduler_ext_test.go} (91%) create mode 100644 execution/scheduler_int_test.go diff --git a/execution/scheduler_exec_test.go b/execution/scheduler_ext_exec_test.go similarity index 99% rename from execution/scheduler_exec_test.go rename to execution/scheduler_ext_exec_test.go index 570d20e00d3..2ae4c10b703 100644 --- a/execution/scheduler_exec_test.go +++ b/execution/scheduler_ext_exec_test.go @@ -1,4 +1,4 @@ -package execution +package execution_test import ( "encoding/json" diff --git a/execution/scheduler_test.go b/execution/scheduler_ext_test.go similarity index 91% rename from execution/scheduler_test.go rename to execution/scheduler_ext_test.go index 761725fbdd4..43dfc9b69ac 100644 --- a/execution/scheduler_test.go +++ b/execution/scheduler_ext_test.go @@ -1,4 +1,4 @@ -package execution +package execution_test import ( "context" @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" + "go.k6.io/k6/execution" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/executor" @@ -58,7 +59,7 @@ func getTestRunState( func newTestScheduler( t *testing.T, runner lib.Runner, logger *logrus.Logger, opts lib.Options, -) (ctx context.Context, cancel func(), execScheduler *Scheduler, samples chan metrics.SampleContainer) { +) (ctx context.Context, cancel func(), execScheduler *execution.Scheduler, samples chan metrics.SampleContainer) { if runner == nil { runner = &minirunner.MiniRunner{} } @@ -73,7 +74,7 @@ func newTestScheduler( testRunState.Logger = logger } - execScheduler, err = NewScheduler(testRunState) + execScheduler, err = execution.NewScheduler(testRunState) require.NoError(t, err) samples = make(chan metrics.SampleContainer, newOpts.MetricSamplesBufferSize.Int64) @@ -136,7 +137,7 @@ func TestSchedulerRunNonDefault(t *testing.T) { testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -253,7 +254,7 @@ func TestSchedulerRunEnv(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -322,7 +323,7 @@ func TestSchedulerSystemTags(t *testing.T) { }))) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -453,7 +454,7 @@ func TestSchedulerRunCustomTags(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -617,7 +618,7 @@ func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -954,7 +955,7 @@ func TestSchedulerEndIterations(t *testing.T) { defer cancel() testRunState := getTestRunState(t, getTestPreInitState(t), runner.GetOptions(), runner) - execScheduler, err := NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) samples := make(chan metrics.SampleContainer, 300) @@ -1160,7 +1161,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, options, runner) - execScheduler, err := NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -1280,85 +1281,6 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { } } -// Just a lib.PausableExecutor implementation that can return an error -type pausableExecutor struct { - lib.Executor - err error -} - -func (p pausableExecutor) SetPaused(bool) error { - return p.err -} - -func TestSetPaused(t *testing.T) { - t.Parallel() - t.Run("second pause is an error", func(t *testing.T) { - t.Parallel() - testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewScheduler(testRunState) - require.NoError(t, err) - sched.executors = []lib.Executor{pausableExecutor{err: nil}} - - require.NoError(t, sched.SetPaused(true)) - err = sched.SetPaused(true) - require.Error(t, err) - require.Contains(t, err.Error(), "execution is already paused") - }) - - t.Run("unpause at the start is an error", func(t *testing.T) { - t.Parallel() - testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewScheduler(testRunState) - require.NoError(t, err) - sched.executors = []lib.Executor{pausableExecutor{err: nil}} - err = sched.SetPaused(false) - require.Error(t, err) - require.Contains(t, err.Error(), "execution wasn't paused") - }) - - t.Run("second unpause is an error", func(t *testing.T) { - t.Parallel() - testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewScheduler(testRunState) - require.NoError(t, err) - sched.executors = []lib.Executor{pausableExecutor{err: nil}} - require.NoError(t, sched.SetPaused(true)) - require.NoError(t, sched.SetPaused(false)) - err = sched.SetPaused(false) - require.Error(t, err) - require.Contains(t, err.Error(), "execution wasn't paused") - }) - - t.Run("an error on pausing is propagated", func(t *testing.T) { - t.Parallel() - testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewScheduler(testRunState) - require.NoError(t, err) - expectedErr := errors.New("testing pausable executor error") - sched.executors = []lib.Executor{pausableExecutor{err: expectedErr}} - err = sched.SetPaused(true) - require.Error(t, err) - require.Equal(t, err, expectedErr) - }) - - t.Run("can't pause unpausable executor", func(t *testing.T) { - t.Parallel() - runner := &minirunner.MiniRunner{} - options, err := executor.DeriveScenariosFromShortcuts(lib.Options{ - Iterations: null.IntFrom(2), - VUs: null.IntFrom(1), - }.Apply(runner.GetOptions()), nil) - require.NoError(t, err) - - testRunState := getTestRunState(t, getTestPreInitState(t), options, runner) - sched, err := NewScheduler(testRunState) - require.NoError(t, err) - err = sched.SetPaused(true) - require.Error(t, err) - require.Contains(t, err.Error(), "doesn't support pause and resume operations after its start") - }) -} - func TestNewSchedulerHasWork(t *testing.T) { t.Parallel() script := []byte(` @@ -1406,9 +1328,13 @@ func TestNewSchedulerHasWork(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) - assert.Len(t, execScheduler.executors, 2) - assert.Len(t, execScheduler.executorConfigs, 3) + assert.Len(t, execScheduler.GetExecutors(), 2) + assert.Len(t, execScheduler.GetExecutorConfigs(), 3) + + err = execScheduler.SetPaused(true) + require.Error(t, err) + require.Contains(t, err.Error(), "doesn't support pause and resume operations after its start") } diff --git a/execution/scheduler_int_test.go b/execution/scheduler_int_test.go new file mode 100644 index 00000000000..c0451ad7e63 --- /dev/null +++ b/execution/scheduler_int_test.go @@ -0,0 +1,91 @@ +package execution + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" + "go.k6.io/k6/lib" + "go.k6.io/k6/lib/testutils" + "go.k6.io/k6/lib/testutils/minirunner" + "go.k6.io/k6/metrics" +) + +func getBogusTestRunState(tb testing.TB) *lib.TestRunState { + reg := metrics.NewRegistry() + piState := &lib.TestPreInitState{ + Logger: testutils.NewLogger(tb), + RuntimeOptions: lib.RuntimeOptions{}, + Registry: reg, + BuiltinMetrics: metrics.RegisterBuiltinMetrics(reg), + } + + return &lib.TestRunState{ + TestPreInitState: piState, + Options: lib.Options{}, + Runner: &minirunner.MiniRunner{}, + RunTags: piState.Registry.RootTagSet(), + } +} + +// Just a lib.PausableExecutor implementation that can return an error +type pausableExecutor struct { + lib.Executor + err error +} + +func (p pausableExecutor) SetPaused(bool) error { + return p.err +} + +func TestSetPaused(t *testing.T) { + t.Parallel() + t.Run("second pause is an error", func(t *testing.T) { + t.Parallel() + testRunState := getBogusTestRunState(t) + sched, err := NewScheduler(testRunState) + require.NoError(t, err) + sched.executors = []lib.Executor{pausableExecutor{err: nil}} + + require.NoError(t, sched.SetPaused(true)) + err = sched.SetPaused(true) + require.Error(t, err) + require.Contains(t, err.Error(), "execution is already paused") + }) + + t.Run("unpause at the start is an error", func(t *testing.T) { + t.Parallel() + testRunState := getBogusTestRunState(t) + sched, err := NewScheduler(testRunState) + require.NoError(t, err) + sched.executors = []lib.Executor{pausableExecutor{err: nil}} + err = sched.SetPaused(false) + require.Error(t, err) + require.Contains(t, err.Error(), "execution wasn't paused") + }) + + t.Run("second unpause is an error", func(t *testing.T) { + t.Parallel() + testRunState := getBogusTestRunState(t) + sched, err := NewScheduler(testRunState) + require.NoError(t, err) + sched.executors = []lib.Executor{pausableExecutor{err: nil}} + require.NoError(t, sched.SetPaused(true)) + require.NoError(t, sched.SetPaused(false)) + err = sched.SetPaused(false) + require.Error(t, err) + require.Contains(t, err.Error(), "execution wasn't paused") + }) + + t.Run("an error on pausing is propagated", func(t *testing.T) { + t.Parallel() + testRunState := getBogusTestRunState(t) + sched, err := NewScheduler(testRunState) + require.NoError(t, err) + expectedErr := errors.New("testing pausable executor error") + sched.executors = []lib.Executor{pausableExecutor{err: expectedErr}} + err = sched.SetPaused(true) + require.Error(t, err) + require.Equal(t, err, expectedErr) + }) +} From 9f050f7298f802e394a2b08d24e7bd40f8a4df94 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Thu, 8 Dec 2022 00:17:45 +0200 Subject: [PATCH 3/3] Move and expand the test-abort context to the execution package The expanded capabilities will be necessary for changes in upcoming PRs --- execution/abort.go | 84 +++++++++++++++++++++++++++++++++++++++++ execution/scheduler.go | 5 +-- lib/executor/helpers.go | 47 +---------------------- 3 files changed, 88 insertions(+), 48 deletions(-) create mode 100644 execution/abort.go diff --git a/execution/abort.go b/execution/abort.go new file mode 100644 index 00000000000..08e9bfa8ea2 --- /dev/null +++ b/execution/abort.go @@ -0,0 +1,84 @@ +package execution + +import ( + "context" + "sync" + + "github.com/sirupsen/logrus" +) + +// testAbortKey is the key used to store the abort function for the context of +// an executor. This allows any users of that context or its sub-contexts to +// cancel the whole execution tree, while at the same time providing all of the +// details for why they cancelled it via the attached error. +type testAbortKey struct{} + +type testAbortController struct { + cancel context.CancelFunc + + logger logrus.FieldLogger + lock sync.Mutex // only the first reason will be kept, other will be logged + reason error // see errext package, you can wrap errors to attach exit status, run status, etc. +} + +func (tac *testAbortController) abort(err error) { + tac.lock.Lock() + defer tac.lock.Unlock() + if tac.reason != nil { + tac.logger.Debugf( + "test abort with reason '%s' was attempted when the test was already aborted due to '%s'", + err.Error(), tac.reason.Error(), + ) + return + } + tac.reason = err + tac.cancel() +} + +func (tac *testAbortController) getReason() error { + tac.lock.Lock() + defer tac.lock.Unlock() + return tac.reason +} + +// NewTestRunContext returns context.Context that can be aborted by calling the +// returned TestAbortFunc or by calling CancelTestRunContext() on the returned +// context or a sub-context of it. Use this to initialize the context that will +// be passed to the ExecutionScheduler, so `execution.test.abort()` and the REST +// API test stopping both work. +func NewTestRunContext( + ctx context.Context, logger logrus.FieldLogger, +) (newCtx context.Context, abortTest func(reason error)) { + ctx, cancel := context.WithCancel(ctx) + + controller := &testAbortController{ + cancel: cancel, + logger: logger, + } + + return context.WithValue(ctx, testAbortKey{}, controller), controller.abort +} + +// AbortTestRun will cancel the test run context with the given reason if the +// provided context is actually a TestRuncontext or a child of one. +func AbortTestRun(ctx context.Context, err error) bool { + if x := ctx.Value(testAbortKey{}); x != nil { + if v, ok := x.(*testAbortController); ok { + v.abort(err) + return true + } + } + return false +} + +// GetCancelReasonIfTestAborted returns a reason the Context was cancelled, if it was +// aborted with these functions. It will return nil if ctx is not an +// TestRunContext (or its children) or if it was never aborted. +func GetCancelReasonIfTestAborted(ctx context.Context) error { + if x := ctx.Value(testAbortKey{}); x != nil { + if v, ok := x.(*testAbortController); ok { + return v.getReason() + } + } + return nil +} diff --git a/execution/scheduler.go b/execution/scheduler.go index a2e7214330e..0f5d42a729f 100644 --- a/execution/scheduler.go +++ b/execution/scheduler.go @@ -11,7 +11,6 @@ import ( "go.k6.io/k6/errext" "go.k6.io/k6/lib" - "go.k6.io/k6/lib/executor" "go.k6.io/k6/metrics" "go.k6.io/k6/ui/pb" ) @@ -451,7 +450,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr // this context effectively stopping all executions. // // This is for addressing test.abort(). - execCtx := executor.Context(runSubCtx) + execCtx, _ := NewTestRunContext(runSubCtx, logger) for _, exec := range e.executors { go e.runExecutor(execCtx, runResults, engineOut, exec) } @@ -479,7 +478,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr return err } } - if err := executor.CancelReason(execCtx); err != nil && errext.IsInterruptError(err) { + if err := GetCancelReasonIfTestAborted(execCtx); err != nil && errext.IsInterruptError(err) { interrupted = true return err } diff --git a/lib/executor/helpers.go b/lib/executor/helpers.go index 13b5df97cdb..5ab401c1d5d 100644 --- a/lib/executor/helpers.go +++ b/lib/executor/helpers.go @@ -10,6 +10,7 @@ import ( "github.com/sirupsen/logrus" "go.k6.io/k6/errext" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/types" "go.k6.io/k6/ui/pb" @@ -56,56 +57,12 @@ func validateStages(stages []Stage) []error { return errors } -// cancelKey is the key used to store the cancel function for the context of an -// executor. This is a work around to avoid excessive changes for the ability of -// nested functions to cancel the passed context. -type cancelKey struct{} - -type cancelExec struct { - cancel context.CancelFunc - reason error -} - -// Context returns context.Context that can be cancelled by calling -// CancelExecutorContext. Use this to initialize context that will be passed to -// executors. -// -// This allows executors to globally halt any executions that uses this context. -// Example use case is when a script calls test.abort(). -func Context(ctx context.Context) context.Context { - ctx, cancel := context.WithCancel(ctx) - return context.WithValue(ctx, cancelKey{}, &cancelExec{cancel: cancel}) -} - -// cancelExecutorContext cancels executor context found in ctx, ctx can be a -// child of a context that was created with Context function. -func cancelExecutorContext(ctx context.Context, err error) { - if x := ctx.Value(cancelKey{}); x != nil { - if v, ok := x.(*cancelExec); ok { - v.reason = err - v.cancel() - } - } -} - -// CancelReason returns a reason the executor context was cancelled. This will -// return nil if ctx is not an executor context(ctx or any of its parents was -// never created by Context function). -func CancelReason(ctx context.Context) error { - if x := ctx.Value(cancelKey{}); x != nil { - if v, ok := x.(*cancelExec); ok { - return v.reason - } - } - return nil -} - // handleInterrupt returns true if err is InterruptError and if so it // cancels the executor context passed with ctx. func handleInterrupt(ctx context.Context, err error) bool { if err != nil { if errext.IsInterruptError(err) { - cancelExecutorContext(ctx, err) + execution.AbortTestRun(ctx, err) return true } }