Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce execution.Controller with a local no-op implementation #3204

Merged
merged 2 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
Expand Down Expand Up @@ -41,7 +42,7 @@ func getTestRunState(tb testing.TB, options lib.Options, runner lib.Runner) *lib
}

func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurface {
execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
Expand Down
3 changes: 2 additions & 1 deletion api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/types"
Expand Down Expand Up @@ -138,7 +139,7 @@ func TestSetupData(t *testing.T) {
TeardownTimeout: types.NullDurationFrom(5 * time.Second),
}, runner)

execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(t, err)
metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
Expand Down Expand Up @@ -115,7 +116,7 @@ func TestPatchStatus(t *testing.T) {
require.NoError(t, err)

testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{})
execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
Expand Down
2 changes: 1 addition & 1 deletion cmd/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type cmdArchive struct {
}

func (c *cmdArchive) run(cmd *cobra.Command, args []string) error {
test, err := loadAndConfigureTest(c.gs, cmd, args, getPartialConfig)
test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error {
)
printBar(c.gs, progressBar)

test, err := loadAndConfigureTest(c.gs, cmd, args, getPartialConfig)
test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func getCmdInspect(gs *state.GlobalState) *cobra.Command {
Long: `Inspect a script or archive.`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
test, err := loadTest(gs, cmd, args)
test, err := loadLocalTest(gs, cmd, args)
if err != nil {
return err
}
Expand Down
12 changes: 10 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/event"
"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/js/common"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/consts"
Expand All @@ -39,6 +40,9 @@ import (
// cmdRun handles the `k6 run` sub-command
type cmdRun struct {
gs *state.GlobalState

// TODO: figure out something more elegant?
loadConfiguredTest func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error)
}

const (
Expand Down Expand Up @@ -100,7 +104,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
c.gs.Events.UnsubscribeAll()
}()

test, err := loadAndConfigureTest(c.gs, cmd, args, getConfig)
test, controller, err := c.loadConfiguredTest(cmd, args)
if err != nil {
return err
}
Expand Down Expand Up @@ -132,7 +136,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {

// Create a local execution scheduler wrapping the runner.
logger.Debug("Initializing the execution scheduler...")
execScheduler, err := execution.NewScheduler(testRunState)
execScheduler, err := execution.NewScheduler(testRunState, controller)
if err != nil {
return err
}
Expand Down Expand Up @@ -459,6 +463,10 @@ func (c *cmdRun) setupTracerProvider(ctx context.Context, test *loadedAndConfigu
func getCmdRun(gs *state.GlobalState) *cobra.Command {
c := &cmdRun{
gs: gs,
loadConfiguredTest: func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error) {
test, err := loadAndConfigureLocalTest(gs, cmd, args, getConfig)
return test, local.NewController(), err
},
}

exampleText := getExampleText(gs, `
Expand Down
6 changes: 3 additions & 3 deletions cmd/test_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type loadedTest struct {
keyLogger io.Closer
}

func loadTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*loadedTest, error) {
func loadLocalTest(gs *state.GlobalState, cmd *cobra.Command, args []string) (*loadedTest, error) {
if len(args) < 1 {
return nil, fmt.Errorf("k6 needs at least one argument to load the test")
}
Expand Down Expand Up @@ -236,11 +236,11 @@ type loadedAndConfiguredTest struct {
derivedConfig Config
}

func loadAndConfigureTest(
func loadAndConfigureLocalTest(
gs *state.GlobalState, cmd *cobra.Command, args []string,
cliConfigGetter func(flags *pflag.FlagSet) (Config, error),
) (*loadedAndConfiguredTest, error) {
test, err := loadTest(gs, cmd, args)
test, err := loadLocalTest(gs, cmd, args)
if err != nil {
return nil, err
}
Expand Down
53 changes: 53 additions & 0 deletions execution/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package execution

// Controller implementations are used to control the k6 execution of a test or
// test suite, either locally or in a distributed environment.
type Controller interface {
// GetOrCreateData requests the data chunk with the given ID, if it already
// exists. If it doesn't (i.e. this was the first time this function was
// called with that ID), the given callback is called and its result and
// error are saved for the ID and returned for all other calls with it.
//
// This is an atomic and single-flight function, so any calls to it while the callback is
// being executed the the same ID will wait for the first call to to finish
// and receive its result.
//
// TODO: split apart into `Once()`, `SetData(), `GetData()` and implement
// the GetOrCreateData() behavior in a helper like the ones below?
GetOrCreateData(ID string, callback func() ([]byte, error)) ([]byte, error)

// Signal is used to notify that the current instance has reached the given
// event ID, or that it has had an error.
Signal(eventID string, err error) error

// Subscribe creates a listener for the specified event ID and returns a
// callback that can wait until all other instances have reached it.
Subscribe(eventID string) (wait func() error)
}

// SignalAndWait implements a rendezvous point / barrier, a way for all
// instances to reach the same execution point and wait for each other, before
// they all ~simultaneously continue with the execution.
//
// It subscribes for the given event ID, signals that the current instance has
// reached it without an error, and then waits until all other instances have
// reached it or until there is an error in one of them.
func SignalAndWait(c Controller, eventID string) error {
wait := c.Subscribe(eventID)

if err := c.Signal(eventID, nil); err != nil {
return err
}
return wait()
}

// SignalErrorOrWait is a helper method that either immediately signals the
// given error and returns it, or it signals nominal completion and waits for
// all other instances to do the same (or signal an error themselves).
func SignalErrorOrWait(c Controller, eventID string, err error) error {
if err != nil {
_ = c.Signal(eventID, err)
return err // return the same error we got
}
return SignalAndWait(c, eventID)
}
45 changes: 45 additions & 0 deletions execution/local/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Package local implements the execution.Controller interface for local
// (single-machine) k6 execution.
package local

// Controller "controls" local tests. It doesn't actually do anything, it just
// implements the execution.Controller interface with no-op operations. The
// methods don't do anything because local tests have only a single instance.
//
// However, for test suites (https://github.com/grafana/k6/issues/1342) in the
// future, we will probably need to actually implement some of these methods and
// introduce simple synchronization primitives even for a single machine...
type Controller struct{}

// NewController creates a new local execution Controller.
func NewController() *Controller {
return &Controller{}
}

// GetOrCreateData immediately calls the given callback and returns its results.
func (c *Controller) GetOrCreateData(_ string, callback func() ([]byte, error)) ([]byte, error) {
return callback()
}

// Subscribe is a no-op, it doesn't actually wait for anything, because there is
// nothing to wait on - we only have one instance in local tests.
//
// TODO: actually use waitgroups, since this may actually matter for test
// suites, even for local test runs. That's because multiple tests might be
// executed even by a single instance, and if we have complicated flows (e.g.
// "test C is executed only after test A and test B finish"), the easiest way
// would be for different tests in the suite to reuse this Controller API *both*
// local and distributed runs.
func (c *Controller) Subscribe(_ string) func() error {
return func() error {
return nil
}
}

// Signal is a no-op, it doesn't actually do anything for local test runs.
//
// TODO: similar to Wait() above, this might actually be required for
// complex/branching test suites, even during local non-distributed execution.
func (c *Controller) Signal(_ string, _ error) error {
return nil
}
61 changes: 56 additions & 5 deletions execution/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
// executors, running setup() and teardown(), and actually starting the
// executors for the different scenarios at the appropriate times.
type Scheduler struct {
controller Controller

initProgress *pb.ProgressBar
executorConfigs []lib.ExecutorConfig // sorted by (startTime, ID)
executors []lib.Executor // sorted by (startTime, ID), excludes executors with no work
Expand All @@ -33,7 +35,7 @@ type Scheduler struct {
// initializing it beyond the bare minimum. Specifically, it creates the needed
// executor instances and a lot of state placeholders, but it doesn't initialize
// the executors and it doesn't initialize or run VUs.
func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) {
func NewScheduler(trs *lib.TestRunState, controller Controller) (*Scheduler, error) {
options := trs.Options
et, err := lib.NewExecutionTuple(options.ExecutionSegment, options.ExecutionSegmentSequence)
if err != nil {
Expand Down Expand Up @@ -81,6 +83,7 @@ func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) {
maxDuration: maxDuration,
maxPossibleVUs: maxPossibleVUs,
state: executionState,
controller: controller,
}, nil
}

Expand Down Expand Up @@ -380,6 +383,13 @@ func (e *Scheduler) Init(
) (stopVUEmission func(), initErr error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init")

if err := SignalAndWait(e.controller, "scheduler-init-start"); err != nil {
return nil, err
}
defer func() {
initErr = SignalErrorOrWait(e.controller, "scheduler-init-done", initErr)
}()

execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut)
stopVUEmission = func() {
Expand Down Expand Up @@ -409,12 +419,16 @@ func (e *Scheduler) Init(
func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (runErr error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run")

if err := SignalAndWait(e.controller, "scheduler-run-start"); err != nil {
return err
}
defer func() {
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, runErr)
e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted)
runErr = interruptErr
}
runErr = SignalErrorOrWait(e.controller, "scheduler-run-done", runErr)
}()

e.initProgress.Modify(pb.WithConstLeft("Run"))
Expand All @@ -430,6 +444,10 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
}
}

if err := SignalAndWait(e.controller, "test-ready-to-run-setup"); err != nil {
return err
}

e.initProgress.Modify(pb.WithConstProgress(1, "Starting test..."))
e.state.MarkStarted()
defer e.state.MarkEnded()
Expand All @@ -449,11 +467,27 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
if !e.state.Test.Options.NoSetup.Bool {
e.state.SetExecutionStatus(lib.ExecutionStatusSetup)
e.initProgress.Modify(pb.WithConstProgress(1, "setup()"))
if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("setup() aborted by error")
actuallyRanSetup := false
data, err := e.controller.GetOrCreateData("setup", func() ([]byte, error) {
actuallyRanSetup = true
if err := e.state.Test.Runner.Setup(withExecStateCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("setup() aborted by error")
return nil, err
}
return e.state.Test.Runner.GetSetupData(), nil
})
if err != nil {
return err
}
if !actuallyRanSetup {
e.state.Test.Runner.SetSetupData(data)
}
}

if err := SignalAndWait(e.controller, "setup-done"); err != nil {
return err
}

e.initProgress.Modify(pb.WithHijack(e.getRunStats))

// Start all executors at their particular startTime in a separate goroutine...
Expand All @@ -469,6 +503,8 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
// Wait for all executors to finish
var firstErr error
for range e.executors {
// TODO: add logic to abort the test early if there was an error from
// the controller (e.g. some other instance for this test died)
err := <-runResults
if err != nil && firstErr == nil {
logger.WithError(err).Debug("Executor returned with an error, cancelling test run...")
Expand All @@ -477,19 +513,34 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
}
}

if err := SignalAndWait(e.controller, "execution-done"); err != nil {
return err
}

// Run teardown() after all executors are done, if it's not disabled
if !e.state.Test.Options.NoTeardown.Bool {
e.state.SetExecutionStatus(lib.ExecutionStatusTeardown)
e.initProgress.Modify(pb.WithConstProgress(1, "teardown()"))

// We run teardown() with the global context, so it isn't interrupted by
// thresholds or test.abort() or even Ctrl+C (unless used twice).
if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("teardown() aborted by error")
// TODO: add a `sync.Once` equivalent?
_, err := e.controller.GetOrCreateData("teardown", func() ([]byte, error) {
if err := e.state.Test.Runner.Teardown(globalCtx, samplesOut); err != nil {
logger.WithField("error", err).Debug("teardown() aborted by error")
return nil, err
}
return nil, nil
})
if err != nil {
return err
}
}

if err := SignalAndWait(e.controller, "teardown-done"); err != nil {
return err
}

return firstErr
}

Expand Down
Loading