Skip to content

Commit

Permalink
Fix radiasoft/raydata#164: handle chx scans without cycle in start doc
Browse files Browse the repository at this point in the history
Create the concept of an "eligible scan" that tests whether the fields
we need are present.

Also cleanup some cruft.
  • Loading branch information
e-carlin committed Sep 18, 2024
1 parent b5100f2 commit e199847
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 62 deletions.
12 changes: 6 additions & 6 deletions sirepo/package_data/static/js/raydata.js
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ SIREPO.app.directive('scanDetail', function() {
<div data-ng-if="analysisElapsedTime()"><strong>Analysis Elapsed Time:</strong> {{ analysisElapsedTime() }} seconds</div>
<div>
<div><strong>Current Status: </strong>{{ scan.status }}</div>
<div data-ng-if="! isEmptyObject(latestDetailedStatus)">
<div data-ng-if="latestDetailedStatus">
<strong>Detailed Status:</strong>
<ul>
<li data-ng-repeat="(stepName, stepInfo) in latestDetailedStatus">
Expand All @@ -1146,17 +1146,17 @@ SIREPO.app.directive('scanDetail', function() {
$scope.latestDetailedStatus = null;

function setLatestDetailedStatus() {
    // Show the status of the highest-numbered step; detailed_status may
    // be absent for scans that have no detailed status yet.
    $scope.latestDetailedStatus = null;
    if ($scope.scan.detailed_status) {
        // Math.max must be spread over the keys: Math.max(array) with
        // more than one key evaluates to NaN, so the lookup would
        // return undefined.
        const latestStep = Math.max(...Object.keys($scope.scan.detailed_status));
        $scope.latestDetailedStatus = $scope.scan.detailed_status[latestStep];
    }
}

$scope.analysisElapsedTime = () => {
    // Seconds the analysis has run, or null when not yet available.
    const s = $scope.scan;
    return s && s.analysis_elapsed_time ? s.analysis_elapsed_time : null;
};

$scope.isEmptyObject = (obj) => {
return $.isEmptyObject(obj);
};

$scope.parseTime = (unixTime) => {
    // Seconds-since-epoch to a human-readable local-time string.
    return new Date(unixTime * 1000).toString();
};
Expand Down
9 changes: 9 additions & 0 deletions sirepo/raydata/analysis_driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ def get_analysis_pdf_paths(self):
def get_conda_env(self):
    # Abstract: subclasses (ex CHX) return the conda env name used to
    # run their analysis notebooks.
    raise NotImplementedError("children must implement this method")

def get_detailed_status_file(self, *args, **kwargs):
    """Return detailed per-step status for this scan, or None when the
    driver does not produce one.

    Base implementation has no detailed status; beamline drivers (ex
    chx) override this to read a progress file from the output dir.
    Explicit ``self`` added: the original signature was
    ``(*args, **kwargs)`` so the instance was silently absorbed into
    ``args`` — behavior is unchanged for all callers.
    """
    return None

def get_notebooks(self, *args, **kwargs):
    # Abstract: subclasses return the notebook(s) to execute for this
    # scan's analysis.
    raise NotImplementedError("children must implement this method")

Expand Down Expand Up @@ -95,6 +98,11 @@ def get_run_log(self):
def has_analysis_pdfs(self):
    """Whether this scan produced at least one analysis PDF."""
    pdf_paths = self.get_analysis_pdf_paths()
    return len(pdf_paths) > 0

# TODO(e-carlin): There should be a databroker class for each
# beamline and this question should be answered by it.
def is_scan_elegible_for_analysis(self):
    # Base default: every scan is eligible. Beamline drivers (ex chx)
    # override to check that required start-doc fields are present.
    # NOTE(review): "elegible" is a misspelling of "eligible" kept
    # because callers use this name.
    return True

def render_papermill_script(self, input_f, output_f):
p = self.get_output_dir().join(_PAPERMILL_SCRIPT)
pkjinja.render_resource(
Expand All @@ -116,6 +124,7 @@ def _get_papermill_args(self, *args, **kwargs):
return []


# TODO(e-carlin): support just passing catalog_name and rduid outside of PKDict
def get(incoming):
def _verify_rduid(rduid):
# rduid will be combined with paths throughout the application.
Expand Down
7 changes: 5 additions & 2 deletions sirepo/raydata/analysis_driver/chx.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ class CHX(sirepo.raydata.analysis_driver.AnalysisDriverBase):
def get_conda_env(self):
return _cfg.conda_env

def get_detailed_status_file(self, rduid):
def get_detailed_status_file(self, rduid, *args, **kwargs):
p = self.get_output_dir().join(f"progress_dict_{rduid}.json")
if not p.check():
return PKDict()
return None
d = pkjson.load_any(p)
# The notebooks do json.dump(json.dumps(progress_dict), outfile)
# which double encodes the json object. So, we may
Expand Down Expand Up @@ -56,6 +56,9 @@ def get_output_dir(self):
self.rduid,
)

def is_scan_elegible_for_analysis(self):
    """True when the scan's start document carries the "cycle" field.

    The CHX analysis needs "cycle" (ex 2024_2), so scans missing it
    cannot be analyzed.
    """
    cycle = self._scan_metadata.get_start_field("cycle", unchecked=True)
    return bool(cycle)

def _get_papermill_args(self, *args, **kwargs):
return [
# Cycle can look like 2024_2 which is converted to int by papermill unless raw_param=True
Expand Down
98 changes: 44 additions & 54 deletions sirepo/raydata/scan_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,20 +186,15 @@ def set_scan_status(cls, analysis_driver, status, analysis_elapsed_time=None):
r.save()

@classmethod
def status_and_elapsed_time(cls, catalog_name, rduid):
    """Return PKDict(status=..., analysis_elapsed_time=...) for one scan.

    Both values are None when no analysis record exists for rduid.
    """
    r = (
        cls.session.query(cls.status, cls.analysis_elapsed_time)
        # Filter on catalog_name too: rduids are only unique within a
        # catalog (the methods this replaced filtered on it as well).
        .filter(cls.catalog_name == catalog_name, cls.rduid == rduid)
        .one_or_none()
    )
    if r:
        return PKDict(status=r[0], analysis_elapsed_time=r[1])
    return PKDict(status=None, analysis_elapsed_time=None)

@classmethod
def _db_upgrade(cls):
Expand Down Expand Up @@ -317,7 +312,7 @@ def _build_search_text(self, text):
},
],
}
elif len(nums):
if len(nums):
return {
"scan_id": {"$in": nums},
}
Expand Down Expand Up @@ -358,32 +353,27 @@ def _search_params(req_data):
)
/ req_data.pageSize
)
l = [
PKDict(rduid=u)
for u in c.search(
_search_params(req_data),
sort=_sort_params(req_data),
limit=req_data.pageSize,
skip=req_data.pageNumber * req_data.pageSize,
)
]
d = PKDict(
_Analysis.statuses_for_scans(
catalog_name=req_data.catalogName, rduids=[s.rduid for s in l]
)
)

e = PKDict(
_Analysis.analysis_elapsed_time_for_scans(
catalog_name=req_data.catalogName, rduids=[s.rduid for s in l]
)
)

for s in l:
s.status = d.get(s.rduid, _AnalysisStatus.NONE)
s.analysis_elapsed_time = e.get(s.rduid, None)
s.detailed_status = _get_detailed_status(req_data.catalogName, s.rduid)
return l, pc
res = []
for u in c.search(
_search_params(req_data),
sort=_sort_params(req_data),
limit=req_data.pageSize,
skip=req_data.pageNumber * req_data.pageSize,
):
# Code after this (ex detailed_status) expects that the
# scan is valid (ex 'cycle' exists in start doc for chx).
# So, don't even show scans to users that aren't elegible.
if sirepo.raydata.analysis_driver.get(
PKDict(catalog_name=req_data.catalogName, rduid=u)
).is_scan_elegible_for_analysis():
res.append(
PKDict(
rduid=u,
detailed_status=_get_detailed_status(req_data.catalogName, u),
**_Analysis.status_and_elapsed_time(req_data.catalogName, u),
)
)
return res, pc

def _request_analysis_output(self, req_data):
    # Delegate to the beamline-specific analysis driver for this scan.
    return sirepo.raydata.analysis_driver.get(req_data).get_output()
Expand Down Expand Up @@ -602,13 +592,9 @@ def _default_columns(catalog_name):


def _get_detailed_status(catalog_name, rduid):
    """Return the driver's detailed status for rduid, or None when the
    driver produces none.

    The hasattr guard from the old version is unnecessary: the driver
    base class defines get_detailed_status_file (returning None), so
    every driver responds to it.
    """
    return sirepo.raydata.analysis_driver.get(
        PKDict(catalog_name=catalog_name, rduid=rduid)
    ).get_detailed_status_file(rduid)


async def _init_catalog_monitors():
Expand All @@ -629,7 +615,6 @@ def _monitor_catalog(catalog_name):
# new documents are available.
# But, for now it is easiest to just poll
async def _poll_catalog_for_scans(catalog_name):
# TODO(e-carlin): need to test polling feature
def _collect_new_scans_and_queue(last_known_scan_metadata):
r = [
sirepo.raydata.databroker.get_metadata(s, catalog_name)
Expand Down Expand Up @@ -663,14 +648,17 @@ async def _poll_for_new_scans():
s = _collect_new_scans_and_queue(s)
await pkasyncio.sleep(2)

async def _wait_for_catalog():
    # Block until the databroker catalog exists; it may not have been
    # created yet when the monitor starts, in which case lookup raises
    # KeyError and we retry.
    while True:
        try:
            sirepo.raydata.databroker.catalog(catalog_name)
            return
        except KeyError:
            pkdlog(f"no catalog_name={catalog_name}. Retrying...")
            await pkasyncio.sleep(15)

pkdlog("catalog_name={}", catalog_name)
c = None
while not c:
try:
c = sirepo.raydata.databroker.catalog(catalog_name)
except KeyError:
pkdlog(f"no catalog_name={catalog_name}. Retrying...")
await pkasyncio.sleep(15)
await _wait_for_catalog()
await _poll_for_new_scans()
raise AssertionError("should never get here")

Expand All @@ -680,6 +668,8 @@ def _queue_for_analysis(scan_metadata):
rduid=scan_metadata.rduid,
catalog_name=scan_metadata.catalog_name,
)
if not sirepo.raydata.analysis_driver.get(s).is_scan_elegible_for_analysis():
return
pkdlog("scan={}", s)
if s not in _SCANS_AWAITING_ANALYSIS:
pkio.unchecked_remove(sirepo.raydata.analysis_driver.get(s).get_output_dir())
Expand Down

0 comments on commit e199847

Please sign in to comment.