diff --git a/sirepo/package_data/static/js/raydata.js b/sirepo/package_data/static/js/raydata.js index c5a32af28b..e6c1f45cee 100644 --- a/sirepo/package_data/static/js/raydata.js +++ b/sirepo/package_data/static/js/raydata.js @@ -1123,7 +1123,7 @@ SIREPO.app.directive('scanDetail', function() {
Analysis Elapsed Time: {{ analysisElapsedTime() }} seconds
Current Status: {{ scan.status }}
-
+
Detailed Status:
  • @@ -1146,17 +1146,17 @@ SIREPO.app.directive('scanDetail', function() { $scope.latestDetailedStatus = null; function setLatestDetailedStatus() { - $scope.latestDetailedStatus = $scope.scan.detailed_status[Math.max(Object.keys($scope.scan.detailed_status))]; + $scope.latestDetailedStatus = null; + if ($scope.scan.detailed_status) { + $scope.latestDetailedStatus = $scope.scan.detailed_status[Math.max(Object.keys($scope.scan.detailed_status))]; + } + } $scope.analysisElapsedTime = () => { return $scope.scan && $scope.scan.analysis_elapsed_time ? $scope.scan.analysis_elapsed_time : null; }; - $scope.isEmptyObject = (obj) => { - return $.isEmptyObject(obj); - }; - $scope.parseTime = (unixTime) => { return (new Date(unixTime * 1000)).toString(); }; diff --git a/sirepo/raydata/analysis_driver/__init__.py b/sirepo/raydata/analysis_driver/__init__.py index 701469ef4e..a13a9c6920 100644 --- a/sirepo/raydata/analysis_driver/__init__.py +++ b/sirepo/raydata/analysis_driver/__init__.py @@ -35,6 +35,9 @@ def get_analysis_pdf_paths(self): def get_conda_env(self): raise NotImplementedError("children must implement this method") + def get_detailed_status_file(*args, **kwargs): + return None + def get_notebooks(self, *args, **kwargs): raise NotImplementedError("children must implement this method") @@ -95,6 +98,11 @@ def get_run_log(self): def has_analysis_pdfs(self): return len(self.get_analysis_pdf_paths()) > 0 + # TODO(e-carlin): There should be a databroker class for each + # beamline and this question should be answered by it. + def is_scan_elegible_for_analysis(self): + return True + def render_papermill_script(self, input_f, output_f): p = self.get_output_dir().join(_PAPERMILL_SCRIPT) pkjinja.render_resource( @@ -116,6 +124,7 @@ def _get_papermill_args(self, *args, **kwargs): return [] +# TODO(e-carlin): support just passing catalog_name and rduid outsidef of PKDict def get(incoming): def _verify_rduid(rduid): # rduid will be combined with paths throughout the application. diff --git a/sirepo/raydata/analysis_driver/chx.py b/sirepo/raydata/analysis_driver/chx.py index 3175a42c70..94bedf6219 100644 --- a/sirepo/raydata/analysis_driver/chx.py +++ b/sirepo/raydata/analysis_driver/chx.py @@ -19,10 +19,10 @@ class CHX(sirepo.raydata.analysis_driver.AnalysisDriverBase): def get_conda_env(self): return _cfg.conda_env - def get_detailed_status_file(self, rduid): + def get_detailed_status_file(self, rduid, *args, **kwargs): p = self.get_output_dir().join(f"progress_dict_{rduid}.json") if not p.check(): - return PKDict() + return None d = pkjson.load_any(p) # The notebooks do json.dump(json.dumps(progress_dict), outfile) # which double encodes the json object. So, we may @@ -56,6 +56,9 @@ def get_output_dir(self): self.rduid, ) + def is_scan_elegible_for_analysis(self): + return bool(self._scan_metadata.get_start_field("cycle", unchecked=True)) + def _get_papermill_args(self, *args, **kwargs): return [ # Cycle can look like 2024_2 which is converted to int by papermill unless raw_param=True diff --git a/sirepo/raydata/scan_monitor.py b/sirepo/raydata/scan_monitor.py index 4430fe247a..7ea0d4b9a6 100644 --- a/sirepo/raydata/scan_monitor.py +++ b/sirepo/raydata/scan_monitor.py @@ -186,20 +186,15 @@ def set_scan_status(cls, analysis_driver, status, analysis_elapsed_time=None): r.save() @classmethod - def statuses_for_scans(cls, catalog_name, rduids): - return ( - cls.session.query(cls.rduid, cls.status) - .filter(cls.catalog_name == catalog_name, cls.rduid.in_(rduids)) - .all() - ) - - @classmethod - def analysis_elapsed_time_for_scans(cls, catalog_name, rduids): - return ( - cls.session.query(cls.rduid, cls.analysis_elapsed_time) - .filter(cls.catalog_name == catalog_name, cls.rduid.in_(rduids)) - .all() + def status_and_elapsed_time(cls, catalog_name, rduid): + r = ( + cls.session.query(cls.status, cls.analysis_elapsed_time) + .filter(cls.rduid == rduid) + .one_or_none() ) + if r: + return PKDict(status=r[0], analysis_elapsed_time=r[1]) + return PKDict(status=None, analysis_elapsed_time=None) @classmethod def _db_upgrade(cls): @@ -317,7 +312,7 @@ def _build_search_text(self, text): }, ], } - elif len(nums): + if len(nums): return { "scan_id": {"$in": nums}, } @@ -358,32 +353,27 @@ def _search_params(req_data): ) / req_data.pageSize ) - l = [ - PKDict(rduid=u) - for u in c.search( - _search_params(req_data), - sort=_sort_params(req_data), - limit=req_data.pageSize, - skip=req_data.pageNumber * req_data.pageSize, - ) - ] - d = PKDict( - _Analysis.statuses_for_scans( - catalog_name=req_data.catalogName, rduids=[s.rduid for s in l] - ) - ) - - e = PKDict( - _Analysis.analysis_elapsed_time_for_scans( - catalog_name=req_data.catalogName, rduids=[s.rduid for s in l] - ) - ) - - for s in l: - s.status = d.get(s.rduid, _AnalysisStatus.NONE) - s.analysis_elapsed_time = e.get(s.rduid, None) - s.detailed_status = _get_detailed_status(req_data.catalogName, s.rduid) - return l, pc + res = [] + for u in c.search( + _search_params(req_data), + sort=_sort_params(req_data), + limit=req_data.pageSize, + skip=req_data.pageNumber * req_data.pageSize, + ): + # Code after this (ex detailed_status) expects that the + # scan is valid (ex 'cycle' exists in start doc for chx). + # So, don't even show scans to users that aren't elegible. + if sirepo.raydata.analysis_driver.get( + PKDict(catalog_name=req_data.catalogName, rduid=u) + ).is_scan_elegible_for_analysis(): + res.append( + PKDict( + rduid=u, + detailed_status=_get_detailed_status(req_data.catalogName, u), + **_Analysis.status_and_elapsed_time(req_data.catalogName, u), + ) + ) + return res, pc def _request_analysis_output(self, req_data): return sirepo.raydata.analysis_driver.get(req_data).get_output() @@ -602,13 +592,9 @@ def _default_columns(catalog_name): def _get_detailed_status(catalog_name, rduid): - d = sirepo.raydata.analysis_driver.get( + return sirepo.raydata.analysis_driver.get( PKDict(catalog_name=catalog_name, rduid=rduid) - ) - if hasattr(d, "get_detailed_status_file"): - return d.get_detailed_status_file(rduid) - else: - return None + ).get_detailed_status_file(rduid) async def _init_catalog_monitors(): @@ -629,7 +615,6 @@ def _monitor_catalog(catalog_name): # new documents are available. # But, for now it is easiest to just poll async def _poll_catalog_for_scans(catalog_name): - # TODO(e-carlin): need to test polling feature def _collect_new_scans_and_queue(last_known_scan_metadata): r = [ sirepo.raydata.databroker.get_metadata(s, catalog_name) @@ -663,14 +648,17 @@ async def _poll_for_new_scans(): s = _collect_new_scans_and_queue(s) await pkasyncio.sleep(2) + async def _wait_for_catalog(): + while True: + try: + sirepo.raydata.databroker.catalog(catalog_name) + return + except KeyError: + pkdlog(f"no catalog_name={catalog_name}. Retrying...") + await pkasyncio.sleep(15) + pkdlog("catalog_name={}", catalog_name) - c = None - while not c: - try: - c = sirepo.raydata.databroker.catalog(catalog_name) - except KeyError: - pkdlog(f"no catalog_name={catalog_name}. Retrying...") - await pkasyncio.sleep(15) + await _wait_for_catalog() await _poll_for_new_scans() raise AssertionError("should never get here") @@ -680,6 +668,8 @@ def _queue_for_analysis(scan_metadata): rduid=scan_metadata.rduid, catalog_name=scan_metadata.catalog_name, ) + if not sirepo.raydata.analysis_driver.get(s).is_scan_elegible_for_analysis(): + return pkdlog("scan={}", s) if s not in _SCANS_AWAITING_ANALYSIS: pkio.unchecked_remove(sirepo.raydata.analysis_driver.get(s).get_output_dir())