Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split Scheduler.Run() into Init() and Run() again #2893

Merged
merged 1 commit into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestPatchStatus(t *testing.T) {
}

for name, testCase := range testData {
name, testCase := name, testCase
t.Run(name, func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -136,6 +137,9 @@ func TestPatchStatus(t *testing.T) {
RunState: testState,
}

stopEmission, err := execScheduler.Init(runCtx, samples)
require.NoError(t, err)

wg := &sync.WaitGroup{}
wg.Add(1)
defer func() {
Expand All @@ -146,6 +150,7 @@ func TestPatchStatus(t *testing.T) {

go func() {
assert.ErrorContains(t, execScheduler.Run(globalCtx, runCtx, samples), "custom cancel signal")
stopEmission()
close(samples)
wg.Done()
}()
Expand Down
11 changes: 9 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
stopSignalHandling := handleTestAbortSignals(c.gs, gracefulStop, onHardStop)
defer stopSignalHandling()

// Initialize the VUs and executors
stopVUEmission, err := execScheduler.Init(runCtx, samples)
if err != nil {
return err
}
defer stopVUEmission()

if conf.Linger.Bool {
defer func() {
msg := "The test is done, but --linger was enabled, so k6 is waiting for Ctrl+C to continue..."
Expand All @@ -291,8 +298,8 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()
}

// Initialize VUs and start the test! However, we won't immediately return
// if there was an error, we still have things to do.
// Start the test! However, we won't immediately return if there was an
// error, we still have things to do.
err = execScheduler.Run(globalCtx, runCtx, samples)

// Init has passed successfully, so unless disabled, make sure we send a
Expand Down
10 changes: 1 addition & 9 deletions cmd/tests/cmd_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,15 +1020,7 @@ func TestAbortedByTestAbortInNonFirstInitCode(t *testing.T) {
export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};}
`

t.Run("noLinger", func(t *testing.T) {
t.Parallel()
testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
})

t.Run("withLinger", func(t *testing.T) {
t.Parallel()
testAbortedByScriptTestAbort(t, script, runTestWithLinger)
})
testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
}

func TestAbortedByScriptAbortInVUCode(t *testing.T) {
Expand Down
43 changes: 32 additions & 11 deletions execution/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,18 +372,43 @@ func (e *Scheduler) runExecutor(
runResults <- err
}

// Init concurrently initializes all of the planned VUs and then sequentially
// initializes all of the configured executors. It also starts the measurement
// and emission of the `vus` and `vus_max` metrics.
func (e *Scheduler) Init(
runCtx context.Context, samplesOut chan<- metrics.SampleContainer,
) (stopVUEmission func(), err error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init")

execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut)
stopVUEmission = func() {
logger.Debugf("Stopping vus and vux_max metrics emission...")
execSchedRunCancel()
waitForVUsMetricPush()
}

defer func() {
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err)
e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted)
err = interruptErr
}
if err != nil {
stopVUEmission()
}
}()

return stopVUEmission, e.initVUsAndExecutors(execSchedRunCtx, samplesOut)
}

// Run the Scheduler, funneling all generated metric samples through the supplied
// out channel.
//
//nolint:funlen
func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run")

execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut)
defer waitForVUsMetricPush()
defer execSchedRunCancel()

defer func() {
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err)
Expand All @@ -392,10 +417,6 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
}
}()

if err := e.initVUsAndExecutors(execSchedRunCtx, samplesOut); err != nil {
return err
}

e.initProgress.Modify(pb.WithConstLeft("Run"))
if e.state.IsPaused() {
logger.Debug("Execution is paused, waiting for resume or interrupt...")
Expand All @@ -404,7 +425,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
select {
case <-e.state.ResumeNotify():
// continue
case <-execSchedRunCtx.Done():
case <-runCtx.Done():
return nil
}
}
Expand All @@ -422,7 +443,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
// TODO: get rid of this context, pass the e.state directly to VUs when they
// are initialized by e.initVUsAndExecutors(). This will also give access to
// its properties in their init context executions.
withExecStateCtx := lib.WithExecutionState(execSchedRunCtx, e.state)
withExecStateCtx := lib.WithExecutionState(runCtx, e.state)

// Run setup() before any executors, if it's not disabled
if !e.state.Test.Options.NoSetup.Bool {
Expand Down
56 changes: 52 additions & 4 deletions execution/scheduler_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ func newTestScheduler(
}
}()

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
t.Cleanup(func() {
stopEmission()
close(samples)
})

return ctx, cancel, execScheduler, samples
}

Expand Down Expand Up @@ -143,6 +150,12 @@ func TestSchedulerRunNonDefault(t *testing.T) {

done := make(chan struct{})
samples := make(chan metrics.SampleContainer)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

go func() {
assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
close(done)
Expand Down Expand Up @@ -254,6 +267,12 @@ func TestSchedulerRunEnv(t *testing.T) {

done := make(chan struct{})
samples := make(chan metrics.SampleContainer)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

go func() {
assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
close(done)
Expand Down Expand Up @@ -321,6 +340,12 @@ func TestSchedulerSystemTags(t *testing.T) {
defer cancel()

samples := make(chan metrics.SampleContainer)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

done := make(chan struct{})
go func() {
defer close(done)
Expand Down Expand Up @@ -452,6 +477,12 @@ func TestSchedulerRunCustomTags(t *testing.T) {

done := make(chan struct{})
samples := make(chan metrics.SampleContainer)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

go func() {
defer close(done)
require.NoError(t, execScheduler.Run(ctx, ctx, samples))
Expand Down Expand Up @@ -614,8 +645,13 @@ func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) {
defer cancel()

samples := make(chan metrics.SampleContainer)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)

go func() {
assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
stopEmission()
close(samples)
}()

Expand Down Expand Up @@ -947,6 +983,12 @@ func TestSchedulerEndIterations(t *testing.T) {
require.NoError(t, err)

samples := make(chan metrics.SampleContainer, 300)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

require.NoError(t, execScheduler.Run(ctx, ctx, samples))

assert.Equal(t, uint64(100), execScheduler.GetState().GetFullIterationCount())
Expand Down Expand Up @@ -1155,9 +1197,15 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {
defer cancel()

done := make(chan struct{})
sampleContainers := make(chan metrics.SampleContainer)
samples := make(chan metrics.SampleContainer)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

go func() {
assert.NoError(t, execScheduler.Run(ctx, ctx, sampleContainers))
assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
close(done)
}()

Expand All @@ -1167,7 +1215,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {
to *= time.Millisecond
for {
select {
case sampleContainer := <-sampleContainers:
case sampleContainer := <-samples:
gotVus := false
for _, s := range sampleContainer.GetSamples() {
if s.Metric == piState.BuiltinMetrics.VUs || s.Metric == piState.BuiltinMetrics.VUsMax {
Expand Down Expand Up @@ -1257,7 +1305,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {

for {
select {
case s := <-sampleContainers:
case s := <-samples:
t.Fatalf("Did not expect anything in the sample channel bug got %#v", s)
case <-time.After(3 * time.Second):
t.Fatalf("Local execScheduler took way to long to finish")
Expand Down
4 changes: 4 additions & 0 deletions js/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ func TestDataIsolation(t *testing.T) {

require.Empty(t, runner.defaultGroup.Groups)

stopEmission, err := execScheduler.Init(runCtx, samples)
require.NoError(t, err)

errC := make(chan error)
go func() { errC <- execScheduler.Run(globalCtx, runCtx, samples) }()

Expand All @@ -414,6 +417,7 @@ func TestDataIsolation(t *testing.T) {
runAbort(fmt.Errorf("unexpected abort"))
t.Fatal("Test timed out")
case err := <-errC:
stopEmission()
close(samples)
require.NoError(t, err)
waitForMetricsFlushed()
Expand Down