Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PLZ: add sending events in common cases #283

Merged
merged 7 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions api/v1alpha1/k6conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ func Initialize(k6 TestRunI) {
},
}

UpdateCondition(k6, CloudTestRunAborted, metav1.ConditionFalse)

// PLZ test run case
if len(k6.GetSpec().TestRunID) > 0 {
UpdateCondition(k6, CloudTestRun, metav1.ConditionTrue)
UpdateCondition(k6, CloudPLZTestRun, metav1.ConditionTrue)
UpdateCondition(k6, CloudTestRunCreated, metav1.ConditionTrue)
UpdateCondition(k6, CloudTestRunFinalized, metav1.ConditionFalse)
UpdateCondition(k6, CloudTestRunAborted, metav1.ConditionFalse)

k6.GetStatus().TestRunID = k6.GetSpec().TestRunID
} else {
Expand Down Expand Up @@ -151,17 +152,23 @@ func (k6status *TestRunStatus) SetIfNewer(proposedStatus TestRunStatus) (isNewer
isNewer = true
}
case "created":
if proposedStatus.Stage == "started" || proposedStatus.Stage == "finished" || proposedStatus.Stage == "error" {
if proposedStatus.Stage == "started" ||
proposedStatus.Stage == "finished" ||
proposedStatus.Stage == "error" ||
proposedStatus.Stage == "stopped" {
k6status.Stage = proposedStatus.Stage
isNewer = true
}
case "started":
if proposedStatus.Stage == "stopped" || proposedStatus.Stage == "finished" || proposedStatus.Stage == "error" {
if proposedStatus.Stage == "stopped" ||
proposedStatus.Stage == "finished" ||
proposedStatus.Stage == "error" {
k6status.Stage = proposedStatus.Stage
isNewer = true
}
case "stopped":
if proposedStatus.Stage == "finished" || proposedStatus.Stage == "error" {
if proposedStatus.Stage == "finished" ||
proposedStatus.Stage == "error" {
k6status.Stage = proposedStatus.Stage
isNewer = true
}
Expand Down
11 changes: 11 additions & 0 deletions api/v1alpha1/testruni.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,14 @@ type TestRunI interface {
GetSpec() *TestRunSpec
NamespacedName() types.NamespacedName
}

// TestRunID is a tiny helper to get k6 Cloud test run ID.
// PLZ test run will have test run ID as part of spec
// while cloud output test run as part of status.
func TestRunID(k6 TestRunI) string {
specId := k6.GetSpec().TestRunID
if len(specId) > 0 {
return specId
}
return k6.GetStatus().TestRunID
}
1 change: 0 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
errMessageTooLong = "Creation of %s takes too long: your configuration might be off. Check if %v were created successfully."
)

// It may take some time to retrieve inspect output so indicate with boolean if it's ready
// and use returnErr only for errors that require a change of behaviour. All other errors
// should just be logged.
Expand All @@ -37,16 +41,18 @@ func inspectTestRun(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI,
err error
)
if err = c.List(ctx, podList, listOpts); err != nil {
returnErr = err
log.Error(err, "Could not list pods")
return
}

if len(podList.Items) < 1 {
log.Info("No initializing pod found yet")
return
}

// there should be only 1 initializer pod
if podList.Items[0].Status.Phase != "Succeeded" {
if podList.Items[0].Status.Phase != corev1.PodSucceeded && podList.Items[0].Status.Phase != corev1.PodFailed {
log.Info("Waiting for initializing pod to finish")
return
}
Expand All @@ -61,12 +67,14 @@ func inspectTestRun(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI,
config, err := rest.InClusterConfig()
if err != nil {
log.Error(err, "unable to fetch in-cluster REST config")
returnErr = err
return
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error(err, "unable to get access to clientset")
returnErr = err
return
}
req := clientset.CoreV1().Pods(k6.NamespacedName().Namespace).GetLogs(podList.Items[0].Name, &corev1.PodLogOptions{
Expand All @@ -78,12 +86,13 @@ func inspectTestRun(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI,
podLogs, err := req.Stream(ctx)
if err != nil {
log.Error(err, "unable to stream logs from the pod")
returnErr = err
return
}
defer podLogs.Close()

buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
_, returnErr = io.Copy(buf, podLogs)
if err != nil {
log.Error(err, "unable to copy logs from the pod")
return
Expand Down
38 changes: 29 additions & 9 deletions controllers/k6_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/go-logr/logr"
"github.com/grafana/k6-operator/api/v1alpha1"
"github.com/grafana/k6-operator/pkg/cloud"
"github.com/grafana/k6-operator/pkg/resources/jobs"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -20,7 +21,6 @@ import (
func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler) (ctrl.Result, error) {
var (
err error
res ctrl.Result
token string // only for cloud tests
)

Expand All @@ -39,7 +39,7 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T
token, tokenReady, err = loadToken(ctx, log, r.Client, k6.GetSpec().Token, sOpts)
if err != nil {
// An error here means a very likely mis-configuration of the token.
// Consider updating status to error to let a user know quicker?
// TODO: update status to error to let a user know quicker
log.Error(err, "A problem while getting token.")
return ctrl.Result{}, nil
}
Expand All @@ -50,11 +50,20 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T

log.Info("Creating test jobs")

if res, err = createJobSpecs(ctx, log, k6, r, token); err != nil {
if res, recheck, err := createJobSpecs(ctx, log, k6, r, token); err != nil {
if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) {
events := cloud.ErrorEvent(cloud.K6OperatorStartError).
WithDetail(fmt.Sprintf("Failed to create runner jobs: %v", err)).
WithAbort()
cloud.SendTestRunEvents(r.k6CloudClient, v1alpha1.TestRunID(k6), log, events)
}

return res, err
} else if recheck {
return res, nil
}

log.Info("Changing stage of K6 status to created")
log.Info("Changing stage of TestRun status to created")
k6.GetStatus().Stage = "created"

if updateHappened, err := r.UpdateStatus(ctx, k6, log); err != nil {
Expand All @@ -65,24 +74,35 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T
return ctrl.Result{}, nil
}

func createJobSpecs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler, token string) (ctrl.Result, error) {
func createJobSpecs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler, token string) (ctrl.Result, bool, error) {
found := &batchv1.Job{}
namespacedName := types.NamespacedName{
Name: fmt.Sprintf("%s-1", k6.NamespacedName().Name),
Namespace: k6.NamespacedName().Namespace,
}

if err := r.Get(ctx, namespacedName, found); err == nil || !errors.IsNotFound(err) {
log.Info("Could not start a new test, Make sure you've deleted your previous run.")
return ctrl.Result{}, err
if err == nil {
err = fmt.Errorf("job with the name %s exists; make sure you've deleted your previous run", namespacedName.Name)
}
log.Info(err.Error())

// is it possible to implement this delay with resourceVersion of the job?
t, _ := v1alpha1.LastUpdate(k6, v1alpha1.CloudTestRun)
if time.Since(t).Seconds() <= 30 {
// try again before returning an error
return ctrl.Result{RequeueAfter: time.Second * 10}, true, nil
}

return ctrl.Result{}, false, err
}

for i := 1; i <= int(k6.GetSpec().Parallelism); i++ {
if err := launchTest(ctx, k6, i, log, r, token); err != nil {
return ctrl.Result{}, err
return ctrl.Result{}, false, err
}
}
return ctrl.Result{}, nil
return ctrl.Result{}, false, nil
}

func launchTest(ctx context.Context, k6 v1alpha1.TestRunI, index int, log logr.Logger, r *TestRunReconciler, token string) error {
Expand Down
19 changes: 17 additions & 2 deletions controllers/k6_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/go-logr/logr"
"github.com/grafana/k6-operator/api/v1alpha1"
"github.com/grafana/k6-operator/pkg/cloud"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -35,15 +36,29 @@ func FinishJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *T
}

// TODO: We should distinguish between Suceeded/Failed/Unknown
var finished int32
var (
finished, failed int32
)
for _, job := range jl.Items {
if job.Status.Active != 0 {
continue
}
finished++

if job.Status.Failed > 0 {
failed++
}
}

log.Info(fmt.Sprintf("%d/%d jobs complete", finished, k6.GetSpec().Parallelism))
msg := fmt.Sprintf("%d/%d jobs complete, %d failed", finished, k6.GetSpec().Parallelism, failed)
log.Info(msg)

if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) && failed > 0 {
events := cloud.ErrorEvent(cloud.K6OperatorRunnerError).
WithDetail(msg).
WithAbort()
cloud.SendTestRunEvents(r.k6CloudClient, v1alpha1.TestRunID(k6), log, events)
}

if finished < k6.GetSpec().Parallelism {
return
Expand Down
27 changes: 20 additions & 7 deletions controllers/k6_initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,30 @@ func InitializeJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI,
return res, nil
}

func RunValidations(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler) (res ctrl.Result, err error) {
func RunValidations(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler) (
res ctrl.Result, ready bool, err error) {
// initializer is a quick job so check in frequently
res = ctrl.Result{RequeueAfter: time.Second * 5}

cli := types.ParseCLI(k6.GetSpec().Arguments)

inspectOutput, inspectReady, err := inspectTestRun(ctx, log, k6, r.Client)
if err != nil {
// Cloud output test run is not created yet at this point, so sending
// events is possible only for PLZ test run.
if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) {
// This error won't allow to start a test so let k6 Cloud know of it
events := cloud.ErrorEvent(cloud.K6OperatorStartError).
WithDetail(fmt.Sprintf("Failed to inspect the test script: %v", err)).
WithAbort()
cloud.SendTestRunEvents(r.k6CloudClient, v1alpha1.TestRunID(k6), log, events)
}

// inspectTestRun made a log message already so just return without requeue
return ctrl.Result{}, nil
return ctrl.Result{}, ready, err
}
if !inspectReady {
return res, nil
return res, ready, nil
}

log.Info(fmt.Sprintf("k6 inspect: %+v", inspectOutput))
Expand All @@ -73,11 +84,11 @@ func RunValidations(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI,
k6.GetStatus().Stage = "error"

if _, err := r.UpdateStatus(ctx, k6, log); err != nil {
return ctrl.Result{}, err
return ctrl.Result{}, ready, err
}

// Don't requeue in case of this error; unless it's made into a warning as described above.
return ctrl.Result{}, nil
return ctrl.Result{}, ready, nil
}

if cli.HasCloudOut {
Expand All @@ -94,10 +105,12 @@ func RunValidations(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI,
}

if _, err := r.UpdateStatus(ctx, k6, log); err != nil {
return ctrl.Result{}, err
return ctrl.Result{}, ready, err
}

return res, nil
ready = true

return res, ready, nil
}

// SetupCloudTest inspects the output of initializer and creates a new
Expand Down
22 changes: 21 additions & 1 deletion controllers/k6_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package controllers

import (
"context"
"errors"
"fmt"
"net/http"
"time"

"github.com/go-logr/logr"
"github.com/grafana/k6-operator/api/v1alpha1"
"github.com/grafana/k6-operator/pkg/cloud"
"github.com/grafana/k6-operator/pkg/resources/jobs"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -62,6 +64,24 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te
log.Info(fmt.Sprintf("%d/%d runner pods ready", count, k6.GetSpec().Parallelism))

if count != int(k6.GetSpec().Parallelism) {
if t, ok := v1alpha1.LastUpdate(k6, v1alpha1.TestRunRunning); !ok {
// this should never happen
return res, errors.New("Cannot find condition TestRunRunning")
} else {
// let's try this approach
if time.Since(t).Minutes() > 5 {
msg := fmt.Sprintf(errMessageTooLong, "runner pods", "runner jobs and pods")
log.Info(msg)

if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) {
events := cloud.ErrorEvent(cloud.K6OperatorStartError).
WithDetail(msg).
WithAbort()
cloud.SendTestRunEvents(r.k6CloudClient, v1alpha1.TestRunID(k6), log, events)
}
}
}

return res, nil
}

Expand Down Expand Up @@ -99,7 +119,7 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te

log.Info("Created starter job")

log.Info("Changing stage of K6 status to started")
log.Info("Changing stage of TestRun status to started")
k6.GetStatus().Stage = "started"
v1alpha1.UpdateCondition(k6, v1alpha1.TestRunRunning, metav1.ConditionTrue)

Expand Down
5 changes: 3 additions & 2 deletions controllers/k6_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"context"
"time"

"github.com/go-logr/logr"
"github.com/grafana/k6-operator/api/v1alpha1"
Expand Down Expand Up @@ -56,15 +57,15 @@ func StopJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Tes

log.Info("Created stop job")

log.Info("Changing stage of K6 status to stopped")
log.Info("Changing stage of TestRun status to stopped")
k6.GetStatus().Stage = "stopped"
v1alpha1.UpdateCondition(k6, v1alpha1.TestRunRunning, metav1.ConditionFalse)
v1alpha1.UpdateCondition(k6, v1alpha1.CloudTestRunAborted, metav1.ConditionTrue)

if updateHappened, err := r.UpdateStatus(ctx, k6, log); err != nil {
return ctrl.Result{}, err
} else if updateHappened {
return ctrl.Result{Requeue: true}, nil
return ctrl.Result{RequeueAfter: time.Second}, nil
}
return ctrl.Result{}, nil
}
Loading
Loading