diff --git a/lib/celluloid/io.rb b/lib/celluloid/io.rb index 044ae11..c0231c2 100644 --- a/lib/celluloid/io.rb +++ b/lib/celluloid/io.rb @@ -1,5 +1,6 @@ require "celluloid/io/version" +require "io/wait" require "celluloid" require "celluloid/io/dns_resolver" require "celluloid/io/mailbox" @@ -47,25 +48,41 @@ def self.copy_stream(src, dst, copy_length = nil, src_offset = nil) end end - def wait_readable(io) + def wait_readable(io, timeout=nil) io = io.to_io if IO.evented? mailbox = Thread.current[:celluloid_mailbox] - mailbox.reactor.wait_readable(io) + mailbox.reactor.wait_readable(io, timeout) else - Kernel.select([io]) + # hack because SSLSocket does not have the methods defined + if io.respond_to?(:wait_readable) && + # TCP* and UNIXServer always throws EINVAL exception in Linux when using #wait_readable, + # upstream to ruby! + !(io.is_a?(::TCPServer) || io.is_a?(::UNIXServer)) + io.wait_readable(timeout) + else + Kernel.select([io], nil, nil, timeout) + end end nil end module_function :wait_readable - def wait_writable(io) + def wait_writable(io, timeout=nil) io = io.to_io if IO.evented? mailbox = Thread.current[:celluloid_mailbox] - mailbox.reactor.wait_writable(io) + mailbox.reactor.wait_writable(io, timeout) else - Kernel.select([], [io]) + # hack because SSLSocket does not have the methods defined + if io.respond_to?(:wait_writable) && + # TCP* and UNIXServer always throws EINVAL exception in Linux when using #wait_readable, + # upstream to ruby! + !(io.is_a?(::TCPServer) || io.is_a?(::UNIXServer)) + io.wait_writable(timeout) + else + Kernel.select([], [io], nil, timeout) + end end nil end diff --git a/lib/celluloid/io/reactor.rb b/lib/celluloid/io/reactor.rb index 11278ea..68c301b 100644 --- a/lib/celluloid/io/reactor.rb +++ b/lib/celluloid/io/reactor.rb @@ -17,17 +17,17 @@ def initialize end # Wait for the given IO object to become readable - def wait_readable(io) - wait io, :r + def wait_readable(io, timeout=nil) + wait io, :r, timeout end # Wait for the given IO object to become writable - def wait_writable(io) - wait io, :w + def wait_writable(io, timeout=nil) + wait io, :w, timeout end # Wait for the given IO operation to complete - def wait(io, set) + def wait(io, set, timeout=nil) # zomg ugly type conversion :( unless io.is_a?(::IO) || io.is_a?(OpenSSL::SSL::SSLSocket) if io.respond_to? :to_io @@ -43,7 +43,14 @@ def wait(io, set) monitor.value = Task.current begin - Task.suspend :iowait + # this condition is actually superflous; Timeout.timeout method signature is not respected + # by Celluloid:Actor#timeout; Timeout.timeout has arity 2 and can receive nil; the first one + # is already addressed in a PR in github + if timeout + Thread.current[:celluloid_actor].send(:timeout, timeout) { Task.suspend :iowait } + else + Task.suspend :iowait + end ensure # In all cases we want to ensure that the monitor is closed once we # have woken up. However, in some cases, the monitor is already diff --git a/lib/celluloid/io/socket.rb b/lib/celluloid/io/socket.rb index 9bd369a..8747ebb 100644 --- a/lib/celluloid/io/socket.rb +++ b/lib/celluloid/io/socket.rb @@ -70,6 +70,26 @@ def self.try_convert(socket, convert_io = true) end end + # io/wait API, it belongs here as it is extended on all IOs + + def_delegators :@socket, :ready?, :nread + # Wait until the current object is readable + def wait_readable(timeout=nil) + Celluloid::IO.wait_readable(self, timeout) + self + rescue Celluloid::TaskTimeout => e + raise e unless timeout + end + alias_method :wait, :wait_readable + + # Wait until the current object is writable + def wait_writable(timeout=nil) + Celluloid::IO.wait_writable(self, timeout) + self + rescue Celluloid::TaskTimeout => e + raise e unless timeout + end + class << self extend Forwardable def_delegators '::Socket', *(::Socket.methods - self.methods - [:try_convert]) diff --git a/lib/celluloid/io/stream.rb b/lib/celluloid/io/stream.rb index b9f6185..fa3b0b2 100644 --- a/lib/celluloid/io/stream.rb +++ b/lib/celluloid/io/stream.rb @@ -27,12 +27,6 @@ def initialize(socket) @write_latch = Latch.new end - # Wait until the current object is readable - def wait_readable; Celluloid::IO.wait_readable(self); end - - # Wait until the current object is writable - def wait_writable; Celluloid::IO.wait_writable(self); end - # System read via the nonblocking subsystem def sysread(length = nil, buffer = nil) buffer ||= ''.force_encoding(Encoding::ASCII_8BIT) diff --git a/lib/celluloid/io/tcp_server.rb b/lib/celluloid/io/tcp_server.rb index e0e955a..e9ce115 100644 --- a/lib/celluloid/io/tcp_server.rb +++ b/lib/celluloid/io/tcp_server.rb @@ -33,12 +33,12 @@ def initialize(*args) # @return [TCPSocket] def accept - Celluloid::IO.wait_readable(to_io) accept_nonblock end # @return [TCPSocket] def accept_nonblock + Celluloid::IO.wait_readable(to_io) Celluloid::IO::TCPSocket.new(to_io.accept_nonblock) end diff --git a/lib/celluloid/io/udp_socket.rb b/lib/celluloid/io/udp_socket.rb index b2edc36..6607800 100644 --- a/lib/celluloid/io/udp_socket.rb +++ b/lib/celluloid/io/udp_socket.rb @@ -24,9 +24,6 @@ def initialize(*args) end end - # Wait until the socket is readable - def wait_readable; Celluloid::IO.wait_readable(self); end - # Receives up to maxlen bytes from socket. flags is zero or more of the # MSG_ options. The first element of the results, mesg, is the data # received. The second element, sender_addrinfo, contains diff --git a/spec/celluloid/io/reactor_spec.rb b/spec/celluloid/io/reactor_spec.rb index fcbf386..54103a4 100644 --- a/spec/celluloid/io/reactor_spec.rb +++ b/spec/celluloid/io/reactor_spec.rb @@ -5,35 +5,35 @@ let(:example_port) { assign_port } it "shouldn't crash" do - server = ::TCPServer.new example_addr, example_port + begin + server = ::TCPServer.new example_addr, example_port + thread = Thread.new { server.accept } + socket = within_io_actor { Celluloid::IO::TCPSocket.new example_addr, example_port } + peer = thread.value + peer_thread = Thread.new { loop { peer << payload } } + handle = false - thread = Thread.new { server.accept } - - socket = within_io_actor { Celluloid::IO::TCPSocket.new example_addr, example_port } - peer = thread.value - peer_thread = Thread.new { loop { peer << payload } } - handle = false - - # Main server body: - within_io_actor do - begin - timeout(2) do - loop do - socket.readpartial(2046) + # Main server body: + within_io_actor do + begin + timeout(2) do + loop do + socket.readpartial(2046) + end end + # rescuing timeout, ok. rescuing terminated exception, is it ok? TODO: investigate + rescue Celluloid::TaskTerminated, Celluloid::TaskTimeout, Timeout::Error + ensure + socket.readpartial(2046) + handle = true end - # rescuing timeout, ok. rescuing terminated exception, is it ok? TODO: investigate - rescue Celluloid::TaskTerminated, Celluloid::TaskTimeout, Timeout::Error - ensure - socket.readpartial(2046) - handle = true end - end - expect(handle).to be_truthy - - server.close - peer.close - socket.close + expect(handle).to be_truthy + ensure + server.close + peer.close + socket.close + end end end diff --git a/spec/celluloid/io/ssl_server_spec.rb b/spec/celluloid/io/ssl_server_spec.rb index f18b1fa..4e65776 100644 --- a/spec/celluloid/io/ssl_server_spec.rb +++ b/spec/celluloid/io/ssl_server_spec.rb @@ -78,14 +78,4 @@ end end end - - def with_ssl_server(port, raw_server = nil) - raw_server ||= Celluloid::IO::TCPServer.new(example_addr, port) - server = Celluloid::IO::SSLServer.new(raw_server, server_context) - begin - yield server - ensure - server.close - end - end end diff --git a/spec/celluloid/io/ssl_socket_spec.rb b/spec/celluloid/io/ssl_socket_spec.rb index 327b773..1f294f1 100644 --- a/spec/celluloid/io/ssl_socket_spec.rb +++ b/spec/celluloid/io/ssl_socket_spec.rb @@ -239,4 +239,53 @@ def with_raw_sockets peer.close end end + + context "io/wait API" do + let(:payload) { "ohai" } + describe "#wait_readable/wait_writable" do + it "can timeout on descriptors wait" do + class WaiterActor + include Celluloid::IO + def send_later(socket) + peer = socket.accept + after(2) { peer.write "1" } + peer + end + end + with_ssl_server(example_port) do |subject| + thread = Thread.new do + raw = TCPSocket.new(example_addr, example_port) + Celluloid::IO::SSLSocket.new(raw, client_context).connect + end + actor = WaiterActor.new + begin + within_io_actor do + # wait_wirtable + # if wait_writable(t) timeouts, it returns nil + # expect(subject.wait_writable(0.2)).to be_nil + peer = actor.send_later(subject) + client = thread.value + # wait_writable() is successfully, it returns the socket + expect(client.wait_writable).to be(client) + + # wait_readable + client.write payload + expect(peer.read(payload.size)).to eq payload # confirm the client read + # waiter actor sends info after 2 seconds + # there is the assumption here that after 0.2, + # there is not yet anything to read + # as all timer specs, some variations might occur, but 0.2 to 2 must be reasonable + expect(client.wait_readable(0.2)).to be_nil + expect(client.wait_readable).to be(client) + expect(client.read_nonblock(2)).to eq "1" + end + ensure + actor.terminate if actor.alive? + end + end + end + + end + end + end diff --git a/spec/celluloid/io/tcp_socket_spec.rb b/spec/celluloid/io/tcp_socket_spec.rb index aaba86f..2de3ff4 100644 --- a/spec/celluloid/io/tcp_socket_spec.rb +++ b/spec/celluloid/io/tcp_socket_spec.rb @@ -238,5 +238,50 @@ expect(peer.read(payload.size)).to eq payload end end + + context "io/wait API" do + describe "#wait_readable/wait_writable" do + it "can timeout on descriptors wait" do + class WaiterActor + include Celluloid::IO + def send_later(socket) + peer = socket.accept + after(2) { peer.write "1" } + peer + end + end + with_tcp_server(example_port) do |subject| + thread = Thread.new { Celluloid::IO::TCPSocket.new(example_addr, example_port) } + actor = WaiterActor.new + begin + within_io_actor do + # wait_wirtable + # if wait_writable(t) timeouts, it returns nil + expect(subject.wait_writable(0.2)).to be_nil + peer = actor.send_later(subject) + client = thread.value + # wait_writable() is successfully, it returns the socket + expect(client.wait_writable).to be(client) + + # wait_readable + client.write payload + expect(peer.read(payload.size)).to eq payload # confirm the client read + # waiter actor sends info after 2 seconds + # there is the assumption here that after 0.2, + # there is not yet anything to read + # as all timer specs, some variations might occur, but 0.2 to 2 must be reasonable + expect(client.wait_readable(0.2)).to be_nil + expect(client.wait_readable).to be(client) + expect(client.read_nonblock(2)).to eq "1" + end + ensure + actor.terminate if actor.alive? + end + end + end + + end + end + end end diff --git a/spec/celluloid/io/unix_socket_spec.rb b/spec/celluloid/io/unix_socket_spec.rb index 61fddea..5ec363f 100644 --- a/spec/celluloid/io/unix_socket_spec.rb +++ b/spec/celluloid/io/unix_socket_spec.rb @@ -200,4 +200,48 @@ end end + context "io/wait API" do + describe "#wait_readable/wait_writable" do + it "can timeout on descriptors wait" do + class WaiterActor + include Celluloid::IO + def send_later(socket) + peer = socket.accept + after(2) { peer.write "1" } + peer + end + end + with_unix_server do |subject| + thread = Thread.new { Celluloid::IO::UNIXSocket.new(example_unix_sock) } + actor = WaiterActor.new + begin + within_io_actor do + # wait_writable + # if wait_writable(t) timeouts, it returns nil + # ATTENTION: UNIX Servers seem to be writable all the time. How to test wait_writable(timeout)? + # expect(subject.wait_writable(0.2)).to be_nil + peer = actor.send_later(subject) + client = thread.value + # wait_writable() is successfully, it returns the socket + expect(client.wait_writable).to be(client) + + # wait_readable + client.write payload + expect(peer.read(payload.size)).to eq payload # confirm the client read + # waiter actor sends info after 2 seconds + # there is the assumption here that after 0.2, + # there is not yet anything to read + # as all timer specs, some variations might occur, but 0.2 to 2 must be reasonable + expect(client.wait_readable(0.2)).to be_nil + expect(client.wait_readable).to be(client) + expect(client.read_nonblock(2)).to eq "1" + end + ensure + actor.terminate if actor.alive? + end + end + end + + end + end end diff --git a/spec/support/examples/methods.rb b/spec/support/examples/methods.rb index 2a65916..24173a0 100644 --- a/spec/support/examples/methods.rb +++ b/spec/support/examples/methods.rb @@ -55,6 +55,16 @@ def with_unix_server end end +def with_ssl_server(port, raw_server = nil) + raw_server ||= Celluloid::IO::TCPServer.new(example_addr, port) + server = Celluloid::IO::SSLServer.new(raw_server, server_context) + begin + yield server + ensure + server.close + end +end + def with_connected_sockets(port) with_tcp_server(port) do |server| client = Celluloid::IO::TCPSocket.new(example_addr, port)