Skip to content

Commit

Permalink
Added error handling and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Fiona-Waters committed Nov 23, 2023
1 parent b420be0 commit 9481eff
Show file tree
Hide file tree
Showing 14 changed files with 268 additions and 112 deletions.
8 changes: 8 additions & 0 deletions cmd/kar-controllers/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"os"
"strconv"
"strings"

"k8s.io/klog/v2"
)

// ServerOption is the main context object for the controller manager.
Expand Down Expand Up @@ -96,6 +98,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() {
backoffInt, err := strconv.Atoi(backoffString)
if err == nil {
s.BackoffTime = backoffInt
} else {
klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err)
}
}

Expand All @@ -105,6 +109,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() {
holInt, err := strconv.Atoi(holString)
if err == nil {
s.HeadOfLineHoldingTime = holInt
} else {
klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err)
}
}

Expand All @@ -126,6 +132,8 @@ func (s *ServerOption) loadDefaultsFromEnvVars() {
to, err := strconv.ParseInt(dispatchResourceReservationTimeoutString, 10, 64)
if err == nil {
s.DispatchResourceReservationTimeout = to
} else {
klog.Errorf("[loadDefaultsFromEnvVars] unable to parse int, - error: %#v", err)
}
}
}
8 changes: 5 additions & 3 deletions cmd/kar-controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package app

import (
"fmt"
"net/http"
"strings"

Expand All @@ -42,7 +43,7 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) {
func Run(opt *options.ServerOption) error {
restConfig, err := buildConfig(opt.Master, opt.Kubeconfig)
if err != nil {
return err
return fmt.Errorf("[Run] unable to build server config, - error: %#v", err)
}

neverStop := make(chan struct{})
Expand Down Expand Up @@ -71,7 +72,8 @@ func Run(opt *options.ServerOption) error {
// This call is blocking (unless an error occurs) which equates to <-neverStop
err = listenHealthProbe(opt)
if err != nil {
return err
return fmt.Errorf("[Run] unable to start health probe listener, - error: %#v", err)

}

return nil
Expand All @@ -83,7 +85,7 @@ func listenHealthProbe(opt *options.ServerOption) error {
handler.Handle("/healthz", &health.Handler{})
err := http.ListenAndServe(opt.HealthProbeListenAddr, handler)
if err != nil {
return err
return fmt.Errorf("[listenHealthProbe] unable to listen and serve, - error: %#v", err)
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/apis/controller/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)

func GetController(obj interface{}) types.UID {
accessor, err := meta.Accessor(obj)
if err != nil {
klog.Errorf("[GetController] unable to return object as minimum required fields are missing, - error: %#v", err)
return ""
}

Expand All @@ -37,10 +39,10 @@ func GetController(obj interface{}) types.UID {
return ""
}


func GetJobID(pod *v1.Pod) types.UID {
accessor, err := meta.Accessor(pod)
if err != nil {
klog.Errorf("[GetJobID] unable to return object as minimum required fields are missing, - error: %#v", err)
return ""
}

Expand Down
189 changes: 116 additions & 73 deletions pkg/controller/queuejob/queuejob_controller_ex.go

Large diffs are not rendered by default.

47 changes: 37 additions & 10 deletions pkg/controller/queuejob/scheduling_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ func (p *PriorityQueue) Length() int {
func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool {
p.lock.Lock()
defer p.lock.Unlock()
_, exists, _ := p.activeQ.Get(qj)
_, exists, err := p.activeQ.Get(qj)
if err != nil {
klog.Errorf("[IfExist] unable to check if app wrapper exists, - error:%#v", err)
}
if p.unschedulableQ.Get(qj) != nil || exists {
return true
}
Expand All @@ -140,7 +143,10 @@ func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool {
func (p *PriorityQueue) IfExistActiveQ(qj *qjobv1.AppWrapper) bool {
p.lock.Lock()
defer p.lock.Unlock()
_, exists, _ := p.activeQ.Get(qj)
_, exists, err := p.activeQ.Get(qj)
if err != nil {
klog.Errorf("[IfExistActiveQ] unable to check if app wrapper exists, - error:%#v", err)
}
return exists
}

Expand Down Expand Up @@ -196,12 +202,15 @@ func (p *PriorityQueue) AddIfNotPresent(qj *qjobv1.AppWrapper) error {
if p.unschedulableQ.Get(qj) != nil {
return nil
}
if _, exists, _ := p.activeQ.Get(qj); exists {
if _, exists, err := p.activeQ.Get(qj); exists {
if err != nil {
klog.Errorf("[AddIfNotPresent] unable to check if pod exists, - error:%#v", err)
}
return nil
}
err := p.activeQ.Add(qj)
if err != nil {
klog.Errorf("Error adding pod %s/%s to the scheduling queue: %v", qj.Namespace, qj.Name, err)
klog.Errorf("[AddIfNotPresent] Error adding pod %s/%s to the scheduling queue, - error:%#v", qj.Namespace, qj.Name, err)
} else {
p.cond.Broadcast()
}
Expand All @@ -218,7 +227,10 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) erro
if p.unschedulableQ.Get(qj) != nil {
return fmt.Errorf("pod is already present in unschedulableQ")
}
if _, exists, _ := p.activeQ.Get(qj); exists {
if _, exists, err := p.activeQ.Get(qj); exists {
if err != nil {
klog.Errorf("[AddUnschedulableIfNotPresent] unable to check if pod exists, - error:%#v", err)
}
return fmt.Errorf("pod is already present in the activeQ")
}
// if !p.receivedMoveRequest && isPodUnschedulable(qj) {
Expand All @@ -227,7 +239,9 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) erro
return nil
}
err := p.activeQ.Add(qj)
if err == nil {
if err != nil {
klog.Errorf("[AddUnschedulableIfNotPresent] Error adding QJ %s/%s to the scheduling queue: %v", qj.Namespace, qj.Name, err)
} else {
p.cond.Broadcast()
}
return err
Expand Down Expand Up @@ -271,16 +285,24 @@ func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error {
p.lock.Lock()
defer p.lock.Unlock()
// If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(newQJ); exists {
if _, exists, errp := p.activeQ.Get(newQJ); exists {
if errp != nil {
klog.Errorf("[Update] unable to check if pod exists, - error:%#v", errp)
}
err := p.activeQ.Update(newQJ)
if err != nil {
klog.Errorf("[Update] unable to update pod, - error: %#v", err)
}
return err
}
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usQJ := p.unschedulableQ.Get(newQJ); usQJ != nil {
if p.isQJUpdated(oldQJ, newQJ) {
p.unschedulableQ.Delete(usQJ)
err := p.activeQ.Add(newQJ)
if err == nil {
if err != nil {
klog.Errorf("Error adding QJ %s/%s to the scheduling queue: %v", newQJ.Namespace, newQJ.Name, err)
} else {
p.cond.Broadcast()
}
return err
Expand All @@ -290,7 +312,9 @@ func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error {
}
// If pod is not in any of the two queue, we put it in the active queue.
err := p.activeQ.Add(newQJ)
if err == nil {
if err != nil {
klog.Errorf("Error adding QJ %s/%s to the scheduling queue: %v", newQJ.Namespace, newQJ.Name, err)
} else {
p.cond.Broadcast()
}
return err
Expand All @@ -303,7 +327,10 @@ func (p *PriorityQueue) Delete(qj *qjobv1.AppWrapper) error {
p.lock.Lock()
defer p.lock.Unlock()
p.unschedulableQ.Delete(qj)
if _, exists, _ := p.activeQ.Get(qj); exists {
if _, exists, err := p.activeQ.Get(qj); exists {
if err != nil {
klog.Errorf("[Delete] unable to check if pod exists - error: %#v", err)
}
return p.activeQ.Delete(qj)
}
// p.unschedulableQ.Delete(qj)
Expand Down
20 changes: 16 additions & 4 deletions pkg/controller/queuejobdispatch/queuejobagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ func NewJobClusterAgent(config string, agentEventQueue *cache.FIFO) *JobClusterA

qa.jobSynced = qa.jobInformer.Informer().HasSynced

qa.UpdateAggrResources(context.Background())
err = qa.UpdateAggrResources(context.Background())
if err != nil {
klog.Errorf("[NewJobClusterAgent] Unable to update aggr resources - error: %#v", err)
}

return qa
}
Expand Down Expand Up @@ -161,7 +164,10 @@ func (qa *JobClusterAgent) Run(stopCh <-chan struct{}) {
func (qa *JobClusterAgent) DeleteJob(ctx context.Context, cqj *arbv1.AppWrapper) {
qj_temp := cqj.DeepCopy()
klog.V(2).Infof("[Dispatcher: Agent] Request deletion of XQJ %s/%s to Agent %s\n", qj_temp.Namespace, qj_temp.Name, qa.AgentId)
qa.queuejobclients.WorkloadV1beta1().AppWrappers(qj_temp.Namespace).Delete(ctx, qj_temp.Name, metav1.DeleteOptions{})
err := qa.queuejobclients.WorkloadV1beta1().AppWrappers(qj_temp.Namespace).Delete(ctx, qj_temp.Name, metav1.DeleteOptions{})
if err != nil {
klog.Errorf("[DeleteJob] Unable to delete app wrapper, - error: %#v", err)
}
}

func (qa *JobClusterAgent) CreateJob(ctx context.Context, cqj *arbv1.AppWrapper) {
Expand All @@ -183,7 +189,10 @@ func (qa *JobClusterAgent) CreateJob(ctx context.Context, cqj *arbv1.AppWrapper)
agent_qj.Labels["IsDispatched"] = "true"

klog.V(2).Infof("[Dispatcher: Agent] Create XQJ: %s/%s (Status: %+v) in Agent %s\n", agent_qj.Namespace, agent_qj.Name, agent_qj.Status, qa.AgentId)
qa.queuejobclients.WorkloadV1beta1().AppWrappers(agent_qj.Namespace).Create(ctx, agent_qj, metav1.CreateOptions{})
_, err := qa.queuejobclients.WorkloadV1beta1().AppWrappers(agent_qj.Namespace).Create(ctx, agent_qj, metav1.CreateOptions{})
if err != nil {
klog.Errorf("[CreateJob] Unable to create app wrapper, - error: %#v", err)
}
}

type ClusterMetricsList struct {
Expand Down Expand Up @@ -228,7 +237,10 @@ func (qa *JobClusterAgent) UpdateAggrResources(ctx context.Context) error {
clusterMetricType := res.Items[i].MetricLabels["cluster"]

if strings.Compare(clusterMetricType, "cpu") == 0 || strings.Compare(clusterMetricType, "memory") == 0 {
val, units, _ := getFloatString(res.Items[i].Value)
val, units, err := getFloatString(res.Items[i].Value)
if err != nil {
klog.Errorf("[Dispatcher: UpdateAggrResources] Possible issue getting float string - error: %#v", err)
}
num, err := strconv.ParseFloat(val, 64)
if err != nil {
klog.Warningf("[Dispatcher: UpdateAggrResources] Possible issue converting %s string value of %s due to error: %v\n",
Expand Down
Loading

0 comments on commit 9481eff

Please sign in to comment.