From 4d12ec3293bf68f2b02e7516a8d3863744171f9d Mon Sep 17 00:00:00 2001 From: chromium7 Date: Mon, 25 Mar 2024 21:48:51 +0700 Subject: [PATCH 1/7] Worker pool handles spawned process --- .../management/commands/rqworker-pool.py | 6 +-- django_rq/tests/tests.py | 17 ++++++++- django_rq/tests/utils.py | 5 +++ django_rq/worker_pool.py | 38 +++++++++++++++++++ 4 files changed, 62 insertions(+), 4 deletions(-) create mode 100644 django_rq/worker_pool.py diff --git a/django_rq/management/commands/rqworker-pool.py b/django_rq/management/commands/rqworker-pool.py index a7329801..7339e2c0 100644 --- a/django_rq/management/commands/rqworker-pool.py +++ b/django_rq/management/commands/rqworker-pool.py @@ -1,9 +1,8 @@ import os import sys -from rq.serializers import resolve_serializer -from rq.worker_pool import WorkerPool from rq.logutils import setup_loghandlers +from rq.serializers import resolve_serializer from django.core.management.base import BaseCommand @@ -11,6 +10,7 @@ from ...utils import configure_sentry from ...queues import get_queues from ...workers import get_worker_class +from ...worker_pool import DjangoWorkerPool class Command(BaseCommand): @@ -89,7 +89,7 @@ def handle(self, *args, **options): worker_class = get_worker_class(options.get('worker_class', None)) serializer = resolve_serializer(options['serializer']) - pool = WorkerPool( + pool = DjangoWorkerPool( queues=queues, connection=queues[0].connection, num_workers=options['num_workers'], diff --git a/django_rq/tests/tests.py b/django_rq/tests/tests.py index 50f2733d..30151ace 100644 --- a/django_rq/tests/tests.py +++ b/django_rq/tests/tests.py @@ -1,5 +1,6 @@ -import sys import datetime +import multiprocessing +import sys import time from unittest import skipIf, mock from unittest.mock import patch, PropertyMock, MagicMock @@ -37,6 +38,8 @@ from django_rq.utils import get_jobs, get_statistics, get_scheduler_pid from django_rq.workers import get_worker, get_worker_class +from .utils import query_queue + try: from rq_scheduler import Scheduler from ..queues import get_scheduler @@ -303,6 +306,18 @@ def test_pass_queue_via_commandline_args(self): self.assertTrue(job['job'].is_finished) self.assertIn(job['job'].id, job['finished_job_registry'].get_job_ids()) + def test_rqworker_pool_process_start_method(self) -> None: + for start_method in ['spawn', 'fork']: + with mock.patch.object(multiprocessing, "get_start_method", return_value=start_method): + queue_name = 'django_rq_test' + queue = get_queue(queue_name) + job = queue.enqueue(query_queue) + finished_job_registry = FinishedJobRegistry(queue.name, queue.connection) + call_command('rqworker-pool', queue_name, burst=True) + + self.assertTrue(job.is_finished) + self.assertIn(job.id, finished_job_registry.get_job_ids()) + def test_configure_sentry(self): rqworker.configure_sentry('https://1@sentry.io/1') self.mock_sdk.init.assert_called_once_with( diff --git a/django_rq/tests/utils.py b/django_rq/tests/utils.py index afe4df2a..e33754de 100644 --- a/django_rq/tests/utils.py +++ b/django_rq/tests/utils.py @@ -1,4 +1,5 @@ from django_rq.queues import get_connection, get_queue_by_index +from django_rq.models import Queue def get_queue_index(name='default'): @@ -17,3 +18,7 @@ def get_queue_index(name='default'): queue_index = i break return queue_index + + +def query_queue(): + return Queue.objects.first() diff --git a/django_rq/worker_pool.py b/django_rq/worker_pool.py new file mode 100644 index 00000000..68876cb4 --- /dev/null +++ b/django_rq/worker_pool.py @@ -0,0 +1,38 @@ +import django +from multiprocessing import Process, get_start_method +from typing import Any + +from rq.worker_pool import WorkerPool, run_worker + + +class DjangoWorkerPool(WorkerPool): + def get_worker_process( + self, + name: str, + burst: bool, + _sleep: float = 0, + logging_level: str = "INFO", + ) -> Process: + """Returns the worker process""" + return Process( + target=run_django_worker, + args=(name, self._queue_names, self._connection_class, self._pool_class, self._pool_kwargs), + kwargs={ + '_sleep': _sleep, + 'burst': burst, + 'logging_level': logging_level, + 'worker_class': self.worker_class, + 'job_class': self.job_class, + 'serializer': self.serializer, + }, + name=f'Worker {name} (WorkerPool {self.name})', + ) + + +def run_django_worker(*args: Any, **kwargs: Any) -> None: + # multiprocessing library default process start method may be + # `spawn` or `fork` depending on the host OS + if get_start_method() == 'spawn': + django.setup() + + run_worker(*args, **kwargs) From c9aaacfa99cb9f56014d1dc4ff2322cbf1ce28d2 Mon Sep 17 00:00:00 2001 From: chromium7 Date: Sat, 30 Mar 2024 14:10:04 +0700 Subject: [PATCH 2/7] Use disk for test DB --- django_rq/tests/settings.py | 7 +++++-- django_rq/tests/tests.py | 4 ++-- django_rq/tests/utils.py | 11 +++++++---- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/django_rq/tests/settings.py b/django_rq/tests/settings.py index c54273f0..cf9abb2c 100644 --- a/django_rq/tests/settings.py +++ b/django_rq/tests/settings.py @@ -40,8 +40,11 @@ DATABASES = { 'default': { 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': ':memory:', - }, + 'NAME': 'test_db.sqlite3', + 'TEST': { + 'NAME': 'test_db.sqlite3', + } + } } if REDIS_CACHE_TYPE == 'django-redis': diff --git a/django_rq/tests/tests.py b/django_rq/tests/tests.py index 30151ace..0e0352e3 100644 --- a/django_rq/tests/tests.py +++ b/django_rq/tests/tests.py @@ -38,7 +38,7 @@ from django_rq.utils import get_jobs, get_statistics, get_scheduler_pid from django_rq.workers import get_worker, get_worker_class -from .utils import query_queue +from .utils import query_user try: from rq_scheduler import Scheduler @@ -311,7 +311,7 @@ def test_rqworker_pool_process_start_method(self) -> None: with mock.patch.object(multiprocessing, "get_start_method", return_value=start_method): queue_name = 'django_rq_test' queue = get_queue(queue_name) - job = queue.enqueue(query_queue) + job = queue.enqueue(query_user) finished_job_registry = FinishedJobRegistry(queue.name, queue.connection) call_command('rqworker-pool', queue_name, burst=True) diff --git a/django_rq/tests/utils.py b/django_rq/tests/utils.py index e33754de..cc0ca0c3 100644 --- a/django_rq/tests/utils.py +++ b/django_rq/tests/utils.py @@ -1,8 +1,11 @@ +from typing import Optional + from django_rq.queues import get_connection, get_queue_by_index -from django_rq.models import Queue + +from django.contrib.auth.models import User -def get_queue_index(name='default'): +def get_queue_index(name: str = 'default') -> int: """ Returns the position of Queue for the named queue in QUEUES_LIST """ @@ -20,5 +23,5 @@ def get_queue_index(name='default'): return queue_index -def query_queue(): - return Queue.objects.first() +def query_user() -> Optional[User]: + return User.objects.first() From a735ee3354620c0cc0d8a3a0faf15002f45c3504 Mon Sep 17 00:00:00 2001 From: chromium7 Date: Sat, 30 Mar 2024 16:43:57 +0700 Subject: [PATCH 3/7] Change test to use postgres --- .github/workflows/test.yml | 11 +++++++++++ django_rq/tests/settings.py | 12 ++++++++---- django_rq/tests/tests.py | 2 +- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6237716e..ed8cb93b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,6 +14,17 @@ jobs: build: runs-on: ubuntu-latest name: Python${{ matrix.python-version }}/Django${{ matrix.django-version }} + + services: + postgres: + image: postgres:15 + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: django_rq_test_db + ports: + - 5432:5432 + strategy: matrix: python-version: ["3.10", "3.11", "3.12"] diff --git a/django_rq/tests/settings.py b/django_rq/tests/settings.py index cf9abb2c..4272ed80 100644 --- a/django_rq/tests/settings.py +++ b/django_rq/tests/settings.py @@ -39,12 +39,16 @@ DATABASES = { 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': 'test_db.sqlite3', + 'ENGINE': 'django.db.backends.postgresql_psycopg2', + 'NAME': 'django_rq_test_db', + 'USER': 'postgres', + 'PASSWORD': 'postgres', + 'HOST': 'localhost', + 'PORT': '5432', 'TEST': { - 'NAME': 'test_db.sqlite3', + 'NAME': 'django_rq_test_db', } - } + }, } if REDIS_CACHE_TYPE == 'django-redis': diff --git a/django_rq/tests/tests.py b/django_rq/tests/tests.py index 0e0352e3..164079ed 100644 --- a/django_rq/tests/tests.py +++ b/django_rq/tests/tests.py @@ -308,7 +308,7 @@ def test_pass_queue_via_commandline_args(self): def test_rqworker_pool_process_start_method(self) -> None: for start_method in ['spawn', 'fork']: - with mock.patch.object(multiprocessing, "get_start_method", return_value=start_method): + with mock.patch.object(multiprocessing, 'get_start_method', return_value=start_method): queue_name = 'django_rq_test' queue = get_queue(queue_name) job = queue.enqueue(query_user) From ac0859f7e23af02d08294c3f05df90ce609bcfd6 Mon Sep 17 00:00:00 2001 From: chromium7 Date: Sat, 30 Mar 2024 17:04:19 +0700 Subject: [PATCH 4/7] Add postgres in requirements --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ed8cb93b..8c0f3338 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -45,7 +45,7 @@ jobs: run: | python -m pip install --upgrade pip pip install django==${{ matrix.django-version }} - pip install redis django-redis rq sentry-sdk rq-scheduler + pip install redis django-redis rq sentry-sdk rq-scheduler psycopg2-binary - name: Run Test run: | From 7cd58c954aaa7dab74b73e4acd36a66f4b6ad91c Mon Sep 17 00:00:00 2001 From: chromium7 Date: Sat, 30 Mar 2024 17:15:45 +0700 Subject: [PATCH 5/7] Add no input --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8c0f3338..ecec1b3e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -49,4 +49,4 @@ jobs: - name: Run Test run: | - `which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. + `which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. --noinput From c746169083bf4359e9a7c30ff8dd00ba751c4f75 Mon Sep 17 00:00:00 2001 From: chromium7 Date: Sat, 30 Mar 2024 17:25:01 +0700 Subject: [PATCH 6/7] Debug failing CI --- django_rq/tests/utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/django_rq/tests/utils.py b/django_rq/tests/utils.py index cc0ca0c3..f2091f58 100644 --- a/django_rq/tests/utils.py +++ b/django_rq/tests/utils.py @@ -24,4 +24,8 @@ def get_queue_index(name: str = 'default') -> int: def query_user() -> Optional[User]: - return User.objects.first() + try: + return User.objects.first() + except Exception as e: + print('Exception caught when querying user: ', e) + raise e From 755d1d7b28dd59e46fad2e6ebd63909dc70599af Mon Sep 17 00:00:00 2001 From: chromium7 Date: Mon, 13 May 2024 13:09:14 +0700 Subject: [PATCH 7/7] Set multiprocessing start method to fork --- django_rq/management/commands/rqworker-pool.py | 6 +++++- django_rq/tests/settings.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/django_rq/management/commands/rqworker-pool.py b/django_rq/management/commands/rqworker-pool.py index 7339e2c0..c8b2a4a4 100644 --- a/django_rq/management/commands/rqworker-pool.py +++ b/django_rq/management/commands/rqworker-pool.py @@ -1,3 +1,4 @@ +import multiprocessing as mp import os import sys @@ -7,7 +8,7 @@ from django.core.management.base import BaseCommand from ...jobs import get_job_class -from ...utils import configure_sentry +from ...utils import configure_sentry, reset_db_connections from ...queues import get_queues from ...workers import get_worker_class from ...worker_pool import DjangoWorkerPool @@ -97,4 +98,7 @@ def handle(self, *args, **options): worker_class=worker_class, job_class=job_class, ) + # Close any opened DB connection before any fork + reset_db_connections() + mp.set_start_method('fork', force=True) pool.start(burst=options.get('burst', False), logging_level=logging_level) diff --git a/django_rq/tests/settings.py b/django_rq/tests/settings.py index 4272ed80..430f64a7 100644 --- a/django_rq/tests/settings.py +++ b/django_rq/tests/settings.py @@ -40,7 +40,7 @@ DATABASES = { 'default': { 'ENGINE': 'django.db.backends.postgresql_psycopg2', - 'NAME': 'django_rq_test_db', + 'NAME': 'django_rq_db', 'USER': 'postgres', 'PASSWORD': 'postgres', 'HOST': 'localhost',