diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/agent.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/agent.proto new file mode 100644 index 000000000..e9a484917 --- /dev/null +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/agent.proto @@ -0,0 +1,88 @@ +syntax = "proto3"; + +package flyteidl.admin; +option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"; + +import "flyteidl/core/literals.proto"; +import "flyteidl/core/tasks.proto"; +import "flyteidl/core/interface.proto"; +import "flyteidl/core/identifier.proto"; + +// The state of the execution is used to control its visibility in the UI/CLI. +enum State { + RETRYABLE_FAILURE = 0; + PERMANENT_FAILURE = 1; + PENDING = 2; + RUNNING = 3; + SUCCEEDED = 4; +} + +// Represents a subset of runtime task execution metadata that are relevant to external plugins. +message TaskExecutionMetadata { + // ID of the task execution + core.TaskExecutionIdentifier task_execution_id = 1; + // k8s namespace where the task is executed in + string namespace = 2; + // Labels attached to the task execution + map labels = 3; + // Annotations attached to the task execution + map annotations = 4; + // k8s service account associated with the task execution + string k8s_service_account = 5; + // Environment variables attached to the task execution + map environment_variables = 6; +} + +// Represents a request structure to create task. +message CreateTaskRequest { + // The inputs required to start the execution. All required inputs must be + // included in this map. If not required and not provided, defaults apply. + // +optional + core.LiteralMap inputs = 1; + // Template of the task that encapsulates all the metadata of the task. + core.TaskTemplate template = 2; + // Prefix for where task output data will be written. (e.g. s3://my-bucket/randomstring) + string output_prefix = 3; + // subset of runtime task execution metadata. + TaskExecutionMetadata task_execution_metadata = 4; +} + +// Represents a create response structure. +message CreateTaskResponse { + // Metadata is created by the agent. It could be a string (jobId) or a dict (more complex metadata). + bytes resource_meta = 1; +} + +// A message used to fetch a job resource from flyte agent server. +message GetTaskRequest { + // A predefined yet extensible Task type identifier. + string task_type = 1; + // Metadata about the resource to be pass to the agent. + bytes resource_meta = 2; +} + +// Response to get an individual task resource. +message GetTaskResponse { + Resource resource = 1; +} + +message Resource { + // The state of the execution is used to control its visibility in the UI/CLI. + State state = 1; + // The outputs of the execution. It's typically used by sql task. Agent service will create a + // Structured dataset pointing to the query result table. + // +optional + core.LiteralMap outputs = 2; +} + +// A message used to delete a task. +message DeleteTaskRequest { + // A predefined yet extensible Task type identifier. + string task_type = 1; + // Metadata about the resource to be pass to the agent. + bytes resource_meta = 2; +} + +// Response to delete a task. +message DeleteTaskResponse { +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/cluster_assignment.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/cluster_assignment.proto index 3f279424a..85a6a4ef8 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/admin/cluster_assignment.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/cluster_assignment.proto @@ -6,45 +6,6 @@ option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"; // Encapsulates specifications for routing an execution onto a specific cluster. message ClusterAssignment { - Affinity affinity = 1; - - Toleration toleration = 2; -} - -// Defines a set of constraints used to select eligible objects based on labels they possess. -message Affinity { - // Multiples selectors are 'and'-ed together to produce the list of matching, eligible objects. - repeated Selector selectors = 1; -} - -// Defines a set of specific label selectors that the execution can tolerate on a cluster. -message Toleration { - - // A toleration selector is similar to that of an affinity but the only valid operators are EQUALS AND EXISTS. - repeated Selector selectors = 1; + reserved 1, 2; + string cluster_pool_name = 3; } - -// A Selector is a specification for identifying a set of objects with corresponding labels. -message Selector { - - // The label key. - string key = 1; - - // One or more values used to match labels. - // For equality (or inequality) requirements, values must contain a single element. - // For set-based requirements, values may contain one or more elements. - repeated string value = 2; - - // Defines how a label with a corresponding key and value is selected or excluded. - enum Operator { - EQUALS = 0; - NOT_EQUALS = 1; - IN = 2; - NOT_IN = 3; - EXISTS = 4; // A label key with any value - - // K8s supports more operators, we can consider adding them if necessary - } - Operator operator = 3; -} - diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/common.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/common.proto index 69e02e710..dbfb41285 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/admin/common.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/common.proto @@ -5,6 +5,8 @@ option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"; import "flyteidl/core/execution.proto"; import "flyteidl/core/identifier.proto"; +import "flyteidl/core/literals.proto"; +import "google/protobuf/timestamp.proto"; // Encapsulation of fields that identifies a Flyte resource. // A Flyte resource can be a task, workflow or launch plan. @@ -279,6 +281,14 @@ message Annotations { map values = 1; } +// Environment variable values to be applied to an execution resource. +// In the future a mode (e.g. OVERRIDE, APPEND, etc) can be defined +// to specify how to merge environment variables defined at registration and execution time. +message Envs { + // Map of custom environment variables to be applied to the execution resource. + repeated flyteidl.core.KeyValuePair values = 1; +} + // Defines permissions associated with executions created by this launch plan spec. // Use either of these roles when they have permissions required by your workflow execution. // Deprecated. @@ -300,3 +310,10 @@ message RawOutputDataConfig { // e.g. s3://bucket/key or s3://bucket/ string output_location_prefix = 1; } + +// These URLs are returned as part of node and task execution data requests. +message FlyteURLs { + string inputs = 1; + string outputs = 2; + string deck = 3; +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/description_entity.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/description_entity.proto new file mode 100644 index 000000000..fcf4e1a46 --- /dev/null +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/description_entity.proto @@ -0,0 +1,95 @@ +syntax = "proto3"; + +package flyteidl.admin; +option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"; + +import "flyteidl/core/identifier.proto"; +import "flyteidl/admin/common.proto"; + +// DescriptionEntity contains detailed description for the task/workflow. +// Documentation could provide insight into the algorithms, business use case, etc. +message DescriptionEntity { + // id represents the unique identifier of the description entity. + core.Identifier id = 1; + // One-liner overview of the entity. + string short_description = 2; + // Full user description with formatting preserved. + Description long_description = 3; + // Optional link to source code used to define this entity. + SourceCode source_code = 4; + // User-specified tags. These are arbitrary and can be used for searching + // filtering and discovering tasks. + repeated string tags = 5; +} + +// The format of the long description +enum DescriptionFormat { + DESCRIPTION_FORMAT_UNKNOWN = 0; + DESCRIPTION_FORMAT_MARKDOWN = 1; + DESCRIPTION_FORMAT_HTML = 2; + // python default documentation - comments is rst + DESCRIPTION_FORMAT_RST = 3; +} + +// Full user description with formatting preserved. This can be rendered +// by clients, such as the console or command line tools with in-tact +// formatting. +message Description { + oneof content { + // long description - no more than 4KB + string value = 1; + // if the description sizes exceed some threshold we can offload the entire + // description proto altogether to an external data store, like S3 rather than store inline in the db + string uri = 2; + } + + // Format of the long description + DescriptionFormat format = 3; + // Optional link to an icon for the entity + string icon_link = 4; +} + +// Link to source code used to define this entity +message SourceCode { + string link = 1; +} + +// Represents a list of DescriptionEntities returned from the admin. +// See :ref:`ref_flyteidl.admin.DescriptionEntity` for more details +message DescriptionEntityList { + // A list of DescriptionEntities returned based on the request. + repeated DescriptionEntity descriptionEntities = 1; + + // In the case of multiple pages of results, the server-provided token can be used to fetch the next page + // in a query. If there are no more results, this value will be empty. + string token = 2; +} + +// Represents a request structure to retrieve a list of DescriptionEntities. +// See :ref:`ref_flyteidl.admin.DescriptionEntity` for more details +message DescriptionEntityListRequest { + // Identifies the specific type of resource that this identifier corresponds to. + flyteidl.core.ResourceType resource_type = 1; + + // The identifier for the description entity. + // +required + NamedEntityIdentifier id = 2; + + // Indicates the number of resources to be returned. + // +required + uint32 limit = 3; + + // In the case of multiple pages of results, the server-provided token can be used to fetch the next page + // in a query. + // +optional + string token = 4; + + // Indicates a list of filters passed as string. + // More info on constructing filters : + // +optional + string filters = 5; + + // Sort ordering for returned list. + // +optional + Sort sort_by = 6; +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/execution.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/execution.proto index a08c9bbd1..55933d470 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/admin/execution.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/execution.proto @@ -8,6 +8,7 @@ import "flyteidl/admin/common.proto"; import "flyteidl/core/literals.proto"; import "flyteidl/core/execution.proto"; import "flyteidl/core/identifier.proto"; +import "flyteidl/core/metrics.proto"; import "flyteidl/core/security.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; @@ -45,10 +46,18 @@ message ExecutionRelaunchRequest { // +required core.WorkflowExecutionIdentifier id = 1; + // Deprecated field, do not use. + reserved 2; + // User provided value for the relaunched execution. // If none is provided the system will generate a unique string. // +optional string name = 3; + + // Allows for all cached values of a workflow and its tasks to be overwritten for a single execution. + // If enabled, all calculations are performed even if cached results would be available, overwriting the stored + // data once execution finishes successfully. + bool overwrite_cache = 4; } // Request to recover the referenced execution. @@ -181,6 +190,9 @@ message SystemMetadata { // Which execution cluster this execution ran on. string execution_cluster = 1; + + // Which kubernetes namespace the execution ran under. + string namespace = 2; } // Represents attributes about an execution which are not required to launch the execution but are useful to record. @@ -296,6 +308,16 @@ message ExecutionSpec { // As we need to distinguish between the field not being provided and its default value false, we have to use a wrapper // around the bool field. google.protobuf.BoolValue interruptible = 21; + + // Allows for all cached values of a workflow and its tasks to be overwritten for a single execution. + // If enabled, all calculations are performed even if cached results would be available, overwriting the stored + // data once execution finishes successfully. + bool overwrite_cache = 22; + + // Environment variables to be set for the execution. + Envs envs = 23; + // Tags to be set for the execution. + repeated string tags = 24; } // Request to terminate an in-progress execution. This action is irreversible. @@ -368,3 +390,19 @@ message ExecutionStateChangeDetails { } message ExecutionUpdateResponse {} + +// WorkflowExecutionGetMetricsRequest represents a request to retrieve metrics for the specified workflow execution. +message WorkflowExecutionGetMetricsRequest { + // id defines the workflow execution to query for. + core.WorkflowExecutionIdentifier id = 1; + + // depth defines the number of Flyte entity levels to traverse when breaking down execution details. + int32 depth = 2; +} + +// WorkflowExecutionGetMetricsResponse represents the response containing metrics for the specified workflow execution. +message WorkflowExecutionGetMetricsResponse { + // Span defines the top-level breakdown of the workflows execution. More precise information is nested in a + // hierarchical structure using Flyte entity references. + core.Span span = 1; +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/launch_plan.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/launch_plan.proto index 13d87fd97..2164be31f 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/admin/launch_plan.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/launch_plan.proto @@ -125,6 +125,14 @@ message LaunchPlanSpec { // As we need to distinguish between the field not being provided and its default value false, we have to use a wrapper // around the bool field. google.protobuf.BoolValue interruptible = 19; + + // Allows for all cached values of a workflow and its tasks to be overwritten for a single execution. + // If enabled, all calculations are performed even if cached results would be available, overwriting the stored + // data once execution finishes successfully. + bool overwrite_cache = 20; + + // Environment variables to be set for the execution. + Envs envs = 21; } // Values computed by the flyte platform after launch plan registration. diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/matchable_resource.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/matchable_resource.proto index bba83a1b9..4ab6be6aa 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/admin/matchable_resource.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/matchable_resource.proto @@ -123,6 +123,14 @@ message WorkflowExecutionConfig { // As we need to distinguish between the field not being provided and its default value false, we have to use a wrapper // around the bool field. google.protobuf.BoolValue interruptible = 6; + + // Allows for all cached values of a workflow and its tasks to be overwritten for a single execution. + // If enabled, all calculations are performed even if cached results would be available, overwriting the stored + // data once execution finishes successfully. + bool overwrite_cache = 7; + + // Environment variables to be set for the execution. + Envs envs = 8; } // Generic container for encapsulating all types of the above attributes messages. diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/node_execution.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/node_execution.proto index 618284948..fe71699a8 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/admin/node_execution.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/node_execution.proto @@ -167,6 +167,10 @@ message NodeExecutionClosure { // String location uniquely identifying where the deck HTML file is. // NativeUrl specifies the url in the format of the configured storage provider (e.g. s3://my-bucket/randomstring/suffix.tar) string deck_uri = 11; + + // dynamic_job_spec_uri is the location of the DynamicJobSpec proto message for a DynamicWorkflow. This is required + // to correctly recover partially completed executions where the subworkflow has already been compiled. + string dynamic_job_spec_uri = 12; } // Metadata for a WorkflowNode @@ -181,6 +185,8 @@ message TaskNodeMetadata { core.CatalogCacheStatus cache_status = 1; // This structure carries the catalog artifact information core.CatalogMetadata catalog_key = 2; + // The latest checkpoint location + string checkpoint_uri = 4; } // For dynamic workflow nodes we capture information about the dynamic workflow definition that gets generated. @@ -190,6 +196,10 @@ message DynamicWorkflowNodeMetadata { // Represents the compiled representation of the embedded dynamic workflow. core.CompiledWorkflowClosure compiled_workflow = 2; + + // dynamic_job_spec_uri is the location of the DynamicJobSpec proto message for this DynamicWorkflow. This is + // required to correctly recover partially completed executions where the subworkflow has already been compiled. + string dynamic_job_spec_uri = 3; } // Request structure to fetch inputs and output for a node execution. @@ -217,5 +227,7 @@ message NodeExecutionGetDataResponse { // Optional Workflow closure for a dynamically generated workflow, in the case this node yields a dynamic workflow we return its structure here. DynamicWorkflowNodeMetadata dynamic_workflow = 16; -} + FlyteURLs flyte_urls = 17; + +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/project.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/project.proto index 80c3da86a..8d1d02959 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/admin/project.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/project.proto @@ -29,7 +29,7 @@ message Project { string description = 4; - // Leverage Labels from flyteidel.admin.common.proto to + // Leverage Labels from flyteidl.admin.common.proto to // tag projects with ownership information. Labels labels = 5; diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/project_attributes.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/project_attributes.proto new file mode 100644 index 000000000..de6f7a17e --- /dev/null +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/project_attributes.proto @@ -0,0 +1,60 @@ +syntax = "proto3"; + +package flyteidl.admin; +option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"; + +import "flyteidl/admin/matchable_resource.proto"; + +// Defines a set of custom matching attributes at the project level. +// For more info on matchable attributes, see :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` +message ProjectAttributes { + // Unique project id for which this set of attributes will be applied. + string project = 1; + + MatchingAttributes matching_attributes = 2; +} + +// Sets custom attributes for a project +// For more info on matchable attributes, see :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` +message ProjectAttributesUpdateRequest { + // +required + ProjectAttributes attributes = 1; +} + +// Purposefully empty, may be populated in the future. +message ProjectAttributesUpdateResponse { +} + +// Request to get an individual project level attribute override. +// For more info on matchable attributes, see :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` +message ProjectAttributesGetRequest { + // Unique project id which this set of attributes references. + // +required + string project = 1; + + // Which type of matchable attributes to return. + // +required + MatchableResource resource_type = 2; +} + +// Response to get an individual project level attribute override. +// For more info on matchable attributes, see :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` +message ProjectAttributesGetResponse { + ProjectAttributes attributes = 1; +} + +// Request to delete a set matchable project level attribute override. +// For more info on matchable attributes, see :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` +message ProjectAttributesDeleteRequest { + // Unique project id which this set of attributes references. + // +required + string project = 1; + + // Which type of matchable attributes to delete. + // +required + MatchableResource resource_type = 2; +} + +// Purposefully empty, may be populated in the future. +message ProjectAttributesDeleteResponse { +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/signal.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/signal.proto new file mode 100644 index 000000000..8fc1c83e5 --- /dev/null +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/signal.proto @@ -0,0 +1,86 @@ +syntax = "proto3"; + +package flyteidl.admin; +option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"; + +import "flyteidl/admin/common.proto"; +import "flyteidl/core/identifier.proto"; +import "flyteidl/core/literals.proto"; +import "flyteidl/core/types.proto"; + +// SignalGetOrCreateRequest represents a request structure to retrive or create a signal. +// See :ref:`ref_flyteidl.admin.Signal` for more details +message SignalGetOrCreateRequest { + // A unique identifier for the requested signal. + core.SignalIdentifier id = 1; + + // A type denoting the required value type for this signal. + core.LiteralType type = 2; +} + +// SignalListRequest represents a request structure to retrieve a collection of signals. +// See :ref:`ref_flyteidl.admin.Signal` for more details +message SignalListRequest { + // Indicates the workflow execution to filter by. + // +required + core.WorkflowExecutionIdentifier workflow_execution_id = 1; + + // Indicates the number of resources to be returned. + // +required + uint32 limit = 2; + + // In the case of multiple pages of results, the, server-provided token can be used to fetch the next page + // in a query. + // +optional + string token = 3; + + // Indicates a list of filters passed as string. + // +optional + string filters = 4; + + // Sort ordering. + // +optional + Sort sort_by = 5; +} + +// SignalList represents collection of signals along with the token of the last result. +// See :ref:`ref_flyteidl.admin.Signal` for more details +message SignalList { + // A list of signals matching the input filters. + repeated Signal signals = 1; + + // In the case of multiple pages of results, the server-provided token can be used to fetch the next page + // in a query. If there are no more results, this value will be empty. + string token = 2; +} + +// SignalSetRequest represents a request structure to set the value on a signal. Setting a signal +// effetively satisfies the signal condition within a Flyte workflow. +// See :ref:`ref_flyteidl.admin.Signal` for more details +message SignalSetRequest { + // A unique identifier for the requested signal. + core.SignalIdentifier id = 1; + + // The value of this signal, must match the defining signal type. + core.Literal value = 2; +} + +// SignalSetResponse represents a response structure if signal setting succeeds. +message SignalSetResponse { + // Purposefully empty, may be populated in the future. +} + +// Signal encapsulates a unique identifier, associated metadata, and a value for a single Flyte +// signal. Signals may exist either without a set value (representing a signal request) or with a +// populated value (indicating the signal has been given). +message Signal { + // A unique identifier for the requested signal. + core.SignalIdentifier id = 1; + + // A type denoting the required value type for this signal. + core.LiteralType type = 2; + + // The value of the signal. This is only available if the signal has been "set" and must match + // the defined the type. + core.Literal value = 3; +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/task.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/task.proto index 1a5ea8dce..b768bc010 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/admin/task.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/task.proto @@ -6,6 +6,7 @@ option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"; import "flyteidl/core/identifier.proto"; import "flyteidl/core/tasks.proto"; import "flyteidl/core/compiler.proto"; +import "flyteidl/admin/description_entity.proto"; import "google/protobuf/timestamp.proto"; // Represents a request structure to create a revision of a task. @@ -34,6 +35,9 @@ message Task { // closure encapsulates all the fields that maps to a compiled version of the task. TaskClosure closure = 2; + + // One-liner overview of the entity. + string short_description = 3; } // Represents a list of tasks returned from the admin. @@ -51,6 +55,9 @@ message TaskList { message TaskSpec { // Template of the task that encapsulates all the metadata of the task. core.TaskTemplate template = 1; + + // Represents the specification for description entity. + DescriptionEntity description = 2; } // Compute task attributes which include values derived from the TaskSpec, as well as plugin-specific data diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/task_execution.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/task_execution.proto index 36d9b77e1..6706a1283 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/admin/task_execution.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/task_execution.proto @@ -123,6 +123,19 @@ message TaskExecutionClosure { // TaskExecutionMetadata ExternalResourceInfo fields for each subtask rather than the TaskLog // in this message. int32 event_version = 17; + + // A time-series of the phase transition or update explanations. This, when compared to storing a singular reason + // as previously done, is much more valuable in visualizing and understanding historical evaluations. + repeated Reason reasons = 18; +} + +// Reason is a single message annotated with a timestamp to indicate the instant the reason occurred. +message Reason { + // occurred_at is the timestamp indicating the instant that this reason happened. + google.protobuf.Timestamp occurred_at = 1; + + // message is the explanation for the most recent phase transition or status update. + string message = 2; } // Request structure to fetch inputs and output for a task execution. @@ -148,4 +161,8 @@ message TaskExecutionGetDataResponse { // Full_outputs will only be populated if they are under a configured size threshold. core.LiteralMap full_outputs = 4; + + // flyte tiny url to fetch a core.LiteralMap of task execution's IO + // Deck will be empty for task + FlyteURLs flyte_urls = 5; } diff --git a/flyteidl-protos/src/main/proto/flyteidl/admin/workflow.proto b/flyteidl-protos/src/main/proto/flyteidl/admin/workflow.proto index 895e33b77..b768cf960 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/admin/workflow.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/admin/workflow.proto @@ -6,6 +6,7 @@ option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"; import "flyteidl/core/compiler.proto"; import "flyteidl/core/identifier.proto"; import "flyteidl/core/workflow.proto"; +import "flyteidl/admin/description_entity.proto"; import "google/protobuf/timestamp.proto"; // Represents a request structure to create a revision of a workflow. @@ -33,6 +34,9 @@ message Workflow { // closure encapsulates all the fields that maps to a compiled version of the workflow. WorkflowClosure closure = 2; + + // One-liner overview of the entity. + string short_description = 3; } // Represents a list of workflows returned from the admin. @@ -55,6 +59,9 @@ message WorkflowSpec { // propeller compiler (since the compiler doesn't have any knowledge of other workflows - ie, it doesn't reach out // to Admin to see other registered workflows). In fact, subworkflows do not even need to be registered. repeated core.WorkflowTemplate sub_workflows = 2; + + // Represents the specification for description entity. + DescriptionEntity description = 3; } // A container holding the compiled workflow produced from the WorkflowSpec and additional metadata. @@ -65,3 +72,21 @@ message WorkflowClosure { // Time at which the workflow was created. google.protobuf.Timestamp created_at = 2; } + +// The workflow id is already used and the structure is different +message WorkflowErrorExistsDifferentStructure { + core.Identifier id = 1; +} + +// The workflow id is already used with an identical sctructure +message WorkflowErrorExistsIdenticalStructure { + core.Identifier id = 1; +} + +// When a CreateWorkflowRequest failes due to matching id +message CreateWorkflowFailureReason { + oneof reason { + WorkflowErrorExistsDifferentStructure exists_different_structure = 1; + WorkflowErrorExistsIdenticalStructure exists_identical_structure = 2; + } +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/catalog.proto b/flyteidl-protos/src/main/proto/flyteidl/core/catalog.proto index 945c3334b..80cc04432 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/core/catalog.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/core/catalog.proto @@ -20,6 +20,8 @@ enum CatalogCacheStatus { CACHE_LOOKUP_FAILURE = 4; // Used to indicate that cache lookup failed because of an error CACHE_PUT_FAILURE = 5; + // Used to indicate the cache lookup was skipped + CACHE_SKIPPED = 6; }; message CatalogArtifactTag { diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/condition.proto b/flyteidl-protos/src/main/proto/flyteidl/core/condition.proto index 36e5c7041..247618713 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/core/condition.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/core/condition.proto @@ -11,17 +11,13 @@ import "flyteidl/core/literals.proto"; message ComparisonExpression { // Binary Operator for each expression enum Operator { - // Equal to EQ = 0; - // Not equal to NEQ = 1; // Greater Than GT = 2; - // Greater than or equal to GTE = 3; // Less Than LT = 4; - // Less than or equal to LTE = 5; } @@ -34,9 +30,11 @@ message ComparisonExpression { message Operand { oneof val { // Can be a constant - core.Primitive primitive = 1; + core.Primitive primitive = 1 [deprecated = true]; // Or one of this node's input variables string var = 2; + // Replace the primitive field + core.Scalar scalar = 3; } } @@ -56,7 +54,6 @@ message ConjunctionExpression { enum LogicalOperator { // Conjunction AND = 0; - // Disjunction OR = 1; } diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/dynamic_job.proto b/flyteidl-protos/src/main/proto/flyteidl/core/dynamic_job.proto index c5fe07601..05d0731a1 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/core/dynamic_job.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/core/dynamic_job.proto @@ -27,6 +27,6 @@ message DynamicJobSpec { // [Optional] A complete list of task specs referenced in nodes. repeated TaskTemplate tasks = 4; - // [Optional] A complete list of sub-workflows templates. + // [Optional] A complete list of task specs referenced in nodes. repeated WorkflowTemplate subworkflows = 5; } diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/execution.proto b/flyteidl-protos/src/main/proto/flyteidl/core/execution.proto index d39024a21..0c3787b66 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/core/execution.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/core/execution.proto @@ -22,7 +22,7 @@ message WorkflowExecution { } } -// Indicates various phases of Node Execution +// Indicates various phases of Node Execution that only include the time spent to run the nodes/workflows message NodeExecution { enum Phase { UNDEFINED = 0; diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/identifier.proto b/flyteidl-protos/src/main/proto/flyteidl/core/identifier.proto index df375f44a..ef8ca4494 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/core/identifier.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/core/identifier.proto @@ -19,7 +19,7 @@ enum ResourceType { // Encapsulation of fields that uniquely identifies a Flyte resource. message Identifier { // Identifies the specific type of resource that this identifier corresponds to. - ResourceType resource_type = 1; + core.ResourceType resource_type = 1; // Name of the project the resource belongs to. string project = 2; @@ -63,3 +63,12 @@ message TaskExecutionIdentifier { uint32 retry_attempt = 3; } + +// Encapsulation of fields the uniquely identify a signal. +message SignalIdentifier { + // Unique identifier for a signal. + string signal_id = 1; + + // Identifies the Flyte workflow execution this signal belongs to. + WorkflowExecutionIdentifier execution_id = 2; +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/literals.proto b/flyteidl-protos/src/main/proto/flyteidl/core/literals.proto index 5df56db04..06af80335 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/core/literals.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/core/literals.proto @@ -77,7 +77,6 @@ message StructuredDataset { StructuredDatasetMetadata metadata = 2; } -// A simple value. message Scalar { oneof value { Primitive primitive = 1; diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/metrics.proto b/flyteidl-protos/src/main/proto/flyteidl/core/metrics.proto new file mode 100644 index 000000000..c96a59988 --- /dev/null +++ b/flyteidl-protos/src/main/proto/flyteidl/core/metrics.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +package flyteidl.core; + +option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"; + +import "flyteidl/core/identifier.proto"; +import "google/protobuf/timestamp.proto"; + +// Span represents a duration trace of Flyte execution. The id field denotes a Flyte execution entity or an operation +// which uniquely identifies the Span. The spans attribute allows this Span to be further broken down into more +// precise definitions. +message Span { + // start_time defines the instance this span began. + google.protobuf.Timestamp start_time = 1; + + // end_time defines the instance this span completed. + google.protobuf.Timestamp end_time = 2; + + oneof id { + // workflow_id is the id of the workflow execution this Span represents. + flyteidl.core.WorkflowExecutionIdentifier workflow_id = 3; + + // node_id is the id of the node execution this Span represents. + flyteidl.core.NodeExecutionIdentifier node_id = 4; + + // task_id is the id of the task execution this Span represents. + flyteidl.core.TaskExecutionIdentifier task_id = 5; + + // operation_id is the id of a unique operation that this Span represents. + string operation_id = 6; + } + + // spans defines a collection of Spans that breakdown this execution. + repeated Span spans = 7; +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/security.proto b/flyteidl-protos/src/main/proto/flyteidl/core/security.proto index eb0c1413a..f9830bf6b 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/core/security.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/core/security.proto @@ -69,6 +69,9 @@ message Identity { // oauth2_client references an oauth2 client. Backend plugins can use this information to impersonate the client when // making external calls. OAuth2Client oauth2_client = 3; + + // execution_identity references the subject who makes the execution + string execution_identity = 4; } // OAuth2TokenRequest encapsulates information needed to request an OAuth2 token. diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/tasks.proto b/flyteidl-protos/src/main/proto/flyteidl/core/tasks.proto index 48961029e..808c196d1 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/core/tasks.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/core/tasks.proto @@ -95,6 +95,17 @@ message TaskMetadata { // Indicates whether the system should attempt to execute discoverable instances in serial to avoid duplicate work bool cache_serializable = 9; + + // Indicates whether the task will generate a Deck URI when it finishes executing. + bool generates_deck = 10; + + // Arbitrary tags that allow users and the platform to store small but arbitrary labels + map tags = 11; + + // pod_template_name is the unique name of a PodTemplate k8s resource to be used as the base configuration if this + // task creates a k8s Pod. If this value is set, the specified PodTemplate will be used instead of, but applied + // identically as, the default PodTemplate configured in FlytePropeller. + string pod_template_name = 12; } // A Task structure that uniquely identifies a task in the system @@ -104,7 +115,7 @@ message TaskTemplate { Identifier id = 1; // A predefined yet extensible Task type identifier. This can be used to customize any of the components. If no - // extensions are provided in the system, Flyte will resolve this task to its TaskCategory and default the + // extensions are provided in the system, Flyte will resolve the this task to its TaskCategory and default the // implementation registered for the TaskCategory. string type = 2; @@ -137,7 +148,6 @@ message TaskTemplate { // to use as required. // reserve the field numbers 1 through 15 for very frequently occurring message elements map config = 16; - } // ----------------- First class Plugins @@ -258,24 +268,32 @@ message DataLoadingConfig { // Defines a pod spec and additional pod metadata that is created when a task is executed. message K8sPod { - // Contains additional metadata for building a kubernetes pod. - K8sObjectMetadata metadata = 1; - - // Defines the primary pod spec created when a task is executed. - // This should be a JSON-marshalled pod spec, which can be defined in - // - go, using: https://github.com/kubernetes/api/blob/release-1.21/core/v1/types.go#L2936 - // - python: using https://github.com/kubernetes-client/python/blob/release-19.0/kubernetes/client/models/v1_pod_spec.py - google.protobuf.Struct pod_spec = 2; + // Contains additional metadata for building a kubernetes pod. + K8sObjectMetadata metadata = 1; + + // Defines the primary pod spec created when a task is executed. + // This should be a JSON-marshalled pod spec, which can be defined in + // - go, using: https://github.com/kubernetes/api/blob/release-1.21/core/v1/types.go#L2936 + // - python: using https://github.com/kubernetes-client/python/blob/release-19.0/kubernetes/client/models/v1_pod_spec.py + google.protobuf.Struct pod_spec = 2; + + // BETA: Optional configuration for DataLoading. If not specified, then default values are used. + // This makes it possible to to run a completely portable container, that uses inputs and outputs + // only from the local file-system and without having any reference to flytekit. This is supported only on K8s at the moment. + // If data loading is enabled, then data will be mounted in accompanying directories specified in the DataLoadingConfig. If the directories + // are not specified, inputs will be mounted onto and outputs will be uploaded from a pre-determined file-system path. Refer to the documentation + // to understand the default paths. + // Only K8s + DataLoadingConfig data_config = 3; } // Metadata for building a kubernetes object when a task is executed. message K8sObjectMetadata { + // Optional labels to add to the pod definition. + map labels = 1; - // Optional labels to add to the pod definition. - map labels = 1; - - // Optional annotations to add to the pod definition. - map annotations = 2; + // Optional annotations to add to the pod definition. + map annotations = 2; } // Sql represents a generic sql workload with a statement and dialect. diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/types.proto b/flyteidl-protos/src/main/proto/flyteidl/core/types.proto index a2babe783..a05c93d38 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/core/types.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/core/types.proto @@ -170,6 +170,26 @@ message OutputReference { // Variable name must refer to an output variable for the node. string var = 2; + + repeated PromiseAttribute attr_path = 3; +} + +// PromiseAttribute stores the attribute path of a promise, which will be resolved at runtime. +// The attribute path is a list of strings and integers. +// In the following example, +// ``` +// @workflow +// def wf(): +// o = t1() +// t2(o.a["b"][0]) +// ``` +// the output reference t2 binds to has a list of PromiseAttribute ["a", "b", 0] + +message PromiseAttribute { + oneof value { + string string_value = 1; + int32 int_value = 2; + } } // Represents an error thrown from a node. diff --git a/flyteidl-protos/src/main/proto/flyteidl/core/workflow.proto b/flyteidl-protos/src/main/proto/flyteidl/core/workflow.proto index 00c621daf..37d39182e 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/core/workflow.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/core/workflow.proto @@ -68,6 +68,71 @@ message WorkflowNode { } } +// ApproveCondition represents a dependency on an external approval. During execution, this will manifest as a boolean +// signal with the provided signal_id. +message ApproveCondition { + // A unique identifier for the requested boolean signal. + string signal_id = 1; +} + +// SignalCondition represents a dependency on an signal. +message SignalCondition { + // A unique identifier for the requested signal. + string signal_id = 1; + + // A type denoting the required value type for this signal. + LiteralType type = 2; + + // The variable name for the signal value in this nodes outputs. + string output_variable_name = 3; +} + +// SleepCondition represents a dependency on waiting for the specified duration. +message SleepCondition { + // The overall duration for this sleep. + google.protobuf.Duration duration = 1; +} + +// GateNode refers to the condition that is required for the gate to successfully complete. +message GateNode { + oneof condition { + // ApproveCondition represents a dependency on an external approval provided by a boolean signal. + ApproveCondition approve = 1; + + // SignalCondition represents a dependency on an signal. + SignalCondition signal = 2; + + // SleepCondition represents a dependency on waiting for the specified duration. + SleepCondition sleep = 3; + } +} + +// ArrayNode is a Flyte node type that simplifies the execution of a sub-node over a list of input +// values. An ArrayNode can be executed with configurable parallelism (separate from the parent +// workflow) and can be configured to succeed when a certain number of sub-nodes succeed. +message ArrayNode { + // node is the sub-node that will be executed for each element in the array. + Node node = 1; + + // parallelism defines the minimum number of instances to bring up concurrently at any given + // point. Note that this is an optimistic restriction and that, due to network partitioning or + // other failures, the actual number of currently running instances might be more. This has to + // be a positive number if assigned. Default value is size. + uint32 parallelism = 2; + + oneof success_criteria { + // min_successes is an absolute number of the minimum number of successful completions of + // sub-nodes. As soon as this criteria is met, the ArrayNode will be marked as successful + // and outputs will be computed. This has to be a non-negative number if assigned. Default + // value is size (if specified). + uint32 min_successes = 3; + + // If the array job size is not known beforehand, the min_success_ratio can instead be used + // to determine when an ArrayNode can be marked successful. + float min_success_ratio = 4; + } +} + // Defines extra information about the Node. message NodeMetadata { // A friendly name for the Node @@ -129,6 +194,13 @@ message Node { // Information about the branch node to evaluate in this node. BranchNode branch_node = 8; + + // Information about the condition to evaluate in this node. + GateNode gate_node = 9; + + // Information about the sub-node executions for each value in the list of this nodes + // inputs values. + ArrayNode array_node = 10; } } @@ -154,6 +226,9 @@ message WorkflowMetadata { // Defines how the system should behave when a failure is detected in the workflow execution. OnFailurePolicy on_failure = 2; + + // Arbitrary tags that allow users and the platform to store small but arbitrary labels + map tags = 3; } // The difference between these settings and the WorkflowMetadata ones is that these are meant to be passed down to diff --git a/flyteidl-protos/src/main/proto/flyteidl/event/event.proto b/flyteidl-protos/src/main/proto/flyteidl/event/event.proto index 030073390..934c9f944 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/event/event.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/event/event.proto @@ -51,7 +51,12 @@ message NodeExecutionEvent { // by the executor of the node. google.protobuf.Timestamp occurred_at = 4; - string input_uri = 5; + oneof input_value { + string input_uri = 5; + + // Raw input data consumed by this node execution. + core.LiteralMap input_data = 20; + } oneof output_result { // URL to the output of the execution, it encodes all the information @@ -99,6 +104,12 @@ message NodeExecutionEvent { // String location uniquely identifying where the deck HTML file is // NativeUrl specifies the url in the format of the configured storage provider (e.g. s3://my-bucket/randomstring/suffix.tar) string deck_uri = 19; + + // This timestamp represents the instant when the event was reported by the executing framework. For example, + // when first processing a node the `occurred_at` timestamp should be the instant propeller makes progress, so when + // literal inputs are initially copied. The event however will not be sent until after the copy completes. + // Extracting both of these timestamps facilitates a more accurate portrayal of the evaluation time-series. + google.protobuf.Timestamp reported_at = 21; } // For Workflow Nodes we need to send information about the workflow that's launched @@ -113,6 +124,8 @@ message TaskNodeMetadata { core.CatalogMetadata catalog_key = 2; // Captures the status of cache reservations for this execution. core.CatalogReservation.Status reservation_status = 3; + // The latest checkpoint location + string checkpoint_uri = 4; // In the case this task launched a dynamic workflow we capture its structure here. DynamicWorkflowNodeMetadata dynamic_workflow = 16; @@ -125,6 +138,10 @@ message DynamicWorkflowNodeMetadata { // Represents the compiled representation of the embedded dynamic workflow. core.CompiledWorkflowClosure compiled_workflow = 2; + + // dynamic_job_spec_uri is the location of the DynamicJobSpec proto message for this DynamicWorkflow. This is + // required to correctly recover partially completed executions where the workflow has already been compiled. + string dynamic_job_spec_uri = 3; } message ParentTaskExecutionMetadata { @@ -137,6 +154,14 @@ message ParentNodeExecutionMetadata { string node_id = 1; } +message EventReason { + // An explanation for this event + string reason = 1; + + // The time this reason occurred + google.protobuf.Timestamp occurred_at = 2; +} + // Plugin specific execution event information. For tasks like Python, Hive, Spark, DynamicJob. message TaskExecutionEvent { // ID of the task. In combination with the retryAttempt this will indicate @@ -163,9 +188,14 @@ message TaskExecutionEvent { // by the executor of the task. google.protobuf.Timestamp occurred_at = 7; - // URI of the input file, it encodes all the information - // including Cloud source provider. ie., s3://... - string input_uri = 8; + oneof input_value { + // URI of the input file, it encodes all the information + // including Cloud source provider. ie., s3://... + string input_uri = 8; + + // Raw input data consumed by this task execution. + core.LiteralMap input_data = 19; + } oneof output_result { // URI to the output of the execution, it will be in a format that encodes all the information @@ -188,8 +218,11 @@ message TaskExecutionEvent { uint32 phase_version = 12; // An optional explanation for the phase transition. - string reason = 13; + // Deprecated: Use reasons instead. + string reason = 13 [deprecated = true]; + // An optional list of explanations for the phase transition. + repeated EventReason reasons = 21; // A predefined yet extensible Task type identifier. If the task definition is already registered in flyte admin // this type will be identical, but not all task executions necessarily use pre-registered definitions and this @@ -204,6 +237,12 @@ message TaskExecutionEvent { // TaskExecutionMetadata ExternalResourceInfo fields for each subtask rather than the TaskLog // in this message. int32 event_version = 18; + + // This timestamp represents the instant when the event was reported by the executing framework. For example, a k8s + // pod task may be marked completed at (ie. `occurred_at`) the instant the container running user code completes, + // but this event will not be reported until the pod is marked as completed. Extracting both of these timestamps + // facilitates a more accurate portrayal of the evaluation time-series. + google.protobuf.Timestamp reported_at = 20; } // This message contains metadata about external resources produced or used by a specific task execution. diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/admin.proto b/flyteidl-protos/src/main/proto/flyteidl/service/admin.proto index ca612515b..a99a9818b 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/service/admin.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/service/admin.proto @@ -6,6 +6,7 @@ option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"; import "google/api/annotations.proto"; import "flyteidl/admin/project.proto"; import "flyteidl/admin/project_domain_attributes.proto"; +import "flyteidl/admin/project_attributes.proto"; import "flyteidl/admin/task.proto"; import "flyteidl/admin/workflow.proto"; import "flyteidl/admin/workflow_attributes.proto"; @@ -17,7 +18,8 @@ import "flyteidl/admin/node_execution.proto"; import "flyteidl/admin/task_execution.proto"; import "flyteidl/admin/version.proto"; import "flyteidl/admin/common.proto"; -import "protoc-gen-swagger/options/annotations.proto"; +import "flyteidl/admin/description_entity.proto"; +// import "protoc-gen-swagger/options/annotations.proto"; // The following defines an RPC service that is also served over HTTP via grpc-gateway. // Standard response codes for both are defined here: https://github.com/grpc-ecosystem/grpc-gateway/blob/master/runtime/errors.go @@ -28,21 +30,21 @@ service AdminService { post: "/api/v1/tasks" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Create and register a task definition." - responses: { - key: "400" - value: { - description: "Returned for bad request that may have failed validation." - } - } - responses: { - key: "409" - value: { - description: "Returned for a request that references an identical entity that has already been registered." - } - } - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Create and register a task definition." + // responses: { + // key: "400" + // value: { + // description: "Returned for bad request that may have failed validation." + // } + // } + // responses: { + // key: "409" + // value: { + // description: "Returned for a request that references an identical entity that has already been registered." + // } + // } + // }; } // Fetch a :ref:`ref_flyteidl.admin.Task` definition. @@ -50,9 +52,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/tasks/{id.project}/{id.domain}/{id.name}/{id.version}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve an existing task definition." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve an existing task definition." + // }; } // Fetch a list of :ref:`ref_flyteidl.admin.NamedEntityIdentifier` of task objects. @@ -60,9 +62,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/task_ids/{project}/{domain}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch existing task definition identifiers matching input filters." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch existing task definition identifiers matching input filters." + // }; } // Fetch a list of :ref:`ref_flyteidl.admin.Task` definitions. @@ -73,9 +75,9 @@ service AdminService { get: "/api/v1/tasks/{id.project}/{id.domain}" } }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch existing task definitions matching input filters." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch existing task definitions matching input filters." + // }; } // Create and upload a :ref:`ref_flyteidl.admin.Workflow` definition @@ -84,21 +86,21 @@ service AdminService { post: "/api/v1/workflows" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Create and register a workflow definition." - responses: { - key: "400" - value: { - description: "Returned for bad request that may have failed validation." - } - } - responses: { - key: "409" - value: { - description: "Returned for a request that references an identical entity that has already been registered." - } - } - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Create and register a workflow definition." + // responses: { + // key: "400" + // value: { + // description: "Returned for bad request that may have failed validation." + // } + // } + // responses: { + // key: "409" + // value: { + // description: "Returned for a request that references an identical entity that has already been registered." + // } + // } + // }; } // Fetch a :ref:`ref_flyteidl.admin.Workflow` definition. @@ -106,9 +108,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/workflows/{id.project}/{id.domain}/{id.name}/{id.version}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve an existing workflow definition." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve an existing workflow definition." + // }; } // Fetch a list of :ref:`ref_flyteidl.admin.NamedEntityIdentifier` of workflow objects. @@ -116,9 +118,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/workflow_ids/{project}/{domain}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch an existing workflow definition identifiers matching input filters." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch an existing workflow definition identifiers matching input filters." + // }; } // Fetch a list of :ref:`ref_flyteidl.admin.Workflow` definitions. @@ -129,9 +131,9 @@ service AdminService { get: "/api/v1/workflows/{id.project}/{id.domain}" } }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch existing workflow definitions matching input filters." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch existing workflow definitions matching input filters." + // }; } // Create and upload a :ref:`ref_flyteidl.admin.LaunchPlan` definition @@ -140,21 +142,21 @@ service AdminService { post: "/api/v1/launch_plans" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Create and register a launch plan definition." - responses: { - key: "400" - value: { - description: "Returned for bad request that may have failed validation." - } - } - responses: { - key: "409" - value: { - description: "Returned for a request that references an identical entity that has already been registered." - } - } - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Create and register a launch plan definition." + // responses: { + // key: "400" + // value: { + // description: "Returned for bad request that may have failed validation." + // } + // } + // responses: { + // key: "409" + // value: { + // description: "Returned for a request that references an identical entity that has already been registered." + // } + // } + // }; } // Fetch a :ref:`ref_flyteidl.admin.LaunchPlan` definition. @@ -162,9 +164,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/launch_plans/{id.project}/{id.domain}/{id.name}/{id.version}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve an existing launch plan definition." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve an existing launch plan definition." + // }; } // Fetch the active version of a :ref:`ref_flyteidl.admin.LaunchPlan`. @@ -172,9 +174,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/active_launch_plans/{id.project}/{id.domain}/{id.name}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve the active launch plan version specified by input request filters." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve the active launch plan version specified by input request filters." + // }; } // List active versions of :ref:`ref_flyteidl.admin.LaunchPlan`. @@ -182,9 +184,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/active_launch_plans/{project}/{domain}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch the active launch plan versions specified by input request filters." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch the active launch plan versions specified by input request filters." + // }; } // Fetch a list of :ref:`ref_flyteidl.admin.NamedEntityIdentifier` of launch plan objects. @@ -192,9 +194,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/launch_plan_ids/{project}/{domain}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch existing launch plan definition identifiers matching input filters." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch existing launch plan definition identifiers matching input filters." + // }; } // Fetch a list of :ref:`ref_flyteidl.admin.LaunchPlan` definitions. @@ -205,9 +207,9 @@ service AdminService { get: "/api/v1/launch_plans/{id.project}/{id.domain}" } }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch existing launch plan definitions matching input filters." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch existing launch plan definitions matching input filters." + // }; } // Updates the status of a registered :ref:`ref_flyteidl.admin.LaunchPlan`. @@ -216,14 +218,14 @@ service AdminService { put: "/api/v1/launch_plans/{id.project}/{id.domain}/{id.name}/{id.version}" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Update the status of an existing launch plan definition. " - "At most one launch plan version for a given {project, domain, name} can be active at a time. " - "If this call sets a launch plan to active and existing version is already active, the result of this call will be that the " - "formerly active launch plan will be made inactive and specified launch plan in this request will be made active. " - "In the event that the formerly active launch plan had a schedule associated it with it, this schedule will be disabled. " - "If the reference launch plan in this request is being set to active and has a schedule associated with it, the schedule will be enabled." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Update the status of an existing launch plan definition. " + // "At most one launch plan version for a given {project, domain, name} can be active at a time. " + // "If this call sets a launch plan to active and existing version is already active, the result of this call will be that the " + // "formerly active launch plan will be made inactive and specified launch plan in this request will be made active. " + // "In the event that the formerly active launch plan had a schedule associated it with it, this schedule will be disabled. " + // "If the reference launch plan in this request is being set to active and has a schedule associated with it, the schedule will be enabled." + // }; } // Triggers the creation of a :ref:`ref_flyteidl.admin.Execution` @@ -232,9 +234,9 @@ service AdminService { post: "/api/v1/executions" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Create a workflow execution." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Create a workflow execution." + // }; } // Triggers the creation of an identical :ref:`ref_flyteidl.admin.Execution` @@ -243,9 +245,9 @@ service AdminService { post: "/api/v1/executions/relaunch" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Relaunch a workflow execution." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Relaunch a workflow execution." + // }; } // Recreates a previously-run workflow execution that will only start executing from the last known failure point. @@ -258,13 +260,13 @@ service AdminService { post: "/api/v1/executions/recover" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Recreates a previously-run workflow execution that will only start executing from the last known failure point. " - "In Recover mode, users cannot change any input parameters or update the version of the execution. " - "This is extremely useful to recover from system errors and byzantine faults like - Loss of K8s cluster, bugs in platform or instability, machine failures, " - "downstream system failures (downstream services), or simply to recover executions that failed because of retry exhaustion and should complete if tried again." + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Recreates a previously-run workflow execution that will only start executing from the last known failure point. " + // "In Recover mode, users cannot change any input parameters or update the version of the execution. " + // "This is extremely useful to recover from system errors and byzantine faults like - Loss of K8s cluster, bugs in platform or instability, machine failures, " + // "downstream system failures (downstream services), or simply to recover executions that failed because of retry exhaustion and should complete if tried again." - }; + // }; } // Fetches a :ref:`ref_flyteidl.admin.Execution`. @@ -272,9 +274,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/executions/{id.project}/{id.domain}/{id.name}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve an existing workflow execution." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve an existing workflow execution." + // }; } // Update execution belonging to project domain :ref:`ref_flyteidl.admin.Execution`. @@ -283,9 +285,9 @@ service AdminService { put: "/api/v1/executions/{id.project}/{id.domain}/{id.name}" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Update execution belonging to project domain." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Update execution belonging to project domain." + // }; } // Fetches input and output data for a :ref:`ref_flyteidl.admin.Execution`. @@ -293,9 +295,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/data/executions/{id.project}/{id.domain}/{id.name}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve input and output data from an existing workflow execution." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve input and output data from an existing workflow execution." + // }; }; // Fetch a list of :ref:`ref_flyteidl.admin.Execution`. @@ -303,9 +305,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/executions/{id.project}/{id.domain}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch existing workflow executions matching input filters." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch existing workflow executions matching input filters." + // }; } // Terminates an in-progress :ref:`ref_flyteidl.admin.Execution`. @@ -314,9 +316,9 @@ service AdminService { delete: "/api/v1/executions/{id.project}/{id.domain}/{id.name}" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Terminate the active workflow execution specified in the request." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Terminate the active workflow execution specified in the request." + // }; } // Fetches a :ref:`ref_flyteidl.admin.NodeExecution`. @@ -324,9 +326,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/node_executions/{id.execution_id.project}/{id.execution_id.domain}/{id.execution_id.name}/{id.node_id}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve an existing node execution." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve an existing node execution." + // }; } // Fetch a list of :ref:`ref_flyteidl.admin.NodeExecution`. @@ -334,9 +336,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/node_executions/{workflow_execution_id.project}/{workflow_execution_id.domain}/{workflow_execution_id.name}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch existing node executions matching input filters." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch existing node executions matching input filters." + // }; } // Fetch a list of :ref:`ref_flyteidl.admin.NodeExecution` launched by the reference :ref:`ref_flyteidl.admin.TaskExecution`. @@ -344,9 +346,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/children/task_executions/{task_execution_id.node_execution_id.execution_id.project}/{task_execution_id.node_execution_id.execution_id.domain}/{task_execution_id.node_execution_id.execution_id.name}/{task_execution_id.node_execution_id.node_id}/{task_execution_id.task_id.project}/{task_execution_id.task_id.domain}/{task_execution_id.task_id.name}/{task_execution_id.task_id.version}/{task_execution_id.retry_attempt}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch child node executions launched by the specified task execution." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch child node executions launched by the specified task execution." + // }; } // Fetches input and output data for a :ref:`ref_flyteidl.admin.NodeExecution`. @@ -354,9 +356,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/data/node_executions/{id.execution_id.project}/{id.execution_id.domain}/{id.execution_id.name}/{id.node_id}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve input and output data from an existing node execution." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve input and output data from an existing node execution." + // }; }; // Registers a :ref:`ref_flyteidl.admin.Project` with the Flyte deployment. @@ -365,9 +367,9 @@ service AdminService { post: "/api/v1/projects" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Register a project." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Register a project." + // }; } // Updates an existing :ref:`ref_flyteidl.admin.Project` @@ -378,9 +380,9 @@ service AdminService { put: "/api/v1/projects/{id}" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Update a project." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Update a project." + // }; } // Fetches a list of :ref:`ref_flyteidl.admin.Project` @@ -388,9 +390,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/projects" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch registered projects." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch registered projects." + // }; } // Indicates a :ref:`ref_flyteidl.event.WorkflowExecutionEvent` has occurred. @@ -399,9 +401,9 @@ service AdminService { post: "/api/v1/events/workflows" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Create a workflow execution event recording a phase transition." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Create a workflow execution event recording a phase transition." + // }; } // Indicates a :ref:`ref_flyteidl.event.NodeExecutionEvent` has occurred. @@ -410,9 +412,9 @@ service AdminService { post: "/api/v1/events/nodes" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Create a node execution event recording a phase transition." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Create a node execution event recording a phase transition." + // }; } // Indicates a :ref:`ref_flyteidl.event.TaskExecutionEvent` has occurred. @@ -421,9 +423,9 @@ service AdminService { post: "/api/v1/events/tasks" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Create a task execution event recording a phase transition." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Create a task execution event recording a phase transition." + // }; } // Fetches a :ref:`ref_flyteidl.admin.TaskExecution`. @@ -431,9 +433,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/task_executions/{id.node_execution_id.execution_id.project}/{id.node_execution_id.execution_id.domain}/{id.node_execution_id.execution_id.name}/{id.node_execution_id.node_id}/{id.task_id.project}/{id.task_id.domain}/{id.task_id.name}/{id.task_id.version}/{id.retry_attempt}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve an existing task execution." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve an existing task execution." + // }; } // Fetches a list of :ref:`ref_flyteidl.admin.TaskExecution`. @@ -441,9 +443,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/task_executions/{node_execution_id.execution_id.project}/{node_execution_id.execution_id.domain}/{node_execution_id.execution_id.name}/{node_execution_id.node_id}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Fetch existing task executions matching input filters." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch existing task executions matching input filters." + // }; } @@ -452,9 +454,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/data/task_executions/{id.node_execution_id.execution_id.project}/{id.node_execution_id.execution_id.domain}/{id.node_execution_id.execution_id.name}/{id.node_execution_id.node_id}/{id.task_id.project}/{id.task_id.domain}/{id.task_id.name}/{id.task_id.version}/{id.retry_attempt}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve input and output data from an existing task execution." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve input and output data from an existing task execution." + // }; } // Creates or updates custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project and domain. @@ -463,9 +465,9 @@ service AdminService { put: "/api/v1/project_domain_attributes/{attributes.project}/{attributes.domain}" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Update the customized resource attributes associated with a project-domain combination" - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Update the customized resource attributes associated with a project-domain combination" + // }; } // Fetches custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project and domain. @@ -473,9 +475,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/project_domain_attributes/{project}/{domain}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve the customized resource attributes associated with a project-domain combination" - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve the customized resource attributes associated with a project-domain combination" + // }; } // Deletes custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project and domain. @@ -484,20 +486,51 @@ service AdminService { delete: "/api/v1/project_domain_attributes/{project}/{domain}" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Delete the customized resource attributes associated with a project-domain combination" + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Delete the customized resource attributes associated with a project-domain combination" + // }; + } + + // Creates or updates custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` at the project level + rpc UpdateProjectAttributes (flyteidl.admin.ProjectAttributesUpdateRequest) returns (flyteidl.admin.ProjectAttributesUpdateResponse) { + option (google.api.http) = { + put: "/api/v1/project_attributes/{attributes.project}" + body: "*" + }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Update the customized resource attributes associated with a project" + // }; + } + + // Fetches custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project and domain. + rpc GetProjectAttributes (flyteidl.admin.ProjectAttributesGetRequest) returns (flyteidl.admin.ProjectAttributesGetResponse) { + option (google.api.http) = { + get: "/api/v1/project_attributes/{project}" }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve the customized resource attributes associated with a project" + // }; } + // Deletes custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project and domain. + rpc DeleteProjectAttributes (flyteidl.admin.ProjectAttributesDeleteRequest) returns (flyteidl.admin.ProjectAttributesDeleteResponse) { + option (google.api.http) = { + delete: "/api/v1/project_attributes/{project}" + body: "*" + }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Delete the customized resource attributes associated with a project" + // }; + } // Creates or updates custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project, domain and workflow. rpc UpdateWorkflowAttributes (flyteidl.admin.WorkflowAttributesUpdateRequest) returns (flyteidl.admin.WorkflowAttributesUpdateResponse) { option (google.api.http) = { put: "/api/v1/workflow_attributes/{attributes.project}/{attributes.domain}/{attributes.workflow}" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Update the customized resource attributes associated with a project, domain and workflow combination" - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Update the customized resource attributes associated with a project, domain and workflow combination" + // }; } // Fetches custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project, domain and workflow. @@ -505,9 +538,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/workflow_attributes/{project}/{domain}/{workflow}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve the customized resource attributes associated with a project, domain and workflow combination" - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve the customized resource attributes associated with a project, domain and workflow combination" + // }; } // Deletes custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a project, domain and workflow. @@ -516,9 +549,9 @@ service AdminService { delete: "/api/v1/workflow_attributes/{project}/{domain}/{workflow}" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Delete the customized resource attributes associated with a project, domain and workflow combination" - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Delete the customized resource attributes associated with a project, domain and workflow combination" + // }; } // Lists custom :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` for a specific resource type. @@ -526,9 +559,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/matchable_attributes" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve a list of MatchableAttributesConfiguration objects." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve a list of MatchableAttributesConfiguration objects." + // }; } // Returns a list of :ref:`ref_flyteidl.admin.NamedEntity` objects. @@ -536,9 +569,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/named_entities/{resource_type}/{project}/{domain}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve a list of NamedEntity objects sharing a common resource type, project, and domain." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve a list of NamedEntity objects sharing a common resource type, project, and domain." + // }; } // Returns a :ref:`ref_flyteidl.admin.NamedEntity` object. @@ -546,9 +579,9 @@ service AdminService { option (google.api.http) = { get: "/api/v1/named_entities/{resource_type}/{id.project}/{id.domain}/{id.name}" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve a NamedEntity object." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve a NamedEntity object." + // }; } // Updates a :ref:`ref_flyteidl.admin.NamedEntity` object. @@ -557,19 +590,50 @@ service AdminService { put: "/api/v1/named_entities/{resource_type}/{id.project}/{id.domain}/{id.name}" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Update the fields associated with a NamedEntity" - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Update the fields associated with a NamedEntity" + // }; } rpc GetVersion (flyteidl.admin.GetVersionRequest) returns (flyteidl.admin.GetVersionResponse) { option (google.api.http) = { get: "/api/v1/version" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieve the Version (including the Build information) for FlyteAdmin service" - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve the Version (including the Build information) for FlyteAdmin service" + // }; } -} + // Fetch a :ref:`ref_flyteidl.admin.DescriptionEntity` object. + rpc GetDescriptionEntity (flyteidl.admin.ObjectGetRequest) returns (flyteidl.admin.DescriptionEntity) { + option (google.api.http) = { + get: "/api/v1/description_entities/{id.resource_type}/{id.project}/{id.domain}/{id.name}/{id.version}" + }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve an existing description entity description." + // }; + } + // Fetch a list of :ref:`ref_flyteidl.admin.DescriptionEntity` definitions. + rpc ListDescriptionEntities (flyteidl.admin.DescriptionEntityListRequest) returns (flyteidl.admin.DescriptionEntityList) { + option (google.api.http) = { + get: "/api/v1/description_entities/{resource_type}/{id.project}/{id.domain}/{id.name}" + additional_bindings { + get: "/api/v1/description_entities/{resource_type}/{id.project}/{id.domain}" + } + }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch existing description entity definitions matching input filters." + // }; + } + + // Fetches runtime metrics for a :ref:`ref_flyteidl.admin.Execution`. + rpc GetExecutionMetrics (flyteidl.admin.WorkflowExecutionGetMetricsRequest) returns (flyteidl.admin.WorkflowExecutionGetMetricsResponse) { + option (google.api.http) = { + get: "/api/v1/metrics/executions/{id.project}/{id.domain}/{id.name}" + }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve metrics from an existing workflow execution." + // }; + }; +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/agent.proto b/flyteidl-protos/src/main/proto/flyteidl/service/agent.proto new file mode 100644 index 000000000..2a1a14370 --- /dev/null +++ b/flyteidl-protos/src/main/proto/flyteidl/service/agent.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +package flyteidl.service; + +option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"; +import "flyteidl/admin/agent.proto"; + +// AgentService defines an RPC Service that allows propeller to send the request to the agent server. +service AsyncAgentService { + // Send a task create request to the agent server. + rpc CreateTask (flyteidl.admin.CreateTaskRequest) returns (flyteidl.admin.CreateTaskResponse){}; + // Get job status. + rpc GetTask (flyteidl.admin.GetTaskRequest) returns (flyteidl.admin.GetTaskResponse){}; + // Delete the task resource. + rpc DeleteTask (flyteidl.admin.DeleteTaskRequest) returns (flyteidl.admin.DeleteTaskResponse){}; +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/auth.proto b/flyteidl-protos/src/main/proto/flyteidl/service/auth.proto index defed6ecf..2d11e7fa3 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/service/auth.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/service/auth.proto @@ -4,7 +4,7 @@ package flyteidl.service; option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"; import "google/api/annotations.proto"; -import "protoc-gen-swagger/options/annotations.proto"; +// import "protoc-gen-swagger/options/annotations.proto"; message OAuth2MetadataRequest {} @@ -41,6 +41,9 @@ message OAuth2MetadataResponse { // JSON array containing a list of the OAuth 2.0 grant type values that this authorization server supports. repeated string grant_types_supported = 9; + + // URL of the authorization server's device authorization endpoint, as defined in Section 3.1 of [RFC8628] + string device_authorization_endpoint = 10; } message PublicClientAuthConfigRequest {} @@ -60,6 +63,8 @@ message PublicClientAuthConfigResponse { // to configure the gRPC connection can be used for the http one respecting the insecure flag to choose between // SSL or no SSL connections. string service_http_endpoint = 5; + // audience to use when initiating OAuth2 authorization requests. + string audience = 6; } // The following defines an RPC service that is also served over HTTP via grpc-gateway. @@ -71,9 +76,9 @@ service AuthMetadataService { option (google.api.http) = { get: "/.well-known/oauth-authorization-server" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieves OAuth2 authorization server metadata. This endpoint is anonymously accessible." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieves OAuth2 authorization server metadata. This endpoint is anonymously accessible." + // }; } // Anonymously accessible. Retrieves the client information clients should use when initiating OAuth2 authorization @@ -82,8 +87,8 @@ service AuthMetadataService { option (google.api.http) = { get: "/config/v1/flyte_client" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieves public flyte client info. This endpoint is anonymously accessible." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieves public flyte client info. This endpoint is anonymously accessible." + // }; } } diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/dataproxy.proto b/flyteidl-protos/src/main/proto/flyteidl/service/dataproxy.proto index e82757acb..8972d4f6d 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/service/dataproxy.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/service/dataproxy.proto @@ -4,9 +4,12 @@ package flyteidl.service; option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"; import "google/api/annotations.proto"; -import "protoc-gen-swagger/options/annotations.proto"; +// import "protoc-gen-swagger/options/annotations.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; +import "flyteidl/core/identifier.proto"; +import "flyteidl/core/literals.proto"; + message CreateUploadLocationResponse { // SignedUrl specifies the url to use to upload content to (e.g. https://my-bucket.s3.amazonaws.com/randomstring/suffix.tar?X-...) @@ -20,6 +23,10 @@ message CreateUploadLocationResponse { } // CreateUploadLocationRequest specified request for the CreateUploadLocation API. +// The implementation in data proxy service will create the s3 location with some server side configured prefixes, +// and then: +// - project/domain/(a deterministic str representation of the content_md5)/filename (if present); OR +// - project/domain/filename_root (if present)/filename (if present). message CreateUploadLocationRequest { // Project to create the upload location for // +required @@ -42,10 +49,17 @@ message CreateUploadLocationRequest { // generated path. // +required bytes content_md5 = 5; + + // If present, data proxy will use this string in lieu of the md5 hash in the path. When the filename is also included + // this makes the upload location deterministic. The native url will still be prefixed by the upload location prefix + // in data proxy config. This option is useful when uploading multiple files. + // +optional + string filename_root = 6; } // CreateDownloadLocationRequest specified request for the CreateDownloadLocation API. message CreateDownloadLocationRequest { + option deprecated = true; // NativeUrl specifies the url in the format of the configured storage provider (e.g. s3://my-bucket/randomstring/suffix.tar) string native_url = 1; @@ -57,12 +71,85 @@ message CreateDownloadLocationRequest { } message CreateDownloadLocationResponse { + option deprecated = true; // SignedUrl specifies the url to use to download content from (e.g. https://my-bucket.s3.amazonaws.com/randomstring/suffix.tar?X-...) string signed_url = 1; // ExpiresAt defines when will the signed URL expires. google.protobuf.Timestamp expires_at = 2; } +// ArtifactType +enum ArtifactType { + // ARTIFACT_TYPE_UNDEFINED is the default, often invalid, value for the enum. + ARTIFACT_TYPE_UNDEFINED = 0; + + // ARTIFACT_TYPE_DECK refers to the deck html file optionally generated after a task, a workflow or a launch plan + // finishes executing. + ARTIFACT_TYPE_DECK = 1; +} + +// CreateDownloadLinkRequest defines the request parameters to create a download link (signed url) +message CreateDownloadLinkRequest { + // ArtifactType of the artifact requested. + ArtifactType artifact_type = 1; + + // ExpiresIn defines a requested expiration duration for the generated url. The request will be rejected if this + // exceeds the platform allowed max. + // +optional. The default value comes from a global config. + google.protobuf.Duration expires_in = 2; + + oneof source { + // NodeId is the unique identifier for the node execution. For a task node, this will retrieve the output of the + // most recent attempt of the task. + core.NodeExecutionIdentifier node_execution_id = 3; + } +} + +// CreateDownloadLinkResponse defines the response for the generated links +message CreateDownloadLinkResponse { + // SignedUrl specifies the url to use to download content from (e.g. https://my-bucket.s3.amazonaws.com/randomstring/suffix.tar?X-...) + repeated string signed_url = 1 [deprecated = true]; + + // ExpiresAt defines when will the signed URL expire. + google.protobuf.Timestamp expires_at = 2 [deprecated = true]; + + // New wrapper object containing the signed urls and expiration time + PreSignedURLs pre_signed_urls = 3; +} + +// Wrapper object since the message is shared across this and the GetDataResponse +message PreSignedURLs { + // SignedUrl specifies the url to use to download content from (e.g. https://my-bucket.s3.amazonaws.com/randomstring/suffix.tar?X-...) + repeated string signed_url = 1; + + // ExpiresAt defines when will the signed URL expire. + google.protobuf.Timestamp expires_at = 2; +} + +// General request artifact to retrieve data from a Flyte artifact url. +message GetDataRequest { + // A unique identifier in the form of flyte:// that uniquely, for a given Flyte + // backend, identifies a Flyte artifact ([i]nput, [o]utput, flyte [d]eck, etc.). + // e.g. flyte://v1/proj/development/execid/n2/0/i (for 0th task execution attempt input) + // flyte://v1/proj/development/execid/n2/i (for node execution input) + // flyte://v1/proj/development/execid/n2/o/o3 (the o3 output of the second node) + string flyte_url = 1; +} + +message GetDataResponse { + oneof data { + // literal map data will be returned + core.LiteralMap literal_map = 1; + + // Flyte deck html will be returned as a signed url users can download + PreSignedURLs pre_signed_urls = 2; + + // Single literal will be returned. This is returned when the user/url requests a specific output or input + // by name. See the o3 example above. + core.Literal literal = 3; + } +} + // DataProxyService defines an RPC Service that allows access to user-data in a controlled manner. service DataProxyService { // CreateUploadLocation creates a signed url to upload artifacts to for a given project/domain. @@ -71,17 +158,37 @@ service DataProxyService { post: "/api/v1/dataproxy/artifact_urn" body: "*" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Creates a write-only http location that is accessible for tasks at runtime." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Creates a write-only http location that is accessible for tasks at runtime." + // }; } + // CreateDownloadLocation creates a signed url to download artifacts. rpc CreateDownloadLocation (CreateDownloadLocationRequest) returns (CreateDownloadLocationResponse) { + option deprecated = true; option (google.api.http) = { get: "/api/v1/dataproxy/artifact_urn" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Creates a read-only http location that is accessible for tasks at runtime." + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Deprecated: Please use CreateDownloadLink instead. Creates a read-only http location that is accessible for tasks at runtime." + // }; + } + + // CreateDownloadLocation creates a signed url to download artifacts. + rpc CreateDownloadLink (CreateDownloadLinkRequest) returns (CreateDownloadLinkResponse) { + option (google.api.http) = { + post: "/api/v1/dataproxy/artifact_link" + body: "*" + }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Creates a read-only http location that is accessible for tasks at runtime." + // }; + } + + rpc GetData (GetDataRequest) returns (GetDataResponse) { + // Takes an address like flyte://v1/proj/development/execid/n2/0/i and return the actual data + option (google.api.http) = { + get: "/api/v1/data" }; } } diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/external_plugin_service.proto b/flyteidl-protos/src/main/proto/flyteidl/service/external_plugin_service.proto new file mode 100644 index 000000000..18f60a7d9 --- /dev/null +++ b/flyteidl-protos/src/main/proto/flyteidl/service/external_plugin_service.proto @@ -0,0 +1,80 @@ +syntax = "proto3"; +package flyteidl.service; + +option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"; +import "flyteidl/core/literals.proto"; +import "flyteidl/core/tasks.proto"; +import "flyteidl/core/interface.proto"; + +// ExternalPluginService defines an RPC Service that allows propeller to send the request to the backend plugin server. +service ExternalPluginService { + // Send a task create request to the backend plugin server. + rpc CreateTask (TaskCreateRequest) returns (TaskCreateResponse){option deprecated = true;}; + // Get job status. + rpc GetTask (TaskGetRequest) returns (TaskGetResponse){option deprecated = true;}; + // Delete the task resource. + rpc DeleteTask (TaskDeleteRequest) returns (TaskDeleteResponse){option deprecated = true;}; +} + +// The state of the execution is used to control its visibility in the UI/CLI. +enum State { + option deprecated = true; + RETRYABLE_FAILURE = 0; + PERMANENT_FAILURE = 1; + PENDING = 2; + RUNNING = 3; + SUCCEEDED = 4; +} + +// Represents a request structure to create task. +message TaskCreateRequest { + option deprecated = true; + // The inputs required to start the execution. All required inputs must be + // included in this map. If not required and not provided, defaults apply. + // +optional + core.LiteralMap inputs = 1; + // Template of the task that encapsulates all the metadata of the task. + core.TaskTemplate template = 2; + // Prefix for where task output data will be written. (e.g. s3://my-bucket/randomstring) + string output_prefix = 3; +} + +// Represents a create response structure. +message TaskCreateResponse { + option deprecated = true; + string job_id = 1; +} + +// A message used to fetch a job state from backend plugin server. +message TaskGetRequest { + option deprecated = true; + // A predefined yet extensible Task type identifier. + string task_type = 1; + // The unique id identifying the job. + string job_id = 2; +} + +// Response to get an individual task state. +message TaskGetResponse { + option deprecated = true; + // The state of the execution is used to control its visibility in the UI/CLI. + State state = 1; + // The outputs of the execution. It's typically used by sql task. Flyteplugins service will create a + // Structured dataset pointing to the query result table. + // +optional + core.LiteralMap outputs = 2; +} + +// A message used to delete a task. +message TaskDeleteRequest { + option deprecated = true; + // A predefined yet extensible Task type identifier. + string task_type = 1; + // The unique id identifying the job. + string job_id = 2; +} + +// Response to delete a task. +message TaskDeleteResponse { + option deprecated = true; +} diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/identity.proto b/flyteidl-protos/src/main/proto/flyteidl/service/identity.proto index d51168cb9..e4bc5dcb0 100644 --- a/flyteidl-protos/src/main/proto/flyteidl/service/identity.proto +++ b/flyteidl-protos/src/main/proto/flyteidl/service/identity.proto @@ -4,7 +4,8 @@ package flyteidl.service; option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"; import "google/api/annotations.proto"; -import "protoc-gen-swagger/options/annotations.proto"; +import "google/protobuf/struct.proto"; +// import "protoc-gen-swagger/options/annotations.proto"; message UserInfoRequest {} @@ -31,6 +32,9 @@ message UserInfoResponse { // Profile picture URL string picture = 7; + + // Additional claims + google.protobuf.Struct additional_claims = 8; } // IdentityService defines an RPC Service that interacts with user/app identities. @@ -40,8 +44,8 @@ service IdentityService { option (google.api.http) = { get: "/me" }; - option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { - description: "Retrieves authenticated identity info." - }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieves authenticated identity info." + // }; } } diff --git a/flyteidl-protos/src/main/proto/flyteidl/service/signal.proto b/flyteidl-protos/src/main/proto/flyteidl/service/signal.proto new file mode 100644 index 000000000..634440715 --- /dev/null +++ b/flyteidl-protos/src/main/proto/flyteidl/service/signal.proto @@ -0,0 +1,55 @@ +syntax = "proto3"; +package flyteidl.service; + +option go_package = "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"; + +import "google/api/annotations.proto"; +import "flyteidl/admin/signal.proto"; +// import "protoc-gen-swagger/options/annotations.proto"; + +// SignalService defines an RPC Service that may create, update, and retrieve signal(s). +service SignalService { + // Fetches or creates a :ref:`ref_flyteidl.admin.Signal`. + rpc GetOrCreateSignal (flyteidl.admin.SignalGetOrCreateRequest) returns (flyteidl.admin.Signal) { + // Purposefully left out an HTTP API for this RPC call. This is meant to idempotently retrieve + // a signal, meaning the first call will create the signal and all subsequent calls will + // fetch the existing signal. This is only useful during Flyte Workflow execution and therefore + // is not exposed to mitigate unintended behavior. + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Retrieve a signal, creating it if it does not exist." + // }; + } + + // Fetch a list of :ref:`ref_flyteidl.admin.Signal` definitions. + rpc ListSignals (flyteidl.admin.SignalListRequest) returns (flyteidl.admin.SignalList) { + option (google.api.http) = { + get: "/api/v1/signals/{workflow_execution_id.project}/{workflow_execution_id.domain}/{workflow_execution_id.name}" + }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Fetch existing signal definitions matching the input signal id filters." + // }; + } + + // Sets the value on a :ref:`ref_flyteidl.admin.Signal` definition + rpc SetSignal (flyteidl.admin.SignalSetRequest) returns (flyteidl.admin.SignalSetResponse) { + option (google.api.http) = { + post: "/api/v1/signals" + body: "*" + }; + // option (grpc.gateway.protoc_gen_swagger.options.openapiv2_operation) = { + // description: "Set a signal value." + // responses: { + // key: "400" + // value: { + // description: "Returned for bad request that may have failed validation." + // } + // } + // responses: { + // key: "409" + // value: { + // description: "Returned for a request that references an identical entity that has already been registered." + // } + // } + // }; + } +} diff --git a/flytekit-scala-tests/src/test/scala/org/flyte/flytekitscala/SdkScalaTypeTest.scala b/flytekit-scala-tests/src/test/scala/org/flyte/flytekitscala/SdkScalaTypeTest.scala index a36a6a3c2..949550c61 100644 --- a/flytekit-scala-tests/src/test/scala/org/flyte/flytekitscala/SdkScalaTypeTest.scala +++ b/flytekit-scala-tests/src/test/scala/org/flyte/flytekitscala/SdkScalaTypeTest.scala @@ -95,7 +95,9 @@ class SdkScalaTypeTest { datetime: SdkBindingData[Instant], duration: SdkBindingData[Duration], blob: SdkBindingData[Blob], - generic: SdkBindingData[ScalarNested] + generic: SdkBindingData[ScalarNested], + none: SdkBindingData[Option[String]], + some: SdkBindingData[Option[String]] ) case class CollectionInput( @@ -105,7 +107,8 @@ class SdkScalaTypeTest { booleans: SdkBindingData[List[Boolean]], datetimes: SdkBindingData[List[Instant]], durations: SdkBindingData[List[Duration]], - generics: SdkBindingData[List[ScalarNested]] + generics: SdkBindingData[List[ScalarNested]], + options: SdkBindingData[List[Option[String]]] ) case class MapInput( @@ -115,7 +118,8 @@ class SdkScalaTypeTest { booleanMap: SdkBindingData[Map[String, Boolean]], datetimeMap: SdkBindingData[Map[String, Instant]], durationMap: SdkBindingData[Map[String, Duration]], - genericMap: SdkBindingData[Map[String, ScalarNested]] + genericMap: SdkBindingData[Map[String, ScalarNested]], + optionMap: SdkBindingData[Map[String, Option[String]]] ) case class ComplexInput( @@ -196,7 +200,9 @@ class SdkScalaTypeTest { .literalType(LiteralType.ofBlobType(BlobType.DEFAULT)) .description("") .build(), - "generic" -> createVar(SimpleType.STRUCT) + "generic" -> createVar(SimpleType.STRUCT), + "none" -> createVar(SimpleType.STRUCT), + "some" -> createVar(SimpleType.STRUCT) ).asJava val output = SdkScalaType[ScalarInput].getVariableMap @@ -274,6 +280,16 @@ class SdkScalaTypeTest { ).asJava ) ) + ), + "none" -> Literal.ofScalar( + Scalar.ofGeneric( + Struct.of(Map.empty[String, Struct.Value].asJava) + ) + ), + "some" -> Literal.ofScalar( + Scalar.ofGeneric( + Struct.of(Map("value" -> Struct.Value.ofStringValue("hello")).asJava) + ) ) ).asJava @@ -295,6 +311,14 @@ class SdkScalaTypeTest { List(ScalarNestedNested("foo", Some("bar"))), Map("foo" -> ScalarNestedNested("foo", Some("bar"))) ) + ), + none = SdkBindingDataFactory.of( + SdkLiteralTypes.generics[Option[String]](), + Option(null) + ), + some = SdkBindingDataFactory.of( + SdkLiteralTypes.generics[Option[String]](), + Option("hello") ) ) @@ -323,7 +347,11 @@ class SdkScalaTypeTest { List(ScalarNestedNested("foo", Some("bar"))), Map("foo" -> ScalarNestedNested("foo", Some("bar"))) ) - ) + ), + none = + SdkBindingDataFactory.of(SdkLiteralTypes.generics(), Option(null)), + some = + SdkBindingDataFactory.of(SdkLiteralTypes.generics(), Option("hello")) ) val expected = Map( @@ -399,6 +427,23 @@ class SdkScalaTypeTest { ).asJava ) ) + ), + "none" -> Literal.ofScalar( + Scalar.ofGeneric( + Struct.of( + Map(__TYPE -> Struct.Value.ofStringValue("scala.None$")).asJava + ) + ) + ), + "some" -> Literal.ofScalar( + Scalar.ofGeneric( + Struct.of( + Map( + "value" -> Struct.Value.ofStringValue("hello"), + __TYPE -> Struct.Value.ofStringValue("scala.Some") + ).asJava + ) + ) ) ).asJava @@ -416,7 +461,8 @@ class SdkScalaTypeTest { "booleans" -> createCollectionVar(SimpleType.BOOLEAN), "datetimes" -> createCollectionVar(SimpleType.DATETIME), "durations" -> createCollectionVar(SimpleType.DURATION), - "generics" -> createCollectionVar(SimpleType.STRUCT) + "generics" -> createCollectionVar(SimpleType.STRUCT), + "options" -> createCollectionVar(SimpleType.STRUCT) ).asJava val output = SdkScalaType[CollectionInput].getVariableMap @@ -443,6 +489,14 @@ class SdkScalaTypeTest { List(ScalarNestedNested("foo", Some("bar"))), Map("foo" -> ScalarNestedNested("foo", Some("bar"))) ) + ), + none = SdkBindingDataFactory.of( + SdkLiteralTypes.generics[Option[String]](), + Option(null) + ), + some = SdkBindingDataFactory.of( + SdkLiteralTypes.generics[Option[String]](), + Option("hello") ) ) @@ -465,6 +519,14 @@ class SdkScalaTypeTest { List(ScalarNestedNested("foo", Some("bar"))), Map("foo" -> ScalarNestedNested("foo", Some("bar"))) ) + ), + "none" -> SdkBindingDataFactory.of( + SdkLiteralTypes.generics[Option[String]](), + Option(null) + ), + "some" -> SdkBindingDataFactory.of( + SdkLiteralTypes.generics[Option[String]](), + Option("hello") ) ).asJava @@ -531,6 +593,10 @@ class SdkScalaTypeTest { Map("foo2" -> ScalarNestedNested("foo2", Some("bar2"))) ) ) + ), + options = SdkBindingDataFactory.of( + SdkLiteralTypes.generics[Option[String]](), + List(Option("hello"), Option(null)) ) ) @@ -550,7 +616,8 @@ class SdkScalaTypeTest { "booleanMap" -> createMapVar(SimpleType.BOOLEAN), "datetimeMap" -> createMapVar(SimpleType.DATETIME), "durationMap" -> createMapVar(SimpleType.DURATION), - "genericMap" -> createMapVar(SimpleType.STRUCT) + "genericMap" -> createMapVar(SimpleType.STRUCT), + "optionMap" -> createMapVar(SimpleType.STRUCT) ).asJava val output = SdkScalaType[MapInput].getVariableMap @@ -598,6 +665,10 @@ class SdkScalaTypeTest { Map("foo2" -> ScalarNestedNested("foo2", Some("bar2"))) ) ) + ), + optionMap = SdkBindingDataFactory.of( + SdkLiteralTypes.generics[Option[String]](), + Map("none" -> Option(null), "some" -> Option("hello")) ) ) diff --git a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkLiteralTypes.scala b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkLiteralTypes.scala index 8b4502d30..47cb80077 100644 --- a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkLiteralTypes.scala +++ b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkLiteralTypes.scala @@ -28,6 +28,7 @@ import scala.reflect.api.{Mirror, TypeCreator, Universe} import scala.reflect.runtime.universe import scala.reflect.{ClassTag, classTag} import scala.reflect.runtime.universe.{ + ClassSymbol, NoPrefix, Symbol, Type, @@ -73,7 +74,7 @@ object SdkLiteralTypes { blobs(BlobType.DEFAULT).asInstanceOf[SdkLiteralType[T]] case t if t =:= typeOf[Binary] => binary().asInstanceOf[SdkLiteralType[T]] - case t if t <:< typeOf[Product] && !(t =:= typeOf[Option[_]]) => + case t if t <:< typeOf[Product] => generics().asInstanceOf[SdkLiteralType[T]] case t if t <:< typeOf[List[Any]] => @@ -301,23 +302,37 @@ object SdkLiteralTypes { } val clazz = typeOf[S].typeSymbol.asClass - val classMirror = mirror.reflectClass(clazz) - val constructor = typeOf[S].decl(termNames.CONSTRUCTOR).asMethod - val constructorMirror = classMirror.reflectConstructor(constructor) - - val constructorArgs = - constructor.paramLists.flatten.map((param: Symbol) => { - val paramName = param.name.toString - val value = map.getOrElse( - paramName, - throw new IllegalArgumentException( - s"Map is missing required parameter named $paramName" + + def instantiateViaConstructor(cls: ClassSymbol): S = { + val classMirror = mirror.reflectClass(cls) + val constructor = typeOf[S].decl(termNames.CONSTRUCTOR).asMethod + val constructorMirror = classMirror.reflectConstructor(constructor) + + val constructorArgs = + constructor.paramLists.flatten.map((param: Symbol) => { + val paramName = param.name.toString + val value = map.getOrElse( + paramName, + throw new IllegalArgumentException( + s"Map is missing required parameter named $paramName" + ) ) - ) - valueToParamValue(value, param.typeSignature.dealias) - }) + valueToParamValue(value, param.typeSignature.dealias) + }) + + constructorMirror(constructorArgs: _*).asInstanceOf[S] + } + + // special handling of scala.Option as it is a Product, but can't be instantiated like common + // case classes + if (clazz.name.toString == "Option") + map + .get("value") + .map(valueToParamValue(_, typeOf[S].typeArgs.head)) + .asInstanceOf[S] + else + instantiateViaConstructor(clazz) - constructorMirror(constructorArgs: _*).asInstanceOf[S] } def structValueToAny(value: Struct.Value): Any = { diff --git a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkScalaType.scala b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkScalaType.scala index 00cbdea56..c4868094e 100644 --- a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkScalaType.scala +++ b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkScalaType.scala @@ -232,11 +232,8 @@ object SdkScalaType { implicit def durationLiteralType: SdkScalaLiteralType[Duration] = DelegateLiteralType(SdkLiteralTypes.durations()) - // more specific matching to fail the usage of SdkBindingData[Option[_]] - implicit def optionLiteralType: SdkScalaLiteralType[Option[_]] = ??? - // fixme: using Product is just an approximation for case class because Product - // is also super class of, for example, Option and Tuple + // is also super class of, for example, Either or Try implicit def productLiteralType[T <: Product: TypeTag: ClassTag] : SdkScalaLiteralType[T] = DelegateLiteralType(SdkLiteralTypes.generics()) diff --git a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/package.scala b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/package.scala index 47c6332b3..b5bcc208d 100644 --- a/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/package.scala +++ b/flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/package.scala @@ -30,7 +30,11 @@ package object flytekitscala { } catch { case _: Throwable => // fall back to java's way, less reliable and with limitations - product.getClass.getDeclaredFields.map(_.getName).toList + val methodNames = product.getClass.getDeclaredMethods.map(_.getName) + product.getClass.getDeclaredFields + .map(_.getName) + .filter(methodNames.contains) + .toList } } }