Skip to content

Commit

Permalink
preserve query results in metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Norton committed Jan 1, 2024
1 parent 22e5672 commit 8eb32c2
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 62 deletions.
82 changes: 43 additions & 39 deletions policy/data/gql_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package data
import (
"bytes"
"context"
b64 "encoding/base64"
"fmt"
"io"
"net/http"
Expand All @@ -12,6 +13,8 @@ import (
"olympos.io/encoding/edn"
)

const AsyncQueryName = "async-query"

type (
AsyncQueryBody struct {
Query string `edn:"query"`
Expand All @@ -25,52 +28,77 @@ type (
}

AsyncQueryResponse struct {
Data map[edn.Keyword]edn.RawMessage `edn:"data"`
Data edn.RawMessage `edn:"data"`
Errors []struct {
Message string `edn:"message"`
}
}

AsyncResultMetadata struct {
SubscriptionResults [][]edn.RawMessage `edn:"subscription"`
AsyncQueryResults map[string]AsyncQueryResponse `edn:"results"`
InFlightQueryName string `edn:"query-name"`
}

AsyncDataSource struct {
log skill.Logger
url string
token string
metadata string
log skill.Logger
url string
token string
subscriptionResults [][]edn.RawMessage
asyncResults map[string]AsyncQueryResponse
}
)

func NewAsyncDataSource(req skill.RequestContext, metadata string) AsyncDataSource {
func NewAsyncDataSource(req skill.RequestContext, subscriptionResults [][]edn.RawMessage, asyncResults map[string]AsyncQueryResponse) AsyncDataSource {
return AsyncDataSource{
log: req.Log,
url: fmt.Sprintf("%s:enqueue", req.Event.Urls.Graphql),
token: req.Event.Token,
metadata: metadata,
log: req.Log,
url: fmt.Sprintf("%s:enqueue", req.Event.Urls.Graphql),
token: req.Event.Token,
subscriptionResults: subscriptionResults,
asyncResults: asyncResults,
}
}

func (ds AsyncDataSource) Query(ctx context.Context, queryName string, query string, variables map[string]interface{}, output interface{}) (*QueryResponse, error) {
if existingResult, ok := ds.asyncResults[queryName]; ok {
if len(existingResult.Errors) != 0 {
return nil, fmt.Errorf("async query returned error: %s", existingResult.Errors[0].Message)
}
return &QueryResponse{}, edn.Unmarshal(existingResult.Data, output)
}

ednVariables := map[edn.Keyword]interface{}{}
for k, v := range variables {
ednVariables[edn.Keyword(k)] = v
}

metadata := AsyncResultMetadata{
SubscriptionResults: ds.subscriptionResults,
AsyncQueryResults: ds.asyncResults,
InFlightQueryName: queryName,
}
metadataEdn, err := edn.Marshal(metadata)
if err != nil {
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
}

request := AsyncQueryRequest{
Name: queryName,
Name: AsyncQueryName,
Body: AsyncQueryBody{
Query: query,
Variables: ednVariables,
},
Metadata: ds.metadata,
Metadata: b64.StdEncoding.EncodeToString(metadataEdn),
}

edn, err := edn.Marshal(request)
reqEdn, err := edn.Marshal(request)
if err != nil {
return nil, err
}

ds.log.Infof("Async request: %s", string(edn))
ds.log.Infof("Async request: %s", string(reqEdn))

req, err := http.NewRequest(http.MethodPost, ds.url, bytes.NewBuffer(edn))
req, err := http.NewRequest(http.MethodPost, ds.url, bytes.NewBuffer(reqEdn))
if err != nil {
return nil, err
}
Expand All @@ -94,27 +122,3 @@ func (ds AsyncDataSource) Query(ctx context.Context, queryName string, query str

return &QueryResponse{AsyncRequestMade: true}, nil
}

func UnwrapAsyncResponse(result map[edn.Keyword]edn.RawMessage) (DataSource, error) {
ednBody, err := edn.Marshal(result)
if err != nil {
return nil, err
}

var response AsyncQueryResponse
err = edn.Unmarshal(ednBody, &response)
if err != nil {
return nil, err
}

if len(response.Errors) > 0 {
return nil, fmt.Errorf(response.Errors[0].Message)
}

queryResponses := map[string][]byte{}
for k, v := range response.Data {
queryResponses[string(k)] = v
}

return NewFixedDataSource(edn.Unmarshal, queryResponses), nil
}
41 changes: 22 additions & 19 deletions policy/policy_handler/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,42 @@ import (
"context"
b64 "encoding/base64"
"fmt"

"github.com/atomist-skills/go-skill"
"github.com/atomist-skills/go-skill/policy/data"
"olympos.io/encoding/edn"
)

const eventNameAsyncQuery = "async_query"
const EventNameAsyncQuery = data.AsyncQueryName // these must match for the event handler to be registered

// WithAsync will enable async graphql queries for the EventHandler.
// When used, data.QueryResponse#AsyncRequestMade will be true when performed asynchronously.
// If that flag is set, the policy evaluator should terminate early with no results.
// It will be automatically retried once the async query results are returned.
func WithAsync() Opt {
return func(h *EventHandler) {
h.subscriptionNames = append(h.subscriptionNames, eventNameAsyncQuery)
h.subscriptionNames = append(h.subscriptionNames, EventNameAsyncQuery)
h.subscriptionDataProviders = append(h.subscriptionDataProviders, getAsyncSubscriptionData)
h.dataSourceProviders = append(h.dataSourceProviders, buildAsyncDataSources)
}
}

func getAsyncSubscriptionData(ctx context.Context, req skill.RequestContext) ([][]edn.RawMessage, skill.Configuration, error) {
if req.Event.Context.AsyncQueryResult.Name != eventNameAsyncQuery {
if req.Event.Context.AsyncQueryResult.Name != EventNameAsyncQuery {
return nil, skill.Configuration{}, nil
}

metadata := req.Event.Context.AsyncQueryResult.Metadata
encoded, err := b64.StdEncoding.DecodeString(metadata)
metaEdn, err := b64.StdEncoding.DecodeString(req.Event.Context.AsyncQueryResult.Metadata)
if err != nil {
return nil, skill.Configuration{}, fmt.Errorf("failed to decode async metadata: %w", err)
}

var subscriptionResult [][]edn.RawMessage
err = edn.Unmarshal(encoded, &subscriptionResult)
var metadata data.AsyncResultMetadata
err = edn.Unmarshal(metaEdn, &metadata)
if err != nil {
return nil, skill.Configuration{}, fmt.Errorf("failed to unmarshal async metadata: %w", err)
}

return subscriptionResult, req.Event.Context.AsyncQueryResult.Configuration, nil
return metadata.SubscriptionResults, req.Event.Context.AsyncQueryResult.Configuration, nil
}

// buildAsyncDataSources always returns at least a data.AsyncDataSource,
Expand All @@ -52,23 +50,28 @@ func buildAsyncDataSources(ctx context.Context, req skill.RequestContext) ([]dat
return []data.DataSource{}, nil
}

if req.Event.Context.AsyncQueryResult.Name == eventNameAsyncQuery {
responseSource, err := data.UnwrapAsyncResponse(req.Event.Context.AsyncQueryResult.Result)
if err != nil {
return nil, err
}
if req.Event.Context.AsyncQueryResult.Name != EventNameAsyncQuery {
return []data.DataSource{
responseSource,
data.NewAsyncDataSource(req, req.Event.Context.AsyncQueryResult.Metadata),
data.NewAsyncDataSource(req, req.Event.Context.Subscription.Result, map[string]data.AsyncQueryResponse{}),
}, nil
}

ednBody, err := edn.Marshal(req.Event.Context.Subscription.Result)
metaEdn, err := b64.StdEncoding.DecodeString(req.Event.Context.AsyncQueryResult.Metadata)

var metadata data.AsyncResultMetadata
err = edn.Unmarshal(metaEdn, &metadata)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
}

var queryResponse data.AsyncQueryResponse
err = edn.Unmarshal(req.Event.Context.AsyncQueryResult.Result, &queryResponse)
if err != nil {
return nil, fmt.Errorf("failed to marshal metadata [%w]", err)
return nil, fmt.Errorf("failed to unmarshal async query result: %w", err)
}
metadata.AsyncQueryResults[metadata.InFlightQueryName] = queryResponse

return []data.DataSource{
data.NewAsyncDataSource(req, b64.StdEncoding.EncodeToString(ednBody)),
data.NewAsyncDataSource(req, metadata.SubscriptionResults, metadata.AsyncQueryResults),
}, nil
}
8 changes: 4 additions & 4 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ type EventIncoming struct {
Metadata map[edn.Keyword]edn.RawMessage `edn:"metadata"`
} `edn:"sync-request"`
AsyncQueryResult struct {
Name string `edn:"name"`
Configuration Configuration `edn:"configuration"`
Metadata string `edn:"metadata"`
Result map[edn.Keyword]edn.RawMessage `edn:"result"`
Name string `edn:"name"`
Configuration Configuration `edn:"configuration"`
Metadata string `edn:"metadata"`
Result edn.RawMessage `edn:"result"`
} `edn:"query-result"`
Event struct {
Name string `edn:"name"`
Expand Down

0 comments on commit 8eb32c2

Please sign in to comment.