From 706b264307422c1942d8f71ee6e09e47837e31d7 Mon Sep 17 00:00:00 2001 From: Lifei Chen Date: Fri, 12 Apr 2019 13:00:54 +0800 Subject: [PATCH] Replace Cancel with Terminate (#37) * replace Cancel with Terminate * Code optimized based on code review result * Update doc about command: terminate * Code optimized --- README.md | 3 +- cmd/cli/flink/api.go | 2 +- cmd/cli/flink/cancel.go | 24 ---------- cmd/cli/flink/cancel_test.go | 35 --------------- cmd/cli/flink/client.go | 2 +- cmd/cli/flink/client_test.go | 2 +- cmd/cli/flink/retrieve_jobs_test.go | 2 +- cmd/cli/flink/run_jar_test.go | 2 +- cmd/cli/flink/savepoint_test.go | 2 +- cmd/cli/flink/terminate.go | 60 +++++++++++++++++++++++++ cmd/cli/flink/terminate_test.go | 65 +++++++++++++++++++++++++++ cmd/cli/flink/upload_jar_test.go | 2 +- cmd/cli/main.go | 42 +++++++++++++++++ cmd/cli/main_test.go | 28 ++++++++++++ cmd/cli/operations/flink_api_test.go | 6 +-- cmd/cli/operations/operator.go | 1 + cmd/cli/operations/terminate_job.go | 27 +++++++++++ cmd/cli/operations/update_job.go | 2 +- cmd/cli/operations/update_job_test.go | 8 ++-- cmd/cli/operator_test.go | 5 +++ 20 files changed, 245 insertions(+), 75 deletions(-) delete mode 100644 cmd/cli/flink/cancel.go delete mode 100644 cmd/cli/flink/cancel_test.go create mode 100644 cmd/cli/flink/terminate.go create mode 100644 cmd/cli/flink/terminate_test.go create mode 100644 cmd/cli/operations/terminate_job.go diff --git a/README.md b/README.md index ebc639c..fcb2d47 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,8 @@ Currently, it supports several features: 1. Listing jobs 2. Deploying a new job 3. Updating an existing job -4. Querying Flink queryable state +4. Terminating an existing job +5. Querying Flink queryable state For a full overview of the commands and flags, run `flink-job-deployer help` diff --git a/cmd/cli/flink/api.go b/cmd/cli/flink/api.go index 07e13a7..9051685 100644 --- a/cmd/cli/flink/api.go +++ b/cmd/cli/flink/api.go @@ -3,7 +3,7 @@ package flink // FlinkRestAPI is an interface representing the ability to execute // multiple HTTP requests against the Apache Flink API. type FlinkRestAPI interface { - Cancel(jobID string) error + Terminate(jobID string, mode string) error CreateSavepoint(jobID string, savepointPath string) (CreateSavepointResponse, error) MonitorSavepointCreation(jobID string, requestID string) (MonitorSavepointCreationResponse, error) RetrieveJobs() ([]Job, error) diff --git a/cmd/cli/flink/cancel.go b/cmd/cli/flink/cancel.go deleted file mode 100644 index 3606e61..0000000 --- a/cmd/cli/flink/cancel.go +++ /dev/null @@ -1,24 +0,0 @@ -package flink - -import ( - "fmt" -) - -// Cancel terminates a running job specified by job ID -func (c FlinkRestClient) Cancel(jobID string) error { - req, err := c.newRequest("PATCH", c.constructURL(fmt.Sprintf("jobs/%v", jobID)), nil) - if err != nil { - return err - } - - res, err := c.Client.Do(req) - if err != nil { - return err - } - - if res.StatusCode != 202 { - return fmt.Errorf("Unexpected response status %v", res.StatusCode) - } - - return nil -} diff --git a/cmd/cli/flink/cancel_test.go b/cmd/cli/flink/cancel_test.go deleted file mode 100644 index 0164c6e..0000000 --- a/cmd/cli/flink/cancel_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package flink - -import ( - "net/http" - "testing" - - retryablehttp "github.com/hashicorp/go-retryablehttp" - "github.com/stretchr/testify/assert" -) - -func TestCancelReturnsAnErrorWhenTheResponseStatusIsNot202(t *testing.T) { - server := createTestServerWithBodyCheck(t, "/jobs/id", "", http.StatusOK, "OK") - defer server.Close() - - api := FlinkRestClient{ - BaseURL: server.URL, - Client: retryablehttp.NewClient(), - } - err := api.Cancel("id") - - assert.EqualError(t, err, "Unexpected response status 200") -} - -func TestCancelShouldNotReturnAnErrorWhenTheResponseStatusIs202(t *testing.T) { - server := createTestServerWithBodyCheck(t, "/jobs/id", "", http.StatusAccepted, "") - defer server.Close() - - api := FlinkRestClient{ - BaseURL: server.URL, - Client: retryablehttp.NewClient(), - } - err := api.Cancel("id") - - assert.Nil(t, err) -} diff --git a/cmd/cli/flink/client.go b/cmd/cli/flink/client.go index 597605f..f3ca49b 100644 --- a/cmd/cli/flink/client.go +++ b/cmd/cli/flink/client.go @@ -3,7 +3,7 @@ package flink import ( "fmt" - retryablehttp "github.com/hashicorp/go-retryablehttp" + "github.com/hashicorp/go-retryablehttp" ) // A FlinkRestClient is a client to interface with diff --git a/cmd/cli/flink/client_test.go b/cmd/cli/flink/client_test.go index 5d61cb1..f465ade 100644 --- a/cmd/cli/flink/client_test.go +++ b/cmd/cli/flink/client_test.go @@ -8,7 +8,7 @@ import ( "strings" "testing" - retryablehttp "github.com/hashicorp/go-retryablehttp" + "github.com/hashicorp/go-retryablehttp" "github.com/stretchr/testify/assert" ) diff --git a/cmd/cli/flink/retrieve_jobs_test.go b/cmd/cli/flink/retrieve_jobs_test.go index 981cb80..eb9a4ff 100644 --- a/cmd/cli/flink/retrieve_jobs_test.go +++ b/cmd/cli/flink/retrieve_jobs_test.go @@ -4,7 +4,7 @@ import ( "net/http" "testing" - retryablehttp "github.com/hashicorp/go-retryablehttp" + "github.com/hashicorp/go-retryablehttp" "github.com/stretchr/testify/assert" ) diff --git a/cmd/cli/flink/run_jar_test.go b/cmd/cli/flink/run_jar_test.go index 225db0a..9fa448e 100644 --- a/cmd/cli/flink/run_jar_test.go +++ b/cmd/cli/flink/run_jar_test.go @@ -4,7 +4,7 @@ import ( "net/http" "testing" - retryablehttp "github.com/hashicorp/go-retryablehttp" + "github.com/hashicorp/go-retryablehttp" "github.com/stretchr/testify/assert" ) diff --git a/cmd/cli/flink/savepoint_test.go b/cmd/cli/flink/savepoint_test.go index caf4b29..a547aaa 100644 --- a/cmd/cli/flink/savepoint_test.go +++ b/cmd/cli/flink/savepoint_test.go @@ -4,7 +4,7 @@ import ( "net/http" "testing" - retryablehttp "github.com/hashicorp/go-retryablehttp" + "github.com/hashicorp/go-retryablehttp" "github.com/stretchr/testify/assert" ) diff --git a/cmd/cli/flink/terminate.go b/cmd/cli/flink/terminate.go new file mode 100644 index 0000000..b32acbf --- /dev/null +++ b/cmd/cli/flink/terminate.go @@ -0,0 +1,60 @@ +package flink + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" +) + +type TerminateJobErrorResponse struct { + ErrInfo string `json:"error"` +} + +// Terminate terminates a running job specified by job ID +func (c FlinkRestClient) Terminate(jobID string, mode string) error { + var path string + if len(mode) > 0 { + path = fmt.Sprintf("jobs/%v?mode=%v", jobID, mode) + } else { + path = fmt.Sprintf("jobs/%v", jobID) + } + + c.Client.CheckRetry = RetryPolicy + req, err := c.newRequest("PATCH", c.constructURL(path), nil) + res, err := c.Client.Do(req) + + defer res.Body.Close() + + if err != nil { + return err + } + + if res.StatusCode != 202 { + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return err + } + + return fmt.Errorf("Unexpected response status %v with body %v", res.StatusCode, string(body[:])) + } + + return nil +} + +// Do not retry when status code is 500. (indicating the job is not stoppable) +func RetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) { + if ctx.Err() != nil { + return false, ctx.Err() + } + + if err != nil { + return true, err + } + + if resp.StatusCode == 0 || resp.StatusCode > 500 { + return true, nil + } + + return false, nil +} diff --git a/cmd/cli/flink/terminate_test.go b/cmd/cli/flink/terminate_test.go new file mode 100644 index 0000000..40e4c5a --- /dev/null +++ b/cmd/cli/flink/terminate_test.go @@ -0,0 +1,65 @@ +package flink + +import ( + "net/http" + "testing" + + "github.com/hashicorp/go-retryablehttp" + "github.com/stretchr/testify/assert" +) + +func TestTerminateWithModeCancelAndStatusSuccess(t *testing.T) { + server := createTestServerWithBodyCheck(t, "/jobs/id?mode=cancel", "", http.StatusAccepted, "") + defer server.Close() + + api := FlinkRestClient{ + BaseURL: server.URL, + Client: retryablehttp.NewClient(), + } + + err := api.Terminate("id", "cancel") + + assert.Nil(t, err) +} + +func TestTerminateWithModeCancelAndStatus404(t *testing.T) { + server := createTestServerWithBodyCheck(t, "/jobs/id?mode=cancel", "", http.StatusNotFound, "not found") + defer server.Close() + + api := FlinkRestClient{ + BaseURL: server.URL, + Client: retryablehttp.NewClient(), + } + + err := api.Terminate("id", "cancel") + + assert.EqualError(t, err, "Unexpected response status 404 with body not found") +} + +func TestTerminateWithModeStopAndStatusSuccess(t *testing.T) { + server := createTestServerWithBodyCheck(t, "/jobs/id?mode=stop", "", http.StatusAccepted, "OK") + defer server.Close() + + api := FlinkRestClient{ + BaseURL: server.URL, + Client: retryablehttp.NewClient(), + } + + err := api.Terminate("id", "stop") + + assert.Nil(t, err) +} + +func TestTerminateWithModeStopAndStatusFailure(t *testing.T) { + server := createTestServerWithBodyCheck(t, "/jobs/id?mode=stop", "", http.StatusInternalServerError, "error") + defer server.Close() + + api := FlinkRestClient{ + BaseURL: server.URL, + Client: retryablehttp.NewClient(), + } + + err := api.Terminate("id", "stop") + + assert.EqualError(t, err, "Unexpected response status 500 with body error") +} diff --git a/cmd/cli/flink/upload_jar_test.go b/cmd/cli/flink/upload_jar_test.go index 08f9a62..ec4842a 100644 --- a/cmd/cli/flink/upload_jar_test.go +++ b/cmd/cli/flink/upload_jar_test.go @@ -4,7 +4,7 @@ import ( "net/http" "testing" - retryablehttp "github.com/hashicorp/go-retryablehttp" + "github.com/hashicorp/go-retryablehttp" "github.com/stretchr/testify/assert" ) diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 423cc7d..4a9104e 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -168,6 +168,32 @@ func UpdateAction(c *cli.Context) error { return nil } +// TerminateAction executes the CLI terminate command +func TerminateAction(c *cli.Context) error { + terminate := operations.TerminateJob{} + + jobNameBase := c.String("job-name-base") + if len(jobNameBase) == 0 { + return cli.NewExitError("unspecified flag 'job-name-base'", -1) + } + terminate.JobNameBase = jobNameBase + + mode := c.String("mode") + if len(mode) > 0 && mode != "cancel" && mode != "stop" { + return cli.NewExitError("unknown value for 'mode', only 'cancel' and 'stop' are supported", -1) + } + terminate.Mode = mode + + err := operator.Terminate(terminate) + if err != nil { + return cli.NewExitError(fmt.Sprintf("an error occurred: %v", err), -1) + } + + log.Println("Job successfully terminated") + + return nil +} + func getAPITimeoutSeconds() (int64, error) { if len(os.Getenv("FLINK_API_TIMEOUT_SECONDS")) > 0 { return strconv.ParseInt(os.Getenv("FLINK_API_TIMEOUT_SECONDS"), 10, 64) @@ -306,6 +332,22 @@ func main() { }, Action: UpdateAction, }, + { + Name: "terminate", + Aliases: []string{"t"}, + Usage: "Terminate a running job", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "job-name-base, jnb", + Usage: "The base name of the job to update", + }, + cli.StringFlag{ + Name: "mode, m", + Usage: "The mode to terminate a running job, cancel and stop supported", + }, + }, + Action: TerminateAction, + }, } app.Run(os.Args) diff --git a/cmd/cli/main_test.go b/cmd/cli/main_test.go index 1e5a9c0..dbff4d0 100644 --- a/cmd/cli/main_test.go +++ b/cmd/cli/main_test.go @@ -63,6 +63,34 @@ func TestListActionShouldReturnNilWhenTheAPISucceeds(t *testing.T) { assert.Nil(t, err) } +/* + * TerminateAction + */ +func TestTerminateActionShouldThrowAnErrorWhenJobNameBaseMissing(t *testing.T) { + mockedUpdateError = nil + operator = TestOperator{} + + app := cli.App{} + set := flag.FlagSet{} + context := cli.NewContext(&app, &set, nil) + err := TerminateAction(context) + + assert.EqualError(t, err, "unspecified flag 'job-name-base'") +} + +func TestTerminateActionShouldThrowAnErrorWhenModeUnknown(t *testing.T) { + operator = TestOperator{} + + app := cli.App{} + set := flag.FlagSet{} + set.String("job-name-base", "a job", "") + set.String("mode", "not_exist_mode", "") + context := cli.NewContext(&app, &set, nil) + err := TerminateAction(context) + + assert.EqualError(t, err, "unknown value for 'mode', only 'cancel' and 'stop' are supported") +} + /* * DeployAction */ diff --git a/cmd/cli/operations/flink_api_test.go b/cmd/cli/operations/flink_api_test.go index 3140cfd..33e1290 100644 --- a/cmd/cli/operations/flink_api_test.go +++ b/cmd/cli/operations/flink_api_test.go @@ -10,7 +10,7 @@ import ( * Flink REST API mocking */ -var mockedCancelError error +var mockedTerminateError error var mockedCreateSavepointResponse flink.CreateSavepointResponse var mockedCreateSavepointError error var mockedMonitorSavepointCreationResponse flink.MonitorSavepointCreationResponse @@ -26,8 +26,8 @@ type TestFlinkRestClient struct { Client *http.Client } -func (c TestFlinkRestClient) Cancel(jobID string) error { - return mockedCancelError +func (c TestFlinkRestClient) Terminate(jobID string, mode string) error { + return mockedTerminateError } func (c TestFlinkRestClient) CreateSavepoint(jobID string, savepointPath string) (flink.CreateSavepointResponse, error) { return mockedCreateSavepointResponse, mockedCreateSavepointError diff --git a/cmd/cli/operations/operator.go b/cmd/cli/operations/operator.go index f24969b..4e2742c 100644 --- a/cmd/cli/operations/operator.go +++ b/cmd/cli/operations/operator.go @@ -11,6 +11,7 @@ type Operator interface { Deploy(d Deploy) error Update(u UpdateJob) error RetrieveJobs() ([]flink.Job, error) + Terminate(t TerminateJob) error } // RealOperator is the Operator used in the production code diff --git a/cmd/cli/operations/terminate_job.go b/cmd/cli/operations/terminate_job.go new file mode 100644 index 0000000..bd867c6 --- /dev/null +++ b/cmd/cli/operations/terminate_job.go @@ -0,0 +1,27 @@ +package operations + +import ( + "errors" + "fmt" +) + +// TerminateJob represents the configuration used for +// terminate a job on the Flink cluster +type TerminateJob struct { + JobNameBase string + Mode string +} + +// Terminate executes the actual termination of a job on the Flink cluster +func (o RealOperator) Terminate(t TerminateJob) error { + if len(t.JobNameBase) == 0 { + return errors.New("unspecified argument 'JobNameBase'") + } + + err := o.FlinkRestAPI.Terminate(t.JobNameBase, t.Mode) + if err != nil { + return fmt.Errorf("job \"%v\" failed to terminate due to: %v", t.JobNameBase, err) + } + + return nil +} diff --git a/cmd/cli/operations/update_job.go b/cmd/cli/operations/update_job.go index d2634cf..8d07763 100644 --- a/cmd/cli/operations/update_job.go +++ b/cmd/cli/operations/update_job.go @@ -119,7 +119,7 @@ func (o RealOperator) Update(u UpdateJob) error { return err } - err = o.FlinkRestAPI.Cancel(job.ID) + err = o.FlinkRestAPI.Terminate(job.ID, "cancel") if err != nil { return fmt.Errorf("job \"%v\" failed to cancel due to: %v", job.ID, err) } diff --git a/cmd/cli/operations/update_job_test.go b/cmd/cli/operations/update_job_test.go index 99fb754..44ffd85 100644 --- a/cmd/cli/operations/update_job_test.go +++ b/cmd/cli/operations/update_job_test.go @@ -202,7 +202,7 @@ func TestUpdateJobShouldReturnAnErrorWhenTheSavepointCannotBeCreated(t *testing. func TestUpdateJobShouldReturnAnErrorWhenTheJobCannotBeCanceled(t *testing.T) { mockedRetrieveJobsError = nil mockedRetrieveJobsResponse = []flink.Job{ - flink.Job{ + { ID: "Job-A", Name: "WordCountStateful v1.0", Status: "RUNNING", @@ -217,7 +217,7 @@ func TestUpdateJobShouldReturnAnErrorWhenTheJobCannotBeCanceled(t *testing.T) { Id: "COMPLETED", }, } - mockedCancelError = errors.New("failed") + mockedTerminateError = errors.New("failed") operator := RealOperator{ FlinkRestAPI: TestFlinkRestClient{ @@ -256,7 +256,7 @@ func TestUpdateJobShouldReturnAnErrorWhenTheLatestSavepointCannotBeRetrieved(t * Id: "COMPLETED", }, } - mockedCancelError = nil + mockedTerminateError = nil mockedUploadJarResponse = flink.UploadJarResponse{ Filename: "/data/flink/sample.jar", Status: "success", @@ -302,7 +302,7 @@ func TestUpdateJobShouldReturnNilWhenTheUpdateSucceeds(t *testing.T) { Id: "COMPLETED", }, } - mockedCancelError = nil + mockedTerminateError = nil mockedUploadJarResponse = flink.UploadJarResponse{ Filename: "/data/flink/sample.jar", Status: "success", diff --git a/cmd/cli/operator_test.go b/cmd/cli/operator_test.go index 2e61741..1620384 100644 --- a/cmd/cli/operator_test.go +++ b/cmd/cli/operator_test.go @@ -8,6 +8,7 @@ import ( var mockedDeployError error var mockedUpdateError error +var mockedTerminateError error var mockedRetrieveJobsResponse []flink.Job var mockedRetrieveJobsError error @@ -24,6 +25,10 @@ func (t TestOperator) Update(u operations.UpdateJob) error { return mockedUpdateError } +func (t TestOperator) Terminate(te operations.TerminateJob) error { + return mockedUpdateError +} + func (t TestOperator) RetrieveJobs() ([]flink.Job, error) { return mockedRetrieveJobsResponse, mockedRetrieveJobsError }