Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Flink API to retrieve savepoint name #22

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions cmd/cli/flink/savepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ import (
"io/ioutil"
)

type createSavepointRequest struct {
TargetDirectory string `json:"target-directory"`
CancelJob bool `json:"cancel-job"`
}
type createSavepointRequest map[string]interface{}

// CreateSavepointResponse represents the response body
// used by the create savepoint API
Expand All @@ -19,11 +16,13 @@ type CreateSavepointResponse struct {
}

// CreateSavepoint creates a savepoint for a job specified by job ID
func (c FlinkRestClient) CreateSavepoint(jobID string, savepointPath string) (CreateSavepointResponse, error) {
req := createSavepointRequest{
TargetDirectory: savepointPath,
CancelJob: false,
func (c FlinkRestClient) CreateSavepoint(jobID string, savepointDir string) (CreateSavepointResponse, error) {
req := make(map[string]interface{})
if len(savepointDir) > 0 {
// The target-directory attribute has to be omitted if the cluster-default savepoint directory should be used
req["target-directory"] = savepointDir
}
req["cancel-job"] = true

reqBody := new(bytes.Buffer)
json.NewEncoder(reqBody).Encode(req)
Expand Down Expand Up @@ -63,6 +62,12 @@ type SavepointCreationStatus struct {
// used by the savepoint monitoring API
type MonitorSavepointCreationResponse struct {
Status SavepointCreationStatus `json:"status"`
Operation SavepointCreationOperation `json:"operation"`
}

type SavepointCreationOperation struct {
Location string `json:"location"`
FailureCause string `json:"failure-cause"`
}

// MonitorSavepointCreation allows for monitoring the status of a savepoint creation
Expand Down
6 changes: 3 additions & 3 deletions cmd/cli/flink/savepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
* Create Savepoint
*/
func TestCreateSavepointReturnsAnErrorWhenTheStatusIsNot202(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/1/savepoints", `{"target-directory":"/data/flink","cancel-job":false}`, http.StatusOK, "{}")
server := createTestServerWithBodyCheck(t, "/jobs/1/savepoints", `{"cancel-job":true,"target-directory":"/data/flink"}`, http.StatusOK, "{}")
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
Expand All @@ -22,7 +22,7 @@ func TestCreateSavepointReturnsAnErrorWhenTheStatusIsNot202(t *testing.T) {
}

func TestCreateSavepointReturnsAnErrorWhenItCannotDeserializeTheResponseAsJSON(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/1/savepoints", `{"target-directory":"/data/flink","cancel-job":false}`, http.StatusAccepted, `{"jobs: []}`)
server := createTestServerWithBodyCheck(t, "/jobs/1/savepoints", `{"cancel-job":true,"target-directory":"/data/flink"}`, http.StatusAccepted, `{"jobs: []}`)
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
Expand All @@ -32,7 +32,7 @@ func TestCreateSavepointReturnsAnErrorWhenItCannotDeserializeTheResponseAsJSON(t
}

func TestCreateSavepointCorrectlyReturnsARequestID(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/1/savepoints", `{"target-directory":"/data/flink","cancel-job":false}`, http.StatusAccepted, `{"request-id": "1"}`)
server := createTestServerWithBodyCheck(t, "/jobs/1/savepoints", `{"cancel-job":true,"target-directory":"/data/flink"}`, http.StatusAccepted, `{"request-id": "1"}`)
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
Expand Down
9 changes: 1 addition & 8 deletions cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,7 @@ func DeployAction(c *cli.Context) error {
deploy.ProgramArgs = programArgs
}

savepointDir := c.String("savepoint-dir")
savepointPath := c.String("savepoint-path")
if len(savepointDir) > 0 && len(savepointPath) > 0 {
return cli.NewExitError("both flags 'savepoint-dir' and 'savepoint-path' specified, only one allowed", -1)
}
if len(savepointDir) > 0 {
deploy.SavepointDir = savepointDir
}
if len(savepointPath) > 0 {
deploy.SavepointPath = savepointPath
}
Expand Down Expand Up @@ -152,7 +145,7 @@ func UpdateAction(c *cli.Context) error {
if len(savepointDir) != 0 {
update.SavepointDir = savepointDir
} else {
return cli.NewExitError("unspecified flag 'savepoint-dir'", -1)
update.SavepointDir = ""
}

update.AllowNonRestoredState = c.Bool("allow-non-restored-state")
Expand Down
14 changes: 0 additions & 14 deletions cmd/cli/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,6 @@ func TestDeployActionShouldThrowAnErrorWhenBothTheLocalFilenameAndRemoteFilename
assert.EqualError(t, err, "both flags 'file-name' and 'remote-file-name' specified, only one allowed")
}

func TestDeployActionShouldThrowAnErrorWhenBothTheSavepointDirAndSavepointPathArgumentsAreSet(t *testing.T) {
operator = TestOperator{}

app := cli.App{}
set := flag.FlagSet{}
set.String("file-name", "file.jar", "")
set.String("savepoint-dir", "/data/flink", "")
set.String("savepoint-path", "/data/flink/savepoint-abc", "")
context := cli.NewContext(&app, &set, nil)
err := DeployAction(context)

assert.EqualError(t, err, "both flags 'savepoint-dir' and 'savepoint-path' specified, only one allowed")
}

func TestDeployActionShouldThrowAnErrorWhenTheCommandFails(t *testing.T) {
mockedDeployError = errors.New("failed")
operator = TestOperator{}
Expand Down
23 changes: 1 addition & 22 deletions cmd/cli/operations/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package operations

import (
"errors"
"fmt"
"log"
"strings"
)
Expand All @@ -16,7 +15,6 @@ type Deploy struct {
EntryClass string
Parallelism int
ProgramArgs string
SavepointDir string
SavepointPath string
AllowNonRestoredState bool
}
Expand All @@ -30,26 +28,7 @@ func (o RealOperator) extractJarIDFromFilename(filename string) string {
func (o RealOperator) Deploy(d Deploy) error {
log.Println("Starting deploy")

if len(d.SavepointDir) > 0 && len(d.SavepointPath) > 0 {
return errors.New("both properties 'SavepointDir' and 'SavepointPath' are specified")
}

if len(d.SavepointDir) > 0 {
log.Printf("Using savepoint directory to retrieve the latest savepoint: %v", d.SavepointDir)

latestSavepoint, err := o.retrieveLatestSavepoint(d.SavepointDir)
if err != nil {
return fmt.Errorf("retrieving the latest savepoint failed: %v", err)
}

if len(latestSavepoint) != 0 {
d.SavepointPath = latestSavepoint
}
}

if len(d.SavepointPath) > 0 {
log.Printf("Using savepoint for deployment: %v", d.SavepointPath)
}
log.Printf("Using savepoint for deployment: %v", d.SavepointPath)

if d.AllowNonRestoredState == true {
log.Printf("Allowing non restorable state")
Expand Down
41 changes: 1 addition & 40 deletions cmd/cli/operations/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"testing"

"github.com/ing-bank/flink-deployer/cmd/cli/flink"
"github.com/spf13/afero"

"github.com/stretchr/testify/assert"
)

Expand All @@ -25,21 +23,10 @@ func TestExtractJarIDFromFilenameShouldReturnThePartAfterTheLastSlash(t *testing
/*
* Deploy
*/
func TestDeployShouldReturnAnErrorWhenBothTheSavepointDirAndSavepointPathAreSet(t *testing.T) {
operator := RealOperator{}

err := operator.Deploy(Deploy{
SavepointDir: "/data/flink",
SavepointPath: "/data/flink/savepoint-abc",
})

assert.EqualError(t, err, "both properties 'SavepointDir' and 'SavepointPath' are specified")
}

func TestDeployShouldReturnAnErrorWhenNeitherTheLocalOrRemoteFileNameAreSet(t *testing.T) {
operator := RealOperator{}

err := operator.Deploy(Deploy{})
err := operator.Deploy(Deploy{SavepointPath: "dummy"})

assert.EqualError(t, err, "both properties 'RemoteFilename' and 'LocalFilename' are unspecified")
}
Expand All @@ -61,32 +48,6 @@ func TestDeployShouldReturnAnErrorWhenTheJarUploadFails(t *testing.T) {
assert.EqualError(t, err, "failed")
}

func TestDeployShouldReturnAnErrorWhenTheLatestSavepointCannotBeRetrieved(t *testing.T) {
filesystem := afero.NewMemMapFs()
filesystem.Mkdir("/data/flink/", 0755)

mockedUploadJarResponse = flink.UploadJarResponse{
Filename: "/data/flink/sample.jar",
Status: "success",
}
mockedUploadJarError = nil

operator := RealOperator{
Filesystem: filesystem,
FlinkRestAPI: TestFlinkRestClient{
BaseURL: "http://localhost",
Client: &http.Client{},
},
}

err := operator.Deploy(Deploy{
LocalFilename: "testdata/sample.jar",
SavepointDir: "/data/flink",
})

assert.EqualError(t, err, "retrieving the latest savepoint failed: No savepoints present in directory: /data/flink")
}

func TestDeployShouldReturnAnErrorWhenTheJarRunFails(t *testing.T) {
mockedUploadJarResponse = flink.UploadJarResponse{
Filename: "/data/flink/sample.jar",
Expand Down
40 changes: 0 additions & 40 deletions cmd/cli/operations/retrieve_savepoint.go

This file was deleted.

68 changes: 0 additions & 68 deletions cmd/cli/operations/retrieve_savepoint_test.go

This file was deleted.

Loading