Skip to content

Commit

Permalink
This commit doesn't update any reqs
Browse files Browse the repository at this point in the history
  • Loading branch information
eacharles committed Oct 24, 2023
1 parent fc9a28d commit 4b83310
Show file tree
Hide file tree
Showing 13 changed files with 602 additions and 43 deletions.
5 changes: 5 additions & 0 deletions src/lsst/cmservice/common/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class TableEnum(enum.Enum):
product_set = 12
specification = 13
spec_block = 14
script_template = 15

def is_node(self) -> bool:
"""Is this a subclass of NodeMixin"""
Expand Down Expand Up @@ -139,6 +140,10 @@ def is_bad(self) -> bool:
"""Is this a failed state"""
return self.value <= StatusEnum.rejected.value

def is_processable_element(self) -> bool:
"""Is this a processable state for an elememnt"""
return self.value >= StatusEnum.waiting.value and self.value <= StatusEnum.reviewable.value


class TaskStatusEnum(enum.Enum):
"""Defines possible outcomes for Pipetask tasks"""
Expand Down
4 changes: 4 additions & 0 deletions src/lsst/cmservice/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
from .pipetask_error_type import PipetaskErrorType
from .product_set import ProductSet
from .production import Production
from .queue import Queue
from .row import RowMixin
from .script import Script
from .script_dependency import ScriptDependency
from .script_error import ScriptError
from .script_template import ScriptTemplate
from .specification import SpecBlock, Specification
from .step import Step
from .step_dependency import StepDependency
Expand All @@ -33,10 +35,12 @@
"PipetaskErrorType",
"ProductSet",
"Production",
"Queue",
"RowMixin",
"Script",
"ScriptDependency",
"ScriptError",
"ScriptTemplate",
"SpecBlock",
"Specification",
"Step",
Expand Down
2 changes: 1 addition & 1 deletion src/lsst/cmservice/db/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ async def get_spec_aliases(
async with session.begin_nested():
if self.level == LevelEnum.script:
raise NotImplementedError()
elif self.level.value > LevelEnum.campaign.value:
if self.level.value > LevelEnum.campaign.value:
await session.refresh(self, attribute_names=["parent_"])
parent_data = await self.parent_.get_spec_aliases(session)
ret_dict.update(parent_data)
Expand Down
184 changes: 184 additions & 0 deletions src/lsst/cmservice/db/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
from __future__ import annotations

from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Optional

import pause
from sqlalchemy import JSON, DateTime
from sqlalchemy.ext.asyncio import async_scoped_session
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.schema import ForeignKey

from ..common.enums import LevelEnum
from .base import Base
from .campaign import Campaign
from .dbid import DbId
from .element import ElementMixin
from .group import Group
from .job import Job
from .node import NodeMixin
from .step import Step

if TYPE_CHECKING:
pass


class Queue(Base, NodeMixin):
"""Database table to implement processing queue"""

__tablename__ = "queue"

id: Mapped[int] = mapped_column(primary_key=True)
time_created: Mapped[DateTime] = mapped_column()
time_updated: Mapped[DateTime] = mapped_column()
time_finished: Mapped[DateTime | None] = mapped_column(default=None)
interval: Mapped[float] = mapped_column(default=300.0)
options: Mapped[Optional[dict | list]] = mapped_column(type_=JSON)

element_level: Mapped[LevelEnum] = mapped_column()
element_id: Mapped[int] = mapped_column()
c_id: Mapped[int | None] = mapped_column(ForeignKey("campaign.id", ondelete="CASCADE"), index=True)
s_id: Mapped[int | None] = mapped_column(ForeignKey("step.id", ondelete="CASCADE"), index=True)
g_id: Mapped[int | None] = mapped_column(ForeignKey("group.id", ondelete="CASCADE"), index=True)
j_id: Mapped[int | None] = mapped_column(ForeignKey("job.id", ondelete="CASCADE"), index=True)

c_: Mapped["Campaign"] = relationship("Campaign", viewonly=True)
s_: Mapped["Step"] = relationship("Step", viewonly=True)
g_: Mapped["Group"] = relationship("Group", viewonly=True)
j_: Mapped["Job"] = relationship("Job", viewonly=True)

@hybrid_property
def element_db_id(self) -> DbId:
"""Returns DbId"""
return DbId(self.element_level, self.element_id)

async def get_element(
self,
session: async_scoped_session,
) -> ElementMixin:
"""Get the parent `Element`
Parameters
----------
session : async_scoped_session
DB session manager
Returns
-------
element : ElementMixin
Requested Parent Element
"""
async with session.begin_nested():
element: ElementMixin | None = None
if self.element_level == LevelEnum.campaign:
await session.refresh(self, attribute_names=["c_"])
element = self.c_
elif self.element_level == LevelEnum.step:
await session.refresh(self, attribute_names=["s_"])
element = self.s_
elif self.element_level == LevelEnum.group:
await session.refresh(self, attribute_names=["g_"])
element = self.g_
elif self.element_level == LevelEnum.job:
await session.refresh(self, attribute_names=["j_"])
element = self.j_
else:
raise ValueError(f"Bad level for script: {self.element_level}")
if TYPE_CHECKING:
assert isinstance(element, ElementMixin)
return element

@classmethod
async def get_create_kwargs(
cls,
session: async_scoped_session,
**kwargs: Any,
) -> dict:
element_name = kwargs["element_name"]
element_level = kwargs["element_level"]

now = datetime.now()
ret_dict = {
"element_level": element_level,
"time_created": now,
"time_updated": now,
"options": kwargs.get("options", {}),
}

if element_level == LevelEnum.campaign:
element = await Campaign.get_row_by_fullname(session, element_name)
ret_dict["c_id"] = element.id
elif element_level == LevelEnum.step:
element = await Step.get_row_by_fullname(session, element_name)
ret_dict["s_id"] = element.id
elif element_level == LevelEnum.group:
element = await Group.get_row_by_fullname(session, element_name)
ret_dict["g_id"] = element.id
elif element_level == LevelEnum.job:
element = await Job.get_row_by_fullname(session, element_name)
ret_dict["j_id"] = element.id
else:
raise ValueError(f"Bad level for script: {element_level}")
ret_dict["element_id"] = element.id
return ret_dict

def waiting(
self,
) -> bool:
"""Check if this the Queue Element is done waiting
Returns
-------
done: bool
Returns True if still waiting
"""
delta_t = timedelta(seconds=self.interval)
next_check = self.time_updated + delta_t
now = datetime.now()
return now < next_check

def pause_until_next_check(
self,
) -> None:
"""Sleep until the next time check"""
delta_t = timedelta(seconds=self.interval)
next_check = self.time_updated + delta_t
now = datetime.now()
if now < next_check:
pause.until(next_check)

async def _process_and_update(
self,
session: async_scoped_session,
) -> bool:
element = await self.get_element()
if not element.status.is_processable_element():
return False

status = await element.process(session, **self.options)
now = datetime.now()
update_dict = {"time_updated": now}
if status.is_successful_element():
update_dict.update(time_finished=now)

await self.update_values(session, **update_dict)
return element.status.is_processable_element()

async def process_element(
self,
session: async_scoped_session,
) -> bool:
"""Process associated element"""
if self.waiting():
return True
return await self._process_and_update(session)

async def process_element_loop(
self,
session: async_scoped_session,
) -> None:
can_continue = True
while can_continue:
self.pause_until_next_check()
can_continue = await self._process_and_update(session)
7 changes: 6 additions & 1 deletion src/lsst/cmservice/db/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ class Script(Base, NodeMixin):
@hybrid_property
def db_id(self) -> DbId:
"""Returns DbId"""
return DbId(LevelEnum.group, self.id)
return DbId(LevelEnum.script, self.id)

@hybrid_property
def parent_db_id(self) -> DbId:
"""Returns DbId"""
return DbId(self.parent_level, self.parent_id)

@property
def level(self) -> LevelEnum:
Expand Down
84 changes: 84 additions & 0 deletions src/lsst/cmservice/db/script_template.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any, Optional

from sqlalchemy.ext.asyncio import async_scoped_session
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.schema import ForeignKey

from .base import Base
from .row import RowMixin
from .specification import Specification


class ScriptTemplate(Base, RowMixin):
"""Database table to manage script templates
A 'ScriptTemplate' is a template that gets used to create a bash script
"""

__tablename__ = "spec_block"

id: Mapped[int] = mapped_column(primary_key=True)
spec_id: Mapped[int] = mapped_column(ForeignKey("specification.id", ondelete="CASCADE"), index=True)
name: Mapped[str] = mapped_column(index=True)
fullname: Mapped[str] = mapped_column(unique=True)
data: Mapped[Optional[str]] = mapped_column()

spec_: Mapped["Specification"] = relationship("Specification", viewonly=True)

def __repr__(self) -> str:
return f"ScriptTemplate {self.id}: {self.fullname} {self.data}"

@classmethod
async def get_create_kwargs(
cls,
session: async_scoped_session,
**kwargs: Any,
) -> dict:
spec_name = kwargs["spec_name"]
spec = await Specification.get_row_by_fullname(session, spec_name)
name = kwargs["name"]

ret_dict = {
"spec_id": spec.id,
"name": name,
"fullname": f"{spec_name}#{name}",
"data": kwargs.get("data", None),
}
return ret_dict

@classmethod
async def load(
cls,
session: async_scoped_session,
spec_name: str,
file_path: str,
) -> ScriptTemplate:
"""Load a ScriptTemplate from a file
Parameters
----------
session : async_scoped_session
DB session manager
spec_name: str,
Name for the specification
file_path
Path to the file
Returns
-------
script_template : `ScriptTemplate`
Newly created `ScriptTemplate`
"""
full_file_path = os.path.abspath(os.path.expandvars(file_path))
with open(full_file_path, "r") as fin:
data = fin.read()

new_row = cls.create_row(session, spec_name=spec_name, data=data)
if TYPE_CHECKING:
assert isinstance(new_row, ScriptTemplate)
return new_row
6 changes: 5 additions & 1 deletion src/lsst/cmservice/db/specification.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any, List, Optional
from typing import TYPE_CHECKING, Any, List, Optional

from sqlalchemy import JSON, select
from sqlalchemy.ext.asyncio import async_scoped_session
Expand All @@ -11,6 +11,9 @@
from .base import Base
from .row import RowMixin

if TYPE_CHECKING:
pass


class SpecBlock(Base, RowMixin):
"""Database table to manage blocks that are used to build campaigns
Expand Down Expand Up @@ -68,6 +71,7 @@ class Specification(Base, RowMixin):
name: Mapped[str] = mapped_column(index=True)

blocks_: Mapped[List["SpecBlock"]] = relationship("SpecBlock", viewonly=True)
script_templates_ = Mapped[List["ScriptTemplate"]] = relationship("ScriptTemplate", viewonly=True)

@hybrid_property
def fullname(self) -> str:
Expand Down
Loading

0 comments on commit 4b83310

Please sign in to comment.