Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per-Script Worker Queue Setting #16552

Open
wants to merge 17 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/customization/custom-scripts.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ commit_default = False

By default, a script can be scheduled for execution at a later time. Setting `scheduling_enabled` to False disables this ability: Only immediate execution will be possible. (This also disables the ability to set a recurring execution interval.)

### `rq_queue_name`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #16927 I'm introducing a general purpose job API used by NetBox and made available for plugins. A minor background for this is abstraction to decouple plugins from using Redis directly, making future changes possible. Therefore I'd like to suggest not using parameter names related to Redis queues, i.e. just using something like queue instead of rq_queue_name.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm good with whatever Netbox wants to use for name, be it queue, worker_queue or job_queue. But I won't make any changes right now till the Netbox maintainers pick the language that is preferred for this.


This will override the standard `QUEUE_MAPPINGS` setting for the script to be process by the worker. If you use a queue name that is not one of the NetBox provided queues, (IE `high`, `default`, or `low`) then you must reconfigure the RQ working for the new queue. If the queue is not configured then the default logic found in the settings file for [QUEUE_MAPPINGS](../configuration/miscellaneous.md#queue_mappings) will be used.

### `job_timeout`

Set the maximum allowed runtime for the script. If not set, `RQ_DEFAULT_TIMEOUT` will be used.
Expand Down
30 changes: 23 additions & 7 deletions netbox/core/models/jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import uuid

import django_rq
from rq.exceptions import NoSuchJobError
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.core.exceptions import ValidationError
Expand All @@ -23,6 +25,8 @@
'Job',
)

logger = logging.getLogger('netbox.core.jobs')


class Job(models.Model):
"""
Expand Down Expand Up @@ -152,10 +156,14 @@ def duration(self):

def delete(self, *args, **kwargs):
super().delete(*args, **kwargs)

rq_queue_name = get_config().QUEUE_MAPPINGS.get(self.object_type.model, RQ_QUEUE_DEFAULT)
queue = django_rq.get_queue(rq_queue_name)
job = queue.fetch_job(str(self.job_id))
job = None
for check_queue in get_config().QUEUE_MAPPINGS:
queue = django_rq.get_queue(check_queue)
try:
job = queue.fetch_job(str(self.job_id))
break
except NoSuchJobError:
pass

if job:
job.cancel()
Expand Down Expand Up @@ -198,7 +206,7 @@ def terminate(self, status=JobStatusChoices.STATUS_COMPLETED, error=None):
job_end.send(self)

@classmethod
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs):
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, rq_queue_name=None, **kwargs):
"""
Create a Job instance and enqueue a job using the given callable

Expand All @@ -208,11 +216,19 @@ def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=
name: Name for the job (optional)
user: The user responsible for running the job
schedule_at: Schedule the job to be executed at the passed date and time
rq_queue_name: Queue name to route the job to for processing
interval: Recurrence interval (in minutes)
"""
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
rq_queue_name = get_queue_for_model(object_type.model)
queue = django_rq.get_queue(rq_queue_name)
queue = None
if rq_queue_name:
try:
queue = django_rq.get_queue(rq_queue_name)
except Exception:
logger.warning(f"User defined queue '{rq_queue_name}' cased an error or was not found. Falling back to default queue.")
if not queue:
rq_queue_name = get_queue_for_model(object_type.model)
queue = django_rq.get_queue(rq_queue_name)
status = JobStatusChoices.STATUS_SCHEDULED if schedule_at else JobStatusChoices.STATUS_PENDING
job = Job.objects.create(
object_type=object_type,
Expand Down
1 change: 1 addition & 0 deletions netbox/extras/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ def post(self, request, pk):
request=copy_safe_request(request),
commit=input_serializer.data['commit'],
job_timeout=script.python_class.job_timeout,
rq_queue_name=script.python_class.rq_queue_name,
schedule_at=input_serializer.validated_data.get('schedule_at'),
interval=input_serializer.validated_data.get('interval')
)
Expand Down
5 changes: 5 additions & 0 deletions netbox/extras/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ def job_timeout(self):
def scheduling_enabled(self):
return getattr(self.Meta, 'scheduling_enabled', True)

@classproperty
def rq_queue_name(self):
return getattr(self.Meta, 'rq_queue_name', None)

@property
def filename(self):
return inspect.getfile(self.__class__)
Expand Down Expand Up @@ -717,6 +721,7 @@ def _run_script(job):
schedule_at=new_scheduled_time,
interval=job.interval,
job_timeout=script.job_timeout,
rq_queue_name=script.rq_queue_name,
data=data,
request=request,
commit=commit
Expand Down
1 change: 1 addition & 0 deletions netbox/extras/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,7 @@ def post(self, request, **kwargs):
data=form.cleaned_data,
request=copy_safe_request(request),
job_timeout=script.python_class.job_timeout,
rq_queue_name=script.python_class.rq_queue_name,
commit=form.cleaned_data.pop('_commit')
)

Expand Down