From 3facc149ea28f79968d3236872f7f2f85e0cacb4 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 11 Oct 2023 20:35:26 +0200 Subject: [PATCH 01/32] arg naming --- cdci_data_analysis/analysis/instrument.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdci_data_analysis/analysis/instrument.py b/cdci_data_analysis/analysis/instrument.py index 296999cd5..586b35482 100644 --- a/cdci_data_analysis/analysis/instrument.py +++ b/cdci_data_analysis/analysis/instrument.py @@ -434,7 +434,7 @@ def run_query(self, product_type, config=config, logger=logger, sentry_dsn=sentry_dsn, - report_progress=return_progress, + return_progress=return_progress, api=api) if query_out.status_dictionary['status'] == 0: if 'comment' in query_out.status_dictionary.keys(): From 3a7d70db900a6a2f947175749395484c18bdb274 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Tue, 17 Oct 2023 16:07:44 +0200 Subject: [PATCH 02/32] DataServerQueryReturnProgress --- .../dummy_plugin/data_server_dispatcher.py | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index b7259fd38..fcac5cb70 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -104,26 +104,21 @@ def run_query(self, *args, **kwargs): status = self.decide_status() if status == "submitted": - # set done to submitted? query_out.set_done(message="job submitted mock", - debug_message="no message really", - job_status='submitted', - comment="mock comment", - warning="mock warning") + debug_message="no message really", + job_status='submitted', + comment="mock comment", + warning="mock warning") elif status == "done": - # set done to submitted? 
query_out.set_done(message="job done mock", - debug_message="no message really", - job_status='done', - comment="mock comment", - warning="mock warning") + debug_message="no message really", + job_status='done', + comment="mock comment", + warning="mock warning") elif status == "failed": - # set done to submitted? query_out.set_failed(message="job failed mock", - debug_message="no message really", - job_status='failed', - comment="mock comment", - warning="mock warning") + debug_message="no message really", + job_status='failed') else: NotImplementedError @@ -164,6 +159,18 @@ def run_query(self, *args, **kwargs): return None, query_out +class DataServerQueryReturnProgress(DataServerQuery): + def __init__(self): + super().__init__() + + def run_query(self, *args, **kwargs): + logger.warn('fake run_query in %s with %s, %s', self, args, kwargs) + + query_out = QueryOutput() + + return None, query_out + + class EmptyProductQuery(ProductQuery): def __init__(self, name='unset-name', config=None, instrument=None): From 1921567632a05d36bc7ed2e21ce394d87d153228 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Tue, 17 Oct 2023 16:08:28 +0200 Subject: [PATCH 03/32] return_progress inside get_query_products, passed to run_query of data_server_query --- cdci_data_analysis/analysis/queries.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index 4e2beb554..c021dc49e 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -526,7 +526,7 @@ def test_has_products(self,instrument,job=None,query_type='Real',logger=None,con return query_out - def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False): + def 
get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False,return_progress=False): if logger is None: logger = self.get_logger() @@ -545,7 +545,7 @@ def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=N if query_type != 'Dummy': q = self.get_data_server_query(instrument,config) - res, data_server_query_out = q.run_query(call_back_url=job.get_call_back_url(), run_asynch=run_asynch, logger=logger) + res, data_server_query_out = q.run_query(call_back_url=job.get_call_back_url(), run_asynch=run_asynch, logger=logger, return_progress=return_progress) for field in ['message', 'debug_message', 'comment', 'warning']: if field in data_server_query_out.status_dictionary.keys(): @@ -570,7 +570,11 @@ def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=N else: status=0 - self.query_prod_list = self.get_dummy_products(instrument,config=config,out_dir=scratch_dir,api=api) + self.query_prod_list = self.get_dummy_products(instrument, + config=config, + out_dir=scratch_dir, + api=api, + return_progress=return_progress) #self.query_prod_list = QueryProductList(prod_list=prod_list) @@ -701,7 +705,16 @@ def run_query(self, if query_out.status_dictionary['status'] == 0: - query_out = self.get_query_products(instrument,job,run_asynch, query_type=query_type, logger=logger, config=config, scratch_dir=scratch_dir, sentry_dsn=sentry_dsn, api=api) + query_out = self.get_query_products(instrument, + job, + run_asynch, + query_type=query_type, + logger=logger, + config=config, + scratch_dir=scratch_dir, + sentry_dsn=sentry_dsn, + api=api, + return_progress=return_progress) self._t_query_steps['after_get_query_products'] = _time.time() if query_out.status_dictionary['status'] == 0: From 405f326ed3376119eb512f1b3815057bc9968093 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Tue, 17 Oct 2023 16:13:59 +0200 Subject: [PATCH 04/32] return_progress inside 
get_query_products, passed to run_query of data_server_query --- cdci_data_analysis/analysis/queries.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index c021dc49e..5af9ce2ce 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -545,7 +545,10 @@ def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=N if query_type != 'Dummy': q = self.get_data_server_query(instrument,config) - res, data_server_query_out = q.run_query(call_back_url=job.get_call_back_url(), run_asynch=run_asynch, logger=logger, return_progress=return_progress) + res, data_server_query_out = q.run_query(call_back_url=job.get_call_back_url(), + run_asynch=run_asynch, + logger=logger, + return_progress=return_progress) for field in ['message', 'debug_message', 'comment', 'warning']: if field in data_server_query_out.status_dictionary.keys(): From 121838b4c1b16ae81c86c7fabae0dfa2e338e892 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Tue, 17 Oct 2023 16:45:36 +0200 Subject: [PATCH 05/32] no return_progress for data_server_query --- cdci_data_analysis/analysis/queries.py | 11 ++++------- .../plugins/dummy_plugin/data_server_dispatcher.py | 2 +- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index 5af9ce2ce..7285cd61e 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -526,7 +526,7 @@ def test_has_products(self,instrument,job=None,query_type='Real',logger=None,con return query_out - def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False,return_progress=False): + def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False): if 
logger is None: logger = self.get_logger() @@ -547,8 +547,7 @@ def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=N res, data_server_query_out = q.run_query(call_back_url=job.get_call_back_url(), run_asynch=run_asynch, - logger=logger, - return_progress=return_progress) + logger=logger) for field in ['message', 'debug_message', 'comment', 'warning']: if field in data_server_query_out.status_dictionary.keys(): @@ -576,8 +575,7 @@ def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=N self.query_prod_list = self.get_dummy_products(instrument, config=config, out_dir=scratch_dir, - api=api, - return_progress=return_progress) + api=api) #self.query_prod_list = QueryProductList(prod_list=prod_list) @@ -716,8 +714,7 @@ def run_query(self, config=config, scratch_dir=scratch_dir, sentry_dsn=sentry_dsn, - api=api, - return_progress=return_progress) + api=api) self._t_query_steps['after_get_query_products'] = _time.time() if query_out.status_dictionary['status'] == 0: diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index fcac5cb70..92b8d4947 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -164,7 +164,7 @@ def __init__(self): super().__init__() def run_query(self, *args, **kwargs): - logger.warn('fake run_query in %s with %s, %s', self, args, kwargs) + logger.info(f'fake run_query that can return progress in {self} with {args} and {kwargs}') query_out = QueryOutput() From 1757f59ef84e0202c758b23dee9fb3e7aaec3896 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Tue, 17 Oct 2023 16:56:09 +0200 Subject: [PATCH 06/32] passing return_progress to get_query_products --- cdci_data_analysis/analysis/queries.py | 5 ++-- .../dummy_plugin/data_server_dispatcher.py | 30 ++++++++++++++----- 2 files changed, 26 insertions(+), 9 
deletions(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index 7285cd61e..4a6dcc572 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -526,7 +526,7 @@ def test_has_products(self,instrument,job=None,query_type='Real',logger=None,con return query_out - def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False): + def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False,return_progress=False): if logger is None: logger = self.get_logger() @@ -714,7 +714,8 @@ def run_query(self, config=config, scratch_dir=scratch_dir, sentry_dsn=sentry_dsn, - api=api) + api=api, + return_progress=return_progress) self._t_query_steps['after_get_query_products'] = _time.time() if query_out.status_dictionary['status'] == 0: diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index 92b8d4947..8311d0f89 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -159,16 +159,32 @@ def run_query(self, *args, **kwargs): return None, query_out -class DataServerQueryReturnProgress(DataServerQuery): - def __init__(self): - super().__init__() +class ReturnProgressProductQuery(ProductQuery): + def __init__(self, name, parameters_list=None): + if parameters_list is None: + parameters_list = [] + super().__init__(name, parameters_list=parameters_list) - def run_query(self, *args, **kwargs): - logger.info(f'fake run_query that can return progress in {self} with {args} and {kwargs}') - query_out = QueryOutput() + def run_query(self, + instrument, + scratch_dir, + job, + run_asynch, + query_type='Real', + config=None, + logger=None, + sentry_dsn=None, + 
api=False, + return_progress=False): + query_out = self.process_query_product(instrument, job, logger=logger, config=config, scratch_dir=scratch_dir, + sentry_dsn=sentry_dsn, api=api) + if query_out.status_dictionary['status'] == 0: + job.set_done() + else: + job.set_failed() - return None, query_out + return query_out class EmptyProductQuery(ProductQuery): From bc44f6d1064af2bbe375360e1536fa3865e577e7 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Tue, 17 Oct 2023 18:51:27 +0200 Subject: [PATCH 07/32] dedicated functions for returning query progress --- cdci_data_analysis/analysis/queries.py | 52 +++++++++++++++++-- .../dummy_plugin/data_server_dispatcher.py | 2 +- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index 4a6dcc572..4634bd9d6 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -380,6 +380,12 @@ def get_products(self, instrument,run_asynch, job=None,config=None,logger=None,* def get_dummy_products(self,instrument, config=None,**kwargs): raise RuntimeError(f'{self}: get_dummy_products needs to be implemented in derived class') + def get_progress(self): + raise RuntimeError(f'{self}: get_progress needs to be implemented in derived class') + + def get_dummy_progress(self, instrument, config=None,**kwargs): + raise RuntimeError(f'{self}: get_dummy_progress needs to be implemented in derived class') + def get_data_server_query(self,instrument,config=None,**kwargs): traceback.print_stack() raise RuntimeError(f'{self}: get_data_server_query needs to be implemented in derived class') @@ -526,7 +532,36 @@ def test_has_products(self,instrument,job=None,query_type='Real',logger=None,con return query_out - def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False,return_progress=False): + def get_query_progress_details(self, instrument, job, 
query_type='Real', config=None, logger=None, scratch_dir=None, api=False): + if logger is None: + logger = self.get_logger() + + query_out = QueryOutput() + # status=0 + + messages = { + 'message': '', + 'debug_message': '', + 'comment': '', + 'warning': '' + } + try: + if query_type != 'Dummy': + q = self.get_data_server_query(instrument, config) + q.get_progress() + else: + status = 0 + self.query_prod_list = self.get_dummy_progress(instrument, + config=config, + out_dir=scratch_dir, + api=api) + + job.set_done() + except Exception as e: + sentry_sdk.capture_exception(e) + raise InternalError(f"unexpected error while getting query progress details with {instrument}, {e}") + + def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False): if logger is None: logger = self.get_logger() @@ -714,12 +749,11 @@ def run_query(self, config=config, scratch_dir=scratch_dir, sentry_dsn=sentry_dsn, - api=api, - return_progress=return_progress) + api=api) self._t_query_steps['after_get_query_products'] = _time.time() if query_out.status_dictionary['status'] == 0: - if job.status!='done': + if job.status != 'done': query_out.prod_dictionary = {} # TODO: add check if is asynch @@ -727,6 +761,16 @@ def run_query(self, # TODO: if asynch and running return proper query_out # TODO: if asynch and done proceed + if return_progress: + self.get_query_progress_details(instrument, + job, + query_type=query_type, + logger=logger, + config=config, + scratch_dir=scratch_dir, + api=api + ) + else: if query_out.status_dictionary['status'] == 0: #print('-->',query_out.status_dictionary) diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index 8311d0f89..fb4c07724 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -163,7 
+163,7 @@ class ReturnProgressProductQuery(ProductQuery): def __init__(self, name, parameters_list=None): if parameters_list is None: parameters_list = [] - super().__init__(name, parameters_list=parameters_list) + super().__init__(name, return_progress=True, parameters_list=parameters_list) def run_query(self, From 13d7096e3b49dac525d343417c6282ba2562ff28 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Tue, 17 Oct 2023 20:51:32 +0200 Subject: [PATCH 08/32] dummy plugin --- cdci_data_analysis/analysis/queries.py | 12 ++++------- .../dummy_plugin/data_server_dispatcher.py | 20 +++++++++++++++++++ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index 4634bd9d6..133414103 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -380,9 +380,6 @@ def get_products(self, instrument,run_asynch, job=None,config=None,logger=None,* def get_dummy_products(self,instrument, config=None,**kwargs): raise RuntimeError(f'{self}: get_dummy_products needs to be implemented in derived class') - def get_progress(self): - raise RuntimeError(f'{self}: get_progress needs to be implemented in derived class') - def get_dummy_progress(self, instrument, config=None,**kwargs): raise RuntimeError(f'{self}: get_dummy_progress needs to be implemented in derived class') @@ -548,15 +545,14 @@ def get_query_progress_details(self, instrument, job, query_type='Real', config= try: if query_type != 'Dummy': q = self.get_data_server_query(instrument, config) - q.get_progress() + res, data_server_query_out = q.get_progress() else: status = 0 self.query_prod_list = self.get_dummy_progress(instrument, - config=config, - out_dir=scratch_dir, - api=api) + config=config, + out_dir=scratch_dir, + api=api) - job.set_done() except Exception as e: sentry_sdk.capture_exception(e) raise InternalError(f"unexpected error while getting query progress details with {instrument}, {e}") 
diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index fb4c07724..6d840752a 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -159,6 +159,26 @@ def run_query(self, *args, **kwargs): return None, query_out +class ReturnProgressDataServerQuery(DataServerQuery): + def __init__(self): + super().__init__() + + def get_progress(self): + + query_out = QueryOutput() + current_status = self.get_status() + + query_out.set_status( + current_status, + message=f"current progress is {current_status}", + debug_message="no debug message really", + job_status=current_status, + comment="mock comment", + warning="mock warning") + + return None, query_out + + class ReturnProgressProductQuery(ProductQuery): def __init__(self, name, parameters_list=None): if parameters_list is None: From 7285b4f1af4fce11e5c04caedf029020ddfd8c2a Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 18 Oct 2023 16:31:30 +0200 Subject: [PATCH 09/32] avoid code repetition inside ProductQuery --- cdci_data_analysis/analysis/queries.py | 62 +++++++------------ .../dummy_plugin/data_server_dispatcher.py | 34 +++------- 2 files changed, 32 insertions(+), 64 deletions(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index 133414103..b2859065a 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -380,7 +380,7 @@ def get_products(self, instrument,run_asynch, job=None,config=None,logger=None,* def get_dummy_products(self,instrument, config=None,**kwargs): raise RuntimeError(f'{self}: get_dummy_products needs to be implemented in derived class') - def get_dummy_progress(self, instrument, config=None,**kwargs): + def get_dummy_progress_run(self, instrument, config=None,**kwargs): raise RuntimeError(f'{self}: get_dummy_progress 
needs to be implemented in derived class') def get_data_server_query(self,instrument,config=None,**kwargs): @@ -529,35 +529,8 @@ def test_has_products(self,instrument,job=None,query_type='Real',logger=None,con return query_out - def get_query_progress_details(self, instrument, job, query_type='Real', config=None, logger=None, scratch_dir=None, api=False): - if logger is None: - logger = self.get_logger() - - query_out = QueryOutput() - # status=0 - - messages = { - 'message': '', - 'debug_message': '', - 'comment': '', - 'warning': '' - } - try: - if query_type != 'Dummy': - q = self.get_data_server_query(instrument, config) - res, data_server_query_out = q.get_progress() - else: - status = 0 - self.query_prod_list = self.get_dummy_progress(instrument, - config=config, - out_dir=scratch_dir, - api=api) - except Exception as e: - sentry_sdk.capture_exception(e) - raise InternalError(f"unexpected error while getting query progress details with {instrument}, {e}") - - def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False): + def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False,return_progress=False): if logger is None: logger = self.get_logger() @@ -576,9 +549,12 @@ def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=N if query_type != 'Dummy': q = self.get_data_server_query(instrument,config) - res, data_server_query_out = q.run_query(call_back_url=job.get_call_back_url(), - run_asynch=run_asynch, - logger=logger) + if return_progress: + res, data_server_query_out = q.get_progress_run() + else: + res, data_server_query_out = q.run_query(call_back_url=job.get_call_back_url(), + run_asynch=run_asynch, + logger=logger) for field in ['message', 'debug_message', 'comment', 'warning']: if field in data_server_query_out.status_dictionary.keys(): @@ -603,10 +579,16 @@ def 
get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=N else: status=0 - self.query_prod_list = self.get_dummy_products(instrument, - config=config, - out_dir=scratch_dir, - api=api) + if return_progress: + self.progress_query_prod_list = self.get_dummy_progress_run(instrument, + config=config, + out_dir=scratch_dir, + api=api) + else: + self.query_prod_list = self.get_dummy_products(instrument, + config=config, + out_dir=scratch_dir, + api=api) #self.query_prod_list = QueryProductList(prod_list=prod_list) @@ -621,14 +603,18 @@ def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=N except Exception as e: # TODO: could we avoid these? they make error tracking hard # TODO we could use the very same approach used when test_communication fails - logger.exception("failed to get query products") #status=1 job.set_failed() if os.environ.get('DISPATCHER_DEBUG', 'yes') == 'yes': raise exception_message = getattr(e, 'message', '') - e_message = f'Failed when getting query products for job {job.job_id}:\n{exception_message}' + if return_progress: + logger.exception("failed to get progress run") + e_message = f'Failed when getting the progress run for job {job.job_id}:\n{exception_message}' + else: + logger.exception("failed to get query products") + e_message = f'Failed when getting query products for job {job.job_id}:\n{exception_message}' messages['debug_message'] = repr(e) + ' : ' + getattr(e, 'debug_message', '') query_out.set_failed('get_query_products found job failed', diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index 6d840752a..3729ae699 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -163,16 +163,16 @@ class ReturnProgressDataServerQuery(DataServerQuery): def __init__(self): super().__init__() - def 
get_progress(self): + def get_progress_run(self): query_out = QueryOutput() - current_status = self.get_status() + progress_status = self.get_status() query_out.set_status( - current_status, - message=f"current progress is {current_status}", + progress_status, + message=f"current progress is {progress_status}", debug_message="no debug message really", - job_status=current_status, + job_status=progress_status, comment="mock comment", warning="mock warning") @@ -180,32 +180,14 @@ def get_progress(self): class ReturnProgressProductQuery(ProductQuery): + def __init__(self, name, parameters_list=None): if parameters_list is None: parameters_list = [] super().__init__(name, return_progress=True, parameters_list=parameters_list) - - def run_query(self, - instrument, - scratch_dir, - job, - run_asynch, - query_type='Real', - config=None, - logger=None, - sentry_dsn=None, - api=False, - return_progress=False): - query_out = self.process_query_product(instrument, job, logger=logger, config=config, scratch_dir=scratch_dir, - sentry_dsn=sentry_dsn, api=api) - if query_out.status_dictionary['status'] == 0: - job.set_done() - else: - job.set_failed() - - return query_out - + def get_dummy_progress_run(self, instrument, config=None,**kwargs): + return [] class EmptyProductQuery(ProductQuery): From 33e9d54c6691827df1cf170414270e9ef24a8096 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 18 Oct 2023 16:34:40 +0200 Subject: [PATCH 10/32] arg in get_query_products --- cdci_data_analysis/analysis/queries.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index b2859065a..32ebbbb40 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -731,7 +731,8 @@ def run_query(self, config=config, scratch_dir=scratch_dir, sentry_dsn=sentry_dsn, - api=api) + api=api, + return_progress=return_progress) 
self._t_query_steps['after_get_query_products'] = _time.time() if query_out.status_dictionary['status'] == 0: @@ -743,16 +744,6 @@ def run_query(self, # TODO: if asynch and running return proper query_out # TODO: if asynch and done proceed - if return_progress: - self.get_query_progress_details(instrument, - job, - query_type=query_type, - logger=logger, - config=config, - scratch_dir=scratch_dir, - api=api - ) - else: if query_out.status_dictionary['status'] == 0: #print('-->',query_out.status_dictionary) From 17aa0beb88d73a94a703d069670d19619425b915 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 18 Oct 2023 16:38:09 +0200 Subject: [PATCH 11/32] arg in get_query_products --- cdci_data_analysis/analysis/queries.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index 32ebbbb40..0ddc24e76 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -720,8 +720,6 @@ def run_query(self, input_prod_list=query_out.prod_dictionary['input_prod_list'] self._t_query_steps['after_test_has_products'] = _time.time() - - if query_out.status_dictionary['status'] == 0: query_out = self.get_query_products(instrument, job, From 5eab7529365814cc2de6aa2cb19acbf408611950 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 18 Oct 2023 18:02:13 +0200 Subject: [PATCH 12/32] empty-async-return-progress instrument --- .../dummy_plugin/data_server_dispatcher.py | 3 +- .../empty_async_return_progress_instrument.py | 31 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index 3729ae699..2cdab0144 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ 
b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -187,7 +187,8 @@ def __init__(self, name, parameters_list=None): super().__init__(name, return_progress=True, parameters_list=parameters_list) def get_dummy_progress_run(self, instrument, config=None,**kwargs): - return [] + p_value = instrument.get_par_by_name('p').value + return [p_value] class EmptyProductQuery(ProductQuery): diff --git a/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py b/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py new file mode 100644 index 000000000..4855cb722 --- /dev/null +++ b/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py @@ -0,0 +1,31 @@ +from __future__ import absolute_import, division, print_function + +__author__ = "Gabriele Barni" + +from cdci_data_analysis.analysis.instrument import Instrument +from cdci_data_analysis.analysis.queries import SourceQuery, InstrumentQuery, Float, Name + +from .data_server_dispatcher import DataServerQuery, DataServerNumericQuery, ReturnProgressProductQuery + + +def my_instr_factory(): + src_query = SourceQuery('src_query') + + # empty query + instr_query = InstrumentQuery(name='empty_async_return_progress_instrument_query', + input_prod_list_name='scw_list', + catalog=None, + catalog_name='user_catalog') + + p = Float(value=10., name='p', units='W') + return_progress_query = ReturnProgressProductQuery('empty_parameters_dummy_query_return_progress', + parameters_list=[p]) + + query_dictionary = {'dummy': 'empty_parameters_dummy_query_return_progress'} + + return Instrument('empty-async-return-progress', + src_query=src_query, + instrumet_query=instr_query, + product_queries_list=[return_progress_query], + query_dictionary=query_dictionary, + data_server_query_class=DataServerQuery) From f5c24fe5ed1b844a323e21fa7ed3d62018c3d894 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 18 Oct 2023 18:12:49 +0200 Subject: [PATCH 13/32] 
build prod_list also in case of return_progress --- cdci_data_analysis/analysis/queries.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index 0ddc24e76..4bb110a1d 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -570,10 +570,14 @@ def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=N else: job.set_submitted() - if job.status != 'done': - prod_list = QueryProductList(prod_list=[], job=job) + if return_progress: + prod_list = self.build_product_list(instrument, res, scratch_dir, api=api) else: - prod_list = self.build_product_list(instrument,res, scratch_dir,api=api) + if job.status != 'done': + prod_list = QueryProductList(prod_list=[], job=job) + else: + prod_list = self.build_product_list(instrument,res, scratch_dir,api=api) + self.query_prod_list=QueryProductList(prod_list=prod_list,job=job) From 02754ab95b2697fc1563809cee489a9339f98643 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 19 Oct 2023 16:55:09 +0200 Subject: [PATCH 14/32] removed unused imports --- .../dummy_plugin/empty_async_return_progress_instrument.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py b/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py index 4855cb722..3ba52f27d 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py +++ b/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py @@ -3,9 +3,9 @@ __author__ = "Gabriele Barni" from cdci_data_analysis.analysis.instrument import Instrument -from cdci_data_analysis.analysis.queries import SourceQuery, InstrumentQuery, Float, Name +from cdci_data_analysis.analysis.queries import SourceQuery, InstrumentQuery, Float -from .data_server_dispatcher import 
DataServerQuery, DataServerNumericQuery, ReturnProgressProductQuery +from .data_server_dispatcher import DataServerQuery, ReturnProgressProductQuery def my_instr_factory(): From b847656e965f3f4699ac2ea7626dc8bba7908647 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 19 Oct 2023 17:32:44 +0200 Subject: [PATCH 15/32] added missing config for new dummy instrument --- .../plugins/dummy_plugin/data_server_dispatcher.py | 2 +- cdci_data_analysis/plugins/dummy_plugin/dummy_plugin_conf.yml | 1 + .../dummy_plugin/empty_async_return_progress_instrument.py | 1 - cdci_data_analysis/plugins/dummy_plugin/exposer.py | 1 + tests/test_server_basic.py | 2 +- 5 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index 2cdab0144..c984c65c1 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -184,7 +184,7 @@ class ReturnProgressProductQuery(ProductQuery): def __init__(self, name, parameters_list=None): if parameters_list is None: parameters_list = [] - super().__init__(name, return_progress=True, parameters_list=parameters_list) + super().__init__(name, parameters_list=parameters_list) def get_dummy_progress_run(self, instrument, config=None,**kwargs): p_value = instrument.get_par_by_name('p').value diff --git a/cdci_data_analysis/plugins/dummy_plugin/dummy_plugin_conf.yml b/cdci_data_analysis/plugins/dummy_plugin/dummy_plugin_conf.yml index 2358ec7f6..59b47a9cf 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/dummy_plugin_conf.yml +++ b/cdci_data_analysis/plugins/dummy_plugin/dummy_plugin_conf.yml @@ -3,3 +3,4 @@ instruments: - empty_async_instrument - empty_semi_async_instrument - empty_development_instrument +- empty_async_return_progress_instrument diff --git 
a/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py b/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py index 3ba52f27d..75996c3e1 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py +++ b/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py @@ -11,7 +11,6 @@ def my_instr_factory(): src_query = SourceQuery('src_query') - # empty query instr_query = InstrumentQuery(name='empty_async_return_progress_instrument_query', input_prod_list_name='scw_list', catalog=None, diff --git a/cdci_data_analysis/plugins/dummy_plugin/exposer.py b/cdci_data_analysis/plugins/dummy_plugin/exposer.py index 10c05fa3b..251478142 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/exposer.py +++ b/cdci_data_analysis/plugins/dummy_plugin/exposer.py @@ -2,6 +2,7 @@ from . import empty_semi_async_instrument from . import empty_async_instrument from . import empty_development_instrument +from . import empty_async_return_progress_instrument from . 
import conf_file import yaml diff --git a/tests/test_server_basic.py b/tests/test_server_basic.py index 04f9a313e..fdcab83aa 100644 --- a/tests/test_server_basic.py +++ b/tests/test_server_basic.py @@ -139,7 +139,7 @@ def test_empty_request(dispatcher_live_fixture): assert c.status_code == 400 # parameterize this - assert sorted(jdata['installed_instruments']) == sorted(['empty', 'empty-async', 'empty-semi-async', 'empty-development']) or \ + assert sorted(jdata['installed_instruments']) == sorted(['empty', 'empty-async', 'empty-semi-async', 'empty-development', 'empty-async-return-progress']) or \ jdata['installed_instruments'] == [] assert jdata['debug_mode'] == "yes" From 158fcb06bc215d0a4756fe06bae029bf180aa7ee Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 19 Oct 2023 18:02:29 +0200 Subject: [PATCH 16/32] fixing test --- tests/test_server_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_server_basic.py b/tests/test_server_basic.py index fdcab83aa..38e5153cc 100644 --- a/tests/test_server_basic.py +++ b/tests/test_server_basic.py @@ -208,7 +208,7 @@ def test_matrix_options_mode_empty_request(dispatcher_live_fixture_with_matrix_o assert c.status_code == 400 assert sorted(jdata['installed_instruments']) == sorted( - ['empty', 'empty-async', 'empty-semi-async', 'empty-development']) or \ + ['empty', 'empty-async', 'empty-semi-async', 'empty-development', 'empty-async-return-progress']) or \ jdata['installed_instruments'] == [] # assert jdata['debug_mode'] == "no" From 9c6143b7bf775db2717b2fae91c931a0eb3e52de Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 25 Oct 2023 18:14:34 +0200 Subject: [PATCH 17/32] arg reformatting --- cdci_data_analysis/flask_app/dispatcher_query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index 23a9f13e7..b4a8e1277 100644 --- 
a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -161,7 +161,7 @@ def __init__(self, app, self.client_name = self.par_dic.pop('client-name', 'unknown') - self.return_progress = self.par_dic.pop('return-progress', False) == 'True' + self.return_progress = self.par_dic.pop('return_progress', False) == 'True' if os.environ.get("DISPATCHER_ASYNC_ENABLED", "no") == "yes": # TODO: move to config! self.async_dispatcher = self.par_dic.pop( 'async_dispatcher', 'True') == 'True' # why string true?? else false anyway From 79c10543108642ff75128fc4d0ca0d4c784e7fe4 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 25 Oct 2023 18:14:48 +0200 Subject: [PATCH 18/32] extending ReturnProgressProductQuery --- .../dummy_plugin/data_server_dispatcher.py | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index c984c65c1..13f3ca770 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -181,15 +181,41 @@ def get_progress_run(self): class ReturnProgressProductQuery(ProductQuery): + p_value_fn = "ReturnProgressProductQuery-current_p_value.out" + def __init__(self, name, parameters_list=None): if parameters_list is None: parameters_list = [] super().__init__(name, parameters_list=parameters_list) + @classmethod + def set_p_value(cls, p_value): + with open(cls.p_value_fn, "w") as p_value_f: + p_value_f.write(str(p_value)) + + @classmethod + def get_p_value(cls): + if os.path.exists(cls.p_value_fn): + with open(cls.p_value_fn) as p_value_f: + p_value = float(p_value_f.read()) + return p_value + else: + return 0 + def get_dummy_progress_run(self, instrument, config=None,**kwargs): - p_value = instrument.get_par_by_name('p').value + p_value = self.get_p_value() * 5 + 
self.set_p_value(p_value) return [p_value] + def get_dummy_products(self, instrument, config=None, **kwargs): + p_value = self.get_p_value() * 2 + return [p_value] + + def process_product_method(self, instrument, prod_list, api=False, **kw): + query_out = QueryOutput() + query_out.prod_dictionary['p'] = prod_list[0] + return query_out + class EmptyProductQuery(ProductQuery): def __init__(self, name='unset-name', config=None, instrument=None): From a62536213d146f34606c0378f5eba9e44118118b Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 25 Oct 2023 18:15:02 +0200 Subject: [PATCH 19/32] always returns query_prod_list --- cdci_data_analysis/analysis/queries.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index 4bb110a1d..167c89b55 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -584,10 +584,10 @@ def get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=N else: status=0 if return_progress: - self.progress_query_prod_list = self.get_dummy_progress_run(instrument, - config=config, - out_dir=scratch_dir, - api=api) + self.query_prod_list = self.get_dummy_progress_run(instrument, + config=config, + out_dir=scratch_dir, + api=api) else: self.query_prod_list = self.get_dummy_products(instrument, config=config, From 87196360cf9ec68d2f46e956c1bc3fe7c72f1159 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 25 Oct 2023 18:15:08 +0200 Subject: [PATCH 20/32] dedicated test --- tests/test_server_basic.py | 49 +++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/tests/test_server_basic.py b/tests/test_server_basic.py index 38e5153cc..d896eed52 100644 --- a/tests/test_server_basic.py +++ b/tests/test_server_basic.py @@ -26,7 +26,7 @@ from cdci_data_analysis.flask_app.dispatcher_query import InstrumentQueryBackEnd from 
cdci_data_analysis.analysis.renku_helper import clone_renku_repo, checkout_branch_renku_repo, check_job_id_branch_is_present, get_repo_path, generate_commit_request_url, create_new_notebook_with_code, generate_nb_hash, create_renku_ini_config_obj, generate_ini_file_hash from cdci_data_analysis.analysis.drupal_helper import execute_drupal_request, get_drupal_request_headers, get_revnum, get_observations_for_time_range, generate_gallery_jwt_token, get_user_id, get_source_astrophysical_entity_id_by_source_name -from cdci_data_analysis.plugins.dummy_plugin.data_server_dispatcher import DataServerQuery +from cdci_data_analysis.plugins.dummy_plugin.data_server_dispatcher import DataServerQuery, ReturnProgressProductQuery # logger logger = logging.getLogger(__name__) @@ -1574,6 +1574,53 @@ def test_empty_instrument_request(dispatcher_live_fixture): assert jdata["exit_status"]["message"] == "" +def test_empty_async_return_progress_instrument_request(dispatcher_live_fixture): + server = dispatcher_live_fixture + print("constructed server:", server) + + ReturnProgressProductQuery.set_p_value(5) + + params = { + **default_params, + 'product_type': 'dummy', + 'query_type': "Dummy", + 'instrument': 'empty-async-return-progress', + 'return_progress': True + } + + jdata = ask(server, + params, + expected_query_status=["done"], + max_time_s=50, + ) + + logger.info("Json output content") + logger.info(json.dumps(jdata, indent=4)) + + assert jdata["exit_status"]["debug_message"] == "" + assert jdata["exit_status"]["error_message"] == "" + assert jdata["exit_status"]["message"] == "" + + assert jdata["products"]["p"] == 25 + + params.pop("return_progress", None) + + jdata = ask(server, + params, + expected_query_status=["done"], + max_time_s=50, + ) + + logger.info("Json output content") + logger.info(json.dumps(jdata, indent=4)) + + assert jdata["exit_status"]["debug_message"] == "" + assert jdata["exit_status"]["error_message"] == "" + assert jdata["exit_status"]["message"] == "" + 
+ assert jdata["products"]["p"] == 50 + + def test_no_instrument(dispatcher_live_fixture): server = dispatcher_live_fixture print("constructed server:", server) From ad39a6de381309cc10780f0a1e18cc8e1698aa8e Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 25 Oct 2023 22:09:38 +0200 Subject: [PATCH 21/32] Real query_type --- .../dummy_plugin/data_server_dispatcher.py | 22 ++++++++++++++----- .../empty_async_return_progress_instrument.py | 4 ++-- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index 13f3ca770..800ae608a 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -160,19 +160,20 @@ def run_query(self, *args, **kwargs): class ReturnProgressDataServerQuery(DataServerQuery): - def __init__(self): + def __init__(self, config=None, instrument=None): super().__init__() def get_progress_run(self): query_out = QueryOutput() - progress_status = self.get_status() + + p_value = ReturnProgressProductQuery.get_p_value() query_out.set_status( - progress_status, - message=f"current progress is {progress_status}", + 0, + message=f"current p value is {p_value}", debug_message="no debug message really", - job_status=progress_status, + job_status="submitted", comment="mock comment", warning="mock warning") @@ -202,6 +203,13 @@ def get_p_value(cls): else: return 0 + def get_data_server_query(self,instrument,config=None,**kwargs): + if instrument.data_server_query_class: + q = instrument.data_server_query_class(instrument=instrument, config=config) + else: + q = DataServerQuery() + return q + def get_dummy_progress_run(self, instrument, config=None,**kwargs): p_value = self.get_p_value() * 5 self.set_p_value(p_value) @@ -216,6 +224,10 @@ def process_product_method(self, instrument, prod_list, api=False, **kw): 
query_out.prod_dictionary['p'] = prod_list[0] return query_out + def build_product_list(self, instrument, res, out_dir, prod_prefix='', api=False): + p_value = self.get_p_value() + return [p_value] + class EmptyProductQuery(ProductQuery): def __init__(self, name='unset-name', config=None, instrument=None): diff --git a/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py b/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py index 75996c3e1..91b9cec54 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py +++ b/cdci_data_analysis/plugins/dummy_plugin/empty_async_return_progress_instrument.py @@ -5,7 +5,7 @@ from cdci_data_analysis.analysis.instrument import Instrument from cdci_data_analysis.analysis.queries import SourceQuery, InstrumentQuery, Float -from .data_server_dispatcher import DataServerQuery, ReturnProgressProductQuery +from .data_server_dispatcher import ReturnProgressDataServerQuery, ReturnProgressProductQuery def my_instr_factory(): @@ -27,4 +27,4 @@ def my_instr_factory(): instrumet_query=instr_query, product_queries_list=[return_progress_query], query_dictionary=query_dictionary, - data_server_query_class=DataServerQuery) + data_server_query_class=ReturnProgressDataServerQuery) From d954c556aff40b320c866d5d79be284990bb2ee8 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 26 Oct 2023 14:02:21 +0200 Subject: [PATCH 22/32] reformatting method signature --- cdci_data_analysis/analysis/queries.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index 167c89b55..0695bcb09 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -530,7 +530,17 @@ def test_has_products(self,instrument,job=None,query_type='Real',logger=None,con return query_out - def 
get_query_products(self,instrument,job,run_asynch,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False,return_progress=False): + def get_query_products(self, + instrument, + job, + run_asynch, + query_type='Real', + logger=None, + config=None, + scratch_dir=None, + sentry_dsn=None, + api=False, + return_progress=False): if logger is None: logger = self.get_logger() From f18d3fba6da8c7b31757382a812ad7de623d28f6 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Tue, 31 Oct 2023 14:57:31 +0100 Subject: [PATCH 23/32] dummy get_progress_run returns also the current product value --- .../plugins/dummy_plugin/data_server_dispatcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index 800ae608a..f73917ea6 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -177,7 +177,7 @@ def get_progress_run(self): comment="mock comment", warning="mock warning") - return None, query_out + return p_value, query_out class ReturnProgressProductQuery(ProductQuery): @@ -225,7 +225,7 @@ def process_product_method(self, instrument, prod_list, api=False, **kw): return query_out def build_product_list(self, instrument, res, out_dir, prod_prefix='', api=False): - p_value = self.get_p_value() + p_value = res return [p_value] class EmptyProductQuery(ProductQuery): From 3dfe1d24f7dc21d5e76e3100a26bc21254f832c9 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Tue, 31 Oct 2023 18:27:11 +0100 Subject: [PATCH 24/32] not forcing to status one in case of return_progress --- cdci_data_analysis/analysis/queries.py | 12 +++++--- .../dummy_plugin/data_server_dispatcher.py | 29 +++++++++++++++---- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/cdci_data_analysis/analysis/queries.py 
b/cdci_data_analysis/analysis/queries.py index 0695bcb09..63787a600 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -598,15 +598,16 @@ def get_query_products(self, config=config, out_dir=scratch_dir, api=api) + job.set_submitted() else: self.query_prod_list = self.get_dummy_products(instrument, config=config, out_dir=scratch_dir, api=api) + job.set_done() #self.query_prod_list = QueryProductList(prod_list=prod_list) - job.set_done() #DONE query_out.set_done(message=messages['message'], debug_message=str(messages['debug_message']),job_status=job.status,status=status,comment=messages['comment'],warning=messages['warning']) #print('-->', query_out.status_dictionary) @@ -661,6 +662,7 @@ def process_query_product(self, api=False, backend_warning='', backend_comment='', + return_progress=False, **kwargs): if logger is None: logger = self.get_logger() @@ -682,7 +684,8 @@ def process_query_product(self, status = process_products_query_out.get_status() - job.set_done() + if not return_progress: + job.set_done() #DONE process_products_query_out.set_done( message=message, debug_message=str(debug_message), job_status=job.status,status=status,comment=backend_comment,warning=backend_warning) @@ -748,7 +751,7 @@ def run_query(self, self._t_query_steps['after_get_query_products'] = _time.time() if query_out.status_dictionary['status'] == 0: - if job.status != 'done': + if job.status != 'done' and not return_progress: query_out.prod_dictionary = {} # TODO: add check if is asynch @@ -773,6 +776,7 @@ def run_query(self, config=config, sentry_dsn=sentry_dsn, api=api, + return_progress=return_progress, backend_comment=backend_comment, backend_warning=backend_warning) self._t_query_steps['after_process_query_products'] = _time.time() @@ -825,7 +829,7 @@ def check_file_exist(self,files_list,out_dir=None): def process_product(self,instrument,job, config=None,out_dir=None,**kwargs): raise RuntimeError('this method has to be implemented in 
the derived class') - def process_query_product(self,instrument,job,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False,**kwargs): + def process_query_product(self,instrument,job,query_type='Real',logger=None,config=None,scratch_dir=None,sentry_dsn=None,api=False, return_progress=False, **kwargs): if logger is None: logger = self.get_logger() diff --git a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py index f73917ea6..4ef64c5d9 100644 --- a/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py +++ b/cdci_data_analysis/plugins/dummy_plugin/data_server_dispatcher.py @@ -179,6 +179,22 @@ def get_progress_run(self): return p_value, query_out + def run_query(self, *args, **kwargs): + logger.warn('fake run_query in %s with %s, %s', self, args, kwargs) + query_out = QueryOutput() + + p_value = ReturnProgressProductQuery.get_p_value() + + query_out.set_status( + 0, + message=f"current p value is {p_value}", + debug_message="no debug message really", + job_status="done", + comment="mock comment", + warning="mock warning") + + return p_value, query_out + class ReturnProgressProductQuery(ProductQuery): @@ -211,17 +227,18 @@ def get_data_server_query(self,instrument,config=None,**kwargs): return q def get_dummy_progress_run(self, instrument, config=None,**kwargs): - p_value = self.get_p_value() * 5 - self.set_p_value(p_value) - return [p_value] + p_value = self.get_p_value() + prod_list = QueryProductList(prod_list=[p_value]) + return prod_list def get_dummy_products(self, instrument, config=None, **kwargs): - p_value = self.get_p_value() * 2 - return [p_value] + p_value = self.get_p_value() + prod_list = QueryProductList(prod_list=[p_value]) + return prod_list def process_product_method(self, instrument, prod_list, api=False, **kw): query_out = QueryOutput() - query_out.prod_dictionary['p'] = prod_list[0] + query_out.prod_dictionary['p'] = 
prod_list.prod_list[0] return query_out def build_product_list(self, instrument, res, out_dir, prod_prefix='', api=False): From cf1f3ebbc13d88b3b558dfc84bcff39295608475 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Tue, 31 Oct 2023 18:27:16 +0100 Subject: [PATCH 25/32] adapted test --- tests/test_server_basic.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/test_server_basic.py b/tests/test_server_basic.py index d896eed52..5eaed5a41 100644 --- a/tests/test_server_basic.py +++ b/tests/test_server_basic.py @@ -1574,7 +1574,8 @@ def test_empty_instrument_request(dispatcher_live_fixture): assert jdata["exit_status"]["message"] == "" -def test_empty_async_return_progress_instrument_request(dispatcher_live_fixture): +@pytest.mark.parametrize("query_type", ["Dummy", "Real"]) +def test_empty_async_return_progress_instrument_request(dispatcher_live_fixture, query_type): server = dispatcher_live_fixture print("constructed server:", server) @@ -1583,14 +1584,14 @@ def test_empty_async_return_progress_instrument_request(dispatcher_live_fixture) params = { **default_params, 'product_type': 'dummy', - 'query_type': "Dummy", + 'query_type': query_type, 'instrument': 'empty-async-return-progress', 'return_progress': True } jdata = ask(server, params, - expected_query_status=["done"], + expected_query_status=["submitted"], max_time_s=50, ) @@ -1601,9 +1602,10 @@ def test_empty_async_return_progress_instrument_request(dispatcher_live_fixture) assert jdata["exit_status"]["error_message"] == "" assert jdata["exit_status"]["message"] == "" - assert jdata["products"]["p"] == 25 + assert jdata["products"]["p"] == 5 params.pop("return_progress", None) + ReturnProgressProductQuery.set_p_value(15) jdata = ask(server, params, @@ -1618,7 +1620,7 @@ def test_empty_async_return_progress_instrument_request(dispatcher_live_fixture) assert jdata["exit_status"]["error_message"] == "" assert jdata["exit_status"]["message"] == "" - assert jdata["products"]["p"] 
== 50 + assert jdata["products"]["p"] == 15 def test_no_instrument(dispatcher_live_fixture): From 0cad7b51d4b3e895621e8f54aba5f4c3f474e915 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 2 Nov 2023 17:41:43 +0100 Subject: [PATCH 26/32] return_progress-arg-run_analysis --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 650d5089b..4ae1bde93 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,7 +29,7 @@ GitPython nbformat sentry-sdk pytest-sentry --e git+https://github.com/oda-hub/oda_api.git#egg=oda_api +-e git+https://github.com/oda-hub/oda_api.git@adapt-to-new-dummy-instrument#egg=oda_api MarkupSafe==2.0.1 From 782c621634d7f5052cdc01fe0484b38d7669ea86 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 2 Nov 2023 18:17:00 +0100 Subject: [PATCH 27/32] revert oda_api branch --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 4ae1bde93..650d5089b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,7 +29,7 @@ GitPython nbformat sentry-sdk pytest-sentry --e git+https://github.com/oda-hub/oda_api.git@adapt-to-new-dummy-instrument#egg=oda_api +-e git+https://github.com/oda-hub/oda_api.git#egg=oda_api MarkupSafe==2.0.1 From 672974f95bbd88a0d212de7784733ce642cc056d Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 8 Nov 2023 18:57:28 +0100 Subject: [PATCH 28/32] test specific oda_api branch --- .github/workflows/python-oda-api.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/python-oda-api.yml b/.github/workflows/python-oda-api.yml index 4ece62dc3..63f2e0b99 100644 --- a/.github/workflows/python-oda-api.yml +++ b/.github/workflows/python-oda-api.yml @@ -38,6 +38,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'oda-hub/oda_api' + ref: 'adapt-to-new-dummy-instrument' path: oda_api From e6a9fa3a9139811a807cbd0684736c25cd2e5908 Mon Sep 17 00:00:00 2001 From: burnout87 Date: 
Thu, 9 Nov 2023 09:45:46 +0100 Subject: [PATCH 29/32] back on the master oda_api branch --- .github/workflows/python-oda-api.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/python-oda-api.yml b/.github/workflows/python-oda-api.yml index 63f2e0b99..4ece62dc3 100644 --- a/.github/workflows/python-oda-api.yml +++ b/.github/workflows/python-oda-api.yml @@ -38,7 +38,6 @@ jobs: uses: actions/checkout@v2 with: repository: 'oda-hub/oda_api' - ref: 'adapt-to-new-dummy-instrument' path: oda_api From eeabef5d8e4d8166e0c6f243ac97f41865b64287 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 9 Nov 2023 11:30:46 +0100 Subject: [PATCH 30/32] using test oda_api branch --- .github/workflows/python-oda-api.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/python-oda-api.yml b/.github/workflows/python-oda-api.yml index 4ece62dc3..63f2e0b99 100644 --- a/.github/workflows/python-oda-api.yml +++ b/.github/workflows/python-oda-api.yml @@ -38,6 +38,7 @@ jobs: uses: actions/checkout@v2 with: repository: 'oda-hub/oda_api' + ref: 'adapt-to-new-dummy-instrument' path: oda_api From 196731987882839f4b40225d80da3a7ab4ec29fe Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 9 Nov 2023 19:02:26 +0100 Subject: [PATCH 31/32] master branch for oda_api workflow --- .github/workflows/python-oda-api.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/python-oda-api.yml b/.github/workflows/python-oda-api.yml index 63f2e0b99..4ece62dc3 100644 --- a/.github/workflows/python-oda-api.yml +++ b/.github/workflows/python-oda-api.yml @@ -38,7 +38,6 @@ jobs: uses: actions/checkout@v2 with: repository: 'oda-hub/oda_api' - ref: 'adapt-to-new-dummy-instrument' path: oda_api From f25a7ede0d9a6bc9080d53ec754c519744391f9e Mon Sep 17 00:00:00 2001 From: burnout87 Date: Mon, 13 Nov 2023 14:09:34 +0100 Subject: [PATCH 32/32] avoid duplication --- cdci_data_analysis/analysis/queries.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git
a/cdci_data_analysis/analysis/queries.py b/cdci_data_analysis/analysis/queries.py index f0cb27f66..c9c771bf0 100644 --- a/cdci_data_analysis/analysis/queries.py +++ b/cdci_data_analysis/analysis/queries.py @@ -580,14 +580,10 @@ def get_query_products(self, else: job.set_submitted() - if return_progress: + if return_progress or (not return_progress and job.status == 'done'): prod_list = self.build_product_list(instrument, res, scratch_dir, api=api) else: - if job.status != 'done': - prod_list = QueryProductList(prod_list=[], job=job) - else: - prod_list = self.build_product_list(instrument,res, scratch_dir,api=api) - + prod_list = QueryProductList(prod_list=[], job=job) self.query_prod_list=QueryProductList(prod_list=prod_list,job=job)