diff --git a/README.md b/README.md index bb961c707..97edeb6a7 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ _[Ably](https://ably.com) is the platform that powers synchronized digital experiences in realtime. Whether attending an event in a virtual venue, receiving realtime financial information, or monitoring live car performance data – consumers simply expect realtime digital experiences as standard. Ably provides a suite of APIs to build, extend, and deliver powerful digital experiences in realtime for more than 250 million devices across 80 countries each month. Organizations like Bloomberg, HubSpot, Verizon, and Hopin depend on Ably’s platform to offload the growing complexity of business-critical realtime data synchronization at global scale. For more information, see the [Ably documentation](https://ably.com/documentation)._ -This is a Ruby client library for Ably. The library currently targets the [Ably 1.2 client library specification](https://ably.com/documentation/client-lib-development-guide/features/). You can see the complete list of features this client library supports in [our client library SDKs feature support matrix](https://ably.com/download/sdk-feature-support-matrix). +This is a Ruby client library for Ably. The library currently targets the [Ably 2.0.0 client library specification](https://ably.com/documentation/client-lib-development-guide/features/). You can see the complete list of features this client library supports in [our client library SDKs feature support matrix](https://ably.com/download/sdk-feature-support-matrix). ## Supported platforms diff --git a/lib/ably/auth.rb b/lib/ably/auth.rb index 1e6e03fa6..0befcb27f 100644 --- a/lib/ably/auth.rb +++ b/lib/ably/auth.rb @@ -414,13 +414,20 @@ def auth_header # # @return [Hash] headers def extra_auth_headers - if client_id && using_basic_auth? - { 'X-Ably-ClientId' => Base64.urlsafe_encode64(client_id) } + if client_id_for_request + { 'X-Ably-ClientId' => Base64.urlsafe_encode64(client_id_for_request) } else {} end end + # ClientId that needs to be included with every rest/realtime request + # spec - RSA7e + # @return string + def client_id_for_request + options[:client_id] if options[:client_id] && using_basic_auth? + end + # Auth params used in URI endpoint for Realtime connections # Will reauthorize implicitly if required and capable # @@ -482,15 +489,16 @@ def can_assume_client_id?(assumed_client_id) # # @api private def configure_client_id(new_client_id) - # If new client ID from Ably is a wildcard, but preconfigured clientId is set, then keep the existing clientId - if has_client_id? && new_client_id == '*' - @client_id_validated = true - return - end - - # If client_id is defined and not a wildcard, prevent it changing, this is not supported - if client_id && client_id != '*' && new_client_id != client_id - raise Ably::Exceptions::IncompatibleClientId.new("Client ID is immutable once configured for a client. Client ID cannot be changed to '#{new_client_id}'") + if has_client_id? + # If new client ID from Ably is a wildcard, but preconfigured clientId is set, then keep the existing clientId + if new_client_id == "*" + @client_id_validated = true + return + end + # If client_id is defined and not a wildcard, prevent it changing, this is not supported + if new_client_id != client_id + raise Ably::Exceptions::IncompatibleClientId.new("Client ID is immutable once configured for a client. Client ID cannot be changed to '#{new_client_id}'") + end end @client_id_validated = true @client_id = new_client_id diff --git a/lib/ably/models/protocol_message.rb b/lib/ably/models/protocol_message.rb index 8490ebc33..0088fded7 100644 --- a/lib/ably/models/protocol_message.rb +++ b/lib/ably/models/protocol_message.rb @@ -20,8 +20,6 @@ module Ably::Models # @return [String] Contains a serial number for a message on the current channel # @!attribute [r] connection_id # @return [String] Contains a string private connection key used to recover this connection - # @!attribute [r] connection_serial - # @return [Bignum] Contains a serial number for a message sent from the server to the client # @!attribute [r] message_serial # @return [Bignum] Contains a serial number for a message sent from the client to the server # @!attribute [r] timestamp @@ -129,12 +127,6 @@ def message_serial raise TypeError, "msg_serial '#{attributes[:msg_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage" end - def connection_serial - Integer(attributes[:connection_serial]) - rescue TypeError - raise TypeError, "connection_serial '#{attributes[:connection_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage" - end - def count [1, attributes[:count].to_i].max end @@ -146,26 +138,12 @@ def has_message_serial? false end - # @api private - def has_connection_serial? - connection_serial && true + def has_channel_serial? + channel_serial && true rescue TypeError false end - def serial - if has_connection_serial? - connection_serial - else - message_serial - end - end - - # @api private - def has_serial? - has_connection_serial? || has_message_serial? - end - def messages @messages ||= Array(attributes[:messages]).map do |message| @@ -271,7 +249,7 @@ def attributes # Return a JSON ready object from the underlying #attributes using Ably naming conventions for keys def as_json(*args) raise TypeError, ':action is missing, cannot generate a valid Hash for ProtocolMessage' unless action - raise TypeError, ':msg_serial or :connection_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_serial? + raise TypeError, ':msg_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_message_serial? attributes.dup.tap do |hash_object| hash_object['action'] = action.to_i @@ -296,11 +274,12 @@ def to_s end # True if the ProtocolMessage appears to be invalid, however this is not a guarantee + # Used for validating incoming protocol messages, so no need to add unnecessary checks # @return [Boolean] # @api private def invalid? action_enum = action rescue nil - !action_enum || (ack_required? && !has_serial?) + !action_enum end # @!attribute [r] logger diff --git a/lib/ably/modules/safe_deferrable.rb b/lib/ably/modules/safe_deferrable.rb index c011b4375..1849544db 100644 --- a/lib/ably/modules/safe_deferrable.rb +++ b/lib/ably/modules/safe_deferrable.rb @@ -39,7 +39,7 @@ def errback(&block) end end - # Mark the Deferrable as succeeded and trigger all callbacks. + # Mark the Deferrable as succeeded and trigger all success callbacks. # See http://www.rubydoc.info/gems/eventmachine/1.0.7/EventMachine/Deferrable#succeed-instance_method # # @return [void] @@ -48,7 +48,7 @@ def succeed(*args) super(*args) end - # Mark the Deferrable as failed and trigger all callbacks. + # Mark the Deferrable as failed and trigger all error callbacks. # See http://www.rubydoc.info/gems/eventmachine/1.0.7/EventMachine/Deferrable#fail-instance_method # # @return [void] diff --git a/lib/ably/modules/state_emitter.rb b/lib/ably/modules/state_emitter.rb index c638cb9a4..2a5f8b516 100644 --- a/lib/ably/modules/state_emitter.rb +++ b/lib/ably/modules/state_emitter.rb @@ -3,7 +3,7 @@ module Ably::Modules # the instance variable @state is used exclusively, the {Enum} STATE is defined prior to inclusion of this # module, and the class is an {EventEmitter}. It then emits state changes. # - # It also ensures the EventEmitter is configured to retrict permitted events to the + # It also ensures the EventEmitter is configured to restrict permitted events to the # the available STATEs or EVENTs if defined i.e. if EVENTs includes an additional type such as # :update, then it will support all EVENTs being emitted. EVENTs must be a superset of STATEs # diff --git a/lib/ably/realtime/auth.rb b/lib/ably/realtime/auth.rb index 4d12f87a1..c1fc59831 100644 --- a/lib/ably/realtime/auth.rb +++ b/lib/ably/realtime/auth.rb @@ -226,6 +226,10 @@ def auth_header_sync auth_sync.auth_header end + def client_id_for_request_sync + auth_sync.client_id_for_request + end + # Auth params used in URI endpoint for Realtime connections # Will reauthorize implicitly if required and capable # diff --git a/lib/ably/realtime/channel.rb b/lib/ably/realtime/channel.rb index 6192cf666..af1772c0f 100644 --- a/lib/ably/realtime/channel.rb +++ b/lib/ably/realtime/channel.rb @@ -42,7 +42,7 @@ class Channel # # @spec RTL2b # - # The permited states for this channel + # The permitted states for this channel STATE = ruby_enum('STATE', :initialized, :attaching, @@ -364,8 +364,8 @@ def __incoming_msgbus__ # @return [Ably::Models::ChannelOptions] def set_options(channel_options) @options = Ably::Models::ChannelOptions(channel_options) - - manager.request_reattach if need_reattach? + # RTL4i + manager.request_reattach if (need_reattach? and connection.state?(:connected)) end alias options= set_options diff --git a/lib/ably/realtime/channel/channel_manager.rb b/lib/ably/realtime/channel/channel_manager.rb index 5b1761087..90f00b9e6 100644 --- a/lib/ably/realtime/channel/channel_manager.rb +++ b/lib/ably/realtime/channel/channel_manager.rb @@ -18,7 +18,7 @@ def initialize(channel, connection) def attach if can_transition_to?(:attached) connect_if_connection_initialized - send_attach_protocol_message + send_attach_protocol_message if connection.state?(:connected) # RTL4i end end @@ -34,9 +34,9 @@ def detach(error, previous_state) # Channel is attached, notify presence if sync is expected def attached(attached_protocol_message) # If no attached ProtocolMessage then this attached request was triggered by the client - # library, such as returning to attached whne detach has failed + # library, such as returning to attached when detach has failed if attached_protocol_message - update_presence_sync_state_following_attached attached_protocol_message + channel.presence.manager.on_attach attached_protocol_message.has_presence_flag? channel.properties.set_attach_serial(attached_protocol_message.channel_serial) channel.options.set_modes_from_flags(attached_protocol_message.flags) channel.options.set_params(attached_protocol_message.params) @@ -49,17 +49,16 @@ def log_channel_error(error) end # Request channel to be reattached by sending an attach protocol message - # @param [Hash] options - # @option options [Ably::Models::ErrorInfo] :reason - def request_reattach(options = {}) - reason = options[:reason] - send_attach_protocol_message - logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" } + # @param [Ably::Models::ErrorInfo] reason + def request_reattach(reason = nil) channel.set_channel_error_reason(reason) if reason channel.transition_state_machine! :attaching, reason: reason unless channel.attaching? + send_attach_protocol_message + logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" } end def duplicate_attached_received(protocol_message) + logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" } if protocol_message.error channel.set_channel_error_reason protocol_message.error log_channel_error protocol_message.error @@ -68,9 +67,7 @@ def duplicate_attached_received(protocol_message) channel.properties.set_attach_serial(protocol_message.channel_serial) channel.options.set_modes_from_flags(protocol_message.flags) - if protocol_message.has_channel_resumed_flag? - logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" } - else + unless protocol_message.has_channel_resumed_flag? channel.emit :update, Ably::Models::ChannelStateChange.new( current: channel.state, previous: channel.state, @@ -78,7 +75,7 @@ def duplicate_attached_received(protocol_message) reason: protocol_message.error, resumed: false, ) - update_presence_sync_state_following_attached protocol_message + channel.presence.manager.on_attach protocol_message.has_presence_flag? end end @@ -170,6 +167,12 @@ def start_attach_from_suspended_timer end end + # RTL13c + def notify_state_change + @pending_state_change_timer.cancel if @pending_state_change_timer + @pending_state_change_timer = nil + end + private attr_reader :pending_state_change_timer @@ -209,56 +212,56 @@ def send_attach_protocol_message message_options[:flags] = message_options[:flags].to_i | Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[:resume] end - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options - end - - def send_detach_protocol_message(previous_state) - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed - end + message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1 - def send_state_change_protocol_message(new_state, state_if_failed, message_options = {}) state_at_time_of_request = channel.state + attach_action = Ably::Models::ProtocolMessage::ACTION.Attach + # RTL4f @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do if channel.state == state_at_time_of_request - error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{new_state} operation failed (timed out)") - channel.transition_state_machine state_if_failed, reason: error + error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{attach_action} operation failed (timed out)") + channel.transition_state_machine :suspended, reason: error # return to suspended state if failed end end + # Shouldn't queue attach message as per RTL4i, so message is added top of the queue + # to be sent immediately while processing next message + connection.send_protocol_message_immediately( + action: attach_action.to_i, + channel: channel.name, + **message_options.to_h + ) + end - channel.once_state_changed do - @pending_state_change_timer.cancel if @pending_state_change_timer - @pending_state_change_timer = nil + def send_detach_protocol_message(previous_state) + state_at_time_of_request = channel.state + detach_action = Ably::Models::ProtocolMessage::ACTION.Detach + + @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do + if channel.state == state_at_time_of_request + error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{detach_action} operation failed (timed out)") + channel.transition_state_machine previous_state, reason: error # return to previous state if failed + end end - resend_if_disconnected_and_connected = lambda do + on_disconnected_and_connected = lambda do connection.unsafe_once(:disconnected) do - next unless pending_state_change_timer connection.unsafe_once(:connected) do - next unless pending_state_change_timer - connection.send_protocol_message( - action: new_state.to_i, - channel: channel.name, - **message_options.to_h - ) - resend_if_disconnected_and_connected.call - end + yield if pending_state_change_timer + end if pending_state_change_timer end end - resend_if_disconnected_and_connected.call - - connection.send_protocol_message( - action: new_state.to_i, - channel: channel.name, - **message_options.to_h - ) - end - def update_presence_sync_state_following_attached(attached_protocol_message) - if attached_protocol_message.has_presence_flag? - channel.presence.manager.sync_expected - else - channel.presence.manager.sync_not_expected + send_detach_message = lambda do + on_disconnected_and_connected.call do + send_detach_message.call + end + connection.send_protocol_message( + action: detach_action.to_i, + channel: channel.name + ) end + + send_detach_message.call end def logger diff --git a/lib/ably/realtime/channel/channel_properties.rb b/lib/ably/realtime/channel/channel_properties.rb index 26ddf2622..60f1911a8 100644 --- a/lib/ably/realtime/channel/channel_properties.rb +++ b/lib/ably/realtime/channel/channel_properties.rb @@ -18,6 +18,15 @@ class ChannelProperties # attr_reader :attach_serial + # ChannelSerial contains the channelSerial from latest ProtocolMessage of action type + # Message/PresenceMessage received on the channel. + # + # @spec CP2b, RTL15b + # + # @return [String] + # + attr_accessor :channel_serial + def initialize(channel) @channel = channel end diff --git a/lib/ably/realtime/channel/channel_state_machine.rb b/lib/ably/realtime/channel/channel_state_machine.rb index fb91deee1..e3cedd18c 100644 --- a/lib/ably/realtime/channel/channel_state_machine.rb +++ b/lib/ably/realtime/channel/channel_state_machine.rb @@ -29,6 +29,7 @@ class ChannelStateMachine transition :from => :failed, :to => [:attaching, :initialized] after_transition do |channel, transition| + channel.manager.notify_state_change # RTL13c channel.synchronize_state_with_statemachine end @@ -55,6 +56,7 @@ class ChannelStateMachine end after_transition(to: [:detached, :failed, :suspended]) do |channel, current_transition| + channel.properties.channel_serial = nil # RTP5a1 err = error_from_state_change(current_transition) channel.manager.fail_queued_messages(err) if channel.failed? or channel.suspended? #RTL11 channel.manager.log_channel_error err if err diff --git a/lib/ably/realtime/channels.rb b/lib/ably/realtime/channels.rb index 9b814db12..e7cf788dd 100644 --- a/lib/ably/realtime/channels.rb +++ b/lib/ably/realtime/channels.rb @@ -46,6 +46,26 @@ def release(channel) @channels.delete(channel) end if @channels.has_key?(channel) end + + # Sets channel serial to each channel from given serials hashmap + # @param [Hash] serials - map of channel name to respective channel serial + # @api private + def set_channel_serials(serials) + serials.each do |channel_name, channel_serial| + get(channel_name).properties.channel_serial = channel_serial + end + end + + # @return [Hash] serials - map of channel name to respective channel serial + # @api private + def get_channel_serials + channel_serials = {} + self.each do |channel| + channel_serials[channel.name] = channel.properties.channel_serial if channel.state == :attached + end + channel_serials + end + end end end diff --git a/lib/ably/realtime/client.rb b/lib/ably/realtime/client.rb index bf04776c2..b35aaa94b 100644 --- a/lib/ably/realtime/client.rb +++ b/lib/ably/realtime/client.rb @@ -1,5 +1,6 @@ require 'uri' require 'ably/realtime/channel/publisher' +require 'ably/realtime/recovery_key_context' module Ably module Realtime @@ -11,6 +12,7 @@ class Client include Ably::Modules::Conversions extend Forwardable + using Ably::Util::AblyExtensions DOMAIN = 'realtime.ably.io' @@ -120,17 +122,23 @@ def initialize(options) acc[key.to_s] = value.to_s end @rest_client = Ably::Rest::Client.new(options.merge(realtime_client: self)) - @echo_messages = rest_client.options.fetch(:echo_messages, true) == false ? false : true - @queue_messages = rest_client.options.fetch(:queue_messages, true) == false ? false : true + @echo_messages = rest_client.options.fetch_with_default(:echo_messages, true) + @queue_messages = rest_client.options.fetch_with_default(:queue_messages, true) @custom_realtime_host = rest_client.options[:realtime_host] || rest_client.options[:ws_host] - @auto_connect = rest_client.options.fetch(:auto_connect, true) == false ? false : true - @recover = rest_client.options[:recover] - - raise ArgumentError, "Recovery key '#{recover}' is invalid" if recover && !recover.match(Connection::RECOVER_REGEX) + @auto_connect = rest_client.options.fetch_with_default(:auto_connect, true) + @recover = rest_client.options.fetch_with_default(:recover, '') @auth = Ably::Realtime::Auth.new(self) @channels = Ably::Realtime::Channels.new(self) @connection = Ably::Realtime::Connection.new(self, options) + + unless @recover.nil_or_empty? + recovery_context = RecoveryKeyContext.from_json(@recover, logger) + unless recovery_context.nil? + @channels.set_channel_serials recovery_context.channel_serials # RTN16j + @connection.set_msg_serial_from_recover = recovery_context.msg_serial # RTN16f + end + end end # Return a {Ably::Realtime::Channel Realtime Channel} for the given name diff --git a/lib/ably/realtime/client/incoming_message_dispatcher.rb b/lib/ably/realtime/client/incoming_message_dispatcher.rb index 292a713b7..435d70660 100644 --- a/lib/ably/realtime/client/incoming_message_dispatcher.rb +++ b/lib/ably/realtime/client/incoming_message_dispatcher.rb @@ -43,19 +43,24 @@ def dispatch_protocol_message(*args) raise ArgumentError, "Expected a ProtocolMessage. Received #{protocol_message}" end - unless protocol_message.action.match_any?(:nack, :error) - logger.debug { "#{protocol_message.action} received: #{protocol_message}" } + # RTL15b + if protocol_message.has_channel_serial? && + ( + protocol_message.action == :message || + protocol_message.action == :presence || + protocol_message.action == :attached + ) + get_channel(protocol_message.channel).tap do |channel| + logger.info "Setting channel serial for channel #{channel.name}, " << + "Previous: #{channel.properties.channel_serial}, New: #{protocol_message.channel_serial}" + channel.properties.channel_serial = protocol_message.channel_serial + end end - if protocol_message.action.match_any?(:sync, :presence, :message) - if connection.serial && protocol_message.has_connection_serial? && protocol_message.connection_serial <= connection.serial - error_message = "Protocol error, duplicate message received for serial #{protocol_message.connection_serial}" - logger.error error_message - return - end + unless protocol_message.action.match_any?(:nack, :error) + logger.debug { "#{protocol_message.action} received: #{protocol_message}" } end - update_connection_recovery_info protocol_message connection.set_connection_confirmed_alive case protocol_message.action @@ -172,10 +177,6 @@ def process_connected_update_message(protocol_message) end end - def update_connection_recovery_info(protocol_message) - connection.update_connection_serial protocol_message.connection_serial if protocol_message.has_connection_serial? - end - def ack_pending_queue_for_message_serial(ack_protocol_message) drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message| ack_messages protocol_message.messages diff --git a/lib/ably/realtime/connection.rb b/lib/ably/realtime/connection.rb index 195abe383..6295ef09e 100644 --- a/lib/ably/realtime/connection.rb +++ b/lib/ably/realtime/connection.rb @@ -9,6 +9,8 @@ class Connection include Ably::Modules::Conversions include Ably::Modules::SafeYield extend Ably::Modules::Enum + using Ably::Util::AblyExtensions + # The current {Ably::Realtime::Connection::STATE} of the connection. # Describes the realtime [Connection]{@link Connection} object states. @@ -77,9 +79,6 @@ class Connection include Ably::Modules::UsesStateMachine ensure_state_machine_emits 'Ably::Models::ConnectionStateChange' - # Expected format for a connection recover key - RECOVER_REGEX = /^(?[^:]+):(?[^:]+):(?\-?\d+)$/ - # Defaults for automatic connection recovery and timeouts DEFAULTS = { channel_retry_timeout: 15, # when a channel becomes SUSPENDED, after this delay in seconds, the channel will automatically attempt to reattach if the connection is CONNECTED @@ -113,16 +112,6 @@ class Connection # attr_reader :key - # The serial number of the last message to be received on this connection, used automatically by the library when - # recovering or resuming a connection. When recovering a connection explicitly, the recoveryKey is used in - # the recover client options as it contains both the key and the last message serial. - # - # @spec RTN10 - # - # @return [Integer] - # - attr_reader :serial - # An {Ably::Models::ErrorInfo} object describing the last error received if a connection failure occurs. # # @spec RTN14a @@ -177,17 +166,6 @@ def initialize(client, options) end if options.kind_of?(Hash) @defaults.freeze - # If a recover client options is provided, then we need to ensure that the msgSerial matches the - # recover serial immediately at client library instantiation. This is done immediately so that any queued - # publishes use the correct serial number for these queued messages as well. - # There is no harm if the msgSerial is higher than expected if the recover fails. - recovery_msg_serial = connection_recover_parts && connection_recover_parts[:msg_serial].to_i - if recovery_msg_serial - @client_msg_serial = recovery_msg_serial - else - reset_client_msg_serial - end - Client::IncomingMessageDispatcher.new client, self Client::OutgoingMessageDispatcher.new client, self @@ -196,6 +174,8 @@ def initialize(client, options) @manager = ConnectionManager.new(self) @current_host = client.endpoint.host + + reset_client_msg_serial end # Causes the connection to close, entering the {Ably::Realtime::Connection::STATE} CLOSING state. @@ -347,33 +327,40 @@ def internet_up? end end - # The recovery key string can be used by another client to recover this connection's state in the recover client options property. See connection state recover options for more information. + # The recovery key string can be used by another client to recover this connection's state in the + # recover client options property. See connection state recover options for more information. # # @spec RTN16b, RTN16c # - # @return [String] + # @deprecated Use {#create_recovery_key} instead # def recovery_key - "#{key}:#{serial}:#{client_msg_serial}" if connection_resumable? + logger.warn "[DEPRECATION] recovery_key is deprecated, use create_recovery_key method instead" + create_recovery_key + end + + # The recovery key string can be used by another client to recover this connection's state in the recover client + # options property. See connection state recover options for more information. + # + # @spec RTN16g, RTN16c + # + # @return [String] a json string which incorporates the @connectionKey@, the current @msgSerial@ and collection + # of pairs of channel @name@ and current @channelSerial@ for every currently attached channel + def create_recovery_key + if key.nil_or_empty? || state == :closing || state == :closed || state == :failed || state == :suspended + return nil #RTN16g2 + end + RecoveryKeyContext.new(key, client_msg_serial, client.channels.get_channel_serials).to_json end # Following a new connection being made, the connection ID, connection key - # and connection serial need to match the details provided by the server. + # need to match the details provided by the server. # # @return [void] # @api private - def configure_new(connection_id, connection_key, connection_serial) + def configure_new(connection_id, connection_key) @id = connection_id @key = connection_key - - update_connection_serial connection_serial - end - - # Store last received connection serial so that the connection can be resumed from the last known point-in-time - # @return [void] - # @api private - def update_connection_serial(connection_serial) - @serial = connection_serial end # Disable automatic resume of a connection @@ -381,7 +368,7 @@ def update_connection_serial(connection_serial) # @api private def reset_resume_info @key = nil - @serial = nil + @id = nil end # @!attribute [r] __outgoing_protocol_msgbus__ @@ -444,17 +431,28 @@ def logger # @api private def send_protocol_message(protocol_message) add_message_serial_if_ack_required_to(protocol_message) do - Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message| - add_message_to_outgoing_queue message - notify_message_dispatcher_of_new_message message - logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" } - end + message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger) + add_message_to_outgoing_queue(message) + notify_message_dispatcher_of_new_message message end end + def send_protocol_message_immediately(protocol_message) + message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger) + add_message_to_outgoing_queue(message, true) + notify_message_dispatcher_of_new_message message + end + # @api private - def add_message_to_outgoing_queue(protocol_message) - __outgoing_message_queue__ << protocol_message + def add_message_to_outgoing_queue(protocol_message, send_immediately = false) + if send_immediately + # Adding msg at the top of the queue to get processed immediately while connection is CONNECTED + __outgoing_message_queue__.prepend(protocol_message) + logger.debug { "Connection: protocol msg pushed at the top =>: #{protocol_message.action} #{protocol_message}" } + else + __outgoing_message_queue__ << protocol_message + logger.debug { "Connection: protocol msg queued =>: #{protocol_message.action} #{protocol_message}" } + end end # @api private @@ -472,7 +470,7 @@ def create_websocket_transport url_params = auth_params.merge( 'format' => client.protocol, 'echo' => client.echo_messages, - 'v' => Ably::PROTOCOL_VERSION, + 'v' => Ably::PROTOCOL_VERSION, # RSC7a 'agent' => client.rest_client.agent ) @@ -482,18 +480,19 @@ def create_websocket_transport else 'false' end - - url_params['clientId'] = client.auth.client_id if client.auth.has_client_id? + # RSA7e1 + url_params['clientId'] = client.auth.client_id_for_request_sync if client.auth.client_id_for_request_sync url_params.merge!(client.transport_params) - if connection_resumable? - url_params.merge! resume: key, connection_serial: serial - logger.debug { "Resuming connection key #{key} with serial #{serial}" } - elsif connection_recoverable? - url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial] - logger.debug { "Recovering connection with key #{client.recover}" } - unsafe_once(:connected, :closed, :failed) do - client.disable_automatic_connection_recovery + if !key.nil_or_empty? and connection_state_available? + url_params.merge! resume: key + logger.debug { "Resuming connection with key #{key}" } + elsif !client.recover.nil_or_empty? + recovery_context = RecoveryKeyContext.from_json(client.recover, logger) + unless recovery_context.nil? + key = recovery_context.connection_key + logger.debug { "Recovering connection with key #{key}" } + url_params.merge! recover: key end end @@ -544,24 +543,6 @@ def set_connection_details(connection_details) @details = connection_details end - # Executes registered callbacks for a successful connection resume event - # @api private - def trigger_resumed - resume_callbacks.each(&:call) - end - - # Provides a simple hook to inject a callback when a connection is successfully resumed - # @api private - def on_resume(&callback) - resume_callbacks << callback - end - - # Remove a registered connection resume callback - # @api private - def off_resume(&callback) - resume_callbacks.delete(callback) - end - # Returns false if messages cannot be published as a result of message queueing being disabled # @api private def can_publish_messages? @@ -600,6 +581,12 @@ def reset_client_msg_serial @client_msg_serial = -1 end + # Sets the client message serial from recover clientOption. + # @api private + def set_msg_serial_from_recover=(value) + @client_msg_serial = value + end + # When a hearbeat or any other message from Ably is received # we know it's alive, see #RTN23 # @api private @@ -620,20 +607,12 @@ def time_since_connection_confirmed_alive? private # The client message serial (msgSerial) is incremented for every message that is published that requires an ACK. - # Note that this is different to the connection serial that contains the last known serial number - # received from the server. - # # A message serial number does not guarantee a message has been received, only sent. - # A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes. # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent def client_msg_serial @client_msg_serial end - def resume_callbacks - @resume_callbacks ||= [] - end - def create_pub_sub_message_bus Ably::Util::PubSub.new( coerce_into: lambda do |event| @@ -665,10 +644,6 @@ def when_initialized EventMachine.next_tick { yield } end - def connection_resumable? - !key.nil? && !serial.nil? && connection_state_available? - end - def connection_state_available? return true if connected? @@ -682,14 +657,6 @@ def connection_state_available? end end - def connection_recoverable? - connection_recover_parts - end - - def connection_recover_parts - client.recover.to_s.match(RECOVER_REGEX) - end - def production? client.environment.nil? || client.environment == :production end @@ -740,3 +707,4 @@ def second_reconnect_attempt_for(state, first_attempt_count) require 'ably/realtime/connection/connection_manager' require 'ably/realtime/connection/connection_state_machine' require 'ably/realtime/connection/websocket_transport' +require 'ably/realtime/recovery_key_context' diff --git a/lib/ably/realtime/connection/connection_manager.rb b/lib/ably/realtime/connection/connection_manager.rb index 8d514b9f9..5c2d2ab8b 100644 --- a/lib/ably/realtime/connection/connection_manager.rb +++ b/lib/ably/realtime/connection/connection_manager.rb @@ -14,12 +14,14 @@ class ConnectionManager RESOLVABLE_ERROR_CODES = { token_expired: Ably::Exceptions::TOKEN_EXPIRED_CODE } + using Ably::Util::AblyExtensions def initialize(connection) @connection = connection @timers = Hash.new { |hash, key| hash[key] = [] } - connection.unsafe_on(:closed) do + # RTN8c, RTN9c + connection.unsafe_on(:closing, :closed, :suspended, :failed) do connection.reset_resume_info end @@ -111,23 +113,28 @@ def connected(protocol_message) # Update the connection details and any associated defaults connection.set_connection_details protocol_message.connection_details + is_connection_resume_or_recover_attempt = !connection.key.nil_or_empty? || !client.recover.nil_or_empty? + + # RTN15c7, RTN16d + failed_resume_or_recover = !protocol_message.connection_id == connection.id && !protocol_message.error.nil? + if is_connection_resume_or_recover_attempt and failed_resume_or_recover # RTN15c7 + connection.reset_client_msg_serial + end + client.disable_automatic_connection_recovery # RTN16k, explicitly setting null, so it won't be used for subsequent connection requests + if connection.key if protocol_message.connection_id == connection.id logger.debug { "ConnectionManager: Connection resumed successfully - ID #{connection.id} and key #{connection.key}" } - EventMachine.next_tick { connection.trigger_resumed } resend_pending_message_ack_queue else - logger.debug { "ConnectionManager: Connection was not resumed, old connection ID #{connection.id} has been updated with new connection ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" } nack_messages_on_all_channels protocol_message.error - force_reattach_on_channels protocol_message.error end else logger.debug { "ConnectionManager: New connection created with ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" } end - reattach_suspended_channels protocol_message.error - - connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial + connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key + force_reattach_on_channels protocol_message.error # irrespective of connection success/failure, reattach channels end # When connection is CONNECTED and receives an update @@ -139,7 +146,7 @@ def connected_update(protocol_message) # Update the connection details and any associated defaults connection.set_connection_details protocol_message.connection_details - connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key, protocol_message.connection_serial + connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key state_change = Ably::Models::ConnectionStateChange.new( current: connection.state, @@ -281,7 +288,7 @@ def retry_count_for_state(state) # Any message sent before an ACK/NACK was received on the previous transport # need to be resent to the Ably service so that a subsequent ACK/NACK is received. # It is up to Ably to ensure that duplicate messages are not retransmitted on the channel - # base on the serial numbers + # based on the message serial numbers # # @api private def resend_pending_message_ack_queue @@ -568,22 +575,14 @@ def currently_renewing_token? client.auth.authorization_in_flight? end - def reattach_suspended_channels(error) - channels.select do |channel| - channel.suspended? - end.each do |channel| - channel.transition_state_machine :attaching - end - end - - # When continuity on a connection is lost all messages - # Channels in the ATTACHED or ATTACHING state should explicitly be re-attached - # by sending a new ATTACH to Ably + # When reconnected if channel is in ATTACHING, ATTACHED or SUSPENDED state + # it should explicitly be re-attached by sending a new ATTACH to Ably + # Spec : RTN15c6, RTN15c7 def force_reattach_on_channels(error) channels.select do |channel| - channel.attached? || channel.attaching? + channel.attached? || channel.attaching? || channel.suspended? end.each do |channel| - channel.manager.request_reattach reason: error + channel.manager.request_reattach error end end diff --git a/lib/ably/realtime/presence.rb b/lib/ably/realtime/presence.rb index 0c4fcd3cd..5d66b7e99 100644 --- a/lib/ably/realtime/presence.rb +++ b/lib/ably/realtime/presence.rb @@ -110,6 +110,14 @@ def enter_client(client_id, data = nil, &success_block) send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Enter, client_id, data, &success_block) end + # @api private + def enter_client_with_id(id, client_id, data = nil, &success_block) + ensure_supported_client_id client_id + ensure_supported_payload data + + send_presence_action_for_client(Ably::Models::PresenceMessage::ACTION.Enter, client_id, data, id, &success_block) + end + # Leave this client from this channel. This client will be removed from the presence # set and presence subscribers will see a leave message for this client. # @@ -338,8 +346,8 @@ def sync_complete? private # @return [Ably::Models::PresenceMessage] presence message is returned allowing callbacks to be added - def send_presence_protocol_message(presence_action, client_id, data) - presence_message = create_presence_message(presence_action, client_id, data) + def send_presence_protocol_message(presence_action, client_id, data, id = nil) + presence_message = create_presence_message(presence_action, client_id, data, id) unless presence_message.client_id raise Ably::Exceptions::Standard.new('Unable to enter create presence message without a client_id', 400, Ably::Exceptions::Codes::UNABLE_TO_ENTER_PRESENCE_CHANNEL_NO_CLIENTID) end @@ -355,12 +363,13 @@ def send_presence_protocol_message(presence_action, client_id, data) presence_message end - def create_presence_message(action, client_id, data) + def create_presence_message(action, client_id, data, id = nil) model = { action: Ably::Models::PresenceMessage.ACTION(action).to_i, clientId: client_id, - data: data + data: data, } + model[:id] = id unless id.nil? Ably::Models::PresenceMessage.new(model, logger: logger).tap do |presence_message| presence_message.encode(client.encoders, channel.options) do |encode_error, error_message| @@ -433,13 +442,13 @@ def deferrable_fail(deferrable, *args, &block) deferrable end - def send_presence_action_for_client(action, client_id, data, &success_block) + def send_presence_action_for_client(action, client_id, data, id = nil, &success_block) requirements_failed_deferrable = ensure_presence_publishable_on_connection_deferrable return requirements_failed_deferrable if requirements_failed_deferrable deferrable = create_deferrable ensure_channel_attached(deferrable) do - send_presence_protocol_message(action, client_id, data).tap do |protocol_message| + send_presence_protocol_message(action, client_id, data, id).tap do |protocol_message| protocol_message.callback { |message| deferrable_succeed deferrable, &success_block } protocol_message.errback { |error| deferrable_fail deferrable, error } end diff --git a/lib/ably/realtime/presence/members_map.rb b/lib/ably/realtime/presence/members_map.rb index ef2d8df32..8183db526 100644 --- a/lib/ably/realtime/presence/members_map.rb +++ b/lib/ably/realtime/presence/members_map.rb @@ -23,7 +23,7 @@ class MembersMap :sync_starting, # Indicates the client is waiting for SYNC ProtocolMessages from Ably :sync_none, # Indicates the ATTACHED ProtocolMessage had no presence flag and thus no members on the channel :finalizing_sync, - :in_sync, + :sync_complete, # Indicates completion of server initiated sync :failed ) include Ably::Modules::StateEmitter @@ -49,16 +49,6 @@ def initialize(presence) setup_event_handlers end - # When attaching to a channel that has members present, the server - # initiates a sync automatically so that the client has a complete list of members. - # - # Until this sync is complete, this method returns false - # - # @return [Boolean] - def sync_complete? - in_sync? - end - # Update the SYNC serial from the ProtocolMessage so that SYNC can be resumed. # If the serial is nil, or the part after the first : is empty, then the SYNC is complete # @@ -110,27 +100,27 @@ def get(options = {}, &block) # Must be defined before subsequent procs reference this callback reset_callbacks = nil - in_sync_callback = lambda do + sync_complete_callback = lambda do reset_callbacks.call if reset_callbacks result_block.call end - failed_callback = lambda do |error| + sync_failed_callback = lambda do |error| reset_callbacks.call if reset_callbacks deferrable.fail error end reset_callbacks = lambda do - off(&in_sync_callback) - off(&failed_callback) - channel.off(&failed_callback) + off(&sync_complete_callback) + off(&sync_failed_callback) + channel.off(&sync_failed_callback) end - unsafe_once(:in_sync, &in_sync_callback) - unsafe_once(:failed, &failed_callback) + unsafe_once(:sync_complete, &sync_complete_callback) + unsafe_once(:failed, &sync_failed_callback) channel.unsafe_once(:detaching, :detached, :failed) do |error_reason| - failed_callback.call error_reason + sync_failed_callback.call error_reason end end @@ -156,12 +146,33 @@ def each(&block) # and thus the responsibility of this library to re-enter on the channel automatically if the # channel loses continuity # - # @return [Array] + # @return [Hash] # @api private def local_members @local_members end + def enter_local_members + local_members.values.each do |member| + logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{member.client_id} with data: #{member.data}" } + presence.enter_client_with_id(member.id, member.client_id, member.data).tap do |deferrable| + deferrable.errback do |error| + re_enter_error = Ably::Models::ErrorInfo.new( + message: "unable to automatically re-enter presence channel for client_id '#{member.client_id}'. Source error code #{error.code} and message '#{error.message}'", + code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL + ) + channel.emit :update, Ably::Models::ChannelStateChange.new( + current: channel.state, + previous: channel.state, + event: Ably::Realtime::Channel::EVENT(:update), + reason: re_enter_error, + resumed: true + ) + end + end + end + end + private attr_reader :sync_session_id @@ -213,23 +224,14 @@ def setup_event_handlers update_members_and_emit_events presence_message end + # RTP5a channel.unsafe_on(:failed, :detached) do reset_members reset_local_members end - resume_sync_proc = method(:resume_sync).to_proc - unsafe_on(:sync_starting) do @sync_session_id += 1 - - channel.unsafe_once(:attached) do - connection.on_resume(&resume_sync_proc) - end - - unsafe_once(:in_sync, :failed) do - connection.off_resume(&resume_sync_proc) - end end unsafe_on(:sync_none) do @@ -240,61 +242,9 @@ def setup_event_handlers unsafe_on(:finalizing_sync) do clean_up_absent_members - clean_up_members_not_present_in_sync - change_state :in_sync + clean_up_members_not_present_after_sync + change_state :sync_complete end - - unsafe_on(:in_sync) do - update_local_member_state - end - end - - # Listen for events that change the PresenceMap state and thus - # need to be replicated to the local member set - def update_local_member_state - new_local_members = members.select do |member_key, member| - member.fetch(:message).connection_id == connection.id - end.each_with_object({}) do |(member_key, member), hash_object| - hash_object[member_key] = member.fetch(:message) - end - - @local_members.reject do |member_key, message| - new_local_members.keys.include?(member_key) - end.each do |member_key, message| - re_enter_local_member_missing_from_presence_map message - end - - @local_members = new_local_members - end - - def re_enter_local_member_missing_from_presence_map(presence_message) - local_client_id = presence_message.client_id || client.auth.client_id - logger.debug { "#{self.class.name}: Manually re-entering local presence member, client ID: #{local_client_id} with data: #{presence_message.data}" } - presence.enter_client(local_client_id, presence_message.data).tap do |deferrable| - deferrable.errback do |error| - presence_message_client_id = presence_message.client_id || client.auth.client_id - re_enter_error = Ably::Models::ErrorInfo.new( - message: "unable to automatically re-enter presence channel for client_id '#{presence_message_client_id}'. Source error code #{error.code} and message '#{error.message}'", - code: Ably::Exceptions::Codes::UNABLE_TO_AUTOMATICALLY_REENTER_PRESENCE_CHANNEL - ) - channel.emit :update, Ably::Models::ChannelStateChange.new( - current: channel.state, - previous: channel.state, - event: Ably::Realtime::Channel::EVENT(:update), - reason: re_enter_error, - resumed: true - ) - end - end - end - - # Trigger a manual SYNC operation to resume member synchronisation from last known cursor position - def resume_sync - connection.send_protocol_message( - action: Ably::Models::ProtocolMessage::ACTION.Sync.to_i, - channel: channel.name, - channel_serial: sync_serial - ) if channel.attached? end def update_members_and_emit_events(presence_message) @@ -375,7 +325,7 @@ def add_presence_member(presence_message) def remove_presence_member(presence_message) logger.debug { "#{self.class.name}: Member '#{presence_message.member_key}' removed.\n#{presence_message.to_json}" } - if in_sync? + if sync_complete? member_set_delete presence_message else member_set_upsert presence_message, false @@ -394,17 +344,16 @@ def touch_presence_member(presence_message) def member_set_upsert(presence_message, present) members[presence_message.member_key] = { present: present, message: presence_message, sync_session_id: sync_session_id } if presence_message.connection_id == connection.id - local_members[presence_message.member_key] = presence_message - logger.debug { "#{self.class.name}: Local member '#{presence_message.member_key}' added" } + local_members[presence_message.client_id] = presence_message + logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' added" } end end def member_set_delete(presence_message) members.delete presence_message.member_key - if in_sync? - # If not in SYNC, then local members missing may need to be re-entered - # Let #update_local_member_state handle missing members - local_members.delete presence_message.member_key + if sync_complete? and presence_message.connection_id == connection.id + local_members.delete presence_message.client_id + logger.debug { "#{self.class.name}: Local member '#{presence_message.client_id}' deleted" } end end @@ -431,7 +380,7 @@ def clean_up_absent_members end end - def clean_up_members_not_present_in_sync + def clean_up_members_not_present_after_sync members.select do |member_key, member| member.fetch(:sync_session_id) != sync_session_id end.each do |member_key, member| diff --git a/lib/ably/realtime/presence/presence_manager.rb b/lib/ably/realtime/presence/presence_manager.rb index 54c27f9bf..3a32a5e0e 100644 --- a/lib/ably/realtime/presence/presence_manager.rb +++ b/lib/ably/realtime/presence/presence_manager.rb @@ -19,13 +19,19 @@ def initialize(presence) setup_channel_event_handlers end - # Expect SYNC ProtocolMessages from the server with a list of current members on this channel - # - # @return [void] - # # @api private - def sync_expected - presence.members.change_state :sync_starting + def on_attach(has_presence_flag) + # RTP1 + if has_presence_flag + # Expect SYNC ProtocolMessages from the server with a list of current members on this channel + presence.members.change_state :sync_starting + else + # There server has indicated that there are no SYNC ProtocolMessages to come because + # there are no members on this channel + logger.debug { "#{self.class.name}: Emitting leave events for all members as a SYNC is not expected and thus there are no members on the channel" } + presence.members.change_state :sync_none + end + presence.members.enter_local_members # RTP17f end # Process presence messages from SYNC messages. Sync can be server-initiated or triggered following ATTACH @@ -47,17 +53,6 @@ def sync_process_messages(serial, presence_messages) presence.members.change_state :finalizing_sync if presence.members.sync_serial_cursor_at_end? end - # There server has indicated that there are no SYNC ProtocolMessages to come because - # there are no members on this channel - # - # @return [void] - # - # @api private - def sync_not_expected - logger.debug { "#{self.class.name}: Emitting leave events for all members as a SYNC is not expected and thus there are no members on the channel" } - presence.members.change_state :sync_none - end - private def_delegators :presence, :members, :channel diff --git a/lib/ably/realtime/recovery_key_context.rb b/lib/ably/realtime/recovery_key_context.rb new file mode 100644 index 000000000..30ec3bc2e --- /dev/null +++ b/lib/ably/realtime/recovery_key_context.rb @@ -0,0 +1,36 @@ +require 'json' +# frozen_string_literal: true + +module Ably + module Realtime + class RecoveryKeyContext + attr_reader :connection_key + attr_reader :msg_serial + attr_reader :channel_serials + + def initialize(connection_key, msg_serial, channel_serials) + @connection_key = connection_key + @msg_serial = msg_serial + @channel_serials = channel_serials + if @channel_serials.nil? + @channel_serials = {} + end + end + + def to_json + { 'connection_key' => @connection_key, 'msg_serial' => @msg_serial, 'channel_serials' => @channel_serials }.to_json + end + + def self.from_json(obj, logger = nil) + begin + data = JSON.load obj + self.new data['connection_key'], data['msg_serial'], data['channel_serials'] + rescue => e + logger.warn "unable to decode recovery key, found error #{e}" unless logger.nil? + return nil + end + end + + end + end +end diff --git a/lib/ably/rest/client.rb b/lib/ably/rest/client.rb index 235bd81f0..3dbda2dfb 100644 --- a/lib/ably/rest/client.rb +++ b/lib/ably/rest/client.rb @@ -16,6 +16,7 @@ class Client include Ably::Modules::Conversions include Ably::Modules::HttpHelpers extend Forwardable + using Ably::Util::AblyExtensions # Default Ably domain for REST DOMAIN = 'rest.ably.io' @@ -186,7 +187,7 @@ def initialize(options) @agent = options.delete(:agent) || Ably::AGENT @realtime_client = options.delete(:realtime_client) - @tls = options.delete(:tls) == false ? false : true + @tls = options.delete_with_default(:tls, true) @environment = options.delete(:environment) # nil is production @environment = nil if [:production, 'production'].include?(@environment) @protocol = options.delete(:protocol) || :msgpack @@ -200,10 +201,7 @@ def initialize(options) @log_retries_as_info = options.delete(:log_retries_as_info) @max_message_size = options.delete(:max_message_size) || MAX_MESSAGE_SIZE @max_frame_size = options.delete(:max_frame_size) || MAX_FRAME_SIZE - - if (@idempotent_rest_publishing = options.delete(:idempotent_rest_publishing)).nil? - @idempotent_rest_publishing = Ably::PROTOCOL_VERSION.to_f > 1.1 - end + @idempotent_rest_publishing = options.delete_with_default(:idempotent_rest_publishing, true) if options[:fallback_hosts_use_default] && options[:fallback_hosts] raise ArgumentError, "fallback_hosts_use_default cannot be set to try when fallback_hosts is also provided" @@ -599,7 +597,7 @@ def send_request(method, path, params, options) end unless options[:send_auth_header] == false request.headers[:authorization] = auth.auth_header - + # RSA7e2 options[:headers].to_h.merge(auth.extra_auth_headers).map do |key, val| request.headers[key] = val end diff --git a/lib/ably/util/ably_extensions.rb b/lib/ably/util/ably_extensions.rb new file mode 100644 index 000000000..c4304eb8a --- /dev/null +++ b/lib/ably/util/ably_extensions.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Ably::Util + module AblyExtensions + refine Object do + def nil_or_empty? + self.nil? || self.empty? + end + end + + refine Hash do + def fetch_with_default(key, default) + value = self.fetch(key, default) + if value.nil? + return default + end + return value + end + + def delete_with_default(key, default) + value = self.delete(key) + if value.nil? + return default + end + return value + end + end + end +end diff --git a/lib/ably/util/safe_deferrable.rb b/lib/ably/util/safe_deferrable.rb index 05b13c1cd..36cdd1cf8 100644 --- a/lib/ably/util/safe_deferrable.rb +++ b/lib/ably/util/safe_deferrable.rb @@ -1,6 +1,6 @@ module Ably::Util # SafeDeferrable class provides a Deferrable that is safe to use for for public interfaces - # of this client library. Any exceptions raised in the success or failure callbacks are + # of this client library. Any exceptions raised in the success or failure callbacks are # caught and logged to the provided logger. # # An exception in a callback provided by a developer should not break this client library diff --git a/lib/ably/version.rb b/lib/ably/version.rb index c0265f5c9..c0144ed69 100644 --- a/lib/ably/version.rb +++ b/lib/ably/version.rb @@ -1,9 +1,7 @@ module Ably VERSION = '1.2.5' - PROTOCOL_VERSION = '1.2' - - # @api private - def self.major_minor_version_numeric - VERSION.gsub(/\.\d+$/, '').to_f - end + # The level of compatibility with the Ably service that this SDK supports. + # Also referred to as the 'wire protocol version'. + # spec : CSV2 + PROTOCOL_VERSION = '2' end diff --git a/spec/acceptance/realtime/channel_history_spec.rb b/spec/acceptance/realtime/channel_history_spec.rb index a9caf7fc6..56f1b4a00 100644 --- a/spec/acceptance/realtime/channel_history_spec.rb +++ b/spec/acceptance/realtime/channel_history_spec.rb @@ -184,26 +184,22 @@ def ensure_message_history_direction_and_paging_is_correct(direction) end context 'when channel receives update event after an attachment' do + old_attach_serial = "" + new_attach_serial = "xxxx-xxxx-1" before do channel.on(:attached) do - channel.publish(event, message_after_attach) do - subsequent_serial = channel.properties.attach_serial.dup.tap { |serial| serial[-1] = '1' } - attached_message = Ably::Models::ProtocolMessage.new(action: 11, channel: channel_name, flags: 0, channel_serial: subsequent_serial) - client.connection.__incoming_protocol_msgbus__.publish :protocol_message, attached_message - end + old_attach_serial = channel.properties.attach_serial + attached_message = Ably::Models::ProtocolMessage.new(action: 11, channel: channel_name, flags: 0, channel_serial: new_attach_serial) + client.connection.__incoming_protocol_msgbus__.publish :protocol_message, attached_message end end - xit 'updates attach_serial' do - rest_channel.publish event, message_before_attach - + it 'updates attach_serial' do channel.on(:update) do - channel.history(until_attach: true) do |messages| - expect(messages.items.count).to eql(2) - stop_reactor - end + expect(channel.properties.attach_serial).not_to eq(old_attach_serial) + expect(channel.properties.attach_serial).to eq(new_attach_serial) + stop_reactor end - channel.attach end end diff --git a/spec/acceptance/realtime/channel_spec.rb b/spec/acceptance/realtime/channel_spec.rb index d2c4250e1..4286c6a91 100644 --- a/spec/acceptance/realtime/channel_spec.rb +++ b/spec/acceptance/realtime/channel_spec.rb @@ -342,8 +342,10 @@ def disconnect_transport channel.attach end - channel.attach do - channel.detach + client.connection.once :connected do + channel.attach do + channel.detach + end end end end @@ -430,9 +432,35 @@ def disconnect_transport end context 'with connection state' do + + sent_attach_messages = [] + received_attached_messages = [] + before(:each) do + sent_attach_messages = [] + received_attached_messages = [] + client.connection.__outgoing_protocol_msgbus__.subscribe do |message| + if message.action == :attach + sent_attach_messages << message + end + end + client.connection.__incoming_protocol_msgbus__.subscribe do |message| + if message.action == :attached + received_attached_messages << message + end + end + end + + # Should send/receive attach/attached message only once + # No duplicates should be sent or received + let(:check_for_attach_messages) do + expect(sent_attach_messages.size).to eq(1) + expect(received_attached_messages.size).to eq(1) + end + it 'is initialized (#RTL4i)' do expect(connection).to be_initialized channel.attach do + check_for_attach_messages stop_reactor end end @@ -440,6 +468,7 @@ def disconnect_transport it 'is connecting (#RTL4i)' do connection.once(:connecting) do channel.attach do + check_for_attach_messages stop_reactor end end @@ -449,6 +478,7 @@ def disconnect_transport connection.once(:connected) do connection.once(:disconnected) do channel.attach do + check_for_attach_messages stop_reactor end end @@ -467,7 +497,9 @@ def disconnect_transport stop_reactor end - channel.attach + client.connection.once :connected do + channel.attach + end end end @@ -488,7 +520,9 @@ def disconnect_transport channel.detach end - channel.attach + client.connection.once :connected do + channel.attach + end end end end @@ -497,23 +531,29 @@ def disconnect_transport describe '#detach' do context 'when state is :attached' do it 'it detaches from a channel (#RTL5d)' do - channel.attach do + channel.once :attached do channel.detach channel.on(:detached) do expect(channel.state).to eq(:detached) stop_reactor end end + connection.once :connected do + channel.attach + end end it 'detaches from a channel and calls the provided block (#RTL5d, #RTL5e)' do - channel.attach do + channel.once :attached do expect(channel.state).to eq(:attached) channel.detach do expect(channel.state).to eq(:detached) stop_reactor end end + connection.once :connected do + channel.attach + end end it 'emits :detaching then :detached events' do @@ -523,26 +563,34 @@ def disconnect_transport end end - channel.attach do - channel.detach + connection.once :connected do + channel.attach do + channel.detach + end end end it 'returns a SafeDeferrable that catches exceptions in callbacks and logs them' do - channel.attach do + channel.once :attached do expect(channel.detach).to be_a(Ably::Util::SafeDeferrable) stop_reactor end + connection.once :connected do + channel.attach + end end it 'calls the Deferrable callback on success' do - channel.attach do + channel.once :attached do channel.detach.callback do expect(channel).to be_a(Ably::Realtime::Channel) expect(channel.state).to eq(:detached) stop_reactor end end + connection.once :connected do + channel.attach + end end context 'and DETACHED message is not received within realtime request timeout' do @@ -551,7 +599,6 @@ def disconnect_transport it 'fails the deferrable and returns to the previous state (#RTL5f, #RTL5e)' do channel.attach do - # don't process any incoming ProtocolMessages so the channel never becomes detached connection.__incoming_protocol_msgbus__.unsubscribe detached_requested_at = Time.now.to_i channel.detach do @@ -566,6 +613,7 @@ def disconnect_transport end end + context 'when state is :failed' do let(:client_options) { default_options.merge(log_level: :fatal) } @@ -609,15 +657,17 @@ def disconnect_transport channel.detach end - channel.attach do - channel.detach + connection.once :connected do + channel.attach do + channel.detach + end end end end context 'when state is :suspended' do it 'moves the channel state immediately to DETACHED state (#RTL5j)' do - channel.attach do + channel.once :attached do channel.once(:suspended) do channel.on do |channel_state_change| expect(channel_state_change.current).to eq(:detached) @@ -632,6 +682,9 @@ def disconnect_transport end channel.transition_state_machine :suspended end + connection.once :connected do + channel.attach + end end end @@ -655,7 +708,7 @@ def disconnect_transport context 'when state is :detached' do it 'does nothing as the channel is detached (#RTL5a)' do - channel.attach do + channel.once :attached do channel.detach do expect(channel).to be_detached channel.on do @@ -666,6 +719,9 @@ def disconnect_transport end end end + connection.once :connected do + channel.attach + end end end @@ -735,8 +791,10 @@ def disconnect_transport context 'initialized' do it 'does the detach operation once the connection state is connected (#RTL5h)' do expect(connection).to be_initialized + channel.on :attaching do + channel.detach + end channel.attach - channel.detach connection.once(:connected) do channel.once(:attached) do channel.once(:detached) do @@ -750,8 +808,10 @@ def disconnect_transport context 'connecting' do it 'does the detach operation once the connection state is connected (#RTL5h)' do connection.once(:connecting) do + channel.on :attaching do + channel.detach + end channel.attach - channel.detach connection.once(:connected) do channel.once(:attached) do channel.once(:detached) do @@ -770,8 +830,10 @@ def disconnect_transport it 'does the detach operation once the connection state is connected (#RTL5h)' do connection.once(:connected) do connection.once(:disconnected) do + channel.on :attaching do + channel.detach + end channel.attach - channel.detach connection.once(:connected) do channel.once(:attached) do channel.once(:detached) do @@ -877,84 +939,92 @@ def disconnect_transport describe '#(RTL17)' do context 'when channel is initialized' do it 'sends messages only on attach' do - expect(channel).to be_initialized - channel.publish('event', payload) + connection.once :connected do + expect(channel).to be_initialized + channel.publish('event', payload) - channel.subscribe do |message| - stop_reactor if message.data == payload && channel.attached? - end + channel.subscribe do |message| + stop_reactor if message.data == payload && channel.attached? + end - channel.attach + channel.attach + end end end context 'when channel is attaching' do it 'sends messages only on attach' do - channel.publish('event', payload) + connection.once :connected do + channel.publish('event', payload) - sent_message = nil - channel.subscribe do |message| - return if message.data != payload - sent_message = message + sent_message = nil + channel.subscribe do |message| + return if message.data != payload + sent_message = message - stop_reactor if channel.attached? - end + stop_reactor if channel.attached? + end - channel.on(:attaching) do - expect(channel).to be_attaching - expect(sent_message).to be_nil - end + channel.on(:attaching) do + expect(channel).to be_attaching + expect(sent_message).to be_nil + end - channel.attach + channel.attach + end end end context 'when channel is detaching' do it 'stops sending message' do - sent_message = nil - event_published = false - channel.subscribe do |message| - sent_message = message if message.data == payload - end + connection.once :connected do + sent_message = nil + event_published = false + channel.subscribe do |message| + sent_message = message if message.data == payload + end - channel.on(:detaching) do - channel.publish('event', payload) - event_published = true - end + channel.on(:detaching) do + channel.publish('event', payload) + event_published = true + end - channel.on(:detaching) do - EventMachine.next_tick do - expect(sent_message).to be_nil - stop_reactor if event_published + channel.on(:detaching) do + EventMachine.next_tick do + expect(sent_message).to be_nil + stop_reactor if event_published + end end - end - channel.attach do - channel.detach + channel.attach do + channel.detach + end end end end context 'when channel is detached' do it 'stops sending message' do - sent_message = nil - event_published = false - channel.subscribe do |message| - sent_message = message if message.data == payload - end + connection.once :connected do + sent_message = nil + event_published = false + channel.subscribe do |message| + sent_message = message if message.data == payload + end - channel.on(:detaching) do - channel.publish('event', payload) - event_published = true - end + channel.on(:detaching) do + channel.publish('event', payload) + event_published = true + end - channel.on(:detached) do - expect(sent_message).to be_nil - stop_reactor if event_published - end + channel.on(:detached) do + expect(sent_message).to be_nil + stop_reactor if event_published + end - channel.attach do - channel.detach + channel.attach do + channel.detach + end end end end @@ -968,8 +1038,10 @@ def disconnect_transport end end - channel.attach do - channel.transition_state_machine(:failed) + connection.once :connected do + channel.attach do + channel.transition_state_machine(:failed) + end end end end @@ -1019,7 +1091,7 @@ def disconnect_transport context 'when channel is Detaching (#RTL6c1)' do it 'publishes messages immediately (#RTL6c1)' do - sub_channel.attach do + sub_channel.once :attached do channel.attach do channel.once(:detaching) do outgoing_message_count = 0 @@ -1041,32 +1113,37 @@ def disconnect_transport channel.detach end end + connection.once :connected do + sub_channel.attach + end end end context 'when channel is Detached (#RTL6c1)' do it 'publishes messages immediately (#RTL6c1)' do - sub_channel.attach do - channel.attach - channel.once(:attached) do - channel.once(:detached) do - outgoing_message_count = 0 - client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| - if protocol_message.action == :message - raise "Expected channel state to be attaching when publishing messages, not #{channel.state}" unless channel.detached? - outgoing_message_count += protocol_message.messages.count + connection.once :connected do + sub_channel.attach do + channel.attach + channel.once(:attached) do + channel.once(:detached) do + outgoing_message_count = 0 + client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :message + raise "Expected channel state to be attaching when publishing messages, not #{channel.state}" unless channel.detached? + outgoing_message_count += protocol_message.messages.count + end end - end - sub_channel.subscribe do |message| - messages << message if message.name == 'event' - if messages.count == 3 - expect(outgoing_message_count).to eql(3) - stop_reactor + sub_channel.subscribe do |message| + messages << message if message.name == 'event' + if messages.count == 3 + expect(outgoing_message_count).to eql(3) + stop_reactor + end end + 3.times { channel.publish('event', random_str) } end - 3.times { channel.publish('event', random_str) } + channel.detach end - channel.detach end end end @@ -1429,12 +1506,14 @@ def disconnect_transport context 'with a valid client_id in the message' do it 'succeeds' do - channel.publish([name: 'event', client_id: 'validClient']).tap do |deferrable| - deferrable.errback { raise 'Should have succeeded' } - end - channel.subscribe('event') do |message| - expect(message.client_id).to eql('validClient') - EM.add_timer(0.5) { stop_reactor } + connection.once :connected do + channel.publish([name: 'event', client_id: 'validClient']).tap do |deferrable| + deferrable.errback { raise 'Should have succeeded' } + end + channel.subscribe('event') do |message| + expect(message.client_id).to eql('validClient') + EM.add_timer(0.5) { stop_reactor } + end end end end @@ -1455,12 +1534,14 @@ def disconnect_transport context 'with an empty client_id in the message' do it 'succeeds and publishes without a client_id' do - channel.publish([name: 'event', client_id: nil]).tap do |deferrable| - deferrable.errback { raise 'Should have succeeded' } - end - channel.subscribe('event') do |message| - expect(message.client_id).to be_nil - EM.add_timer(0.5) { stop_reactor } + connection.once :connected do + channel.publish([name: 'event', client_id: nil]).tap do |deferrable| + deferrable.errback { raise 'Should have succeeded' } + end + channel.subscribe('event') do |message| + expect(message.client_id).to be_nil + EM.add_timer(0.5) { stop_reactor } + end end end end @@ -1475,20 +1556,25 @@ def disconnect_transport context 'before the client is CONNECTED and the client\'s identity has been obtained' do context 'with a valid client_id in the message' do it 'succeeds' do - channel.publish([name: 'event', client_id: 'valid']).tap do |deferrable| - deferrable.errback { raise 'Should have succeeded' } - end - channel.subscribe('event') do |message| - expect(message.client_id).to eql('valid') - EM.add_timer(0.5) { stop_reactor } + connection.once :connected do + channel.publish([name: 'event', client_id: 'valid']).tap do |deferrable| + deferrable.errback { raise 'Should have succeeded' } + end + channel.subscribe('event') do |message| + expect(message.client_id).to eql('valid') + EM.add_timer(0.5) { stop_reactor } + end end end end context 'with an invalid client_id in the message' do let(:client_options) { default_options.merge(key: nil, token: token, log_level: :error) } - it 'succeeds in the client library but then fails when delivered to Ably' do + it 'succeeds in the client library ( while connecting ) but then fails when delivered to Ably' do channel.publish([name: 'event', client_id: 'invalid']).tap do |deferrable| + deferrable.errback do |err| + expect(err).to be_truthy + end EM.add_timer(0.5) { stop_reactor } end channel.subscribe('event') do |message| @@ -1499,12 +1585,14 @@ def disconnect_transport context 'with an empty client_id in the message' do it 'succeeds and publishes with an implicit client_id' do - channel.publish([name: 'event', client_id: nil]).tap do |deferrable| - deferrable.errback { raise 'Should have succeeded' } - end - channel.subscribe('event') do |message| - expect(message.client_id).to eql('valid') - EM.add_timer(0.5) { stop_reactor } + connection.once :connected do + channel.publish([name: 'event', client_id: nil]).tap do |deferrable| + deferrable.errback { raise 'Should have succeeded' } + end + channel.subscribe('event') do |message| + expect(message.client_id).to eql('valid') + EM.add_timer(0.5) { stop_reactor } + end end end end @@ -1558,12 +1646,14 @@ def disconnect_transport context 'with a valid client_id' do it 'succeeds' do - channel.publish([name: 'event', client_id: 'valid']).tap do |deferrable| - deferrable.errback { raise 'Should have succeeded' } - end - channel.subscribe('event') do |message| - expect(message.client_id).to eql('valid') - EM.add_timer(0.5) { stop_reactor } + connection.once :connected do + channel.publish([name: 'event', client_id: 'valid']).tap do |deferrable| + deferrable.errback { raise 'Should have succeeded' } + end + channel.subscribe('event') do |message| + expect(message.client_id).to eql('valid') + EM.add_timer(0.5) { stop_reactor } + end end end end @@ -1584,12 +1674,14 @@ def disconnect_transport context 'with an empty client_id in the message' do it 'succeeds and publishes with an implicit client_id' do - channel.publish([name: 'event', client_id: nil]).tap do |deferrable| - deferrable.errback { raise 'Should have succeeded' } - end - channel.subscribe('event') do |message| - expect(message.client_id).to eql('valid') - EM.add_timer(0.5) { stop_reactor } + connection.once :connected do + channel.publish([name: 'event', client_id: nil]).tap do |deferrable| + deferrable.errback { raise 'Should have succeeded' } + end + channel.subscribe('event') do |message| + expect(message.client_id).to eql('valid') + EM.add_timer(0.5) { stop_reactor } + end end end end @@ -1617,12 +1709,14 @@ def disconnect_transport context 'with an empty client_id in the message' do it 'succeeds and publishes with an implicit client_id' do - channel.publish([name: 'event', client_id: nil]).tap do |deferrable| - deferrable.errback { raise 'Should have succeeded' } - end - channel.subscribe('event') do |message| - expect(message.client_id).to be_nil - EM.add_timer(0.5) { stop_reactor } + connection.once :connected do + channel.publish([name: 'event', client_id: nil]).tap do |deferrable| + deferrable.errback { raise 'Should have succeeded' } + end + channel.subscribe('event') do |message| + expect(message.client_id).to be_nil + EM.add_timer(0.5) { stop_reactor } + end end end end @@ -1766,18 +1860,20 @@ def disconnect_transport let(:exception) { StandardError.new("Intentional error") } it 'logs the error and continues' do - emitted_exception = false - expect(client.logger).to receive(:error) do |*args, &block| - expect(args.concat([block ? block.call : nil]).join(',')).to match(/#{exception.message}/) - end - channel.subscribe('click') do |message| - emitted_exception = true - raise exception - end - channel.publish('click', 'data') do - EventMachine.add_timer(1) do - expect(emitted_exception).to eql(true) - stop_reactor + connection.once :connected do + emitted_exception = false + expect(client.logger).to receive(:error) do |*args, &block| + expect(args.concat([block ? block.call : nil]).join(',')).to match(/#{exception.message}/) + end + channel.subscribe('click') do |message| + emitted_exception = true + raise exception + end + channel.publish('click', 'data') do + EventMachine.add_timer(1) do + expect(emitted_exception).to eql(true) + stop_reactor + end end end end @@ -1785,19 +1881,21 @@ def disconnect_transport context 'many times with different event names' do it 'filters events accordingly to each callback' do - click_callback = lambda { |message| messages << message } + connection.once :connected do + click_callback = lambda { |message| messages << message } - channel.subscribe('click', &click_callback) - channel.subscribe('move', &click_callback) - channel.subscribe('press', &click_callback) + channel.subscribe('click', &click_callback) + channel.subscribe('move', &click_callback) + channel.subscribe('press', &click_callback) - channel.attach do - channel.publish('click', 'data') - channel.publish('move', 'data') - channel.publish('press', 'data') do - EventMachine.add_timer(2) do - expect(messages.count).to eql(3) - stop_reactor + channel.attach do + channel.publish('click', 'data') + channel.publish('move', 'data') + channel.publish('press', 'data') do + EventMachine.add_timer(2) do + expect(messages.count).to eql(3) + stop_reactor + end end end end @@ -1808,12 +1906,14 @@ def disconnect_transport describe '#unsubscribe' do context 'with an event argument' do it 'unsubscribes for a single event' do - channel.subscribe('click') { raise 'Should not have been called' } - channel.unsubscribe('click') + connection.once :connected do + channel.subscribe('click') { raise 'Should not have been called' } + channel.unsubscribe('click') - channel.publish('click', 'data') do - EventMachine.add_timer(1) do - stop_reactor + channel.publish('click', 'data') do + EventMachine.add_timer(1) do + stop_reactor + end end end end @@ -1821,12 +1921,14 @@ def disconnect_transport context 'with no event argument' do it 'unsubscribes for a single event' do - channel.subscribe { raise 'Should not have been called' } - channel.unsubscribe + connection.once :connected do + channel.subscribe { raise 'Should not have been called' } + channel.unsubscribe - channel.publish('click', 'data') do - EventMachine.add_timer(1) do - stop_reactor + channel.publish('click', 'data') do + EventMachine.add_timer(1) do + stop_reactor + end end end end @@ -1861,33 +1963,37 @@ def fake_error(error) context 'an :attached channel' do it 'transitions state to :failed (#RTL3a)' do - channel.attach do - channel.on(:failed) do |connection_state_change| - error = connection_state_change.reason - expect(error).to be_a(Ably::Exceptions::ConnectionFailed) - expect(error.code).to eql(50000) - stop_reactor + connection.once :connected do + channel.attach do + channel.on(:failed) do |connection_state_change| + error = connection_state_change.reason + expect(error).to be_a(Ably::Exceptions::ConnectionFailed) + expect(error.code).to eql(50000) + stop_reactor + end + fake_error connection_error end - fake_error connection_error end end it 'updates the channel error_reason (#RTL3a)' do - channel.attach do - channel.on(:failed) do |connection_state_change| - error = connection_state_change.reason - expect(error).to be_a(Ably::Exceptions::ConnectionFailed) - expect(error.code).to eql(50000) - stop_reactor + connection.once :connected do + channel.attach do + channel.on(:failed) do |connection_state_change| + error = connection_state_change.reason + expect(error).to be_a(Ably::Exceptions::ConnectionFailed) + expect(error.code).to eql(50000) + stop_reactor + end + fake_error connection_error end - fake_error connection_error end end end context 'a :detached channel' do it 'remains in the :detached state (#RTL3a)' do - channel.attach do + channel.once :attached do channel.on(:failed) { raise 'Failed state should not have been reached' } channel.detach do @@ -1899,6 +2005,10 @@ def fake_error(error) fake_error connection_error end end + + connection.once :connected do + channel.attach + end end end @@ -1906,20 +2016,22 @@ def fake_error(error) let(:original_error) { RuntimeError.new } it 'remains in the :failed state and ignores the failure error (#RTL3a)' do - channel.attach do - channel.on(:failed) do - channel.on(:failed) { raise 'Failed state should not have been reached' } + connection.once :connected do + channel.attach do + channel.on(:failed) do + channel.on(:failed) { raise 'Failed state should not have been reached' } - EventMachine.add_timer(1) do - expect(channel).to be_failed - expect(channel.error_reason).to eql(original_error) - stop_reactor + EventMachine.add_timer(1) do + expect(channel).to be_failed + expect(channel.error_reason).to eql(original_error) + stop_reactor + end + + fake_error connection_error end - fake_error connection_error + channel.transition_state_machine :failed, reason: original_error end - - channel.transition_state_machine :failed, reason: original_error end end end @@ -1942,11 +2054,13 @@ def fake_error(error) context ':closed' do context 'an :attached channel' do it 'transitions state to :detached (#RTL3b)' do - channel.attach do - channel.on(:detached) do - stop_reactor + connection.once :connected do + channel.attach do + channel.on(:detached) do + stop_reactor + end + client.connection.close end - client.connection.close end end end @@ -1962,13 +2076,15 @@ def fake_error(error) closed_message = Ably::Models::ProtocolMessage.new(action: 8) # CLOSED client.connection.__incoming_protocol_msgbus__.publish :protocol_message, closed_message end - channel.attach + connection.once :connected do + channel.attach + end end end context 'a :detached channel' do it 'remains in the :detached state (#RTL3b)' do - channel.attach do + channel.once :attached do channel.detach do channel.on(:detached) { raise 'Detached state should not have been reached' } @@ -1980,6 +2096,9 @@ def fake_error(error) client.connection.close end end + connection.once :connected do + channel.attach + end end end @@ -1988,20 +2107,22 @@ def fake_error(error) let(:original_error) { Ably::Models::ErrorInfo.new(message: 'Error') } it 'remains in the :failed state and retains the error_reason (#RTL3b)' do - channel.attach do - channel.once(:failed) do - channel.on(:detached) { raise 'Detached state should not have been reached' } + connection.on :connected do + channel.attach do + channel.once(:failed) do + channel.on(:detached) { raise 'Detached state should not have been reached' } - EventMachine.add_timer(1) do - expect(channel).to be_failed - expect(channel.error_reason).to eql(original_error) - stop_reactor + EventMachine.add_timer(1) do + expect(channel).to be_failed + expect(channel.error_reason).to eql(original_error) + stop_reactor + end + + client.connection.close end - client.connection.close + channel.transition_state_machine :failed, reason: original_error end - - channel.transition_state_machine :failed, reason: original_error end end end @@ -2046,23 +2167,26 @@ def fake_error(error) client.connection.transition_state_machine :suspended end end - channel.attach + channel.attach end end context 'an :attached channel' do it 'transitions state to :suspended (#RTL3c)' do - channel.attach do + channel.once :attached do channel.on(:suspended) do stop_reactor end client.connection.transition_state_machine :suspended end + connection.once :connected do + channel.attach + end end describe 'reattaching (#RTN15c3)' do it 'transitions state automatically to :attaching once the connection is re-established ' do - channel.attach do + channel.once :attached do channel.on(:suspended) do client.connection.connect channel.once(:attached) do @@ -2071,10 +2195,13 @@ def fake_error(error) end client.connection.transition_state_machine :suspended end + connection.once :connected do + channel.attach + end end it 'sends ATTACH_RESUME flag when reattaching (RTL4j)' do - channel.attach do + channel.once :attached do channel.on(:suspended) do client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| next if protocol_message.action != :attach @@ -2087,13 +2214,16 @@ def fake_error(error) end client.connection.transition_state_machine :suspended end + connection.once :connected do + channel.attach + end end end end context 'a :detached channel' do it 'remains in the :detached state (#RTL3c)' do - channel.attach do + channel.once :attached do channel.detach do channel.on(:detached) { raise 'Detached state should not have been reached' } @@ -2105,6 +2235,9 @@ def fake_error(error) client.connection.transition_state_machine :suspended end end + connection.once :connected do + channel.attach + end end end @@ -2113,20 +2246,22 @@ def fake_error(error) let(:client_options) { default_options.merge(log_level: :fatal) } it 'remains in the :failed state and retains the error_reason (#RTL3c)' do - channel.attach do - channel.once(:failed) do - channel.on(:detached) { raise 'Detached state should not have been reached' } + connection.once :connected do + channel.attach do + channel.once(:failed) do + channel.on(:detached) { raise 'Detached state should not have been reached' } - EventMachine.add_timer(1) do - expect(channel).to be_failed - expect(channel.error_reason).to eql(original_error) - stop_reactor + EventMachine.add_timer(1) do + expect(channel).to be_failed + expect(channel.error_reason).to eql(original_error) + stop_reactor + end + + client.connection.transition_state_machine :suspended end - client.connection.transition_state_machine :suspended + channel.transition_state_machine :failed, reason: original_error end - - channel.transition_state_machine :failed, reason: original_error end end end @@ -2151,14 +2286,16 @@ def fake_error(error) context ':connected' do context 'a :suspended channel' do it 'is automatically reattached (#RTL3d)' do - channel.attach do - channel.once(:suspended) do - client.connection.connect - channel.once(:attached) do - stop_reactor + connection.once :connected do + channel.attach do + channel.once(:suspended) do + client.connection.connect + channel.once(:attached) do + stop_reactor + end end + client.connection.transition_state_machine :suspended end - client.connection.transition_state_machine :suspended end end @@ -2168,29 +2305,32 @@ def fake_error(error) end it 'returns to a suspended state (#RTL3d)' do - channel.attach do - channel.once(:attached) do - fail "Channel should not have become attached" - end + connection.once :connected do + channel.attach do + channel.once(:attached) do + fail "Channel should not have become attached" + end - channel.once(:suspended) do - client.connection.connect - channel.once(:attaching) do - # don't process any incoming ProtocolMessages so the connection never opens - client.connection.__incoming_protocol_msgbus__.unsubscribe - channel.once(:suspended) do |state_change| - expect(state_change.reason.code).to eql(90007) - stop_reactor + channel.once(:suspended) do + client.connection.connect + channel.once(:attaching) do + # don't process any incoming ProtocolMessages so the connection never opens + client.connection.__incoming_protocol_msgbus__.unsubscribe + channel.once(:suspended) do |state_change| + expect(state_change.reason.code).to eql(90007) + stop_reactor + end end end + client.connection.transition_state_machine :suspended end - client.connection.transition_state_machine :suspended end end end end end + context ':disconnected' do context 'with an initialized channel' do it 'has no effect on the channel states (#RTL3e)' do @@ -2222,25 +2362,31 @@ def fake_error(error) context 'with an attached channel' do it 'has no effect on the channel states (#RTL3e)' do - channel.attach do + channel.once :attached do connection.once(:disconnected) do expect(channel).to be_attached stop_reactor end disconnect_transport end + + connection.once :connected do + channel.attach + end end end context 'with a detached channel' do it 'has no effect on the channel states (#RTL3e)' do - channel.attach do - channel.detach do - connection.once(:disconnected) do - expect(channel).to be_detached - stop_reactor + connection.once :connected do + channel.attach do + channel.detach do + connection.once(:disconnected) do + expect(channel).to be_detached + stop_reactor + end + disconnect_transport end - disconnect_transport end end end @@ -2288,12 +2434,12 @@ def self.build_flags(flags) shared_examples 'an update that sends ATTACH message' do |state, flags| it 'sends an ATTACH message on options change' do - attach_sent = nil + attach_sent_with_flags_set_via_channel_options = nil client.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :attach && protocol_message.flags.nonzero? - attach_sent = true expect(protocol_message.flags).to eq(flags) + attach_sent_with_flags_set_via_channel_options = true end end @@ -2302,10 +2448,7 @@ def self.build_flags(flags) end channel.on(:attached) do - client.connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| - next if protocol_message.action != :attached - - expect(attach_sent).to eq(true) + wait_until(lambda { attach_sent_with_flags_set_via_channel_options }) do stop_reactor end end @@ -2318,7 +2461,7 @@ def self.build_flags(flags) it_behaves_like 'an update that sends ATTACH message', :attaching, build_flags(%i[subscribe]) end - context 'when channel is attaching' do + context 'when channel is attached' do it_behaves_like 'an update that sends ATTACH message', :attached, build_flags(%i[resume subscribe]) end @@ -2389,8 +2532,10 @@ def self.build_flags(flags) expect(channel_state_change.reason).to be_nil stop_reactor end - channel.attach do - channel.detach + connection.once :connected do + channel.attach do + channel.detach + end end end @@ -2426,7 +2571,7 @@ def self.build_flags(flags) connection_id = client.connection.id expect(channel_state_change.resumed).to be_falsey - recover_client = auto_close Ably::Realtime::Client.new(client_options.merge(recover: client.connection.recovery_key)) + recover_client = auto_close Ably::Realtime::Client.new(client_options.merge(recover: client.connection.create_recovery_key)) recover_client.connection.once(:connected) do expect(recover_client.connection.id).to eql(connection_id) recover_channel = recover_client.channels.get(channel_name) @@ -2441,7 +2586,7 @@ def self.build_flags(flags) it 'is false when a connection fails to recover and the channel is attached' do client.connection.once(:connected) do - recovery_key = client.connection.recovery_key + recovery_key = client.connection.create_recovery_key client.connection.once(:closed) do recover_client = auto_close Ably::Realtime::Client.new(client_options.merge(recover: recovery_key, log_level: :error)) recover_client.connection.once(:connected) do @@ -2458,11 +2603,11 @@ def self.build_flags(flags) end end - context 'when a resume fails' do + context 'when a connection resume fails' do let(:client_options) { default_options.merge(log_level: :error) } - it 'is false when a resume fails to recover and the channel is automatically re-attached' do - channel.attach do + it 'is false when channel_serial goes nil (RTP5a1) and the channel is automatically re-attached' do + channel.once :attached do connection_id = client.connection.id channel.once(:attached) do |channel_state_change| expect(client.connection.id).to_not eql(connection_id) @@ -2470,7 +2615,29 @@ def self.build_flags(flags) stop_reactor end client.connection.transport.close_connection_after_writing - client.connection.configure_new '0123456789abcdef', 'wVIsgTHAB1UvXh7z-1991d8586', -1 # force the resume connection key to be invalid + channel.properties.channel_serial = nil + client.connection.configure_new '0123456789abcdef', 'wVIsgTHAB1UvXh7z-1991d8586' # force the resume connection key to be invalid + end + + connection.once :connected do + channel.attach + end + end + + it 'is true when channel_serial is intact and the channel is automatically re-attached' do + channel.once :attached do + connection_id = client.connection.id + channel.once(:attached) do |channel_state_change| + expect(client.connection.id).to_not eql(connection_id) + expect(channel_state_change.resumed).to be_truthy + stop_reactor + end + client.connection.transport.close_connection_after_writing + client.connection.configure_new '0123456789abcdef', 'wVIsgTHAB1UvXh7z-1991d8586' # force the resume connection key to be invalid + end + + connection.once :connected do + channel.attach end end end @@ -2601,7 +2768,9 @@ def self.build_flags(flags) client.connection.__incoming_protocol_msgbus__.publish :protocol_message, detach_message end - channel.attach + connection.once :connected do + channel.attach + end end end @@ -2632,26 +2801,30 @@ def self.build_flags(flags) client.connection.__incoming_protocol_msgbus__.publish :protocol_message, detach_message end - channel.attach + connection.once :connected do + channel.attach + end end context 'when connection is no longer connected' do it 'will not attempt to reattach (#RTL13c)' do - channel.attach do - connection.once(:closing) do - channel.once(:attaching) do |state_change| - raise 'Channel should not attempt to reattach' + connection.once :connected do + channel.attach do + connection.once(:closing) do + channel.once(:attaching) do |state_change| + raise 'Channel should not attempt to reattach' + end + + channel.transition_state_machine! :suspended end - channel.transition_state_machine! :suspended - end + connection.once(:closed) do + expect(channel).to be_suspended + stop_reactor + end - connection.once(:closed) do - expect(channel).to be_suspended - stop_reactor + connection.close end - - connection.close end end end @@ -2672,35 +2845,36 @@ def self.build_flags(flags) end end prevent_protocol_messages_proc.call - end - channel.once(:attaching) do - attaching_at = Time.now - # First attaching fails during server-initiated ATTACHED received - channel.once(:suspended) do |state_change| - expect(Time.now.to_i - attaching_at.to_i).to be_within(1).of(1) - - suspended_at = Time.now - # Automatic attach happens at channel_retry_timeout - channel.once(:attaching) do - expect(Time.now.to_i - attaching_at.to_i).to be_within(1).of(2) - channel.once(:suspended) do - channel.once(:attaching) do - channel.once(:attached) do - stop_reactor + channel.once(:attaching) do + attaching_at = Time.now + # First attaching fails during server-initiated ATTACHED received + channel.once(:suspended) do |state_change| + expect(Time.now.to_i - attaching_at.to_i).to be_within(1).of(1) + + suspended_at = Time.now + # Automatic attach happens at channel_retry_timeout + channel.once(:attaching) do + expect(Time.now.to_i - attaching_at.to_i).to be_within(1).of(2) + channel.once(:suspended) do + channel.once(:attaching) do + channel.once(:attached) do + stop_reactor + end + # Simulate ATTACHED from Ably + attached_message = Ably::Models::ProtocolMessage.new(action: 11, channel: channel_name) # ATTACHED + client.connection.__incoming_protocol_msgbus__.publish :protocol_message, attached_message end - # Simulate ATTACHED from Ably - attached_message = Ably::Models::ProtocolMessage.new(action: 11, channel: channel_name) # ATTACHED - client.connection.__incoming_protocol_msgbus__.publish :protocol_message, attached_message end end end + + detach_message = Ably::Models::ProtocolMessage.new(action: detached_action, channel: channel_name) + client.connection.__incoming_protocol_msgbus__.publish :protocol_message, detach_message end - detach_message = Ably::Models::ProtocolMessage.new(action: detached_action, channel: channel_name) - client.connection.__incoming_protocol_msgbus__.publish :protocol_message, detach_message + channel.attach end - channel.attach end end end @@ -2709,14 +2883,16 @@ def self.build_flags(flags) let(:client_options) { default_options.merge(log_level: :fatal) } it 'should transition to the failed state and the error_reason should be set (#RTL14)' do - channel.attach do - channel.once(:failed) do |state_change| - expect(state_change.reason.code).to eql(50505) - expect(channel.error_reason.code).to eql(50505) - stop_reactor + connection.once :connected do + channel.attach do + channel.once(:failed) do |state_change| + expect(state_change.reason.code).to eql(50505) + expect(channel.error_reason.code).to eql(50505) + stop_reactor + end + error_message = Ably::Models::ProtocolMessage.new(action: 9, channel: channel_name, error: { code: 50505 }) # ProtocolMessage ERROR type + client.connection.__incoming_protocol_msgbus__.publish :protocol_message, error_message end - error_message = Ably::Models::ProtocolMessage.new(action: 9, channel: channel_name, error: { code: 50505 }) # ProtocolMessage ERROR type - client.connection.__incoming_protocol_msgbus__.publish :protocol_message, error_message end end end diff --git a/spec/acceptance/realtime/client_spec.rb b/spec/acceptance/realtime/client_spec.rb index 28b7a1dea..a4f50ae99 100644 --- a/spec/acceptance/realtime/client_spec.rb +++ b/spec/acceptance/realtime/client_spec.rb @@ -133,7 +133,7 @@ end end - context 'with a wildcard client_id token' do + context 'with a wildcard client_id token ' do subject { auto_close Ably::Realtime::Client.new(client_options) } let(:client_options) { default_options.merge(auth_callback: lambda { |token_params| auth_token_object }, client_id: client_id) } let(:rest_auth_client) { Ably::Rest::Client.new(default_options.merge(key: api_key)) } @@ -142,7 +142,8 @@ context 'and an explicit client_id in ClientOptions' do let(:client_id) { random_str } - it 'allows uses the explicit client_id in the connection' do + # Skipped because more clarification needed on RSA7e, see https://github.com/ably/ably-ruby/issues/425 + xit 'allows uses the explicit client_id in the connection' do connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :connected expect(protocol_message.connection_details.client_id).to eql(client_id) diff --git a/spec/acceptance/realtime/connection_failures_spec.rb b/spec/acceptance/realtime/connection_failures_spec.rb index 21f74b230..6afb7616a 100644 --- a/spec/acceptance/realtime/connection_failures_spec.rb +++ b/spec/acceptance/realtime/connection_failures_spec.rb @@ -747,8 +747,6 @@ def send_disconnect_message resumed_connection = false connection.once(:disconnected) do - disconnected_at = Time.now - allow(connection).to receive(:time_since_connection_confirmed_alive?).and_return(connection.connection_state_ttl + 1) # Make sure the next connect does not have the resume param @@ -781,8 +779,6 @@ def send_disconnect_message resumed_with_clean_connection = false connection.once(:disconnected) do - disconnected_at = Time.now - pseudo_time_passed = connection.connection_state_ttl + connection.details.max_idle_interval + 1 allow(connection).to receive(:time_since_connection_confirmed_alive?).and_return(pseudo_time_passed) @@ -815,14 +811,11 @@ def send_disconnect_message channel_emitted_an_attached = false channel.attach do - channel.once(:attached) do |channel_state_change| - expect(channel_state_change.resumed).to be_falsey + channel.once(:attached) do channel_emitted_an_attached = true end connection.once(:disconnected) do - disconnected_at = Time.now - pseudo_time_passed = connection.connection_state_ttl + connection.details.max_idle_interval + 1 allow(connection).to receive(:time_since_connection_confirmed_alive?).and_return(pseudo_time_passed) @@ -955,7 +948,7 @@ def send_disconnect_message previous_connection_id = connection.id connection.transport.close_connection_after_writing - expect(connection).to receive(:configure_new).with(previous_connection_id, anything, anything).and_call_original + expect(connection).to receive(:configure_new).with(previous_connection_id, anything).and_call_original connection.once(:connected) do expect(connection.key).to_not be_nil @@ -1008,16 +1001,6 @@ def send_disconnect_message end end - it 'executes the resume callback', api_private: true do - channel.attach do - connection.transport.close_connection_after_writing - connection.on_resume do - expect(connection).to be_connected - stop_reactor - end - end - end - context 'when messages were published whilst the client was disconnected' do it 'receives the messages published whilst offline' do messages_received = false @@ -1089,7 +1072,7 @@ def send_disconnect_message def kill_connection_transport_and_prevent_valid_resume connection.transport.close_connection_after_writing - connection.configure_new '0123456789abcdef', 'wVIsgTHAB1UvXh7z-1991d8586', -1 # force the resume connection key to be invalid + connection.configure_new '0123456789abcdef', '0123456789abcdef-99' # force the resume connection key to be invalid end it 'updates the connection_id and connection_key' do @@ -1122,7 +1105,7 @@ def kill_connection_transport_and_prevent_valid_resume end channel.on(:attaching) do |channel_state_change| error = channel_state_change.reason - expect(error.message).to match(/Unable to recover connection/i) + expect(error.message).to match(/Invalid connection key/i) reattaching_channels << channel end channel.on(:attached) do @@ -1222,9 +1205,9 @@ def kill_connection_transport_and_prevent_valid_resume it 'sets the error reason on each channel' do channel.attach do channel.on(:attaching) do |state_change| - expect(state_change.reason.message).to match(/Unable to recover connection/i) - expect(state_change.reason.code).to eql(80008) - expect(channel.error_reason.code).to eql(80008) + expect(state_change.reason.message).to match(/Invalid connection key/i) + expect(state_change.reason.code).to eql(80018) + expect(channel.error_reason.code).to eql(80018) channel.on(:attached) do |state_change| stop_reactor @@ -1375,7 +1358,7 @@ def kill_connection_transport_and_prevent_valid_resume end) end - xit 'triggers a re-authentication and then resumes the connection' do + it 'triggers a re-authentication and then resumes the connection' do # [PENDING] After sandbox env update connection isn't found and a new connection is created. Spec fails # connection.once(:connected) do diff --git a/spec/acceptance/realtime/connection_spec.rb b/spec/acceptance/realtime/connection_spec.rb index 53fc9c752..8827257a9 100644 --- a/spec/acceptance/realtime/connection_spec.rb +++ b/spec/acceptance/realtime/connection_spec.rb @@ -383,7 +383,8 @@ def publish_and_check_disconnect(options = {}) let(:client_id) { random_str } let(:client_options) { default_options.merge(client_id: 'incompatible', token: token_string, key: nil, log_level: :none) } - it 'fails the connection' do + # Skipped because more clarification needed on RSA7e, see https://github.com/ably/ably-ruby/issues/425 + xit 'fails the connection' do expect(client.client_id).to eql('incompatible') client.connection.once(:failed) do expect(client.client_id).to eql('incompatible') @@ -744,58 +745,6 @@ def expect_ordered_phases end end - describe '#serial connection serial' do - let(:channel) { client.channel(random_str) } - - it 'is set to -1 when a new connection is opened' do - connection.connect do - expect(connection.serial).to eql(-1) - stop_reactor - end - end - - context 'when a message is sent but the ACK has not yet been received' do - it 'the sent message msgSerial is 0 but the connection serial remains at -1' do - channel.attach do - connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| - if protocol_message.action == :message - connection.__outgoing_protocol_msgbus__.unsubscribe - expect(protocol_message['msgSerial']).to eql(0) - expect(connection.serial).to eql(-1) - stop_reactor - end - end - channel.publish('event', 'data') - end - end - end - - it 'is set to 0 when a message is received back' do - channel.publish('event', 'data') - channel.subscribe do - expect(connection.serial).to eql(0) - stop_reactor - end - end - - it 'is set to 1 when the second message is received' do - channel.attach do - messages = [] - channel.subscribe do |message| - messages << message - if messages.length == 2 - expect(connection.serial).to eql(1) - stop_reactor - end - end - - channel.publish('event', 'data') do - channel.publish('event', 'data') - end - end - end - end - describe '#msgSerial' do context 'when messages are queued for publish before a connection is established' do let(:batches) { 6 } @@ -922,7 +871,6 @@ def log_connection_changes let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { max_idle_interval: 2 * 1000 } @@ -1232,7 +1180,6 @@ def log_connection_changes let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { max_idle_interval: 2 * 1000 } @@ -1403,34 +1350,6 @@ def self.available_states let(:states) { Hash.new } let(:channel) { client.channel(random_str) } - it 'is composed of connection key and serial that is kept up to date with each message ACK received' do - connection.on(:connected) do - expected_serial = -1 - expect(connection.key).to_not be_nil - expect(connection.serial).to eql(expected_serial) - - channel.attach do - channel.publish('event', 'data') - channel.subscribe do - channel.unsubscribe - - expected_serial += 1 # attach message received - expect(connection.serial).to eql(expected_serial) - - channel.publish('event', 'data') - channel.subscribe do - channel.unsubscribe - expected_serial += 1 # attach message received - expect(connection.serial).to eql(expected_serial) - - expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}:#{connection.send(:client_msg_serial)}") - stop_reactor - end - end - end - end - end - it "is available when connection is in one of the states: #{available_states.join(', ')}" do connection.once(:connected) do allow(client).to receive(:endpoint).and_return( @@ -1446,7 +1365,7 @@ def self.available_states available_states.each do |state| connection.on(state) do - states[state.to_sym] = true if connection.recovery_key + states[state.to_sym] = true if connection.create_recovery_key end end @@ -1464,7 +1383,7 @@ def self.available_states it 'is nil when connection is explicitly CLOSED' do connection.once(:connected) do connection.close do - expect(connection.recovery_key).to be_nil + expect(connection.create_recovery_key).to be_nil stop_reactor end end @@ -1475,36 +1394,22 @@ def self.available_states context 'connection#id after recovery' do it 'remains the same' do previous_connection_id = nil + recovery_key = nil connection.once(:connected) do previous_connection_id = connection.id + recovery_key = client.connection.create_recovery_key connection.transition_state_machine! :failed end connection.once(:failed) do - recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: client.connection.recovery_key)) + recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: recovery_key)) recover_client.connection.on(:connected) do expect(recover_client.connection.id).to eql(previous_connection_id) stop_reactor end end end - - it 'does not call a resume callback', api_private: true do - connection.once(:connected) do - connection.transition_state_machine! :failed - end - - connection.once(:failed) do - recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: client.connection.recovery_key)) - recover_client.connection.on_resume do - raise 'Should not call the resume callback' - end - recover_client.connection.on(:connected) do - EventMachine.add_timer(0.5) { stop_reactor } - end - end - end end context 'when messages have been sent whilst the old connection is disconnected' do @@ -1514,7 +1419,7 @@ def self.available_states channel.attach do connection_id = client.connection.id - recovery_key = client.connection.recovery_key + recovery_key = client.connection.create_recovery_key connection.transport.__incoming_protocol_msgbus__ publishing_client_channel.publish('event', 'message') do connection.transition_state_machine! :failed @@ -1548,7 +1453,7 @@ def self.available_states channel.publish('event', 'message') do msg_serial = connection.send(:client_msg_serial) expect(msg_serial).to eql(0) - recovery_key = client.connection.recovery_key + recovery_key = client.connection.create_recovery_key connection.transition_state_machine! :failed end end @@ -1579,7 +1484,7 @@ def self.available_states expect(message.data).to eql('message-1') msg_serial = connection.send(:client_msg_serial) expect(msg_serial).to eql(0) - recovery_key = client.connection.recovery_key + recovery_key = client.connection.create_recovery_key connection.transition_state_machine! :failed end channel.publish('event', 'message-1') @@ -1613,23 +1518,29 @@ def self.available_states context 'with :recover option' do context 'with invalid syntax' do - let(:invaid_client_options) { default_options.merge(recover: 'invalid') } + let(:client_options) { default_options.merge(recover: 'invalid') } - it 'raises an exception' do - expect { Ably::Realtime::Client.new(invaid_client_options) }.to raise_error ArgumentError, /Recover/ - stop_reactor + it 'logs recovery decode error as a warning and connects successfully' do + connection.once(:connected) do + EventMachine.add_timer(1) { stop_reactor } + end + expect(client.logger).to receive(:warn).at_least(:once) do |*args, &block| + expect(args.concat([block ? block.call : nil]).join(',')).to match(/unable to decode recovery key/) + end end end - context 'with expired (missing) value sent to server' do - let(:client_options) { default_options.merge(recover: 'wVIsgTHAB1UvXh7z-1991d8586:0:0', log_level: :fatal) } + context 'with invalid connection key' do + recovery_key = "{\"connection_key\":\"0123456789abcdef-99\",\"msg_serial\":2," << + "\"channel_serials\":{\"channel1\":\"serial1\",\"channel2\":\"serial2\"}}" + let(:client_options) { default_options.merge(recover: recovery_key, log_level: :fatal) } it 'connects but sets the error reason and includes the reason in the state change' do connection.once(:connected) do |state_change| expect(connection.state).to eq(:connected) - expect(state_change.reason.message).to match(/Unable to recover connection/i) - expect(connection.error_reason.message).to match(/Unable to recover connection/i) - expect(connection.error_reason.code).to eql(80008) + expect(state_change.reason.message).to match(/Invalid connection key/i) + expect(connection.error_reason.message).to match(/Invalid connection key/i) + expect(connection.error_reason.code).to eql(80018) expect(connection.error_reason).to eql(state_change.reason) stop_reactor end @@ -2001,7 +1912,6 @@ def self.available_states let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 55, connection_details: { client_id: 'bob', connection_key: connection_key, @@ -2037,7 +1947,6 @@ def self.available_states connection.once(:update) do |connection_state_change| expect(client.auth.client_id).to eql('bob') expect(connection.key).to eql(connection_key) - expect(connection.serial).to eql(55) expect(connection.connection_state_ttl).to eql(33) expect(connection.details.client_id).to eql('bob') @@ -2060,7 +1969,6 @@ def self.available_states let(:protocol_message_attributes) do { action: Ably::Models::ProtocolMessage::ACTION.Connected.to_i, - connection_serial: 22, error: { code: 50000, message: 'Internal failure' }, } end @@ -2089,7 +1997,7 @@ def self.available_states it 'sends the protocol version param v (#G4, #RTN2f)' do expect(EventMachine).to receive(:connect) do |host, port, transport, object, url| uri = URI.parse(url) - expect(CGI::parse(uri.query)['v'][0]).to eql('1.2') + expect(CGI::parse(uri.query)['v'][0]).to eql('2') stop_reactor end client diff --git a/spec/acceptance/realtime/message_spec.rb b/spec/acceptance/realtime/message_spec.rb index 53bd5be3a..e5eec8e77 100644 --- a/spec/acceptance/realtime/message_spec.rb +++ b/spec/acceptance/realtime/message_spec.rb @@ -304,8 +304,6 @@ def publish_and_check_extras(extras) it 'will not echo messages to the client but will still broadcast messages to other connected clients', em_timeout: 10 do channel.attach do |echo_channel| no_echo_channel.attach do - no_echo_channel.publish 'test_event', payload - no_echo_channel.subscribe('test_event') do |message| fail "Message should not have been echoed back" end @@ -316,6 +314,7 @@ def publish_and_check_extras(extras) stop_reactor end end + no_echo_channel.publish 'test_event', payload end end end @@ -418,41 +417,6 @@ def publish_and_check_extras(extras) end end - context 'server incorrectly resends a message that was already received by the client library' do - let(:messages_received) { [] } - let(:connection) { client.connection } - let(:client_options) { default_options.merge(log_level: :fatal) } - - it 'discards the message and logs it as an error to the channel' do - first_message_protocol_message = nil - connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| - first_message_protocol_message ||= protocol_message unless protocol_message.messages.empty? - end - - channel.attach do - channel.subscribe do |message| - messages_received << message - if messages_received.count == 2 - # simulate a duplicate protocol message being received - EventMachine.next_tick do - connection.__incoming_protocol_msgbus__.publish :protocol_message, first_message_protocol_message - end - end - end - 2.times { |i| EventMachine.add_timer(i.to_f / 5) { channel.publish('event', 'data') } } - - expect(client.logger).to receive(:error) do |*args, &block| - expect(args.concat([block ? block.call : nil]).join(',')).to match(/duplicate/) - - EventMachine.add_timer(0.5) do - expect(messages_received.count).to eql(2) - stop_reactor - end - end - end - end - end - context 'encoding and decoding encrypted messages' do shared_examples 'an Ably encrypter and decrypter' do |item, data| let(:algorithm) { data['algorithm'].upcase } @@ -622,11 +586,13 @@ def publish_and_check_extras(extras) let(:payload) { MessagePack.pack({ 'key' => random_str }) } it 'does not attempt to decrypt the message' do - unencrypted_channel_client1.publish 'example', payload - encrypted_channel_client2.subscribe do |message| - expect(message.data).to eql(payload) - expect(message.encoding).to be_nil - stop_reactor + wait_until(lambda { client.connection.state == :connected and other_client.connection.state == :connected }) do + encrypted_channel_client2.subscribe do |message| + expect(message.data).to eql(payload) + expect(message.encoding).to be_nil + stop_reactor + end + unencrypted_channel_client1.publish 'example', payload end end end @@ -671,11 +637,13 @@ def publish_and_check_extras(extras) let(:payload) { MessagePack.pack({ 'key' => random_str }) } it 'delivers the message but still encrypted with the cipher detials in the #encoding attribute (#RTL7e)' do - encrypted_channel_client1.publish 'example', payload - encrypted_channel_client2.subscribe do |message| - expect(message.data).to_not eql(payload) - expect(message.encoding).to match(/^cipher\+aes-256-cbc/) - stop_reactor + encrypted_channel_client2.attach do + encrypted_channel_client2.subscribe do |message| + expect(message.data).to_not eql(payload) + expect(message.encoding).to match(/^cipher\+aes-256-cbc/) + stop_reactor + end + encrypted_channel_client1.publish 'example', payload end end @@ -751,10 +719,13 @@ def publish_and_check_extras(extras) end end - channel.publish(event_name).tap do |deferrable| - deferrable.callback { message_state << :delivered } - deferrable.errback do - raise 'Message delivery should not fail' + # Attaching channel first before publishing message in order to get channel serial set on channel + channel.attach do + channel.publish(event_name).tap do |deferrable| + deferrable.callback { message_state << :delivered } + deferrable.errback do + raise 'Message delivery should not fail' + end end end @@ -777,7 +748,7 @@ def publish_and_check_extras(extras) if protocol_message.messages.find { |message| message.name == event_name } EventMachine.add_timer(0.0001) do connection.transport.unbind # trigger failure - connection.configure_new '0123456789abcdef', 'wVIsgTHAB1UvXh7z-1991d8586', -1 # force the resume connection key to be invalid + connection.configure_new '0123456789abcdef', 'wVIsgTHAB1UvXh7z-1991d8586' # force the resume connection key to be invalid end end end diff --git a/spec/acceptance/realtime/presence_spec.rb b/spec/acceptance/realtime/presence_spec.rb index 2250a14b1..c24f5cee0 100644 --- a/spec/acceptance/realtime/presence_spec.rb +++ b/spec/acceptance/realtime/presence_spec.rb @@ -60,13 +60,13 @@ def setup_test(method_name, args, options) end unless expected_state == :left - it 'raise an exception if the channel is detached' do + it "presence #{method_name} : raise an exception if the channel is detached" do setup_test(method_name, args, options) do channel_client_one.attach do channel_client_one.transition_state_machine :detaching channel_client_one.once(:detached) do presence_client_one.public_send(method_name, args).tap do |deferrable| - deferrable.callback { raise 'Get should not succeed' } + deferrable.callback { raise "presence #{method_name} should not succeed" } deferrable.errback do |error| expect(error).to be_a(Ably::Exceptions::InvalidState) expect(error.message).to match(/Operation is not allowed when channel is in STATE.Detached/) @@ -78,12 +78,12 @@ def setup_test(method_name, args, options) end end - it 'raise an exception if the channel becomes detached' do + it "presence #{method_name} : raise an exception if the channel becomes detached" do setup_test(method_name, args, options) do channel_client_one.attach do channel_client_one.transition_state_machine :detaching presence_client_one.public_send(method_name, args).tap do |deferrable| - deferrable.callback { raise 'Get should not succeed' } + deferrable.callback { raise "presence #{method_name} should not succeed" } deferrable.errback do |error| expect(error).to be_a(Ably::Exceptions::InvalidState) expect(error.message).to match(/Operation failed as channel transitioned to STATE.Detached/) @@ -94,13 +94,13 @@ def setup_test(method_name, args, options) end end - it 'raise an exception if the channel is failed' do + it "presence #{method_name} : raise an exception if the channel is failed" do setup_test(method_name, args, options) do channel_client_one.attach do channel_client_one.transition_state_machine :failed expect(channel_client_one.state).to eq(:failed) presence_client_one.public_send(method_name, args).tap do |deferrable| - deferrable.callback { raise 'Get should not succeed' } + deferrable.callback { raise "presence #{method_name} : Get should not succeed" } deferrable.errback do |error| expect(error).to be_a(Ably::Exceptions::InvalidState) expect(error.message).to match(/Operation is not allowed when channel is in STATE.Failed/) @@ -111,11 +111,11 @@ def setup_test(method_name, args, options) end end - it 'raise an exception if the channel becomes failed' do + it "presence #{method_name} : raise an exception if the channel becomes failed" do setup_test(method_name, args, options) do channel_client_one.attach do presence_client_one.public_send(method_name, args).tap do |deferrable| - deferrable.callback { raise 'Get should not succeed' } + deferrable.callback { raise "presence #{method_name} : Get should not succeed" } deferrable.errback do |error| expect(error).to be_a(Ably::Exceptions::MessageDeliveryFailed) stop_reactor @@ -516,11 +516,11 @@ def presence_action(method_name, data) stop_reactor end - it 'will emit an :in_sync event when synchronisation is complete' do + it 'will emit an :sync_complete event when synchronisation is complete' do presence_client_one.enter presence_client_two.enter - presence_anonymous_client.members.once(:in_sync) do + presence_anonymous_client.members.once(:sync_complete) do stop_reactor end @@ -545,7 +545,7 @@ def presence_action(method_name, data) entered += 1 next unless entered == 2 - presence_anonymous_client.members.once(:in_sync) do + presence_anonymous_client.members.once(:sync_complete) do expect(presence_anonymous_client.members.count).to eql(2) member_ids = presence_anonymous_client.members.map(&:member_key) expect(member_ids.count).to eql(2) @@ -580,7 +580,6 @@ def presence_action(method_name, data) action = Ably::Models::ProtocolMessage::ACTION.Presence presence_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 20, channel: channel_name, presence: presence_data, timestamp: Time.now.to_i * 1000 @@ -633,7 +632,6 @@ def allow_sync_fabricate_data_final_sync_and_assert_members action = Ably::Models::ProtocolMessage::ACTION.Presence presence_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: anonymous_client.connection.serial + 1, channel: channel_name, presence: presence_data, timestamp: Time.now.to_i * 1000 @@ -644,7 +642,6 @@ def allow_sync_fabricate_data_final_sync_and_assert_members action = Ably::Models::ProtocolMessage::ACTION.Sync sync_msg = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: anonymous_client.connection.serial + 2, channel: channel_name, channel_serial: 'validserialprefix:', # with no part after the `:` this indicates the end to the SYNC presence: [], @@ -707,21 +704,29 @@ def allow_sync_fabricate_data_final_sync_and_assert_members context '#sync_complete? and SYNC flags (#RTP1)' do context 'when attaching to a channel without any members present' do - xit 'sync_complete? is true, there is no presence flag, and the presence channel is considered synced immediately (#RTP1)' do - flag_checked = false + it 'sync_complete? is true, no members are received and the presence channel is synced (#RTP1)' do + sync_info_received = false anonymous_client.connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| if protocol_message.action == :attached - flag_checked = true - expect(protocol_message.has_presence_flag?).to eql(false) + if protocol_message.has_presence_flag? + sync_info_received = false + else + sync_info_received = true + end + end + if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Sync + expect(protocol_message.presence).to be_empty + sync_info_received = true end end channel_anonymous_client.attach do - expect(channel_anonymous_client.presence).to be_sync_complete - EventMachine.next_tick do - expect(flag_checked).to eql(true) - stop_reactor + wait_until(lambda { channel_anonymous_client.presence.sync_complete? and sync_info_received}) do + channel_anonymous_client.presence.get do |members| + expect(members).to be_empty + stop_reactor + end end end end @@ -851,7 +856,7 @@ def setup_members_on(presence) # Hacky accessing a private method, but absent members are intentionally not exposed to any public APIs expect(presence_anonymous_client.members.send(:absent_members).length).to eql(1) - presence_anonymous_client.members.once(:in_sync) do + presence_anonymous_client.members.once(:sync_complete) do # Check that members count is exact indicating the members with LEAVE action after sync are removed expect(presence_anonymous_client).to be_sync_complete expect(presence_anonymous_client.members.length).to eql(enter_expected_count - 1) @@ -1007,7 +1012,7 @@ def setup_members_on(presence) channel_anonymous_client.attach do presence_anonymous_client.get(wait_for_sync: false) do |members| - expect(presence_anonymous_client.members).to_not be_in_sync + expect(presence_anonymous_client.members).to_not be_sync_complete expect(members.count).to eql(0) stop_reactor end @@ -1214,7 +1219,7 @@ def setup_members_on(presence) presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter EventMachine.add_timer(0.5) do - expect(presence_client_one.members).to be_in_sync + expect(presence_client_one.members).to be_sync_complete expect(presence_client_one.members.send(:members).count).to eql(1) presence_client_one.leave data end @@ -1592,14 +1597,16 @@ def setup_members_on(presence) end it 'fails if the connection is DETACHED (#RTP11b)' do - channel_client_one.attach do - channel_client_one.detach do - presence_client_one.get.tap do |deferrable| - deferrable.callback { raise 'Get should not succeed' } - deferrable.errback do |error| - expect(error).to be_a(Ably::Exceptions::InvalidState) - expect(error.message).to match(/Operation is not allowed when channel is in STATE.Detached/) - stop_reactor + client_one.connection.once :connected do + channel_client_one.attach do + channel_client_one.detach do + presence_client_one.get.tap do |deferrable| + deferrable.callback { raise 'Get should not succeed' } + deferrable.errback do |error| + expect(error).to be_a(Ably::Exceptions::InvalidState) + expect(error.message).to match(/Operation is not allowed when channel is in STATE.Detached/) + stop_reactor + end end end end @@ -1815,29 +1822,31 @@ def connect_members_deferrables let(:total_members) { members_per_client * 2 } it 'returns a complete list of members on all clients' do - members_per_client.times do |indx| - presence_client_one.enter_client("client_1:#{indx}") - presence_client_two.enter_client("client_2:#{indx}") - end + wait_until(lambda { client_one.connection.state == :connected and client_two.connection.state == :connected }) do + presence_client_one.subscribe(:enter) do + clients_entered[:client_one] += 1 + end - presence_client_one.subscribe(:enter) do - clients_entered[:client_one] += 1 - end + presence_client_two.subscribe(:enter) do + clients_entered[:client_two] += 1 + end - presence_client_two.subscribe(:enter) do - clients_entered[:client_two] += 1 - end + members_per_client.times do |indx| + presence_client_one.enter_client("client_1:#{indx}") + presence_client_two.enter_client("client_2:#{indx}") + end - wait_until(lambda { clients_entered[:client_one] + clients_entered[:client_two] == total_members * 2 }) do - presence_anonymous_client.get(wait_for_sync: true) do |anonymous_members| - expect(anonymous_members.count).to eq(total_members) - expect(anonymous_members.map(&:client_id).uniq.count).to eq(total_members) + wait_until(lambda { clients_entered[:client_one] + clients_entered[:client_two] == total_members * 2 }) do + presence_anonymous_client.get(wait_for_sync: true) do |anonymous_members| + expect(anonymous_members.count).to eq(total_members) + expect(anonymous_members.map(&:client_id).uniq.count).to eq(total_members) - presence_client_one.get(wait_for_sync: true) do |client_one_members| - presence_client_two.get(wait_for_sync: true) do |client_two_members| - expect(client_one_members.count).to eq(total_members) - expect(client_one_members.count).to eq(client_two_members.count) - stop_reactor + presence_client_one.get(wait_for_sync: true) do |client_one_members| + presence_client_two.get(wait_for_sync: true) do |client_two_members| + expect(client_one_members.count).to eq(total_members) + expect(client_one_members.count).to eq(client_two_members.count) + stop_reactor + end end end end @@ -2243,7 +2252,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel_serial: 'sequenceid:cursor', channel: channel_name, presence: presence_sync_1, @@ -2253,7 +2261,6 @@ def connect_members_deferrables sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 11, channel_serial: 'sequenceid:', # indicates SYNC is complete channel: channel_name, presence: presence_sync_2, @@ -2294,7 +2301,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel: channel_name, presence: presence_sync, timestamp: Time.now.to_i * 1000 @@ -2348,7 +2354,6 @@ def connect_members_deferrables action = Ably::Models::ProtocolMessage::ACTION.Sync sync_message = Ably::Models::ProtocolMessage.new( action: action, - connection_serial: 10, channel: channel_name, presence: presence_sync_protocol_message, timestamp: Time.now.to_i * 1000 @@ -2467,7 +2472,7 @@ def connect_members_deferrables end leave_message = Ably::Models::PresenceMessage.new( - 'id' => "#{client_two.connection.id}:#{presence_client_two.client_id}:1", + 'id' => "#{client_two.connection.id}:#{client_two.connection.send(:client_msg_serial)}:1", 'clientId' => presence_client_two.client_id, 'connectionId' => client_two.connection.id, 'timestamp' => as_since_epoch(Time.now), @@ -2532,37 +2537,59 @@ def cripple_websocket_transport let(:member_data) { random_str } it 'immediately resends all local presence members (#RTP5c2, #RTP19a)' do - in_sync_confirmed_no_local_members = false - local_member_leave_event_fired = false + member_leave_event_fired = false + local_members_sent = false + + presence_client_one.subscribe(:enter) do |entered_member| + expect(entered_member.action).to eq(Ably::Models::PresenceMessage::ACTION.Enter) + expect(entered_member.data).to eq(member_data) + expect(entered_member.client_id).to eq(client_one.auth.client_id) + expect(entered_member.id).to be_truthy + entered_member_id = entered_member.id - presence_client_one.enter(member_data) - presence_client_one.subscribe(:enter) do presence_client_one.unsubscribe :enter - presence_client_one.subscribe(:leave) do |message| - # The local member will leave the PresenceMap due to the ATTACHED without Presence - local_member_leave_event_fired = true + expect(presence_client_one.members.length).to eql(1) + expect(presence_client_one.members.local_members.length).to eql(1) + + # subscribe to outgoing messages to check for entered local members with id + client_one.connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + if protocol_message.action == :presence + protocol_message.presence.each do |local_member| + expect(local_member.id).to eq(entered_member_id) + expect(local_member.action).to eq(Ably::Models::PresenceMessage::ACTION.Enter) + expect(local_member.data).to eq(member_data) + expect(local_member.client_id).to eq(client_one.auth.client_id) + local_members_sent = true + end + end end - # Local members re-entered automatically appear as updates due to the - # fabricated ATTACHED message sent and the members already being present - presence_client_one.subscribe(:update) do |message| - expect(local_member_leave_event_fired).to be_truthy + presence_client_one.subscribe(:leave) do |message| + # Member will leave the PresenceMap due to the ATTACHED without Presence expect(message.data).to eq(member_data) expect(message.client_id).to eq(client_one.auth.client_id) - EventMachine.next_tick do - expect(presence_client_one.members.length).to eql(1) - expect(presence_client_one.members.local_members.length).to eql(1) - expect(in_sync_confirmed_no_local_members).to be_truthy - stop_reactor - end + member_leave_event_fired = true + end + + # Shouldn't receive enter/update message when local_members are entered + # This is due to the fact that, when we enter local member we also send + # member id, and server automatically checks for duplicate id and doesn't + # send presenceEnter or presenceUpdate if id is found. + presence_client_one.subscribe(:enter, :update) do |message| + raise { "client shouldn't receive update event for entered local members" } end - presence_client_one.members.once(:in_sync) do - # Immediately after SYNC (no sync actually occurred, but this event fires immediately after a channel SYNCs or is not expecting to SYNC) + presence_client_one.members.once(:sync_complete) do expect(presence_client_one.members.length).to eql(0) - expect(presence_client_one.members.local_members.length).to eql(0) - in_sync_confirmed_no_local_members = true + + # Since, this is a client sent event, local_members are not cleared + # local_members acts a source of truth for server and not vice versa + expect(presence_client_one.members.local_members.length).to eql(1) + + wait_until(lambda { member_leave_event_fired and local_members_sent}) do + stop_reactor + end end # ATTACHED ProtocolMessage with no presence flag will clear the presence set immediately, #RTP19a @@ -2572,6 +2599,8 @@ def cripple_websocket_transport flags: 0 # no resume or presence flag ) end + + presence_client_one.enter(member_data) end end end @@ -2684,23 +2713,25 @@ def cripple_websocket_transport context 'channel transitions to the DETACHED state' do it 'clears the PresenceMap and local member map copy and does not emit any presence events (#RTP5a)' do - presence_client_one.enter - presence_client_one.subscribe(:enter) do - presence_client_one.unsubscribe :enter + wait_until(lambda { client_one.connection.state == :connected and anonymous_client.connection.state == :connected }) do + presence_client_one.enter + presence_client_one.subscribe(:enter) do + presence_client_one.unsubscribe :enter - channel_anonymous_client.attach do - presence_anonymous_client.get do |members| - expect(members.count).to eq(1) + channel_anonymous_client.attach do + presence_anonymous_client.get do |members| + expect(members.count).to eq(1) - presence_anonymous_client.subscribe { raise 'No presence events should be emitted' } - channel_anonymous_client.detach do - expect(presence_anonymous_client.members.length).to eq(0) - expect(channel_anonymous_client).to be_detached + presence_anonymous_client.subscribe { raise 'No presence events should be emitted' } + channel_anonymous_client.detach do + expect(presence_anonymous_client.members.length).to eq(0) + expect(channel_anonymous_client).to be_detached - expect(presence_client_one.members.local_members.count).to eq(1) - channel_client_one.detach do - expect(presence_client_one.members.local_members.count).to eq(0) - stop_reactor + expect(presence_client_one.members.local_members.count).to eq(1) + channel_client_one.detach do + expect(presence_client_one.members.local_members.count).to eq(0) + stop_reactor + end end end end diff --git a/spec/acceptance/rest/client_spec.rb b/spec/acceptance/rest/client_spec.rb index db80bb65f..e1619c6ef 100644 --- a/spec/acceptance/rest/client_spec.rb +++ b/spec/acceptance/rest/client_spec.rb @@ -1095,9 +1095,16 @@ def encode64(text) end it 'sends a protocol version and lib version header (#G4, #RSC7a, #RSC7b)' do - client.channels.get('foo').publish("event") + response = client.channels.get('foo').publish("event") + expect(response).to eql true expect(publish_message_stub).to have_been_requested - expect(Ably::PROTOCOL_VERSION).to eql('1.2') + if agent.nil? + expect(publish_message_stub.to_s).to include("'Ably-Agent'=>'#{Ably::AGENT}'") + expect(publish_message_stub.to_s).to include("'X-Ably-Version'=>'2'") + else + expect(publish_message_stub.to_s).to include("'Ably-Agent'=>'ably-ruby/1.1.1 ruby/3.1.1'") + expect(publish_message_stub.to_s).to include("'X-Ably-Version'=>'2'") + end end end end diff --git a/spec/acceptance/rest/message_spec.rb b/spec/acceptance/rest/message_spec.rb index 4291d6aa6..49960226e 100644 --- a/spec/acceptance/rest/message_spec.rb +++ b/spec/acceptance/rest/message_spec.rb @@ -204,20 +204,17 @@ end end - specify 'idempotent publishing is disabled by default with <= 1.1 (#TO3n)' do - stub_const 'Ably::PROTOCOL_VERSION', '1.0' - client = Ably::Rest::Client.new(key: api_key, protocol: protocol) - expect(client.idempotent_rest_publishing).to be_falsey - stub_const 'Ably::PROTOCOL_VERSION', '1.1' - client = Ably::Rest::Client.new(key: api_key, protocol: protocol) + specify 'idempotent publishing is set as per clientOptions' do + # set idempotent_rest_publishing to false + client = Ably::Rest::Client.new(key: api_key, protocol: protocol, idempotent_rest_publishing: false) expect(client.idempotent_rest_publishing).to be_falsey - end - specify 'idempotent publishing is enabled by default with >= 1.2 (#TO3n)' do - stub_const 'Ably::PROTOCOL_VERSION', '1.2' - client = Ably::Rest::Client.new(key: api_key, protocol: protocol) + # set idempotent_rest_publishing to true + client = Ably::Rest::Client.new(key: api_key, protocol: protocol, idempotent_rest_publishing: true) expect(client.idempotent_rest_publishing).to be_truthy - stub_const 'Ably::PROTOCOL_VERSION', '1.3' + end + + specify 'idempotent publishing is enabled by default (#TO3n)' do client = Ably::Rest::Client.new(key: api_key, protocol: protocol) expect(client.idempotent_rest_publishing).to be_truthy end diff --git a/spec/shared/client_initializer_behaviour.rb b/spec/shared/client_initializer_behaviour.rb index 5ccfbeb22..84ff2df4c 100644 --- a/spec/shared/client_initializer_behaviour.rb +++ b/spec/shared/client_initializer_behaviour.rb @@ -130,7 +130,7 @@ def rest? end context 'with token' do - let(:client_options) { { token: 'token', auth_connect: false } } + let(:client_options) { { token: 'token', auto_connect: false } } it 'sets the token' do expect(subject.auth.current_token_details.token).to eql('token') diff --git a/spec/unit/models/protocol_message_spec.rb b/spec/unit/models/protocol_message_spec.rb index ddfea5733..b279b3c54 100644 --- a/spec/unit/models/protocol_message_spec.rb +++ b/spec/unit/models/protocol_message_spec.rb @@ -127,14 +127,6 @@ def new_protocol_message(options) end end - context '#connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - it 'converts :connection_serial to an Integer' do - expect(protocol_message.connection_serial).to be_a(Integer) - expect(protocol_message.connection_serial).to eql(55) - end - end - context '#flags (#TR4i)' do context 'when nil' do let(:protocol_message) { new_protocol_message({}) } @@ -241,76 +233,6 @@ def new_protocol_message(options) end end - context '#has_connection_serial?' do - context 'without connection_serial' do - let(:protocol_message) { new_protocol_message({}) } - - it 'returns false' do - expect(protocol_message.has_connection_serial?).to eql(false) - end - end - - context 'with connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - - it 'returns true' do - expect(protocol_message.has_connection_serial?).to eql(true) - end - end - end - - context '#serial' do - context 'with underlying msg_serial' do - let(:protocol_message) { new_protocol_message(msg_serial: "55") } - it 'converts :msg_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(55) - end - end - - context 'with underlying connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - it 'converts :connection_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(55) - end - end - - context 'with underlying connection_serial and msg_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "99", msg_serial: "11") } - it 'prefers connection_serial and converts :connection_serial to an Integer' do - expect(protocol_message.serial).to be_a(Integer) - expect(protocol_message.serial).to eql(99) - end - end - end - - context '#has_serial?' do - context 'without msg_serial or connection_serial' do - let(:protocol_message) { new_protocol_message({}) } - - it 'returns false' do - expect(protocol_message.has_serial?).to eql(false) - end - end - - context 'with msg_serial' do - let(:protocol_message) { new_protocol_message(msg_serial: "55") } - - it 'returns true' do - expect(protocol_message.has_serial?).to eql(true) - end - end - - context 'with connection_serial' do - let(:protocol_message) { new_protocol_message(connection_serial: "55") } - - it 'returns true' do - expect(protocol_message.has_serial?).to eql(true) - end - end - end - context '#error' do context 'with no error attribute' do let(:protocol_message) { new_protocol_message(action: 1) } diff --git a/spec/unit/realtime/channels_spec.rb b/spec/unit/realtime/channels_spec.rb index 9fbc70b6f..a415a7144 100644 --- a/spec/unit/realtime/channels_spec.rb +++ b/spec/unit/realtime/channels_spec.rb @@ -2,7 +2,7 @@ require 'spec_helper' describe Ably::Realtime::Channels do - let(:connection) { instance_double('Ably::Realtime::Connection', unsafe_on: true, on_resume: true) } + let(:connection) { instance_double('Ably::Realtime::Connection', unsafe_on: true) } let(:client) do instance_double('Ably::Realtime::Client', connection: connection, client_id: 'clientId', logger: double('logger').as_null_object) end diff --git a/spec/unit/realtime/connection_spec.rb b/spec/unit/realtime/connection_spec.rb index deac7432e..9c227af0a 100644 --- a/spec/unit/realtime/connection_spec.rb +++ b/spec/unit/realtime/connection_spec.rb @@ -34,36 +34,6 @@ it_behaves_like 'an incoming protocol message bus' it_behaves_like 'an outgoing protocol message bus' - describe 'connection resume callbacks', api_private: true do - let(:callbacks) { [] } - - describe '#trigger_resumed' do - it 'executes the callbacks' do - subject.on_resume { callbacks << true } - subject.trigger_resumed - expect(callbacks.count).to eql(1) - end - end - - describe '#on_resume' do - it 'registers a callback' do - subject.on_resume { callbacks << true } - subject.trigger_resumed - expect(callbacks.count).to eql(1) - end - end - - describe '#off_resume' do - it 'registers a callback' do - subject.on_resume { callbacks << true } - additional_proc = lambda { raise 'This should not be called' } - subject.off_resume(&additional_proc) - subject.trigger_resumed - expect(callbacks.count).to eql(1) - end - end - end - after(:all) do sleep 1 # let realtime library shut down any open clients end diff --git a/spec/unit/realtime/recovery_key_context_spec.rb b/spec/unit/realtime/recovery_key_context_spec.rb new file mode 100644 index 000000000..fd373a395 --- /dev/null +++ b/spec/unit/realtime/recovery_key_context_spec.rb @@ -0,0 +1,36 @@ +require 'spec_helper' +require 'ably/realtime/recovery_key_context' + +describe Ably::Realtime::RecoveryKeyContext do + + context 'connection recovery key' do + + it 'should encode recovery key - RTN16i, RTN16f, RTN16j' do + connection_key = 'key' + msg_serial = 123 + channel_serials = { + 'channel1' => 'serial1', + 'channel2' => 'serial2' + } + recovery_context = Ably::Realtime::RecoveryKeyContext.new(connection_key, msg_serial, channel_serials) + encoded_recovery_key = recovery_context.to_json + expect(encoded_recovery_key).to eq "{\"connection_key\":\"key\",\"msg_serial\":123," << + "\"channel_serials\":{\"channel1\":\"serial1\",\"channel2\":\"serial2\"}}" + end + + it 'should decode recovery key - RTN16i, RTN16f, RTN16j' do + encoded_recovery_key = "{\"connection_key\":\"key\",\"msg_serial\":123," << + "\"channel_serials\":{\"channel1\":\"serial1\",\"channel2\":\"serial2\"}}" + decoded_recovery_key = Ably::Realtime::RecoveryKeyContext.from_json(encoded_recovery_key) + expect(decoded_recovery_key.connection_key).to eq("key") + expect(decoded_recovery_key.msg_serial).to eq(123) + end + + it 'should return nil for invalid recovery key - RTN16i, RTN16f, RTN16j' do + encoded_recovery_key = "{\"invalid key\"}" + decoded_recovery_key = Ably::Realtime::RecoveryKeyContext.from_json(encoded_recovery_key) + expect(decoded_recovery_key).to be_nil + end + + end +end