diff --git a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py index 63b712b..5e804e7 100644 --- a/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py +++ b/dispatcher_plugin_nb2workflow/dataserver_dispatcher.py @@ -123,13 +123,15 @@ def get_progress_run(self, workflow_status = res_data['workflow_status'] if run_asynch else 'done' if workflow_status == 'started' or workflow_status == 'done': resroot = res_data['data'] if run_asynch and workflow_status == 'done' else res_data - jobdir = resroot['jobdir'].split('/')[-1] - trace_url = os.path.join(self.data_server_url, 'trace', jobdir, task.strip('/')) - res_trace = requests.get(trace_url) - res_trace_dict = { - 'res': res_trace, - 'progress_product': True - } + jobdir = resroot.get('jobdir', None) + if jobdir is not None: + jobdir = jobdir.split('/')[-1] + trace_url = os.path.join(self.data_server_url, 'trace', jobdir, task.strip('/')) + res_trace = requests.get(trace_url) + res_trace_dict = { + 'res': res_trace, + 'progress_product': True + } workflow_status = 'progress' if workflow_status == 'started' else workflow_status query_out.set_status(0, job_status=workflow_status) else: diff --git a/dispatcher_plugin_nb2workflow/queries.py b/dispatcher_plugin_nb2workflow/queries.py index c17473d..39f5f5f 100644 --- a/dispatcher_plugin_nb2workflow/queries.py +++ b/dispatcher_plugin_nb2workflow/queries.py @@ -137,18 +137,19 @@ def build_product_list(self, instrument, res, out_dir, api=False): if isinstance(res, dict): res_progress_product = res.get('progress_product', False) res = res.get('res', None) - res_content_type = res.headers.get('content-type', None) - if res_content_type is not None and res_content_type == 'application/json': - if 'output' in res.json().keys(): # in synchronous mode - _o_dict = res.json() + if res is not None: + res_content_type = res.headers.get('content-type', None) + if res_content_type is not None and res_content_type == 'application/json': + if 'output' in res.json().keys(): # in synchronous mode + _o_dict = res.json() + else: + _o_dict = res.json()['data'] + _output = _o_dict['output'] + prod_list = NB2WProduct.prod_list_factory(self.backend_output_dict, _output, out_dir, self.ontology_path) else: - _o_dict = res.json()['data'] - _output = _o_dict['output'] - prod_list = NB2WProduct.prod_list_factory(self.backend_output_dict, _output, out_dir, self.ontology_path) - else: - _o_text = res.content.decode() - if res_progress_product: - prod_list.append(NB2WProgressProduct(_o_text, out_dir)) + _o_text = res.content.decode() + if res_progress_product: + prod_list.append(NB2WProgressProduct(_o_text, out_dir)) return prod_list diff --git a/tests/conftest.py b/tests/conftest.py index f6d5bc5..160f942 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,6 +23,20 @@ dummy_cache: "" """ +backend_status_fn = "Backend-status.state" +trace_backend_status_fn = "Trace-Backend-status.state" + + +def set_backend_status(value): + open(backend_status_fn, "w").write(value) + + +def get_backend_status(): + if os.path.exists(backend_status_fn): + return open(backend_status_fn).read() + else: + return '' + @pytest.fixture(scope="session") def httpserver_listen_address(): @@ -33,16 +47,24 @@ def lightcurve_handler(request: Request): parsed_request_query = parse_qs(urlparse(request.url).query) async_request = parsed_request_query.get('_async_request', ['no']) responses_path = os.path.join(os.path.dirname(__file__), 'responses') - if async_request[0] == 'yes': - with open(os.path.join(responses_path, 'lightcurve_async.json'), 'r') as fd: - runjson_async = json.loads(fd.read()) - response_data = json.dumps(runjson_async, indent=4) - return Response(response_data, status=200, content_type='application/json') + + backend_status = get_backend_status() + + if backend_status == 'fail': + return Response("backend failure", status=500, content_type=' text/plain') + elif backend_status == 'trace_fail': + return Response('{"workflow_status": "done", "data": {}}', status=200, content_type='application/json') else: - with open(os.path.join(responses_path, 'lightcurve.json'), 'r') as fd: - runjson = json.loads(fd.read()) - response_data = json.dumps(runjson, indent=4) - return Response(response_data, status=200, content_type='application/json') + if async_request[0] == 'yes': + with open(os.path.join(responses_path, 'lightcurve_async.json'), 'r') as fd: + runjson_async = json.loads(fd.read()) + response_data = json.dumps(runjson_async, indent=4) + return Response(response_data, status=200, content_type='application/json') + else: + with open(os.path.join(responses_path, 'lightcurve.json'), 'r') as fd: + runjson = json.loads(fd.read()) + response_data = json.dumps(runjson, indent=4) + return Response(response_data, status=200, content_type='application/json') @pytest.fixture diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 86e344c..c68a135 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -10,6 +10,8 @@ import gzip import os from magic import from_buffer as mime_from_buffer +from conftest import set_backend_status + logger = logging.getLogger(__name__) @@ -172,7 +174,8 @@ def test_instrument_added(conf_file, dispatcher_live_fixture, mock_backend): def test_pass_comment(dispatcher_live_fixture, mock_backend): server = dispatcher_live_fixture logger.info("constructed server: %s", server) - + set_backend_status('') + c = requests.get(server + "/run_analysis", params = {'instrument': 'example0', 'query_status': 'new', @@ -496,6 +499,7 @@ def test_failed_nbhtml_download(live_nb2service, def test_return_progress(dispatcher_live_fixture, mock_backend, run_asynch): server = dispatcher_live_fixture logger.info("constructed server: %s", server) + set_backend_status('') params = {'instrument': 'example0', 'query_status': 'new', @@ -524,6 +528,8 @@ def test_api_return_progress(dispatcher_live_fixture, mock_backend, api): server = dispatcher_live_fixture logger.info("constructed server: %s", server) + set_backend_status('') + params = {'instrument': 'example0', 'query_status': 'new', 'query_type': 'Real', @@ -548,3 +554,51 @@ def test_api_return_progress(dispatcher_live_fixture, mock_backend, api): else: assert 'progress_product_html_output' in jdata['products'] assert jdata['products']['progress_product_html_output'][0] == test_output_html + + +def test_fail_return_progress(dispatcher_live_fixture, mock_backend): + server = dispatcher_live_fixture + logger.info("constructed server: %s", server) + + set_backend_status('fail') + + params = {'instrument': 'example0', + 'query_status': 'new', + 'query_type': 'Real', + 'product_type': 'lightcurve', + 'run_asynch': True, + 'return_progress': True} + + c = requests.get(os.path.join(server, "run_analysis"), + params=params) + logger.info("content: %s", c.text) + assert c.status_code == 200 + jdata = c.json() + logger.info(json.dumps(jdata, indent=4, sort_keys=True)) + logger.info(jdata) + assert jdata['job_status'] == 'failed' + assert jdata['exit_status']['message'] == 'connection status code: 500' + + +def test_trace_fail_return_progress(dispatcher_live_fixture, mock_backend): + server = dispatcher_live_fixture + logger.info("constructed server: %s", server) + + set_backend_status('trace_fail') + + params = {'instrument': 'example0', + 'query_status': 'new', + 'query_type': 'Real', + 'product_type': 'lightcurve', + 'run_asynch': True, + 'return_progress': True} + + c = requests.get(os.path.join(server, "run_analysis"), + params=params) + logger.info("content: %s", c.text) + assert c.status_code == 200 + jdata = c.json() + logger.info(json.dumps(jdata, indent=4, sort_keys=True)) + logger.info(jdata) + assert jdata['job_status'] == 'done' + assert 'progress_product_html_output' not in jdata['products']