Skip to content

Commit

Permalink
Improve the error message for databricks plugins (#4477)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Mar 5, 2024
1 parent f9c1a97 commit 99dad35
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package webapi

import (
"context"
pluginErrors "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
"time"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
Expand All @@ -15,7 +16,7 @@ func launch(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionCo
rMeta, r, err := p.Create(ctx, tCtx)
if err != nil {
logger.Errorf(ctx, "Failed to create resource. Error: %v", err)
return nil, core.PhaseInfo{}, err
return state, core.PhaseInfoRetryableFailure(pluginErrors.TaskFailedWithError, err.Error(), nil), nil
}

// If the plugin also returned the created resource, check to see if it's already in a terminal state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ func Test_launch(t *testing.T) {

plgn := newPluginWithProperties(webapi.PluginConfig{})
plgn.OnCreate(ctx, tCtx).Return("", nil, fmt.Errorf("error creating"))
_, _, err := launch(ctx, plgn, tCtx, c, &s)
assert.Error(t, err)
_, phase, err := launch(ctx, plgn, tCtx, c, &s)
assert.Nil(t, err)
assert.Equal(t, core.PhaseRetryableFailure, phase.Phase())
})

t.Run("Failed to cache", func(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions flyteplugins/go/tasks/plugins/webapi/agent/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ func TestEndToEnd(t *testing.T) {
tCtx.OnInputReader().Return(inputReader)

trns, err := plugin.Handle(context.Background(), tCtx)
assert.Error(t, err)
assert.Equal(t, trns.Info().Phase(), core.PhaseUndefined)
assert.Nil(t, err)
assert.Equal(t, trns.Info().Phase(), core.PhaseRetryableFailure)
err = plugin.Abort(context.Background(), tCtx)
assert.Nil(t, err)
})
Expand All @@ -155,8 +155,8 @@ func TestEndToEnd(t *testing.T) {
assert.NoError(t, err)

trns, err := plugin.Handle(context.Background(), tCtx)
assert.Error(t, err)
assert.Equal(t, trns.Info().Phase(), core.PhaseUndefined)
assert.Nil(t, err)
assert.Equal(t, trns.Info().Phase(), core.PhaseRetryableFailure)
})

t.Run("failed to read inputs", func(t *testing.T) {
Expand All @@ -176,8 +176,8 @@ func TestEndToEnd(t *testing.T) {
assert.NoError(t, err)

trns, err := plugin.Handle(context.Background(), tCtx)
assert.Error(t, err)
assert.Equal(t, trns.Info().Phase(), core.PhaseUndefined)
assert.Nil(t, err)
assert.Equal(t, trns.Info().Phase(), core.PhaseRetryableFailure)
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,17 @@ func newFakeDatabricksServer() *httptest.Server {
runID := "065168461"
jobID := "019e7546"
return httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
if request.URL.Path == fmt.Sprintf("%v/submit", databricksAPI) && request.Method == post {
writer.WriteHeader(202)
if request.URL.Path == fmt.Sprintf("%v/submit", databricksAPI) && request.Method == http.MethodPost {
writer.WriteHeader(http.StatusOK)
bytes := []byte(fmt.Sprintf(`{
"run_id": "%v"
}`, runID))
_, _ = writer.Write(bytes)
return
}

if request.URL.Path == fmt.Sprintf("%v/get", databricksAPI) && request.Method == get {
writer.WriteHeader(200)
if request.URL.Path == fmt.Sprintf("%v/get", databricksAPI) && request.Method == http.MethodGet {
writer.WriteHeader(http.StatusOK)
bytes := []byte(fmt.Sprintf(`{
"job_id": "%v",
"state": {"state_message": "execution in progress.", "life_cycle_state": "TERMINATED", "result_state": "SUCCESS"}
Expand All @@ -128,12 +128,12 @@ func newFakeDatabricksServer() *httptest.Server {
return
}

if request.URL.Path == fmt.Sprintf("%v/cancel", databricksAPI) && request.Method == post {
writer.WriteHeader(200)
if request.URL.Path == fmt.Sprintf("%v/cancel", databricksAPI) && request.Method == http.MethodPost {
writer.WriteHeader(http.StatusOK)
return
}

writer.WriteHeader(500)
writer.WriteHeader(http.StatusInternalServerError)
}))
}

Expand Down
Loading

0 comments on commit 99dad35

Please sign in to comment.