Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
perlpunk committed Oct 11, 2024
1 parent bcf92fe commit 52b6a5d
Showing 1 changed file with 32 additions and 27 deletions.
59 changes: 32 additions & 27 deletions lib/OpenQA/Shared/Plugin/Gru.pm
Original file line number Diff line number Diff line change
Expand Up @@ -115,48 +115,53 @@ sub is_task_active ($self, $task) {
# checks if there are worker registered
sub has_workers ($self) { !!$self->app->minion->backend->list_workers(0, 1)->{total} }

# For some tasks with the same args we don't need to repeat them if they were
# enqueued less than a minute ago, like 'git fetch'
sub _find_existing_gru_task ($self, $task, $args, $jobs) {
# warn __PACKAGE__ . ':' . __LINE__ . ": ================ _find_existing_gru_task($task)\n";
# warn __PACKAGE__.':'.__LINE__.$".Data::Dumper->Dump([\$args], ['args']);
# warn __PACKAGE__ . ':' . __LINE__ . ": ========================== _find_existing_gru_task($task)\n";
# warn __PACKAGE__ . ':' . __LINE__ . $" . Data::Dumper->Dump([\$args], ['args']);
my $schema = OpenQA::Schema->singleton;
my $minion = $self->app->minion;
my $existing = $minion->jobs({tasks => [$task], states => [qw(inactive active)]});
my $num = $existing->total;
# warn __PACKAGE__ . ':' . __LINE__ . ": ???????? existing: $num\n";
while (my $job = $existing->next) {
my $gru_id = $job->{notes}->{gru_id} or next;
# warn __PACKAGE__.':'.__LINE__.": ========= existing $job->{id} gru_id=$gru_id\n";
my $created = $job->{created};
next if time - 60 > $created;
my $existing_args = $job->{args}->[0];
my @keys = sort keys %$args;
my @e_keys = sort keys %$existing_args;
no warnings 'uninitialized';
next unless "@keys @$args{@keys}" eq "@e_keys @$existing_args{@e_keys}";
# warn __PACKAGE__.':'.__LINE__.": !!!!!!!! same args as existing task\n";
my $grutask = $schema->resultset('GruTasks')->find($gru_id);
while (@$jobs) {
my $dtf = $schema->storage->datetime_parser;
$args = [$args] if ref $args eq 'HASH';
my @tasks = $schema->resultset('GruTasks')->search(
{
taskname => $task,
t_created => {'>' => $dtf->format_datetime(DateTime->now()->subtract(minutes => 1))},
args => OpenQA::Schema::Result::GruTasks->encode_json_to_db($args),
})->all;
my $num = @tasks;
# warn __PACKAGE__ . ':' . __LINE__ . ": ?????????????????????????? existing: $num\n";
for my $task (@tasks) {
my $gru_id = $task->id;
# warn __PACKAGE__ . ':' . __LINE__ . ": ========= existing gru_id=$gru_id\n";
for my $job (@$jobs) {
my $id = $jobs->[0];
# Add job to existing gru task with the same args
# warn __PACKAGE__ . ':' . __LINE__ . ": !!! assigning job $id to gru $gru_id\n";
my $gru_dep
= eval { $schema->resultset('GruDependencies')->create({job_id => $id, gru_task_id => $gru_id}); };
= eval { $schema->resultset('GruDependencies')->create({job_id => $id, gru_task_id => $gru_id + 10}); };
unless ($gru_dep) {
# warn __PACKAGE__.':'.__LINE__.": ???????????? ERROR: $@\n";
# if the GruTask is already deleted, we can skip the rest of the
# jobs, since the wanted task was done
@$jobs = ();
last;
my $error = $@;
warn __PACKAGE__ . ':' . __LINE__ . ": ???????????? ERROR: $error\n";
if ($err
=~ m/insert or update on table "gru_dependencies" violates foreign key constraint "gru_dependencies_fk_gru_task_id"/i
)
{
# if the GruTask was already deleted mranwhile, we can skip
# the rest of the jobs, since the wanted task was done
@$jobs = ();
last;
}
die $error;
}
shift @$jobs;
}
# warn __PACKAGE__.':'.__LINE__.": !! gru $gru_id=$grutask\n";
last;
}
}

sub enqueue ($self, $task, $args = [], $options = {}, $jobs = []) {
# warn __PACKAGE__ . ':' . __LINE__ . ": ================ enqueue($task)\n";
# warn __PACKAGE__ . ':' . __LINE__ . ": ========================== enqueue($task)\n";
my $ttl = $options->{ttl};
my $limit = $options->{limit} ? $options->{limit} : undef;
my $notes = $options->{notes} ? $options->{notes} : undef;
Expand Down

0 comments on commit 52b6a5d

Please sign in to comment.