From 32880af6082ab2ff6a51fa8b00ec80f9d9c1b178 Mon Sep 17 00:00:00 2001 From: Olha Yevtushenko Date: Thu, 31 Aug 2023 14:52:59 +0300 Subject: [PATCH 1/7] cloud: add initial sending of events --- controllers/k6_create.go | 7 +++ controllers/k6_finish.go | 17 ++++++- controllers/k6_initialize.go | 6 +++ pkg/cloud/test_runs.go | 21 +++++++++ pkg/cloud/types.go | 91 ++++++++++++++++++++++++++++++++++++ 5 files changed, 140 insertions(+), 2 deletions(-) diff --git a/controllers/k6_create.go b/controllers/k6_create.go index 9a08825c..5fb15be5 100644 --- a/controllers/k6_create.go +++ b/controllers/k6_create.go @@ -7,6 +7,7 @@ import ( "github.com/go-logr/logr" "github.com/grafana/k6-operator/api/v1alpha1" + "github.com/grafana/k6-operator/pkg/cloud" "github.com/grafana/k6-operator/pkg/resources/jobs" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -41,6 +42,12 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T // An error here means a very likely mis-configuration of the token. // Consider updating status to error to let a user know quicker? log.Error(err, "A problem while getting token.") + + if k6.IsTrue(v1alpha1.CloudTestRun) { + events := cloud.ErrorEvent(cloud.K6OperatorStartError).WithDetail(fmt.Sprintf("Failed to retrieve token: %v", err)) + cloud.SendTestRunEvents(r.k6CloudClient, k6.Spec.TestRunID, log, events) + } + return ctrl.Result{}, nil } if !tokenReady { diff --git a/controllers/k6_finish.go b/controllers/k6_finish.go index 4344a072..5082bf35 100644 --- a/controllers/k6_finish.go +++ b/controllers/k6_finish.go @@ -6,6 +6,7 @@ import ( "github.com/go-logr/logr" "github.com/grafana/k6-operator/api/v1alpha1" + "github.com/grafana/k6-operator/pkg/cloud" batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/labels" "sigs.k8s.io/controller-runtime/pkg/client" @@ -35,15 +36,27 @@ func FinishJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T } // TODO: We should distinguish between Suceeded/Failed/Unknown - var finished int32 + var ( + finished, failed int32 + ) for _, job := range jl.Items { if job.Status.Active != 0 { continue } finished++ + + if job.Status.Failed > 0 { + failed++ + } } - log.Info(fmt.Sprintf("%d/%d jobs complete", finished, k6.GetSpec().Parallelism)) + msg := fmt.Sprintf("%d/%d jobs complete, %d failed", finished, k6.GetSpec().Parallelism, failed) + log.Info(msg) + + if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) && failed > 0 { + events := cloud.ErrorEvent(cloud.K6OperatorRunnerError).WithDetail(msg) + cloud.SendTestRunEvents(r.k6CloudClient, k6.GetSpec().TestRunID, log, events) + } if finished < k6.GetSpec().Parallelism { return diff --git a/controllers/k6_initialize.go b/controllers/k6_initialize.go index 664aeb05..bef2e1e6 100644 --- a/controllers/k6_initialize.go +++ b/controllers/k6_initialize.go @@ -52,6 +52,12 @@ func RunValidations(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, inspectOutput, inspectReady, err := inspectTestRun(ctx, log, k6, r.Client) if err != nil { + if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) { + // This error won't allow to start a test so let k6 Cloud know of it + events := cloud.ErrorEvent(cloud.K6OperatorStartError).WithDetail(fmt.Sprintf("Failed to inspect the test script: %v", err)) + cloud.SendTestRunEvents(r.k6CloudClient, k6.GetSpec().TestRunID, log, events) + } + // inspectTestRun made a log message already so just return without requeue return ctrl.Result{}, nil } diff --git a/pkg/cloud/test_runs.go b/pkg/cloud/test_runs.go index 0650e656..8e7c5b78 100644 --- a/pkg/cloud/test_runs.go +++ b/pkg/cloud/test_runs.go @@ -120,3 +120,24 @@ func GetTestRunState(client *cloudapi.Client, refID string, log logr.Logger) (Te return TestRunStatus(trData.RunStatus), nil } + +// called by K6 controller +// If there's an error, it'll be logged. +func SendTestRunEvents(client *cloudapi.Client, refID string, log logr.Logger, events *Events) { + if len(*events) == 0 { + return + } + + url := fmt.Sprintf("%s/orchestrator/v1/testruns/%s/events", client.BaseURL(), refID) + + req, err := client.NewRequest("POST", url, events) + if err != nil { + log.Error(err, fmt.Sprintf("Failed to create events HTTP request %+v", events)) + return + } + + // status code is checked in Do + if err = client.Do(req, nil); err != nil { + log.Error(err, fmt.Sprintf("Failed to send events %+v", events)) + } +} diff --git a/pkg/cloud/types.go b/pkg/cloud/types.go index cc6b012d..a2d048c9 100644 --- a/pkg/cloud/types.go +++ b/pkg/cloud/types.go @@ -67,3 +67,94 @@ type PLZResources struct { CPU string `json:"cpu"` Memory string `json:"memory"` } + +type Events []*EventPayload + +type EventPayload struct { + EventType `json:"event_type"` + Event `json:"event"` +} + +type Event struct { + ErrorCode `json:"error_code,omitempty"` + Detail string `json:"error_detail,omitempty"` + Origin `json:"origin,omitempty"` + Reason string `json:"reason,omitempty"` +} + +type EventType string + +var ( + abortEvent = EventType("TestRunAbortEvent") + errorEvent = EventType("TestRunErrorEvent") +) + +type ErrorCode uint + +var ( + SetupError = ErrorCode(8030) + TeardownError = ErrorCode(8031) + OOMError = ErrorCode(8032) + PanicError = ErrorCode(8033) + UnknownError = ErrorCode(8034) + ScriptException = ErrorCode(8035) + + K6OperatorStartError = ErrorCode(8050) + K6OperatorAbortError = ErrorCode(8051) + K6OperatorRunnerError = ErrorCode(8052) +) + +type Origin string + +var ( + OriginUser = Origin("user") + OriginK6 = Origin("k6") +) + +// WithDetail sets detail only for the 1st event +func (e *Events) WithDetail(s string) *Events { + if len(*e) == 0 { + return e + } + + if (*e)[0].EventType == abortEvent { + (*e)[0].Reason = s + } else { + (*e)[0].Detail = s + } + return e +} + +// WithAbort adds abortEvent if errorEvent already exists +// func (e *Events) WithAbort(s string) *Events { +// if len(*e) == 0 { +// return +// } + +// if (*e)[0].EventType == errorEvent { +// ae := EventPayload{ +// EventType: abort, +// } +// } +// return e +// } + +func AbortEvent(o Origin) *Events { + e := Events([]*EventPayload{{ + EventType: abortEvent, + Event: Event{ + Origin: o, + }, + }}) + return &e +} + +func ErrorEvent(ec ErrorCode) *Events { + e := Events([]*EventPayload{{ + EventType: errorEvent, + Event: Event{ + ErrorCode: ec, + }, + }}) + return &e +} From 0dc73b1ac658e9722c419171091ec392617df32c Mon Sep 17 00:00:00 2001 From: Olha Yevtushenko Date: Tue, 19 Sep 2023 11:22:59 +0300 Subject: [PATCH 2/7] cloud, plz: add events on failed creation with simple timeout --- api/v1alpha1/zz_generated.deepcopy.go | 1 - controllers/k6_create.go | 10 ++++++++-- controllers/k6_start.go | 15 +++++++++++++++ pkg/cloud/test_runs.go | 4 ++-- 4 files changed, 25 insertions(+), 5 deletions(-) diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 8046580b..873c1f1c 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -1,5 +1,4 @@ //go:build !ignore_autogenerated -// +build !ignore_autogenerated /* diff --git a/controllers/k6_create.go b/controllers/k6_create.go index 5fb15be5..f76b49c3 100644 --- a/controllers/k6_create.go +++ b/controllers/k6_create.go @@ -43,9 +43,9 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T // Consider updating status to error to let a user know quicker? log.Error(err, "A problem while getting token.") - if k6.IsTrue(v1alpha1.CloudTestRun) { + if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) { events := cloud.ErrorEvent(cloud.K6OperatorStartError).WithDetail(fmt.Sprintf("Failed to retrieve token: %v", err)) - cloud.SendTestRunEvents(r.k6CloudClient, k6.Spec.TestRunID, log, events) + cloud.SendTestRunEvents(r.k6CloudClient, k6.GetSpec().TestRunID, log, events) } return ctrl.Result{}, nil @@ -58,6 +58,12 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T log.Info("Creating test jobs") if res, err = createJobSpecs(ctx, log, k6, r, token); err != nil { + + if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) { + events := cloud.ErrorEvent(cloud.K6OperatorStartError).WithDetail(fmt.Sprintf("Failed to create runner jobs: %v", err)) + cloud.SendTestRunEvents(r.k6CloudClient, k6.GetSpec().TestRunID, log, events) + } + return res, err } diff --git a/controllers/k6_start.go b/controllers/k6_start.go index 9308cffb..6722bcef 100644 --- a/controllers/k6_start.go +++ b/controllers/k6_start.go @@ -2,12 +2,14 @@ package controllers import ( "context" + "errors" "fmt" "net/http" "time" "github.com/go-logr/logr" "github.com/grafana/k6-operator/api/v1alpha1" + "github.com/grafana/k6-operator/pkg/cloud" "github.com/grafana/k6-operator/pkg/resources/jobs" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -62,6 +64,19 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te log.Info(fmt.Sprintf("%d/%d runner pods ready", count, k6.GetSpec().Parallelism)) if count != int(k6.GetSpec().Parallelism) { + if t, ok := v1alpha1.LastUpdate(k6, v1alpha1.TestRunRunning); !ok { + // this should never happen + return res, errors.New("Cannot find condition TestRunRunning") + } else { + // let's try this approach + if time.Since(t).Minutes() > 5 { + if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) { + events := cloud.ErrorEvent(cloud.K6OperatorStartError).WithDetail("Creation of runner pods takes too long: perhaps, something is off with your configuration. Check if runner jobs and pods were created successfully.") + cloud.SendTestRunEvents(r.k6CloudClient, k6.GetSpec().TestRunID, log, events) + } + } + } + return res, nil } diff --git a/pkg/cloud/test_runs.go b/pkg/cloud/test_runs.go index 8e7c5b78..870364b5 100644 --- a/pkg/cloud/test_runs.go +++ b/pkg/cloud/test_runs.go @@ -128,9 +128,9 @@ func SendTestRunEvents(client *cloudapi.Client, refID string, log logr.Logger, e return } - url := fmt.Sprintf("%s/orchestrator/v1/testruns/%s/events", client.BaseURL(), refID) - + url := fmt.Sprintf("%s/orchestrator/v1/testruns/%s/events", strings.TrimSuffix(client.BaseURL(), "/v1"), refID) req, err := client.NewRequest("POST", url, events) + if err != nil { log.Error(err, fmt.Sprintf("Failed to create events HTTP request %+v", events)) return From 8b8436c7928753148f7e927ca4f46bd994fb97e2 Mon Sep 17 00:00:00 2001 From: Olha Yevtushenko Date: Tue, 31 Oct 2023 17:55:21 +0200 Subject: [PATCH 3/7] cloud, plz: add public detail and abort to events --- api/v1alpha1/testruni.go | 11 ++++++++ controllers/common.go | 2 +- controllers/k6_create.go | 21 +++++++------- controllers/k6_finish.go | 6 ++-- controllers/k6_initialize.go | 8 ++++-- controllers/k6_start.go | 6 ++-- controllers/testrun_controller.go | 2 +- pkg/cloud/test_runs.go | 4 +-- pkg/cloud/types.go | 46 +++++++++++++++++-------------- 9 files changed, 64 insertions(+), 42 deletions(-) diff --git a/api/v1alpha1/testruni.go b/api/v1alpha1/testruni.go index 5320e296..0f88463f 100644 --- a/api/v1alpha1/testruni.go +++ b/api/v1alpha1/testruni.go @@ -20,3 +20,14 @@ type TestRunI interface { GetSpec() *TestRunSpec NamespacedName() types.NamespacedName } + +// TestRunID is a tiny helper to get k6 Cloud test run ID. +// PLZ test run will have test run ID as part of spec +// while cloud output test run as part of status. +func TestRunID(k6 TestRunI) string { + specId := k6.GetSpec().TestRunID + if len(specId) > 0 { + return specId + } + return k6.GetStatus().TestRunID +} diff --git a/controllers/common.go b/controllers/common.go index 2a7ecbee..b409b4c6 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -46,7 +46,7 @@ func inspectTestRun(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, } // there should be only 1 initializer pod - if podList.Items[0].Status.Phase != "Succeeded" { + if podList.Items[0].Status.Phase != corev1.PodSucceeded && podList.Items[0].Status.Phase != corev1.PodFailed { log.Info("Waiting for initializing pod to finish") return } diff --git a/controllers/k6_create.go b/controllers/k6_create.go index f76b49c3..bca23c5c 100644 --- a/controllers/k6_create.go +++ b/controllers/k6_create.go @@ -40,14 +40,8 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T token, tokenReady, err = loadToken(ctx, log, r.Client, k6.GetSpec().Token, sOpts) if err != nil { // An error here means a very likely mis-configuration of the token. - // Consider updating status to error to let a user know quicker? + // TODO: update status to error to let a user know quicker log.Error(err, "A problem while getting token.") - - if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) { - events := cloud.ErrorEvent(cloud.K6OperatorStartError).WithDetail(fmt.Sprintf("Failed to retrieve token: %v", err)) - cloud.SendTestRunEvents(r.k6CloudClient, k6.GetSpec().TestRunID, log, events) - } - return ctrl.Result{}, nil } if !tokenReady { @@ -58,10 +52,11 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T log.Info("Creating test jobs") if res, err = createJobSpecs(ctx, log, k6, r, token); err != nil { - if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) { - events := cloud.ErrorEvent(cloud.K6OperatorStartError).WithDetail(fmt.Sprintf("Failed to create runner jobs: %v", err)) - cloud.SendTestRunEvents(r.k6CloudClient, k6.GetSpec().TestRunID, log, events) + events := cloud.ErrorEvent(cloud.K6OperatorStartError). + WithDetail(fmt.Sprintf("Failed to create runner jobs: %v", err)). + WithAbort() + cloud.SendTestRunEvents(r.k6CloudClient, v1alpha1.TestRunID(k6), log, events) } return res, err @@ -86,7 +81,11 @@ func createJobSpecs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, } if err := r.Get(ctx, namespacedName, found); err == nil || !errors.IsNotFound(err) { - log.Info("Could not start a new test, Make sure you've deleted your previous run.") + if err == nil { + err = fmt.Errorf("job with the name %s exists; make sure you've deleted your previous run", namespacedName.Name) + } + + log.Info(err.Error()) return ctrl.Result{}, err } diff --git a/controllers/k6_finish.go b/controllers/k6_finish.go index 5082bf35..6ae2acb1 100644 --- a/controllers/k6_finish.go +++ b/controllers/k6_finish.go @@ -54,8 +54,10 @@ func FinishJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T log.Info(msg) if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) && failed > 0 { - events := cloud.ErrorEvent(cloud.K6OperatorRunnerError).WithDetail(msg) - cloud.SendTestRunEvents(r.k6CloudClient, k6.GetSpec().TestRunID, log, events) + events := cloud.ErrorEvent(cloud.K6OperatorRunnerError). + WithDetail(msg). + WithAbort() + cloud.SendTestRunEvents(r.k6CloudClient, v1alpha1.TestRunID(k6), log, events) } if finished < k6.GetSpec().Parallelism { diff --git a/controllers/k6_initialize.go b/controllers/k6_initialize.go index bef2e1e6..aeb1f41f 100644 --- a/controllers/k6_initialize.go +++ b/controllers/k6_initialize.go @@ -52,10 +52,14 @@ func RunValidations(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, inspectOutput, inspectReady, err := inspectTestRun(ctx, log, k6, r.Client) if err != nil { + // Cloud output test run is not created yet at this point, so sending + // events is possible only for PLZ test run. if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) { // This error won't allow to start a test so let k6 Cloud know of it - events := cloud.ErrorEvent(cloud.K6OperatorStartError).WithDetail(fmt.Sprintf("Failed to inspect the test script: %v", err)) - cloud.SendTestRunEvents(r.k6CloudClient, k6.GetSpec().TestRunID, log, events) + events := cloud.ErrorEvent(cloud.K6OperatorStartError). + WithDetail(fmt.Sprintf("Failed to inspect the test script: %v", err)). + WithAbort() + cloud.SendTestRunEvents(r.k6CloudClient, v1alpha1.TestRunID(k6), log, events) } // inspectTestRun made a log message already so just return without requeue diff --git a/controllers/k6_start.go b/controllers/k6_start.go index 6722bcef..ced7fccc 100644 --- a/controllers/k6_start.go +++ b/controllers/k6_start.go @@ -71,8 +71,10 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te // let's try this approach if time.Since(t).Minutes() > 5 { if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) { - events := cloud.ErrorEvent(cloud.K6OperatorStartError).WithDetail("Creation of runner pods takes too long: perhaps, something is off with your configuration. Check if runner jobs and pods were created successfully.") - cloud.SendTestRunEvents(r.k6CloudClient, k6.GetSpec().TestRunID, log, events) + events := cloud.ErrorEvent(cloud.K6OperatorStartError). + WithDetail("Creation of runner pods takes too long: your configuration might be off. Check if runner jobs and pods were created successfully."). + WithAbort() + cloud.SendTestRunEvents(r.k6CloudClient, v1alpha1.TestRunID(k6), log, events) } } } diff --git a/controllers/testrun_controller.go b/controllers/testrun_controller.go index 5e742ae6..029563a4 100644 --- a/controllers/testrun_controller.go +++ b/controllers/testrun_controller.go @@ -82,7 +82,7 @@ func (r *TestRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log logr.Logger, k6 v1alpha1.TestRunI) (ctrl.Result, error) { var err error - if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) { + if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) { // bootstrap the client found, err := r.createClient(ctx, k6, log) if err != nil { diff --git a/pkg/cloud/test_runs.go b/pkg/cloud/test_runs.go index 870364b5..699ac18e 100644 --- a/pkg/cloud/test_runs.go +++ b/pkg/cloud/test_runs.go @@ -110,7 +110,7 @@ func GetTestRunData(client *cloudapi.Client, refID string) (*TestRunData, error) return getTestRun(client, url) } -// called by K6 controller +// called by TestRun controller func GetTestRunState(client *cloudapi.Client, refID string, log logr.Logger) (TestRunStatus, error) { url := fmt.Sprintf("%s/loadtests/v4/test_runs(%s)?$select=id,run_status", ApiURL(client.BaseURL()), refID) trData, err := getTestRun(client, url) @@ -121,7 +121,7 @@ func GetTestRunState(client *cloudapi.Client, refID string, log logr.Logger) (Te return TestRunStatus(trData.RunStatus), nil } -// called by K6 controller +// called by TestRun controller // If there's an error, it'll be logged. func SendTestRunEvents(client *cloudapi.Client, refID string, log logr.Logger, events *Events) { if len(*events) == 0 { diff --git a/pkg/cloud/types.go b/pkg/cloud/types.go index a2d048c9..74836123 100644 --- a/pkg/cloud/types.go +++ b/pkg/cloud/types.go @@ -76,10 +76,14 @@ type EventPayload struct { } type Event struct { - ErrorCode `json:"error_code,omitempty"` - Detail string `json:"error_detail,omitempty"` Origin `json:"origin,omitempty"` - Reason string `json:"reason,omitempty"` + ErrorCode `json:"error_code,omitempty"` + + // reason is used for abort events, + // while details are for any non-abort event + Reason string `json:"reason,omitempty"` + Detail string `json:"error_detail,omitempty"` + PublicDetail string `json:"error_detail_public,omitempty"` } type EventType string @@ -111,7 +115,8 @@ var ( OriginK6 = Origin("k6") ) -// WithDetail sets detail only for the 1st event +// WithDetail sets detail only for the 1st event. +// If it's abort, WithDetail sets reason field. func (e *Events) WithDetail(s string) *Events { if len(*e) == 0 { return e @@ -121,32 +126,31 @@ func (e *Events) WithDetail(s string) *Events { (*e)[0].Reason = s } else { (*e)[0].Detail = s + (*e)[0].PublicDetail = s } return e } -// WithAbort adds abortEvent if errorEvent already exists -// func (e *Events) WithAbort(s string) *Events { -// if len(*e) == 0 { -// return -// } - -// if (*e)[0].EventType == errorEvent { -// ae := EventPayload{ -// EventType: abort, -// } -// } -// return e -// } +// WithAbort adds abortEvent to errorEvent if it already exists. +func (e *Events) WithAbort() *Events { + if len(*e) == 0 { + return e + } -func AbortEvent(o Origin) *Events { - e := Events([]*EventPayload{{ + if (*e)[0].EventType == errorEvent { + *e = append(*e, AbortEvent(OriginUser)) + } + return e +} + +func AbortEvent(o Origin) *EventPayload { + e := &EventPayload{ EventType: abortEvent, Event: Event{ Origin: o, }, - }}) - return &e + } + return e } func ErrorEvent(ec ErrorCode) *Events { From 3bdb25183201afe66759bb52b0fe10ee48f89cb5 Mon Sep 17 00:00:00 2001 From: Olha Yevtushenko Date: Fri, 10 Nov 2023 13:58:39 +0200 Subject: [PATCH 4/7] cloud, plz: fix for job duplicate check --- controllers/k6_create.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/controllers/k6_create.go b/controllers/k6_create.go index bca23c5c..23e2fdab 100644 --- a/controllers/k6_create.go +++ b/controllers/k6_create.go @@ -21,7 +21,6 @@ import ( func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler) (ctrl.Result, error) { var ( err error - res ctrl.Result token string // only for cloud tests ) @@ -51,7 +50,7 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T log.Info("Creating test jobs") - if res, err = createJobSpecs(ctx, log, k6, r, token); err != nil { + if res, recheck, err := createJobSpecs(ctx, log, k6, r, token); err != nil { if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) { events := cloud.ErrorEvent(cloud.K6OperatorStartError). WithDetail(fmt.Sprintf("Failed to create runner jobs: %v", err)). @@ -60,9 +59,11 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T } return res, err + } else if recheck { + return res, nil } - log.Info("Changing stage of K6 status to created") + log.Info("Changing stage of TestRun status to created") k6.GetStatus().Stage = "created" if updateHappened, err := r.UpdateStatus(ctx, k6, log); err != nil { @@ -73,7 +74,7 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T return ctrl.Result{}, nil } -func createJobSpecs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler, token string) (ctrl.Result, error) { +func createJobSpecs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler, token string) (ctrl.Result, bool, error) { found := &batchv1.Job{} namespacedName := types.NamespacedName{ Name: fmt.Sprintf("%s-1", k6.NamespacedName().Name), @@ -84,17 +85,24 @@ func createJobSpecs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, if err == nil { err = fmt.Errorf("job with the name %s exists; make sure you've deleted your previous run", namespacedName.Name) } - log.Info(err.Error()) - return ctrl.Result{}, err + + // is it possible to implement this delay with resourceVersion of the job? + t, _ := v1alpha1.LastUpdate(k6, v1alpha1.CloudTestRun) + if time.Since(t).Seconds() <= 30 { + // try again before returning an error + return ctrl.Result{RequeueAfter: time.Second * 10}, true, nil + } + + return ctrl.Result{}, false, err } for i := 1; i <= int(k6.GetSpec().Parallelism); i++ { if err := launchTest(ctx, k6, i, log, r, token); err != nil { - return ctrl.Result{}, err + return ctrl.Result{}, false, err } } - return ctrl.Result{}, nil + return ctrl.Result{}, false, nil } func launchTest(ctx context.Context, k6 v1alpha1.TestRunI, index int, log logr.Logger, r *TestRunReconciler, token string) error { From c7da10fedbd374c985e695c9d18a00ee7f2191ef Mon Sep 17 00:00:00 2001 From: Olha Yevtushenko Date: Tue, 21 Nov 2023 21:28:29 +0200 Subject: [PATCH 5/7] cloud, plz: check for abort at the start of each reconcile --- api/v1alpha1/k6conditions.go | 15 +++++++++++---- controllers/k6_start.go | 7 +++++-- controllers/k6_stop.go | 5 +++-- controllers/testrun_controller.go | 28 ++++++++++++++++++---------- pkg/cloud/test_runs.go | 2 ++ 5 files changed, 39 insertions(+), 18 deletions(-) diff --git a/api/v1alpha1/k6conditions.go b/api/v1alpha1/k6conditions.go index 71d7df03..6a8862c2 100644 --- a/api/v1alpha1/k6conditions.go +++ b/api/v1alpha1/k6conditions.go @@ -73,13 +73,14 @@ func Initialize(k6 TestRunI) { }, } + UpdateCondition(k6, CloudTestRunAborted, metav1.ConditionFalse) + // PLZ test run case if len(k6.GetSpec().TestRunID) > 0 { UpdateCondition(k6, CloudTestRun, metav1.ConditionTrue) UpdateCondition(k6, CloudPLZTestRun, metav1.ConditionTrue) UpdateCondition(k6, CloudTestRunCreated, metav1.ConditionTrue) UpdateCondition(k6, CloudTestRunFinalized, metav1.ConditionFalse) - UpdateCondition(k6, CloudTestRunAborted, metav1.ConditionFalse) k6.GetStatus().TestRunID = k6.GetSpec().TestRunID } else { @@ -151,17 +152,23 @@ func (k6status *TestRunStatus) SetIfNewer(proposedStatus TestRunStatus) (isNewer isNewer = true } case "created": - if proposedStatus.Stage == "started" || proposedStatus.Stage == "finished" || proposedStatus.Stage == "error" { + if proposedStatus.Stage == "started" || + proposedStatus.Stage == "finished" || + proposedStatus.Stage == "error" || + proposedStatus.Stage == "stopped" { k6status.Stage = proposedStatus.Stage isNewer = true } case "started": - if proposedStatus.Stage == "stopped" || proposedStatus.Stage == "finished" || proposedStatus.Stage == "error" { + if proposedStatus.Stage == "stopped" || + proposedStatus.Stage == "finished" || + proposedStatus.Stage == "error" { k6status.Stage = proposedStatus.Stage isNewer = true } case "stopped": - if proposedStatus.Stage == "finished" || proposedStatus.Stage == "error" { + if proposedStatus.Stage == "finished" || + proposedStatus.Stage == "error" { k6status.Stage = proposedStatus.Stage isNewer = true } diff --git a/controllers/k6_start.go b/controllers/k6_start.go index ced7fccc..b8a46224 100644 --- a/controllers/k6_start.go +++ b/controllers/k6_start.go @@ -70,9 +70,12 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te } else { // let's try this approach if time.Since(t).Minutes() > 5 { + msg := "Creation of runner pods takes too long: your configuration might be off. Check if runner jobs and pods were created successfully." + log.Info(msg) + if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) { events := cloud.ErrorEvent(cloud.K6OperatorStartError). - WithDetail("Creation of runner pods takes too long: your configuration might be off. Check if runner jobs and pods were created successfully."). + WithDetail(msg). WithAbort() cloud.SendTestRunEvents(r.k6CloudClient, v1alpha1.TestRunID(k6), log, events) } @@ -116,7 +119,7 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te log.Info("Created starter job") - log.Info("Changing stage of K6 status to started") + log.Info("Changing stage of TestRun status to started") k6.GetStatus().Stage = "started" v1alpha1.UpdateCondition(k6, v1alpha1.TestRunRunning, metav1.ConditionTrue) diff --git a/controllers/k6_stop.go b/controllers/k6_stop.go index 2c13d9c9..3df8acd8 100644 --- a/controllers/k6_stop.go +++ b/controllers/k6_stop.go @@ -2,6 +2,7 @@ package controllers import ( "context" + "time" "github.com/go-logr/logr" "github.com/grafana/k6-operator/api/v1alpha1" @@ -56,7 +57,7 @@ func StopJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Tes log.Info("Created stop job") - log.Info("Changing stage of K6 status to stopped") + log.Info("Changing stage of TestRun status to stopped") k6.GetStatus().Stage = "stopped" v1alpha1.UpdateCondition(k6, v1alpha1.TestRunRunning, metav1.ConditionFalse) v1alpha1.UpdateCondition(k6, v1alpha1.CloudTestRunAborted, metav1.ConditionTrue) @@ -64,7 +65,7 @@ func StopJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Tes if updateHappened, err := r.UpdateStatus(ctx, k6, log); err != nil { return ctrl.Result{}, err } else if updateHappened { - return ctrl.Result{Requeue: true}, nil + return ctrl.Result{RequeueAfter: time.Second}, nil } return ctrl.Result{}, nil } diff --git a/controllers/testrun_controller.go b/controllers/testrun_controller.go index 029563a4..1d689a38 100644 --- a/controllers/testrun_controller.go +++ b/controllers/testrun_controller.go @@ -16,7 +16,6 @@ package controllers import ( "context" - "errors" "fmt" "time" @@ -100,6 +99,14 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log // Decision making here is now a mix between stages and conditions. // TODO: refactor further. + if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) && v1alpha1.IsFalse(k6, v1alpha1.CloudTestRunAborted) { + // check in with the BE for status + if r.ShouldAbort(ctx, k6, log) { + log.Info("Received an abort signal from the k6 Cloud: stopping the test.") + return StopJobs(ctx, log, k6, r) + } + } + switch k6.GetStatus().Stage { case "": log.Info("Initialize test") @@ -110,7 +117,7 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log return ctrl.Result{}, err } - log.Info("Changing stage of K6 status to initialization") + log.Info("Changing stage of TestRun status to initialization") k6.GetStatus().Stage = "initialization" if updateHappened, err := r.UpdateStatus(ctx, k6, log); err != nil { @@ -129,7 +136,7 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log if v1alpha1.IsFalse(k6, v1alpha1.CloudTestRun) { // RunValidations has already happened and this is not a // cloud test: we can move on - log.Info("Changing stage of K6 status to initialized") + log.Info("Changing stage of TestRun status to initialized") k6.GetStatus().Stage = "initialized" @@ -147,7 +154,7 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log } else { // if test run was created, then only changing status is left - log.Info("Changing stage of K6 status to initialized") + log.Info("Changing stage of TestRun status to initialized") k6.GetStatus().Stage = "initialized" @@ -179,7 +186,8 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log // wait for the test to finish if !FinishJobs(ctx, log, k6, r) { - if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) && v1alpha1.IsFalse(k6, v1alpha1.CloudTestRunAborted) { + // TODO: confirm if this check is needed given the check in the beginning of reconcile + if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) && v1alpha1.IsFalse(k6, v1alpha1.CloudTestRunAborted) { // check in with the BE for status if r.ShouldAbort(ctx, k6, log) { log.Info("Received an abort signal from the k6 Cloud: stopping the test.") @@ -201,7 +209,7 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log if v1alpha1.IsTrue(k6, v1alpha1.TestRunRunning) { v1alpha1.UpdateCondition(k6, v1alpha1.TestRunRunning, metav1.ConditionFalse) - log.Info("Changing stage of K6 status to stopped") + log.Info("Changing stage of TestRun status to stopped") k6.GetStatus().Stage = "stopped" _, err := r.UpdateStatus(ctx, k6, log) @@ -254,7 +262,7 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log } } - log.Info("Changing stage of K6 status to finished") + log.Info("Changing stage of TestRun status to finished") k6.GetStatus().Stage = "finished" _, err := r.UpdateStatus(ctx, k6, log) @@ -355,12 +363,12 @@ func (r *TestRunReconciler) UpdateStatus(ctx context.Context, k6 v1alpha1.TestRu // cause a forced stop. It is meant to be used only by PLZ test runs. func (r *TestRunReconciler) ShouldAbort(ctx context.Context, k6 v1alpha1.TestRunI, log logr.Logger) bool { // sanity check - if len(k6.GetStatus().TestRunID) == 0 { - log.Error(errors.New("empty test run ID"), "Trying to get state of test run with empty test run ID") + if len(v1alpha1.TestRunID(k6)) == 0 { + // log.Error(errors.New("empty test run ID"), "Trying to get state of test run with empty test run ID") return false } - status, err := cloud.GetTestRunState(r.k6CloudClient, k6.GetStatus().TestRunID, log) + status, err := cloud.GetTestRunState(r.k6CloudClient, v1alpha1.TestRunID(k6), log) if err != nil { log.Error(err, "Failed to get test run state.") return false diff --git a/pkg/cloud/test_runs.go b/pkg/cloud/test_runs.go index 699ac18e..26c74732 100644 --- a/pkg/cloud/test_runs.go +++ b/pkg/cloud/test_runs.go @@ -136,6 +136,8 @@ func SendTestRunEvents(client *cloudapi.Client, refID string, log logr.Logger, e return } + log.Info(fmt.Sprintf("Sending events to k6 Cloud %+v", *events)) + // status code is checked in Do if err = client.Do(req, nil); err != nil { log.Error(err, fmt.Sprintf("Failed to send events %+v", events)) From e4c6403f75012b0dfb0970e198a4c66dbbd3e11c Mon Sep 17 00:00:00 2001 From: Olha Yevtushenko Date: Tue, 28 Nov 2023 18:17:06 +0200 Subject: [PATCH 6/7] add consistent logic on error with initializer spec --- controllers/common.go | 4 ++++ controllers/k6_initialize.go | 17 ++++++++++------- controllers/k6_start.go | 2 +- controllers/testrun_controller.go | 24 ++++++++++++++++++++++-- 4 files changed, 37 insertions(+), 10 deletions(-) diff --git a/controllers/common.go b/controllers/common.go index b409b4c6..19e90876 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -19,6 +19,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +const ( + errMessageTooLong = "Creation of %s takes too long: your configuration might be off. Check if %v were created successfully." +) + // It may take some time to retrieve inspect output so indicate with boolean if it's ready // and use returnErr only for errors that require a change of behaviour. All other errors // should just be logged. diff --git a/controllers/k6_initialize.go b/controllers/k6_initialize.go index aeb1f41f..a8b2469c 100644 --- a/controllers/k6_initialize.go +++ b/controllers/k6_initialize.go @@ -44,7 +44,8 @@ func InitializeJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, return res, nil } -func RunValidations(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler) (res ctrl.Result, err error) { +func RunValidations(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler) ( + res ctrl.Result, ready bool, err error) { // initializer is a quick job so check in frequently res = ctrl.Result{RequeueAfter: time.Second * 5} @@ -63,10 +64,10 @@ func RunValidations(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, } // inspectTestRun made a log message already so just return without requeue - return ctrl.Result{}, nil + return ctrl.Result{}, ready, err } if !inspectReady { - return res, nil + return res, ready, nil } log.Info(fmt.Sprintf("k6 inspect: %+v", inspectOutput)) @@ -83,11 +84,11 @@ func RunValidations(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, k6.GetStatus().Stage = "error" if _, err := r.UpdateStatus(ctx, k6, log); err != nil { - return ctrl.Result{}, err + return ctrl.Result{}, ready, err } // Don't requeue in case of this error; unless it's made into a warning as described above. - return ctrl.Result{}, nil + return ctrl.Result{}, ready, nil } if cli.HasCloudOut { @@ -104,10 +105,12 @@ func RunValidations(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, } if _, err := r.UpdateStatus(ctx, k6, log); err != nil { - return ctrl.Result{}, err + return ctrl.Result{}, ready, err } - return res, nil + ready = true + + return res, ready, nil } // SetupCloudTest inspects the output of initializer and creates a new diff --git a/controllers/k6_start.go b/controllers/k6_start.go index b8a46224..3e1199d6 100644 --- a/controllers/k6_start.go +++ b/controllers/k6_start.go @@ -70,7 +70,7 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te } else { // let's try this approach if time.Since(t).Minutes() > 5 { - msg := "Creation of runner pods takes too long: your configuration might be off. Check if runner jobs and pods were created successfully." + msg := fmt.Sprintf(errMessageTooLong, "runner pods", "runner jobs and pods") log.Info(msg) if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) { diff --git a/controllers/testrun_controller.go b/controllers/testrun_controller.go index 1d689a38..d6b1f51e 100644 --- a/controllers/testrun_controller.go +++ b/controllers/testrun_controller.go @@ -16,6 +16,7 @@ package controllers import ( "context" + "errors" "fmt" "time" @@ -129,8 +130,27 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log return ctrl.Result{}, nil case "initialization": - if v1alpha1.IsUnknown(k6, v1alpha1.CloudTestRun) { - return RunValidations(ctx, log, k6, r) + res, ready, err := RunValidations(ctx, log, k6, r) + if err != nil || !ready { + if t, ok := v1alpha1.LastUpdate(k6, v1alpha1.TestRunRunning); !ok { + // this should never happen + return res, errors.New("Cannot find condition TestRunRunning") + } else { + // let's try this approach + if time.Since(t).Minutes() > 5 { + msg := fmt.Sprintf(errMessageTooLong, "initializer pod", "initializer job and pod") + log.Info(msg) + + if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) { + events := cloud.ErrorEvent(cloud.K6OperatorStartError). + WithDetail(msg). + WithAbort() + cloud.SendTestRunEvents(r.k6CloudClient, v1alpha1.TestRunID(k6), log, events) + } + } + } + + return res, err } if v1alpha1.IsFalse(k6, v1alpha1.CloudTestRun) { From d260d6c2528805df3196935300fc52ac2699a455 Mon Sep 17 00:00:00 2001 From: Olha Yevtushenko Date: Wed, 6 Dec 2023 11:39:17 +0200 Subject: [PATCH 7/7] extend error propagation from initializer --- controllers/common.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/controllers/common.go b/controllers/common.go index 19e90876..c90f2b8e 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -41,9 +41,11 @@ func inspectTestRun(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, err error ) if err = c.List(ctx, podList, listOpts); err != nil { + returnErr = err log.Error(err, "Could not list pods") return } + if len(podList.Items) < 1 { log.Info("No initializing pod found yet") return @@ -65,12 +67,14 @@ func inspectTestRun(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, config, err := rest.InClusterConfig() if err != nil { log.Error(err, "unable to fetch in-cluster REST config") + returnErr = err return } clientset, err := kubernetes.NewForConfig(config) if err != nil { log.Error(err, "unable to get access to clientset") + returnErr = err return } req := clientset.CoreV1().Pods(k6.NamespacedName().Namespace).GetLogs(podList.Items[0].Name, &corev1.PodLogOptions{ @@ -82,12 +86,13 @@ func inspectTestRun(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, podLogs, err := req.Stream(ctx) if err != nil { log.Error(err, "unable to stream logs from the pod") + returnErr = err return } defer podLogs.Close() buf := new(bytes.Buffer) - _, err = io.Copy(buf, podLogs) + _, returnErr = io.Copy(buf, podLogs) if err != nil { log.Error(err, "unable to copy logs from the pod") return