Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for jobs using cron #126

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
388 changes: 316 additions & 72 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ dependencies = [
"shellingham >= 1.5.4",
"jsonschema >= 4",
"paramiko >= 3.5.0",
"types-paramiko >= 3.5.0"
"types-paramiko >= 3.5.0",
"apscheduler (>=3.11.0,<4.0.0)",
"sqlalchemy (>=2.0.37,<3.0.0)"
]

[project.scripts]
Expand Down Expand Up @@ -121,7 +123,7 @@ quote-style = "single"
[tool.bandit]
exclude_dirs = ["tests"]
targets = "src/makim/"
skips = ["B102", "B701", "B507", "B601"]
skips = ["B102", "B701", "B507", "B601", "B603"]

[tool.vulture]
exclude = ["tests"]
Expand Down
238 changes: 229 additions & 9 deletions src/makim/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Cli functions to define the arguments and to call Makim."""
"""CLI functions to define the arguments and call Makim."""

from __future__ import annotations

Expand All @@ -9,6 +9,9 @@

import typer

from rich.console import Console
from rich.table import Table

from makim import __version__
from makim.cli.auto_generator import (
create_dynamic_command,
Expand Down Expand Up @@ -60,7 +63,7 @@ def main(
help='Execute the command in verbose mode',
),
) -> None:
"""Process envers for specific flags, otherwise show the help menu."""
"""Process top-level flags; otherwise, show the help menu."""
typer.echo(f'Makim file: {file}')

if version:
Expand Down Expand Up @@ -96,18 +99,233 @@ def _get_command_from_cli() -> str:
return command


def _create_cron_table() -> Table:
"""Create a table for displaying scheduled tasks."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move the scheduler code to its own module .. you can calle it as scheduler.py inside the cli folder.

table = Table(show_header=True, header_style='bold magenta')
table.add_column('Name', style='cyan')
table.add_column('Task', style='blue')
table.add_column('Schedule', style='yellow')
table.add_column('Status', style='green')
table.add_column('Next Run', style='magenta')
return table


def _handle_cron_list(makim_instance: Makim) -> None:
"""Handle the cron list command."""
scheduled_tasks = makim_instance.global_data.get('scheduler', {})

if not scheduled_tasks:
typer.echo('No scheduled tasks configured in .makim.yaml')
return

console = Console()
table = _create_cron_table()

active_jobs = {
job['name']: job
for job in (
makim_instance.scheduler.list_jobs()
if makim_instance.scheduler
else []
)
}

for name, config in scheduled_tasks.items():
active_job = active_jobs.get(name)
status = 'Active' if active_job else 'Inactive'
next_run = (
active_job['next_run_time'] if active_job else 'Not scheduled'
)

table.add_row(
name,
config.get('task', 'N/A'),
config.get('schedule', 'N/A'),
status,
next_run or 'Not scheduled',
)

console.print(table)


def _handle_cron_start(
makim_instance: Makim,
name: str | None,
all_jobs: bool,
) -> None:
"""Handle the cron start command."""
if not makim_instance.scheduler:
typer.echo('No scheduler configured.')
return

scheduled_tasks = makim_instance.global_data.get('scheduler', {})

if all_jobs:
success_count = 0
error_count = 0
for schedule_name, schedule_config in scheduled_tasks.items():
try:
makim_instance.scheduler.add_job(
name=schedule_name,
schedule=schedule_config['schedule'],
task=schedule_config['task'],
args=schedule_config.get('args', {}),
)
success_count += 1
typer.echo(f"Successfully started schedule '{schedule_name}'")
except Exception as e:
error_count += 1
typer.echo(
f"Failed to start schedule '{schedule_name}': {e}",
err=True,
)

typer.echo(
f'\nSummary: {success_count} jobs started successfully, '
f'{error_count} failed'
)
return

if not name:
typer.echo("Please provide a scheduler name or use '--all' flag")
raise typer.Exit(1)

try:
schedule_config = scheduled_tasks.get(name)
if not schedule_config:
typer.echo(f"No configuration found for schedule '{name}'")
return

makim_instance.scheduler.add_job(
name=name,
schedule=schedule_config['schedule'],
task=schedule_config['task'],
args=schedule_config.get('args', {}),
)
typer.echo(f"Successfully started schedule '{name}'")
except Exception as e:
typer.echo(f"Failed to start schedule '{name}': {e}", err=True)


def _handle_cron_stop(
makim_instance: Makim,
name: str | None,
all_jobs: bool,
) -> None:
"""Handle the cron stop command."""
if not makim_instance.scheduler:
typer.echo('No scheduler configured.')
return

if all_jobs:
active_jobs = makim_instance.scheduler.list_jobs()
success_count = 0
error_count = 0

for job in active_jobs:
try:
makim_instance.scheduler.remove_job(job['name'])
success_count += 1
typer.echo(f"Successfully stopped schedule '{job['name']}'")
except Exception as e:
error_count += 1
typer.echo(
f"Failed to stop schedule '{job['name']}': {e}",
err=True,
)

typer.echo(
f'\nSummary: {success_count} jobs stopped successfully, '
f'{error_count} failed'
)
return

if not name:
typer.echo("Please provide a scheduler name or use '--all' flag")
raise typer.Exit(1)

try:
makim_instance.scheduler.remove_job(name)
typer.echo(f"Successfully stopped schedule '{name}'")
except Exception as e:
typer.echo(f"Failed to stop schedule '{name}': {e}", err=True)


def _handle_cron_commands(makim_instance: Makim) -> typer.Typer:
"""Create and handle cron-related commands.

Returns
-------
typer.Typer: The cron command group with all subcommands.
"""
typer_cron = typer.Typer(
help='Tasks Scheduler',
invoke_without_command=True,
)

if 'scheduler' in makim_instance.global_data:
for schedule_name, schedule_params in makim_instance.global_data.get(
'scheduler', {}
).items():
create_dynamic_command_cron(
makim_instance,
typer_cron,
schedule_name,
schedule_params or {},
)

@typer_cron.command(help='List all scheduled tasks')
def list() -> None:
"""List tasks defined in .makim.yaml and their current status."""
_handle_cron_list(makim_instance)

@typer_cron.command(help='Start a scheduler by its name')
def start(
name: str = typer.Argument(
None,
help="""Name of the scheduler to start.
Use '--all' for all schedulers""",
),
all: bool = typer.Option(
False,
'--all',
help='Start all available schedulers',
is_flag=True,
),
) -> None:
"""Start (enable) a scheduled task."""
_handle_cron_start(makim_instance, name, all)

@typer_cron.command(help='Stop a scheduler by its name')
def stop(
name: str = typer.Argument(
None,
help="""Name of the scheduler to stop.
Use '--all' for all schedulers""",
),
all: bool = typer.Option(
False,
'--all',
help='Stop all running schedulers',
is_flag=True,
),
) -> None:
"""Stop (disable) scheduled task(s)."""
_handle_cron_stop(makim_instance, name, all)

return typer_cron


def run_app() -> None:
"""Run the typer app."""
"""Run the Typer app."""
root_config = extract_root_config()

config_file_path = cast(str, root_config.get('file', '.makim.yaml'))

cli_completion_words = [
w for w in os.getenv('COMP_WORDS', '').split('\n') if w
]

if not makim._check_makim_file(config_file_path) and cli_completion_words:
# autocomplete call
root_config = extract_root_config(cli_completion_words)
config_file_path = cast(str, root_config.get('file', '.makim.yaml'))
if not makim._check_makim_file(config_file_path):
Expand All @@ -119,7 +337,6 @@ def run_app() -> None:
verbose=cast(bool, root_config.get('verbose', False)),
)

# create tasks data
tasks: dict[str, Any] = {}
for group_name, group_data in makim.global_data.get('groups', {}).items():
for task_name, task_data in group_data.get('tasks', {}).items():
Expand All @@ -143,19 +360,23 @@ def run_app() -> None:
app.add_typer(typer_cron, name='cron', rich_help_panel='Extensions')

# Add dynamically commands to Typer app
# Add cron commands if scheduler is configured
typer_cron = _handle_cron_commands(makim)
app.add_typer(typer_cron, name='cron', rich_help_panel='Extensions')

# Add dynamic commands
for name, args in tasks.items():
create_dynamic_command(makim, app, name, args)

try:
app()
except SystemExit as e:
# code 2 means code not found
# Code 2 means command not found
error_code = 2
if e.code != error_code:
raise e

command_used = _get_command_from_cli()

available_cmds = [
cmd.name for cmd in app.registered_commands if cmd.name is not None
]
Expand All @@ -165,7 +386,6 @@ def run_app() -> None:
f"Command {command_used} not found. Did you mean '{suggestion}'?",
fg='red',
)

raise e


Expand Down
7 changes: 7 additions & 0 deletions src/makim/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from makim.console import get_terminal_size
from makim.logs import MakimError, MakimLogs
from makim.scheduler import MakimScheduler

MAKIM_CURRENT_PATH = Path(__file__).parent

Expand Down Expand Up @@ -132,6 +133,7 @@ class Makim:
task_name: str = ''
task_data: dict[str, Any] = {}
ssh_config: dict[str, Any] = {}
scheduler: Optional[MakimScheduler] = None

def __init__(self) -> None:
"""Prepare the Makim class with the default configuration."""
Expand All @@ -145,6 +147,7 @@ def __init__(self) -> None:
self.shell_app = sh.xonsh
self.shell_args: list[str] = []
self.tmp_suffix: str = '.makim'
self.scheduler = None

def _call_shell_app(self, cmd: str) -> None:
self._load_shell_app()
Expand Down Expand Up @@ -386,6 +389,10 @@ def _load_config_data(self) -> None:

self._validate_config()

if 'scheduler' in self.global_data:
if self.scheduler is None:
self.scheduler = MakimScheduler(self)

def _resolve_working_directory(self, scope: str) -> Optional[Path]:
scope_options = ('global', 'group', 'task')
if scope not in scope_options:
Expand Down
3 changes: 3 additions & 0 deletions src/makim/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class MakimError(Enum):
SSH_CONNECTION_ERROR = 16
SSH_EXECUTION_ERROR = 17
REMOTE_HOST_NOT_FOUND = 18
SCHEDULER_JOB_ERROR = 19
SCHEDULER_JOB_NOT_FOUND = 20
SCHEDULER_INVALID_SCHEDULE = 21


class MakimLogs:
Expand Down
Loading
Loading