Skip to content

Commit

Permalink
Replace Cancel with Terminate (#37)
Browse files Browse the repository at this point in the history
* replace Cancel with Terminate

* Code optimized based on code review result

* Update doc about command: terminate

* Code optimized
  • Loading branch information
Lifei Chen authored and Marc Rooding committed Apr 12, 2019
1 parent d5a9c82 commit 706b264
Show file tree
Hide file tree
Showing 20 changed files with 245 additions and 75 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ Currently, it supports several features:
1. Listing jobs
2. Deploying a new job
3. Updating an existing job
4. Querying Flink queryable state
4. Terminating an existing job
5. Querying Flink queryable state

For a full overview of the commands and flags, run `flink-job-deployer help`

Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/flink/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package flink
// FlinkRestAPI is an interface representing the ability to execute
// multiple HTTP requests against the Apache Flink API.
type FlinkRestAPI interface {
Cancel(jobID string) error
Terminate(jobID string, mode string) error
CreateSavepoint(jobID string, savepointPath string) (CreateSavepointResponse, error)
MonitorSavepointCreation(jobID string, requestID string) (MonitorSavepointCreationResponse, error)
RetrieveJobs() ([]Job, error)
Expand Down
24 changes: 0 additions & 24 deletions cmd/cli/flink/cancel.go

This file was deleted.

35 changes: 0 additions & 35 deletions cmd/cli/flink/cancel_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion cmd/cli/flink/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package flink
import (
"fmt"

retryablehttp "github.com/hashicorp/go-retryablehttp"
"github.com/hashicorp/go-retryablehttp"
)

// A FlinkRestClient is a client to interface with
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/flink/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strings"
"testing"

retryablehttp "github.com/hashicorp/go-retryablehttp"
"github.com/hashicorp/go-retryablehttp"
"github.com/stretchr/testify/assert"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/flink/retrieve_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"net/http"
"testing"

retryablehttp "github.com/hashicorp/go-retryablehttp"
"github.com/hashicorp/go-retryablehttp"
"github.com/stretchr/testify/assert"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/flink/run_jar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"net/http"
"testing"

retryablehttp "github.com/hashicorp/go-retryablehttp"
"github.com/hashicorp/go-retryablehttp"
"github.com/stretchr/testify/assert"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/flink/savepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"net/http"
"testing"

retryablehttp "github.com/hashicorp/go-retryablehttp"
"github.com/hashicorp/go-retryablehttp"
"github.com/stretchr/testify/assert"
)

Expand Down
60 changes: 60 additions & 0 deletions cmd/cli/flink/terminate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package flink

import (
"context"
"fmt"
"io/ioutil"
"net/http"
)

type TerminateJobErrorResponse struct {
ErrInfo string `json:"error"`
}

// Terminate terminates a running job specified by job ID
func (c FlinkRestClient) Terminate(jobID string, mode string) error {
var path string
if len(mode) > 0 {
path = fmt.Sprintf("jobs/%v?mode=%v", jobID, mode)
} else {
path = fmt.Sprintf("jobs/%v", jobID)
}

c.Client.CheckRetry = RetryPolicy
req, err := c.newRequest("PATCH", c.constructURL(path), nil)
res, err := c.Client.Do(req)

defer res.Body.Close()

if err != nil {
return err
}

if res.StatusCode != 202 {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}

return fmt.Errorf("Unexpected response status %v with body %v", res.StatusCode, string(body[:]))
}

return nil
}

// Do not retry when status code is 500. (indicating the job is not stoppable)
func RetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
if ctx.Err() != nil {
return false, ctx.Err()
}

if err != nil {
return true, err
}

if resp.StatusCode == 0 || resp.StatusCode > 500 {
return true, nil
}

return false, nil
}
65 changes: 65 additions & 0 deletions cmd/cli/flink/terminate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package flink

import (
"net/http"
"testing"

"github.com/hashicorp/go-retryablehttp"
"github.com/stretchr/testify/assert"
)

func TestTerminateWithModeCancelAndStatusSuccess(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/id?mode=cancel", "", http.StatusAccepted, "")
defer server.Close()

api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}

err := api.Terminate("id", "cancel")

assert.Nil(t, err)
}

func TestTerminateWithModeCancelAndStatus404(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/id?mode=cancel", "", http.StatusNotFound, "not found")
defer server.Close()

api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}

err := api.Terminate("id", "cancel")

assert.EqualError(t, err, "Unexpected response status 404 with body not found")
}

func TestTerminateWithModeStopAndStatusSuccess(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/id?mode=stop", "", http.StatusAccepted, "OK")
defer server.Close()

api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}

err := api.Terminate("id", "stop")

assert.Nil(t, err)
}

func TestTerminateWithModeStopAndStatusFailure(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/id?mode=stop", "", http.StatusInternalServerError, "error")
defer server.Close()

api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}

err := api.Terminate("id", "stop")

assert.EqualError(t, err, "Unexpected response status 500 with body error")
}
2 changes: 1 addition & 1 deletion cmd/cli/flink/upload_jar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"net/http"
"testing"

retryablehttp "github.com/hashicorp/go-retryablehttp"
"github.com/hashicorp/go-retryablehttp"
"github.com/stretchr/testify/assert"
)

Expand Down
42 changes: 42 additions & 0 deletions cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,32 @@ func UpdateAction(c *cli.Context) error {
return nil
}

// TerminateAction executes the CLI terminate command
func TerminateAction(c *cli.Context) error {
terminate := operations.TerminateJob{}

jobNameBase := c.String("job-name-base")
if len(jobNameBase) == 0 {
return cli.NewExitError("unspecified flag 'job-name-base'", -1)
}
terminate.JobNameBase = jobNameBase

mode := c.String("mode")
if len(mode) > 0 && mode != "cancel" && mode != "stop" {
return cli.NewExitError("unknown value for 'mode', only 'cancel' and 'stop' are supported", -1)
}
terminate.Mode = mode

err := operator.Terminate(terminate)
if err != nil {
return cli.NewExitError(fmt.Sprintf("an error occurred: %v", err), -1)
}

log.Println("Job successfully terminated")

return nil
}

func getAPITimeoutSeconds() (int64, error) {
if len(os.Getenv("FLINK_API_TIMEOUT_SECONDS")) > 0 {
return strconv.ParseInt(os.Getenv("FLINK_API_TIMEOUT_SECONDS"), 10, 64)
Expand Down Expand Up @@ -306,6 +332,22 @@ func main() {
},
Action: UpdateAction,
},
{
Name: "terminate",
Aliases: []string{"t"},
Usage: "Terminate a running job",
Flags: []cli.Flag{
cli.StringFlag{
Name: "job-name-base, jnb",
Usage: "The base name of the job to update",
},
cli.StringFlag{
Name: "mode, m",
Usage: "The mode to terminate a running job, cancel and stop supported",
},
},
Action: TerminateAction,
},
}

app.Run(os.Args)
Expand Down
28 changes: 28 additions & 0 deletions cmd/cli/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,34 @@ func TestListActionShouldReturnNilWhenTheAPISucceeds(t *testing.T) {
assert.Nil(t, err)
}

/*
* TerminateAction
*/
func TestTerminateActionShouldThrowAnErrorWhenJobNameBaseMissing(t *testing.T) {
mockedUpdateError = nil
operator = TestOperator{}

app := cli.App{}
set := flag.FlagSet{}
context := cli.NewContext(&app, &set, nil)
err := TerminateAction(context)

assert.EqualError(t, err, "unspecified flag 'job-name-base'")
}

func TestTerminateActionShouldThrowAnErrorWhenModeUnknown(t *testing.T) {
operator = TestOperator{}

app := cli.App{}
set := flag.FlagSet{}
set.String("job-name-base", "a job", "")
set.String("mode", "not_exist_mode", "")
context := cli.NewContext(&app, &set, nil)
err := TerminateAction(context)

assert.EqualError(t, err, "unknown value for 'mode', only 'cancel' and 'stop' are supported")
}

/*
* DeployAction
*/
Expand Down
6 changes: 3 additions & 3 deletions cmd/cli/operations/flink_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
* Flink REST API mocking
*/

var mockedCancelError error
var mockedTerminateError error
var mockedCreateSavepointResponse flink.CreateSavepointResponse
var mockedCreateSavepointError error
var mockedMonitorSavepointCreationResponse flink.MonitorSavepointCreationResponse
Expand All @@ -26,8 +26,8 @@ type TestFlinkRestClient struct {
Client *http.Client
}

func (c TestFlinkRestClient) Cancel(jobID string) error {
return mockedCancelError
func (c TestFlinkRestClient) Terminate(jobID string, mode string) error {
return mockedTerminateError
}
func (c TestFlinkRestClient) CreateSavepoint(jobID string, savepointPath string) (flink.CreateSavepointResponse, error) {
return mockedCreateSavepointResponse, mockedCreateSavepointError
Expand Down
1 change: 1 addition & 0 deletions cmd/cli/operations/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Operator interface {
Deploy(d Deploy) error
Update(u UpdateJob) error
RetrieveJobs() ([]flink.Job, error)
Terminate(t TerminateJob) error
}

// RealOperator is the Operator used in the production code
Expand Down
Loading

0 comments on commit 706b264

Please sign in to comment.