Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include return progress arg #76

Merged
merged 41 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
a299d0a
ignore local venv from pycharm
burnout87 Nov 29, 2023
b20e995
ignore local venv from pycharm
burnout87 Nov 29, 2023
54d4e34
ignore local venv from pycharm
burnout87 Nov 29, 2023
9410dd8
empty get_progress_run
burnout87 Nov 29, 2023
1f8e643
dedicated test
burnout87 Nov 29, 2023
ef0b0de
reverted test-requires
burnout87 Nov 29, 2023
d333c8c
test_return_progress
burnout87 Nov 29, 2023
c5ea387
setting path json files
burnout87 Nov 29, 2023
51cc8fc
get_progress_run requests workflow status and then the output content
burnout87 Dec 20, 2023
87d69de
returning trace endpoint content
burnout87 Dec 21, 2023
fde637d
test html output
burnout87 Dec 22, 2023
cd288bb
NB2WProgressProduct product type
burnout87 Dec 22, 2023
64e6923
bug fix
burnout87 Dec 22, 2023
d9713cf
extended test
burnout87 Dec 22, 2023
1a97e1d
checking content type to build product list
burnout87 Dec 22, 2023
866a06b
properly extracting workflow_status
burnout87 Dec 22, 2023
91e6301
extended test
burnout87 Dec 22, 2023
4ab2afb
path join
burnout87 Jan 3, 2024
908cda3
not needed responses
burnout87 Jan 3, 2024
3b1f104
no kwargs for args get_progress_run
burnout87 Jan 3, 2024
e6a4896
added logging arg for compatibility
burnout87 Jan 4, 2024
81e55b0
added logging arg for compatibility
burnout87 Jan 4, 2024
65f9ce0
better var naming
burnout87 Jan 4, 2024
231a011
extract jobdir only if status is not is done or started
burnout87 Jan 4, 2024
cc6d354
removed type_key for NB2WProgressProduct
burnout87 Jan 4, 2024
8b7921e
prod_list_factory adjustment
burnout87 Jan 4, 2024
b0d7a3d
extracting more response info from res in build_product_list
burnout87 Jan 4, 2024
c34a044
re-inserted call to get_html_draw
burnout87 Jan 4, 2024
c3e4722
not calling right arg
burnout87 Jan 4, 2024
4d86e18
adapted test
burnout87 Jan 4, 2024
5ba4f27
no need for write and get_html_draw for NB2WProgressProduct
burnout87 Jan 4, 2024
979a79d
test name
burnout87 Jan 4, 2024
3d3d460
removed unused var
burnout87 Jan 10, 2024
dac3a92
Update dispatcher_plugin_nb2workflow/dataserver_dispatcher.py
burnout87 Jan 10, 2024
98bdcca
not needed check
burnout87 Jan 10, 2024
3334125
build_product_list conditions commented
burnout87 Jan 10, 2024
e24247f
logging in case of error from backend
burnout87 Jan 10, 2024
9c07b19
passing the correct param for sentry
burnout87 Jan 10, 2024
5718cab
better exception handling
burnout87 Jan 10, 2024
4030c7c
better exception handling
burnout87 Jan 10, 2024
aa9fea8
not needed import
burnout87 Jan 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ dispatcher_plugin_nb2workflow/config_dir/data_server_conf.yml
.env
**.nb2workflow**
function.xml
download_*
download_*
/venv
/.idea
/build
47 changes: 47 additions & 0 deletions dispatcher_plugin_nb2workflow/dataserver_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,53 @@ def test_has_input_products(self, instrument, logger=None):
query_out.set_done('input products check skipped')
return query_out, []

def get_progress_run(self,
call_back_url=None,
run_asynch=None,
logger=None,
task=None,
param_dict=None):

query_out = QueryOutput()
res_trace_dict = None

if task is None:
task = self.task
if param_dict is None:
param_dict = self.param_dict
if run_asynch and call_back_url is not None:
param_dict['_async_request_callback'] = call_back_url
param_dict['_async_request'] = "yes"

url = os.path.join(self.data_server_url, 'api/v1.0/get', task.strip('/'))
res = requests.get(url, params=param_dict)
if res.status_code in [200, 201]:
res_data = res.json()
workflow_status = res_data['workflow_status'] if run_asynch else 'done'
if workflow_status == 'started' or workflow_status == 'done':
resroot = res_data['data'] if run_asynch and workflow_status == 'done' else res_data
jobdir = resroot['jobdir'].split('/')[-1]
trace_url = os.path.join(self.data_server_url, 'trace', jobdir, task.strip('/'))
res_trace = requests.get(trace_url)
res_trace_dict = {
'res': res_trace,
'progress_product': True
}

query_out.set_status(0, job_status=workflow_status)
else:
if 'application/json' in res.headers.get('content-type', ''):
e_message = res.json()['exceptions'][0]
else:
e_message = res.text
query_out.set_failed('Error in the backend',
message='connection status code: ' + str(res.status_code),
e_message=e_message)
logger.error(f'Error in the backend, connection status code: {str(res.status_code)}. '
f'error: \n{e_message}')

return res_trace_dict, query_out

def run_query(self,
call_back_url = None,
run_asynch = True,
Expand Down
25 changes: 17 additions & 8 deletions dispatcher_plugin_nb2workflow/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,25 @@ def _init_as_list(cls, encoded_data, *args, **kwargs):
@classmethod
def prod_list_factory(cls, output_description_dict, output, out_dir = None, ontology_path = None):
par_prod_classes = parameter_products_factory(ontology_path)
mapping = {x.type_key: x for x in cls.__subclasses__() + par_prod_classes}

mapping = {x.type_key: x for x in cls.__subclasses__() + par_prod_classes if hasattr(x, 'type_key')}

prod_list = []
for key in output_description_dict.keys():
owl_type = output_description_dict[key]['owl_type']

extra_kw = {}
extra_ttl = output_description_dict[key].get('extra_ttl')
if extra_ttl == '\n': extra_ttl = None
if extra_ttl == '\n': extra_ttl = None
if extra_ttl:
extra_kw = {'extra_ttl': extra_ttl}

try:
prod_list.extend( mapping.get(owl_type, cls)._init_as_list(output[key],
out_dir=out_dir,
name=key,
prod_list.extend( mapping.get(owl_type, cls)._init_as_list(output[key],
out_dir=out_dir,
name=key,
**extra_kw
)
)
burnout87 marked this conversation as resolved.
Show resolved Hide resolved
)
except Exception as e:
logger.warning('unable to construct %s product: %s from this: %s ', key, e, output[key])
Expand Down Expand Up @@ -174,6 +174,15 @@ def write(self):
def get_html_draw(self):
return {'image': {'div': '<br>'+self.data_prod, 'script': ''} }


class NB2WProgressProduct(NB2WProduct):

def __init__(self, progress_html_data, out_dir=None, name='progress'):
self.out_dir = out_dir
self.name = name
self.progress_data = progress_html_data


class NB2WPictureProduct(NB2WProduct):
type_key = 'http://odahub.io/ontology#ODAPictureProduct'

Expand Down
121 changes: 76 additions & 45 deletions dispatcher_plugin_nb2workflow/queries.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from cdci_data_analysis.analysis.queries import ProductQuery, QueryOutput, BaseQuery, InstrumentQuery
from cdci_data_analysis.analysis.parameters import Parameter, Name
from .products import (NB2WProduct,
NB2WAstropyTableProduct,
NB2WBinaryProduct,
NB2WPictureProduct,
NB2WTextProduct,
NB2WParameterProduct)
from .products import (NB2WProduct,
NB2WAstropyTableProduct,
NB2WBinaryProduct,
NB2WPictureProduct,
NB2WTextProduct,
NB2WParameterProduct,
NB2WProgressProduct)
from .dataserver_dispatcher import NB2WDataDispatcher
from cdci_data_analysis.analysis.ontology import Ontology
import os
Expand All @@ -31,7 +32,7 @@ def construct_parameter_lists(backend_param_dict, ontology_path):
"http://odahub.io/ontology#EndTime": "T2",
"http://odahub.io/ontology#AstrophysicalObject": "src_name"}
par_name_substitution = {}

plist = []
source_plist = []
for pname, pval in backend_param_dict.items():
Expand All @@ -43,8 +44,8 @@ def construct_parameter_lists(backend_param_dict, ontology_path):
if src_query_owl_uri_set:
default_pname = src_query_pars_uris[src_query_owl_uri_set.pop()]
par_name_substitution[ default_pname ] = pname
source_plist.append(Parameter.from_owl_uri(pval['owl_type'],
value=pval['default_value'],
source_plist.append(Parameter.from_owl_uri(pval['owl_type'],
value=pval['default_value'],
name=default_pname,
ontology_path=ontology_path,
extra_ttl=pval.get("extra_ttl")
Expand All @@ -55,13 +56,13 @@ def construct_parameter_lists(backend_param_dict, ontology_path):
if pname in src_query_pars_uris.values():
cur_name = pname + '_rename'
par_name_substitution[ cur_name ] = pname
plist.append(Parameter.from_owl_uri(pval['owl_type'],
value=pval['default_value'],
plist.append(Parameter.from_owl_uri(pval['owl_type'],
value=pval['default_value'],
name=cur_name,
ontology_path=ontology_path,
extra_ttl=pval.get("extra_ttl")
))

return {'source_plist': source_plist,
'prod_plist': plist,
'par_name_substitution': par_name_substitution}
Expand All @@ -70,7 +71,7 @@ class NB2WSourceQuery(BaseQuery):
@classmethod
def from_backend_options(cls, backend_options, ontology_path):
product_names = backend_options.keys()
# Note that different backend products could contain different sets of the source query parameters.
# Note that different backend products could contain different sets of the source query parameters.
# So we squash them into one list without duplicates
parameters_dict = {}
for product_name in product_names:
Expand All @@ -82,7 +83,7 @@ def from_backend_options(cls, backend_options, ontology_path):
parameters_list.append(Name(name_format='str', name='token', value=None))
return cls('src_query', parameters_list)

class NB2WProductQuery(ProductQuery):
class NB2WProductQuery(ProductQuery):
def __init__(self, name, backend_product_name, backend_param_dict, backend_output_dict, ontology_path):
self.backend_product_name = backend_product_name
self.backend_output_dict = backend_output_dict
Expand All @@ -91,7 +92,7 @@ def __init__(self, name, backend_product_name, backend_param_dict, backend_outpu
plist = parameter_lists['prod_plist']
self.ontology_path = ontology_path
super().__init__(name, parameters_list = plist)

@classmethod
def query_list_and_dict_factory(cls, backend_options, ontology_path):
product_names = backend_options.keys()
Expand All @@ -103,8 +104,8 @@ def query_list_and_dict_factory(cls, backend_options, ontology_path):
qlist.append(cls(f'{product_name}_query', product_name, backend_param_dict, backend_output_dict, ontology_path))
qdict[product_name] = f'{product_name}_query'
return qlist, qdict


def get_data_server_query(self, instrument, config=None, **kwargs):
param_dict = {}
for param_name in instrument.get_parameters_name_list(prod_name = self.backend_product_name):
Expand All @@ -116,74 +117,104 @@ def get_data_server_query(self, instrument, config=None, **kwargs):
param_dict[bk_pname] = param_instance.get_value_in_default_units()
else:
param_dict[bk_pname] = param_instance.value

return instrument.data_server_query_class(instrument=instrument,
config=config,
param_dict=param_dict,
task=self.backend_product_name)
task=self.backend_product_name)

def build_product_list(self, instrument, res, out_dir, api=False):
prod_list = []
_output = None
if out_dir is None:
out_dir = './'
if 'output' in res.json().keys(): # in synchronous mode
_o_dict = res.json()
res_progress_product = False
# In the case of a dispatcher request where the progress of the execution has been requested
# (`return_progress: True`), the get_progress_run wraps the response from the nb2service within a dict,
# so that it is easier here to understand how to treat the response, and build the correct product list.
# In case of a standard request then the res argument is expected to be a Response object with the content in
# json format.
if isinstance(res, dict):
res_progress_product = res.get('progress_product', False)
res = res.get('res', None)
res_content_type = res.headers.get('content-type', None)
if res_content_type is not None and res_content_type == 'application/json':
dsavchenko marked this conversation as resolved.
Show resolved Hide resolved
volodymyrss marked this conversation as resolved.
Show resolved Hide resolved
if 'output' in res.json().keys(): # in synchronous mode
_o_dict = res.json()
else:
_o_dict = res.json()['data']
_output = _o_dict['output']
prod_list = NB2WProduct.prod_list_factory(self.backend_output_dict, _output, out_dir, self.ontology_path)
else:
_o_dict = res.json()['data']
prod_list = NB2WProduct.prod_list_factory(self.backend_output_dict, _o_dict['output'], out_dir, self.ontology_path)
_o_text = res.content.decode()
if res_progress_product:
prod_list.append(NB2WProgressProduct(_o_text, out_dir))

return prod_list

def process_product_method(self, instrument, prod_list, api=False):
query_out = QueryOutput()
np_dp_list, bin_dp_list, tab_dp_list, bin_im_dp_list, text_dp_list = [], [], [], [], []


np_dp_list, bin_dp_list, tab_dp_list, bin_im_dp_list, text_dp_list, progress_dp_list = [], [], [], [], [], []
if api is True:
for product in prod_list.prod_list:
if isinstance(product, NB2WAstropyTableProduct):
tab_dp_list.append(product.dispatcher_data_prod.table_data)
elif isinstance(product, NB2WBinaryProduct):
bin_dp_list.append(product.data_prod)
elif isinstance(product, NB2WPictureProduct):
bin_im_dp_list.append(product.data_prod)
bin_im_dp_list.append(product.data_prod)
elif isinstance(product, NB2WTextProduct):
text_dp_list.append({'name': product.name, 'value': product.data_prod})
elif isinstance(product, NB2WParameterProduct):
text_dp_list.append({'name': product.name,
'value': product.parameter_obj.value,
text_dp_list.append({'name': product.name,
'value': product.parameter_obj.value,
'meta_data': {'uri': product.type_key}})
elif isinstance(product, NB2WProgressProduct):
progress_dp_list.append({'name': product.name,
'value': product.progress_data})
else: # NB2WProduct contains NumpyDataProd by default
np_dp_list.append(product.dispatcher_data_prod.data)

query_out.prod_dictionary['numpy_data_product_list'] = np_dp_list
query_out.prod_dictionary['astropy_table_product_ascii_list'] = tab_dp_list
query_out.prod_dictionary['binary_data_product_list'] = bin_dp_list
query_out.prod_dictionary['binary_image_product_list'] = bin_im_dp_list
query_out.prod_dictionary['text_product_list'] = text_dp_list
query_out.prod_dictionary['progress_product_list'] = progress_dp_list
else:
prod_name_list, file_name_list, image_list = [], [], []
prod_name_list, file_name_list, image_list, progress_product_list = [], [], [], []
for product in prod_list.prod_list:
product.write()
try:
file_name_list.append(os.path.basename(product.file_path))
except AttributeError:
pass
im = product.get_html_draw()
if im:
image_list.append(im)
if not isinstance(product, NB2WProgressProduct):
html_draw = product.get_html_draw()
product.write()
try:
file_name_list.append(os.path.basename(product.file_path))
except AttributeError:
pass
if html_draw:
image_list.append(html_draw)
else:
html_draw = product.progress_data
progress_product_list.append(html_draw)

prod_name_list.append(product.name)

query_out.prod_dictionary['file_name'] = file_name_list
query_out.prod_dictionary['image'] = image_list[0] if len(image_list) == 1 else image_list
query_out.prod_dictionary['name'] = prod_name_list
if len(file_name_list) == 1:
query_out.prod_dictionary['download_file_name'] = f'{file_name_list[0]}.gz'
if len(prod_list.prod_list) == 1 and isinstance(prod_list.prod_list[0], NB2WProgressProduct):
query_out.prod_dictionary['progress_product_html_output'] = progress_product_list
else:
query_out.prod_dictionary['download_file_name'] = f'{self.backend_product_name}.tar.gz'
if len(file_name_list) == 1:
query_out.prod_dictionary['download_file_name'] = f'{file_name_list[0]}.gz'
else:
query_out.prod_dictionary['download_file_name'] = f'{self.backend_product_name}.tar.gz'
query_out.prod_dictionary['prod_process_message'] = ''

return query_out

class NB2WInstrumentQuery(InstrumentQuery):
def __init__(self, name, restricted_access):
super().__init__(name, restricted_access=restricted_access)
Expand Down
Loading
Loading