Skip to content

Commit

Permalink
Merge pull request #423 from ably/feature/integration-protocol-2
Browse files Browse the repository at this point in the history
[ECO-4058] Feature/Integration protocol 2
  • Loading branch information
sacOO7 authored Jul 5, 2024
2 parents cb684b4 + 3e1882e commit 1596829
Show file tree
Hide file tree
Showing 37 changed files with 1,095 additions and 1,082 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

_[Ably](https://ably.com) is the platform that powers synchronized digital experiences in realtime. Whether attending an event in a virtual venue, receiving realtime financial information, or monitoring live car performance data – consumers simply expect realtime digital experiences as standard. Ably provides a suite of APIs to build, extend, and deliver powerful digital experiences in realtime for more than 250 million devices across 80 countries each month. Organizations like Bloomberg, HubSpot, Verizon, and Hopin depend on Ably’s platform to offload the growing complexity of business-critical realtime data synchronization at global scale. For more information, see the [Ably documentation](https://ably.com/documentation)._

This is a Ruby client library for Ably. The library currently targets the [Ably 1.2 client library specification](https://ably.com/documentation/client-lib-development-guide/features/). You can see the complete list of features this client library supports in [our client library SDKs feature support matrix](https://ably.com/download/sdk-feature-support-matrix).
This is a Ruby client library for Ably. The library currently targets the [Ably 2.0.0 client library specification](https://ably.com/documentation/client-lib-development-guide/features/). You can see the complete list of features this client library supports in [our client library SDKs feature support matrix](https://ably.com/download/sdk-feature-support-matrix).

## Supported platforms

Expand Down
30 changes: 19 additions & 11 deletions lib/ably/auth.rb
Original file line number Diff line number Diff line change
Expand Up @@ -414,13 +414,20 @@ def auth_header
#
# @return [Hash] headers
def extra_auth_headers
if client_id && using_basic_auth?
{ 'X-Ably-ClientId' => Base64.urlsafe_encode64(client_id) }
if client_id_for_request
{ 'X-Ably-ClientId' => Base64.urlsafe_encode64(client_id_for_request) }
else
{}
end
end

# ClientId that needs to be included with every rest/realtime request
# spec - RSA7e
# @return string
def client_id_for_request
options[:client_id] if options[:client_id] && using_basic_auth?
end

# Auth params used in URI endpoint for Realtime connections
# Will reauthorize implicitly if required and capable
#
Expand Down Expand Up @@ -482,15 +489,16 @@ def can_assume_client_id?(assumed_client_id)
#
# @api private
def configure_client_id(new_client_id)
# If new client ID from Ably is a wildcard, but preconfigured clientId is set, then keep the existing clientId
if has_client_id? && new_client_id == '*'
@client_id_validated = true
return
end

# If client_id is defined and not a wildcard, prevent it changing, this is not supported
if client_id && client_id != '*' && new_client_id != client_id
raise Ably::Exceptions::IncompatibleClientId.new("Client ID is immutable once configured for a client. Client ID cannot be changed to '#{new_client_id}'")
if has_client_id?
# If new client ID from Ably is a wildcard, but preconfigured clientId is set, then keep the existing clientId
if new_client_id == "*"
@client_id_validated = true
return
end
# If client_id is defined and not a wildcard, prevent it changing, this is not supported
if new_client_id != client_id
raise Ably::Exceptions::IncompatibleClientId.new("Client ID is immutable once configured for a client. Client ID cannot be changed to '#{new_client_id}'")
end
end
@client_id_validated = true
@client_id = new_client_id
Expand Down
31 changes: 5 additions & 26 deletions lib/ably/models/protocol_message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ module Ably::Models
# @return [String] Contains a serial number for a message on the current channel
# @!attribute [r] connection_id
# @return [String] Contains a string private connection key used to recover this connection
# @!attribute [r] connection_serial
# @return [Bignum] Contains a serial number for a message sent from the server to the client
# @!attribute [r] message_serial
# @return [Bignum] Contains a serial number for a message sent from the client to the server
# @!attribute [r] timestamp
Expand Down Expand Up @@ -129,12 +127,6 @@ def message_serial
raise TypeError, "msg_serial '#{attributes[:msg_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage"
end

def connection_serial
Integer(attributes[:connection_serial])
rescue TypeError
raise TypeError, "connection_serial '#{attributes[:connection_serial]}' is invalid, a positive Integer is expected for a ProtocolMessage"
end

def count
[1, attributes[:count].to_i].max
end
Expand All @@ -146,26 +138,12 @@ def has_message_serial?
false
end

# @api private
def has_connection_serial?
connection_serial && true
def has_channel_serial?
channel_serial && true
rescue TypeError
false
end

def serial
if has_connection_serial?
connection_serial
else
message_serial
end
end

# @api private
def has_serial?
has_connection_serial? || has_message_serial?
end

def messages
@messages ||=
Array(attributes[:messages]).map do |message|
Expand Down Expand Up @@ -271,7 +249,7 @@ def attributes
# Return a JSON ready object from the underlying #attributes using Ably naming conventions for keys
def as_json(*args)
raise TypeError, ':action is missing, cannot generate a valid Hash for ProtocolMessage' unless action
raise TypeError, ':msg_serial or :connection_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_serial?
raise TypeError, ':msg_serial is missing, cannot generate a valid Hash for ProtocolMessage' if ack_required? && !has_message_serial?

attributes.dup.tap do |hash_object|
hash_object['action'] = action.to_i
Expand All @@ -296,11 +274,12 @@ def to_s
end

# True if the ProtocolMessage appears to be invalid, however this is not a guarantee
# Used for validating incoming protocol messages, so no need to add unnecessary checks
# @return [Boolean]
# @api private
def invalid?
action_enum = action rescue nil
!action_enum || (ack_required? && !has_serial?)
!action_enum
end

# @!attribute [r] logger
Expand Down
4 changes: 2 additions & 2 deletions lib/ably/modules/safe_deferrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def errback(&block)
end
end

# Mark the Deferrable as succeeded and trigger all callbacks.
# Mark the Deferrable as succeeded and trigger all success callbacks.
# See http://www.rubydoc.info/gems/eventmachine/1.0.7/EventMachine/Deferrable#succeed-instance_method
#
# @return [void]
Expand All @@ -48,7 +48,7 @@ def succeed(*args)
super(*args)
end

# Mark the Deferrable as failed and trigger all callbacks.
# Mark the Deferrable as failed and trigger all error callbacks.
# See http://www.rubydoc.info/gems/eventmachine/1.0.7/EventMachine/Deferrable#fail-instance_method
#
# @return [void]
Expand Down
2 changes: 1 addition & 1 deletion lib/ably/modules/state_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Ably::Modules
# the instance variable @state is used exclusively, the {Enum} STATE is defined prior to inclusion of this
# module, and the class is an {EventEmitter}. It then emits state changes.
#
# It also ensures the EventEmitter is configured to retrict permitted events to the
# It also ensures the EventEmitter is configured to restrict permitted events to the
# the available STATEs or EVENTs if defined i.e. if EVENTs includes an additional type such as
# :update, then it will support all EVENTs being emitted. EVENTs must be a superset of STATEs
#
Expand Down
4 changes: 4 additions & 0 deletions lib/ably/realtime/auth.rb
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ def auth_header_sync
auth_sync.auth_header
end

def client_id_for_request_sync
auth_sync.client_id_for_request
end

# Auth params used in URI endpoint for Realtime connections
# Will reauthorize implicitly if required and capable
#
Expand Down
6 changes: 3 additions & 3 deletions lib/ably/realtime/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Channel
#
# @spec RTL2b
#
# The permited states for this channel
# The permitted states for this channel
STATE = ruby_enum('STATE',
:initialized,
:attaching,
Expand Down Expand Up @@ -364,8 +364,8 @@ def __incoming_msgbus__
# @return [Ably::Models::ChannelOptions]
def set_options(channel_options)
@options = Ably::Models::ChannelOptions(channel_options)

manager.request_reattach if need_reattach?
# RTL4i
manager.request_reattach if (need_reattach? and connection.state?(:connected))
end
alias options= set_options

Expand Down
99 changes: 51 additions & 48 deletions lib/ably/realtime/channel/channel_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def initialize(channel, connection)
def attach
if can_transition_to?(:attached)
connect_if_connection_initialized
send_attach_protocol_message
send_attach_protocol_message if connection.state?(:connected) # RTL4i
end
end

Expand All @@ -34,9 +34,9 @@ def detach(error, previous_state)
# Channel is attached, notify presence if sync is expected
def attached(attached_protocol_message)
# If no attached ProtocolMessage then this attached request was triggered by the client
# library, such as returning to attached whne detach has failed
# library, such as returning to attached when detach has failed
if attached_protocol_message
update_presence_sync_state_following_attached attached_protocol_message
channel.presence.manager.on_attach attached_protocol_message.has_presence_flag?
channel.properties.set_attach_serial(attached_protocol_message.channel_serial)
channel.options.set_modes_from_flags(attached_protocol_message.flags)
channel.options.set_params(attached_protocol_message.params)
Expand All @@ -49,17 +49,16 @@ def log_channel_error(error)
end

# Request channel to be reattached by sending an attach protocol message
# @param [Hash] options
# @option options [Ably::Models::ErrorInfo] :reason
def request_reattach(options = {})
reason = options[:reason]
send_attach_protocol_message
logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
# @param [Ably::Models::ErrorInfo] reason
def request_reattach(reason = nil)
channel.set_channel_error_reason(reason) if reason
channel.transition_state_machine! :attaching, reason: reason unless channel.attaching?
send_attach_protocol_message
logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
end

def duplicate_attached_received(protocol_message)
logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" }
if protocol_message.error
channel.set_channel_error_reason protocol_message.error
log_channel_error protocol_message.error
Expand All @@ -68,17 +67,15 @@ def duplicate_attached_received(protocol_message)
channel.properties.set_attach_serial(protocol_message.channel_serial)
channel.options.set_modes_from_flags(protocol_message.flags)

if protocol_message.has_channel_resumed_flag?
logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" }
else
unless protocol_message.has_channel_resumed_flag?
channel.emit :update, Ably::Models::ChannelStateChange.new(
current: channel.state,
previous: channel.state,
event: Ably::Realtime::Channel::EVENT(:update),
reason: protocol_message.error,
resumed: false,
)
update_presence_sync_state_following_attached protocol_message
channel.presence.manager.on_attach protocol_message.has_presence_flag?
end
end

Expand Down Expand Up @@ -170,6 +167,12 @@ def start_attach_from_suspended_timer
end
end

# RTL13c
def notify_state_change
@pending_state_change_timer.cancel if @pending_state_change_timer
@pending_state_change_timer = nil
end

private
attr_reader :pending_state_change_timer

Expand Down Expand Up @@ -209,56 +212,56 @@ def send_attach_protocol_message
message_options[:flags] = message_options[:flags].to_i | Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[:resume]
end

send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options
end

def send_detach_protocol_message(previous_state)
send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed
end
message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1

def send_state_change_protocol_message(new_state, state_if_failed, message_options = {})
state_at_time_of_request = channel.state
attach_action = Ably::Models::ProtocolMessage::ACTION.Attach
# RTL4f
@pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
if channel.state == state_at_time_of_request
error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{new_state} operation failed (timed out)")
channel.transition_state_machine state_if_failed, reason: error
error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{attach_action} operation failed (timed out)")
channel.transition_state_machine :suspended, reason: error # return to suspended state if failed
end
end
# Shouldn't queue attach message as per RTL4i, so message is added top of the queue
# to be sent immediately while processing next message
connection.send_protocol_message_immediately(
action: attach_action.to_i,
channel: channel.name,
**message_options.to_h
)
end

channel.once_state_changed do
@pending_state_change_timer.cancel if @pending_state_change_timer
@pending_state_change_timer = nil
def send_detach_protocol_message(previous_state)
state_at_time_of_request = channel.state
detach_action = Ably::Models::ProtocolMessage::ACTION.Detach

@pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
if channel.state == state_at_time_of_request
error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{detach_action} operation failed (timed out)")
channel.transition_state_machine previous_state, reason: error # return to previous state if failed
end
end

resend_if_disconnected_and_connected = lambda do
on_disconnected_and_connected = lambda do
connection.unsafe_once(:disconnected) do
next unless pending_state_change_timer
connection.unsafe_once(:connected) do
next unless pending_state_change_timer
connection.send_protocol_message(
action: new_state.to_i,
channel: channel.name,
**message_options.to_h
)
resend_if_disconnected_and_connected.call
end
yield if pending_state_change_timer
end if pending_state_change_timer
end
end
resend_if_disconnected_and_connected.call

connection.send_protocol_message(
action: new_state.to_i,
channel: channel.name,
**message_options.to_h
)
end

def update_presence_sync_state_following_attached(attached_protocol_message)
if attached_protocol_message.has_presence_flag?
channel.presence.manager.sync_expected
else
channel.presence.manager.sync_not_expected
send_detach_message = lambda do
on_disconnected_and_connected.call do
send_detach_message.call
end
connection.send_protocol_message(
action: detach_action.to_i,
channel: channel.name
)
end

send_detach_message.call
end

def logger
Expand Down
9 changes: 9 additions & 0 deletions lib/ably/realtime/channel/channel_properties.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ class ChannelProperties
#
attr_reader :attach_serial

# ChannelSerial contains the channelSerial from latest ProtocolMessage of action type
# Message/PresenceMessage received on the channel.
#
# @spec CP2b, RTL15b
#
# @return [String]
#
attr_accessor :channel_serial

def initialize(channel)
@channel = channel
end
Expand Down
2 changes: 2 additions & 0 deletions lib/ably/realtime/channel/channel_state_machine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ChannelStateMachine
transition :from => :failed, :to => [:attaching, :initialized]

after_transition do |channel, transition|
channel.manager.notify_state_change # RTL13c
channel.synchronize_state_with_statemachine
end

Expand All @@ -55,6 +56,7 @@ class ChannelStateMachine
end

after_transition(to: [:detached, :failed, :suspended]) do |channel, current_transition|
channel.properties.channel_serial = nil # RTP5a1
err = error_from_state_change(current_transition)
channel.manager.fail_queued_messages(err) if channel.failed? or channel.suspended? #RTL11
channel.manager.log_channel_error err if err
Expand Down
Loading

0 comments on commit 1596829

Please sign in to comment.