Skip to content

Commit

Permalink
New CLI handling (#5)
Browse files Browse the repository at this point in the history
- Move from `flag` to `kingpin`
- Add tests
- Add comments
  • Loading branch information
mwuertinger authored Jun 12, 2018
1 parent 4993f86 commit 877af50
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 56 deletions.
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ language: go
go:
- "1.10.x"
script:
- set -e
- go get -v ./...
- test -n $(gofmt -l .)
- diff -u <(echo -n) <(gofmt -d ./)
- go vet .
- go test -cover -race ./...
- go build
- export GITHASH=$(git rev-parse HEAD) # Git commit hash is used as version string in main.go
- go build -ldflags "-extldflags '-static' -X main.githash=$GITHASH"
72 changes: 72 additions & 0 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package cli

import (
"bytes"
"fmt"
"io"
"path/filepath"
"text/template"
"time"

"gopkg.in/alecthomas/kingpin.v2"
)

const helpTemplate = `Purpose: Create a Kubernetes job and watch the logs until it completes.
Examples:
# Read job spec from file:
{{ .app }} -n test job.yaml
# Read job spec from stdin
cat job.yaml | {{ .app }} -n test
`

type Args struct {
Kubeconfig string
Namespace string
JobFile string
Timeout time.Duration
}

func Parse(args []string, home, version string, out io.Writer) (*Args, error) {
var help bytes.Buffer
err := template.Must(template.New("help").Parse(helpTemplate)).Execute(&help, map[string]string{"app": args[0]})
if err != nil {
return nil, fmt.Errorf("unable to format help: %v", err)
}

app := kingpin.New(args[0], "Run Kubernetes jobs and wait for their completion.")
app.Version(version)
app.Help = help.String()

jobFile := app.Arg("JOBFILE", "Job spec file, - for stdin (default)").Default("-").String()
namespace := app.Flag("namespace", "Kubernetes namespace to use").Short('n').Required().String()
timeout := app.Flag("timeout", "Timeout in time.Duration format (eg. 10s, 1m, 1h, ...)").Short('t').Duration()

var kubeconfig *string
if home != "" {
kubeconfig = app.Flag("kubeconfig", "(optional) absolute path to the Kubeconfig file").Default(filepath.Join(home, ".kube", "config")).String()
} else {
kubeconfig = app.Flag("kubeconfig", "absolute path to the Kubeconfig file").Required().String()
}

// do not call os.Exit() on error
app.Terminate(nil)

// redirect output
app.ErrorWriter(out)
app.UsageWriter(out)

_, err = app.Parse(args[1:])
if err != nil {
return nil, err
}

return &Args{
Kubeconfig: *kubeconfig,
Namespace: *namespace,
JobFile: *jobFile,
Timeout: *timeout,
}, nil
}
78 changes: 78 additions & 0 deletions internal/cli/cli_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package cli

import (
"bytes"
"reflect"
"strings"
"testing"
)

func TestParse(t *testing.T) {
data := []struct {
home string
args []string
expectedRes Args
expectedErr bool
outContains string
}{
{
"/foo/bar",
[]string{"kubejob", "-n", "foo"},
Args{
Kubeconfig: "/foo/bar/.kube/config",
Namespace: "foo",
JobFile: "-",
},
false,
"",
},
{
"/foo/bar",
[]string{"kubejob", "-n", "foo", "/fizz/buzz"},
Args{
Kubeconfig: "/foo/bar/.kube/config",
Namespace: "foo",
JobFile: "/fizz/buzz",
},
false,
"",
},
{
"/foo/bar",
[]string{"kubejob", "--help"},
Args{},
true,
"",
},
{
"",
[]string{"kubejob", "--version"},
Args{},
true,
"42-23-73",
},
}

for _, d := range data {
var buf bytes.Buffer
res, err := Parse(d.args, d.home, "42-23-73", &buf)
if d.expectedErr {
if err == nil {
t.Fatalf("%v: missing expected error", d)
}
} else {
if err != nil {
t.Fatalf("%v: unexpected error: %v", d, err)
}
}
if !d.expectedErr && !reflect.DeepEqual(res, &d.expectedRes) {
t.Fatalf("%v: unexpected result: %v", d, res)
}
if d.outContains != "" {
if !strings.Contains(buf.String(), d.outContains) {
t.Log(buf.String())
t.Fatal("output does not contain expected string: ", d.outContains)
}
}
}
}
107 changes: 53 additions & 54 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ package main

import (
"bytes"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/egymgmbh/go-prefix-writer/prefixer"
"github.com/egymgmbh/kubejob/internal/cli"

batch "k8s.io/api/batch/v1"
core "k8s.io/api/core/v1"
Expand All @@ -27,63 +26,25 @@ import (
"k8s.io/client-go/tools/clientcmd"
)

var githash string // set by linker, see '.travis.yml'

func main() {
var kubeconfig *string
if home := os.Getenv("HOME"); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
namespace := flag.String("n", "", "kubernetes namespace to use")
timeoutStr := flag.String("t", "", "(optional) timeout in time.Duration format (eg. 10s, 1m, 1h, ...)")
// help := flag.Bool("h", false, "(optional) print usage information")
flag.Parse()

if len(*namespace) == 0 {
fmt.Fprint(os.Stderr, "Purpose: Create a Kubernetes job and watch the logs until it completes.\n")
fmt.Fprintf(os.Stderr, "\nUsage:\n\t%s -n Namespace [-kubeconfig ConfigFile] [-t Timeout] [JobFile]\n", os.Args[0])
fmt.Fprintln(os.Stderr, "\nExamples:")
fmt.Fprintf(os.Stderr, "\t# Read job spec from file:\n\t%s -n test job.yaml\n\n", os.Args[0])
fmt.Fprintf(os.Stderr, "\t# Read job spec from stdin:\n\tcat job.yaml | %s -n test\n\n", os.Args[0])
fmt.Fprintf(os.Stderr, "Options:\n")
flag.PrintDefaults()
return
args, err := cli.Parse(os.Args, os.Getenv("HOME"), githash, os.Stderr)
if err != nil {
log.Fatal("Error: ", err)
}

var timeoutChan <-chan time.Time
if len(*timeoutStr) > 0 {
timeout, err := time.ParseDuration(*timeoutStr)
if err != nil {
log.Fatal("Invalid timeout (-t): ", err)
}
timeoutChan = time.After(timeout)
if args.Timeout != time.Duration(0) {
timeoutChan = time.After(args.Timeout)
}

var jobFile *os.File
if len(flag.Args()) > 0 {
var err error
jobFile, err = os.Open(flag.Arg(0))
if err != nil {
log.Fatal("Unable to open job file: ", err)
}
} else {
jobFile = os.Stdin
}

var jobIn batch.Job
err := yaml.NewYAMLOrJSONDecoder(jobFile, 1024).Decode(&jobIn)
jobIn, err := parseAndValidateJob(args.JobFile)
if err != nil {
log.Fatal("Unable to parse job spec: ", err)
log.Fatal("Unable to parse job: ", err)
}

if jobIn.Spec.Template.Spec.RestartPolicy != core.RestartPolicyNever {
log.Print(`Warning: ".spec.template.spec.restartPolicy" should be set to "Never" in order to avoid unintended restarts`)
}
if jobIn.Spec.BackoffLimit == nil || *jobIn.Spec.BackoffLimit != 0 {
log.Print(`Warning: ".spec.backoffLimit" should be set to "0" in order to avoid unintended restarts`)
}

cs, err := k8sClientSet(*kubeconfig)
cs, err := k8sClientSet(args.Kubeconfig)
if err != nil {
log.Fatal("Failed to create client: ", err)
}
Expand All @@ -92,7 +53,7 @@ func main() {
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// create the job
job, err := cs.BatchV1().Jobs(*namespace).Create(&jobIn)
job, err := cs.BatchV1().Jobs(args.Namespace).Create(jobIn)
if err != nil {
log.Fatal("Failed to create job: ", err)
}
Expand All @@ -105,7 +66,7 @@ func main() {
// wait for signal or shutdown
select {
case <-timeoutChan:
log.Print("Timeout after ", *timeoutStr)
log.Print("Timeout after ", args.Timeout)
case <-sigChan:
log.Print("Cancelled by signal")
case result = <-resultChan:
Expand All @@ -117,7 +78,7 @@ func main() {
}

log.Print("Deleting job")
err = cs.BatchV1().Jobs(*namespace).Delete(job.Name, nil)
err = cs.BatchV1().Jobs(args.Namespace).Delete(job.Name, nil)
if err != nil {
log.Print("Deleting job: ", err)
}
Expand All @@ -127,6 +88,37 @@ func main() {
}
}

// parseAndValidateJob reads the job spec from path and returns the result if possible. If path is "-" the job spec is
// read from os.Stdin. Warnings are logged if required values are not set.
func parseAndValidateJob(path string) (*batch.Job, error) {
f := os.Stdin
if path != "-" {
var err error
f, err = os.Open(path)
if err != nil {
return nil, fmt.Errorf("unable to open job file: %v", err)
}
}

var job batch.Job
err := yaml.NewYAMLOrJSONDecoder(f, 1024).Decode(&job)
if err != nil {
return nil, fmt.Errorf("unable to parse job spec: %v", err)
}

if job.Spec.Template.Spec.RestartPolicy != core.RestartPolicyNever {
log.Print(`Warning: ".spec.template.spec.restartPolicy" should be set to "Never" in order to avoid unintended restarts`)
}
if job.Spec.BackoffLimit == nil || *job.Spec.BackoffLimit != 0 {
log.Print(`Warning: ".spec.backoffLimit" should be set to "0" in order to avoid unintended restarts`)
}

return &job, nil
}

// watchJob waits until job is done and reports the result (sucess or failure) through resultChan. The function ensures
// that all log output of the job is reported on os.Stdout and waits up to 10s for the end of the logs after the job
// finished before reporting to resultChan.
func watchJob(cs *kubernetes.Clientset, job *batch.Job, resultChan chan<- bool) {
var lastPhase core.PodPhase

Expand Down Expand Up @@ -192,6 +184,8 @@ end:
resultChan <- result
}

// startLogStreaming starts streaming the logs for all containers in the pod. The function returns immediately and
// signals the end of all logs treams by closing the done channel.
func startLogStreaming(cs *kubernetes.Clientset, pod *core.Pod, done chan<- interface{}) {
var wg sync.WaitGroup
if len(pod.Spec.Containers) == 1 {
Expand All @@ -206,10 +200,13 @@ func startLogStreaming(cs *kubernetes.Clientset, pod *core.Pod, done chan<- inte

go func() {
wg.Wait()
done <- nil
close(done)
}()
}

// streamLogsForContainer reads all the logs from the specified container which must be part of the specified pod
// and writes them to os.Stdout using the container name as a prefix. container may be empty in case pod has only
// one container. wg.Done() is called when the end of the log stream is reached.
func streamLogsForContainer(cs *kubernetes.Clientset, pod *core.Pod, container string, wg *sync.WaitGroup) {
defer wg.Done()

Expand Down Expand Up @@ -246,6 +243,7 @@ func streamLogsForContainer(cs *kubernetes.Clientset, pod *core.Pod, container s
}
}

// labelSelector converts a label map (as used in the job spec) into a label query as used in the API.
func labelSelector(labels map[string]string) string {
var buf bytes.Buffer
for k, v := range labels {
Expand All @@ -257,6 +255,7 @@ func labelSelector(labels map[string]string) string {
return buf.String()
}

// k8sClientSet creates the Kubernetes client set from the config.
func k8sClientSet(kubeconfig string) (*kubernetes.Clientset, error) {
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
Expand Down
30 changes: 30 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import "testing"

func TestLabelSelector(t *testing.T) {
data := []struct {
in map[string]string
out string
}{
{
map[string]string{},
"",
},
{
map[string]string{"foo": "bar"},
"foo=bar",
},
{
map[string]string{"foo": "bar", "fizz": "buzz"},
"foo=bar,fizz=buzz",
},
}

for _, d := range data {
out := labelSelector(d.in)
if out != d.out {
t.Fatalf("%v: unexpected result: %s", d, out)
}
}
}

0 comments on commit 877af50

Please sign in to comment.