diff --git a/lib/mqtt/client.rb b/lib/mqtt/client.rb index 68bf9af..8310e80 100644 --- a/lib/mqtt/client.rb +++ b/lib/mqtt/client.rb @@ -167,6 +167,12 @@ def initialize(*args) @read_thread = nil @write_semaphore = Mutex.new @pubacks_semaphore = Mutex.new + + if OpenSSL::SSL::SSLSocket.method_defined?(:read_nonblock) + @wait_for_read_func = method(:nonblocking_read_before_select) + else + @wait_for_read_func = method(:select_without_nonblocking_read) + end end # Get the OpenSSL context, that is used if SSL/TLS is enabled @@ -438,9 +444,12 @@ def unsubscribe(*topics) private - # Try to read a packet from the server - # Also sends keep-alive ping packets. - def receive_packet + def select_without_nonblocking_read + # Poll socket - is there data waiting? + [nil, !IO.select([@socket], [], [], SELECT_TIMEOUT).nil?] + end + + def nonblocking_read_before_select first_byte_in_packet = nil data_available_to_read = false begin @@ -456,6 +465,13 @@ def receive_packet [@socket], [], [], SELECT_TIMEOUT ).nil? end + [first_byte_in_packet, data_available_to_read] + end + + # Try to read a packet from the server + # Also sends keep-alive ping packets. + def receive_packet + first_byte_in_packet, data_available_to_read = @wait_for_read_func.call if data_available_to_read # Yes - read in the packet packet = MQTT::Packet.read(@socket, first_byte_in_packet) diff --git a/spec/mqtt_client_spec.rb b/spec/mqtt_client_spec.rb index b25d81e..240cda8 100644 --- a/spec/mqtt_client_spec.rb +++ b/spec/mqtt_client_spec.rb @@ -845,11 +845,9 @@ expect(@read_queue.size).to eq(1) end - it "should put PUBLISH messages on to the read queue following a wait readable exception" do - wait_readable_exception = Class.new(StandardError) do - include IO::WaitReadable - end - allow(socket).to receive(:read_nonblock).and_raise(wait_readable_exception) + it "should put PUBLISH messages on to the read queue following an IO::WaitReadable exception", + :if => OpenSSL::SSL::SSLSocket.respond_to?(:read_nonblock) do + allow(socket).to receive(:read_nonblock).and_raise(IO::WaitReadable) socket.write("\x30\x0e\x00\x05topicpayload") socket.rewind client.send(:receive_packet)