From 4653317f17012b4425879490c47b0db31a654dbf Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Wed, 21 Feb 2024 13:39:27 +0100 Subject: [PATCH 01/25] less /options requests --- .../dataserver_dispatcher.py | 29 ++++++++++--------- dispatcher_plugin_nb2workflow/exposer.py | 2 +- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py index 5e804e7..9a90a28 100644 --- a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py +++ b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py @@ -21,7 +21,6 @@ def __init__(self, instrument=None, param_dict=None, task=None, config=None): self.data_server_url = config.data_server_url self.task = task self.param_dict = param_dict - self.backend_options = self.query_backend_options() self.external_disp_url = None if not isinstance(instrument, str): # TODO: seems this is always the case. But what if not? @@ -30,19 +29,23 @@ def __init__(self, instrument=None, param_dict=None, task=None, config=None): if parsed.scheme and parsed.netloc: self.external_disp_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" - - def query_backend_options(self): - url = self.data_server_url.strip('/') + '/api/v1.0/options' + @property + def backend_options(self): try: - res = requests.get("%s" % (url), params=None) - except: - return {} - if res.status_code == 200: - options_dict = res.json() - else: - return {} - raise ConnectionError(f"Backend connection failed: {res.status_code}") - # TODO: consecutive requests if failed + options_dict = self._backend_options + except AttributeError: + url = self.data_server_url.strip('/') + '/api/v1.0/options' + try: + res = requests.get("%s" % (url), params=None) + except: + return {} + if res.status_code == 200: + options_dict = res.json() + else: + return {} + raise ConnectionError(f"Backend connection failed: {res.status_code}") + # TODO: consecutive requests if failed + self._backend_options = options_dict return options_dict def get_backend_comment(self, product): diff --git a/dispatcher_plugin_nb2workflow/exposer.py b/dispatcher_plugin_nb2workflow/exposer.py index 4e2bb85..4f77418 100644 --- a/dispatcher_plugin_nb2workflow/exposer.py +++ b/dispatcher_plugin_nb2workflow/exposer.py @@ -103,7 +103,7 @@ def get_instr_conf(from_conf_file=None): def factory_factory(instr_name, restricted_access): def instr_factory(): - backend_options = NB2WDataDispatcher(instrument=instr_name).query_backend_options() + backend_options = NB2WDataDispatcher(instrument=instr_name).backend_options query_list, query_dict = NB2WProductQuery.query_list_and_dict_factory(backend_options, ontology_path) return Instrument(instr_name, src_query = NB2WSourceQuery.from_backend_options(backend_options, ontology_path), From bfd4c8ae73a797658c6e0425ab3c635b3f590b94 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Wed, 21 Feb 2024 15:05:10 +0100 Subject: [PATCH 02/25] cache signatures not instances; pass ontology instance not path --- dispatcher_plugin_nb2workflow/queries.py | 33 ++++++++++++++++++------ test-requirements.txt | 2 +- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/queries.py b/dispatcher_plugin_nb2workflow/queries.py index ebd7fef..fa62c24 100644 --- a/dispatcher_plugin_nb2workflow/queries.py +++ b/dispatcher_plugin_nb2workflow/queries.py @@ -15,7 +15,7 @@ class HashableDict(dict): def __hash__(self): - return hash(dumps(self)) + return hash(dumps(self, sort_keys=True)) def with_hashable_dict(func): def wrapper(backend_param_dict, ontology_path): @@ -25,12 +25,14 @@ def wrapper(backend_param_dict, ontology_path): @with_hashable_dict @lru_cache -def construct_parameter_lists(backend_param_dict, ontology_path): +def construct_parameter_signatures(backend_param_dict, ontology_path): src_query_pars_uris = { "http://odahub.io/ontology#PointOfInterestRA": "RA", "http://odahub.io/ontology#PointOfInterestDEC": "DEC", "http://odahub.io/ontology#StartTime": "T1", "http://odahub.io/ontology#EndTime": "T2", - "http://odahub.io/ontology#AstrophysicalObject": "src_name"} + "http://odahub.io/ontology#AstrophysicalObject": "src_name", + #"ThisTermShouldNotExist": "token" + } par_name_substitution = {} plist = [] @@ -47,7 +49,7 @@ def construct_parameter_lists(backend_param_dict, ontology_path): source_plist.append(Parameter.from_owl_uri(pval['owl_type'], value=pval['default_value'], name=default_pname, - ontology_path=ontology_path, + ontology_path=onto, extra_ttl=pval.get("extra_ttl") )) else: @@ -59,13 +61,28 @@ def construct_parameter_lists(backend_param_dict, ontology_path): plist.append(Parameter.from_owl_uri(pval['owl_type'], value=pval['default_value'], name=cur_name, - ontology_path=ontology_path, + ontology_path=onto, extra_ttl=pval.get("extra_ttl") )) - + return {'source_p_cls': [x[1] for x in source_plist], + 'source_p_kw': [x[2] for x in source_plist], + 'prod_p_cls': [x[1] for x in plist], + 'prod_p_kw': [x[2] for x in plist], + 'par_name_substitution': par_name_substitution + } + +def construct_parameter_lists(backend_param_dict, ontology_path): + signatures = construct_parameter_signatures(backend_param_dict, ontology_path) + source_plist = [] + for i, cls in enumerate(signatures['source_p_cls']): + source_plist.append(cls(**signatures['source_p_kw'][i])) + prod_plist = [] + for i, cls in enumerate(signatures['prod_p_cls']): + prod_plist.append(cls(**signatures['prod_p_kw'])) + return {'source_plist': source_plist, - 'prod_plist': plist, - 'par_name_substitution': par_name_substitution} + 'prod_plist': prod_plist, + 'par_name_substitution': signatures['par_name_substitution']} class NB2WSourceQuery(BaseQuery): @classmethod diff --git a/test-requirements.txt b/test-requirements.txt index 83e187c..3d9cd3f 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,3 +1,3 @@ git+https://github.com/oda-hub/nb2workflow.git@master#egg=nb2workflow[service,rdf] git+https://github.com/oda-hub/oda_api.git@master#egg=oda_api -git+https://github.com/oda-hub/dispatcher-app.git@master#egg=cdci_data_analysis +git+https://github.com/oda-hub/dispatcher-app.git@from-owl-signature#egg=cdci_data_analysis From 207c242ce29c67085517f977854b0dcb5e66514e Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Wed, 21 Feb 2024 15:14:15 +0100 Subject: [PATCH 03/25] actually use the new method --- dispatcher_plugin_nb2workflow/queries.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/queries.py b/dispatcher_plugin_nb2workflow/queries.py index fa62c24..8fe6dd3 100644 --- a/dispatcher_plugin_nb2workflow/queries.py +++ b/dispatcher_plugin_nb2workflow/queries.py @@ -46,24 +46,24 @@ def construct_parameter_signatures(backend_param_dict, ontology_path): if src_query_owl_uri_set: default_pname = src_query_pars_uris[src_query_owl_uri_set.pop()] par_name_substitution[ default_pname ] = pname - source_plist.append(Parameter.from_owl_uri(pval['owl_type'], - value=pval['default_value'], - name=default_pname, - ontology_path=onto, - extra_ttl=pval.get("extra_ttl") - )) + source_plist.append(Parameter.instance_signature_from_owl_uri(pval['owl_type'], + value=pval['default_value'], + name=default_pname, + ontology_path=onto, + extra_ttl=pval.get("extra_ttl") + )) else: #if param name coincides with the SourceQuery default names, but it's not properly annotated, rename cur_name = pname if pname in src_query_pars_uris.values(): cur_name = pname + '_rename' par_name_substitution[ cur_name ] = pname - plist.append(Parameter.from_owl_uri(pval['owl_type'], - value=pval['default_value'], - name=cur_name, - ontology_path=onto, - extra_ttl=pval.get("extra_ttl") - )) + plist.append(Parameter.instance_signature_from_owl_uri(pval['owl_type'], + value=pval['default_value'], + name=cur_name, + ontology_path=onto, + extra_ttl=pval.get("extra_ttl") + )) return {'source_p_cls': [x[1] for x in source_plist], 'source_p_kw': [x[2] for x in source_plist], 'prod_p_cls': [x[1] for x in plist], From e17ee9fc7f03eebb1c26f8b6d3117258bbfe6e9e Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Wed, 21 Feb 2024 15:24:28 +0100 Subject: [PATCH 04/25] missed index --- dispatcher_plugin_nb2workflow/queries.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dispatcher_plugin_nb2workflow/queries.py b/dispatcher_plugin_nb2workflow/queries.py index 8fe6dd3..7cdc8df 100644 --- a/dispatcher_plugin_nb2workflow/queries.py +++ b/dispatcher_plugin_nb2workflow/queries.py @@ -78,7 +78,7 @@ def construct_parameter_lists(backend_param_dict, ontology_path): source_plist.append(cls(**signatures['source_p_kw'][i])) prod_plist = [] for i, cls in enumerate(signatures['prod_p_cls']): - prod_plist.append(cls(**signatures['prod_p_kw'])) + prod_plist.append(cls(**signatures['prod_p_kw'][i])) return {'source_plist': source_plist, 'prod_plist': prod_plist, From cc9620c28f50953c67b07318dedaef386a2a2dd1 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Wed, 21 Feb 2024 15:48:37 +0100 Subject: [PATCH 05/25] don't parse ttl annotations before param method --- dispatcher_plugin_nb2workflow/queries.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dispatcher_plugin_nb2workflow/queries.py b/dispatcher_plugin_nb2workflow/queries.py index 7cdc8df..88f1ec0 100644 --- a/dispatcher_plugin_nb2workflow/queries.py +++ b/dispatcher_plugin_nb2workflow/queries.py @@ -40,7 +40,7 @@ def construct_parameter_signatures(backend_param_dict, ontology_path): for pname, pval in backend_param_dict.items(): onto = Ontology(ontology_path) if pval.get("extra_ttl"): - onto.parse_extra_triples(pval.get("extra_ttl")) + onto.parse_extra_triples(pval.get("extra_ttl"), parse_oda_annotations = False) onto_class_hierarchy = onto.get_parameter_hierarchy(pval['owl_type']) src_query_owl_uri_set = set(onto_class_hierarchy).intersection(src_query_pars_uris.keys()) if src_query_owl_uri_set: From 01b4c8054aa7940d2ced59aa7fa62aa898aaf900 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Thu, 22 Feb 2024 21:25:44 +0100 Subject: [PATCH 06/25] attach name and instrument query to instr_factory --- dispatcher_plugin_nb2workflow/exposer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dispatcher_plugin_nb2workflow/exposer.py b/dispatcher_plugin_nb2workflow/exposer.py index 4f77418..a8324ea 100644 --- a/dispatcher_plugin_nb2workflow/exposer.py +++ b/dispatcher_plugin_nb2workflow/exposer.py @@ -102,18 +102,21 @@ def get_instr_conf(from_conf_file=None): logger.info('Using ontology from %s', ontology_path) def factory_factory(instr_name, restricted_access): + instrument_query = NB2WInstrumentQuery('instr_query', restricted_access) def instr_factory(): backend_options = NB2WDataDispatcher(instrument=instr_name).backend_options query_list, query_dict = NB2WProductQuery.query_list_and_dict_factory(backend_options, ontology_path) return Instrument(instr_name, src_query = NB2WSourceQuery.from_backend_options(backend_options, ontology_path), - instrumet_query = NB2WInstrumentQuery('instr_query', restricted_access), + instrumet_query = instrument_query, data_serve_conf_file=conf_file, product_queries_list=query_list, query_dictionary=query_dict, asynch=True, data_server_query_class=NB2WDataDispatcher, ) + instr_factory.instr_name = instr_name + instr_factory.instrument_query = instrument_query return instr_factory instr_factory_list = [ factory_factory(instr_name, instr_conf.get('restricted_access', False)) From 7954d849b070fb41833524b8b51fe694239bfd32 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Fri, 23 Feb 2024 10:04:55 +0100 Subject: [PATCH 07/25] revert signatures caching --- dispatcher_plugin_nb2workflow/queries.py | 47 ++++++++---------------- 1 file changed, 16 insertions(+), 31 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/queries.py b/dispatcher_plugin_nb2workflow/queries.py index 88f1ec0..0edec99 100644 --- a/dispatcher_plugin_nb2workflow/queries.py +++ b/dispatcher_plugin_nb2workflow/queries.py @@ -25,13 +25,13 @@ def wrapper(backend_param_dict, ontology_path): @with_hashable_dict @lru_cache -def construct_parameter_signatures(backend_param_dict, ontology_path): +def construct_parameter_lists(backend_param_dict, ontology_path): src_query_pars_uris = { "http://odahub.io/ontology#PointOfInterestRA": "RA", "http://odahub.io/ontology#PointOfInterestDEC": "DEC", "http://odahub.io/ontology#StartTime": "T1", "http://odahub.io/ontology#EndTime": "T2", "http://odahub.io/ontology#AstrophysicalObject": "src_name", - #"ThisTermShouldNotExist": "token" + #"ThisNameShouldNotExist": "token" } par_name_substitution = {} @@ -46,43 +46,28 @@ def construct_parameter_signatures(backend_param_dict, ontology_path): if src_query_owl_uri_set: default_pname = src_query_pars_uris[src_query_owl_uri_set.pop()] par_name_substitution[ default_pname ] = pname - source_plist.append(Parameter.instance_signature_from_owl_uri(pval['owl_type'], - value=pval['default_value'], - name=default_pname, - ontology_path=onto, - extra_ttl=pval.get("extra_ttl") - )) + source_plist.append(Parameter.from_owl_uri(pval['owl_type'], + value=pval['default_value'], + name=default_pname, + ontology_path=onto, + extra_ttl=pval.get("extra_ttl") + )) else: #if param name coincides with the SourceQuery default names, but it's not properly annotated, rename cur_name = pname if pname in src_query_pars_uris.values(): cur_name = pname + '_rename' par_name_substitution[ cur_name ] = pname - plist.append(Parameter.instance_signature_from_owl_uri(pval['owl_type'], - value=pval['default_value'], - name=cur_name, - ontology_path=onto, - extra_ttl=pval.get("extra_ttl") - )) - return {'source_p_cls': [x[1] for x in source_plist], - 'source_p_kw': [x[2] for x in source_plist], - 'prod_p_cls': [x[1] for x in plist], - 'prod_p_kw': [x[2] for x in plist], + plist.append(Parameter.from_owl_uri(pval['owl_type'], + value=pval['default_value'], + name=cur_name, + ontology_path=onto, + extra_ttl=pval.get("extra_ttl") + )) + return {'source_plist': source_plist, + 'prod_plist': plist, 'par_name_substitution': par_name_substitution } - -def construct_parameter_lists(backend_param_dict, ontology_path): - signatures = construct_parameter_signatures(backend_param_dict, ontology_path) - source_plist = [] - for i, cls in enumerate(signatures['source_p_cls']): - source_plist.append(cls(**signatures['source_p_kw'][i])) - prod_plist = [] - for i, cls in enumerate(signatures['prod_p_cls']): - prod_plist.append(cls(**signatures['prod_p_kw'][i])) - - return {'source_plist': source_plist, - 'prod_plist': prod_plist, - 'par_name_substitution': signatures['par_name_substitution']} class NB2WSourceQuery(BaseQuery): @classmethod From 5d4516c4a9b720ff9296afb78c26917d9856de14 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Fri, 23 Feb 2024 16:31:29 +0100 Subject: [PATCH 08/25] failing test: default value preservation --- tests/test_plugin.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/test_plugin.py b/tests/test_plugin.py index bea7c45..0f3c19c 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -638,3 +638,45 @@ def test_trace_fail_return_progress(dispatcher_live_fixture, mock_backend): logger.info(jdata) assert jdata['job_status'] == 'done' assert 'progress_product_html_output' not in jdata['products'] + + +def test_default_value_preservation(dispatcher_live_fixture, mock_backend): + server = dispatcher_live_fixture + logger.info("constructed server: %s", server) + + def get_param_default(): + c = requests.get(server + "/api/meta-data", + params = {'instrument': 'example0'}) + assert c.status_code == 200 + logger.info("content: %s", c.text) + jdata = c.json() + logger.info(jdata) + + for x in jdata[0]: + if isinstance(x, dict): + continue + elif isinstance(x, list): + # if we finally decide to output it non-encoded at some point + pass + else: + x = json.loads(x) + + if {"query_name": "table_query"} in x: + some_param_value = x[2]['value'] + return some_param_value + + some_param_value = get_param_default() + + params = {'instrument': 'example0', + 'query_status': 'new', + 'query_type': 'Real', + 'product_type': 'table', + 'some_param': 5, + 'run_asynch': False} + c = requests.get(server + "/run_analysis", + params = params) + assert c.status_code == 200 + + new_param_value = get_param_default() + + assert new_param_value == some_param_value From 50f105eb8a2900972d7fc348b4a18f5a8fc57ab5 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Fri, 23 Feb 2024 16:52:37 +0100 Subject: [PATCH 09/25] fix default value preservation --- dispatcher_plugin_nb2workflow/queries.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/queries.py b/dispatcher_plugin_nb2workflow/queries.py index 0edec99..a89871d 100644 --- a/dispatcher_plugin_nb2workflow/queries.py +++ b/dispatcher_plugin_nb2workflow/queries.py @@ -12,19 +12,25 @@ import os from functools import lru_cache from json import dumps +from copy import copy class HashableDict(dict): def __hash__(self): return hash(dumps(self, sort_keys=True)) +def copying_lru_cache(func): + def wrapper(backend_param_dict, ontology_path): + cached_func = lru_cache()(func) + return copy(cached_func(backend_param_dict, ontology_path)) + return wrapper + def with_hashable_dict(func): def wrapper(backend_param_dict, ontology_path): return func(HashableDict(backend_param_dict), ontology_path) return wrapper - @with_hashable_dict -@lru_cache +@copying_lru_cache def construct_parameter_lists(backend_param_dict, ontology_path): src_query_pars_uris = { "http://odahub.io/ontology#PointOfInterestRA": "RA", "http://odahub.io/ontology#PointOfInterestDEC": "DEC", From 76cf492ef3d1d52dc0dfd3d0f0cd5cfe1d060fc5 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Fri, 23 Feb 2024 17:26:29 +0100 Subject: [PATCH 10/25] test_structured_default_value_preservation --- tests/test_plugin.py | 59 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 0f3c19c..effd0cd 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -680,3 +680,62 @@ def get_param_default(): new_param_value = get_param_default() assert new_param_value == some_param_value + +@pytest.mark.fullstack +def test_structured_default_value_preservation(live_nb2service, + conf_file, + dispatcher_live_fixture): + with open(conf_file, 'r') as fd: + conf_bk = fd.read() + + try: + with open(conf_file, 'w') as fd: + fd.write( config_real_nb2service % live_nb2service ) + + server = dispatcher_live_fixture + logger.info("constructed server: %s", server) + + #ensure new conf file readed + c = requests.get(server + "/reload-plugin/dispatcher_plugin_nb2workflow") + assert c.status_code == 200 + + def get_param_default(): + c = requests.get(server + "/api/meta-data", + params = {'instrument': 'example'}) + assert c.status_code == 200 + logger.info("content: %s", c.text) + jdata = c.json() + logger.info(jdata) + + for x in jdata[0]: + if isinstance(x, dict): + continue + elif isinstance(x, list): + # if we finally decide to output it non-encoded at some point + pass + else: + x = json.loads(x) + + if {"query_name": "structured_query"} in x: + param_value = x[2]['value'] + return param_value + + struct_par_value = get_param_default() + + params = {'instrument': 'example', + 'query_status': 'new', + 'query_type': 'Real', + 'product_type': 'structured', + 'struct_par': '{"col4": ["spam", "ham"]}', + 'run_asynch': False} + c = requests.get(server + "/run_analysis", + params = params) + assert c.status_code == 200 + + new_param_value = get_param_default() + + assert new_param_value == struct_par_value + + finally: + with open(conf_file, 'w') as fd: + fd.write(conf_bk) From b94c6281741e2a5b86b65e95feb42e98bdd7e538 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Sun, 25 Feb 2024 17:43:45 +0100 Subject: [PATCH 11/25] manually deepcopy parameters lists where needed --- dispatcher_plugin_nb2workflow/queries.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/queries.py b/dispatcher_plugin_nb2workflow/queries.py index a89871d..93ff8d2 100644 --- a/dispatcher_plugin_nb2workflow/queries.py +++ b/dispatcher_plugin_nb2workflow/queries.py @@ -10,27 +10,22 @@ from .dataserver_dispatcher import NB2WDataDispatcher from cdci_data_analysis.analysis.ontology import Ontology import os -from functools import lru_cache +from functools import lru_cache, wraps from json import dumps -from copy import copy +from copy import deepcopy class HashableDict(dict): def __hash__(self): return hash(dumps(self, sort_keys=True)) -def copying_lru_cache(func): - def wrapper(backend_param_dict, ontology_path): - cached_func = lru_cache()(func) - return copy(cached_func(backend_param_dict, ontology_path)) - return wrapper - def with_hashable_dict(func): + @wraps(func) def wrapper(backend_param_dict, ontology_path): return func(HashableDict(backend_param_dict), ontology_path) return wrapper @with_hashable_dict -@copying_lru_cache +@lru_cache def construct_parameter_lists(backend_param_dict, ontology_path): src_query_pars_uris = { "http://odahub.io/ontology#PointOfInterestRA": "RA", "http://odahub.io/ontology#PointOfInterestDEC": "DEC", @@ -84,7 +79,7 @@ def from_backend_options(cls, backend_options, ontology_path): parameters_dict = {} for product_name in product_names: backend_param_dict = backend_options[product_name]['parameters'] - prod_source_plist = construct_parameter_lists(backend_param_dict, ontology_path)['source_plist'] + prod_source_plist = deepcopy(construct_parameter_lists(backend_param_dict, ontology_path)['source_plist']) for par in prod_source_plist: parameters_dict[par.name] = par parameters_list = list(parameters_dict.values()) @@ -97,7 +92,7 @@ def __init__(self, name, backend_product_name, backend_param_dict, backend_outpu self.backend_output_dict = backend_output_dict parameter_lists = construct_parameter_lists(backend_param_dict, ontology_path) self.par_name_substitution = parameter_lists['par_name_substitution'] - plist = parameter_lists['prod_plist'] + plist = deepcopy(parameter_lists['prod_plist']) self.ontology_path = ontology_path super().__init__(name, parameters_list = plist) From ff8aebf4ce44070b12f75931eab5818122ca1e7d Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Sun, 25 Feb 2024 19:05:57 +0100 Subject: [PATCH 12/25] separate kg instrument config; add NB2WInstrumentFactoryIter --- dispatcher_plugin_nb2workflow/exposer.py | 77 +++++++++++++++++------- 1 file changed, 55 insertions(+), 22 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/exposer.py b/dispatcher_plugin_nb2workflow/exposer.py index a8324ea..0b7fcc5 100644 --- a/dispatcher_plugin_nb2workflow/exposer.py +++ b/dispatcher_plugin_nb2workflow/exposer.py @@ -50,10 +50,28 @@ def kg_select(t, kg_conf_dict): logger.warning(json.dumps(qres_js, indent=4)) return qres_js - +def get_instrs_from_kg(kg_conf_dict): + instruments = [] + for r in kg_select(''' + ?w a ; + ?deployment_name; + ?service_name ; + ? ?work_status . + ''', kg_conf_dict): + + logger.info('found instrument service record %s', r) + instruments[r['service_name']['value']] = { + "data_server_url": f"http://{r['deployment_name']['value']}:8000", + "dummy_cache": "", + "restricted_access": False if r['work_status']['value'] == "production" else True + } + + return instruments + + def get_instr_conf(from_conf_file=None): - global conf_file + masked_conf_file = from_conf_file # current default - query central oda kb # TODO: better default will be some regullary updated static location @@ -68,32 +86,20 @@ def get_instr_conf(from_conf_file=None): if 'instruments' in f_cfg_dict.keys(): cfg_dict['instruments'] = f_cfg_dict['instruments'] else: - conf_file = None + masked_conf_file = None + # need to set to None as it's being read inside Instrument if 'ontology_path' in f_cfg_dict.keys(): cfg_dict['ontology_path'] = f_cfg_dict['ontology_path'] if 'kg' in f_cfg_dict.keys(): kg_conf_dict = f_cfg_dict['kg'] else: - conf_file = None - + masked_conf_file = None - for r in kg_select(''' - ?w a ; - ?deployment_name; - ?service_name ; - ? ?work_status . - ''', kg_conf_dict): - - logger.info('found instrument service record %s', r) - cfg_dict['instruments'][r['service_name']['value']] = { - "data_server_url": f"http://{r['deployment_name']['value']}:8000", - "dummy_cache": "", - "restricted_access": False if r['work_status']['value'] == "production" else True - } + cfg_dict['kg_instruments'].update(get_instrs_from_kg(kg_conf_dict)) - return cfg_dict + return cfg_dict, masked_conf_file -config_dict = get_instr_conf(conf_file) +config_dict, masked_conf_file = get_instr_conf(conf_file) if 'ODA_ONTOLOGY_PATH' in os.environ: ontology_path = os.environ.get('ODA_ONTOLOGY_PATH') else: @@ -109,7 +115,7 @@ def instr_factory(): return Instrument(instr_name, src_query = NB2WSourceQuery.from_backend_options(backend_options, ontology_path), instrumet_query = instrument_query, - data_serve_conf_file=conf_file, + data_serve_conf_file=masked_conf_file, product_queries_list=query_list, query_dictionary=query_dict, asynch=True, @@ -119,5 +125,32 @@ def instr_factory(): instr_factory.instrument_query = instrument_query return instr_factory +class NB2WInstrumentFactoryIter: + def __init__(self, lst): + self.lst = lst + + def _update_instruments_list(self): + static_instrs = config_dict['instruments'] + kg_instrs = get_instrs_from_kg(config_dict) + + current_instrs = [x.instr_name for x in self.lst] + available_instrs = static_instrs.keys() + kg_instrs.keys() + new_instrs = set(available_instrs) - set(current_instrs) + old_instrs = set(current_instrs) - set(available_instrs) + + if old_instrs: + for instr in old_instrs: + idx = current_instrs.index(instr) + self.lst.pop(idx) + + if new_instrs: + for instr in new_instrs: + self.lst.append(factory_factory(instr, kg_instrs[instr].get('restricted_access', False))) + + def __iter__(self): + self._update_instruments_list() + return self.lst.__iter__() + instr_factory_list = [ factory_factory(instr_name, instr_conf.get('restricted_access', False)) - for instr_name, instr_conf in config_dict['instruments'].items() ] + for instr_name, instr_conf in + config_dict['instruments'].items() + config_dict['kg_instruments'].items() ] From f06af18c5e5c0474b7253d035dddd331a691ed1e Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Sun, 25 Feb 2024 19:14:05 +0100 Subject: [PATCH 13/25] fix --- dispatcher_plugin_nb2workflow/exposer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dispatcher_plugin_nb2workflow/exposer.py b/dispatcher_plugin_nb2workflow/exposer.py index 0b7fcc5..680f2b1 100644 --- a/dispatcher_plugin_nb2workflow/exposer.py +++ b/dispatcher_plugin_nb2workflow/exposer.py @@ -95,7 +95,7 @@ def get_instr_conf(from_conf_file=None): else: masked_conf_file = None - cfg_dict['kg_instruments'].update(get_instrs_from_kg(kg_conf_dict)) + cfg_dict['kg_instruments'] = get_instrs_from_kg(kg_conf_dict) return cfg_dict, masked_conf_file From dc7a1f2e5318405c5732920ec8d18b2322148f78 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Sun, 25 Feb 2024 19:31:53 +0100 Subject: [PATCH 14/25] fix --- dispatcher_plugin_nb2workflow/exposer.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/exposer.py b/dispatcher_plugin_nb2workflow/exposer.py index 680f2b1..04000d3 100644 --- a/dispatcher_plugin_nb2workflow/exposer.py +++ b/dispatcher_plugin_nb2workflow/exposer.py @@ -35,7 +35,9 @@ def kg_select(t, kg_conf_dict): if os.path.isfile(kg_conf_dict['path']): graph.parse(kg_conf_dict['path']) else: - logger.warning("Knowledge graph file %s doesn't exist yet. No instruments information will be loaded.") + logger.warning("Knowledge graph file %s doesn't exist yet. " + "No instruments information will be loaded.", + kg_conf_dict['path']) qres = graph.query(f""" SELECT * WHERE {{ {t} @@ -52,7 +54,7 @@ def kg_select(t, kg_conf_dict): return qres_js def get_instrs_from_kg(kg_conf_dict): - instruments = [] + instruments = {} for r in kg_select(''' ?w a ; ?deployment_name; From c7f75cfacb7f588b4be02ac78bed92fc841de18c Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Sun, 25 Feb 2024 19:56:06 +0100 Subject: [PATCH 15/25] fix --- .../dataserver_dispatcher.py | 2 +- dispatcher_plugin_nb2workflow/exposer.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py index 9a90a28..b31b954 100644 --- a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py +++ b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py @@ -16,7 +16,7 @@ def __init__(self, instrument=None, param_dict=None, task=None, config=None): allowed_keys = ['restricted_access']) except: #this happens if the instrument is not found in the instrument config, which is always read from a static file - config = DataServerConf.from_conf_dict(exposer.get_instr_conf()['instruments'][iname]) + config = DataServerConf.from_conf_dict(exposer.get_instr_conf()[0]['kg_instruments'][iname]) self.data_server_url = config.data_server_url self.task = task diff --git a/dispatcher_plugin_nb2workflow/exposer.py b/dispatcher_plugin_nb2workflow/exposer.py index 04000d3..94cab00 100644 --- a/dispatcher_plugin_nb2workflow/exposer.py +++ b/dispatcher_plugin_nb2workflow/exposer.py @@ -8,13 +8,14 @@ import requests import rdflib as rdf import os +from itertools import chain import logging logger = logging.getLogger(__name__) def kg_select(t, kg_conf_dict): - if kg_conf_dict is None: + if kg_conf_dict is None or kg_conf_dict == {}: logger.info('Not using KG to get instruments') qres_js = [] elif kg_conf_dict.get('type') == 'query-service': @@ -75,10 +76,9 @@ def get_instrs_from_kg(kg_conf_dict): def get_instr_conf(from_conf_file=None): masked_conf_file = from_conf_file - # current default - query central oda kb - # TODO: better default will be some regullary updated static location - kg_conf_dict = {'type': 'query-service', - 'path': "https://www.astro.unige.ch/mmoda/dispatch-data/gw/odakb/query"} + # kg_conf_dict = {'type': 'query-service', + # 'path': "https://www.astro.unige.ch/mmoda/dispatch-data/gw/odakb/query"} + kg_conf_dict = {} cfg_dict = {'instruments': {}} if from_conf_file is not None: @@ -155,4 +155,4 @@ def __iter__(self): instr_factory_list = [ factory_factory(instr_name, instr_conf.get('restricted_access', False)) for instr_name, instr_conf in - config_dict['instruments'].items() + config_dict['kg_instruments'].items() ] + chain(config_dict['instruments'].items(), config_dict['kg_instruments'].items())] From f538ba35b352a6fba2566ec4bb038aa6f03d059c Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Sun, 25 Feb 2024 20:17:42 +0100 Subject: [PATCH 16/25] no hardcoded default kg --- tests/test_plugin.py | 70 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 56 insertions(+), 14 deletions(-) diff --git a/tests/test_plugin.py b/tests/test_plugin.py index effd0cd..75bd0e0 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -2,6 +2,8 @@ import logging import requests from oda_api.data_products import PictureProduct, ImageDataProduct +import shutil +from textwrap import dedent import time import jwt import pytest @@ -275,20 +277,6 @@ def test_image_product(dispatcher_live_fixture, mock_backend): imdata = jdata['products']['numpy_data_product_list'][0] oda_ndp = ImageDataProduct.decode(imdata) -def test_default_kg(dispatcher_live_fixture): - server = dispatcher_live_fixture - logger.info("constructed server: %s", server) - - c = requests.get(server + "/instr-list", - params = {'instrument': 'mock', - 'token': encoded_token}) - logger.info("content: %s", c.text) - jdata = c.json() - logger.info(json.dumps(jdata, indent=4, sort_keys=True)) - logger.info(jdata) - assert c.status_code == 200 - assert 'lightcurve-example' in jdata # TODO: change to what will be used in docs - @pytest.mark.parametrize("privileged", [True, False]) def test_local_kg(conf_file, dispatcher_live_fixture, privileged): with open(conf_file, 'r') as fd: @@ -739,3 +727,57 @@ def get_param_default(): finally: with open(conf_file, 'w') as fd: fd.write(conf_bk) + +def test_added_in_kg(conf_file, dispatcher_live_fixture): + with open(conf_file, 'r') as fd: + conf_bk = fd.read() + + try: + tmpkg = '/tmp/example-kg.ttl' + shutil.copy('tests/example-kg.ttl', tmpkg) + + with open(conf_file, 'w') as fd: + fd.write(config_local_kg.replace('tests', '/tmp')) + + server = dispatcher_live_fixture + logger.info("constructed server: %s", server) + + c = requests.get(server + "/reload-plugin/dispatcher_plugin_nb2workflow") + assert c.status_code == 200 + + params = {'instrument': 'mock'} + + c = requests.get(server + "/instr-list", + params = params) + logger.info("content: %s", c.text) + jdata = c.json() + logger.info(json.dumps(jdata, indent=4, sort_keys=True)) + logger.info(jdata) + assert c.status_code == 200 + assert 'kgprod' in jdata + + with open(tmpkg, 'a') as fd: + tmpkg.write(dedent(''' + a oda:Workflow, + oda:WorkflowService, + oda:workflow ; + oda:deployment_name "kgprod1-workflow-backend" ; + oda:deployment_namespace "oda-staging" ; + oda:service_name "kgprod1" ; + sdo:creativeWorkStatus "production" . + ''')) + + c = requests.get(server + "/instr-list", + params = params) + logger.info("content: %s", c.text) + jdata = c.json() + logger.info(json.dumps(jdata, indent=4, sort_keys=True)) + logger.info(jdata) + assert c.status_code == 200 + assert 'kgprod' in jdata + assert 'kgprod1' in jdata + + finally: + with open(conf_file, 'w') as fd: + fd.write(conf_bk) + os.remove(tmpkg) \ No newline at end of file From 2ba6d211a633eb08ac67cb3173afbeabbf4db221 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Sun, 25 Feb 2024 21:51:12 +0100 Subject: [PATCH 17/25] refactor instrument config. Use NB2WInstrumentFactoryIter --- .../dataserver_dispatcher.py | 7 +- dispatcher_plugin_nb2workflow/exposer.py | 88 ++++++++++--------- tests/test_plugin.py | 53 ++++++----- 3 files changed, 80 insertions(+), 68 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py index b31b954..3733be4 100644 --- a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py +++ b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py @@ -11,12 +11,7 @@ class NB2WDataDispatcher: def __init__(self, instrument=None, param_dict=None, task=None, config=None): iname = instrument if isinstance(instrument, str) else instrument.name if config is None: - try: - config = DataServerConf.from_conf_dict(exposer.config_dict['instruments'][iname], - allowed_keys = ['restricted_access']) - except: - #this happens if the instrument is not found in the instrument config, which is always read from a static file - config = DataServerConf.from_conf_dict(exposer.get_instr_conf()[0]['kg_instruments'][iname]) + config = DataServerConf.from_conf_dict(exposer.combined_instrument_dict[iname]) self.data_server_url = config.data_server_url self.task = task diff --git a/dispatcher_plugin_nb2workflow/exposer.py b/dispatcher_plugin_nb2workflow/exposer.py index 94cab00..2c963b4 100644 --- a/dispatcher_plugin_nb2workflow/exposer.py +++ b/dispatcher_plugin_nb2workflow/exposer.py @@ -8,7 +8,7 @@ import requests import rdflib as rdf import os -from itertools import chain +from copy import copy import logging logger = logging.getLogger(__name__) @@ -54,35 +54,16 @@ def kg_select(t, kg_conf_dict): return qres_js -def get_instrs_from_kg(kg_conf_dict): - instruments = {} - for r in kg_select(''' - ?w a ; - ?deployment_name; - ?service_name ; - ? ?work_status . - ''', kg_conf_dict): - - logger.info('found instrument service record %s', r) - instruments[r['service_name']['value']] = { - "data_server_url": f"http://{r['deployment_name']['value']}:8000", - "dummy_cache": "", - "restricted_access": False if r['work_status']['value'] == "production" else True - } - - return instruments - - -def get_instr_conf(from_conf_file=None): - masked_conf_file = from_conf_file +def get_static_instr_conf(conf_file): + masked_conf_file = conf_file # kg_conf_dict = {'type': 'query-service', # 'path': "https://www.astro.unige.ch/mmoda/dispatch-data/gw/odakb/query"} - kg_conf_dict = {} - cfg_dict = {'instruments': {}} - if from_conf_file is not None: - with open(from_conf_file, 'r') as ymlfile: + cfg_dict = {'instruments': {}, 'kg': {}} + + if conf_file is not None: + with open(conf_file, 'r') as ymlfile: f_cfg_dict = yaml.load(ymlfile, Loader=yaml.SafeLoader) if f_cfg_dict is not None: if 'instruments' in f_cfg_dict.keys(): @@ -93,29 +74,56 @@ def get_instr_conf(from_conf_file=None): if 'ontology_path' in f_cfg_dict.keys(): cfg_dict['ontology_path'] = f_cfg_dict['ontology_path'] if 'kg' in f_cfg_dict.keys(): - kg_conf_dict = f_cfg_dict['kg'] + cfg_dict['kg'] = f_cfg_dict['kg'] else: masked_conf_file = None - - cfg_dict['kg_instruments'] = get_instrs_from_kg(kg_conf_dict) - return cfg_dict, masked_conf_file -config_dict, masked_conf_file = get_instr_conf(conf_file) +static_config_dict, masked_conf_file = get_static_instr_conf(conf_file) + if 'ODA_ONTOLOGY_PATH' in os.environ: ontology_path = os.environ.get('ODA_ONTOLOGY_PATH') else: - ontology_path = config_dict.get('ontology_path', + ontology_path = static_config_dict.get('ontology_path', 'http://odahub.io/ontology/ontology.ttl') logger.info('Using ontology from %s', ontology_path) +def get_config_dict_from_kg(kg_conf_dict=static_config_dict['kg']): + cfg_dict = {'instruments': {}} + + for r in kg_select(''' + ?w a ; + ?deployment_name; + ?service_name ; + ? ?work_status . + ''', kg_conf_dict): + + logger.info('found instrument service record %s', r) + cfg_dict['instruments'][r['service_name']['value']] = { + "data_server_url": f"http://{r['deployment_name']['value']}:8000", + "dummy_cache": "", + "restricted_access": False if r['work_status']['value'] == "production" else True + } + + return cfg_dict + +combined_instrument_dict = {} +def build_combined_instrument_dict(): + global combined_instrument_dict + combined_instrument_dict = copy(static_config_dict.get('instruments', {})) + combined_instrument_dict.update(get_config_dict_from_kg()['instruments']) + +build_combined_instrument_dict() + def factory_factory(instr_name, restricted_access): instrument_query = NB2WInstrumentQuery('instr_query', restricted_access) def instr_factory(): backend_options = NB2WDataDispatcher(instrument=instr_name).backend_options - query_list, query_dict = NB2WProductQuery.query_list_and_dict_factory(backend_options, ontology_path) + query_list, query_dict = NB2WProductQuery.query_list_and_dict_factory(backend_options, + ontology_path) return Instrument(instr_name, - src_query = NB2WSourceQuery.from_backend_options(backend_options, ontology_path), + src_query = NB2WSourceQuery.from_backend_options(backend_options, + ontology_path), instrumet_query = instrument_query, data_serve_conf_file=masked_conf_file, product_queries_list=query_list, @@ -132,11 +140,10 @@ def __init__(self, lst): self.lst = lst def _update_instruments_list(self): - static_instrs = config_dict['instruments'] - kg_instrs = get_instrs_from_kg(config_dict) + build_combined_instrument_dict() current_instrs = [x.instr_name for x in self.lst] - available_instrs = static_instrs.keys() + kg_instrs.keys() + available_instrs = combined_instrument_dict.keys() new_instrs = set(available_instrs) - set(current_instrs) old_instrs = set(current_instrs) - set(available_instrs) @@ -147,12 +154,13 @@ def _update_instruments_list(self): if new_instrs: for instr in new_instrs: - self.lst.append(factory_factory(instr, kg_instrs[instr].get('restricted_access', False))) + self.lst.append(factory_factory(instr, combined_instrument_dict[instr].get('restricted_access', False))) def __iter__(self): self._update_instruments_list() return self.lst.__iter__() instr_factory_list = [ factory_factory(instr_name, instr_conf.get('restricted_access', False)) - for instr_name, instr_conf in - chain(config_dict['instruments'].items(), config_dict['kg_instruments'].items())] + for instr_name, instr_conf in combined_instrument_dict.items()] + +instr_factory_list = NB2WInstrumentFactoryIter(instr_factory_list) diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 75bd0e0..0f9c04c 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -734,6 +734,10 @@ def test_added_in_kg(conf_file, dispatcher_live_fixture): try: tmpkg = '/tmp/example-kg.ttl' + + with open('tests/example-kg.ttl') as fd: + orig_kg = fd.read() + shutil.copy('tests/example-kg.ttl', tmpkg) with open(conf_file, 'w') as fd: @@ -742,22 +746,28 @@ def test_added_in_kg(conf_file, dispatcher_live_fixture): server = dispatcher_live_fixture logger.info("constructed server: %s", server) + # reload to read config c = requests.get(server + "/reload-plugin/dispatcher_plugin_nb2workflow") assert c.status_code == 200 - params = {'instrument': 'mock'} - - c = requests.get(server + "/instr-list", - params = params) - logger.info("content: %s", c.text) - jdata = c.json() - logger.info(json.dumps(jdata, indent=4, sort_keys=True)) - logger.info(jdata) - assert c.status_code == 200 - assert 'kgprod' in jdata + def assert_instruments(available, not_available): + params = {'instrument': 'mock'} + c = requests.get(server + "/instr-list", + params = params) + logger.info("content: %s", c.text) + jdata = c.json() + logger.info(json.dumps(jdata, indent=4, sort_keys=True)) + logger.info(jdata) + assert c.status_code == 200 + for av in available: + assert av in jdata + for nav in not_available: + assert nav not in jdata + + assert_instruments(['kgprod'], ['kgprod1']) with open(tmpkg, 'a') as fd: - tmpkg.write(dedent(''' + fd.write(dedent(''' a oda:Workflow, oda:WorkflowService, oda:workflow ; @@ -767,17 +777,16 @@ def test_added_in_kg(conf_file, dispatcher_live_fixture): sdo:creativeWorkStatus "production" . ''')) - c = requests.get(server + "/instr-list", - params = params) - logger.info("content: %s", c.text) - jdata = c.json() - logger.info(json.dumps(jdata, indent=4, sort_keys=True)) - logger.info(jdata) - assert c.status_code == 200 - assert 'kgprod' in jdata - assert 'kgprod1' in jdata - + assert_instruments(['kgprod', 'kgprod1'], []) + + with open(tmpkg, 'w') as fd: + fd.write(orig_kg) + + assert_instruments(['kgprod'], ['kgprod1']) + finally: with open(conf_file, 'w') as fd: fd.write(conf_bk) - os.remove(tmpkg) \ No newline at end of file + os.remove(tmpkg) + +# TODO: test_instrument_parameters kg-based (not in static file). (Means instrument factory is properly built.) \ No newline at end of file From 5169ebc88e584e0da99154f47831af8ad27d2e69 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Sun, 25 Feb 2024 22:12:09 +0100 Subject: [PATCH 18/25] updated default kg test --- dispatcher_plugin_nb2workflow/exposer.py | 10 ++++---- tests/test_plugin.py | 32 ++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/exposer.py b/dispatcher_plugin_nb2workflow/exposer.py index 2c963b4..c72a72e 100644 --- a/dispatcher_plugin_nb2workflow/exposer.py +++ b/dispatcher_plugin_nb2workflow/exposer.py @@ -92,11 +92,11 @@ def get_config_dict_from_kg(kg_conf_dict=static_config_dict['kg']): cfg_dict = {'instruments': {}} for r in kg_select(''' - ?w a ; - ?deployment_name; - ?service_name ; - ? ?work_status . - ''', kg_conf_dict): + ?w a ; + ?deployment_name; + ?service_name ; + ? ?work_status . + ''', kg_conf_dict): logger.info('found instrument service record %s', r) cfg_dict['instruments'][r['service_name']['value']] = { diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 0f9c04c..86e508b 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -277,6 +277,38 @@ def test_image_product(dispatcher_live_fixture, mock_backend): imdata = jdata['products']['numpy_data_product_list'][0] oda_ndp = ImageDataProduct.decode(imdata) +def test_external_service_kg(conf_file, dispatcher_live_fixture): + with open(conf_file, 'r') as fd: + conf_bk = fd.read() + + try: + with open(conf_file, 'w') as fd: + fd.write(dedent(""" + kg: + type: "query-service" + path: "https://www.astro.unige.ch/mmoda/dispatch-data/gw/odakb/query" + """)) + + server = dispatcher_live_fixture + logger.info("constructed server: %s", server) + c = requests.get(server + "/reload-plugin/dispatcher_plugin_nb2workflow") + assert c.status_code == 200 + + c = requests.get(server + "/instr-list", + params = {'instrument': 'mock', + 'token': encoded_token}) + logger.info("content: %s", c.text) + jdata = c.json() + logger.info(json.dumps(jdata, indent=4, sort_keys=True)) + logger.info(jdata) + assert c.status_code == 200 + assert 'lightcurve-example' in jdata # TODO: change to what will be used in docs + + finally: + with open(conf_file, 'w') as fd: + fd.write(conf_bk) + + @pytest.mark.parametrize("privileged", [True, False]) def test_local_kg(conf_file, dispatcher_live_fixture, privileged): with open(conf_file, 'r') as fd: From b84bfe49b426dc2c7ea9d1a8367ffd477137823e Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Sun, 25 Feb 2024 22:37:19 +0100 Subject: [PATCH 19/25] test_kg_based_instrument_parameters --- tests/conftest.py | 4 ++-- tests/test_plugin.py | 57 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 160f942..b039dfe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -19,7 +19,7 @@ config_one_instrument = """ instruments: example0: - data_server_url: http://localhost:9494 + data_server_url: http://localhost:8000 dummy_cache: "" """ @@ -40,7 +40,7 @@ def get_backend_status(): @pytest.fixture(scope="session") def httpserver_listen_address(): - return ("127.0.0.1", 9494) + return ("127.0.0.1", 8000) def lightcurve_handler(request: Request): diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 86e508b..9ec5d55 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -20,7 +20,7 @@ config_two_instruments = """ instruments: example0: - data_server_url: http://localhost:9494 + data_server_url: http://localhost:8000 dummy_cache: "" example1: data_server_url: http://localhost:9595 @@ -800,11 +800,8 @@ def assert_instruments(available, not_available): with open(tmpkg, 'a') as fd: fd.write(dedent(''' - a oda:Workflow, - oda:WorkflowService, - oda:workflow ; + a oda:WorkflowService; oda:deployment_name "kgprod1-workflow-backend" ; - oda:deployment_namespace "oda-staging" ; oda:service_name "kgprod1" ; sdo:creativeWorkStatus "production" . ''')) @@ -821,4 +818,52 @@ def assert_instruments(available, not_available): fd.write(conf_bk) os.remove(tmpkg) -# TODO: test_instrument_parameters kg-based (not in static file). (Means instrument factory is properly built.) \ No newline at end of file +def test_kg_based_instrument_parameters(conf_file, dispatcher_live_fixture, caplog, mock_backend): + with open(conf_file, 'r') as fd: + conf_bk = fd.read() + + try: + tmpkg = '/tmp/example-kg.ttl' + + with open(conf_file, 'w') as fd: + fd.write(dedent(f""" + kg: + type: "file" + path: "{tmpkg}" + """ + )) + + with open(tmpkg, 'w') as fd: + fd.write(dedent(""" + @prefix oda: . + @prefix sdo: . + + a oda:WorkflowService; + oda:deployment_name "localhost" ; + oda:service_name "example0" ; + sdo:creativeWorkStatus "production" . + """)) + + server = dispatcher_live_fixture + logger.info("constructed server: %s", server) + + # reload to read config + c = requests.get(server + "/reload-plugin/dispatcher_plugin_nb2workflow") + assert c.status_code == 200 + + c = requests.get(server + "/api/par-names", + params = {'instrument': 'example0'}) + logger.info("content: %s", c.text) + jdata = c.json() + logger.info(json.dumps(jdata, indent=4, sort_keys=True)) + logger.info(jdata) + assert c.status_code == 200 + assert sorted(jdata) == sorted(expected_arguments) + assert "will be discarded for the instantiation" not in caplog.text + assert "Possibly a programming error" not in caplog.text + + + finally: + with open(conf_file, 'w') as fd: + fd.write(conf_bk) + os.remove(tmpkg) From 7d752a8729c2835dcf6a2ecdaade23f6bdcfc8d3 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Sun, 25 Feb 2024 23:02:59 +0100 Subject: [PATCH 20/25] token parameter renaming --- dispatcher_plugin_nb2workflow/queries.py | 2 +- tests/example_nb/echo.ipynb | 4 +++- tests/test_plugin.py | 9 ++++++--- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/queries.py b/dispatcher_plugin_nb2workflow/queries.py index 93ff8d2..45871ae 100644 --- a/dispatcher_plugin_nb2workflow/queries.py +++ b/dispatcher_plugin_nb2workflow/queries.py @@ -32,7 +32,7 @@ def construct_parameter_lists(backend_param_dict, ontology_path): "http://odahub.io/ontology#StartTime": "T1", "http://odahub.io/ontology#EndTime": "T2", "http://odahub.io/ontology#AstrophysicalObject": "src_name", - #"ThisNameShouldNotExist": "token" + "ThisNameShouldNotExist": "token" } par_name_substitution = {} diff --git a/tests/example_nb/echo.ipynb b/tests/example_nb/echo.ipynb index d276842..e02d25f 100644 --- a/tests/example_nb/echo.ipynb +++ b/tests/example_nb/echo.ipynb @@ -27,7 +27,8 @@ "visible_band = \"v\" # oda:PhotometricBand ; oda:allowed_value \"b\", \"g\", \"r\", \"v\"\n", "radius = 3. # oda:Angle ; oda:unit unit:arcmin\n", "energy = 50. # oda:Energy_keV ; oda:lower_limit 35 ; oda:upper_limit 800\n", - "time_instant = '2017-08-17T12:43:0.000' # oda:TimeInstantISOT" + "time_instant = '2017-08-17T12:43:0.000' # oda:TimeInstantISOT\n", + "token = \"XGDSgs2KYqHr\"" ] }, { @@ -45,6 +46,7 @@ " radius=radius,\n", " energy=energy,\n", " time_instant=time_instant,\n", + " token=token,\n", " )" ] }, diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 9ec5d55..aa376e3 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -371,9 +371,10 @@ def test_local_kg(conf_file, dispatcher_live_fixture, privileged): ({'visible_band': 'z'}, {'visible_band': 'z'}, True), - ({'energy': 1000}, {'energy': 1000}, True) + ({'energy': 1000}, {'energy': 1000}, True), + ({'token_rename': 'aZH17bvYmP0r'}, {'token': 'aZH17bvYmP0r'}, False), ]) -def test_full_stack(live_nb2service, +def test_echo_params(live_nb2service, conf_file, dispatcher_live_fixture, set_param, @@ -401,7 +402,8 @@ def test_full_stack(live_nb2service, 'radius': 3.0, 'start_time': 56000.0, 'time_instant': '2017-08-17T12:43:00.000', - 'visible_band': 'v'} + 'visible_band': 'v', + 'token': "XGDSgs2KYqHr"} request_params = {} expected_params = default_in_params.copy() @@ -867,3 +869,4 @@ def test_kg_based_instrument_parameters(conf_file, dispatcher_live_fixture, capl with open(conf_file, 'w') as fd: fd.write(conf_bk) os.remove(tmpkg) + From 7535fa2360166f14cde99f689777983b93848b33 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Mon, 26 Feb 2024 00:19:12 +0100 Subject: [PATCH 21/25] re-request options if failed --- .../dataserver_dispatcher.py | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py index 3733be4..4b14643 100644 --- a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py +++ b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py @@ -25,21 +25,29 @@ def __init__(self, instrument=None, param_dict=None, task=None, config=None): self.external_disp_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" @property - def backend_options(self): + def backend_options(self, max_trial=5, sleep_seconds=5): try: options_dict = self._backend_options except AttributeError: url = self.data_server_url.strip('/') + '/api/v1.0/options' - try: - res = requests.get("%s" % (url), params=None) - except: - return {} - if res.status_code == 200: - options_dict = res.json() - else: - return {} - raise ConnectionError(f"Backend connection failed: {res.status_code}") - # TODO: consecutive requests if failed + for i in range(max_trial): + try: + res = requests.get("%s" % (url), params=None) + + if res.status_code == 200: + options_dict = res.json() + backend_available = True + break + else: + raise RuntimeError("Backend options request failed. " + f"Exit code: {res.status_code}. " + f"Response: {res.text}") + except Exception as e: + backend_available = False + time.sleep(sleep_seconds) + if not backend_available: + raise e + self._backend_options = options_dict return options_dict @@ -54,7 +62,7 @@ def get_backend_comment(self, product): def test_communication(self, max_trial=10, sleep_s=1, logger=None): print('--> start test connection') - + query_out = QueryOutput() no_connection = True excep = Exception() From 22ba6bfad67dea328c1fb9136807856017324d72 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Mon, 26 Feb 2024 00:32:39 +0100 Subject: [PATCH 22/25] fix undefined e --- dispatcher_plugin_nb2workflow/dataserver_dispatcher.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py index 4b14643..113d341 100644 --- a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py +++ b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py @@ -29,6 +29,7 @@ def backend_options(self, max_trial=5, sleep_seconds=5): try: options_dict = self._backend_options except AttributeError: + exceptions = [] url = self.data_server_url.strip('/') + '/api/v1.0/options' for i in range(max_trial): try: @@ -44,9 +45,10 @@ def backend_options(self, max_trial=5, sleep_seconds=5): f"Response: {res.text}") except Exception as e: backend_available = False + exceptions.append(e) time.sleep(sleep_seconds) if not backend_available: - raise e + raise exceptions[-1] self._backend_options = options_dict return options_dict From 7bd69f161d736a66d970228f1aaff707f4e2eff1 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Mon, 26 Feb 2024 00:44:50 +0100 Subject: [PATCH 23/25] backend unavailable behaviour --- dispatcher_plugin_nb2workflow/dataserver_dispatcher.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py index 113d341..84508b1 100644 --- a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py +++ b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py @@ -6,6 +6,9 @@ from urllib.parse import urlsplit, parse_qs, urlencode import os from glob import glob +import logging + +logger = logging.getLogger() class NB2WDataDispatcher: def __init__(self, instrument=None, param_dict=None, task=None, config=None): @@ -29,7 +32,6 @@ def backend_options(self, max_trial=5, sleep_seconds=5): try: options_dict = self._backend_options except AttributeError: - exceptions = [] url = self.data_server_url.strip('/') + '/api/v1.0/options' for i in range(max_trial): try: @@ -45,10 +47,10 @@ def backend_options(self, max_trial=5, sleep_seconds=5): f"Response: {res.text}") except Exception as e: backend_available = False - exceptions.append(e) + logger.error(f"Exception while getting backend options {repr(e)}") time.sleep(sleep_seconds) if not backend_available: - raise exceptions[-1] + return {} self._backend_options = options_dict return options_dict From be6f30b1390c35c56505790ac5cb372bfb257fd0 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Mon, 26 Feb 2024 14:15:13 +0100 Subject: [PATCH 24/25] use ontology_object argument --- dispatcher_plugin_nb2workflow/queries.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/queries.py b/dispatcher_plugin_nb2workflow/queries.py index 45871ae..520623b 100644 --- a/dispatcher_plugin_nb2workflow/queries.py +++ b/dispatcher_plugin_nb2workflow/queries.py @@ -50,7 +50,7 @@ def construct_parameter_lists(backend_param_dict, ontology_path): source_plist.append(Parameter.from_owl_uri(pval['owl_type'], value=pval['default_value'], name=default_pname, - ontology_path=onto, + ontology_object=onto, extra_ttl=pval.get("extra_ttl") )) else: @@ -62,7 +62,7 @@ def construct_parameter_lists(backend_param_dict, ontology_path): plist.append(Parameter.from_owl_uri(pval['owl_type'], value=pval['default_value'], name=cur_name, - ontology_path=onto, + ontology_object=onto, extra_ttl=pval.get("extra_ttl") )) return {'source_plist': source_plist, From 308d0591b0b14b81ba2681fb55cc2cbc18962943 Mon Sep 17 00:00:00 2001 From: Denys SAVCHENKO Date: Mon, 26 Feb 2024 14:44:27 +0100 Subject: [PATCH 25/25] use ontology object in ParameterProducts --- dispatcher_plugin_nb2workflow/products.py | 9 +++++---- dispatcher_plugin_nb2workflow/util.py | 16 ---------------- tests/test_plugin.py | 2 +- 3 files changed, 6 insertions(+), 21 deletions(-) diff --git a/dispatcher_plugin_nb2workflow/products.py b/dispatcher_plugin_nb2workflow/products.py index de45c3a..8f3cef9 100644 --- a/dispatcher_plugin_nb2workflow/products.py +++ b/dispatcher_plugin_nb2workflow/products.py @@ -5,7 +5,8 @@ from cdci_data_analysis.analysis.products import LightCurveProduct, BaseQueryProduct, ImageProduct, SpectrumProduct from cdci_data_analysis.analysis.parameters import Parameter, subclasses_recursive from oda_api.data_products import NumpyDataProduct, ODAAstropyTable, BinaryProduct, PictureProduct -from .util import AstropyTableViewParser, ParProdOntology +from .util import AstropyTableViewParser +from cdci_data_analysis.analysis.ontology import Ontology from io import StringIO from functools import lru_cache from mimetypes import guess_extension @@ -110,7 +111,7 @@ class NB2WParameterProduct(NB2WProduct): type_key = 'oda:WorkflowParameter' ontology_path = None - + def __init__(self, value, out_dir=None, @@ -132,11 +133,11 @@ def get_html_draw(self): @lru_cache def parameter_products_factory(ontology_path = None): classes = [] - onto = ParProdOntology(ontology_path) + onto = Ontology(ontology_path) for term in onto.get_parprod_terms(): classes.append(type(f"{term.split('#')[-1]}Product", (NB2WParameterProduct,), - {'type_key': term, 'ontology_path': ontology_path})) + {'type_key': term, 'ontology_object': onto})) return classes diff --git a/dispatcher_plugin_nb2workflow/util.py b/dispatcher_plugin_nb2workflow/util.py index 3efbfbc..a73c216 100644 --- a/dispatcher_plugin_nb2workflow/util.py +++ b/dispatcher_plugin_nb2workflow/util.py @@ -1,20 +1,4 @@ from html.parser import HTMLParser -from cdci_data_analysis.analysis.ontology import Ontology - -class ParProdOntology(Ontology): - def get_parprod_terms(self): - query = """ - SELECT ?s WHERE { - ?s (rdfs:subClassOf|a)* ?mid0. - ?mid0 rdfs:subClassOf* oda:DataProduct. - - ?s (rdfs:subClassOf|a)* ?mid1. - ?mid1 rdfs:subClassOf* oda:WorkflowParameter . - } - GROUP BY ?s - """ - qres = self.g.query(query) - return [str(row[0]) for row in qres] class AstropyTableViewParser(HTMLParser): def handle_starttag(self, tag, attrs): diff --git a/tests/test_plugin.py b/tests/test_plugin.py index aa376e3..8de7fc1 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -462,7 +462,7 @@ def test_parameter_output(live_nb2service, ('mrk', 'Mrk 421', 'http://odahub.io/ontology#AstrophysicalObject'), ('timeinst', 56457.0, 'http://odahub.io/ontology#TimeInstantMJD'), ('timeisot', - '2022-10-09T13:00:00.000', + '2022-10-09T13:00:00', 'http://odahub.io/ontology#TimeInstantISOT'), ('wrng', 'FOO', 'http://odahub.io/ontology#PhotometricBand')]