Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of Instrument list and Parameters creation #90

Merged
merged 25 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4653317
less /options requests
dsavchenko Feb 21, 2024
bfd4c8a
cache signatures not instances; pass ontology instance not path
dsavchenko Feb 21, 2024
207c242
actually use the new method
dsavchenko Feb 21, 2024
e17ee9f
missed index
dsavchenko Feb 21, 2024
cc9620c
don't parse ttl annotations before param method
dsavchenko Feb 21, 2024
01b4c80
attach name and instrument query to instr_factory
dsavchenko Feb 22, 2024
7954d84
revert signatures caching
dsavchenko Feb 23, 2024
5d4516c
failing test: default value preservation
dsavchenko Feb 23, 2024
50f105e
fix default value preservation
dsavchenko Feb 23, 2024
76cf492
test_structured_default_value_preservation
dsavchenko Feb 23, 2024
b94c628
manually deepcopy parameters lists where needed
dsavchenko Feb 25, 2024
ff8aebf
separate kg instrument config; add NB2WInstrumentFactoryIter
dsavchenko Feb 25, 2024
f06af18
fix
dsavchenko Feb 25, 2024
dc7a1f2
fix
dsavchenko Feb 25, 2024
c7f75cf
fix
dsavchenko Feb 25, 2024
f538ba3
no hardcoded default kg
dsavchenko Feb 25, 2024
2ba6d21
refactor instrument config. Use NB2WInstrumentFactoryIter
dsavchenko Feb 25, 2024
5169ebc
updated default kg test
dsavchenko Feb 25, 2024
b84bfe4
test_kg_based_instrument_parameters
dsavchenko Feb 25, 2024
7d752a8
token parameter renaming
dsavchenko Feb 25, 2024
7535fa2
re-request options if failed
dsavchenko Feb 25, 2024
22ba6bf
fix undefined e
dsavchenko Feb 25, 2024
7bd69f1
backend unavailable behaviour
dsavchenko Feb 25, 2024
be6f30b
use ontology_object argument
dsavchenko Feb 26, 2024
308d059
use ontology object in ParameterProducts
dsavchenko Feb 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions dispatcher_plugin_nb2workflow/dataserver_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,19 @@
from urllib.parse import urlsplit, parse_qs, urlencode
import os
from glob import glob
import logging

logger = logging.getLogger()

class NB2WDataDispatcher:
def __init__(self, instrument=None, param_dict=None, task=None, config=None):
iname = instrument if isinstance(instrument, str) else instrument.name
if config is None:
try:
config = DataServerConf.from_conf_dict(exposer.config_dict['instruments'][iname],
allowed_keys = ['restricted_access'])
except:
#this happens if the instrument is not found in the instrument config, which is always read from a static file
config = DataServerConf.from_conf_dict(exposer.get_instr_conf()['instruments'][iname])
config = DataServerConf.from_conf_dict(exposer.combined_instrument_dict[iname])

self.data_server_url = config.data_server_url
self.task = task
self.param_dict = param_dict
self.backend_options = self.query_backend_options()

self.external_disp_url = None
if not isinstance(instrument, str): # TODO: seems this is always the case. But what if not?
Expand All @@ -30,19 +27,32 @@ def __init__(self, instrument=None, param_dict=None, task=None, config=None):
if parsed.scheme and parsed.netloc:
self.external_disp_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"


def query_backend_options(self):
url = self.data_server_url.strip('/') + '/api/v1.0/options'
@property
def backend_options(self, max_trial=5, sleep_seconds=5):
try:
res = requests.get("%s" % (url), params=None)
except:
return {}
if res.status_code == 200:
options_dict = res.json()
else:
return {}
raise ConnectionError(f"Backend connection failed: {res.status_code}")
# TODO: consecutive requests if failed
options_dict = self._backend_options
except AttributeError:
url = self.data_server_url.strip('/') + '/api/v1.0/options'
for i in range(max_trial):
try:
res = requests.get("%s" % (url), params=None)

if res.status_code == 200:
options_dict = res.json()
backend_available = True
break
else:
raise RuntimeError("Backend options request failed. "
f"Exit code: {res.status_code}. "
f"Response: {res.text}")
except Exception as e:
backend_available = False
logger.error(f"Exception while getting backend options {repr(e)}")
time.sleep(sleep_seconds)
if not backend_available:
return {}

self._backend_options = options_dict
return options_dict

def get_backend_comment(self, product):
Expand All @@ -56,7 +66,7 @@ def get_backend_comment(self, product):

def test_communication(self, max_trial=10, sleep_s=1, logger=None):
print('--> start test connection')

query_out = QueryOutput()
no_connection = True
excep = Exception()
Expand Down
114 changes: 80 additions & 34 deletions dispatcher_plugin_nb2workflow/exposer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import requests
import rdflib as rdf
import os
from copy import copy

import logging
logger = logging.getLogger(__name__)


def kg_select(t, kg_conf_dict):
if kg_conf_dict is None:
if kg_conf_dict is None or kg_conf_dict == {}:
logger.info('Not using KG to get instruments')
qres_js = []
elif kg_conf_dict.get('type') == 'query-service':
Expand All @@ -35,7 +36,9 @@ def kg_select(t, kg_conf_dict):
if os.path.isfile(kg_conf_dict['path']):
graph.parse(kg_conf_dict['path'])
else:
logger.warning("Knowledge graph file %s doesn't exist yet. No instruments information will be loaded.")
logger.warning("Knowledge graph file %s doesn't exist yet. "
"No instruments information will be loaded.",
kg_conf_dict['path'])
qres = graph.query(f"""
SELECT * WHERE {{
{t}
Expand All @@ -50,39 +53,50 @@ def kg_select(t, kg_conf_dict):
logger.warning(json.dumps(qres_js, indent=4))

return qres_js


def get_instr_conf(from_conf_file=None):
global conf_file
def get_static_instr_conf(conf_file):
masked_conf_file = conf_file

# current default - query central oda kb
# TODO: better default will be some regullary updated static location
kg_conf_dict = {'type': 'query-service',
'path': "https://www.astro.unige.ch/mmoda/dispatch-data/gw/odakb/query"}
cfg_dict = {'instruments': {}}
# kg_conf_dict = {'type': 'query-service',
# 'path': "https://www.astro.unige.ch/mmoda/dispatch-data/gw/odakb/query"}

if from_conf_file is not None:
with open(from_conf_file, 'r') as ymlfile:
cfg_dict = {'instruments': {}, 'kg': {}}

if conf_file is not None:
with open(conf_file, 'r') as ymlfile:
f_cfg_dict = yaml.load(ymlfile, Loader=yaml.SafeLoader)
if f_cfg_dict is not None:
if 'instruments' in f_cfg_dict.keys():
cfg_dict['instruments'] = f_cfg_dict['instruments']
else:
conf_file = None
masked_conf_file = None
# need to set to None as it's being read inside Instrument
if 'ontology_path' in f_cfg_dict.keys():
cfg_dict['ontology_path'] = f_cfg_dict['ontology_path']
if 'kg' in f_cfg_dict.keys():
kg_conf_dict = f_cfg_dict['kg']
cfg_dict['kg'] = f_cfg_dict['kg']
else:
conf_file = None

masked_conf_file = None
return cfg_dict, masked_conf_file

static_config_dict, masked_conf_file = get_static_instr_conf(conf_file)

if 'ODA_ONTOLOGY_PATH' in os.environ:
ontology_path = os.environ.get('ODA_ONTOLOGY_PATH')
else:
ontology_path = static_config_dict.get('ontology_path',
'http://odahub.io/ontology/ontology.ttl')
logger.info('Using ontology from %s', ontology_path)

def get_config_dict_from_kg(kg_conf_dict=static_config_dict['kg']):
cfg_dict = {'instruments': {}}

for r in kg_select('''
?w a <http://odahub.io/ontology#WorkflowService>;
<http://odahub.io/ontology#deployment_name> ?deployment_name;
<http://odahub.io/ontology#service_name> ?service_name ;
<https://schema.org/creativeWorkStatus>? ?work_status .
''', kg_conf_dict):
?w a <http://odahub.io/ontology#WorkflowService>;
<http://odahub.io/ontology#deployment_name> ?deployment_name;
<http://odahub.io/ontology#service_name> ?service_name ;
<https://schema.org/creativeWorkStatus>? ?work_status .
''', kg_conf_dict):

logger.info('found instrument service record %s', r)
cfg_dict['instruments'][r['service_name']['value']] = {
Expand All @@ -93,28 +107,60 @@ def get_instr_conf(from_conf_file=None):

return cfg_dict

config_dict = get_instr_conf(conf_file)
if 'ODA_ONTOLOGY_PATH' in os.environ:
ontology_path = os.environ.get('ODA_ONTOLOGY_PATH')
else:
ontology_path = config_dict.get('ontology_path',
'http://odahub.io/ontology/ontology.ttl')
logger.info('Using ontology from %s', ontology_path)
combined_instrument_dict = {}
def build_combined_instrument_dict():
global combined_instrument_dict
combined_instrument_dict = copy(static_config_dict.get('instruments', {}))
combined_instrument_dict.update(get_config_dict_from_kg()['instruments'])

build_combined_instrument_dict()

def factory_factory(instr_name, restricted_access):
instrument_query = NB2WInstrumentQuery('instr_query', restricted_access)
def instr_factory():
backend_options = NB2WDataDispatcher(instrument=instr_name).query_backend_options()
query_list, query_dict = NB2WProductQuery.query_list_and_dict_factory(backend_options, ontology_path)
backend_options = NB2WDataDispatcher(instrument=instr_name).backend_options
query_list, query_dict = NB2WProductQuery.query_list_and_dict_factory(backend_options,
ontology_path)
return Instrument(instr_name,
src_query = NB2WSourceQuery.from_backend_options(backend_options, ontology_path),
instrumet_query = NB2WInstrumentQuery('instr_query', restricted_access),
data_serve_conf_file=conf_file,
src_query = NB2WSourceQuery.from_backend_options(backend_options,
ontology_path),
instrumet_query = instrument_query,
data_serve_conf_file=masked_conf_file,
product_queries_list=query_list,
query_dictionary=query_dict,
asynch=True,
data_server_query_class=NB2WDataDispatcher,
)
instr_factory.instr_name = instr_name
instr_factory.instrument_query = instrument_query
return instr_factory
dsavchenko marked this conversation as resolved.
Show resolved Hide resolved

class NB2WInstrumentFactoryIter:
def __init__(self, lst):
self.lst = lst

def _update_instruments_list(self):
build_combined_instrument_dict()

current_instrs = [x.instr_name for x in self.lst]
available_instrs = combined_instrument_dict.keys()
new_instrs = set(available_instrs) - set(current_instrs)
old_instrs = set(current_instrs) - set(available_instrs)

if old_instrs:
for instr in old_instrs:
idx = current_instrs.index(instr)
self.lst.pop(idx)

if new_instrs:
for instr in new_instrs:
self.lst.append(factory_factory(instr, combined_instrument_dict[instr].get('restricted_access', False)))

def __iter__(self):
self._update_instruments_list()
return self.lst.__iter__()

instr_factory_list = [ factory_factory(instr_name, instr_conf.get('restricted_access', False))
for instr_name, instr_conf in config_dict['instruments'].items() ]
for instr_name, instr_conf in combined_instrument_dict.items()]

instr_factory_list = NB2WInstrumentFactoryIter(instr_factory_list)
9 changes: 5 additions & 4 deletions dispatcher_plugin_nb2workflow/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from cdci_data_analysis.analysis.products import LightCurveProduct, BaseQueryProduct, ImageProduct, SpectrumProduct
from cdci_data_analysis.analysis.parameters import Parameter, subclasses_recursive
from oda_api.data_products import NumpyDataProduct, ODAAstropyTable, BinaryProduct, PictureProduct
from .util import AstropyTableViewParser, ParProdOntology
from .util import AstropyTableViewParser
from cdci_data_analysis.analysis.ontology import Ontology
from io import StringIO
from functools import lru_cache
from mimetypes import guess_extension
Expand Down Expand Up @@ -110,7 +111,7 @@ class NB2WParameterProduct(NB2WProduct):
type_key = 'oda:WorkflowParameter'

ontology_path = None

def __init__(self,
value,
out_dir=None,
Expand All @@ -132,11 +133,11 @@ def get_html_draw(self):
@lru_cache
def parameter_products_factory(ontology_path = None):
classes = []
onto = ParProdOntology(ontology_path)
onto = Ontology(ontology_path)
for term in onto.get_parprod_terms():
classes.append(type(f"{term.split('#')[-1]}Product",
(NB2WParameterProduct,),
{'type_key': term, 'ontology_path': ontology_path}))
{'type_key': term, 'ontology_object': onto}))
return classes


Expand Down
25 changes: 14 additions & 11 deletions dispatcher_plugin_nb2workflow/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,38 @@
from .dataserver_dispatcher import NB2WDataDispatcher
from cdci_data_analysis.analysis.ontology import Ontology
import os
from functools import lru_cache
from functools import lru_cache, wraps
from json import dumps
from copy import deepcopy

class HashableDict(dict):
def __hash__(self):
return hash(dumps(self))
return hash(dumps(self, sort_keys=True))

def with_hashable_dict(func):
@wraps(func)
def wrapper(backend_param_dict, ontology_path):
return func(HashableDict(backend_param_dict), ontology_path)
return wrapper


@with_hashable_dict
@lru_cache
def construct_parameter_lists(backend_param_dict, ontology_path):
src_query_pars_uris = { "http://odahub.io/ontology#PointOfInterestRA": "RA",
"http://odahub.io/ontology#PointOfInterestDEC": "DEC",
"http://odahub.io/ontology#StartTime": "T1",
"http://odahub.io/ontology#EndTime": "T2",
"http://odahub.io/ontology#AstrophysicalObject": "src_name"}
"http://odahub.io/ontology#AstrophysicalObject": "src_name",
"ThisNameShouldNotExist": "token"
burnout87 marked this conversation as resolved.
Show resolved Hide resolved
}
par_name_substitution = {}

plist = []
source_plist = []
for pname, pval in backend_param_dict.items():
onto = Ontology(ontology_path)
if pval.get("extra_ttl"):
onto.parse_extra_triples(pval.get("extra_ttl"))
onto.parse_extra_triples(pval.get("extra_ttl"), parse_oda_annotations = False)
onto_class_hierarchy = onto.get_parameter_hierarchy(pval['owl_type'])
src_query_owl_uri_set = set(onto_class_hierarchy).intersection(src_query_pars_uris.keys())
if src_query_owl_uri_set:
Expand All @@ -47,7 +50,7 @@ def construct_parameter_lists(backend_param_dict, ontology_path):
source_plist.append(Parameter.from_owl_uri(pval['owl_type'],
value=pval['default_value'],
name=default_pname,
ontology_path=ontology_path,
ontology_object=onto,
extra_ttl=pval.get("extra_ttl")
))
else:
Expand All @@ -59,13 +62,13 @@ def construct_parameter_lists(backend_param_dict, ontology_path):
plist.append(Parameter.from_owl_uri(pval['owl_type'],
value=pval['default_value'],
name=cur_name,
ontology_path=ontology_path,
ontology_object=onto,
extra_ttl=pval.get("extra_ttl")
))

return {'source_plist': source_plist,
'prod_plist': plist,
'par_name_substitution': par_name_substitution}
'par_name_substitution': par_name_substitution
}

class NB2WSourceQuery(BaseQuery):
@classmethod
Expand All @@ -76,7 +79,7 @@ def from_backend_options(cls, backend_options, ontology_path):
parameters_dict = {}
for product_name in product_names:
backend_param_dict = backend_options[product_name]['parameters']
prod_source_plist = construct_parameter_lists(backend_param_dict, ontology_path)['source_plist']
prod_source_plist = deepcopy(construct_parameter_lists(backend_param_dict, ontology_path)['source_plist'])
for par in prod_source_plist:
parameters_dict[par.name] = par
parameters_list = list(parameters_dict.values())
Expand All @@ -89,7 +92,7 @@ def __init__(self, name, backend_product_name, backend_param_dict, backend_outpu
self.backend_output_dict = backend_output_dict
parameter_lists = construct_parameter_lists(backend_param_dict, ontology_path)
self.par_name_substitution = parameter_lists['par_name_substitution']
plist = parameter_lists['prod_plist']
plist = deepcopy(parameter_lists['prod_plist'])
self.ontology_path = ontology_path
super().__init__(name, parameters_list = plist)

Expand Down
16 changes: 0 additions & 16 deletions dispatcher_plugin_nb2workflow/util.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,4 @@
from html.parser import HTMLParser
from cdci_data_analysis.analysis.ontology import Ontology

class ParProdOntology(Ontology):
def get_parprod_terms(self):
query = """
SELECT ?s WHERE {
?s (rdfs:subClassOf|a)* ?mid0.
?mid0 rdfs:subClassOf* oda:DataProduct.

?s (rdfs:subClassOf|a)* ?mid1.
?mid1 rdfs:subClassOf* oda:WorkflowParameter .
}
GROUP BY ?s
"""
qres = self.g.query(query)
return [str(row[0]) for row in qres]

class AstropyTableViewParser(HTMLParser):
def handle_starttag(self, tag, attrs):
Expand Down
2 changes: 1 addition & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
git+https://github.com/oda-hub/nb2workflow.git@master#egg=nb2workflow[service,rdf]
git+https://github.com/oda-hub/oda_api.git@master#egg=oda_api
git+https://github.com/oda-hub/dispatcher-app.git@master#egg=cdci_data_analysis
git+https://github.com/oda-hub/dispatcher-app.git@from-owl-signature#egg=cdci_data_analysis
Loading
Loading