diff --git a/examples/example_config.yaml b/examples/example_config.yaml index d27cdaee..a162f4dc 100644 --- a/examples/example_config.yaml +++ b/examples/example_config.yaml @@ -1,169 +1,198 @@ -chain_create_script: - handler: lsst.cmservice.handlers.scripts.ChainCreateScriptHandler -chain_prepend_script: - handler: lsst.cmservice.handlers.scripts.ChainPrependScriptHandler -chain_collect_script: - handler: lsst.cmservice.handlers.scripts.ChainCollectScriptHandler -tag_inputs_script: - handler: lsst.cmservice.handlers.scripts.TagInputsScriptHandler -tag_create_script: - handler: lsst.cmservice.handlers.scripts.TagCreateScriptHandler -tag_associate_script: - handler: lsst.cmservice.handlers.scripts.TagAssociateScriptHandler -prepare_step_script: - handler: lsst.cmservice.handlers.scripts.PrepareStepScriptHandler -validate_script: - handler: lsst.cmservice.handlers.scripts.ValidateScriptHandler -run_jobs: - handler: lsst.cmservice.handlers.elements.RunJobsScriptHandler - child_config: - spec_block: panda_job -run_groups: - handler: lsst.cmservice.handlers.elements.RunGroupsScriptHandler -run_steps: - handler: lsst.cmservice.handlers.elements.RunStepsScriptHandler -panda_job: - handler: lsst.cmservice.handlers.jobs.PandaJobHandler - collections: - run: "{root}/{campaign}/{step}/{group}/{script}/{job}" - output: "{group_output}" - data: - rescue: false -group: - collections: - group_output: "{root}/{campaign}/{step}/{group}" - group_validation: "{root}/{campaign}/{step}/{group}/validate" - scripts: - - run: - spec_block: run_jobs - - validate: - prerequisites: ['run'] - spec_block: validate_script - collections: - input: "{group_output}" - output: "{group_validation}" - - add_to_output: - prerequisites: ['validate'] - spec_block: tag_associate_script - collections: - input: "{group_output}" - output: "{campaign_tagged_output}" -step: - collections: - step_input: "{root}/{campaign}/{step}/input" - step_output: "{root}/{campaign}/{step}_ouput" - step_public_output: "{root}/{campaign}/{step}" - step_validation: "{root}/{campaign}/{step}/validate" - scripts: - - prepare: - spec_block: prepare_step_script - collections: - output: "{step_input}" - inputs: ["{campaign_input}", "{campaign_ancilllary}"] - - run: - prerequisites: ['prepare'] - spec_block: run_groups - - collect_groups: - prerequisites: ['run'] - spec_block: chain_collect_script - collections: - inputs: [] - output: "{step_output}" - - make_step_public_output: - prerequisites: ['collect_groups'] - spec_block: chain_create_script - collections: - inputs: ["{step_output}", "{campaign_input}", "{campaign_ancilllary}"] - output: "{step_public_output}" - - validate: - prerequisites: ['make_step_public_output'] - spec_block: validate_script - collections: - input: "{step_public_output}" - output: "{step_validation}" -campaign: - collections: - root: 'cm/p1' - campaign_source: /prod/raw/all - campaign_input: "{root}/{campaign}/input" - campaign_output: "{root}/{campaign}" - campaign_tagged_output: "{root}/{campaign}/_output" - campaign_ancillary: "{root}/{campaign}/ancillary" - campaign_validation: "{root}/{campaign}/validate" - scripts: - - tag_inputs: - spec_block: tag_inputs_script - collections: - input: "{campaign_source}" - output: "{campaign_input}" - - ancillary: - spec_block: chain_create_script - collections: - inputs: - - calib_input - - other_calib_input - output: - - "{campaign_ancillary}" - - create_empty_output: - spec_block: tag_create_script - collections: - output: "{campaign_tagged_output}" - - create_chained_output: - spec_block: chain_create_script - prerequisites: ['tag_inputs', 'ancillary', 'create_empty_output'] - collections: - inputs: - - "{campaign_ancillary}" - - "{campaign_input}" - - "{campaign_tagged_output}" - output: - - "{campaign_output}" - - run: - spec_block: run_steps - prerequisites: ['create_chained_output'] - - validate: - spec_block: validate_script - prerequisites: ['run'] - collections: - input: "{campaign_output}" - output: "{campaign_validation}" - child_config: - step1: - spec_block: step - data: - pipeline_yaml: "${OBS_LSST_DIR}/pipelines/imsim/DRP.yaml#step1" - child_config: - spec_block: group - split_methd: by_field_value - base_query: "a == 1" - split_field: "b" - split_vals: ["1", "2", "3"] - step2: - spec_block: step - prerequisites: ['step1'] - data: - pipeline_yaml: "${OBS_LSST_DIR}/pipelines/imsim/DRP.yaml#step2" - child_config: - spec_block: group - split_methd: by_field_value - base_query: "a == 1" - split_field: "b" - split_vals: ["1", "2", "3"] - step3: - spec_block: step - prerequisites: ['step2'] - data: - pipeline_yaml: "${OBS_LSST_DIR}/pipelines/imsim/DRP.yaml#step3" - child_config: - spec_block: group - split_methd: by_field_value - base_query: "a == 1" - split_field: "b" - split_vals: ["1", "2", "3"] - data: - butler_repo: 'dummy' - prod_area: 'output/archive' - data_query: "instrument = 'HSC' and exposure < 500" - bps_yaml_template: "${CM_CONFIGS}/example_template.yaml" - bps_script_template: "${CM_CONFIGS}/example_bps_template.sh" - lsst_version: "${WEEKLY}" +- SpecBlock: + name: chain_create_script + handler: lsst.cmservice.handlers.scripts.ChainCreateScriptHandler +- SpecBlock: + name: chain_prepend_script + handler: lsst.cmservice.handlers.scripts.ChainPrependScriptHandler +- SpecBlock: + name: chain_collect_script + handler: lsst.cmservice.handlers.scripts.ChainCollectScriptHandler +- SpecBlock: + name: tag_inputs_script + handler: lsst.cmservice.handlers.scripts.TagInputsScriptHandler +- SpecBlock: + name: tag_create_script + handler: lsst.cmservice.handlers.scripts.TagCreateScriptHandler +- SpecBlock: + name: tag_associate_script + handler: lsst.cmservice.handlers.scripts.TagAssociateScriptHandler +- SpecBlock: + name: prepare_step_script + handler: lsst.cmservice.handlers.scripts.PrepareStepScriptHandler +- SpecBlock: + name: validate_script + handler: lsst.cmservice.handlers.scripts.ValidateScriptHandler +- SpecBlock: + name: run_jobs + handler: lsst.cmservice.handlers.elements.RunJobsScriptHandler + child_config: + spec_block: panda_job +- SpecBlock: + name: run_groups + handler: lsst.cmservice.handlers.elements.RunGroupsScriptHandler +- SpecBlock: + name: run_steps + handler: lsst.cmservice.handlers.elements.RunStepsScriptHandler +- SpecBlock: + name: panda_job + handler: lsst.cmservice.handlers.jobs.PandaJobHandler + collections: + run: "{root}/{campaign}/{step}/{group}/{script}/{job}" + output: "{group_output}" + data: + rescue: false +- SpecBlock: + name: group + collections: + group_output: "{root}/{campaign}/{step}/{group}" + group_validation: "{root}/{campaign}/{step}/{group}/validate" + scripts: + - Script: + name: run + spec_block: run_jobs + - Script: + name: validate + prerequisites: ['run'] + spec_block: validate_script + collections: + input: "{group_output}" + output: "{group_validation}" + - Script: + add_to_output: + prerequisites: ['validate'] + spec_block: tag_associate_script + collections: + input: "{group_output}" + output: "{campaign_tagged_output}" +- SpecBlock: + name: step + collections: + step_input: "{root}/{campaign}/{step}/input" + step_output: "{root}/{campaign}/{step}_ouput" + step_public_output: "{root}/{campaign}/{step}" + step_validation: "{root}/{campaign}/{step}/validate" + scripts: + - Script: + name: prepare + spec_block: prepare_step_script + collections: + output: "{step_input}" + inputs: ["{campaign_input}", "{campaign_ancilllary}"] + - Script: + name: run + prerequisites: ['prepare'] + spec_block: run_groups + - Script: + name: collect_groups + prerequisites: ['run'] + spec_block: chain_collect_script + collections: + inputs: [] + output: "{step_output}" + - Script: + name: make_step_public_output + prerequisites: ['collect_groups'] + spec_block: chain_create_script + collections: + inputs: ["{step_output}", "{campaign_input}", "{campaign_ancilllary}"] + output: "{step_public_output}" + - Script: + name: validate + prerequisites: ['make_step_public_output'] + spec_block: validate_script + collections: + input: "{step_public_output}" + output: "{step_validation}" +- SpecBlock: + name: campaign + collections: + root: 'cm/p1' + campaign_source: /prod/raw/all + campaign_input: "{root}/{campaign}/input" + campaign_output: "{root}/{campaign}" + campaign_tagged_output: "{root}/{campaign}/_output" + campaign_ancillary: "{root}/{campaign}/ancillary" + campaign_validation: "{root}/{campaign}/validate" + scripts: + - Script: + name: tag_inputs + spec_block: tag_inputs_script + collections: + input: "{campaign_source}" + output: "{campaign_input}" + - Script: + name: ancillary + spec_block: chain_create_script + collections: + inputs: + - calib_input + - other_calib_input + output: + - "{campaign_ancillary}" + - Script: + name: create_empty_output + spec_block: tag_create_script + collections: + output: "{campaign_tagged_output}" + - Script: + name: create_chained_output + spec_block: chain_create_script + prerequisites: ['tag_inputs', 'ancillary', 'create_empty_output'] + collections: + inputs: + - "{campaign_ancillary}" + - "{campaign_input}" + - "{campaign_tagged_output}" + output: + - "{campaign_output}" + - Script: + name: run + spec_block: run_steps + prerequisites: ['create_chained_output'] + - Script: + name: validate + spec_block: validate_script + prerequisites: ['run'] + collections: + input: "{campaign_output}" + output: "{campaign_validation}" + child_config: + step1: + spec_block: step + data: + pipeline_yaml: "${OBS_LSST_DIR}/pipelines/imsim/DRP.yaml#step1" + child_config: + spec_block: group + split_methd: by_field_value + base_query: "a == 1" + split_field: "b" + split_vals: ["1", "2", "3"] + step2: + spec_block: step + prerequisites: ['step1'] + data: + pipeline_yaml: "${OBS_LSST_DIR}/pipelines/imsim/DRP.yaml#step2" + child_config: + spec_block: group + split_methd: by_field_value + base_query: "a == 1" + split_field: "b" + split_vals: ["1", "2", "3"] + step3: + spec_block: step + prerequisites: ['step2'] + data: + pipeline_yaml: "${OBS_LSST_DIR}/pipelines/imsim/DRP.yaml#step3" + child_config: + spec_block: group + split_methd: by_field_value + base_query: "a == 1" + split_field: "b" + split_vals: ["1", "2", "3"] + data: + butler_repo: 'dummy' + prod_area: 'output/archive' + data_query: "instrument = 'HSC' and exposure < 500" + bps_yaml_template: "${CM_CONFIGS}/example_template.yaml" + bps_script_template: "${CM_CONFIGS}/example_bps_template.sh" + lsst_version: "${WEEKLY}" diff --git a/src/lsst/cmservice/db/element_handler.py b/src/lsst/cmservice/db/element_handler.py index c013032f..baa7905a 100644 --- a/src/lsst/cmservice/db/element_handler.py +++ b/src/lsst/cmservice/db/element_handler.py @@ -132,25 +132,30 @@ async def prepare( script_ids_dict = {} prereq_pairs = [] - for script_dict in spec_block.scripts: - for script_name, script_vals in script_dict.items(): - assert isinstance(script_vals, dict) - script_spec_block = script_vals.get("spec_block", None) - assert script_spec_block - script_spec_block_fullname = f"{spec_name}#{script_spec_block}" - new_script = await Script.create_row( - session, - parent_level=element.level, - spec_block_name=script_spec_block_fullname, - parent_name=element.fullname, - name=script_name, - **script_vals, - ) - assert new_script - await session.refresh(new_script) - script_ids_dict[script_name] = new_script.id - for prereq_ in script_vals.get("prerequisites", []): - prereq_pairs.append((script_name, prereq_)) + for script_item in spec_block.scripts: + try: + script_vals = script_item["Script"] + except KeyError as msg: + raise KeyError(f"Expected Script tag, found {script_item.keys()}") from msg + assert isinstance(script_vals, dict) + script_spec_block = script_vals.get("spec_block", None) + script_name = script_vals.pop("name") + assert script_spec_block + assert script_name + script_spec_block_fullname = f"{spec_name}#{script_spec_block}" + new_script = await Script.create_row( + session, + parent_level=element.level, + spec_block_name=script_spec_block_fullname, + parent_name=element.fullname, + name=script_name, + **script_vals, + ) + assert new_script + await session.refresh(new_script) + script_ids_dict[script_name] = new_script.id + for prereq_ in script_vals.get("prerequisites", []): + prereq_pairs.append((script_name, prereq_)) for depend_name, prereq_name in prereq_pairs: prereq_id = script_ids_dict[prereq_name] diff --git a/src/lsst/cmservice/db/functions.py b/src/lsst/cmservice/db/functions.py index 729c435d..2f795ed6 100644 --- a/src/lsst/cmservice/db/functions.py +++ b/src/lsst/cmservice/db/functions.py @@ -27,7 +27,12 @@ async def load_specification( specification = Specification(name=spec_name) session.add(specification) - for key, val in spec_data.items(): + for config_item in spec_data: + try: + val = config_item["SpecBlock"] + except KeyError as msg: + raise KeyError(f"Expecting SpecBlock items not: {spec_data.keys()})") from msg + key = val.pop("name") fullname = f"{spec_name}#{key}" spec_block_q = select(SpecBlock).where(SpecBlock.fullname == fullname) spec_block_result = await session.scalars(spec_block_q) @@ -93,7 +98,7 @@ async def add_steps( prereq_pairs = [] for child_name_, child_config_ in child_configs.items(): - spec_block_name = child_config_.pop("spec_block", None) + spec_block_name = child_config_.pop("spec_block") assert spec_block_name spec_block = await specification.get_block(session, spec_block_name) new_step = await Step.create_row( @@ -108,7 +113,7 @@ async def add_steps( assert isinstance(new_step, Step) await session.refresh(new_step) step_ids_dict[child_name_] = new_step.id - full_child_config = await new_step.get_child_config(session) + full_child_config: dict = await new_step.get_child_config(session) prereqs_names = full_child_config.pop("prerequisites", []) for prereq_ in prereqs_names: prereq_pairs.append((child_name_, prereq_)) diff --git a/src/lsst/cmservice/db/node.py b/src/lsst/cmservice/db/node.py index 371ae323..4fc81cd8 100644 --- a/src/lsst/cmservice/db/node.py +++ b/src/lsst/cmservice/db/node.py @@ -235,15 +235,15 @@ async def get_child_config( child_config: dict The child configuration """ - ret_dict = {} + ret_dict: dict = {} if not hasattr(self, "child_config"): return {} async with session.begin_nested(): await session.refresh(self, attribute_names=["spec_block_"]) if self.spec_block_.child_config: - ret_dict.update(self.spec_block_.child_config) + ret_dict.update(**self.spec_block_.child_config) if self.child_config: - ret_dict.update(self.child_config) + ret_dict.update(**self.child_config) return ret_dict async def data_dict( diff --git a/src/lsst/cmservice/main.py b/src/lsst/cmservice/main.py index cf7eb7be..fd01a7fc 100644 --- a/src/lsst/cmservice/main.py +++ b/src/lsst/cmservice/main.py @@ -20,6 +20,7 @@ productions, queries, scripts, + spec_blocks, steps, updates, ) @@ -118,6 +119,7 @@ app.include_router(scripts.router, prefix=config.prefix) app.include_router(jobs.router, prefix=config.prefix) app.include_router(error_types.router, prefix=config.prefix) +app.include_router(spec_blocks.router, prefix=config.prefix) @app.on_event("startup") diff --git a/src/lsst/cmservice/routers/spec_blocks.py b/src/lsst/cmservice/routers/spec_blocks.py new file mode 100644 index 00000000..05f06939 --- /dev/null +++ b/src/lsst/cmservice/routers/spec_blocks.py @@ -0,0 +1,56 @@ +from typing import Sequence + +from fastapi import APIRouter, Depends +from safir.dependencies.db_session import db_session_dependency +from sqlalchemy.ext.asyncio import async_scoped_session + +from .. import db, models + +response_model_class = models.SpecBlock +create_model_class = models.SpecBlockCreate +db_class = db.SpecBlock +class_string = "spec_block" +tag_string = "SpecBlocks" + + +router = APIRouter( + prefix=f"/{class_string}s", + tags=[tag_string], +) + + +@router.get( + "", + response_model=list[response_model_class], + summary=f"List {class_string}s", +) +async def get_rows( + parent_id: int | None = None, + parent_name: str | None = None, + skip: int = 0, + limit: int = 100, + session: async_scoped_session = Depends(db_session_dependency), +) -> Sequence[db_class]: + result = await db_class.get_rows( + session, + parent_id=parent_id, + skip=skip, + limit=limit, + parent_name=parent_name, + parent_class=db.Specification, + ) + return result + + +@router.get( + "/{row_id}", + response_model=response_model_class, + summary=f"Retrieve a {class_string}", +) +async def get_row( + row_id: int, + session: async_scoped_session = Depends(db_session_dependency), +) -> db_class: + result = await db_class.get_row(session, row_id) + assert isinstance(result, db_class) + return result