diff --git a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py
index 5e804e7..84508b1 100644
--- a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py
+++ b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py
@@ -6,22 +6,19 @@
 from urllib.parse import urlsplit, parse_qs, urlencode
 import os
 from glob import glob
+import time  # used by the backend_options retry loop below
+import logging
+
+logger = logging.getLogger(__name__)
 
 class NB2WDataDispatcher:
     def __init__(self, instrument=None, param_dict=None, task=None, config=None):
         iname = instrument if isinstance(instrument, str) else instrument.name
         if config is None:
-            try:
-                config = DataServerConf.from_conf_dict(exposer.config_dict['instruments'][iname],
-                                                       allowed_keys = ['restricted_access'])
-            except:
-                #this happens if the instrument is not found in the instrument config, which is always read from a static file
-                config = DataServerConf.from_conf_dict(exposer.get_instr_conf()['instruments'][iname])
+            config = DataServerConf.from_conf_dict(exposer.combined_instrument_dict[iname])
 
         self.data_server_url = config.data_server_url
         self.task = task
         self.param_dict = param_dict
-        self.backend_options = self.query_backend_options()
         self.external_disp_url = None
         if not isinstance(instrument, str): # TODO: seems this is always the case. But what if not?
@@ -30,19 +27,32 @@ def __init__(self, instrument=None, param_dict=None, task=None, config=None):
             if parsed.scheme and parsed.netloc:
                 self.external_disp_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
 
-
-    def query_backend_options(self):
-        url = self.data_server_url.strip('/') + '/api/v1.0/options'
+    @property
+    def backend_options(self):
+        # a property cannot take call arguments, so the retry settings are locals
+        max_trial = 5
+        sleep_seconds = 5
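+        # cache on the instance: only the first access probes the backend
+        # (with retries); later accesses reuse self._backend_options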
" + f"Response: {res.text}") + except Exception as e: + backend_available = False + logger.error(f"Exception while getting backend options {repr(e)}") + time.sleep(sleep_seconds) + if not backend_available: + return {} + + self._backend_options = options_dict return options_dict def get_backend_comment(self, product): @@ -56,7 +66,7 @@ def get_backend_comment(self, product): def test_communication(self, max_trial=10, sleep_s=1, logger=None): print('--> start test connection') - + query_out = QueryOutput() no_connection = True excep = Exception() diff --git a/dispatcher_plugin_nb2workflow/exposer.py b/dispatcher_plugin_nb2workflow/exposer.py index 4e2bb85..c72a72e 100644 --- a/dispatcher_plugin_nb2workflow/exposer.py +++ b/dispatcher_plugin_nb2workflow/exposer.py @@ -8,13 +8,14 @@ import requests import rdflib as rdf import os +from copy import copy import logging logger = logging.getLogger(__name__) def kg_select(t, kg_conf_dict): - if kg_conf_dict is None: + if kg_conf_dict is None or kg_conf_dict == {}: logger.info('Not using KG to get instruments') qres_js = [] elif kg_conf_dict.get('type') == 'query-service': @@ -35,7 +36,9 @@ def kg_select(t, kg_conf_dict): if os.path.isfile(kg_conf_dict['path']): graph.parse(kg_conf_dict['path']) else: - logger.warning("Knowledge graph file %s doesn't exist yet. No instruments information will be loaded.") + logger.warning("Knowledge graph file %s doesn't exist yet. " + "No instruments information will be loaded.", + kg_conf_dict['path']) qres = graph.query(f""" SELECT * WHERE {{ {t} @@ -50,39 +53,50 @@ def kg_select(t, kg_conf_dict): logger.warning(json.dumps(qres_js, indent=4)) return qres_js - -def get_instr_conf(from_conf_file=None): - global conf_file +def get_static_instr_conf(conf_file): + masked_conf_file = conf_file - # current default - query central oda kb - # TODO: better default will be some regullary updated static location - kg_conf_dict = {'type': 'query-service', - 'path': "https://www.astro.unige.ch/mmoda/dispatch-data/gw/odakb/query"} - cfg_dict = {'instruments': {}} + # kg_conf_dict = {'type': 'query-service', + # 'path': "https://www.astro.unige.ch/mmoda/dispatch-data/gw/odakb/query"} - if from_conf_file is not None: - with open(from_conf_file, 'r') as ymlfile: + cfg_dict = {'instruments': {}, 'kg': {}} + + if conf_file is not None: + with open(conf_file, 'r') as ymlfile: f_cfg_dict = yaml.load(ymlfile, Loader=yaml.SafeLoader) if f_cfg_dict is not None: if 'instruments' in f_cfg_dict.keys(): cfg_dict['instruments'] = f_cfg_dict['instruments'] else: - conf_file = None + masked_conf_file = None + # need to set to None as it's being read inside Instrument if 'ontology_path' in f_cfg_dict.keys(): cfg_dict['ontology_path'] = f_cfg_dict['ontology_path'] if 'kg' in f_cfg_dict.keys(): - kg_conf_dict = f_cfg_dict['kg'] + cfg_dict['kg'] = f_cfg_dict['kg'] else: - conf_file = None - + masked_conf_file = None + return cfg_dict, masked_conf_file + +static_config_dict, masked_conf_file = get_static_instr_conf(conf_file) + +if 'ODA_ONTOLOGY_PATH' in os.environ: + ontology_path = os.environ.get('ODA_ONTOLOGY_PATH') +else: + ontology_path = static_config_dict.get('ontology_path', + 'http://odahub.io/ontology/ontology.ttl') +logger.info('Using ontology from %s', ontology_path) + +def get_config_dict_from_kg(kg_conf_dict=static_config_dict['kg']): + cfg_dict = {'instruments': {}} for r in kg_select(''' - ?w a ; - ?deployment_name; - ?service_name ; - ? ?work_status . - ''', kg_conf_dict): + ?w a ; + ?deployment_name; + ?service_name ; + ? 
+    masked_conf_file = conf_file
 
-    # current default - query central oda kb
-    # TODO: better default will be some regullary updated static location
-    kg_conf_dict = {'type': 'query-service',
-                    'path': "https://www.astro.unige.ch/mmoda/dispatch-data/gw/odakb/query"}
-    cfg_dict = {'instruments': {}}
+    # kg_conf_dict = {'type': 'query-service',
+    #                 'path': "https://www.astro.unige.ch/mmoda/dispatch-data/gw/odakb/query"}
 
-    if from_conf_file is not None:
-        with open(from_conf_file, 'r') as ymlfile:
+    cfg_dict = {'instruments': {}, 'kg': {}}
+
+    if conf_file is not None:
+        with open(conf_file, 'r') as ymlfile:
             f_cfg_dict = yaml.load(ymlfile, Loader=yaml.SafeLoader)
         if f_cfg_dict is not None:
             if 'instruments' in f_cfg_dict.keys():
                 cfg_dict['instruments'] = f_cfg_dict['instruments']
             else:
-                conf_file = None
+                masked_conf_file = None
+                # need to set it to None as it's being read inside Instrument
             if 'ontology_path' in f_cfg_dict.keys():
                 cfg_dict['ontology_path'] = f_cfg_dict['ontology_path']
             if 'kg' in f_cfg_dict.keys():
-                kg_conf_dict = f_cfg_dict['kg']
+                cfg_dict['kg'] = f_cfg_dict['kg']
         else:
-            conf_file = None
-
+            masked_conf_file = None
+    return cfg_dict, masked_conf_file
+
+static_config_dict, masked_conf_file = get_static_instr_conf(conf_file)
+
+if 'ODA_ONTOLOGY_PATH' in os.environ:
+    ontology_path = os.environ.get('ODA_ONTOLOGY_PATH')
+else:
+    ontology_path = static_config_dict.get('ontology_path',
+                                           'http://odahub.io/ontology/ontology.ttl')
+logger.info('Using ontology from %s', ontology_path)
+
+def get_config_dict_from_kg(kg_conf_dict=static_config_dict['kg']):
+    cfg_dict = {'instruments': {}}
     for r in kg_select('''
-            ?w a <http://odahub.io/ontology#WorkflowService>;
-               <http://odahub.io/ontology#deployment_name> ?deployment_name;
-               <http://odahub.io/ontology#service_name> ?service_name ;
-               <https://schema.org/creativeWorkStatus>? ?work_status .
-         ''', kg_conf_dict):
+        ?w a <http://odahub.io/ontology#WorkflowService>;
+           <http://odahub.io/ontology#deployment_name> ?deployment_name;
+           <http://odahub.io/ontology#service_name> ?service_name ;
+           <https://schema.org/creativeWorkStatus>? ?work_status .
+        ''', kg_conf_dict):
 
         logger.info('found instrument service record %s', r)
         cfg_dict['instruments'][r['service_name']['value']] = {
@@ -93,28 +107,60 @@
     return cfg_dict
 
-config_dict = get_instr_conf(conf_file)
-if 'ODA_ONTOLOGY_PATH' in os.environ:
-    ontology_path = os.environ.get('ODA_ONTOLOGY_PATH')
-else:
-    ontology_path = config_dict.get('ontology_path',
-                                    'http://odahub.io/ontology/ontology.ttl')
-logger.info('Using ontology from %s', ontology_path)
+combined_instrument_dict = {}
+def build_combined_instrument_dict():
+    global combined_instrument_dict
+    combined_instrument_dict = copy(static_config_dict.get('instruments', {}))
+    combined_instrument_dict.update(get_config_dict_from_kg()['instruments'])
+
+build_combined_instrument_dict()
 
 def factory_factory(instr_name, restricted_access):
+    instrument_query = NB2WInstrumentQuery('instr_query', restricted_access)
     def instr_factory():
-        backend_options = NB2WDataDispatcher(instrument=instr_name).query_backend_options()
-        query_list, query_dict = NB2WProductQuery.query_list_and_dict_factory(backend_options, ontology_path)
+        backend_options = NB2WDataDispatcher(instrument=instr_name).backend_options
+        query_list, query_dict = NB2WProductQuery.query_list_and_dict_factory(backend_options,
+                                                                              ontology_path)
         return Instrument(instr_name,
-                          src_query = NB2WSourceQuery.from_backend_options(backend_options, ontology_path),
-                          instrumet_query = NB2WInstrumentQuery('instr_query', restricted_access),
-                          data_serve_conf_file=conf_file,
+                          src_query = NB2WSourceQuery.from_backend_options(backend_options,
+                                                                           ontology_path),
+                          instrumet_query = instrument_query,
+                          data_serve_conf_file=masked_conf_file,
                           product_queries_list=query_list,
                           query_dictionary=query_dict,
                           asynch=True,
                          data_server_query_class=NB2WDataDispatcher,
                           )
+    instr_factory.instr_name = instr_name
+    instr_factory.instrument_query = instrument_query
     return instr_factory
 
+class NB2WInstrumentFactoryIter:
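+    # iterable wrapper over the factory list: every iteration re-reads the
+    # combined static + KG instrument dict, so instruments added to or removed
+    # from the knowledge graph appear without restarting the dispatcher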
+    def __init__(self, lst):
+        self.lst = lst
+
+    def _update_instruments_list(self):
+        build_combined_instrument_dict()
+
+        current_instrs = [x.instr_name for x in self.lst]
+        available_instrs = combined_instrument_dict.keys()
+        new_instrs = set(available_instrs) - set(current_instrs)
+        old_instrs = set(current_instrs) - set(available_instrs)
+
+        for instr in old_instrs:
+            idx = current_instrs.index(instr)
+            self.lst.pop(idx)
+            # keep the name list in sync, otherwise later indices are stale
+            current_instrs.pop(idx)
+
+        for instr in new_instrs:
+            self.lst.append(factory_factory(instr,
+                                            combined_instrument_dict[instr].get('restricted_access', False)))
+
+    def __iter__(self):
+        self._update_instruments_list()
+        return self.lst.__iter__()
+
 instr_factory_list = [ factory_factory(instr_name, instr_conf.get('restricted_access', False))
-                      for instr_name, instr_conf in config_dict['instruments'].items() ]
+                      for instr_name, instr_conf in combined_instrument_dict.items()]
+
+instr_factory_list = NB2WInstrumentFactoryIter(instr_factory_list)
diff --git a/dispatcher_plugin_nb2workflow/products.py b/dispatcher_plugin_nb2workflow/products.py
index de45c3a..8f3cef9 100644
--- a/dispatcher_plugin_nb2workflow/products.py
+++ b/dispatcher_plugin_nb2workflow/products.py
@@ -5,7 +5,8 @@
 from cdci_data_analysis.analysis.products import LightCurveProduct, BaseQueryProduct, ImageProduct, SpectrumProduct
 from cdci_data_analysis.analysis.parameters import Parameter, subclasses_recursive
 from oda_api.data_products import NumpyDataProduct, ODAAstropyTable, BinaryProduct, PictureProduct
-from .util import AstropyTableViewParser, ParProdOntology
+from .util import AstropyTableViewParser
+from cdci_data_analysis.analysis.ontology import Ontology
 from io import StringIO
 from functools import lru_cache
 from mimetypes import guess_extension
@@ -110,7 +111,7 @@ class NB2WParameterProduct(NB2WProduct):
     type_key = 'oda:WorkflowParameter'
 
     ontology_path = None
-    
+
     def __init__(self,
                  value,
                  out_dir=None,
@@ -132,11 +133,11 @@ def get_html_draw(self):
 @lru_cache
 def parameter_products_factory(ontology_path = None):
     classes = []
-    onto = ParProdOntology(ontology_path)
+    onto = Ontology(ontology_path)
     for term in onto.get_parprod_terms():
         classes.append(type(f"{term.split('#')[-1]}Product",
                             (NB2WParameterProduct,),
-                            {'type_key': term, 'ontology_path': ontology_path}))
+                            {'type_key': term, 'ontology_object': onto}))
     return classes
diff --git a/dispatcher_plugin_nb2workflow/queries.py b/dispatcher_plugin_nb2workflow/queries.py
index ebd7fef..520623b 100644
--- a/dispatcher_plugin_nb2workflow/queries.py
+++ b/dispatcher_plugin_nb2workflow/queries.py
@@ -10,19 +10,20 @@
 from .dataserver_dispatcher import NB2WDataDispatcher
 from cdci_data_analysis.analysis.ontology import Ontology
 import os
-from functools import lru_cache
+from functools import lru_cache, wraps
 from json import dumps
+from copy import deepcopy
 
 class HashableDict(dict):
     def __hash__(self):
-        return hash(dumps(self))
+        # sort_keys makes the hash independent of key insertion order
+        return hash(dumps(self, sort_keys=True))
 
 def with_hashable_dict(func):
+    @wraps(func)
     def wrapper(backend_param_dict, ontology_path):
         return func(HashableDict(backend_param_dict), ontology_path)
     return wrapper
-
 @with_hashable_dict
 @lru_cache
 def construct_parameter_lists(backend_param_dict, ontology_path):
@@ -30,7 +31,9 @@ def construct_parameter_lists(backend_param_dict, ontology_path):
                            "http://odahub.io/ontology#PointOfInterestDEC": "DEC",
                            "http://odahub.io/ontology#StartTime": "T1",
                            "http://odahub.io/ontology#EndTime": "T2",
-                           "http://odahub.io/ontology#AstrophysicalObject": "src_name"}
+                           "http://odahub.io/ontology#AstrophysicalObject": "src_name",
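+                           # dummy key that can never match an owl_type: it only
+                           # reserves the name "token" (used by the dispatcher
+                           # for auth), so a backend parameter named "token"
+                           # goes through the same renaming as reserved names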
+                           "ThisNameShouldNotExist": "token"
+                           }
 
     par_name_substitution = {}
     plist = []
@@ -38,7 +41,7 @@ def construct_parameter_lists(backend_param_dict, ontology_path):
     for pname, pval in backend_param_dict.items():
         onto = Ontology(ontology_path)
         if pval.get("extra_ttl"):
-            onto.parse_extra_triples(pval.get("extra_ttl"))
+            onto.parse_extra_triples(pval.get("extra_ttl"), parse_oda_annotations = False)
         onto_class_hierarchy = onto.get_parameter_hierarchy(pval['owl_type'])
         src_query_owl_uri_set = set(onto_class_hierarchy).intersection(src_query_pars_uris.keys())
         if src_query_owl_uri_set:
@@ -47,7 +50,7 @@ def construct_parameter_lists(backend_param_dict, ontology_path):
             source_plist.append(Parameter.from_owl_uri(pval['owl_type'],
                                                        value=pval['default_value'],
                                                        name=default_pname,
-                                                       ontology_path=ontology_path,
+                                                       ontology_object=onto,
                                                        extra_ttl=pval.get("extra_ttl")
                                                        ))
         else:
@@ -59,13 +62,13 @@ def construct_parameter_lists(backend_param_dict, ontology_path):
             plist.append(Parameter.from_owl_uri(pval['owl_type'],
                                                 value=pval['default_value'],
                                                 name=cur_name,
-                                                ontology_path=ontology_path,
+                                                ontology_object=onto,
                                                 extra_ttl=pval.get("extra_ttl")
                                                 ))
-
     return {'source_plist': source_plist,
             'prod_plist': plist,
-            'par_name_substitution': par_name_substitution}
+            'par_name_substitution': par_name_substitution
+            }
 
 class NB2WSourceQuery(BaseQuery):
     @classmethod
@@ -76,7 +79,7 @@ def from_backend_options(cls, backend_options, ontology_path):
         parameters_dict = {}
         for product_name in product_names:
             backend_param_dict = backend_options[product_name]['parameters']
-            prod_source_plist = construct_parameter_lists(backend_param_dict, ontology_path)['source_plist']
+            prod_source_plist = deepcopy(construct_parameter_lists(backend_param_dict, ontology_path)['source_plist'])
             for par in prod_source_plist:
                 parameters_dict[par.name] = par
         parameters_list = list(parameters_dict.values())
@@ -89,7 +92,7 @@ def __init__(self, name, backend_product_name, backend_param_dict, backend_output_dict, ontology_path):
         self.backend_output_dict = backend_output_dict
         parameter_lists = construct_parameter_lists(backend_param_dict, ontology_path)
         self.par_name_substitution = parameter_lists['par_name_substitution']
-        plist = parameter_lists['prod_plist']
+        plist = deepcopy(parameter_lists['prod_plist'])
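+        # construct_parameter_lists is lru_cached and hands back shared
+        # Parameter objects; deepcopy (here and in from_backend_options above)
+        # keeps per-request value changes from leaking into the cached defaults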
         self.ontology_path = ontology_path
         super().__init__(name, parameters_list = plist)
diff --git a/dispatcher_plugin_nb2workflow/util.py b/dispatcher_plugin_nb2workflow/util.py
index 3efbfbc..a73c216 100644
--- a/dispatcher_plugin_nb2workflow/util.py
+++ b/dispatcher_plugin_nb2workflow/util.py
@@ -1,20 +1,4 @@
 from html.parser import HTMLParser
-from cdci_data_analysis.analysis.ontology import Ontology
-
-class ParProdOntology(Ontology):
-    def get_parprod_terms(self):
-        query = """
-            SELECT ?s WHERE {
-                ?s (rdfs:subClassOf|a)* ?mid0.
-                ?mid0 rdfs:subClassOf* oda:DataProduct.
-
-                ?s (rdfs:subClassOf|a)* ?mid1.
-                ?mid1 rdfs:subClassOf* oda:WorkflowParameter .
-            }
-            GROUP BY ?s
-            """
-        qres = self.g.query(query)
-        return [str(row[0]) for row in qres]
 
 class AstropyTableViewParser(HTMLParser):
     def handle_starttag(self, tag, attrs):
diff --git a/test-requirements.txt b/test-requirements.txt
index 83e187c..3d9cd3f 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -1,3 +1,3 @@
 git+https://github.com/oda-hub/nb2workflow.git@master#egg=nb2workflow[service,rdf]
 git+https://github.com/oda-hub/oda_api.git@master#egg=oda_api
-git+https://github.com/oda-hub/dispatcher-app.git@master#egg=cdci_data_analysis
+git+https://github.com/oda-hub/dispatcher-app.git@from-owl-signature#egg=cdci_data_analysis
diff --git a/tests/conftest.py b/tests/conftest.py
index 160f942..b039dfe 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -19,7 +19,7 @@
 config_one_instrument = """
 instruments:
   example0:
-    data_server_url: http://localhost:9494
+    data_server_url: http://localhost:8000
     dummy_cache: ""
 """
 
@@ -40,7 +40,7 @@ def get_backend_status():
 
 @pytest.fixture(scope="session")
 def httpserver_listen_address():
-    return ("127.0.0.1", 9494)
+    return ("127.0.0.1", 8000)
 
 def lightcurve_handler(request: Request):
diff --git a/tests/example_nb/echo.ipynb b/tests/example_nb/echo.ipynb
index d276842..e02d25f 100644
--- a/tests/example_nb/echo.ipynb
+++ b/tests/example_nb/echo.ipynb
@@ -27,7 +27,8 @@
     "visible_band = \"v\" # oda:PhotometricBand ; oda:allowed_value \"b\", \"g\", \"r\", \"v\"\n",
     "radius = 3. # oda:Angle ; oda:unit unit:arcmin\n",
     "energy = 50. # oda:Energy_keV ; oda:lower_limit 35 ; oda:upper_limit 800\n",
-    "time_instant = '2017-08-17T12:43:0.000' # oda:TimeInstantISOT"
+    "time_instant = '2017-08-17T12:43:0.000' # oda:TimeInstantISOT\n",
+    "token = \"XGDSgs2KYqHr\""
    ]
   },
  {
@@ -45,6 +46,7 @@
     "    radius=radius,\n",
     "    energy=energy,\n",
     "    time_instant=time_instant,\n",
+    "    token=token,\n",
     "    )"
    ]
   },
diff --git a/tests/test_plugin.py b/tests/test_plugin.py
index bea7c45..8de7fc1 100644
--- a/tests/test_plugin.py
+++ b/tests/test_plugin.py
@@ -2,6 +2,8 @@
 import logging
 import requests
 from oda_api.data_products import PictureProduct, ImageDataProduct
+import shutil
+from textwrap import dedent
 import time
 import jwt
 import pytest
@@ -18,7 +20,7 @@
 config_two_instruments = """
 instruments:
   example0:
-    data_server_url: http://localhost:9494
+    data_server_url: http://localhost:8000
     dummy_cache: ""
   example1:
     data_server_url: http://localhost:9595
@@ -275,20 +277,38 @@ def test_image_product(dispatcher_live_fixture, mock_backend):
     imdata = jdata['products']['numpy_data_product_list'][0]
     oda_ndp = ImageDataProduct.decode(imdata)
 
-def test_default_kg(dispatcher_live_fixture):
-    server = dispatcher_live_fixture
-    logger.info("constructed server: %s", server)
-
-    c = requests.get(server + "/instr-list",
-                     params = {'instrument': 'mock',
-                               'token': encoded_token})
-    logger.info("content: %s", c.text)
-    jdata = c.json()
-    logger.info(json.dumps(jdata, indent=4, sort_keys=True))
-    logger.info(jdata)
-    assert c.status_code == 200
-    assert 'lightcurve-example' in jdata # TODO: change to what will be used in docs
-
+def test_external_service_kg(conf_file, dispatcher_live_fixture):
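+    # swap the configured KG for the public external query service, reload the
+    # plugin, and check that a KG-provided instrument shows up in /instr-list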
+    with open(conf_file, 'r') as fd:
+        conf_bk = fd.read()
+
+    try:
+        with open(conf_file, 'w') as fd:
+            fd.write(dedent("""
+                kg:
+                    type: "query-service"
+                    path: "https://www.astro.unige.ch/mmoda/dispatch-data/gw/odakb/query"
+                """))
+
+        server = dispatcher_live_fixture
+        logger.info("constructed server: %s", server)
+        c = requests.get(server + "/reload-plugin/dispatcher_plugin_nb2workflow")
+        assert c.status_code == 200
+
+        c = requests.get(server + "/instr-list",
+                         params = {'instrument': 'mock',
+                                   'token': encoded_token})
+        logger.info("content: %s", c.text)
+        jdata = c.json()
+        logger.info(json.dumps(jdata, indent=4, sort_keys=True))
+        logger.info(jdata)
+        assert c.status_code == 200
+        assert 'lightcurve-example' in jdata # TODO: change to what will be used in docs
+
+    finally:
+        with open(conf_file, 'w') as fd:
+            fd.write(conf_bk)
+
 @pytest.mark.parametrize("privileged", [True, False])
 def test_local_kg(conf_file, dispatcher_live_fixture, privileged):
     with open(conf_file, 'r') as fd:
@@ -351,9 +371,10 @@ def test_local_kg(conf_file, dispatcher_live_fixture, privileged):
     ({'visible_band': 'z'},
      {'visible_band': 'z'},
      True),
-    ({'energy': 1000},
-     {'energy': 1000},
-     True)
+    ({'energy': 1000},
+     {'energy': 1000},
+     True),
+    ({'token_rename': 'aZH17bvYmP0r'},
+     {'token': 'aZH17bvYmP0r'},
+     False),
 ])
-def test_full_stack(live_nb2service,
+def test_echo_params(live_nb2service,
                     conf_file,
                     dispatcher_live_fixture,
                     set_param,
@@ -381,7 +402,8 @@
               'radius': 3.0,
               'start_time': 56000.0,
               'time_instant': '2017-08-17T12:43:00.000',
-              'visible_band': 'v'}
+              'visible_band': 'v',
+              'token': "XGDSgs2KYqHr"}
 
     request_params = {}
     expected_params = default_in_params.copy()
@@ -440,7 +462,7 @@ def test_parameter_output(live_nb2service,
     ('mrk', 'Mrk 421', 'http://odahub.io/ontology#AstrophysicalObject'),
     ('timeinst', 56457.0,
      'http://odahub.io/ontology#TimeInstantMJD'),
     ('timeisot',
-     '2022-10-09T13:00:00.000',
+     '2022-10-09T13:00:00',
      'http://odahub.io/ontology#TimeInstantISOT'),
     ('wrng', 'FOO', 'http://odahub.io/ontology#PhotometricBand')]
@@ -638,3 +660,213 @@ def test_trace_fail_return_progress(dispatcher_live_fixture, mock_backend):
     logger.info(jdata)
     assert jdata['job_status'] == 'done'
     assert 'progress_product_html_output' not in jdata['products']
+
+
+def test_default_value_preservation(dispatcher_live_fixture, mock_backend):
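+    # guards against mutation of the lru_cached defaults (cf. the deepcopy
+    # calls in queries.py): submitting some_param=5 must not change the
+    # default value advertised by /api/meta-data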
+    server = dispatcher_live_fixture
+    logger.info("constructed server: %s", server)
+
+    def get_param_default():
+        c = requests.get(server + "/api/meta-data",
+                         params = {'instrument': 'example0'})
+        assert c.status_code == 200
+        logger.info("content: %s", c.text)
+        jdata = c.json()
+        logger.info(jdata)
+
+        for x in jdata[0]:
+            if isinstance(x, dict):
+                continue
+            elif isinstance(x, list):
+                # if we finally decide to output it non-encoded at some point
+                pass
+            else:
+                x = json.loads(x)
+
+            if {"query_name": "table_query"} in x:
+                some_param_value = x[2]['value']
+                return some_param_value
+
+    some_param_value = get_param_default()
+
+    params = {'instrument': 'example0',
+              'query_status': 'new',
+              'query_type': 'Real',
+              'product_type': 'table',
+              'some_param': 5,
+              'run_asynch': False}
+    c = requests.get(server + "/run_analysis",
+                     params = params)
+    assert c.status_code == 200
+
+    new_param_value = get_param_default()
+
+    assert new_param_value == some_param_value
+
+@pytest.mark.fullstack
+def test_structured_default_value_preservation(live_nb2service,
+                                               conf_file,
+                                               dispatcher_live_fixture):
+    with open(conf_file, 'r') as fd:
+        conf_bk = fd.read()
+
+    try:
+        with open(conf_file, 'w') as fd:
+            fd.write( config_real_nb2service % live_nb2service )
+
+        server = dispatcher_live_fixture
+        logger.info("constructed server: %s", server)
+
+        # ensure the new conf file is read
+        c = requests.get(server + "/reload-plugin/dispatcher_plugin_nb2workflow")
+        assert c.status_code == 200
+
+        def get_param_default():
+            c = requests.get(server + "/api/meta-data",
+                             params = {'instrument': 'example'})
+            assert c.status_code == 200
+            logger.info("content: %s", c.text)
+            jdata = c.json()
+            logger.info(jdata)
+
+            for x in jdata[0]:
+                if isinstance(x, dict):
+                    continue
+                elif isinstance(x, list):
+                    # if we finally decide to output it non-encoded at some point
+                    pass
+                else:
+                    x = json.loads(x)
+
+                if {"query_name": "structured_query"} in x:
+                    param_value = x[2]['value']
+                    return param_value
+
+        struct_par_value = get_param_default()
+
+        params = {'instrument': 'example',
+                  'query_status': 'new',
+                  'query_type': 'Real',
+                  'product_type': 'structured',
+                  'struct_par': '{"col4": ["spam", "ham"]}',
+                  'run_asynch': False}
+        c = requests.get(server + "/run_analysis",
+                         params = params)
+        assert c.status_code == 200
+
+        new_param_value = get_param_default()
+
+        assert new_param_value == struct_par_value
+
+    finally:
+        with open(conf_file, 'w') as fd:
+            fd.write(conf_bk)
+
+def test_added_in_kg(conf_file, dispatcher_live_fixture):
+    with open(conf_file, 'r') as fd:
+        conf_bk = fd.read()
+
+    try:
+        tmpkg = '/tmp/example-kg.ttl'
+
+        with open('tests/example-kg.ttl') as fd:
+            orig_kg = fd.read()
+
+        shutil.copy('tests/example-kg.ttl', tmpkg)
+
+        with open(conf_file, 'w') as fd:
+            fd.write(config_local_kg.replace('tests', '/tmp'))
+
+        server = dispatcher_live_fixture
+        logger.info("constructed server: %s", server)
+
+        # reload to read config
+        c = requests.get(server + "/reload-plugin/dispatcher_plugin_nb2workflow")
+        assert c.status_code == 200
+
+        def assert_instruments(available, not_available):
+            params = {'instrument': 'mock'}
+            c = requests.get(server + "/instr-list",
+                             params = params)
+            logger.info("content: %s", c.text)
+            jdata = c.json()
+            logger.info(json.dumps(jdata, indent=4, sort_keys=True))
+            logger.info(jdata)
+            assert c.status_code == 200
+            for av in available:
+                assert av in jdata
+            for nav in not_available:
+                assert nav not in jdata
+
+        assert_instruments(['kgprod'], ['kgprod1'])
+
+        with open(tmpkg, 'a') as fd:
+            fd.write(dedent('''
+                <http://odahub.io/workflows/kgprod1> a oda:WorkflowService;
+                    oda:deployment_name "kgprod1-workflow-backend" ;
+                    oda:service_name "kgprod1" ;
+                    sdo:creativeWorkStatus "production" .
+                '''))
+
+        assert_instruments(['kgprod', 'kgprod1'], [])
+
+        with open(tmpkg, 'w') as fd:
+            fd.write(orig_kg)
+
+        assert_instruments(['kgprod'], ['kgprod1'])
+
+    finally:
+        with open(conf_file, 'w') as fd:
+            fd.write(conf_bk)
+        os.remove(tmpkg)
+
+def test_kg_based_instrument_parameters(conf_file, dispatcher_live_fixture, caplog, mock_backend):
+    with open(conf_file, 'r') as fd:
+        conf_bk = fd.read()
+
+    try:
+        tmpkg = '/tmp/example-kg.ttl'
+
+        with open(conf_file, 'w') as fd:
+            fd.write(dedent(f"""
+                kg:
+                    type: "file"
+                    path: "{tmpkg}"
+                """))
+
+        with open(tmpkg, 'w') as fd:
+            fd.write(dedent("""
+                @prefix oda: <http://odahub.io/ontology#> .
+                @prefix sdo: <https://schema.org/> .
+
+                <http://odahub.io/workflows/example0> a oda:WorkflowService;
+                    oda:deployment_name "localhost" ;
+                    oda:service_name "example0" ;
+                    sdo:creativeWorkStatus "production" .
+                """))
+
+        server = dispatcher_live_fixture
+        logger.info("constructed server: %s", server)
+
+        # reload to read config
+        c = requests.get(server + "/reload-plugin/dispatcher_plugin_nb2workflow")
+        assert c.status_code == 200
+
+        c = requests.get(server + "/api/par-names",
+                         params = {'instrument': 'example0'})
+        logger.info("content: %s", c.text)
+        jdata = c.json()
+        logger.info(json.dumps(jdata, indent=4, sort_keys=True))
+        logger.info(jdata)
+        assert c.status_code == 200
+        assert sorted(jdata) == sorted(expected_arguments)
+        assert "will be discarded for the instantiation" not in caplog.text
+        assert "Possibly a programming error" not in caplog.text
+
+    finally:
+        with open(conf_file, 'w') as fd:
+            fd.write(conf_bk)
+        os.remove(tmpkg)