Skip to content

Commit

Permalink
Add parallel output process capture
Browse files Browse the repository at this point in the history
  • Loading branch information
mbj committed Mar 5, 2024
1 parent 567b60d commit 92ce923
Show file tree
Hide file tree
Showing 16 changed files with 1,062 additions and 221 deletions.
3 changes: 2 additions & 1 deletion lib/mutant.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ module Mutant
require 'mutant/bootstrap'
require 'mutant/version'
require 'mutant/env'
require 'mutant/pipe'
require 'mutant/util'
require 'mutant/registry'
require 'mutant/ast'
Expand Down Expand Up @@ -102,6 +101,8 @@ module Mutant
require 'mutant/isolation/fork'
require 'mutant/isolation/none'
require 'mutant/parallel'
require 'mutant/parallel/connection'
require 'mutant/parallel/pipe'
require 'mutant/parallel/driver'
require 'mutant/parallel/source'
require 'mutant/parallel/worker'
Expand Down
3 changes: 2 additions & 1 deletion lib/mutant/mutation/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def self.run_mutation_analysis(env)
private_class_method :run_mutation_analysis

def self.async_driver(env)
Parallel.async(env.world, mutation_test_config(env))
Parallel.async(world: env.world, config: mutation_test_config(env))
end
private_class_method :async_driver

Expand All @@ -47,6 +47,7 @@ def self.mutation_test_config(env)
process_name: 'mutant-worker-process',
sink: Sink.new(env: env),
source: Parallel::Source::Array.new(jobs: env.mutations.each_index.to_a),
timeout: nil,
thread_name: 'mutant-worker-thread'
)
end
Expand Down
10 changes: 7 additions & 3 deletions lib/mutant/mutation/runner/sink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module Mutant
class Mutation
module Runner
class Sink
include Parallel::Sink

include Anima.new(:env)

# Initialize object
Expand Down Expand Up @@ -35,11 +37,13 @@ def stop?

# Handle mutation finish
#
# @param [Result::MutationIndex] mutation_index_result
# @param [Parallel::Response] response
#
# @return [self]
def result(mutation_index_result)
mutation_result = mutation_result(mutation_index_result)
def response(response)
fail response.error if response.error

mutation_result = mutation_result(response.result)

subject = mutation_result.mutation.subject

Expand Down
20 changes: 14 additions & 6 deletions lib/mutant/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Parallel
# @param [Config] config
#
# @return [Driver]
def self.async(world, config)
def self.async(config:, world:)
shared = shared_state(world, config)

world.process_warmup
Expand All @@ -23,19 +23,22 @@ def self.async(world, config)
)
end

# rubocop:disable Metric/MethodLength
def self.workers(world, config, shared)
Array.new(config.jobs) do |index|
Worker.start(
block: config.block,
index: index,
on_process_start: config.on_process_start,
process_name: "#{config.process_name}-#{index}",
timeout: config.timeout,
world: world,
**shared
)
end
end
private_class_method :workers
# rubocop:enable Metric/MethodLength

def self.shared_state(world, config)
{
Expand Down Expand Up @@ -69,16 +72,16 @@ def self.shared(klass, world, **attributes)
end
private_class_method :shared

# Job result sink
class Sink
# Job result sink signature
module Sink
include AbstractType

# Process job result
#
# @param [Object]
# @param [Response]
#
# @return [self]
abstract_method :result
abstract_method :response

# The sink status
#
Expand All @@ -100,10 +103,15 @@ class Config
:process_name,
:sink,
:source,
:thread_name
:thread_name,
:timeout
)
end # Config

class Response
include Anima.new(:error, :log, :result)
end

# Parallel execution status
class Status
include Adamantium, Anima.new(
Expand Down
177 changes: 177 additions & 0 deletions lib/mutant/parallel/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
# frozen_string_literal: true

module Mutant
module Parallel
class Connection
include Anima.new(:marshal, :reader, :writer)

Error = Class.new(RuntimeError)

HEADER_FORMAT = 'N'
HEADER_SIZE = 4
MAX_BYTES = (2**32).pred

class Reader
include Anima.new(:deadline, :io, :marshal, :response_reader, :log_reader)

private(*anima.attribute_names)

private_class_method :new

attr_reader :log

def error
@errors.first
end

def result
@results.first
end

def initialize(*)
super

@buffer = +''
@log = +''

# Array of size max 1 as surrogate for
# terrible default nil ivars.
@errors = []
@lengths = []
@results = []
end

def self.read_response(**attributes)
reader = new(**attributes).read_till_final

Response.new(
log: reader.log,
error: reader.error,
result: reader.result
)
end

# rubocop:disable Metrics/MethodLength
def read_till_final
readers = [response_reader, log_reader]

until result || error
status = deadline.status

break timeout unless status.ok?

reads, _others = io.select(readers, nil, nil, status.time_left)

break timeout unless reads

reads.each do |ready|
if ready.equal?(response_reader)
advance_result
else
advance_log
end
end
end

self
end
# rubocop:enable Metrics/MethodLength

private

def timeout
@errors << Timeout
end

def advance_result
if length
if read_buffer(length)
@results << marshal.load(@buffer)
end
elsif read_buffer(HEADER_SIZE)
@lengths << Util.one(@buffer.unpack(HEADER_FORMAT))
@buffer = +''
end
end

def length
@lengths.first
end

def advance_log
with_nonblock_read(io: log_reader, max_bytes: 4096, &log.public_method(:<<))
end

def read_buffer(max_bytes)
with_nonblock_read(
io: response_reader,
max_bytes: max_bytes - @buffer.bytesize
) do |chunk|
@buffer << chunk
@buffer.bytesize.equal?(max_bytes)
end
end

# rubocop:disable Metrics/MethodLength
def with_nonblock_read(io:, max_bytes:)
io.binmode

chunk = io.read_nonblock(max_bytes, exception: false)

case chunk
when nil
@errors << EOFError
false
when String
yield chunk
else
fail "Unexpected nonblocking read return: #{chunk.inspect}"
end
end
# rubocop:enable Metrics/MethodLength
end

class Frame
include Anima.new(:io)

def receive_value
read(Util.one(read(HEADER_SIZE).unpack(HEADER_FORMAT)))
end

def send_value(body)
bytesize = body.bytesize

fail Error, 'message to big' if bytesize > MAX_BYTES

io.binmode
io.write([bytesize].pack(HEADER_FORMAT))
io.write(body)
end

private

def read(bytes)
io.binmode
io.read(bytes) or fail Error, 'Unexpected EOF'
end
end

def receive_value
marshal.load(reader.receive_value)
end

def send_value(value)
writer.send_value(marshal.dump(value))
self
end

def self.from_pipes(marshal:, reader:, writer:)
new(
marshal: marshal,
reader: Frame.new(io: reader.to_reader),
writer: Frame.new(io: writer.to_writer)
)
end
end # Connection
end # Parallel
end # Mutant
39 changes: 39 additions & 0 deletions lib/mutant/parallel/pipe.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

module Mutant
module Parallel
class Pipe
include Adamantium, Anima.new(:reader, :writer)

# Run block with pipe in binmode
#
# @return [undefined]
def self.with(io)
io.pipe(binmode: true) do |(reader, writer)|
yield new(reader: reader, writer: writer)
end
end

def self.from_io(io)
reader, writer = io.pipe(binmode: true)
new(reader: reader, writer: writer)
end

# Writer end of the pipe
#
# @return [IO]
def to_writer
reader.close
writer
end

# Parent reader end of the pipe
#
# @return [IO]
def to_reader
writer.close
reader
end
end # Pipe
end # Parallel
end # Mutant
Loading

0 comments on commit 92ce923

Please sign in to comment.