diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index c2844c6cac..18f7b2eaf9 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -46,6 +46,7 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 + github.com/wI2L/jsondiff v0.5.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 go.opentelemetry.io/otel v1.21.0 golang.org/x/oauth2 v0.16.0 @@ -166,6 +167,10 @@ require ( github.com/spf13/viper v1.11.0 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/subosito/gotenv v1.2.0 // indirect + github.com/tidwall/gjson v1.17.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/sjson v1.2.5 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index 0693371e59..6e6479957a 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -1265,13 +1265,22 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69 github.com/tidwall/gjson v1.3.2/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/gjson v1.6.8/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI= github.com/tidwall/gjson v1.7.1/go.mod h1:5/xDoumyyDNerp2U36lyolv46b3uF/9Bu6OfyQ9GImk= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM= +github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.1.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/sjson v1.0.4/go.mod h1:bURseu1nuBkFpIES5cz6zBtjmYeOQmEESshn7VpF15Y= github.com/tidwall/sjson v1.1.5/go.mod h1:VuJzsZnTowhSxWdOgsAnb886i4AjEyTkk7tNtsL7EYE= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= @@ -1288,6 +1297,8 @@ github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IA github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/wI2L/jsondiff v0.5.0 h1:RRMTi/mH+R2aXcPe1VYyvGINJqQfC3R+KSEakuU1Ikw= +github.com/wI2L/jsondiff v0.5.0/go.mod h1:qqG6hnK0Lsrz2BpIVCxWiK9ItsBCpIZQiv0izJjOZ9s= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= diff --git a/flyteadmin/pkg/errors/errors.go b/flyteadmin/pkg/errors/errors.go index e269715a91..366e2a3f42 100644 --- a/flyteadmin/pkg/errors/errors.go +++ b/flyteadmin/pkg/errors/errors.go @@ -7,10 +7,12 @@ import ( "strings" "github.com/golang/protobuf/proto" + "github.com/wI2L/jsondiff" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/logger" ) @@ -109,8 +111,47 @@ func NewIncompatibleClusterError(ctx context.Context, errorMsg, curCluster strin return statusErr } -func NewWorkflowExistsDifferentStructureError(ctx context.Context, request *admin.WorkflowCreateRequest) FlyteAdminError { - errorMsg := "workflow with different structure already exists" +func compareJsons(jsonArray1 jsondiff.Patch, jsonArray2 jsondiff.Patch) []string { + results := []string{} + map1 := make(map[string]jsondiff.Operation) + for _, obj := range jsonArray1 { + map1[obj.Path] = obj + } + + for _, obj := range jsonArray2 { + if val, ok := map1[obj.Path]; ok { + result := fmt.Sprintf("\t\t- %v: %v -> %v", obj.Path, obj.Value, val.Value) + results = append(results, result) + } + } + return results +} + +func NewTaskExistsDifferentStructureError(ctx context.Context, request *admin.TaskCreateRequest, oldSpec *core.CompiledTask, newSpec *core.CompiledTask) FlyteAdminError { + errorMsg := "task with different structure already exists:\n" + diff, _ := jsondiff.Compare(oldSpec, newSpec) + rdiff, _ := jsondiff.Compare(newSpec, oldSpec) + rs := compareJsons(diff, rdiff) + + errorMsg += strings.Join(rs, "\n") + + return NewFlyteAdminErrorf(codes.InvalidArgument, errorMsg) + +} + +func NewTaskExistsIdenticalStructureError(ctx context.Context, request *admin.TaskCreateRequest) FlyteAdminError { + errorMsg := "task with identical structure already exists" + return NewFlyteAdminErrorf(codes.AlreadyExists, errorMsg) +} + +func NewWorkflowExistsDifferentStructureError(ctx context.Context, request *admin.WorkflowCreateRequest, oldSpec *core.CompiledWorkflowClosure, newSpec *core.CompiledWorkflowClosure) FlyteAdminError { + errorMsg := "workflow with different structure already exists:\n" + diff, _ := jsondiff.Compare(oldSpec, newSpec) + rdiff, _ := jsondiff.Compare(newSpec, oldSpec) + rs := compareJsons(diff, rdiff) + + errorMsg += strings.Join(rs, "\n") + statusErr, transformationErr := NewFlyteAdminError(codes.InvalidArgument, errorMsg).WithDetails(&admin.CreateWorkflowFailureReason{ Reason: &admin.CreateWorkflowFailureReason_ExistsDifferentStructure{ ExistsDifferentStructure: &admin.WorkflowErrorExistsDifferentStructure{ @@ -119,7 +160,7 @@ func NewWorkflowExistsDifferentStructureError(ctx context.Context, request *admi }, }) if transformationErr != nil { - logger.Panicf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr) + logger.Errorf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr) return NewFlyteAdminErrorf(codes.InvalidArgument, errorMsg) } return statusErr @@ -135,12 +176,28 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi }, }) if transformationErr != nil { - logger.Panicf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr) + logger.Errorf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr) return NewFlyteAdminErrorf(codes.AlreadyExists, errorMsg) } return statusErr } +func NewLaunchPlanExistsDifferentStructureError(ctx context.Context, request *admin.LaunchPlanCreateRequest, oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) FlyteAdminError { + errorMsg := "launch plan with different structure already exists:\n" + diff, _ := jsondiff.Compare(oldSpec, newSpec) + rdiff, _ := jsondiff.Compare(newSpec, oldSpec) + rs := compareJsons(diff, rdiff) + + errorMsg += strings.Join(rs, "\n") + + return NewFlyteAdminErrorf(codes.InvalidArgument, errorMsg) +} + +func NewLaunchPlanExistsIdenticalStructureError(ctx context.Context, request *admin.LaunchPlanCreateRequest) FlyteAdminError { + errorMsg := "launch plan with identical structure already exists" + return NewFlyteAdminErrorf(codes.AlreadyExists, errorMsg) +} + func IsDoesNotExistError(err error) bool { adminError, ok := err.(FlyteAdminError) return ok && adminError.Code() == codes.NotFound diff --git a/flyteadmin/pkg/errors/errors_test.go b/flyteadmin/pkg/errors/errors_test.go index cb6a2a0ae0..6c97d9e911 100644 --- a/flyteadmin/pkg/errors/errors_test.go +++ b/flyteadmin/pkg/errors/errors_test.go @@ -3,9 +3,11 @@ package errors import ( "context" "errors" + "strings" "testing" "github.com/stretchr/testify/assert" + "github.com/wI2L/jsondiff" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -13,6 +15,14 @@ import ( "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" ) +var identifier = core.Identifier{ + ResourceType: core.ResourceType_TASK, + Project: "testProj", + Domain: "domain", + Name: "name", + Version: "ver", +} + func TestGrpcStatusError(t *testing.T) { msg := "some error" @@ -46,22 +56,168 @@ func TestNewIncompatibleClusterError(t *testing.T) { assert.True(t, ok) } +func TestJsonDifferHasDiffError(t *testing.T) { + oldSpec := map[string]int{ + "one": 1, + "two": 2, + "three": 3, + "four": 4, + "five": 5, + } + newSpec := map[string]int{ + "five": 5, + "four": 0, + "three": 3, + "two": 2, + "one": 1, + } + diff, _ := jsondiff.Compare(oldSpec, newSpec) + rdiff, _ := jsondiff.Compare(newSpec, oldSpec) + rs := compareJsons(diff, rdiff) + assert.Equal(t, "\t\t- /four: 4 -> 0", strings.Join(rs, "\n")) +} + +func TestJsonDifferNoDiffError(t *testing.T) { + oldSpec := map[string]int{ + "one": 1, + "two": 2, + "three": 3, + "four": 4, + "five": 5, + } + newSpec := map[string]int{ + "five": 5, + "four": 4, + "three": 3, + "two": 2, + "one": 1, + } + diff, _ := jsondiff.Compare(oldSpec, newSpec) + rdiff, _ := jsondiff.Compare(newSpec, oldSpec) + rs := compareJsons(diff, rdiff) + assert.Equal(t, "", strings.Join(rs, "\n")) +} + +func TestNewTaskExistsDifferentStructureError(t *testing.T) { + req := &admin.TaskCreateRequest{ + Id: &identifier, + } + + oldTask := &core.CompiledTask{ + Template: &core.TaskTemplate{ + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "150m", + }, + }, + }, + }, + }, + Id: &identifier, + }, + } + + newTask := &core.CompiledTask{ + Template: &core.TaskTemplate{ + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "250m", + }, + }, + }, + }, + }, + Id: &identifier, + }, + } + + statusErr := NewTaskExistsDifferentStructureError(context.Background(), req, oldTask, newTask) + assert.NotNil(t, statusErr) + s, ok := status.FromError(statusErr) + assert.True(t, ok) + assert.Equal(t, codes.InvalidArgument, s.Code()) + assert.Equal(t, "task with different structure already exists:\n\t\t- /template/Target/Container/resources/requests/0/value: 150m -> 250m", s.Message()) +} + +func TestNewTaskExistsIdenticalStructureError(t *testing.T) { + req := &admin.TaskCreateRequest{ + Id: &identifier, + } + statusErr := NewTaskExistsIdenticalStructureError(context.Background(), req) + assert.NotNil(t, statusErr) + s, ok := status.FromError(statusErr) + assert.True(t, ok) + assert.Equal(t, codes.AlreadyExists, s.Code()) + assert.Equal(t, "task with identical structure already exists", s.Message()) +} + func TestNewWorkflowExistsDifferentStructureError(t *testing.T) { - wf := &admin.WorkflowCreateRequest{ - Id: &core.Identifier{ - ResourceType: core.ResourceType_WORKFLOW, - Project: "testProj", - Domain: "domain", - Name: "name", - Version: "ver", + req := &admin.WorkflowCreateRequest{ + Id: &identifier, + } + + oldWorkflow := &core.CompiledWorkflowClosure{ + Primary: &core.CompiledWorkflow{ + Connections: &core.ConnectionSet{ + Upstream: map[string]*core.ConnectionSet_IdList{ + "foo": &core.ConnectionSet_IdList{ + Ids: []string{"start-node"}, + }, + "end-node": &core.ConnectionSet_IdList{ + Ids: []string{"foo"}, + }, + }, + }, + Template: &core.WorkflowTemplate{ + Nodes: []*core.Node{ + &core.Node{ + Id: "foo", + Target: &core.Node_TaskNode{}, + }, + }, + Id: &identifier, + }, + }, + } + + newWorkflow := &core.CompiledWorkflowClosure{ + Primary: &core.CompiledWorkflow{ + Connections: &core.ConnectionSet{ + Upstream: map[string]*core.ConnectionSet_IdList{ + "bar": &core.ConnectionSet_IdList{ + Ids: []string{"start-node"}, + }, + "end-node": &core.ConnectionSet_IdList{ + Ids: []string{"bar"}, + }, + }, + }, + Template: &core.WorkflowTemplate{ + Nodes: []*core.Node{ + &core.Node{ + Id: "bar", + Target: &core.Node_TaskNode{}, + }, + }, + Id: &identifier, + }, }, } - statusErr := NewWorkflowExistsDifferentStructureError(context.Background(), wf) + + statusErr := NewWorkflowExistsDifferentStructureError(context.Background(), req, oldWorkflow, newWorkflow) assert.NotNil(t, statusErr) s, ok := status.FromError(statusErr) assert.True(t, ok) assert.Equal(t, codes.InvalidArgument, s.Code()) - assert.Equal(t, "workflow with different structure already exists", s.Message()) + assert.Equal(t, "workflow with different structure already exists:\n\t\t- /primary/connections/upstream/bar: -> map[ids:[start-node]]\n\t\t- /primary/connections/upstream/end-node/ids/0: foo -> bar\n\t\t- /primary/connections/upstream/foo: map[ids:[start-node]] -> \n\t\t- /primary/template/nodes/0/id: foo -> bar", s.Message()) details, ok := s.Details()[0].(*admin.CreateWorkflowFailureReason) assert.True(t, ok) @@ -70,16 +226,10 @@ func TestNewWorkflowExistsDifferentStructureError(t *testing.T) { } func TestNewWorkflowExistsIdenticalStructureError(t *testing.T) { - wf := &admin.WorkflowCreateRequest{ - Id: &core.Identifier{ - ResourceType: core.ResourceType_WORKFLOW, - Project: "testProj", - Domain: "domain", - Name: "name", - Version: "ver", - }, + req := &admin.WorkflowCreateRequest{ + Id: &identifier, } - statusErr := NewWorkflowExistsIdenticalStructureError(context.Background(), wf) + statusErr := NewWorkflowExistsIdenticalStructureError(context.Background(), req) assert.NotNil(t, statusErr) s, ok := status.FromError(statusErr) assert.True(t, ok) @@ -92,6 +242,55 @@ func TestNewWorkflowExistsIdenticalStructureError(t *testing.T) { assert.True(t, ok) } +func TestNewLaunchPlanExistsDifferentStructureError(t *testing.T) { + req := &admin.LaunchPlanCreateRequest{ + Id: &identifier, + } + + oldLaunchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + WorkflowId: &core.Identifier{ + Project: "testProj", + Domain: "domain", + Name: "lp_name", + Version: "ver1", + }, + }, + Id: &identifier, + } + + newLaunchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + WorkflowId: &core.Identifier{ + Project: "testProj", + Domain: "domain", + Name: "lp_name", + Version: "ver2", + }, + }, + Id: &identifier, + } + + statusErr := NewLaunchPlanExistsDifferentStructureError(context.Background(), req, oldLaunchPlan.Spec, newLaunchPlan.Spec) + assert.NotNil(t, statusErr) + s, ok := status.FromError(statusErr) + assert.True(t, ok) + assert.Equal(t, codes.InvalidArgument, s.Code()) + assert.Equal(t, "launch plan with different structure already exists:\n\t\t- /workflow_id/version: ver1 -> ver2", s.Message()) +} + +func TestNewLaunchPlanExistsIdenticalStructureError(t *testing.T) { + req := &admin.LaunchPlanCreateRequest{ + Id: &identifier, + } + statusErr := NewLaunchPlanExistsIdenticalStructureError(context.Background(), req) + assert.NotNil(t, statusErr) + s, ok := status.FromError(statusErr) + assert.True(t, ok) + assert.Equal(t, codes.AlreadyExists, s.Code()) + assert.Equal(t, "launch plan with identical structure already exists", s.Message()) +} + func TestIsDoesNotExistError(t *testing.T) { assert.True(t, IsDoesNotExistError(NewFlyteAdminError(codes.NotFound, "foo"))) } diff --git a/flyteadmin/pkg/manager/impl/launch_plan_manager.go b/flyteadmin/pkg/manager/impl/launch_plan_manager.go index 093b4d7cce..57936313e5 100644 --- a/flyteadmin/pkg/manager/impl/launch_plan_manager.go +++ b/flyteadmin/pkg/manager/impl/launch_plan_manager.go @@ -88,12 +88,15 @@ func (m *LaunchPlanManager) CreateLaunchPlan( existingLaunchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Id) if err == nil { if bytes.Equal(existingLaunchPlanModel.Digest, launchPlanDigest) { - return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, - "identical launch plan already exists with id %s", request.Id) + return nil, errors.NewLaunchPlanExistsIdenticalStructureError(ctx, &request) } - - return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, - "launch plan with different structure already exists with id %v", request.Id) + existingLaunchPlan, transformerErr := transformers.FromLaunchPlanModel(existingLaunchPlanModel) + if transformerErr != nil { + logger.Errorf(ctx, "failed to transform launch plan from launch plan model") + return nil, transformerErr + } + // A launch plan exists with different structure + return nil, errors.NewLaunchPlanExistsDifferentStructureError(ctx, &request, existingLaunchPlan.Spec, launchPlan.Spec) } launchPlanModel, err := diff --git a/flyteadmin/pkg/manager/impl/task_manager.go b/flyteadmin/pkg/manager/impl/task_manager.go index d42639c31e..f3eced5d81 100644 --- a/flyteadmin/pkg/manager/impl/task_manager.go +++ b/flyteadmin/pkg/manager/impl/task_manager.go @@ -89,14 +89,17 @@ func (t *TaskManager) CreateTask( return nil, err } // See if a task exists and confirm whether it's an identical task or one that with a separate definition. - existingTask, err := util.GetTaskModel(ctx, t.db, request.Spec.Template.Id) + existingTaskModel, err := util.GetTaskModel(ctx, t.db, request.Spec.Template.Id) if err == nil { - if bytes.Equal(taskDigest, existingTask.Digest) { - return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, - "identical task already exists with id %s", request.Id) + if bytes.Equal(taskDigest, existingTaskModel.Digest) { + return nil, errors.NewTaskExistsIdenticalStructureError(ctx, &request) } - return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, - "task with different structure already exists with id %v", request.Id) + existingTask, transformerErr := transformers.FromTaskModel(*existingTaskModel) + if transformerErr != nil { + logger.Errorf(ctx, "failed to transform task from task model") + return nil, transformerErr + } + return nil, errors.NewTaskExistsDifferentStructureError(ctx, &request, existingTask.Closure.GetCompiledTask(), compiledTask) } taskModel, err := transformers.CreateTaskModel(finalizedRequest, admin.TaskClosure{ CompiledTask: compiledTask, diff --git a/flyteadmin/pkg/manager/impl/workflow_manager.go b/flyteadmin/pkg/manager/impl/workflow_manager.go index 4b755a4707..e4cc5cc120 100644 --- a/flyteadmin/pkg/manager/impl/workflow_manager.go +++ b/flyteadmin/pkg/manager/impl/workflow_manager.go @@ -168,8 +168,13 @@ func (w *WorkflowManager) CreateWorkflow( if bytes.Equal(workflowDigest, existingWorkflowModel.Digest) { return nil, errors.NewWorkflowExistsIdenticalStructureError(ctx, &request) } + existingWorkflow, transformerErr := transformers.FromWorkflowModel(existingWorkflowModel) + if transformerErr != nil { + logger.Errorf(ctx, "failed to transform workflow from workflow model") + return nil, transformerErr + } // A workflow exists with different structure - return nil, errors.NewWorkflowExistsDifferentStructureError(ctx, &request) + return nil, errors.NewWorkflowExistsDifferentStructureError(ctx, &request, existingWorkflow.Closure.GetCompiledWorkflow(), workflowClosure.GetCompiledWorkflow()) } else if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { logger.Debugf(ctx, "Failed to get workflow for comparison in CreateWorkflow with ID [%+v] with err %v", request.Id, err) diff --git a/flyteadmin/pkg/manager/impl/workflow_manager_test.go b/flyteadmin/pkg/manager/impl/workflow_manager_test.go index b5565f8cda..3c286a2430 100644 --- a/flyteadmin/pkg/manager/impl/workflow_manager_test.go +++ b/flyteadmin/pkg/manager/impl/workflow_manager_test.go @@ -187,7 +187,7 @@ func TestCreateWorkflow_ExistingWorkflow(t *testing.T) { getMockWorkflowConfigProvider(), getMockWorkflowCompiler(), mockStorageClient, storagePrefix, mockScope.NewTestScope()) request := testutils.GetWorkflowRequest() response, err := workflowManager.CreateWorkflow(context.Background(), request) - assert.EqualError(t, err, "workflow with different structure already exists") + assert.EqualError(t, err, "workflow with different structure already exists:\n\t\t- /primary/template/id: -> map[domain:domain name:name project:project resource_type:2 version:version]\n\t\t- /primary/template/interface/inputs: -> map[variables:map[foo:map[type:map[Type:map[Simple:3]]]]]\n\t\t- /primary/template/interface/outputs: -> map[variables:map[bar:map[type:map[Type:map[Simple:3]]]]]\n\t\t- /primary/template/nodes: -> [map[Target: id:node 1] map[Target: id:node 2]]") assert.Nil(t, response) } @@ -205,7 +205,7 @@ func TestCreateWorkflow_ExistingWorkflow_Different(t *testing.T) { request := testutils.GetWorkflowRequest() response, err := workflowManager.CreateWorkflow(context.Background(), request) - assert.EqualError(t, err, "workflow with different structure already exists") + assert.EqualError(t, err, "workflow with different structure already exists:\n\t\t- /primary/template/id: -> map[domain:domain name:name project:project resource_type:2 version:version]\n\t\t- /primary/template/interface/inputs: -> map[variables:map[foo:map[type:map[Type:map[Simple:3]]]]]\n\t\t- /primary/template/interface/outputs: -> map[variables:map[bar:map[type:map[Type:map[Simple:3]]]]]\n\t\t- /primary/template/nodes: -> [map[Target: id:node 1] map[Target: id:node 2]]") flyteErr := err.(flyteErrors.FlyteAdminError) assert.Equal(t, codes.InvalidArgument, flyteErr.Code()) assert.Nil(t, response) diff --git a/go.mod b/go.mod index 998cd19de7..5f00818fca 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,7 @@ require ( github.com/eapache/queue v1.1.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect - github.com/evanphx/json-patch/v5 v5.6.0 // indirect + github.com/evanphx/json-patch/v5 v5.9.0 // indirect github.com/fatih/color v1.13.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000 // indirect @@ -169,6 +169,11 @@ require ( github.com/stretchr/objx v0.5.0 // indirect github.com/stretchr/testify v1.8.4 // indirect github.com/subosito/gotenv v1.2.0 // indirect + github.com/tidwall/gjson v1.17.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/sjson v1.2.5 // indirect + github.com/wI2L/jsondiff v0.5.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect diff --git a/go.sum b/go.sum index 294ed0f37d..df8f6fa506 100644 --- a/go.sum +++ b/go.sum @@ -250,8 +250,8 @@ github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= -github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww= -github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= +github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg= +github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= @@ -1300,13 +1300,22 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69 github.com/tidwall/gjson v1.3.2/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/gjson v1.6.8/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI= github.com/tidwall/gjson v1.7.1/go.mod h1:5/xDoumyyDNerp2U36lyolv46b3uF/9Bu6OfyQ9GImk= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM= +github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.1.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/sjson v1.0.4/go.mod h1:bURseu1nuBkFpIES5cz6zBtjmYeOQmEESshn7VpF15Y= github.com/tidwall/sjson v1.1.5/go.mod h1:VuJzsZnTowhSxWdOgsAnb886i4AjEyTkk7tNtsL7EYE= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= @@ -1323,6 +1332,8 @@ github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IA github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/wI2L/jsondiff v0.5.0 h1:RRMTi/mH+R2aXcPe1VYyvGINJqQfC3R+KSEakuU1Ikw= +github.com/wI2L/jsondiff v0.5.0/go.mod h1:qqG6hnK0Lsrz2BpIVCxWiK9ItsBCpIZQiv0izJjOZ9s= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=