Skip to content

Commit

Permalink
fix: status UI hangs with a blank screen; shows no pool updates
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Mitchell <[email protected]>
  • Loading branch information
starpit committed Jul 23, 2024
1 parent a81e5d6 commit e50d428
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 33 deletions.
35 changes: 6 additions & 29 deletions pkg/be/kubernetes/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package kubernetes
import (
"bufio"
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"lunchpail.io/pkg/observe/events"
"strings"
Expand All @@ -18,18 +16,18 @@ type LogLine struct {
Message string
}

func streamLogUpdatesForComponent(run, namespace string, component events.Component, onlyInfo bool, c chan events.Message) error {
func streamLogUpdatesForComponent(podName, namespace string, component events.Component, onlyInfo bool, c chan events.Message) error {
clientset, _, err := Client()
if err != nil {
return err
}

podName, err := findPodName(run, namespace, component, clientset)
if err != nil {
return err
}
// TODO leak?
go func() error {
return streamLogUpdatesForPod(podName, namespace, component, onlyInfo, clientset, c)
}()

return streamLogUpdatesForPod(podName, namespace, component, onlyInfo, clientset, c)
return nil
}

func streamLogUpdatesForWorker(podName, namespace string, c chan events.Message) error {
Expand Down Expand Up @@ -97,24 +95,3 @@ func streamLogUpdatesForPod(podName, namespace string, component events.Componen

return nil
}

func findPodName(run, namespace string, component events.Component, clientset *kubernetes.Clientset) (string, error) {
for {
listOptions := metav1.ListOptions{
LabelSelector: "app.kubernetes.io/component=" + string(component) + ",app.kubernetes.io/instance=" + run,
}

if pods, err := clientset.
CoreV1().
Pods(namespace).
List(context.Background(), listOptions); err != nil {
return "", err
} else if len(pods.Items) == 0 {
time.Sleep(1 * time.Second)
} else if len(pods.Items) != 1 {
return "", fmt.Errorf("Multiple %v instances found for run=%s namespace=%s\n", component, run, namespace)
} else {
return pods.Items[0].Name, nil
}
}
}
9 changes: 5 additions & 4 deletions pkg/be/kubernetes/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ func startWatching(app, run, namespace string) (watch.Interface, error) {

timeout := timeoutSeconds
podWatcher, err := clientset.CoreV1().Pods(namespace).Watch(context.Background(), metav1.ListOptions{
TimeoutSeconds: &timeout,
LabelSelector: "app.kubernetes.io/component,app.kubernetes.io/instance=" + run,
TimeoutSeconds: &timeout,
ResourceVersion: "",
LabelSelector: "app.kubernetes.io/component,app.kubernetes.io/instance=" + run,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -86,15 +87,15 @@ func updateFromPod(pod *v1.Pod, what watch.EventType, cc chan events.ComponentUp
switch component {
case string(events.WorkStealerComponent):
if what == watch.Added {
// new worker pod. start streaming its logs
// new workerstealer pod. start streaming its logs
if err := streamLogUpdatesForComponent(pod.Name, pod.Namespace, events.WorkStealerComponent, true, cm); err != nil {
return err
}
}
cc <- events.WorkStealerUpdate(pod.Namespace, "Kubernetes", workerStatus, what)
case string(events.DispatcherComponent):
if what == watch.Added {
// new worker pod. start streaming its logs
// new dispatcher pod. start streaming its logs
if err := streamLogUpdatesForComponent(pod.Name, pod.Namespace, events.DispatcherComponent, false, cm); err != nil {
return err
}
Expand Down

0 comments on commit e50d428

Please sign in to comment.