Skip to content

Commit

Permalink
remove malformed queued items
Browse files Browse the repository at this point in the history
  • Loading branch information
isaaguilar committed Sep 8, 2023
1 parent 49818ba commit f7bba12
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
10 changes: 9 additions & 1 deletion internal/tfhandler/qworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,21 @@ MainLoop:
continue
}

// TODO Keep track of how many attempts queued items have done. Remove after a certain threshold.

tf := i.queue.PopFront()
if !shouldPoll(tf) {
continue
}
name := tf.Name
namespace := tf.Namespace

if i.clusterName == "" || name == "" || namespace == "" {
// The resulting request will be malformed unless all vars are defined
// TODO Determine what causes undefined fields
continue
}

log.Printf(".... Waiting for workflow completion. \t(%s/%s)", namespace, name)
result, err := i.clientset.Cluster(i.clusterName).Poll(namespace, name).Read(ctx, &tf)
if err != nil {
Expand All @@ -60,6 +68,7 @@ MainLoop:
continue
}

// The result returned from the API was successful. Validate the data before removing from queue
list, ok := result.Data.Data.([]interface{})
if !ok {
log.Printf("ERROR api response in unexpected format %T \t(%s/%s)", result.Data.Data, namespace, name)
Expand All @@ -85,7 +94,6 @@ MainLoop:
for _, item := range list {
b, _ := base64.StdEncoding.DecodeString(item.(string))
applyRawManifest(ctx, kedge.KubernetesConfig(os.Getenv("KUBECONFIG")), b, namespace)

}
log.Printf("Done handling \t(%s/%s)", namespace, name)
}
Expand Down
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package main
import (
"flag"
"fmt"
"io"
"log"
"os"

"github.com/galleybytes/monitor/projects/terraform-operator-remote-controller/internal/tfhandler"
"github.com/galleybytes/monitor/projects/terraform-operator-remote-controller/pkg/tfoapiclient"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)

func kubernetesConfig(kubeconfigPath string) *rest.Config {
Expand All @@ -33,8 +35,12 @@ func readFile(filename string) []byte {

func main() {
var insecureSkipVerify bool
klog.InitFlags(flag.CommandLine)
flag.BoolVar(&insecureSkipVerify, "insecure-skip-verify", false, "Allow conneting to API server without unverified HTTPS")
flag.Parse()
flag.Set("logtostderr", "false")
flag.Set("alsologtostderr", "false")
klog.SetOutput(io.Discard)
kubeconfig := os.Getenv("KUBECONFIG")
clientName := os.Getenv("CLIENT_NAME")
proto := os.Getenv("TFO_API_PROTOCOL")
Expand Down

0 comments on commit f7bba12

Please sign in to comment.