Skip to content

Commit

Permalink
Revert: refactor: serverless connector This reverts commit b413e6c.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Jan 9, 2025
1 parent 3e6e496 commit d08d78e
Show file tree
Hide file tree
Showing 14 changed files with 487 additions and 356 deletions.
23 changes: 11 additions & 12 deletions internal/core/plugin_manager/install_to_serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

// InstallToAWSFromPkg installs a plugin to AWS Lambda
func (p *PluginManager) InstallToAWSFromPkg(
originalPackager []byte,
decoder decoder.PluginDecoder,
source string,
meta map[string]any,
Expand All @@ -33,7 +32,7 @@ func (p *PluginManager) InstallToAWSFromPkg(
return nil, err
}

response, err := serverless.LaunchPlugin(originalPackager, decoder)
response, err := serverless.UploadPlugin(decoder)
if err != nil {
return nil, err
}
Expand All @@ -50,17 +49,17 @@ func (p *PluginManager) InstallToAWSFromPkg(
newResponse.Close()
}()

functionUrl := ""
functionName := ""
lambdaUrl := ""
lambdaFunctionName := ""

response.Async(func(r serverless.LaunchFunctionResponse) {
response.Async(func(r serverless.LaunchAWSLambdaFunctionResponse) {
if r.Event == serverless.Info {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventInfo,
Data: "Installing...",
})
} else if r.Event == serverless.Done {
if functionUrl == "" || functionName == "" {
if lambdaUrl == "" || lambdaFunctionName == "" {
newResponse.Write(PluginInstallResponse{
Event: PluginInstallEventError,
Data: "Internal server error, failed to get lambda url or function name",
Expand All @@ -77,8 +76,8 @@ func (p *PluginManager) InstallToAWSFromPkg(
serverlessModel := &models.ServerlessRuntime{
Checksum: checksum,
Type: models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA,
FunctionURL: functionUrl,
FunctionName: functionName,
FunctionURL: lambdaUrl,
FunctionName: lambdaFunctionName,
PluginUniqueIdentifier: uniqueIdentity.String(),
Declaration: declaration,
}
Expand Down Expand Up @@ -107,10 +106,10 @@ func (p *PluginManager) InstallToAWSFromPkg(
Event: PluginInstallEventError,
Data: "Internal server error",
})
} else if r.Event == serverless.FunctionUrl {
functionUrl = r.Message
} else if r.Event == serverless.Function {
functionName = r.Message
} else if r.Event == serverless.LambdaUrl {
lambdaUrl = r.Message
} else if r.Event == serverless.Lambda {
lambdaFunctionName = r.Message
} else {
newResponse.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
}
Expand Down
156 changes: 45 additions & 111 deletions internal/core/plugin_manager/serverless_connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"net/url"

"github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests"
"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
"github.com/langgenius/dify-plugin-daemon/pkg/entities"
)

type ServerlessFunction struct {
var ()

type LambdaFunction struct {
FunctionName string `json:"function_name" validate:"required"`
FunctionDRN string `json:"function_drn" validate:"required"`
FunctionARN string `json:"function_arn" validate:"required"`
FunctionURL string `json:"function_url" validate:"required"`
}

Expand Down Expand Up @@ -44,78 +44,72 @@ func Ping() error {
}

var (
ErrFunctionNotFound = errors.New("no function found")
ErrNoLambdaFunction = errors.New("no lambda function found")
)

// Fetch the function from serverless connector, return error if failed
func FetchFunction(manifest plugin_entities.PluginDeclaration, checksum string) (*ServerlessFunction, error) {
filename := getFunctionFilename(manifest, checksum)
// Fetch the lambda function from serverless connector, return error if failed
func FetchLambda(identity string, checksum string) (*LambdaFunction, error) {
request := map[string]any{
"plugin": map[string]any{
"config": map[string]any{
"identity": identity,
"checksum": checksum,
},
},
}

url, err := url.JoinPath(baseurl.String(), "/v1/runner/instances")
url, err := url.JoinPath(baseurl.String(), "/v1/lambda/fetch")
if err != nil {
return nil, err
}

response, err := http_requests.GetAndParse[RunnerInstances](
response, err := http_requests.PostAndParse[entities.GenericResponse[LambdaFunction]](
client,
url,
http_requests.HttpHeader(map[string]string{
"Authorization": SERVERLESS_CONNECTOR_API_KEY,
}),
http_requests.HttpParams(map[string]string{
"filename": filename,
}),
http_requests.HttpPayloadJson(request),
)

if err != nil {
return nil, err
}

if response.Error != "" {
return nil, fmt.Errorf("unexpected response from plugin controller: %s", response.Error)
}

if len(response.Items) == 0 {
return nil, ErrFunctionNotFound
if response.Code != 0 {
if response.Code == -404 {
return nil, ErrNoLambdaFunction
}
return nil, fmt.Errorf("unexpected response from serverless connector: %s", response.Message)
}

return &ServerlessFunction{
FunctionName: response.Items[0].Name,
FunctionDRN: response.Items[0].ResourceName,
FunctionURL: response.Items[0].Endpoint,
}, nil
return &response.Data, nil
}

type LaunchFunctionEvent string
type LaunchAWSLambdaFunctionEvent string

const (
Error LaunchFunctionEvent = "error"
Info LaunchFunctionEvent = "info"
Function LaunchFunctionEvent = "function"
FunctionUrl LaunchFunctionEvent = "function_url"
Done LaunchFunctionEvent = "done"
Error LaunchAWSLambdaFunctionEvent = "error"
Info LaunchAWSLambdaFunctionEvent = "info"
Lambda LaunchAWSLambdaFunctionEvent = "lambda"
LambdaUrl LaunchAWSLambdaFunctionEvent = "lambda_url"
Done LaunchAWSLambdaFunctionEvent = "done"
)

type LaunchFunctionResponse struct {
Event LaunchFunctionEvent `json:"event"`
Message string `json:"message"`
type LaunchAWSLambdaFunctionResponse struct {
Event LaunchAWSLambdaFunctionEvent `json:"event"`
Message string `json:"message"`
}

// Setup the function from serverless connector, it will receive the context as the input
// Launch the lambda function from serverless connector, it will receive the context_tar as the input
// and build it a docker image, then run it on serverless platform like AWS Lambda
// it returns a event stream, the caller should consider it as a async operation
func SetupFunction(
manifest plugin_entities.PluginDeclaration,
checksum string,
context io.Reader,
) (*stream.Stream[LaunchFunctionResponse], error) {
url, err := url.JoinPath(baseurl.String(), "/v1/launch")
func LaunchLambda(identity string, checksum string, context_tar io.Reader) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
url, err := url.JoinPath(baseurl.String(), "/v1/lambda/launch")
if err != nil {
return nil, err
}

// join a filename
serverless_connector_response, err := http_requests.PostAndParseStream[LaunchFunctionResponseChunk](
response, err := http_requests.PostAndParseStream[LaunchAWSLambdaFunctionResponse](
client,
url,
http_requests.HttpHeader(map[string]string{
Expand All @@ -124,79 +118,19 @@ func SetupFunction(
http_requests.HttpReadTimeout(240000),
http_requests.HttpWriteTimeout(240000),
http_requests.HttpPayloadMultipart(
map[string]string{},
map[string]http_requests.HttpPayloadMultipartFile{
"context": {
Filename: getFunctionFilename(manifest, checksum),
Reader: context,
},
map[string]string{
"identity": identity,
"checksum": checksum,
},
map[string]io.Reader{
"context": context_tar,
},
),
)

if err != nil {
return nil, err
}

response := stream.NewStream[LaunchFunctionResponse](10)

routine.Submit(map[string]string{
"module": "serverless_connector",
"func": "SetupFunction",
}, func() {
defer response.Close()
if err := serverless_connector_response.Async(func(chunk LaunchFunctionResponseChunk) {
if chunk.State == LAUNCH_STATE_FAILED {
response.Write(LaunchFunctionResponse{
Event: Error,
Message: chunk.Message,
})
return
}

switch chunk.Stage {
case LAUNCH_STAGE_START, LAUNCH_STAGE_BUILD:
response.Write(LaunchFunctionResponse{
Event: Info,
Message: "Building plugin...",
})
case LAUNCH_STAGE_RUN:
if chunk.State == LAUNCH_STATE_SUCCESS {
data, err := parser.ParserCommaSeparatedValues[LaunchFunctionFinalStageMessage]([]byte(chunk.Message))
if err != nil {
response.Write(LaunchFunctionResponse{
Event: Error,
Message: err.Error(),
})
return
}

response.Write(LaunchFunctionResponse{
Event: Function,
Message: data.Name,
})
response.Write(LaunchFunctionResponse{
Event: FunctionUrl,
Message: data.Endpoint,
})
} else {
response.Write(LaunchFunctionResponse{
Event: Info,
Message: "Launching plugin...",
})
}
case LAUNCH_STAGE_END:
response.Write(LaunchFunctionResponse{
Event: Done,
Message: "Plugin launched",
})
}
}); err != nil {
response.Write(LaunchFunctionResponse{
Event: Error,
Message: err.Error(),
})
}
})

return response, nil
}
Loading

0 comments on commit d08d78e

Please sign in to comment.