From bece63b1bf46968e57b5d10d0f0f12a413efbad9 Mon Sep 17 00:00:00 2001 From: Markus Schirp Date: Fri, 8 Mar 2024 01:51:31 +0000 Subject: [PATCH] Change tests to native parallel runner --- Gemfile.lock | 1 - lib/mutant/parallel.rb | 2 +- lib/mutant/parallel/connection.rb | 5 +- lib/mutant/parallel/worker.rb | 1 + mutant.gemspec | 1 - spec/integration/mutant/parallel_spec.rb | 19 +++- spec/support/corpus.rb | 96 +++++++++++-------- spec/unit/mutant/mutation/runner/sink_spec.rb | 10 +- .../mutant/parallel/connection/reader_spec.rb | 25 +++-- spec/unit/mutant/parallel/worker_spec.rb | 16 ++-- spec/unit/mutant/test/runner/sink_spec.rb | 17 ++++ 11 files changed, 125 insertions(+), 68 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 75ab00a30..9f0e23abc 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -70,7 +70,6 @@ PLATFORMS DEPENDENCIES mutant! mutant-license! - parallel (~> 1.3) rspec (~> 3.10) rspec-core (~> 3.10) rspec-its (~> 1.3.0) diff --git a/lib/mutant/parallel.rb b/lib/mutant/parallel.rb index 2a524643c..6a05b87b1 100644 --- a/lib/mutant/parallel.rb +++ b/lib/mutant/parallel.rb @@ -109,7 +109,7 @@ class Config end # Config class Response - include Anima.new(:error, :log, :result) + include Anima.new(:error, :job, :log, :result) end # Parallel execution status diff --git a/lib/mutant/parallel/connection.rb b/lib/mutant/parallel/connection.rb index 344e3d0b3..3b8da4c3f 100644 --- a/lib/mutant/parallel/connection.rb +++ b/lib/mutant/parallel/connection.rb @@ -41,12 +41,13 @@ def initialize(*) @results = [] end - def self.read_response(**attributes) + def self.read_response(job:, **attributes) reader = new(**attributes).read_till_final Response.new( - log: reader.log, error: reader.error, + job: job, + log: reader.log, result: reader.result ) end diff --git a/lib/mutant/parallel/worker.rb b/lib/mutant/parallel/worker.rb index 60ba2e2e7..f03ce90ed 100644 --- a/lib/mutant/parallel/worker.rb +++ b/lib/mutant/parallel/worker.rb @@ -98,6 +98,7 @@ def call response = Connection::Reader.read_response( deadline: config.world.deadline(config.timeout), io: config.world.io, + job: job, log_reader: log_reader, marshal: config.world.marshal, response_reader: response_reader diff --git a/mutant.gemspec b/mutant.gemspec index 128dda90b..625463ae5 100644 --- a/mutant.gemspec +++ b/mutant.gemspec @@ -30,7 +30,6 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency('sorbet-runtime', '~> 0.5.0') gem.add_runtime_dependency('unparser', '~> 0.6.9') - gem.add_development_dependency('parallel', '~> 1.3') gem.add_development_dependency('rspec', '~> 3.10') gem.add_development_dependency('rspec-core', '~> 3.10') gem.add_development_dependency('rspec-its', '~> 1.3.0') diff --git a/spec/integration/mutant/parallel_spec.rb b/spec/integration/mutant/parallel_spec.rb index 0ef1d1d50..8ceee502b 100644 --- a/spec/integration/mutant/parallel_spec.rb +++ b/spec/integration/mutant/parallel_spec.rb @@ -51,16 +51,19 @@ def status [ Mutant::Parallel::Response.new( error: nil, + job: Mutant::Parallel::Source::Job.new(index: 0, payload: 1), log: "Booting: 0\nPayload: 1\n", result: 2 ), Mutant::Parallel::Response.new( error: nil, + job: Mutant::Parallel::Source::Job.new(index: 1, payload: 2), log: "Payload: 2\n", result: 4 ), Mutant::Parallel::Response.new( error: nil, + job: Mutant::Parallel::Source::Job.new(index: 2, payload: 3), log: "Payload: 3\n", result: 6 ) @@ -98,7 +101,14 @@ def status response_a, response_b = responses - expect(response_a).to eql(Mutant::Parallel::Response.new(log: '', result: nil, error: nil)) + expect(response_a).to eql( + Mutant::Parallel::Response.new( + error: nil, + job: Mutant::Parallel::Source::Job.new(index: 0, payload: 1), + log: '', + result: nil + ) + ) expect(response_b.error).to be(EOFError) expect(response_b.result).to be(nil) expect(response_b.log.match?('
')).to be(true) @@ -134,11 +144,13 @@ def status [ Mutant::Parallel::Response.new( error: nil, + job: Mutant::Parallel::Source::Job.new(index: 0, payload: 1), log: "#{b}\n#{b}\n", result: b ), Mutant::Parallel::Response.new( error: nil, + job: Mutant::Parallel::Source::Job.new(index: 1, payload: 2), log: "#{b}#{b}\n", result: b * 2 ) @@ -233,9 +245,10 @@ def status expect(sink.status).to eql( [ Mutant::Parallel::Response.new( + error: Timeout, + job: Mutant::Parallel::Source::Job.new(index: 0, payload: 1), log: '', - result: nil, - error: Timeout + result: nil ) ] ) diff --git a/spec/support/corpus.rb b/spec/support/corpus.rb index 8743574ce..f3ccad50a 100644 --- a/spec/support/corpus.rb +++ b/spec/support/corpus.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +require 'etc' require 'mutant' require 'parallel' @@ -21,12 +22,9 @@ class Project MUTEX = Mutex.new MUTATION_GENERATION_MESSAGE = 'Total Mutations/Time/Parse-Errors: %s/%0.2fs - %0.2f/s' - START_MESSAGE = 'Starting - %s' - FINISH_MESSAGE = 'Mutations - %4i - %s' + FINISH_MESSAGE = '%4i - %s' RUBY_GLOB_PATTERN = '**/*.rb' - DEFAULT_MUTATION_COUNT = 0 - include Adamantium, Anima.new( :mutation_coverage, :mutation_generation, @@ -73,6 +71,33 @@ def concurrency_limits end end + class Sink + include Mutant::Parallel::Sink + + attr_reader :total + + def initialize + @total = 0 + end + + def stop? + false + end + + def status + @total + end + + def response(response) + if response.error + Mutant::WORLD.stderr.puts(response.log) + fail response.error + end + puts(FINISH_MESSAGE % [response.result, response.job.payload]) + @total += response.result + end + end + # Verify mutation generation # # @return [self] @@ -82,24 +107,38 @@ def concurrency_limits # otherwise def verify_mutation_generation checkout - timer = Mutant::Timer.new(process: Process) - start = timer.now + sink = Sink.new - options = { - finish: method(:finish), - start: method(:start), - in_processes: Etc.nprocessors - } + elapsed = Mutant::WORLD.timer.elapsed do + driver = Mutant::Parallel.async( + config: parallel_config(sink), + world: Mutant::WORLD + ) - total = Parallel.map(effective_ruby_paths, options, &method(:check_generation)) - .reduce(DEFAULT_MUTATION_COUNT, :+) + loop do + status = driver.wait_timeout(1) + break if status.done? + end + end - took = timer.now - start - puts MUTATION_GENERATION_MESSAGE % [total, took, total / took] + puts MUTATION_GENERATION_MESSAGE % [sink.total, elapsed, sink.total / elapsed] self end + def parallel_config(sink) + Mutant::Parallel::Config.new( + block: method(:check_generation), + jobs: Etc.nprocessors, + sink: sink, + timeout: nil, + process_name: 'mutation-generation', + source: Mutant::Parallel::Source::Array.new(jobs: effective_ruby_paths), + on_process_start: ->(_) {}, + thread_name: 'mutation-generation' + ) + end + # Checkout repository # # @return [self] @@ -220,33 +259,6 @@ def noinstall? ENV.key?('NOINSTALL') end - # Print start progress - # - # @param [Pathname] path - # @param [Integer] _index - # - # @return [undefined] - # - def start(path, _index) - MUTEX.synchronize do - puts START_MESSAGE % path - end - end - - # Print finish progress - # - # @param [Pathname] path - # @param [Integer] _index - # @param [Integer] count - # - # @return [undefined] - # - def finish(path, _index, count) - MUTEX.synchronize do - puts FINISH_MESSAGE % [count, path] - end - end - # Helper method to execute system commands # # @param [Array] arguments diff --git a/spec/unit/mutant/mutation/runner/sink_spec.rb b/spec/unit/mutant/mutation/runner/sink_spec.rb index 62587a779..1b8a72df1 100644 --- a/spec/unit/mutant/mutation/runner/sink_spec.rb +++ b/spec/unit/mutant/mutation/runner/sink_spec.rb @@ -5,17 +5,19 @@ let(:mutation_a_index_response) do Mutant::Parallel::Response.new( - result: mutation_a_index_result, + error: nil, + job: 0, log: '', - error: nil + result: mutation_a_index_result ) end let(:mutation_b_index_response) do Mutant::Parallel::Response.new( - result: mutation_b_index_result, + error: nil, + job: 0, log: '', - error: nil + result: mutation_b_index_result ) end diff --git a/spec/unit/mutant/parallel/connection/reader_spec.rb b/spec/unit/mutant/parallel/connection/reader_spec.rb index 1b6ef33ec..4ef13519d 100644 --- a/spec/unit/mutant/parallel/connection/reader_spec.rb +++ b/spec/unit/mutant/parallel/connection/reader_spec.rb @@ -1,20 +1,22 @@ # frozen_string_literal: true RSpec.describe Mutant::Parallel::Connection::Reader do - let(:deadline) { instance_double(Mutant::Timer::Deadline) } - let(:header_segment) { [result_segment.bytesize].pack('N') } - let(:io) { class_double(IO) } - let(:marshal) { class_double(Marshal) } - let(:log_reader) { instance_double(IO) } - let(:response_reader) { instance_double(IO) } - let(:result) { double('reader-result') } - let(:result_segment) { '' } + let(:deadline) { instance_double(Mutant::Timer::Deadline) } + let(:header_segment) { [result_segment.bytesize].pack('N') } + let(:io) { class_double(IO) } + let(:job) { Mutant::Parallel::Source::Job.new(index: 0, payload: nil) } + let(:log_reader) { instance_double(IO) } + let(:marshal) { class_double(Marshal) } + let(:response_reader) { instance_double(IO) } + let(:result) { double('reader-result') } + let(:result_segment) { '' } describe '.read_response' do def apply described_class.read_response( deadline: deadline, io: io, + job: job, log_reader: log_reader, marshal: marshal, response_reader: response_reader @@ -90,6 +92,7 @@ def marshal_load expect(apply).to eql( Mutant::Parallel::Response.new( error: nil, + job: job, log: '', result: result ) @@ -121,6 +124,7 @@ def marshal_load expect(apply).to eql( Mutant::Parallel::Response.new( error: nil, + job: job, log: '', result: result ) @@ -157,6 +161,7 @@ def marshal_load expect(apply).to eql( Mutant::Parallel::Response.new( error: nil, + job: job, log: '', result: result ) @@ -187,6 +192,7 @@ def marshal_load expect(apply).to eql( Mutant::Parallel::Response.new( error: Timeout, + job: job, log: '', result: nil ) @@ -214,6 +220,7 @@ def marshal_load expect(apply).to eql( Mutant::Parallel::Response.new( error: EOFError, + job: job, log: '', result: nil ) @@ -235,6 +242,7 @@ def marshal_load expect(apply).to eql( Mutant::Parallel::Response.new( error: Timeout, + job: job, log: '', result: nil ) @@ -255,6 +263,7 @@ def marshal_load expect(apply).to eql( Mutant::Parallel::Response.new( error: Timeout, + job: job, log: '', result: nil ) diff --git a/spec/unit/mutant/parallel/worker_spec.rb b/spec/unit/mutant/parallel/worker_spec.rb index e7177c7e8..daa207994 100644 --- a/spec/unit/mutant/parallel/worker_spec.rb +++ b/spec/unit/mutant/parallel/worker_spec.rb @@ -209,15 +209,16 @@ def finalize end # rubocop:disable Metrics/MethodLength - def read_response(response) + def read_response(job, response) { receiver: Mutant::Parallel::Connection::Reader, arguments: [ { deadline: deadline, io: world.io, - marshal: world.marshal, + job: job, log_reader: log_pipe.to_reader, + marshal: world.marshal, response_reader: response_pipe.to_reader } ], @@ -239,6 +240,7 @@ def new_deadline let(:response_a) do Mutant::Parallel::Response.new( error: nil, + job: 0, log: 'log-a', result: result_a ) @@ -247,6 +249,7 @@ def new_deadline let(:response_b) do Mutant::Parallel::Response.new( error: nil, + job: 0, log: 'log-b', result: result_b ) @@ -262,7 +265,7 @@ def new_deadline add_job(job_a), send_value(payload_a), new_deadline, - read_response(response_a), + read_response(job_a, response_a), with(var_active_jobs, active_jobs), remove_job(job_a), with(var_sink, sink), @@ -285,7 +288,7 @@ def new_deadline add_job(job_a), send_value(payload_a), new_deadline, - read_response(response_a), + read_response(job_a, response_a), with(var_active_jobs, active_jobs), remove_job(job_a), with(var_sink, sink), @@ -298,7 +301,7 @@ def new_deadline add_job(job_b), send_value(payload_b), new_deadline, - read_response(response_b), + read_response(job_b, response_b), with(var_active_jobs, active_jobs), remove_job(job_b), with(var_sink, sink), @@ -315,6 +318,7 @@ def new_deadline let(:response_a) do Mutant::Parallel::Response.new( error: Timeout, + job: 0, log: 'log', result: nil ) @@ -329,7 +333,7 @@ def new_deadline add_job(job_a), send_value(payload_a), new_deadline, - read_response(response_a), + read_response(job_a, response_a), with(var_active_jobs, active_jobs), remove_job(job_a), with(var_sink, sink), diff --git a/spec/unit/mutant/test/runner/sink_spec.rb b/spec/unit/mutant/test/runner/sink_spec.rb index c6b8543bf..63ebbe412 100644 --- a/spec/unit/mutant/test/runner/sink_spec.rb +++ b/spec/unit/mutant/test/runner/sink_spec.rb @@ -45,12 +45,27 @@ ) end + let(:job_a) do + Mutant::Parallel::Source::Job.new( + index: 0, + payload: nil + ) + end + + let(:job_b) do + Mutant::Parallel::Source::Job.new( + index: 0, + payload: nil + ) + end + let(:test_result_a) { test_result_a_raw.with(output: test_response_a.log) } let(:test_result_b) { test_result_b_raw.with(output: test_response_b.log) } let(:test_response_a) do Mutant::Parallel::Response.new( error: nil, + job: job_a, result: test_result_a_raw, log: '' ) @@ -59,6 +74,7 @@ let(:test_response_b) do Mutant::Parallel::Response.new( error: nil, + job: job_b, result: test_result_b_raw, log: '' ) @@ -112,6 +128,7 @@ object.response( Mutant::Parallel::Response.new( error: EOFError, + job: 0, log: 'some log', result: nil )