forked from litmuschaos/litmus
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Litmus 3.0: Added enhancements in subscriber for Litmus 3.0 (li…
…tmuschaos#4027) * feat: Litmus 3.0: Added subscriber base setup Signed-off-by: Amit Kumar Das <[email protected]> * feat: Litmus 3.0: Added events for chaosengine and workflows Signed-off-by: Amit Kumar Das <[email protected]> * feat: Litmus 3.0: Added logs and minor changes Signed-off-by: Amit Kumar Das <[email protected]> * updated logs Signed-off-by: Amit Kumar Das <[email protected]> --------- Signed-off-by: Amit Kumar Das <[email protected]> Co-authored-by: Saranya Jena <[email protected]>
- Loading branch information
1 parent
d8d4f14
commit b416043
Showing
18 changed files
with
2,632 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
module subscriber | ||
|
||
go 1.18 | ||
|
||
require ( | ||
github.com/argoproj/argo-workflows/v3 v3.3.1 | ||
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 | ||
github.com/gorilla/websocket v1.5.0 | ||
github.com/kelseyhightower/envconfig v1.4.0 | ||
github.com/litmuschaos/chaos-operator v0.0.0-20230109130222-de7c74a937a9 | ||
github.com/sirupsen/logrus v1.8.1 | ||
gopkg.in/yaml.v2 v2.4.0 | ||
k8s.io/api v0.23.3 | ||
k8s.io/apimachinery v0.23.3 | ||
k8s.io/client-go v12.0.0+incompatible | ||
) | ||
|
||
require ( | ||
github.com/PuerkitoBio/purell v1.1.1 // indirect | ||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/emicklei/go-restful v2.15.0+incompatible // indirect | ||
github.com/go-logr/logr v1.2.2 // indirect | ||
github.com/go-openapi/jsonpointer v0.19.5 // indirect | ||
github.com/go-openapi/jsonreference v0.19.6 // indirect | ||
github.com/go-openapi/swag v0.19.15 // indirect | ||
github.com/gogo/protobuf v1.3.2 // indirect | ||
github.com/golang/protobuf v1.5.2 // indirect | ||
github.com/google/go-cmp v0.5.7 // indirect | ||
github.com/google/gofuzz v1.2.0 // indirect | ||
github.com/googleapis/gnostic v0.5.5 // indirect | ||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect | ||
github.com/hashicorp/golang-lru v0.5.3 // indirect | ||
github.com/imdario/mergo v0.3.12 // indirect | ||
github.com/josharian/intern v1.0.0 // indirect | ||
github.com/json-iterator/go v1.1.12 // indirect | ||
github.com/mailru/easyjson v0.7.7 // indirect | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect | ||
github.com/modern-go/reflect2 v1.0.2 // indirect | ||
github.com/pkg/errors v0.9.1 // indirect | ||
github.com/spf13/pflag v1.0.5 // indirect | ||
github.com/stretchr/testify v1.8.2 // indirect | ||
golang.org/x/net v0.6.0 // indirect | ||
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect | ||
golang.org/x/sys v0.5.0 // indirect | ||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect | ||
golang.org/x/text v0.7.0 // indirect | ||
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect | ||
google.golang.org/appengine v1.6.7 // indirect | ||
google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c // indirect | ||
google.golang.org/grpc v1.44.0 // indirect | ||
google.golang.org/protobuf v1.28.1 // indirect | ||
gopkg.in/inf.v0 v0.9.1 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
k8s.io/klog/v2 v2.40.1 // indirect | ||
k8s.io/kube-openapi v0.0.0-20220124234850-424119656bbf // indirect | ||
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect | ||
sigs.k8s.io/controller-runtime v0.11.1 // indirect | ||
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect | ||
sigs.k8s.io/yaml v1.3.0 // indirect | ||
) | ||
|
||
// Pinned to kubernetes-1.21.2 | ||
replace ( | ||
github.com/docker/docker => github.com/moby/moby v0.7.3-0.20190826074503-38ab9da00309 | ||
|
||
github.com/emicklei/go-restful => github.com/emicklei/go-restful v2.16.0+incompatible | ||
|
||
golang.org/x/net => golang.org/x/net v0.0.0-20220906165146-f3363e06e74c | ||
golang.org/x/text => golang.org/x/text v0.3.8 | ||
k8s.io/api => k8s.io/api v0.21.2 | ||
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.21.2 | ||
k8s.io/apimachinery => k8s.io/apimachinery v0.21.2 | ||
k8s.io/apiserver => k8s.io/apiserver v0.21.2 | ||
k8s.io/cli-runtime => k8s.io/cli-runtime v0.21.2 | ||
k8s.io/client-go => k8s.io/client-go v0.21.2 | ||
k8s.io/cloud-provider => k8s.io/cloud-provider v0.21.2 | ||
k8s.io/code-generator => k8s.io/code-generator v0.21.2 | ||
k8s.io/component-base => k8s.io/component-base v0.21.2 | ||
k8s.io/cri-api => k8s.io/cri-api v0.21.2 | ||
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.21.2 | ||
k8s.io/infra-bootstrap => k8s.io/infra-bootstrap v0.21.2 | ||
k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.21.2 | ||
k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.21.2 | ||
k8s.io/kube-proxy => k8s.io/kube-proxy v0.21.2 | ||
k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.21.2 | ||
k8s.io/kubectl => k8s.io/kubectl v0.21.2 | ||
k8s.io/kubelet => k8s.io/kubelet v0.21.2 | ||
k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.21.2 | ||
k8s.io/metrics => k8s.io/metrics v0.21.2 | ||
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.21.2 | ||
) |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
package events | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"strconv" | ||
"subscriber/pkg/k8s" | ||
"subscriber/pkg/types" | ||
|
||
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" | ||
chaosTypes "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1" | ||
"github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned" | ||
litmusV1alpha1 "github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned/typed/litmuschaos/v1alpha1" | ||
"github.com/litmuschaos/chaos-operator/pkg/client/informers/externalversions" | ||
"github.com/sirupsen/logrus" | ||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
mergeType "k8s.io/apimachinery/pkg/types" | ||
"k8s.io/client-go/tools/cache" | ||
) | ||
|
||
// ChaosEventWatcher initializes the Litmus ChaosEngine event watcher | ||
func ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) { | ||
startTime, err := strconv.Atoi(infraData["START_TIME"]) | ||
if err != nil { | ||
logrus.WithError(err).Fatal("failed to parse startTime") | ||
} | ||
|
||
cfg, err := k8s.GetKubeConfig() | ||
if err != nil { | ||
logrus.WithError(err).Fatal("could not get kube config") | ||
} | ||
|
||
// ClientSet to create Informer | ||
clientSet, err := versioned.NewForConfig(cfg) | ||
if err != nil { | ||
logrus.WithError(err).Fatal("could not generate dynamic client for config") | ||
} | ||
|
||
// Create a factory object to watch workflows depending on default scope | ||
f := externalversions.NewSharedInformerFactoryWithOptions(clientSet, resyncPeriod, | ||
externalversions.WithTweakListOptions(func(list *v1.ListOptions) { | ||
list.LabelSelector = fmt.Sprintf("infra_id=%s,type=standalone_workflow", InfraID) | ||
})) | ||
|
||
informer := f.Litmuschaos().V1alpha1().ChaosEngines().Informer() | ||
if InfraScope == "namespace" { | ||
f = externalversions.NewSharedInformerFactoryWithOptions(clientSet, resyncPeriod, externalversions.WithNamespace(InfraNamespace), | ||
externalversions.WithTweakListOptions(func(list *v1.ListOptions) { | ||
list.LabelSelector = fmt.Sprintf("infra_id=%s,type=standalone_workflow", InfraID) | ||
})) | ||
informer = f.Litmuschaos().V1alpha1().ChaosEngines().Informer() | ||
} | ||
|
||
go startWatchEngine(stopCh, informer, stream, int64(startTime)) | ||
} | ||
|
||
// handles the different events events - add, update and delete | ||
func startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) { | ||
handlers := cache.ResourceEventHandlerFuncs{ | ||
AddFunc: func(obj interface{}) { | ||
chaosEventHandler(obj, "ADD", stream, startTime) | ||
}, | ||
UpdateFunc: func(oldObj, obj interface{}) { | ||
chaosEventHandler(obj, "UPDATE", stream, startTime) | ||
}, | ||
} | ||
|
||
s.AddEventHandler(handlers) | ||
s.Run(stopCh) | ||
} | ||
|
||
// responsible for extracting the required data from the event and streaming | ||
func chaosEventHandler(obj interface{}, eventType string, stream chan types.WorkflowEvent, startTime int64) { | ||
workflowObj := obj.(*chaosTypes.ChaosEngine) | ||
if workflowObj.Labels["workflow_id"] == "" { | ||
logrus.WithFields(map[string]interface{}{ | ||
"uid": string(workflowObj.ObjectMeta.UID), | ||
"wf_id": workflowObj.Labels["workflow_id"], | ||
"infra_id": workflowObj.Labels["infra_id"], | ||
}).Printf("CHAOSENGINE RUN IGNORED [INVALID METADATA]") | ||
return | ||
} | ||
|
||
if workflowObj.ObjectMeta.CreationTimestamp.Unix() < startTime { | ||
return | ||
} | ||
|
||
cfg, err := k8s.GetKubeConfig() | ||
if err != nil { | ||
logrus.WithError(err).Fatal("could not get kube config") | ||
} | ||
|
||
chaosClient, err := litmusV1alpha1.NewForConfig(cfg) | ||
if err != nil { | ||
logrus.WithError(err).Fatal("could not get Chaos ClientSet") | ||
} | ||
|
||
nodes := make(map[string]types.Node) | ||
logrus.Print("STANDALONE CHAOSENGINE EVENT ", workflowObj.UID, " ", eventType) | ||
var cd *types.ChaosData = nil | ||
|
||
//extracts chaos data | ||
cd, err = getChaosData(v1alpha1.NodeStatus{StartedAt: workflowObj.ObjectMeta.CreationTimestamp}, workflowObj.Name, workflowObj.Namespace, chaosClient) | ||
if err != nil { | ||
logrus.WithError(err).Print("FAILED PARSING CHAOS ENGINE CRD") | ||
} | ||
|
||
// considering chaos events has only 1 artifact with manifest as raw data | ||
finTime := int64(-1) | ||
if workflowObj.Status.EngineStatus == chaosTypes.EngineStatusCompleted || workflowObj.Status.EngineStatus == chaosTypes.EngineStatusStopped { | ||
if len(workflowObj.Status.Experiments) > 0 { | ||
finTime = workflowObj.Status.Experiments[0].LastUpdateTime.Unix() | ||
} | ||
} | ||
|
||
nodes[workflowObj.Name] = types.Node{ | ||
Name: workflowObj.Name, | ||
Phase: "Succeeded", | ||
StartedAt: StrConvTime(workflowObj.CreationTimestamp.Unix()), | ||
FinishedAt: StrConvTime(finTime), | ||
Children: []string{workflowObj.Name + "-engine"}, | ||
Type: "Steps", | ||
} | ||
details := types.Node{ | ||
Name: workflowObj.Name, | ||
Phase: mapStatus(workflowObj.Status.EngineStatus), | ||
Type: "ChaosEngine", | ||
StartedAt: StrConvTime(workflowObj.CreationTimestamp.Unix()), | ||
FinishedAt: StrConvTime(finTime), | ||
Children: []string{}, | ||
ChaosExp: cd, | ||
Message: string(workflowObj.Status.EngineStatus), | ||
} | ||
|
||
nodes[workflowObj.Name+"-engine"] = details | ||
workflow := types.WorkflowEvent{ | ||
WorkflowType: "chaosengine", | ||
WorkflowID: workflowObj.Labels["workflow_id"], | ||
EventType: eventType, | ||
UID: string(workflowObj.ObjectMeta.UID), | ||
Namespace: workflowObj.ObjectMeta.Namespace, | ||
Name: workflowObj.ObjectMeta.Name, | ||
CreationTimestamp: StrConvTime(workflowObj.ObjectMeta.CreationTimestamp.Unix()), | ||
Phase: details.Phase, | ||
Message: details.Message, | ||
StartedAt: details.StartedAt, | ||
FinishedAt: details.FinishedAt, | ||
Nodes: nodes, | ||
} | ||
|
||
//stream | ||
stream <- workflow | ||
} | ||
|
||
//StopChaosEngineState is used to patch all the chaosEngines with engineState=stop | ||
func StopChaosEngineState(namespace string, workflowRunID *string) error { | ||
ctx := context.TODO() | ||
|
||
//Define the GVR | ||
resourceType := schema.GroupVersionResource{ | ||
Group: "litmuschaos.io", | ||
Version: "v1alpha1", | ||
Resource: "chaosengines", | ||
} | ||
|
||
//Generate the dynamic client | ||
_, dynamicClient, err := k8s.GetDynamicAndDiscoveryClient() | ||
if err != nil { | ||
return errors.New("failed to get dynamic client, error: " + err.Error()) | ||
} | ||
|
||
listOption := v1.ListOptions{} | ||
|
||
if workflowRunID != nil { | ||
listOption.LabelSelector = fmt.Sprintf("workflow_run_id=%s", *workflowRunID) | ||
} | ||
|
||
//List all chaosEngines present in the particular namespace | ||
chaosEngines, err := dynamicClient.Resource(resourceType).Namespace(namespace).List(context.TODO(), listOption) | ||
if err != nil { | ||
return errors.New("failed to list chaosengines: " + err.Error()) | ||
} | ||
|
||
//Foe every chaosEngine patch the engineState to Stop | ||
for _, val := range chaosEngines.Items { | ||
patch := []byte(`{"spec":{"engineState":"stop"}}`) | ||
patched, err := dynamicClient.Resource(resourceType).Namespace(namespace).Patch(ctx, val.GetName(), mergeType.MergePatchType, patch, v1.PatchOptions{}) | ||
if err != nil { | ||
return errors.New("failed to patch chaosengines: " + err.Error()) | ||
} | ||
if patched != nil { | ||
logrus.Info("Successfully patched ChaosEngine: ", patched.GetName()) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func mapStatus(status chaosTypes.EngineStatus) string { | ||
switch status { | ||
case chaosTypes.EngineStatusInitialized: | ||
return "Running" | ||
case chaosTypes.EngineStatusCompleted: | ||
return "Succeeded" | ||
case chaosTypes.EngineStatusStopped: | ||
return "Skipped" | ||
default: | ||
return "Running" | ||
} | ||
} |
Oops, something went wrong.