Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Policy eval generification #32

Merged
merged 14 commits into from
Jan 3, 2024
29 changes: 29 additions & 0 deletions policy/data/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package data

import (
"context"
"fmt"
)

// ChainDataSource is a wrapper DataSource that takes a list of other DataSources
// and returns query results from the first applicable downstream source
type ChainDataSource struct {
links []DataSource
}

func NewChainDataSource(links ...DataSource) *ChainDataSource {
return &ChainDataSource{
links: links,
}
}

func (ds ChainDataSource) Query(ctx context.Context, queryName string, query string, variables map[string]interface{}, output interface{}) (*QueryResponse, error) {
for _, l := range ds.links {
res, err := l.Query(ctx, queryName, query, variables, output)
if res != nil || err != nil {
return res, err
}
}

return nil, fmt.Errorf("no DataSource was available to process query %s", queryName)
}
34 changes: 34 additions & 0 deletions policy/data/fixed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package data

import (
"context"
)

type FixedDataSourceUnmarshaler func(data []byte, output interface{}) error

// FixedDataSource returns static data from responses passed in at construction time
type FixedDataSource struct {
unmarshaler FixedDataSourceUnmarshaler
data map[string][]byte
}

func NewFixedDataSource(unmarshaler FixedDataSourceUnmarshaler, data map[string][]byte) FixedDataSource {
return FixedDataSource{
unmarshaler: unmarshaler,
data: data,
}
}

func (ds FixedDataSource) Query(ctx context.Context, queryName string, query string, variables map[string]interface{}, output interface{}) (*QueryResponse, error) {
res, ok := ds.data[queryName]
if !ok {
return nil, nil
}

err := ds.unmarshaler(res, output)
if err != nil {
return nil, err
}

return &QueryResponse{}, nil
}
136 changes: 136 additions & 0 deletions policy/data/gql_async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package data

import (
"bytes"
"context"
b64 "encoding/base64"
"fmt"
"io"
"net/http"
"strings"

"github.com/atomist-skills/go-skill"
"olympos.io/encoding/edn"
)

const AsyncQueryName = "async-query"

type (
AsyncQueryBody struct {
Query string `edn:"query"`
Variables map[edn.Keyword]interface{} `edn:"variables"`
}

AsyncQueryRequest struct {
Name string `edn:"name"`
Body AsyncQueryBody `edn:"body"`
Metadata string `edn:"metadata"`
}

AsyncQueryResponse struct {
Data edn.RawMessage `edn:"data"`
Errors []struct {
Message string `edn:"message"`
}
}

AsyncResultMetadata struct {
SubscriptionResults [][]edn.RawMessage `edn:"subscription"`
AsyncQueryResults map[string]AsyncQueryResponse `edn:"results"`
InFlightQueryName string `edn:"query-name"`
}

AsyncDataSource struct {
multipleQuerySupport bool
log skill.Logger
url string
token string
subscriptionResults [][]edn.RawMessage
asyncResults map[string]AsyncQueryResponse
}
)

func NewAsyncDataSource(
multipleQuerySupport bool,
req skill.RequestContext,
subscriptionResults [][]edn.RawMessage,
asyncResults map[string]AsyncQueryResponse,
) AsyncDataSource {
return AsyncDataSource{
multipleQuerySupport: multipleQuerySupport,
log: req.Log,
url: fmt.Sprintf("%s:enqueue", req.Event.Urls.Graphql),
token: req.Event.Token,
subscriptionResults: subscriptionResults,
asyncResults: asyncResults,
}
}

func (ds AsyncDataSource) Query(ctx context.Context, queryName string, query string, variables map[string]interface{}, output interface{}) (*QueryResponse, error) {
if existingResult, ok := ds.asyncResults[queryName]; ok {
if len(existingResult.Errors) != 0 {
return nil, fmt.Errorf("async query returned error: %s", existingResult.Errors[0].Message)
}
return &QueryResponse{}, edn.Unmarshal(existingResult.Data, output)
}

if len(ds.asyncResults) > 0 && !ds.multipleQuerySupport {
ds.log.Debugf("skipping async query for query %s due to lack of multipleQuerySupport", queryName)
return nil, nil // don't error, in case there is another applicable query executor down-chain
}

ednVariables := map[edn.Keyword]interface{}{}
for k, v := range variables {
ednVariables[edn.Keyword(k)] = v
}

metadata := AsyncResultMetadata{
SubscriptionResults: ds.subscriptionResults,
AsyncQueryResults: ds.asyncResults,
InFlightQueryName: queryName,
}
metadataEdn, err := edn.Marshal(metadata)
if err != nil {
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
}

request := AsyncQueryRequest{
Name: AsyncQueryName,
Body: AsyncQueryBody{
Query: query,
Variables: ednVariables,
},
Metadata: b64.StdEncoding.EncodeToString(metadataEdn),
}

reqEdn, err := edn.Marshal(request)
if err != nil {
return nil, err
}

ds.log.Infof("Async request: %s", string(reqEdn))

req, err := http.NewRequest(http.MethodPost, ds.url, bytes.NewBuffer(reqEdn))
if err != nil {
return nil, err
}

req.Header.Add("Content-Type", "application/edn")

authToken := ds.token
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", authToken))

r, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if r.StatusCode >= 400 {
buf := new(strings.Builder)
_, _ = io.Copy(buf, r.Body)
body := buf.String()

return nil, fmt.Errorf("async request returned unexpected status %s: %s", r.Status, body)
}

return &QueryResponse{AsyncRequestMade: true}, nil
}
56 changes: 56 additions & 0 deletions policy/data/gql_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package data

import (
"context"
"encoding/json"
"net/http"

"golang.org/x/oauth2"

"github.com/hasura/go-graphql-client"

"github.com/atomist-skills/go-skill"
)

type SyncGraphqlDataSource struct {
Url string
GraphqlClient *graphql.Client
RequestContext skill.RequestContext
}

func NewSyncGraphqlDataSource(ctx context.Context, req skill.RequestContext) (SyncGraphqlDataSource, error) {
httpClient := oauth2.NewClient(ctx, oauth2.StaticTokenSource(
&oauth2.Token{AccessToken: req.Event.Token, TokenType: "Bearer"},
))

return SyncGraphqlDataSource{
Url: req.Event.Urls.Graphql,
GraphqlClient: graphql.NewClient(req.Event.Urls.Graphql, httpClient).
WithRequestModifier(func(r *http.Request) {
r.Header.Add("Accept", "application/json")
}),
RequestContext: req,
}, nil
}

func (ds SyncGraphqlDataSource) Query(ctx context.Context, queryName string, query string, variables map[string]interface{}, output interface{}) (*QueryResponse, error) {
log := ds.RequestContext.Log

log.Infof("Graphql endpoint: %s", ds.RequestContext.Event.Urls.Graphql)
log.Infof("Executing query %s: %s", queryName, query)
log.Infof("Query variables: %v", variables)

res, err := ds.GraphqlClient.ExecRaw(ctx, query, variables)
if err != nil {
return nil, err
}

log.Infof("GraphQL query response: %s", string(res))

err = json.Unmarshal(res, output)
if err != nil {
return nil, err
}

return &QueryResponse{}, nil
}
22 changes: 22 additions & 0 deletions policy/data/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package data

import (
"context"

"github.com/atomist-skills/go-skill"
)

type QueryResponse struct {
AsyncRequestMade bool
}

type DataSource interface {
Query(ctx context.Context, queryName string, query string, variables map[string]interface{}, output interface{}) (*QueryResponse, error)
}

func GqlContext(req skill.RequestContext) map[string]interface{} {
return map[string]interface{}{
"teamId": req.Event.WorkspaceId,
"organization": req.Event.Organization,
}
}
Loading