Skip to content

Commit

Permalink
Merge pull request #9 from ing-bank/queryable-state
Browse files Browse the repository at this point in the history
Queryable state
  • Loading branch information
arnestaphorsius authored Mar 13, 2018
2 parents 211236b + 85674c4 commit b517dd1
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 118 deletions.
38 changes: 4 additions & 34 deletions cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,28 +145,14 @@ func QueryAction(c *cli.Context) error {
} else {
query.mainClass = mainClass
}
highAvailability := c.String("high-availability")
if len(highAvailability) == 0 {
return cli.NewExitError("unspecified flag 'high-availability'", -1)
} else if highAvailability != "zookeeper" && highAvailability != "none" {
return cli.NewExitError("unknown value for flag 'high-availability'. Allowed values: zookeeper/none", -1)
} else {
query.highAvailability = filename
}
zookeeperQuorum := c.String("zookeeper-quorum")
if highAvailability == "zookeeper" && len(zookeeperQuorum) == 0 {
return cli.NewExitError("unspecified flag 'zookeeper-quorum'", -1)
} else {
query.zookeeperQuorum = zookeeperQuorum
}
jobmanagerAddress := c.String("jobmanager-address")
if highAvailability == "none" && len(jobmanagerAddress) == 0 {
if len(jobmanagerAddress) == 0 {
return cli.NewExitError("unspecified flag 'jobmanager-address'", -1)
} else {
query.jobManagerRPCAddress = jobmanagerAddress
}
jobmanagerPort := c.Int("jobmanager-port")
if highAvailability == "none" && jobmanagerPort <= 0 {
if jobmanagerPort <= 0 {
return cli.NewExitError("unspecified flag 'jobmanager-port'", -1)
} else {
query.jobManagerRPCPort = jobmanagerPort
Expand Down Expand Up @@ -288,33 +274,17 @@ func main() {
Name: "file-name, fn",
Usage: "The complete name of the job JAR file",
},
cli.StringFlag{
Name: "remote-file-name, rfn",
Usage: "The location of a remote job JAR file to be downloaded",
},
cli.StringFlag{
Name: "api-token, at",
Usage: "The API token for the remote address of the a remote file",
},
cli.StringFlag{
Name: "main-class, mc",
Usage: "The package and class name of the main class",
},
cli.StringFlag{
Name: "high-availability, ha",
Usage: "Which high availability mode to use (zookeeper/none)",
},
cli.StringFlag{
Name: "zookeeper-quorum, zq",
Usage: "Comma separated list of zookeeper nodes (host:port,host:port)",
},
cli.StringFlag{
Name: "jobmanager-address, ja",
Usage: "The Job Manager RPC address to use when high availability is none",
Usage: "The Job Manager RPC address to use",
},
cli.IntFlag{
Name: "jobmanager-port, jp",
Usage: "The Job Manager RPC port to use when high availability is none",
Usage: "The Job Manager RPC port to use",
},
},
Action: QueryAction,
Expand Down
38 changes: 2 additions & 36 deletions cmd/cli/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestQueryActionShouldThrowAnErrorWhenTheMainClassArgumentIsMissing(t *testi
assert.EqualError(t, err, "unspecified flag 'main-class'")
}

func TestQueryActionShouldThrowAnErrorWhenTheHighAvailabilityArgumentIsMissing(t *testing.T) {
func TestQueryActionShouldThrowAnErrorWhenTheJobmanagerAddressIsMissing(t *testing.T) {
mockedExitStatus = 0
commander = TestCommander{}

Expand All @@ -177,42 +177,10 @@ func TestQueryActionShouldThrowAnErrorWhenTheHighAvailabilityArgumentIsMissing(t
context := cli.NewContext(&app, &set, nil)
err := QueryAction(context)

assert.EqualError(t, err, "unspecified flag 'high-availability'")
}

func TestQueryActionShouldThrowAnErrorWhenHAIsZookeeperAndTheQourumArgumentIsMissing(t *testing.T) {
mockedExitStatus = 0
commander = TestCommander{}

app := cli.App{}
set := flag.FlagSet{}
set.String("file-name", "file.jar", "")
set.String("job-name", "Job A", "")
set.String("main-class", "com.ing.QueryState", "")
set.String("high-availability", "zookeeper", "")
context := cli.NewContext(&app, &set, nil)
err := QueryAction(context)

assert.EqualError(t, err, "unspecified flag 'zookeeper-quorum'")
}

func TestQueryActionShouldThrowAnErrorWhenHAIsNoneAndTheJobmanagerAddressIsMissing(t *testing.T) {
mockedExitStatus = 0
commander = TestCommander{}

app := cli.App{}
set := flag.FlagSet{}
set.String("file-name", "file.jar", "")
set.String("job-name", "Job A", "")
set.String("main-class", "com.ing.QueryState", "")
set.String("high-availability", "none", "")
context := cli.NewContext(&app, &set, nil)
err := QueryAction(context)

assert.EqualError(t, err, "unspecified flag 'jobmanager-address'")
}

func TestQueryActionShouldThrowAnErrorWhenHAIsNoneAndTheJobmanagerPortIsMissing(t *testing.T) {
func TestQueryActionShouldThrowAnErrorWhenTheJobmanagerPortIsMissing(t *testing.T) {
mockedExitStatus = 0
commander = TestCommander{}

Expand All @@ -221,7 +189,6 @@ func TestQueryActionShouldThrowAnErrorWhenHAIsNoneAndTheJobmanagerPortIsMissing(
set.String("file-name", "file.jar", "")
set.String("job-name", "Job A", "")
set.String("main-class", "com.ing.QueryState", "")
set.String("high-availability", "none", "")
set.String("jobmanager-address", "flink", "")
context := cli.NewContext(&app, &set, nil)
err := QueryAction(context)
Expand All @@ -238,7 +205,6 @@ func TestQueryActionShouldThrowAnErrorWhenTheCommandFails(t *testing.T) {
set.String("file-name", "file.jar", "")
set.String("job-name", "Job A", "")
set.String("main-class", "com.ing.QueryState", "")
set.String("high-availability", "none", "")
set.String("jobmanager-address", "flink", "")
set.Int("jobmanager-port", 6123, "")
context := cli.NewContext(&app, &set, nil)
Expand Down
11 changes: 3 additions & 8 deletions cmd/cli/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ type Query struct {
jobName string
filename string
mainClass string
highAvailability string
zookeeperQuorum string
jobManagerRPCAddress string
jobManagerRPCPort int
}
Expand All @@ -31,12 +29,9 @@ func (q Query) execute() ([]byte, error) {
q.filename,
q.mainClass,
jobIds[0],
q.highAvailability)
if q.highAvailability == "zookeeper" {
args = append(args, q.zookeeperQuorum)
} else {
args = append(args, q.jobManagerRPCAddress, strconv.Itoa(q.jobManagerRPCPort))
}
q.jobManagerRPCAddress,
strconv.Itoa(q.jobManagerRPCPort))

out, err := commander.CombinedOutput("java", args...)
if err != nil {
return nil, err
Expand Down
45 changes: 5 additions & 40 deletions cmd/cli/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"testing"

"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -51,12 +52,11 @@ func TestQueryShouldReturnTheQueryCommandOutput(t *testing.T) {
commander = TestCommander{}

query := Query{
jobName: "Job A",
filename: "file.jar",
mainClass: "com.ing.QueryState",
highAvailability: "none",
jobName: "Job A",
filename: "file.jar",
mainClass: "com.ing.QueryState",
jobManagerRPCAddress: "flink",
jobManagerRPCPort: 6123,
jobManagerRPCPort: 6123,
}

out, err := query.execute()
Expand All @@ -69,47 +69,12 @@ func TestQueryShouldReturnTheQueryCommandOutput(t *testing.T) {
"file.jar",
"com.ing.QueryState",
"jobid1",
"none",
"flink",
"6123",
}
assert.Equal(t, expected, receivedArguments)
}

func TestQueryShouldReturnTheQueryCommandOutputForZookeeperHAMode(t *testing.T) {
mockedStdout = `
------------------ Running/Restarting Jobs -------------------
15.11.2017 12:23:37 : jobid1 : Job A (RUNNING)
15.11.2017 12:23:37 : jobid2 : Job B (RUNNING)
--------------------------------------------------------------
`
mockedExitStatus = 0
commander = TestCommander{}

query := Query{
jobName: "Job A",
filename: "file.jar",
mainClass: "com.ing.QueryState",
highAvailability: "zookeeper",
zookeeperQuorum: "zookeeper-1:2181,zookeeper-2:2181",
}

out, err := query.execute()

assert.Equal(t, mockedStdout, string(out))
assert.Nil(t, err)

expected := []string{
"-cp",
"file.jar",
"com.ing.QueryState",
"jobid1",
"zookeeper",
"zookeeper-1:2181,zookeeper-2:2181",
}
assert.Equal(t, expected, receivedArguments)
}

func TestQueryShouldReturnAnErrorWhenThereAreMultipleJobsRunning(t *testing.T) {
mockedStdout = `
------------------ Running/Restarting Jobs -------------------
Expand Down

0 comments on commit b517dd1

Please sign in to comment.