Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG: Doorman runner extensibility #287

Merged
merged 1 commit into from
Jan 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _config/queuedjobs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ SilverStripe\Core\Injector\Injector:
Symbiote\QueuedJobs\Tasks\Engines\DoormanRunner:
properties:
DefaultRules:
- '%$DefaultRule'
DefaultRule: '%$DefaultRule'

SilverStripe\SiteConfig\SiteConfig:
extensions:
Expand Down
18 changes: 13 additions & 5 deletions src/DataObjects/QueuedJobDescriptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,13 @@ public function getJobTypeValues()
}

/**
* @return FieldList
* List all possible job statuses, useful for forms and filters
*
* @return array
*/
public function getCMSFields()
public function getJobStatusValues(): array
{
$fields = parent::getCMSFields();

$statuses = [
return [
QueuedJob::STATUS_NEW,
QueuedJob::STATUS_INIT,
QueuedJob::STATUS_RUN,
Expand All @@ -373,7 +373,15 @@ public function getCMSFields()
QueuedJob::STATUS_CANCELLED,
QueuedJob::STATUS_BROKEN,
];
}

/**
* @return FieldList
*/
public function getCMSFields()
{
$fields = parent::getCMSFields();
$statuses = $this->getJobStatusValues();
$runAs = $fields->fieldByName('Root.Main.RunAsID');

$fields->removeByName([
Expand Down
144 changes: 110 additions & 34 deletions src/Tasks/Engines/DoormanRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

namespace Symbiote\QueuedJobs\Tasks\Engines;

use SilverStripe\Core\ClassInfo;
use SilverStripe\Core\Config\Configurable;
use SilverStripe\Core\Environment;
use SilverStripe\Core\Injector\Injector;
use SilverStripe\ORM\FieldType\DBDatetime;
use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor;
use Symbiote\QueuedJobs\Jobs\DoormanQueuedJobTask;
use Symbiote\QueuedJobs\Services\ProcessManager;
Expand All @@ -16,6 +17,33 @@
*/
class DoormanRunner extends BaseRunner implements TaskRunnerEngine
{
use Configurable;

/**
* How many ticks are executed per one @see runQueue method call
* set 0 for unlimited ticks
*
* @config
* @var int
*/
private static $max_ticks = 0;

/**
* How many seconds between ticks
*
* @config
* @var int
*/
private static $tick_interval = 1;

/**
* Name of the dev task used to run the child process
*
* @config
* @var string
*/
private static $child_runner = 'ProcessJobQueueChildTask';

/**
* @var string[]
*/
Expand Down Expand Up @@ -48,10 +76,13 @@ public function getDefaultRules()
*/
public function runQueue($queue)
{
// check if queue can be processed
$service = QueuedJobService::singleton();
$logger = $service->getLogger();

// check if queue can be processed
if ($service->isAtMaxJobs()) {
$service->getLogger()->info('Not processing queue as jobs are at max initialisation limit.');
$logger->info('Not processing queue as jobs are at max initialisation limit.');

return;
}

Expand All @@ -60,68 +91,113 @@ public function runQueue($queue)
/** @var ProcessManager $manager */
$manager = Injector::inst()->create(ProcessManager::class);
$manager->setWorker(
BASE_PATH . "/vendor/silverstripe/framework/cli-script.php dev/tasks/ProcessJobQueueChildTask"
sprintf(
'%s/vendor/silverstripe/framework/cli-script.php dev/tasks/%s',
BASE_PATH,
$this->getChildRunner()
)
);

$logPath = Environment::getEnv('SS_DOORMAN_LOGPATH');

if ($logPath) {
$manager->setLogPath($logPath);
}

// Assign default rules
$defaultRules = $this->getDefaultRules();

if ($defaultRules) {
foreach ($defaultRules as $rule) {
if (!$rule) {
continue;
}

$manager->addRule($rule);
}
}

$descriptor = $this->getNextJobDescriptorWithoutMutex($queue);
$tickCount = 0;
$maxTicks = $this->getMaxTicks();
$descriptor = $service->getNextPendingJob($queue);

while ($manager->tick() || $descriptor) {
if (QueuedJobService::singleton()->isMaintenanceLockActive()) {
$service->getLogger()->info('Skipped queued job descriptor since maintenance log is active.');
if ($service->isMaintenanceLockActive()) {
$logger->info('Skipped queued job descriptor since maintenance lock is active.');

return;
}

$this->logDescriptorStatus($descriptor, $queue);

if ($descriptor instanceof QueuedJobDescriptor) {
$descriptor->JobStatus = QueuedJob::STATUS_INIT;
$descriptor->write();
if ($maxTicks > 0 && $tickCount >= $maxTicks) {
$logger->info(sprintf('Tick count has hit max ticks (%d)', $maxTicks));

$manager->addTask(new DoormanQueuedJobTask($descriptor));
return;
}

sleep(1);
if ($service->isAtMaxJobs()) {
$logger->info(
sprintf(
'Not processing queue as all job are at max limit. %s',
ClassInfo::shortName($service)
)
);
} elseif ($descriptor) {
$logger->info(sprintf('Next pending job is: %d', $descriptor->ID));
$this->logDescriptorStatus($descriptor, $queue);

if ($descriptor instanceof QueuedJobDescriptor) {
$descriptor->JobStatus = QueuedJob::STATUS_INIT;
$descriptor->write();

$manager->addTask(new DoormanQueuedJobTask($descriptor));
}
} else {
$logger->info('Next pending job could NOT be found or lock could NOT be obtained.');
}

$descriptor = $this->getNextJobDescriptorWithoutMutex($queue);
$tickCount += 1;
sleep($this->getTickInterval());
$descriptor = $service->getNextPendingJob($queue);
}
}

/**
* @param string $queue
* @return null|QueuedJobDescriptor
* Override this method if you need a dynamic value for the configuration, for example CMS setting
*
* @return int
*/
protected function getNextJobDescriptorWithoutMutex($queue)
protected function getMaxTicks(): int
{
$list = QueuedJobDescriptor::get()
->filter('JobType', $queue)
->sort('ID', 'ASC');
return (int) $this->config()->get('max_ticks');
}

$descriptor = $list
->filter('JobStatus', QueuedJob::STATUS_WAIT)
->first();
/**
* Override this method if you need a dynamic value for the configuration, for example CMS setting
*
* @return int
*/
protected function getTickInterval(): int
{
return (int) $this->config()->get('tick_interval');
}

if ($descriptor) {
return $descriptor;
}
/**
* Override this method if you need a dynamic value for the configuration, for example CMS setting
*
* @return string
*/
protected function getChildRunner(): string
{
return (string) $this->config()->get('child_runner');
}

return $list
->filter('JobStatus', QueuedJob::STATUS_NEW)
->where(sprintf(
'"StartAfter" < \'%s\' OR "StartAfter" IS NULL',
DBDatetime::now()->getValue()
))
->first();
/**
* @param string $queue
* @return QueuedJobDescriptor|null
* @deprecated 5.0
*/
protected function getNextJobDescriptorWithoutMutex($queue)
{
return $this->getService()->getNextPendingJob($queue);
}
}