From 250d6d635d59f9bc5b4b1342b888325621e930e9 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 17 Jul 2024 00:22:32 +0200 Subject: [PATCH] fix more imports --- .../client/schemas/objects/__init__.py | 5 +- .../client/schemas/schedules/__init__.py | 9 ++- .../schemas/schedules/schedule_types.py | 69 +++++++++++++++++++ 3 files changed, 81 insertions(+), 2 deletions(-) create mode 100644 src/prefect/client/schemas/schedules/schedule_types.py diff --git a/src/prefect/client/schemas/objects/__init__.py b/src/prefect/client/schemas/objects/__init__.py index 3b749d22945e..59cf8b56ed64 100644 --- a/src/prefect/client/schemas/objects/__init__.py +++ b/src/prefect/client/schemas/objects/__init__.py @@ -28,7 +28,7 @@ from .queue_filter import QueueFilter from .saved_search import SavedSearch from .saved_search_filter import SavedSearchFilter - from .state import State + from .state import State, TERMINAL_STATES from .state_details import StateDetails from .state_type import StateType from .task_run import TaskRun @@ -61,6 +61,7 @@ "Constant": (__spec__.parent, ".constant"), "CsrfToken": (__spec__.parent, ".csrf_token"), "Deployment": (__spec__.parent, ".deployment"), + "DEFAULT_BLOCK_SCHEMA_VERSION": (__spec__.parent, ".block_schema"), "DeploymentSchedule": (__spec__.parent, ".deployment_schedule"), "DeploymentStatus": (__spec__.parent, ".deployment_status"), "Flow": (__spec__.parent, ".flow"), @@ -81,6 +82,7 @@ "TaskRunInput": (__spec__.parent, ".task_run_input"), "TaskRunPolicy": (__spec__.parent, ".task_run_policy"), "TaskRunResult": (__spec__.parent, ".task_run_result"), + "TERMINAL_STATES": (__spec__.parent, ".state"), "Variable": (__spec__.parent, ".variable"), "WorkPool": (__spec__.parent, ".work_pool"), "WorkPoolStatus": (__spec__.parent, ".work_pool_status"), @@ -128,6 +130,7 @@ "TaskRunInput", "TaskRunPolicy", "TaskRunResult", + "TERMINAL_STATES", "Variable", "WorkPool", "WorkPoolStatus", diff --git a/src/prefect/client/schemas/schedules/__init__.py b/src/prefect/client/schemas/schedules/__init__.py index fa0454f2eeda..026d1dee02a2 100644 --- a/src/prefect/client/schemas/schedules/__init__.py +++ b/src/prefect/client/schemas/schedules/__init__.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Union import importlib if TYPE_CHECKING: @@ -6,6 +6,7 @@ from .interval_schedule import IntervalSchedule from .no_schedule import NoSchedule from .r_rule_schedule import RRuleSchedule + from .schedule_types import SCHEDULE_TYPES, construct_schedule, is_schedule_type _public_api: dict[str, tuple[str, str]] = { @@ -13,6 +14,9 @@ "IntervalSchedule": (__spec__.parent, ".interval_schedule"), "NoSchedule": (__spec__.parent, ".no_schedule"), "RRuleSchedule": (__spec__.parent, ".r_rule_schedule"), + "SCHEDULE_TYPES": (__spec__.parent, ".schedule_types"), + "construct_schedule": (__spec__.parent, ".schedule_types"), + "is_schedule_type": (__spec__.parent, ".schedule_types"), } __all__ = [ @@ -20,6 +24,9 @@ "IntervalSchedule", "NoSchedule", "RRuleSchedule", + "SCHEDULE_TYPES", + "construct_schedule", + "is_schedule_type", ] diff --git a/src/prefect/client/schemas/schedules/schedule_types.py b/src/prefect/client/schemas/schedules/schedule_types.py new file mode 100644 index 000000000000..666fad1449b5 --- /dev/null +++ b/src/prefect/client/schemas/schedules/schedule_types.py @@ -0,0 +1,69 @@ +import datetime +from typing import Any, Optional, TypeAlias, TypeGuard, Union + +from pydantic_extra_types.pendulum_dt import DateTime + +from .cron_schedule import CronSchedule +from .interval_schedule import IntervalSchedule +from .no_schedule import NoSchedule +from .r_rule_schedule import RRuleSchedule + +SCHEDULE_TYPES: TypeAlias = Union[ + IntervalSchedule, CronSchedule, RRuleSchedule, NoSchedule +] + + +def is_schedule_type(obj: Any) -> TypeGuard[SCHEDULE_TYPES]: + return isinstance(obj, (IntervalSchedule, CronSchedule, RRuleSchedule, NoSchedule)) + + +def construct_schedule( + interval: Optional[Union[int, float, datetime.timedelta]] = None, + anchor_date: Optional[Union[datetime.datetime, str]] = None, + cron: Optional[str] = None, + rrule: Optional[str] = None, + timezone: Optional[str] = None, +) -> SCHEDULE_TYPES: + """ + Construct a schedule from the provided arguments. + + Args: + interval: An interval on which to schedule runs. Accepts either a number + or a timedelta object. If a number is given, it will be interpreted as seconds. + anchor_date: The start date for an interval schedule. + cron: A cron schedule for runs. + rrule: An rrule schedule of when to execute runs of this flow. + timezone: A timezone to use for the schedule. Defaults to UTC. + """ + num_schedules = sum(1 for entry in (interval, cron, rrule) if entry is not None) + if num_schedules > 1: + raise ValueError("Only one of interval, cron, or rrule can be provided.") + + if anchor_date and not interval: + raise ValueError( + "An anchor date can only be provided with an interval schedule" + ) + + if timezone and not (interval or cron or rrule): + raise ValueError( + "A timezone can only be provided with interval, cron, or rrule" + ) + + schedule = None + if interval: + if isinstance(interval, (int, float)): + interval = datetime.timedelta(seconds=interval) + if not anchor_date: + anchor_date = DateTime.now() + schedule = IntervalSchedule( + interval=interval, anchor_date=anchor_date, timezone=timezone + ) + elif cron: + schedule = CronSchedule(cron=cron, timezone=timezone) + elif rrule: + schedule = RRuleSchedule(rrule=rrule, timezone=timezone) + + if schedule is None: + raise ValueError("Either interval, cron, or rrule must be provided") + + return schedule