-
-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for jobs with cron #30
Comments
some alternative for the backend: https://apscheduler.readthedocs.io/en/3.x/userguide.html from gpt: Using APScheduler as a backend for task management within an application like Makim involves creating a wrapper or interface around APScheduler that allows you to create, delete, and list scheduled tasks. Below is an example implementation that demonstrates how you might accomplish this. This example assumes you have APScheduler installed and are familiar with basic Python programming concepts. Example Scheduler Interfacefrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.base import JobLookupError
class TaskScheduler:
def __init__(self):
self.scheduler = BackgroundScheduler()
self.scheduler.start()
def add_task(self, task_func, task_id, trigger, **trigger_args):
"""
Adds a new task to the scheduler.
Parameters:
- task_func: The function to be scheduled.
- task_id: A unique identifier for the task.
- trigger: The type of trigger ('date', 'interval', or 'cron').
- **trigger_args: Arguments specific to the chosen trigger type.
"""
self.scheduler.add_job(task_func, trigger, id=task_id, **trigger_args)
def remove_task(self, task_id):
"""
Removes a task from the scheduler using its unique identifier.
Parameters:
- task_id: The unique identifier for the task.
"""
try:
self.scheduler.remove_job(task_id)
except JobLookupError:
print(f"Failed to remove task {task_id}: Task not found")
def list_tasks(self):
"""
Lists all scheduled tasks.
"""
jobs = self.scheduler.get_jobs()
for job in jobs:
print(f"ID: {job.id}, Next Run: {job.next_run_time}, Trigger: {job.trigger}")
# Example usage
def my_scheduled_task():
print("Executing my scheduled task.")
# Initialize the task scheduler
scheduler = TaskScheduler()
# Add a new task
scheduler.add_task(my_scheduled_task, 'task1', 'interval', seconds=30)
# List scheduled tasks
scheduler.list_tasks()
# Remove a task
scheduler.remove_task('task1') Features of This Implementation:
This example provides a basic framework for managing tasks within an application. You can expand upon this by adding more features, such as task persistence (saving tasks to a database), error handling, and more sophisticated task querying capabilities. APScheduler's versatility makes it suitable for a wide range of scheduling needs, from simple one-off tasks to complex, cron-like scheduling, making it an excellent choice for backend task management in applications. |
the storage could be sqlalchemy with sqlite |
It would be nice to have some support for jobs that will be "installed" the cron.
so we could define the jobs inside the makim, also it could reuse the targets defined, and using the cli it could create the job in the as a cron task.
for this we need to have root access. so maybe we could have some cron task folder for the user locally where we can add any job, with no super user power, and call that from a cron tab folder, where we will need root permissions just once.
Example of makim file:
example of the CLI call:
The text was updated successfully, but these errors were encountered: