Skip to content

Commit

Permalink
Artifact/event processing (#4277)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Oct 27, 2023
1 parent 729b300 commit ceb390d
Show file tree
Hide file tree
Showing 37 changed files with 1,297 additions and 154 deletions.
36 changes: 35 additions & 1 deletion cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package single

import (
"context"
sharedCmd "github.com/flyteorg/flyte/flyteartifacts/cmd/shared"
"github.com/flyteorg/flyte/flyteartifacts/pkg/configuration"
artifactsServer "github.com/flyteorg/flyte/flyteartifacts/pkg/server"
"github.com/flyteorg/flyte/flytestdlib/database"
"net/http"

datacatalogConfig "github.com/flyteorg/flyte/datacatalog/pkg/config"
Expand Down Expand Up @@ -60,7 +64,37 @@ func startClusterResourceController(ctx context.Context) error {
}

func startArtifact(ctx context.Context, cfg Artifacts) error {
return nil
if cfg.Disabled {
logger.Infof(ctx, "Artifacts server is disabled. Skipping...")
return nil
}
// Roughly copies main/NewMigrateCmd
logger.Infof(ctx, "Artifacts: running database migrations if any...")
migs := artifactsServer.GetMigrations(ctx)
initializationSql := "create extension if not exists hstore;"
dbConfig := artifactsServer.GetDbConfig()
err := database.Migrate(context.Background(), dbConfig, migs, initializationSql)
if err != nil {
logger.Errorf(ctx, "Failed to run Artifacts database migrations. Error: %v", err)
return err
}

g, childCtx := errgroup.WithContext(ctx)

// Rough copy of NewServeCmd
g.Go(func() error {
cfg := configuration.GetApplicationConfig()
serverCfg := &cfg.ArtifactServerConfig
err := sharedCmd.ServeGateway(childCtx, "artifacts", serverCfg, artifactsServer.GrpcRegistrationHook,
artifactsServer.HttpRegistrationHook)
if err != nil {
logger.Errorf(childCtx, "Failed to start Artifacts server. Error: %v", err)
return err
}
return nil
})

return g.Wait()
}

func startAdmin(ctx context.Context, cfg Admin) error {
Expand Down
26 changes: 21 additions & 5 deletions flyte-single-binary-local.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# This is a sample configuration file for running single-binary Flyte locally against
# a sandbox.
# gatepr: revert the local dir to reflect home.
# paths were changed to personal to ensure settings didn't get lost.
admin:
endpoint: localhost:8089
insecure: true
Expand All @@ -15,7 +17,7 @@ cluster_resources:

logger:
show-source: true
level: 6
level: 3

propeller:
create-flyteworkflow-crd: true
Expand Down Expand Up @@ -81,13 +83,27 @@ database:
cloudEvents:
enable: true
cloudEventVersion: v2
type: redis
redis:
addr: "localhost:30004"
type: sandbox
# For admin to find artifacts service
artifacts:
host: localhost
port: 50051
port: 30080
insecure: true
# For artifact service itself
artifactsServer:
artifactBlobStoreConfig:
type: stow
stow:
kind: s3
config:
disable_ssl: true
v2_signing: true
endpoint: http://localhost:30002
auth_type: accesskey
access_key_id: minio
secret_key: miniostorage
artifactsProcessor:
cloudProvider: Sandbox
storage:
type: stow
stow:
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/pkg/artifacts/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package artifacts

// gatepr: add proper config bits for this
// eduardo to consider moving to idl clients.
type Config struct {
Host string `json:"host"`
Port int `json:"port"`
Expand Down
18 changes: 5 additions & 13 deletions flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cloudevent

import (
"context"
"github.com/flyteorg/flyte/flytestdlib/sandbox_utils"
"time"

dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
Expand All @@ -17,7 +18,6 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/async"
cloudEventImplementations "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces"
redisPublisher "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/redis"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
Expand Down Expand Up @@ -90,20 +90,12 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi
}
sender = &cloudEventImplementations.KafkaSender{Client: client}

case common.Redis:
case common.Sandbox:
var publisher pubsub.Publisher
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
publisher, err = redisPublisher.NewPublisher(cloudEventsConfig.RedisConfig)
return err
})
logger.Infof(ctx, "Using Redis cloud events publisher [%v] [%+v]", publisher, cloudEventsConfig.RedisConfig)

// Persistent errors should hard fail
if err != nil {
panic(err)
publisher = sandbox_utils.NewCloudEventsPublisher()
sender = &cloudEventImplementations.PubSubSender{
Pub: publisher,
}
sender = &cloudEventImplementations.PubSubSender{Pub: publisher}

case common.Local:
fallthrough
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,8 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
// Get outputs from the workflow execution
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
fmt.Printf("remove this - Got output data")
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
fmt.Printf("remove this - Got output URI")
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig, c.storageClient, rawEvent.GetOutputUri())
if err != nil {
Expand Down Expand Up @@ -282,10 +280,8 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con

var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
fmt.Printf("remove this - task Got output data")
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
fmt.Printf("remove this - task Got output URI")
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, rawEvent.GetOutputUri())
Expand Down Expand Up @@ -375,6 +371,7 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy
executionID = msgType.ExecutionId.String()
eventID = fmt.Sprintf("%v", executionID)
eventTime = time.Now()
// CloudEventExecutionStart don't have a nested event
finalMsg = msgType
default:
return fmt.Errorf("unsupported event types [%+v]", reflect.TypeOf(msg))
Expand Down
7 changes: 4 additions & 3 deletions flyteadmin/pkg/async/cloudevent/implementations/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package implementations

import (
"context"
"encoding/json"
"fmt"
pbcloudevents "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"

"github.com/NYTimes/gizmo/pubsub"
"github.com/Shopify/sarama"
Expand All @@ -25,8 +25,9 @@ type PubSubSender struct {

func (s *PubSubSender) Send(ctx context.Context, notificationType string, event cloudevents.Event) error {
// gatepr: investigate why the previous statement didn't work.
// eventByte, err := pbcloudevents.Protobuf.Marshal(&event)
eventByte, err := json.Marshal(&event)
// perhaps only because of redis.
eventByte, err := pbcloudevents.Protobuf.Marshal(&event)
//eventByte, err := json.Marshal(&event)
if err != nil {
logger.Errorf(ctx, "Failed to marshal cloudevent with error: %v", err)
return err
Expand Down
1 change: 0 additions & 1 deletion flyteadmin/pkg/common/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@ const (
GCP CloudProvider = "gcp"
Sandbox CloudProvider = "sandbox"
Local CloudProvider = "local"
Redis CloudProvider = "redis"
None CloudProvider = "none"
)
11 changes: 9 additions & 2 deletions flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,18 @@ func (m *TaskExecutionManager) updateTaskExecutionModelState(

func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, request admin.TaskExecutionEventRequest) (
*admin.TaskExecutionEventResponse, error) {

logger.Warningf(ctx, "HERE!!!123")

if err := validation.ValidateTaskExecutionRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes); err != nil {
return nil, err
}
logger.Warningf(ctx, "HERE!!!123-1")

if err := validation.ValidateClusterForExecutionID(ctx, m.db, request.Event.ParentNodeExecutionId.ExecutionId, request.Event.ProducerId); err != nil {
return nil, err
}
logger.Warningf(ctx, "HERE!!!123-2")

// Get the parent node execution, if none found a MissingEntityError will be returned
nodeExecutionID := request.Event.ParentNodeExecutionId
Expand Down Expand Up @@ -204,10 +209,12 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err)
}

logger.Warningf(ctx, "HERE!!!123-3")
go func() {
ceCtx := context.TODO()
ceCtx := context.Background()
logger.Warningf(ctx, "HERE!!!123-4")
if err := m.cloudEventsPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil {
logger.Infof(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err)
logger.Errorf(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,8 @@ type ExternalEventsConfig struct {
ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
}

//go:generate enumer -type=CloudEventVersion -trimprefix=CloudEventVersion
type CloudEventVersion int
//go:generate enumer -type=CloudEventVersion -json -yaml -trimprefix=CloudEventVersion
type CloudEventVersion uint8

const (
// This is the initial version of the cloud events
Expand Down
39 changes: 37 additions & 2 deletions flyteadmin/pkg/runtime/interfaces/cloudeventversion_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions flyteartifacts/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"context"
sharedCmd "github.com/flyteorg/flyte/flyteartifacts/cmd/shared"
"github.com/flyteorg/flyte/flyteartifacts/pkg/server"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyte/flytestdlib/storage"

_ "net/http/pprof" // Required to serve application.
)
Expand All @@ -22,3 +25,9 @@ func main() {
panic(err)
}
}

func init() {
// Set Keys
labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey,
contextutils.DomainKey, storage.FailureTypeLabel)
}
18 changes: 5 additions & 13 deletions flyteartifacts/cmd/shared/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ import (
"context"
"flag"
"fmt"
sharedCfg "github.com/flyteorg/flyte/flyteartifacts/pkg/configuration/shared"
"github.com/flyteorg/flyte/flyteartifacts/pkg/configuration"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/config/viper"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/profutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyte/flytestdlib/storage"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"os"
Expand All @@ -36,9 +33,9 @@ func NewRootCmd(rootUse string, grpcHook GrpcRegistrationHook, httpHook HttpRegi

go func() {
ctx := context.Background()
sharedConfig := sharedCfg.SharedServerConfig.GetConfig().(*sharedCfg.ServerConfiguration)
metricsCfg := configuration.GetApplicationConfig().ArtifactServerConfig.Metrics
err := profutils.StartProfilingServerWithDefaultHandlers(ctx,
sharedConfig.Metrics.Port.Port, nil)
metricsCfg.Port.Port, nil)
if err != nil {
logger.Panicf(ctx, "Failed to Start profiling and metrics server. Error: %v", err)
}
Expand All @@ -52,12 +49,6 @@ func NewRootCmd(rootUse string, grpcHook GrpcRegistrationHook, httpHook HttpRegi
return rootCmd
}

func init() {
// Set Keys
labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey,
contextutils.DomainKey, storage.FailureTypeLabel)
}

func initConfig(cmd *cobra.Command, _ []string) error {
configAccessor = viper.NewAccessor(config.Options{
SearchPaths: []string{cfgFile, ".", "/etc/flyte/config", "$GOPATH/src/github.com/flyteorg/flyte/flyteartifacts"},
Expand Down Expand Up @@ -86,7 +77,8 @@ func initSubCommands(rootCmd *cobra.Command, grpcHook GrpcRegistrationHook, http
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "artifact_config.yaml", "config file (default is ./artifact_config.yaml)")

rootCmd.AddCommand(viper.GetConfigCommand())
rootCmd.AddCommand(NewServeCmd(rootCmd.Use, grpcHook, httpHook))
cfg := configuration.GetApplicationConfig()
rootCmd.AddCommand(NewServeCmd(rootCmd.Use, cfg.ArtifactServerConfig, grpcHook, httpHook))

// Allow viper to read the value of the flags
configAccessor.InitializePflags(rootCmd.PersistentFlags())
Expand Down
Loading

0 comments on commit ceb390d

Please sign in to comment.