Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] External code references #65

Merged
merged 4 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions test/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@
outputs:
- output/sifter.edge.edge.json.gz
- output/sifter.vertex.vertex.json.gz
- playbook: examples/code-ref/Pipeline.yaml
- playbook: examples/code-ref/flatMappipeline.yaml
26 changes: 26 additions & 0 deletions test/examples/code-ref/Pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

class: Playbook
name: codeTest

inputs:
startData:
embedded:
- {"value": 0, "name": "alice"}
- {"value": 1, "name": "bob"}
- {"value": 2, "name": "charlie"}


pipelines:
codeTest:
- from: startData
- map:
method: update
gpython:
$ref: map.py
- map:
method: update
gpython: |
def update(x):
x["value"] = x["value"] + 1
return x
- debug: {}
2 changes: 2 additions & 0 deletions test/examples/code-ref/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def filter_bob(row):
return row['name'] == "bob"
19 changes: 19 additions & 0 deletions test/examples/code-ref/flatMap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
def fix(row):
out = {
"identifier":[{
"system": "https://redivis.com/datasets/ye2v-6skh7wdr7/tables",
"value":str(int(row["person_id"]))
}]
}

if(row["person_source_value"] is not None):
out["identifier"].append({
"value": row["person_source_value"],
"system": "https://redivis.com/datasets/ye2v-6skh7wdr7/tables"
})
else:
out["identifier"].append({"value": "None", "system": "https://redivis.com/datasets/ye2v-6skh7wdr7/tables"})

out["identifier"][1]["value"] = str(out["identifier"][1]["value"]) + "_" + "None"

return out["identifier"]
47 changes: 47 additions & 0 deletions test/examples/code-ref/flatMappipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

class: Playbook
name: codeTest

inputs:
startData:
embedded:
- {"COMPLEX_ID":"Complex_76a0f49f-272e-4ec4-bcba-723806b35a31___null__6468_","PROTEIN":"Q9Y296"}
- {"COMPLEX_ID":"Complex_da6f165b-e085-4ec6-ba43-1170756b0a57___null__6967_","PROTEIN":"O08957"}
- {"COMPLEX_ID":"Complex_c688ddcc-a541-4098-ab0d-25b87e5bc5cd___null__1097_","PROTEIN":"Q13347"}

otherData:
embedded:
- {"person_id": 3589912774911670272, "person_source_value": 10009628, "name": "alice"}
- {"person_id": -3210373572193940992, "person_source_value": 10011398, "name": "bob"}
- {"person_id": -775517641933593344, "person_source_value": 10004235, "name": "charlie"}

pipelines:
filterpipeline:
- from: startData
- reduce:
field: COMPLEX_ID
method: merge
init: { "proteins": [] }
gpython:
$ref: reduce.py

- debug: {}

otherpipelines:
- from: otherData
- filter:
# The [field,match] values and the gpython file give the same result
#field: name
#match: bob
method: filter_bob
gpython:
$ref: filter.py
#- debug: {}
- flatMap:
method: fix
gpython:
$ref: flatMap.py

- debug: {}


4 changes: 4 additions & 0 deletions test/examples/code-ref/map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

def update(x):
x["value"] = x["value"] + 1
return x
3 changes: 3 additions & 0 deletions test/examples/code-ref/reduce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def merge(x,y):
x["proteins"] = [x["PROTEIN"]] + y["proteins"]
return x
6 changes: 5 additions & 1 deletion test/examples/gdc/gdc-convert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ pipelines:
- from: caseObject
- graphBuild:
schema: "{{config.schema}}"
title: Case
title: Case
EdgeFix:
method: test
gpython:
$ref: test.py
3 changes: 3 additions & 0 deletions test/examples/gdc/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def test(row):
row["TEST"] = "test_string"
return row
45 changes: 45 additions & 0 deletions transform/code_block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package transform

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
)

type CodeBlock struct {
Code string
Ref string
BaseDir string
}

func (cb *CodeBlock) UnmarshalJSON(data []byte) error {
if err := json.Unmarshal(data, &cb.Code); err == nil {
return nil
}
ref := map[string]any{}
if err := json.Unmarshal(data, &ref); err == nil {
if path, ok := ref["$ref"]; ok {
if pathStr, ok := path.(string); ok {
cb.Ref = pathStr
return nil
}
}
}
return fmt.Errorf("unknown code block type")
}

func (cb *CodeBlock) SetBaseDir(path string) {
cb.BaseDir = path
}

func (cb *CodeBlock) String() string {
if cb.Ref != "" {
path := filepath.Join(cb.BaseDir, cb.Ref)
data, err := os.ReadFile(path)
if err == nil {
cb.Code = string(data)
}
}
return cb.Code
}
19 changes: 10 additions & 9 deletions transform/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
)

type FilterStep struct {
Field string `json:"field"`
Value string `json:"value"`
Match string `json:"match"`
Check string `json:"check" jsonschema_description:"How to check value, 'exists' or 'hasValue'"`
Method string `json:"method"`
Python string `json:"python"`
GPython string `json:"gpython"`
Field string `json:"field"`
Value string `json:"value"`
Match string `json:"match"`
Check string `json:"check" jsonschema_description:"How to check value, 'exists' or 'hasValue'"`
Method string `json:"method"`
Python string `json:"python"`
GPython *CodeBlock `json:"gpython"`
}

type filterProcessor struct {
Expand All @@ -34,10 +34,11 @@ func (fs FilterStep) Init(task task.RuntimeTask) (Processor, error) {
log.Printf("Compile Error: %s", err)
}
return &filterProcessor{fs, c, task}, nil
} else if fs.GPython != "" && fs.Method != "" {
} else if fs.GPython != nil && fs.Method != "" {
log.Printf("Starting Filter Map: %s", fs.GPython)
fs.GPython.SetBaseDir(task.BaseDir())
e := evaluate.GetEngine("gpython", task.WorkDir())
c, err := e.Compile(fs.GPython, fs.Method)
c, err := e.Compile(fs.GPython.String(), fs.Method)
if err != nil {
log.Printf("Compile Error: %s", err)
}
Expand Down
11 changes: 6 additions & 5 deletions transform/flat_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
)

type FlatMapStep struct {
Method string `json:"method" jsonschema_description:"Name of function to call"`
Python string `json:"python" jsonschema_description:"Python code to be run"`
GPython string `json:"gpython" jsonschema_description:"Python code to be run using GPython"`
Method string `json:"method" jsonschema_description:"Name of function to call"`
Python string `json:"python" jsonschema_description:"Python code to be run"`
GPython *CodeBlock `json:"gpython" jsonschema_description:"Python code to be run using GPython"`
}

type flatMapProcess struct {
Expand All @@ -30,10 +30,11 @@ func (ms *FlatMapStep) Init(task task.RuntimeTask) (Processor, error) {
log.Printf("Compile Error: %s", err)
}
return &flatMapProcess{ms, c}, nil
} else if ms.GPython != "" {
} else if ms.GPython != nil {
log.Printf("Init Map: %s", ms.GPython)
ms.GPython.SetBaseDir(task.BaseDir())
e := evaluate.GetEngine("gpython", task.WorkDir())
c, err := e.Compile(ms.GPython, ms.Method)
c, err := e.Compile(ms.GPython.String(), ms.Method)
if err != nil {
log.Printf("Compile Error: %s", err)
}
Expand Down
9 changes: 5 additions & 4 deletions transform/graph_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
)

type EdgeFix struct {
Method string `json:"method"`
GPython string `json:"gpython"`
Method string `json:"method"`
GPython *CodeBlock `json:"gpython"`
}

type GraphBuildStep struct {
Expand Down Expand Up @@ -49,10 +49,11 @@ func (ts GraphBuildStep) Init(task task.RuntimeTask) (Processor, error) {

var edgeFix evaluate.Processor
if ts.EdgeFix != nil {
if ts.EdgeFix.GPython != "" {
if ts.EdgeFix.GPython != nil {
ts.EdgeFix.GPython.SetBaseDir(task.BaseDir())
log.Printf("Init Map: %s", ts.EdgeFix.GPython)
e := evaluate.GetEngine("gpython", task.WorkDir())
c, err := e.Compile(ts.EdgeFix.GPython, ts.EdgeFix.Method)
c, err := e.Compile(ts.EdgeFix.GPython.String(), ts.EdgeFix.Method)
if err != nil {
log.Printf("Compile Error: %s", err)
}
Expand Down
11 changes: 6 additions & 5 deletions transform/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
)

type MapStep struct {
Method string `json:"method" jsonschema_description:"Name of function to call"`
Python string `json:"python" jsonschema_description:"Python code to be run"`
GPython string `json:"gpython" jsonschema_description:"Python code to be run using GPython"`
Method string `json:"method" jsonschema_description:"Name of function to call"`
Python string `json:"python" jsonschema_description:"Python code to be run"`
GPython *CodeBlock `json:"gpython" jsonschema_description:"Python code to be run using GPython"`
}

type mapProcess struct {
Expand All @@ -30,10 +30,11 @@ func (ms *MapStep) Init(task task.RuntimeTask) (Processor, error) {
log.Printf("Compile Error: %s", err)
}
return &mapProcess{ms, c}, nil
} else if ms.GPython != "" {
} else if ms.GPython != nil {
log.Printf("Init Map: %s", ms.GPython)
ms.GPython.SetBaseDir(task.BaseDir())
e := evaluate.GetEngine("gpython", task.WorkDir())
c, err := e.Compile(ms.GPython, ms.Method)
c, err := e.Compile(ms.GPython.String(), ms.Method)
if err != nil {
log.Printf("Compile Error: %s", err)
}
Expand Down
7 changes: 4 additions & 3 deletions transform/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ReduceStep struct {
Field string `json:"field"`
Method string `json:"method"`
Python string `json:"python"`
GPython string `json:"gpython"`
GPython *CodeBlock `json:"gpython"`
InitData *map[string]interface{} `json:"init"`
}

Expand All @@ -31,11 +31,12 @@ func (ms *ReduceStep) Init(t task.RuntimeTask) (Processor, error) {
log.Printf("Compile Error: %s", err)
}
return &reduceProcess{ms, c}, nil
} else if ms.GPython != "" {
} else if ms.GPython != nil {
ms.GPython.SetBaseDir(t.BaseDir())
log.Printf("ReduceInit: %s", ms.InitData)
log.Printf("Reduce: %s", ms.GPython)
e := evaluate.GetEngine("gpython", t.WorkDir())
c, err := e.Compile(ms.GPython, ms.Method)
c, err := e.Compile(ms.GPython.String(), ms.Method)
if err != nil {
log.Printf("Compile Error: %s", err)
}
Expand Down
Loading