Skip to content

Commit

Permalink
Refactor target pod selection (#365)
Browse files Browse the repository at this point in the history
* Refactor target pod selection
* Update JS API comments

Signed-off-by: Pablo Chacin <[email protected]>
  • Loading branch information
pablochacin authored Oct 26, 2023
1 parent 65f90aa commit bed2032
Show file tree
Hide file tree
Showing 9 changed files with 574 additions and 343 deletions.
6 changes: 3 additions & 3 deletions e2e/disruptors/pod_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func Test_PodDisruptor(t *testing.T) {
}

// create pod disruptor that will select the service's pods
selector := disruptors.PodSelector{
selector := disruptors.PodSelectorSpec{
Namespace: namespace,
Select: disruptors.PodAttributes{
Labels: tc.service.Spec.Selector,
Expand Down Expand Up @@ -242,7 +242,7 @@ func Test_PodDisruptor(t *testing.T) {
}

// create pod disruptor that will select the service's pods
selector := disruptors.PodSelector{
selector := disruptors.PodSelectorSpec{
Namespace: namespace,
Select: disruptors.PodAttributes{
Labels: service.Spec.Selector,
Expand Down Expand Up @@ -299,7 +299,7 @@ func Test_PodDisruptor(t *testing.T) {
}

// create pod disruptor that will select all pods
selector := disruptors.PodSelector{
selector := disruptors.PodSelectorSpec{
Namespace: namespace,
}
options := disruptors.PodDisruptorOptions{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func NewPodDisruptor(
return nil, fmt.Errorf("PodDisruptor constructor expects a non null PodSelector argument")
}

selector := disruptors.PodSelector{}
selector := disruptors.PodSelectorSpec{}
err := convertValue(rt, c.Argument(0), &selector)
if err != nil {
return nil, fmt.Errorf("invalid PodSelector: %w", err)
Expand Down
8 changes: 5 additions & 3 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ func testSetup() (*testEnv, error) {
return nil, err
}

// Constructors for ServiceDisruptor and PodDisruptor will error if they cannot find any target for the supplied
// parameters. For this reason, we need to add to the fake k8s client a service and a pod backing it.
// Create a Service and a backing pod that match the targets in fault injection methods
// We are not testing the fault injection logic, only the arguments passed to the API but
// those methods would fail if they don't find a target pod.
// Note: the ServiceDisruptor constructor also will fail if the target service doesn't exist
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: "namespace"},
}
Expand All @@ -71,7 +73,7 @@ func testSetup() (*testEnv, error) {
WithIP("192.0.2.6").
Build()

// Constructors for ServiceDisruptor and PodDisruptor will also attempt to inject the disruptor agent into a target
// ServiceDisruptor and PodDisruptor will also attempt to inject the disruptor agent into a target
// pod once it's discovered, and then wait for that container to be Running. Flagging this pod as ready is hard to
// do with the k8s fake client, so we take advantage of the fact that both injection and check are skipped if the
// agent container already exists by creating the fake pod with the sidecar already added.
Expand Down
128 changes: 38 additions & 90 deletions pkg/disruptors/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,14 @@ package disruptors

import (
"context"
"errors"
"fmt"
"reflect"
"strings"
"time"

"github.com/grafana/xk6-disruptor/pkg/kubernetes"
"github.com/grafana/xk6-disruptor/pkg/kubernetes/helpers"
"github.com/grafana/xk6-disruptor/pkg/types/intstr"
"github.com/grafana/xk6-disruptor/pkg/utils"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ErrSelectorNoPods is returned by NewPodDisruptor when the selector passed to it does not match any pod in the
// cluster.
var ErrSelectorNoPods = errors.New("no pods found matching selector")

// DefaultTargetPort defines the default value for a target HTTP
var DefaultTargetPort = intstr.FromInt32(80) //nolint:gochecknoglobals

Expand All @@ -41,13 +30,13 @@ type PodDisruptorOptions struct {

// podDisruptor is an instance of a PodDisruptor that uses a PodController to interact with target pods
type podDisruptor struct {
helper helpers.PodHelper
options PodDisruptorOptions
targets []corev1.Pod
helper helpers.PodHelper
selector *PodSelector
options PodDisruptorOptions
}

// PodSelector defines the criteria for selecting a pod for disruption
type PodSelector struct {
// PodSelectorSpec defines the criteria for selecting a pod for disruption
type PodSelectorSpec struct {
Namespace string
// Select Pods that match these PodAttributes
Select PodAttributes
Expand All @@ -60,94 +49,38 @@ type PodAttributes struct {
Labels map[string]string
}

// NamespaceOrDefault returns the configured namespace for this selector, and the name of the default namespace if it
// is not configured.
func (p PodSelector) NamespaceOrDefault() string {
if p.Namespace != "" {
return p.Namespace
}

return metav1.NamespaceDefault
}

// String returns a human-readable explanation of the pods matched by a PodSelector.
func (p PodSelector) String() string {
var str string

if len(p.Select.Labels) == 0 && len(p.Exclude.Labels) == 0 {
str = "all pods"
} else {
str = "pods "
str += p.groupLabels("including", p.Select.Labels)
str += p.groupLabels("excluding", p.Exclude.Labels)
str = strings.TrimSuffix(str, ", ")
}

str += fmt.Sprintf(" in ns %q", p.NamespaceOrDefault())

return str
}

// groupLabels returns a group of labels as a string, giving that group a name. The returned string has the form of:
// `groupName(foo=bar, boo=baz), `, including the trailing space and comma.
// An empty group of labels produces an empty string.
func (PodSelector) groupLabels(groupName string, labels map[string]string) string {
if len(labels) == 0 {
return ""
}

group := groupName + "("
for k, v := range labels {
group += fmt.Sprintf("%s=%s, ", k, v)
}
group = strings.TrimSuffix(group, ", ")
group += "), "

return group
}

// NewPodDisruptor creates a new instance of a PodDisruptor that acts on the pods
// that match the given PodSelector
func NewPodDisruptor(
ctx context.Context,
_ context.Context,
k8s kubernetes.Kubernetes,
selector PodSelector,
spec PodSelectorSpec,
options PodDisruptorOptions,
) (PodDisruptor, error) {
// validate selector
emptySelect := reflect.DeepEqual(selector.Select, PodAttributes{})
emptyExclude := reflect.DeepEqual(selector.Exclude, PodAttributes{})
if selector.Namespace == "" && emptySelect && emptyExclude {
return nil, fmt.Errorf("namespace, select and exclude attributes in pod selector cannot all be empty")
}

// ensure selector and controller use default namespace if none specified
namespace := selector.NamespaceOrDefault()
helper := k8s.PodHelper(namespace)
namespace := spec.NamespaceOrDefault()

filter := helpers.PodFilter{
Select: selector.Select.Labels,
Exclude: selector.Exclude.Labels,
}
helper := k8s.PodHelper(namespace)

targets, err := helper.List(ctx, filter)
selector, err := NewPodSelector(spec, helper)
if err != nil {
return nil, err
}

if len(targets) == 0 {
return nil, fmt.Errorf("finding pods matching '%s': %w", selector, ErrSelectorNoPods)
}

return &podDisruptor{
helper: helper,
options: options,
targets: targets,
helper: helper,
options: options,
selector: selector,
}, nil
}

func (d *podDisruptor) Targets(_ context.Context) ([]string, error) {
return utils.PodNames(d.targets), nil
func (d *podDisruptor) Targets(ctx context.Context) ([]string, error) {
targets, err := d.selector.Targets(ctx)
if err != nil {
return nil, err
}

return utils.PodNames(targets), nil
}

// InjectHTTPFault injects faults in the http requests sent to the disruptor's targets
Expand Down Expand Up @@ -175,7 +108,12 @@ func (d *podDisruptor) InjectHTTPFaults(
command,
)

controller := NewPodController(d.targets)
targets, err := d.selector.Targets(ctx)
if err != nil {
return err
}

controller := NewPodController(targets)

return controller.Visit(ctx, visitor)
}
Expand All @@ -199,7 +137,12 @@ func (d *podDisruptor) InjectGrpcFaults(
command,
)

controller := NewPodController(d.targets)
targets, err := d.selector.Targets(ctx)
if err != nil {
return err
}

controller := NewPodController(targets)

return controller.Visit(ctx, visitor)
}
Expand All @@ -209,7 +152,12 @@ func (d *podDisruptor) TerminatePods(
ctx context.Context,
fault PodTerminationFault,
) ([]string, error) {
targets, err := utils.Sample(d.targets, fault.Count)
targets, err := d.selector.Targets(ctx)
if err != nil {
return nil, err
}

targets, err = utils.Sample(targets, fault.Count)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit bed2032

Please sign in to comment.