Skip to content

Commit

Permalink
Fix and mask output data races when test is interrupted during the in…
Browse files Browse the repository at this point in the history
…it phase

These will only be properly fixed when #2790 and #1889 are implemented, so the locking is needed for now...
  • Loading branch information
na-- committed Dec 1, 2022
1 parent b5c68fd commit a6823b2
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
14 changes: 14 additions & 0 deletions cmd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,14 @@ func TestAbortedByScriptSetupError(t *testing.T) {

newRootCommand(ts.globalState).execute()

// FIXME: remove this locking after VU initialization accepts a context and
// is properly synchronized: currently when a test is aborted during the
// init phase, some logs might be emitted after the above command returns...
// see: https://github.com/grafana/k6/issues/2790
ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()

t.Log(stdOut)
require.Contains(t, stdOut, `wonky setup`)
require.Contains(t, stdOut, `Error: foo`)
Expand Down Expand Up @@ -793,7 +800,14 @@ func TestAbortedByScriptInitError(t *testing.T) {

newRootCommand(ts.globalState).execute()

// FIXME: remove this locking after VU initialization accepts a context and
// is properly synchronized: currently when a test is aborted during the
// init phase, some logs might be emitted after the above command returns...
// see: https://github.com/grafana/k6/issues/2790
ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()

t.Log(stdOut)
require.Contains(t, stdOut, `Error: foo`)
require.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=7 tainted=false`)
Expand Down
11 changes: 7 additions & 4 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type cmdRun struct {
}

// TODO: split apart some more
//
//nolint:funlen,gocognit,gocyclo,cyclop
func (c *cmdRun) run(cmd *cobra.Command, args []string) error {
printBanner(c.gs)
Expand Down Expand Up @@ -75,22 +76,24 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {
return err
}

progressBarWG := &sync.WaitGroup{}
progressBarWG.Add(1)
defer progressBarWG.Wait()

// This is manually triggered after the Engine's Run() has completed,
// and things like a single Ctrl+C don't affect it. We use it to make
// sure that the progressbars finish updating with the latest execution
// state one last time, after the test run has finished.
progressCtx, progressCancel := context.WithCancel(globalCtx)
defer progressCancel()
initBar := execScheduler.GetInitProgressBar()
progressBarWG := &sync.WaitGroup{}
progressBarWG.Add(1)
go func() {
pbs := []*pb.ProgressBar{execScheduler.GetInitProgressBar()}
defer progressBarWG.Done()
pbs := []*pb.ProgressBar{initBar}
for _, s := range execScheduler.GetExecutors() {
pbs = append(pbs, s.GetProgress())
}
showProgress(progressCtx, c.gs, pbs, logger)
progressBarWG.Done()
}()

// Create all outputs.
Expand Down
8 changes: 8 additions & 0 deletions core/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ func (e *ExecutionScheduler) initVUsConcurrently(
for i := 0; i < concurrency; i++ {
go func() {
for range limiter {
// TODO: actually pass the context when we initialize VUs here,
// so we can cancel that initialization if there is an error,
// see https://github.com/grafana/k6/issues/2790
newVU, err := e.initVU(samplesOut, logger)
if err == nil {
e.state.AddInitializedVU(newVU)
Expand Down Expand Up @@ -281,6 +284,10 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics
}),
)

// TODO: once VU initialization accepts a context, when a VU init fails,
// cancel the context and actually wait for all VUs to finish before this
// function returns - that way we won't have any trailing logs, see
// https://github.com/grafana/k6/issues/2790
for vuNum := uint64(0); vuNum < vusToInitialize; vuNum++ {
select {
case err := <-doneInits:
Expand Down Expand Up @@ -369,6 +376,7 @@ func (e *ExecutionScheduler) runExecutor(

// Run the ExecutionScheduler, funneling all generated metric samples through the supplied
// out channel.
//
//nolint:funlen
func (e *ExecutionScheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metrics.SampleContainer) error {
defer func() {
Expand Down

0 comments on commit a6823b2

Please sign in to comment.