Skip to content

Commit

Permalink
Update nexus link converter (#6460)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Update nexus link converter

## Why?
<!-- Tell your future self why have you made these changes -->
Sync with code in SDK.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
rodrigozhou authored and dnr committed Oct 4, 2024
1 parent a505b1a commit e40642e
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 65 deletions.
13 changes: 4 additions & 9 deletions components/nexusoperations/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,6 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
return fmt.Errorf("%w: %w", queues.NewUnprocessableTaskError("failed to generate a callback token"), err)
}

nexusLink, err := ConvertLinkWorkflowEventToNexusLink(args.workflowEventLink)
if err != nil {
return err
}

callCtx, cancel := context.WithTimeout(
ctx,
e.Config.RequestTimeout(ns.Name().String(), task.EndpointName),
Expand All @@ -204,7 +199,7 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
CallbackHeader: nexus.Header{
commonnexus.CallbackTokenHeader: token,
},
Links: []nexus.Link{nexusLink},
Links: []nexus.Link{args.nexusLink},
})

methodTag := metrics.NexusMethodTag("StartOperation")
Expand Down Expand Up @@ -260,7 +255,7 @@ type startArgs struct {
endpointID string
header map[string]string
payload *commonpb.Payload
workflowEventLink *commonpb.Link_WorkflowEvent
nexusLink nexus.Link
namespaceFailoverVersion int64
}

Expand Down Expand Up @@ -292,7 +287,7 @@ func (e taskExecutor) loadOperationArgs(
}
args.payload = event.GetNexusOperationScheduledEventAttributes().GetInput()
args.header = event.GetNexusOperationScheduledEventAttributes().GetNexusHeader()
args.workflowEventLink = &commonpb.Link_WorkflowEvent{
args.nexusLink = ConvertLinkWorkflowEventToNexusLink(&commonpb.Link_WorkflowEvent{
Namespace: ns.Name().String(),
WorkflowId: ref.WorkflowKey.WorkflowID,
RunId: ref.WorkflowKey.RunID,
Expand All @@ -302,7 +297,7 @@ func (e taskExecutor) loadOperationArgs(
EventType: event.GetEventType(),
},
},
}
})
args.namespaceFailoverVersion = event.Version
return nil
})
Expand Down
3 changes: 1 addition & 2 deletions components/nexusoperations/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ func TestProcessInvocationTask(t *testing.T) {
},
},
}
handlerNexusLink, err := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink)
require.NoError(t, err)
handlerNexusLink := nexusoperations.ConvertLinkWorkflowEventToNexusLink(handlerLink)

cases := []struct {
name string
Expand Down
116 changes: 82 additions & 34 deletions components/nexusoperations/link_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,59 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// This file is duplicated in temporalio/temporal/components/nexusoperations/link_converter.go
// Any changes here or there must be replicated. This is temporary until the
// temporal repo updates to the most recent SDK version.

package nexusoperations

import (
"fmt"
"net/url"
"regexp"
"strconv"
"strings"

"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
)

const (
urlSchemeTemporalKey = "temporal"
urlPathNamespaceKey = "namespace"
urlPathWorkflowIDKey = "workflowID"
urlPathRunIDKey = "runID"
urlPathTemplate = "/namespaces/%s/workflows/%s/%s/history"
urlTemplate = "temporal://" + urlPathTemplate

linkWorkflowEventReferenceTypeKey = "referenceType"
linkEventReferenceEventIDKey = "eventID"
linkEventReferenceEventTypeKey = "eventType"
)

func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) (nexus.Link, error) {
u, err := url.Parse(fmt.Sprintf(
"temporal:///namespaces/%s/workflows/%s/%s/history",
url.PathEscape(we.GetNamespace()),
url.PathEscape(we.GetWorkflowId()),
url.PathEscape(we.GetRunId()),
var (
rePatternNamespace = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathNamespaceKey)
rePatternWorkflowID = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathWorkflowIDKey)
rePatternRunID = fmt.Sprintf(`(?P<%s>[^/]+)`, urlPathRunIDKey)
urlPathRE = regexp.MustCompile(fmt.Sprintf(
`^/namespaces/%s/workflows/%s/%s/history$`,
rePatternNamespace,
rePatternWorkflowID,
rePatternRunID,
))
if err != nil {
return nexus.Link{}, err
)

// ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent type to Nexus Link.
func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link {
u := &url.URL{
Scheme: urlSchemeTemporalKey,
Path: fmt.Sprintf(urlPathTemplate, we.GetNamespace(), we.GetWorkflowId(), we.GetRunId()),
RawPath: fmt.Sprintf(
urlPathTemplate,
url.PathEscape(we.GetNamespace()),
url.PathEscape(we.GetWorkflowId()),
url.PathEscape(we.GetRunId()),
),
}

switch ref := we.GetReference().(type) {
Expand All @@ -57,9 +82,10 @@ func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) (nexus
return nexus.Link{
URL: u,
Type: string(we.ProtoReflect().Descriptor().FullName()),
}, nil
}
}

// ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent.
func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error) {
we := &commonpb.Link_WorkflowEvent{}
if link.Type != string(we.ProtoReflect().Descriptor().FullName()) {
Expand All @@ -70,54 +96,76 @@ func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_Workfl
)
}

if link.URL.Scheme != "temporal" {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent")
if link.URL.Scheme != urlSchemeTemporalKey {
return nil, fmt.Errorf(
"failed to parse link to Link_WorkflowEvent: invalid scheme: %s",
link.URL.Scheme,
)
}

matches := urlPathRE.FindStringSubmatch(link.URL.EscapedPath())
if len(matches) != 4 {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: malformed URL path")
}

pathParts := strings.Split(link.URL.Path, "/")
if len(pathParts) != 7 {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent")
var err error
we.Namespace, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathNamespaceKey)])
if err != nil {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
}
if pathParts[0] != "" || pathParts[1] != "namespaces" || pathParts[3] != "workflows" || pathParts[6] != "history" {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent")

we.WorkflowId, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathWorkflowIDKey)])
if err != nil {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
}
we.Namespace = pathParts[2]
we.WorkflowId = pathParts[4]
we.RunId = pathParts[5]
switch link.URL.Query().Get(linkWorkflowEventReferenceTypeKey) {

we.RunId, err = url.PathUnescape(matches[urlPathRE.SubexpIndex(urlPathRunIDKey)])
if err != nil {
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
}

switch refType := link.URL.Query().Get(linkWorkflowEventReferenceTypeKey); refType {
case string((&commonpb.Link_WorkflowEvent_EventReference{}).ProtoReflect().Descriptor().Name()):
eventRef, err := convertURLQueryToLinkWorkflowEventEventReference(link.URL.Query())
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to parse link to Link_WorkflowEvent: %w", err)
}
we.Reference = &commonpb.Link_WorkflowEvent_EventRef{
EventRef: eventRef,
}
default:
return nil, fmt.Errorf(
"failed to parse link to Link_WorkflowEvent: unknown reference type: %q",
refType,
)
}

return we, nil
}

func convertLinkWorkflowEventEventReferenceToURLQuery(eventRef *commonpb.Link_WorkflowEvent_EventReference) string {
values := url.Values{
linkWorkflowEventReferenceTypeKey: []string{string(eventRef.ProtoReflect().Descriptor().Name())},
linkEventReferenceEventIDKey: []string{strconv.FormatInt(eventRef.GetEventId(), 10)},
linkEventReferenceEventTypeKey: []string{eventRef.GetEventType().String()},
values := url.Values{}
values.Set(linkWorkflowEventReferenceTypeKey, string(eventRef.ProtoReflect().Descriptor().Name()))
if eventRef.GetEventId() > 0 {
values.Set(linkEventReferenceEventIDKey, strconv.FormatInt(eventRef.GetEventId(), 10))
}
values.Set(linkEventReferenceEventTypeKey, eventRef.GetEventType().String())
return values.Encode()
}

func convertURLQueryToLinkWorkflowEventEventReference(queryValues url.Values) (*commonpb.Link_WorkflowEvent_EventReference, error) {
eventID, err := strconv.ParseInt(queryValues.Get(linkEventReferenceEventIDKey), 10, 64)
if err != nil {
return nil, err
var err error
eventRef := &commonpb.Link_WorkflowEvent_EventReference{}
eventIDValue := queryValues.Get(linkEventReferenceEventIDKey)
if eventIDValue != "" {
eventRef.EventId, err = strconv.ParseInt(queryValues.Get(linkEventReferenceEventIDKey), 10, 64)
if err != nil {
return nil, err
}
}
eventType, err := enumspb.EventTypeFromString(queryValues.Get(linkEventReferenceEventTypeKey))
eventRef.EventType, err = enumspb.EventTypeFromString(queryValues.Get(linkEventReferenceEventTypeKey))
if err != nil {
return nil, err
}
return &commonpb.Link_WorkflowEvent_EventReference{
EventId: eventID,
EventType: eventType,
}, nil
return eventRef, nil
}
Loading

0 comments on commit e40642e

Please sign in to comment.