diff --git a/lib/OpenQA/WebSockets.pm b/lib/OpenQA/WebSockets.pm index b00e3363ee6..38d83c2df1a 100644 --- a/lib/OpenQA/WebSockets.pm +++ b/lib/OpenQA/WebSockets.pm @@ -19,6 +19,9 @@ sub startup { $self->_setup if $RUNNING; $self->defaults(appname => 'openQA Websocket Server'); + if (defined(my $max_online_workers = $self->config->{misc_limits}->{max_online_workers})) { + $self->log->info("Limiting number of online worker slots to $max_online_workers"); + } # no cookies for worker, no secrets to protect $self->secrets(['nosecretshere']); diff --git a/t/27-websockets.t b/t/27-websockets.t index a336ee9c108..220dba131d5 100644 --- a/t/27-websockets.t +++ b/t/27-websockets.t @@ -35,9 +35,10 @@ my $jobs = $schema->resultset('Jobs'); my $worker = $workers->find({host => 'localhost', instance => 1}); my $worker_id = $worker->id; my $status = OpenQA::WebSockets::Model::Status->singleton->workers; +my $app = $t->app; +my $worker_by_txn = $app->status->worker_by_transaction; subtest 'Authentication' => sub { - my $app = $t->app; combined_like { $t->get_ok('/test')->status_is(404)->content_like(qr/Not found/); @@ -76,6 +77,7 @@ subtest 'API' => sub { $t->status_is(429, 'no ws connection for limited worker')->content_like(qr/Limit.*exceeded/, 'error about limit'); $worker->discard_changes; like $worker->error, qr/^limited at .*/, 'worker flagged as limited via error field excluding it from assignments'; + is keys %$worker_by_txn, 0, 'no transaction added for limited worker'; }; $misc_limits->{max_online_workers} = undef; @@ -120,8 +122,10 @@ subtest 'web socket message handling' => sub { subtest 'accepted' => sub { combined_like { $t->websocket_ok('/ws/1', 'establish ws connection'); + is keys %$worker_by_txn, 1, 'transaction added for online worker'; $t->send_ok('{"type":"accepted","jobid":42}'); $t->finish_ok(1000, 'finished ws connection'); + is keys %$worker_by_txn, 0, 'transaction removed if worker no longer online'; } qr/Worker 1 accepted job 42.*never assigned/s, 'warning logged when job has never been assigned'; @@ -139,9 +143,12 @@ subtest 'web socket message handling' => sub { subtest 'multiple ws connections handled gracefully' => sub { combined_like { $t->websocket_ok('/ws/1', 'establish first ws connection'); + is keys %$worker_by_txn, 1, 'transaction added for online worker'; $t2->websocket_ok('/ws/1', 'establish second ws connection'); $t->finished_ok(1008, 'first ws connection finished due to second connection'); + is keys %$worker_by_txn, 1, 'first transaction removed, so still only one txn'; $t2->finish_ok(1000, 'finished second ws connection'); + is keys %$worker_by_txn, 0, 'all transactions removed'; } qr/only one connection per worker allowed/s, '2nd connection attempt logged'; }; diff --git a/t/43-scheduling-and-worker-scalability.t b/t/43-scheduling-and-worker-scalability.t index 03273fedb34..07ad6ffade5 100644 --- a/t/43-scheduling-and-worker-scalability.t +++ b/t/43-scheduling-and-worker-scalability.t @@ -4,11 +4,13 @@ use Test::Most; +use Mojo::Base -signatures; use Test::Warnings ':report_warnings'; use Test::MockModule; use Time::HiRes 'sleep'; use File::Path 'make_path'; use Scalar::Util 'looks_like_number'; +use List::Util qw(min max); use Mojo::File qw(path tempfile); use Mojo::Util 'dumper'; use IPC::Run qw(start); @@ -39,7 +41,7 @@ BEGIN { } setup_mojo_app_with_default_worker_timeout; -OpenQA::Setup::read_config(OpenQA::App->singleton); +OpenQA::Setup::read_config(my $app = OpenQA::App->singleton); my $load_avg_file = simulate_load('0.93 0.95 3.25 2/2207 1212', '43-scheduling-and-worker-scalability'); @@ -58,6 +60,18 @@ my $schema = OpenQA::Test::Database->new->create; my $workers = $schema->resultset('Workers'); my $jobs = $schema->resultset('Jobs'); +# configure websocket server to apply SCALABILITY_TEST_WORKER_LIMIT +my $worker_limit = $ENV{SCALABILITY_TEST_WORKER_LIMIT} // 100; +my $web_socket_server_mock = Test::MockModule->new('OpenQA::WebSockets'); +my $configure_web_socket_server = sub ($self, @args) { + my $original_function = $web_socket_server_mock->original('_setup'); + my $original_return_value = $original_function->($self, @args); + $self->config->{misc_limits}->{max_online_workers} = $worker_limit; + return $original_return_value; +}; +$web_socket_server_mock->redefine(_setup => $configure_web_socket_server); +$configure_web_socket_server->($app); # invoke this function here for the sake of tracking coverage + # create web UI and websocket server my $web_socket_server = create_websocket_server(undef, 0, 1, 1); my $webui = create_webapi(undef, 1); @@ -130,20 +144,36 @@ my $polling_tries_workers = $seconds_to_wait_per_worker / $polling_interval * $w my $polling_tries_jobs = $seconds_to_wait_per_job / $polling_interval * $job_count; subtest 'wait for workers to be idle' => sub { + # wait for all workers to register my @worker_search_args = ({'properties.key' => 'WEBSOCKET_API_VERSION'}, {join => 'properties'}); + my $actual_count = 0; for my $try (1 .. $polling_tries_workers) { - last if $workers->search(@worker_search_args)->count == $worker_count; - note("Waiting until all workers are registered, try $try"); - sleep $polling_interval; + last if ($actual_count = $workers->search(@worker_search_args)->count) == $worker_count; + note("Waiting until all workers are registered, try $try"); # uncoverable statement + sleep $polling_interval; # uncoverable statement } - is($workers->count, $worker_count, 'all workers registered'); + is $actual_count, $worker_count, 'all workers registered'; + + # wait for expected number of workers to become limited + my $limited_workers = max(0, $worker_count - $worker_limit); + $worker_count = min($worker_count, $worker_limit); + for my $try (1 .. $polling_tries_workers) { + last if ($actual_count = $workers->search({error => {-like => '%limited%'}})->count) == $limited_workers; + note("Waiting until $limited_workers workers are limited, try $try"); # uncoverable statement + sleep $polling_interval; # uncoverable statement + } + is $actual_count, $limited_workers, 'expected number of workers limited'; + + # check that no workers are in unexpected offline/error states my @non_idle_workers; for my $worker ($workers->all) { - $worker_ids{$worker->id} = 1; - push(@non_idle_workers, $worker->info) - if $worker->status ne 'idle' || ($worker->websocket_api_version || 0) != WEBSOCKET_API_VERSION; + my $is_idle = $worker->status eq 'idle'; + my $is_idle_or_limited = $is_idle || $worker->error =~ qr/limited/; + $worker_ids{$worker->id} = 1 if $is_idle; + push(@non_idle_workers, $worker->info) # uncoverable statement + if !$is_idle_or_limited || ($worker->websocket_api_version || 0) != WEBSOCKET_API_VERSION; } - ok(!@non_idle_workers, 'all workers idling') or diag explain \@non_idle_workers; + is scalar @non_idle_workers, 0, 'all workers idling/limited' or diag explain \@non_idle_workers; }; subtest 'assign and run jobs' => sub { @@ -166,7 +196,7 @@ subtest 'assign and run jobs' => sub { is(scalar @$allocated, $job_count, 'each job has a worker assigned'); } else { - # uncoverable statement only executed when the number of workers is # jobs are equal based on config parameters + # uncoverable statement only executed when the number of workers and jobs are equal based on config parameters is(scalar @$allocated, $job_count, 'all jobs assigned and all workers busy'); # uncoverable statement count:1 # uncoverable statement count:2 @@ -187,7 +217,7 @@ subtest 'assign and run jobs' => sub { } for my $try (1 .. $polling_tries_jobs) { last if $jobs->search({state => DONE})->count == $job_count; - if ($jobs->search({state => SCHEDULED})->count > $remaining_jobs) { + if ($jobs->search({state => SCHEDULED})->count > max(0, $remaining_jobs)) { # uncoverable statement note('At least one job has been set back to scheduled; aborting to wait until all jobs are done'); last; # uncoverable statement