Fix worker pool (#55)
* corrects events permissions in clusterrole

the controller needs to "list" events, not "get", when running
diagnostics
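
  For illustration only (the helper name and field selector below are assumptions, not code from this commit), this is the shape of the read that diagnostics performs: events are discovered by listing a namespace, and RBAC gates that with the "list" verb rather than "get":

  ```go
  package main

  import (
  	"context"
  	"fmt"

  	"github.com/go-logr/logr"
  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  	"k8s.io/client-go/kubernetes"
  )

  // listPodEvents sketches why diagnostics needs "list": events for a pod
  // are found by listing the namespace, not by a "get" on a known name.
  func listPodEvents(ctx context.Context, log logr.Logger, cs kubernetes.Interface, namespace, podName string) error {
  	eventList, err := cs.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{
  		// Illustrative narrowing; diagnoseEvents in the diff below lists
  		// all events in the namespace.
  		FieldSelector: fmt.Sprintf("involvedObject.name=%s", podName),
  	})
  	if err != nil {
  		return fmt.Errorf("cannot list events: %w", err)
  	}
  	for _, event := range eventList.Items {
  		log.Info("Pod event", "reason", event.Reason, "message", event.Message)
  	}
  	return nil
  }
  ```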

* fixes replica autoscaling in buildkit pool

the former logic scaled the pool up indefinitely whenever certain pods
failed to become "operational"

- improves the "pod operational" check
- reports abnormal pods during reconciliation
- returns AutoscalingPool instead of an interface
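
  A hedged sketch of the general shape of the fix, with illustrative names rather than the pool's actual internals: leased, pending, and now abnormal pods all count when deciding how far the pool can shrink from the top ordinal, so a pod stuck short of "operational" no longer reads as missing capacity that must be provisioned again:

  ```go
  package worker

  // trimmableFromTop sketches the reverse-ordinal removal rule: a
  // StatefulSet-style pool can only shed pods from the highest ordinal
  // down, and it must stop at the first pod that is leased, pending, or
  // abnormal. Counting abnormal pods here is the essence of the fix:
  // previously they fell through every bucket and looked like capacity
  // that still had to be replaced, so reconciliation kept scaling up.
  func trimmableFromTop(orderedPodNames []string, retain map[string]bool) int {
  	n := 0
  	for i := len(orderedPodNames) - 1; i >= 0; i-- {
  		if retain[orderedPodNames[i]] {
  			break
  		}
  		n++
  	}
  	return n
  }
  ```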

* logs controller config on startup

so we can always verify that the intended configuration has been
correctly applied
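
  A minimal sketch of that idea, assuming an invented Config type (the real one lives in the project's config package):

  ```go
  package main

  import (
  	"encoding/json"

  	"github.com/go-logr/logr"
  )

  // Config stands in for the controller configuration; the field names
  // are illustrative, not the project's real config type.
  type Config struct {
  	Namespace   string `json:"namespace"`
  	ServicePort int32  `json:"servicePort"`
  }

  // logStartupConfig renders the effective configuration once at startup
  // so the applied settings can always be verified from the logs.
  func logStartupConfig(log logr.Logger, conf Config) {
  	raw, err := json.Marshal(conf)
  	if err != nil {
  		log.Error(err, "Failed to render config")
  		return
  	}
  	log.Info("Controller configuration", "config", string(raw))
  }
  ```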

* ensures rendered config JSON does not print credentials
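
  The diff for that change is not shown on this page; one common way to do it, sketched here under that assumption with an invented RegistryAuth type, is a custom MarshalJSON that masks secret fields before rendering:

  ```go
  package main

  import "encoding/json"

  // RegistryAuth is an invented credentials type used only to illustrate
  // the masking pattern.
  type RegistryAuth struct {
  	Username string `json:"username"`
  	Password string `json:"password"`
  }

  // MarshalJSON masks the password so rendered config JSON shows that a
  // credential is set without ever printing its value.
  func (a RegistryAuth) MarshalJSON() ([]byte, error) {
  	type alias RegistryAuth // local alias avoids recursing into MarshalJSON
  	masked := alias(a)
  	if masked.Password != "" {
  		masked.Password = "<redacted>"
  	}
  	return json.Marshal(masked)
  }
  ```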
sonnysideup authored Aug 18, 2022
1 parent f7581fe commit ca95078
Showing 6 changed files with 203 additions and 31 deletions.
@@ -75,7 +75,7 @@ rules:
resources:
- events
verbs:
-  - get
+  - list
- create
- patch
- apiGroups:
118 changes: 92 additions & 26 deletions pkg/buildkit/worker/pool.go
@@ -51,7 +51,7 @@ const (
expiryTimeAnnotation = "hephaestus.dominodatalab.com/expiry-time"
)

-type workerPool struct {
+type AutoscalingPool struct {
log logr.Logger

// system shutdown
@@ -89,7 +89,12 @@ type workerPool struct {
}

// NewPool creates a new worker pool that can be used to lease buildkit workers for image builds.
-func NewPool(ctx context.Context, clientset kubernetes.Interface, conf config.Buildkit, opts ...PoolOption) Pool {
+func NewPool(
+	ctx context.Context,
+	clientset kubernetes.Interface,
+	conf config.Buildkit,
+	opts ...PoolOption,
+) *AutoscalingPool {
o := defaultOpts
for _, fn := range opts {
o = fn(o)
@@ -102,7 +107,7 @@ func NewPool(ctx context.Context, clientset kubernetes.Interface, conf config.Bu
endpointSliceListOptions := metav1.ListOptions{LabelSelector: esls.String()}

ctx, cancel := context.WithCancel(ctx)
-	wp := &workerPool{
+	wp := &AutoscalingPool{
ctx: ctx,
cancel: cancel,
log: o.Log,
@@ -159,7 +164,7 @@ func NewPool(ctx context.Context, clientset kubernetes.Interface, conf config.Bu
//
// Adds "lease"/"manager-identity" metadata and removes "expiry-time".
// The worker will remain leased until the caller provides the address to Release().
-func (p *workerPool) Get(ctx context.Context, owner string) (string, error) {
+func (p *AutoscalingPool) Get(ctx context.Context, owner string) (string, error) {
request := &PodRequest{
owner: owner,
result: make(chan PodRequestResult, 1),
@@ -192,7 +197,7 @@ func (p *workerPool) Get(ctx context.Context, owner string) (string, error) {
//
// Adds "expiry-time" and removes "lease"/"manager-identity" metadata.
// The underlying worker will be terminated after its expiry time has passed.
-func (p *workerPool) Release(ctx context.Context, addr string) error {
+func (p *AutoscalingPool) Release(ctx context.Context, addr string) error {
p.mu.Lock()
defer p.mu.Unlock()

@@ -217,12 +222,12 @@
}

// Close shuts down the pool by terminating all background routines used to manage requests and garbage collection.
-func (p *workerPool) Close() {
+func (p *AutoscalingPool) Close() {
p.cancel()
}

// applies lease metadata to given pod
-func (p *workerPool) leasePod(ctx context.Context, pod *corev1.Pod, owner string) error {
+func (p *AutoscalingPool) leasePod(ctx context.Context, pod *corev1.Pod, owner string) error {
pac, err := corev1ac.ExtractPod(pod, fieldManagerName)
if err != nil {
return fmt.Errorf("cannot extract pod config: %w", err)
@@ -244,7 +249,7 @@
}

// removes lease metadata from given pod and adds expiry
-func (p *workerPool) releasePod(ctx context.Context, pod *corev1.Pod) error {
+func (p *AutoscalingPool) releasePod(ctx context.Context, pod *corev1.Pod) error {
pac, err := corev1ac.ExtractPod(pod, fieldManagerName)
if err != nil {
return fmt.Errorf("cannot extract pod config: %w", err)
@@ -268,7 +273,7 @@
}

// builds routable url for buildkit pod with protocol and port
-func (p *workerPool) buildEndpointURL(ctx context.Context, podName string) (string, error) {
+func (p *AutoscalingPool) buildEndpointURL(ctx context.Context, podName string) (string, error) {
p.log.Info("Watching endpoints for new address", "podName", podName)

watchOpts := metav1.ListOptions{
@@ -312,7 +317,7 @@
}

// generates internal hostname for pod using an endpoint slice
-func (p *workerPool) extractHostname(epSlice *discoveryv1beta1.EndpointSlice, podName string) (hostname string) {
+func (p *AutoscalingPool) extractHostname(epSlice *discoveryv1beta1.EndpointSlice, podName string) (hostname string) {
var portPresent bool
for _, port := range epSlice.Ports {
if pointer.Int32Deref(port.Port, 0) == p.servicePort {
@@ -347,7 +352,7 @@
}

// reconcile pods in worker pool
-func (p *workerPool) updateWorkers(ctx context.Context) error {
+func (p *AutoscalingPool) updateWorkers(ctx context.Context) error {
p.mu.Lock()
defer p.mu.Unlock()

@@ -366,6 +371,7 @@ func (p *workerPool) updateWorkers(ctx context.Context) error {
var leased []string
var pending []string
var removals []string
+	var abnormal []string

// mark pods for removal based on state and lease pods when requests exist
for _, pod := range podList.Items {
@@ -382,7 +388,7 @@ func (p *workerPool) updateWorkers(ctx context.Context) error {
leased = append(leased, pod.Name)
} else if pod.Status.Phase == corev1.PodPending { // mark pending pods
pending = append(pending, pod.Name)
-		} else if pod.Status.Phase == corev1.PodRunning { // dispatch builds/check expiry/check age on running pods
+		} else if p.isOperationalPod(ctx, pod.Name) { // dispatch builds/check expiry/check age on operational pods
if req := p.requests.Dequeue(); req != nil {
log.Info("Found pending pod request, processing")

@@ -406,6 +412,14 @@
log.Info("Eligible for termination, missing expiry time and pod age older than max idle time")
removals = append(removals, pod.Name)
}
+		} else { // mark abnormal pods
+			log.Info(
+				"Unknown phase detected",
+				"phase", pod.Status.Phase,
+				"conditions", pod.Status.Conditions,
+				"containerStatuses", pod.Status.ContainerStatuses,
+			)
+			abnormal = append(abnormal, pod.Name)
}
}

@@ -417,6 +431,9 @@
for _, name := range pending {
subtractionMap[name] = true
}
+	for _, name := range abnormal {
+		subtractionMap[name] = true
+	}

// calculate which pods can be removed based on reverse-ordinal position
var subtractions int
@@ -439,6 +456,7 @@
"leasedPods", leased,
"pendingPods", pending,
"removalPods", removals,
"abnormalPods", abnormal,
"podRequests", requestCount,
)

@@ -459,7 +477,7 @@
}

// attempts to lease a pod, build an endpoint url, and provide a request result
-func (p *workerPool) processPodRequest(ctx context.Context, req *PodRequest, pod corev1.Pod) (success bool) {
+func (p *AutoscalingPool) processPodRequest(ctx context.Context, req *PodRequest, pod corev1.Pod) (success bool) {
log := p.log.WithValues("podName", pod.Name)

log.Info("Attempting to lease pod")
@@ -491,7 +509,7 @@
}

// trigger a pool reconciliation
-func (p *workerPool) triggerReconcile() {
+func (p *AutoscalingPool) triggerReconcile() {
p.log.Info("Attempting to notify reconciliation")

select {
@@ -502,8 +520,44 @@
}
}

+// ensure pod is operational by checking its phase and conditions
+func (p *AutoscalingPool) isOperationalPod(ctx context.Context, podName string) (verdict bool) {
+	// fetch the latest version of the pod
+	pod, err := p.podClient.Get(ctx, podName, metav1.GetOptions{})
+	if err != nil {
+		p.log.Error(err, "Failed to check if pod is operational")
+		return
+	}
+
+	// this does not mean the pod is usable but is a good first check
+	if pod.Status.Phase != corev1.PodRunning {
+		return
+	}
+
+	// assert the following:
+	// - pod has been scheduled on a node
+	// - all init containers have completed successfully
+	// - all containers in the pod are ready
+	// - the pod is able to serve requests and should be added to the load balancing pools of all matching Services
+	var scheduled, initialized, containersReady, podReady bool
+	for _, condition := range pod.Status.Conditions {
+		switch condition.Type {
+		case corev1.PodScheduled:
+			scheduled = condition.Status == corev1.ConditionTrue
+		case corev1.PodInitialized:
+			initialized = condition.Status == corev1.ConditionTrue
+		case corev1.ContainersReady:
+			containersReady = condition.Status == corev1.ConditionTrue
+		case corev1.PodReady:
+			podReady = condition.Status == corev1.ConditionTrue
+		}
+	}
+
+	return scheduled && initialized && containersReady && podReady
+}

// diagnose elements that could lead to a failure
-func (p *workerPool) diagnoseFailure(ctx context.Context, podName string) {
+func (p *AutoscalingPool) diagnoseFailure(ctx context.Context, podName string) {
log := p.log.WithName("diagnosis").WithValues("podName", podName)

log.Info("Beginning failure diagnosis")
@@ -514,7 +568,7 @@
}

// diagnose issues with endpoint slices
-func (p *workerPool) diagnoseEndpointSlices(ctx context.Context, podName string) {
+func (p *AutoscalingPool) diagnoseEndpointSlices(ctx context.Context, podName string) {
log := p.log.WithName("diagnosis").WithName("endpointslice").WithValues("podName", podName)

listOpts := metav1.ListOptions{LabelSelector: p.endpointSliceListOptions.LabelSelector}
@@ -548,7 +602,7 @@
}

// diagnose issues with pods
-func (p *workerPool) diagnosePod(ctx context.Context, podName string) {
+func (p *AutoscalingPool) diagnosePod(ctx context.Context, podName string) {
log := p.log.WithName("diagnosis").WithName("pod").WithValues("podName", podName)

pod, err := p.podClient.Get(ctx, podName, metav1.GetOptions{})
@@ -562,23 +616,35 @@ func (p *workerPool) diagnosePod(ctx context.Context, podName string) {
}
log.Info("Pod details", "spec", pod.Spec, "status", pod.Status)

+	if pod.Status.Phase != corev1.PodRunning {
+		log.Info("Pod is NOT running", "phase", pod.Status.Phase)
+	}
+
	for _, condition := range pod.Status.Conditions {
+		if condition.Status == corev1.ConditionTrue {
+			continue
+		}
+
+		var message string
		switch condition.Type {
		case corev1.PodScheduled:
-			log.Info("Pod was scheduled", "condition", condition)
+			message = "Pod is NOT scheduled"
		case corev1.PodInitialized:
-			log.Info("Pod finished initializing", "condition", condition)
+			message = "Pod is NOT initialized"
		case corev1.ContainersReady:
-			log.Info("All pod containers became ready", "condition", condition)
+			message = "All pod containers are NOT ready"
		case corev1.PodReady:
-			log.Info("Pod became ready to serve requests", "condition", condition)
+			message = "Pod is NOT ready to serve requests"
		default:
-			log.Info("Unexpected pod state", "condition", condition)
+			message = "Unexpected pod condition type"
		}
-	}
-
-	if pod.Status.Phase != corev1.PodRunning {
-		log.Info("Pod is NOT running", "phase", pod.Status.Phase)
+
+		log.Info(
+			message,
+			"reason", condition.Reason,
+			"message", condition.Message,
+			"lastTransitionTime", condition.LastTransitionTime,
+		)
	}

node, err := p.nodeClient.Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
@@ -594,7 +660,7 @@ func (p *workerPool) diagnosePod(ctx context.Context, podName string) {
}

// inspect events related to pod
-func (p *workerPool) diagnoseEvents(ctx context.Context, podName string) {
+func (p *AutoscalingPool) diagnoseEvents(ctx context.Context, podName string) {
log := p.log.WithName("diagnosis").WithName("event").WithValues("podName", podName)

eventList, err := p.eventClient.List(ctx, metav1.ListOptions{})