Skip to content

Commit

Permalink
Assertions support for Activities and Flow Output (#179)
Browse files Browse the repository at this point in the history
* Added changes for running the unit-tests

Co-authored-by: Ashutosh Bhide <[email protected]>
  • Loading branch information
abhide-tibco and Ashutosh Bhide authored Nov 7, 2022
1 parent c35d6b5 commit 5bb5a6d
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 6 deletions.
46 changes: 46 additions & 0 deletions action.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,12 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha
if inst.TracingContext() != nil {
_ = trace.GetTracer().FinishTrace(inst.TracingContext(), nil)
}
for k, v := range returnData {
inst.SetValue(k, v)
}

fa.applyAssertionInterceptor(inst)

handler.HandleResult(returnData, err)
} else if inst.Status() == model.FlowStatusFailed {
if inst.TracingContext() != nil {
Expand All @@ -399,3 +405,43 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha

return nil
}

func (fa *FlowAction) applyAssertionInterceptor(inst *instance.IndependentInstance) {

if inst.GetInterceptor() != nil {
interceptor := inst.GetInterceptor().GetTaskInterceptor(inst.Instance.Name() + "-_flowOutput")
if interceptor != nil {
ef := expression.NewFactory(definition.GetDataResolver())
for id, assertion := range interceptor.Assertions {
if assertion.Expression == "" {
interceptor.Assertions[id].Message = "Empty expression"
interceptor.Assertions[id].Result = flowsupport.NotExecuted
continue
}

expr, _ := ef.NewExpr(fmt.Sprintf("%v", assertion.Expression))
if expr == nil {
interceptor.Assertions[id].Result = flowsupport.Fail
interceptor.Assertions[id].Message = "Failed to validate expression"
continue
}
result, err := expr.Eval(inst.Instance)
if err != nil {
interceptor.Assertions[id].Result = flowsupport.Fail
interceptor.Assertions[id].Message = "Failed to evaluate expression"
} else {
res, _ := coerce.ToBool(result)
if res {
interceptor.Assertions[id].Result = flowsupport.Pass
interceptor.Assertions[id].Message = "Comparison success"
} else {
interceptor.Assertions[id].Result = flowsupport.Fail
interceptor.Assertions[id].Message = "Comparison failure"
}
}

}
}
}

}
4 changes: 4 additions & 0 deletions instance/ind_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ func (inst *IndependentInstance) ApplyInterceptor(interceptor *flowsupport.Inter
}
}

func (inst *IndependentInstance) GetInterceptor() *flowsupport.Interceptor {
return inst.interceptor
}

// GetChanges returns the Change Tracker object
func (inst *IndependentInstance) GetChanges() ChangeTracker {
return inst.changeTracker
Expand Down
9 changes: 9 additions & 0 deletions instance/taskinst.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ func (ti *TaskInst) EvalActivity() (done bool, evalErr error) {
return done, err
}
}

err = applyAssertionInterceptor(ti)
if err != nil {
return false, err
}
}
return done, nil
}
Expand Down Expand Up @@ -491,6 +496,10 @@ func (ti *TaskInst) PostEvalActivity() (done bool, evalErr error) {
ti.logger.Debug("Mapper not applied")
}
}
err = applyAssertionInterceptor(ti)
if err != nil {
return false, err
}
}

return done, nil
Expand Down
74 changes: 72 additions & 2 deletions instance/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package instance

import (
"errors"

"fmt"
"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data"
"github.com/project-flogo/core/data/coerce"
"github.com/project-flogo/core/data/expression"
"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/flow/definition"
"github.com/project-flogo/flow/support"
"strconv"
)

func applySettingsMapper(taskInst *TaskInst) error {
Expand Down Expand Up @@ -76,7 +79,6 @@ func applyInputInterceptor(taskInst *TaskInst) bool {

// check if this task as an interceptor
taskInterceptor := master.interceptor.GetTaskInterceptor(taskInst.task.ID())

if taskInterceptor != nil {

taskInst.logger.Debug("Applying Interceptor - Input")
Expand Down Expand Up @@ -113,6 +115,74 @@ func applyInputInterceptor(taskInst *TaskInst) bool {
return true
}

func applyAssertionInterceptor(taskInst *TaskInst) error {

master := taskInst.flowInst.master
if master.interceptor != nil {
taskInst.logger.Debug("Applying Interceptor - Assertion")
// check if this task has assertion interceptor
id := taskInst.flowInst.Name() + "-" + taskInst.task.ID()
taskInterceptor := master.interceptor.GetTaskInterceptor(id)
if taskInterceptor != nil && len(taskInterceptor.Assertions) > 0 {
ef := expression.NewFactory(definition.GetDataResolver())

for name, assertion := range taskInterceptor.Assertions {
if taskInst.logger.DebugEnabled() {
taskInst.logger.Debugf("Executing Assertion Attr: %s = %s", name, assertion)
}
result := false
var message string

if assertion.Expression == "" {
taskInterceptor.Assertions[name].Message = "Empty expression"
taskInterceptor.Assertions[name].Result = support.NotExecuted
continue
}

if assertion.Type == support.Primitive {
result, message = applyPrimitiveAssertion(taskInst, ef, assertion)
} else {
taskInst.Logger().Errorf("Invalid Assertion Mode")
return errors.New("Invalid Assertion Mode")
}

taskInterceptor.Assertions[name].Message = message
//Set the result back in the Interceptor.
if result {
taskInterceptor.Assertions[name].Result = support.Pass
} else {
taskInterceptor.Assertions[name].Result = support.Fail
}
taskInst.logger.Debugf("Assertion Execution Result => Name: %s, Assertion Expression: %v, Result: %s, Message: %s ",
assertion.Name, assertion.Expression, strconv.FormatBool(result), message)
}
}
}

return nil
}

func applyPrimitiveAssertion(taskInst *TaskInst, ef expression.Factory, assertion support.Assertion) (bool, string) {

expr, _ := ef.NewExpr(fmt.Sprintf("%v", assertion.Expression))
if expr == nil {
return false, "Failed to validate expression"
}

result, err := expr.Eval(taskInst.flowInst)
if err != nil {
taskInst.logger.Error(err)
return false, "Failed to evaluate expression"
}
res, _ := coerce.ToBool(result)

if res {
return res, "Comparison success"
} else {
return res, "Comparison failure"
}
}

func applyOutputInterceptor(taskInst *TaskInst) error {

master := taskInst.flowInst.master
Expand Down
29 changes: 25 additions & 4 deletions support/interceptor.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package support

const (
Primitive = 1
Activity = 2
)

const (
NotExecuted = 0
Pass = 1
Fail = 2
)

// Interceptor contains a set of task interceptor, this can be used to override
// runtime data of an instance of the corresponding Flow. This can be used to
// modify runtime execution of a flow or in test/debug for implementing mocks
Expand Down Expand Up @@ -33,8 +44,18 @@ func (pi *Interceptor) GetTaskInterceptor(taskID string) *TaskInterceptor {
// Also, a 'Skip' flag can be enabled to inform the runtime that the task should not
// execute.
type TaskInterceptor struct {
ID string `json:"id"`
Skip bool `json:"skip,omitempty"`
Inputs map[string]interface{} `json:"inputs,omitempty"`
Outputs map[string]interface{} `json:"outputs,omitempty"`
ID string `json:"id"`
Skip bool `json:"skip,omitempty"`
Inputs map[string]interface{} `json:"inputs,omitempty"`
Outputs map[string]interface{} `json:"outputs,omitempty"`
Assertions []Assertion `json:"assertions,omitempty"`
}

type Assertion struct {
ID string
Name string
Type int
Expression interface{}
Result int
Message string
}

0 comments on commit 5bb5a6d

Please sign in to comment.