Version: | 0.5.6 |
---|---|
Download: | http://pypi.python.org/pypi/haigha |
Source: | https://github.com/agoragames/haigha |
This document describes Haigha, a client for AMQP servers. AMQP is a messaging protocol which can be used to route large volumes of data across a wide network of application servers. The document covers the design, implementation and usage of Haigha to support fast, event-driven Python applications using AMQP. It should provide sufficient specifications for an engineer to integrate haigha into their applications.
This document is divided into chapters according to the layout of the AMQP 0.9.1 specification [PDF].
- Overview Read this for a general introduction
- Architecture The architecture of haigha code base
- Functional Specifications How applications work with haigha
- Technical Specifications How haigha transport layer is implemented
TODO Write conventions on common terms and usage
This section describes the semantics of haigha to integrate with AMQP servers
The AMQP protocol divides the tasks of message routing and delivery between two distinct objects:
- Exchange, to which messages are written
- Queue, to which messages are routed and stored for consumption by clients
To connect an exchange and a queue, a binding is defined. When a message is published to an exchange, a route is supplied which is compared against the binding to determine delivery.
To manage the stateful connection to a broker for both the publishing and consuming of messages, the following entities are defined:
- Connection The authenticated socket connection between a client and broker on a specific vhost
- Channel TODO how to even describe this
The message queue is the final destination of any published message, and it is the location from which a client will consume messages. Each queue with a binding to an exchange for which a message was published with a matching routing key will receive a copy of a message [1].
Haigha implements queue declaration and deletion in the QueueClass.
The exchange accepts messages from applications. There are several different exchange types, the standard ones defined in the specification and possibly some additional ones supplied by your broker. The common types of exchanges are:
- direct The routing key and binding key must exactly match
- topic The routing key must match the pattern defined by the binding key
- fanout All queues will receive a copy of the message.
Haigha implements exchange declaration and deletion in the ExchangeClass.
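The matching rules for the three common exchange types can be sketched in a few lines of Python. This is an illustrative model of how a broker compares a routing key against a binding key, not haigha code; the function names topic_matches and is_routed are hypothetical. The topic wildcards follow the AMQP 0.9.1 specification, where * matches exactly one word and # matches zero or more words.

```python
def topic_matches(binding_key, routing_key):
    """Return True if a dot-separated routing key matches a topic pattern."""
    pattern = binding_key.split('.')
    words = routing_key.split('.')

    def match(p, w):
        if p == len(pattern):
            # Pattern exhausted: succeed only if every word was consumed.
            return w == len(words)
        if pattern[p] == '#':
            # '#' may absorb zero or more of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] == '*' or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


def is_routed(exchange_type, binding_key, routing_key):
    """Decide whether a message reaches a queue bound with binding_key."""
    if exchange_type == 'fanout':
        return True  # every bound queue receives a copy
    if exchange_type == 'direct':
        return binding_key == routing_key
    if exchange_type == 'topic':
        return topic_matches(binding_key, routing_key)
    raise ValueError('unknown exchange type: %r' % (exchange_type,))
```

For example, is_routed('topic', 'stock.usd.*', 'stock.usd.nyse') is true, while the same binding does not match the shorter key 'stock.usd'.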
After an exchange and a queue have been declared, one or more bindings can be defined between them. It is possible for a single queue to be bound to multiple exchanges, or a shared queue can be used to distribute messages among a pool of consumers.
Haigha implements bindings in the QueueClass and consumers in the BasicClass.
Shared queues are the standard point-to-point queue, useful for distributing messages among consumers. This example assumes that a Connection has already been initiated and assigned to connection, and that the user has defined the method application_consumer to receive messages.
    ch = connection.channel()
    ch.exchange.declare('an_exchange', 'direct')
    ch.queue.declare('a_queue')
    ch.queue.bind('a_queue', 'an_exchange', routing_key='route')
    ch.basic.consume('a_queue', application_consumer)
Handling replies, or receiving consumer-targeted messages, is a common use case for creating exclusive queues for a process. In this example, we'll let the broker assign the queue name and use callbacks to set up a consumer after the server has replied.
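    ch = connection.channel()
    ch.exchange.declare('reply', 'direct')
    # The callback receives the broker-assigned queue name, which is then
    # used as the binding key on the 'reply' exchange.
    ch.queue.declare(exclusive=True,
        cb=lambda queue, messages, consumers:
            ch.queue.bind(queue, 'reply', routing_key=queue))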
By convention, we'll now use a reply-to header in our messages when this consumer requests data from another consumer, so that the reply can be routed using the appropriate binding key.
Topic routing forms the basis of pub-sub models. When combined with shared queue semantics, it allows AMQP to be used as a powerful routing engine across a large pool of varied applications.
    ch = connection.channel()
    ch.exchange.declare('pub', 'topic')
    ch.queue.declare('stock.usd')
    ch.queue.bind('stock.usd', 'pub', routing_key='stock.usd.*')
This section describes how haigha talks to the broker.
The AMQP protocol divides its commands among classes of functionality. The ProtocolClass defines the base class for each of these, with each class of functionality defined in a subclass such as QueueClass, ExchangeClass, etc., for each of the AMQP protocol classes [basic, channel, exchange, queue, transaction]. These are exposed in the Channel as properties as shown in the examples above.
The protocol also separates commands between synchronous and asynchronous. In all cases [2], if an operation is (optionally) synchronous it will support a cb= keyword argument. Many methods support both synchronous and asynchronous behavior; where asynchronous operation is available, haigha defaults to it through the nowait=True keyword argument, and automatically switches to synchronous mode if an application callback is supplied.
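A minimal sketch of that defaulting rule follows. The helper effective_nowait is hypothetical and not part of haigha's API; it only illustrates how a supplied callback overrides the asynchronous default.

```python
def effective_nowait(nowait=True, cb=None):
    """Return the nowait flag that would be sent to the broker.

    Asynchronous operation (nowait=True) is the default, but supplying a
    callback means the caller wants the broker's reply, so the command
    becomes synchronous.
    """
    return bool(nowait) and cb is None
```

With no arguments the result is True (asynchronous); passing any callback, or nowait=False, yields False (synchronous).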
Commands are further identified as originating from the client, server or either. As haigha is a client library, it only supports those commands which can be initiated by the client. With the exception of publishing, these commands are available solely in the respective ProtocolClass to which the command belongs. For convenience, the Channel exposes two publishing methods, publish and publish_synchronous, as well as open and close. All methods of a ProtocolClass which handle server-originated messages are named beginning with the string _recv_.
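The _recv_ naming convention can be illustrated with a toy dispatcher. This is a sketch under stated assumptions, not haigha's implementation: SketchQueueClass, its dispatch_map attribute and its events list are hypothetical. The method ids are those assigned to the queue class by the AMQP 0.9.1 specification (declare-ok is 11, bind-ok is 21).

```python
class SketchQueueClass(object):
    """Toy stand-in for a ProtocolClass subclass; not haigha's real class."""

    def __init__(self):
        self.events = []
        # Map AMQP method ids to the handlers for server-originated commands.
        self.dispatch_map = {
            11: self._recv_declare_ok,  # queue.declare-ok
            21: self._recv_bind_ok,     # queue.bind-ok
        }

    def dispatch(self, method_id, args):
        """Route a server-originated command to its _recv_ handler."""
        handler = self.dispatch_map.get(method_id)
        if handler is None:
            raise ValueError('unsupported method id %d' % method_id)
        handler(args)

    def _recv_declare_ok(self, args):
        self.events.append(('declare-ok', args))

    def _recv_bind_ok(self, args):
        self.events.append(('bind-ok', args))
```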
The mapping of classes and commands has already been described via the ProtocolClass and its implementations. Each method is responsible for constructing the frame(s) necessary to implement the command, and the user should never have to worry about constructing frames by hand.
The Connection class manages the state of the AMQP connection. The life-cycle is:
- User creates a new Connection object, setting the configuration through keyword params (TODO document).
- A ConnectionStrategy is created and a blocking TCP connection is initiated to the broker.
- After a socket connection is created, it is set to non-blocking mode.
- The Connection sends a protocol header defining specification 0.9.1.
- The ConnectionChannel, id 0, receives the start command and replies with start-ok login credentials.
- If authorized, the server responds with the secure command, to which ConnectionChannel responds with open. If not authorized, the socket is immediately closed.
- The server responds with open-ok and any pending frames are flushed.
- At any time, the client or server may send or reply with tune or tune-ok respectively to negotiate frame size or channel limits.
- The connection is available for the application.
- The server sends a close command, or client sends it by calling connection.close.
- Peer acknowledges with close-ok and sock is disconnected.
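The protocol header sent at the start of the life-cycle is a fixed eight-byte sequence defined by the AMQP 0.9.1 specification: the literal AMQP, a zero byte, then the major, minor and revision numbers. The sketch below builds it with the standard library; the function name protocol_header is hypothetical and is not taken from haigha.

```python
import struct


def protocol_header(major=0, minor=9, revision=1):
    """Build the 8-byte AMQP protocol header for the given version."""
    # 'AMQP' followed by four unsigned bytes: 0, major, minor, revision.
    return b'AMQP' + struct.pack('>BBBB', 0, major, minor, revision)
```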
The Connection class manages the state of the socket connection and the negotiation with the broker. It is also responsible for maintaining a buffer of both input and output frames. The output buffer is used during the initialization of the connection, so that it can be used immediately by the application.
    connection = Connection()
    channel = connection.channel()
In this example, the channel will be negotiated immediately following the receipt of the open-ok command in the ConnectionChannel.
AMQP multiplexes frames across channels. The Channel class implements the stateful behavior of channels, and writes frames back to the Connection on which it was created. The life-cycle is:
- User creates a Channel by calling connection.channel. The channel is enumerated, and references to existing channels can be fetched by id.
- The Channel initializes all supported protocol classes and internal buffers.
- The channel immediately sends the open command.
- The server responds with open-ok.
- The channel is available for the application.
- The server sends a close command, or the client sends it by calling channel.close.
- Peer acknowledges with close-ok and the channel is closed. All future use will raise a ChannelClosed exception.
The AMQP protocol isolates all synchronous and asynchronous transactions per channel. The Channel class implements this behavior by maintaining a buffer of pending outbound frames. If the buffer is empty, a frame is immediately forwarded to the Connection, else it's appended to the end. When a synchronous method is called by the user, after all frames have been sent or queued, a callback is appended to the buffer.
When a command is received from the broker, the dispatch will find the appropriate haigha method and, if that method is at the front of the buffer, will pop it off. All remaining frames are then flushed until the buffer is empty, or the first item is another pending synchronous callback. This solution implements a very lightweight system for reliably managing multiple outstanding synchronous calls in an asynchronous dispatch loop. The user is free to interact with AMQP without worrying about whether a method is synchronous or not [3].
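The buffering scheme described above can be modelled with a single deque that holds both outbound frames and pending synchronous callbacks. This is a simplified sketch, not haigha's Channel: the class SketchChannel and its method names are hypothetical, and frames are represented by plain strings.

```python
from collections import deque


class SketchChannel(object):
    """Toy model of a channel's pending-frame buffer."""

    def __init__(self, send):
        self._send = send        # callable that forwards a frame to the connection
        self._pending = deque()  # queued frames and pending synchronous callbacks

    def send_frame(self, frame):
        # Forward immediately if nothing is pending, else keep ordering.
        if self._pending:
            self._pending.append(frame)
        else:
            self._send(frame)

    def add_synchronous_cb(self, cb):
        # Marks the point in the stream where a broker reply is awaited.
        self._pending.append(cb)

    def clear_synchronous_cb(self, cb):
        # Called when the broker's reply arrives for the given callback.
        if self._pending and self._pending[0] is cb:
            self._pending.popleft()
            self._flush()

    def _flush(self):
        # Send queued frames until the buffer is empty or the next item
        # is another pending synchronous callback.
        while self._pending and not callable(self._pending[0]):
            self._send(self._pending.popleft())
```

In this model a frame sent after a synchronous call stays queued until the reply for that call has been dispatched, which preserves per-channel ordering without blocking the caller.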
When receiving frames, the Connection first queues frames to each channel via channel.buffer_frame(). It then iterates over all channels for which a frame was queued and calls channel.process_frames(). In most cases, an AMQP command is isolated to one frame, but in the case of messages, the content may be split across multiple frames. In the situation where not all content frames have been received yet, the BasicClass will raise a ProtocolClass.FrameUnderflow exception and re-buffer any message frames on the channel. When the next frame arrives for the channel, the process will repeat, until all frames have arrived and the message is complete.
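The re-buffering behavior for multi-frame messages can be sketched as follows. This is a toy model, assuming frames are simple (kind, value) tuples where a header frame carries the total body size; the FrameUnderflow class and read_message function here are stand-ins written for illustration and are not haigha's own.

```python
from collections import deque


class FrameUnderflow(Exception):
    """Raised when a message's content frames have not all arrived."""


def read_message(frames):
    """Pop one complete message body from a deque of frames.

    If the body is incomplete, every frame taken so far is pushed back
    onto the front of the deque and FrameUnderflow is raised.
    """
    taken = []

    def take():
        if not frames:
            raise FrameUnderflow()
        frame = frames.popleft()
        taken.append(frame)
        return frame

    try:
        kind, body_size = take()
        if kind != 'header':
            raise ValueError('expected a header frame')
        body = b''
        while len(body) < body_size:
            kind, chunk = take()
            if kind != 'body':
                raise ValueError('expected a body frame')
            body += chunk
        return body
    except FrameUnderflow:
        # Restore the original order so the next attempt starts over.
        frames.extendleft(reversed(taken))
        raise
```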
The ExchangeClass is used to declare and delete exchanges.
All methods of ExchangeClass are optionally synchronous and can callback to user code.
TODO say something more
The QueueClass is used to declare, delete, bind and purge queues.
All methods of QueueClass are optionally or permanently synchronous and can callback to user code.
TODO say something more
The BasicClass is used to publish messages, manage consumers, handle message delivery, acknowledge receipts, and synchronously fetch messages.
TODO say something more
The TransactionClass is used to set up and use server-side transaction isolation. The life-cycle is:
- User calls channel.transaction.select() to send the select command to the server.
- Server replies with select-ok and the channel is permanently in transaction mode.
- The application publishes or acknowledges messages.
- The application commits or rolls back the publish or acknowledge commands through channel.transaction.commit() or channel.transaction.rollback().
All methods of the TransactionClass are synchronous and can callback to application code.
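The effect of commit and rollback on published messages can be pictured with a toy model of the broker side. This is only a sketch of the semantics described in the AMQP specification, where work done in a transaction becomes visible on commit and is discarded on rollback; the class SketchTransaction is hypothetical and unrelated to haigha's TransactionClass.

```python
class SketchTransaction(object):
    """Toy model of server-side transaction isolation for publishes."""

    def __init__(self):
        self.delivered = []  # messages made visible by a commit
        self._staged = []    # messages published since the last commit/rollback

    def publish(self, message):
        # Inside a transaction, publishes are held until commit.
        self._staged.append(message)

    def commit(self):
        # Apply all staged work; a new transaction begins immediately.
        self.delivered.extend(self._staged)
        self._staged = []

    def rollback(self):
        # Discard all staged work; a new transaction begins immediately.
        self._staged = []
```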
This section describes how haigha implements the wire-level protocol.
AMQP is a frame-oriented protocol and haigha is designed around this in every respect.
The Connection class implements an EventSocket callback which will call connection._read_frames(). It will take the current buffer on the socket, place it in a Reader object, and pass that to the read_frames() method of the Frame class. The reader acts as both a stream object, with methods such as seek() and tell(), as well as an implementation of the basic data types in AMQP.
For each frame read, the connection will queue the frame on to the channel specified in the frame, for later processing. If the input buffer contains a partial frame, a Reader.BufferUnderflow exception will be raised and Frame.read_frames() will exit, leaving the reader positioned at the end of the last full frame (or beginning of the buffer). The connection will re-buffer any pending data on the socket and wait for the next callback to attempt to read frames from the byte stream.
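The partial-frame handling can be sketched against the general AMQP 0.9.1 frame layout: a one-byte type, a two-byte channel id, a four-byte payload size, the payload, and a frame-end byte of 0xCE. The function below is an illustration only; it returns the consumed offset instead of raising an exception, and the name read_frames is reused here for a standalone function that is not haigha's Frame.read_frames().

```python
import struct

FRAME_HEADER = struct.Struct('>BHI')  # type, channel id, payload size
FRAME_END = b'\xce'


def read_frames(data):
    """Parse as many complete frames as possible from a byte string.

    Returns (frames, consumed), where frames is a list of
    (frame_type, channel_id, payload) tuples and consumed is the offset
    of the end of the last full frame. Bytes past that offset are a
    partial frame and should be re-buffered by the caller.
    """
    frames = []
    pos = 0
    while len(data) - pos >= FRAME_HEADER.size:
        frame_type, channel_id, size = FRAME_HEADER.unpack_from(data, pos)
        end = pos + FRAME_HEADER.size + size
        if len(data) < end + 1:
            break  # payload or frame-end byte has not arrived yet
        if data[end:end + 1] != FRAME_END:
            raise ValueError('missing frame-end byte')
        frames.append((frame_type, channel_id, data[pos + FRAME_HEADER.size:end]))
        pos = end + 1
    return frames, pos
```

A caller would keep data[consumed:] and prepend it to the next chunk read from the socket before calling the function again.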
To send frames, each command implemented by a ProtocolClass will construct a Writer object which is used to format the arguments for that command. It then constructs a subclass of Frame, usually a MethodFrame, and writes that to the channel to which the protocol class is bound.
AMQP defines several data types which form the basis of all frames. One of these data types, tables (i.e. dicts), supports the basic types in addition to a few others. There is disagreement on official versus supported types in tables, as well as subtle differences in the encoding of some types. Haigha is written to conform to the errata implemented in RabbitMQ.
The implementation of the data types is in both the Reader and Writer classes. When converting from Python to AMQP data types when serializing tables, the Writer assumes that all floats are double-precision, converts unicode to utf8 strings, and intelligently packs integers according to their required byte-width.
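Packing an integer by its required byte-width can be sketched with the struct module. The set of widths tried here (1, 2, 4 and 8 bytes, signed, network byte order) is an assumption made for illustration, and the sketch omits the type tag that precedes each value in an AMQP table; the function name pack_int is hypothetical and is not haigha's Writer API.

```python
import struct

# Signed big-endian formats in increasing width: 1, 2, 4 and 8 bytes.
_SIGNED_FORMATS = ('>b', '>h', '>i', '>q')


def pack_int(value):
    """Pack an integer into the narrowest signed width that can hold it."""
    for fmt in _SIGNED_FORMATS:
        bits = 8 * struct.calcsize(fmt)
        if -(1 << (bits - 1)) <= value < (1 << (bits - 1)):
            return struct.pack(fmt, value)
    raise OverflowError('integer does not fit in 64 bits')
```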
AMQP defines two classes of exceptions for error handling. Operational errors, such as invalid queue names, will close a channel. Structural errors, such as invalid or out-of-order frames, will result in a connection closure.
Because haigha is asynchronous, handlers must be defined to receive notification when a connection or channel is closed [4]. The closed state will be saved on the respective connection or channel, and accessible via the close_info property. This will always return a dictionary with the following fields defined:
- reply_code The 3 digit error code
- reply_text The text of the error message
- class_id The class id of the offending command
- method_id The method id of the offending command
When closing due to an error on the client side, these same parameters can be supplied to connection.close() and channel.close().
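As a small illustration of the close_info fields, the sketch below formats such a dictionary for logging. The helper describe_close and the example values are hypothetical; the values model a channel closed with the AMQP reply code 404 (NOT_FOUND) in response to a queue.declare (class 50, method 10).

```python
def describe_close(close_info):
    """Format a close_info dictionary as a single log-friendly line."""
    return '%(reply_code)d %(reply_text)s (class %(class_id)d, method %(method_id)d)' % close_info


# Hypothetical example of what a close handler might find on a channel.
example_close_info = {
    'reply_code': 404,
    'reply_text': 'NOT_FOUND',
    'class_id': 50,
    'method_id': 10,
}
```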
Haigha's client architecture closely matches AMQP's recommended abstraction layers.
The framing layer is shared across a number of different classes.
- Connection Manages input byte buffer, calls into frame reader, and writes frames to the socket
- Frame Implements frame reading, calls into frame implementations for further decoding, subclasses implement write_frame() method
- Channel Implements input frame buffer, dispatch to protocol classes, and interfaces for sending frames
The connection management is handled primarily by the Connection class. The AMQP specification suggests that this layer may also be responsible for sending content, but that is handled in the frame buffering implementation of Channel and the specific implementation of BasicClass.
The primary API of haigha consists of the methods exposed through the subclasses of ProtocolClass, which are made available in the aforementioned per-channel properties that map to the classes of AMQP protocol messages, [basic, channel, exchange, queue, transaction]. Additional APIs of which the user should be aware:
- Connection Exposes channel() and close()
- Channel Exposes close(), publish() and publish_synchronous()
- ChannelPool Transaction-based publishing for guaranteed delivery and high-throughput
TODO Document other features that the client implements.
Any application must of course first connect to a broker. The Connection class implements 3 methods that any developer will need to be aware of to use the majority of its features.
- __init__ Connection constructor, also establishes the socket connection
- channel Creates or fetches a Channel
- read_frames Read any pending frames, process them and write frames that result from the processing
In addition, as of version 0.5.2, haigha supports the extensions exposed by RabbitMQ in the haigha.connections.RabbitConnection class. The interface is identical to the Connection class but its channels expose additional methods to match RabbitMQ's protocol. Briefly, these extensions are:
- exchange.declare Accepts auto_delete and internal keyword arguments
- exchange.bind Supports exchange to exchange bindings
- exchange.unbind To remove an exchange to exchange binding
- basic.set_ack_listener Local method to set a callback on publisher confirm ack
- basic.set_nack_listener Local method to set a callback on publisher confirm nack
- basic.publish Returns the message id when publisher confirms are enabled
- basic.nack Send a nack to the broker when rejecting a message
- confirm.select Enable publisher confirms
The constructor takes many keyword arguments that will affect its behavior.
- debug Default False. If True, basic logging. If 2, verbose logging of frames.
- logger Default logging.root. A logging instance.
- user Default "guest". The AMQP user to authenticate as.
- password Default "guest". The password of the AMQP user.
- host Default "localhost".
- port Default 5672.
- vhost Default "/".
- connect_timeout Default 5 seconds. Time before socket connection fails.
- sock_opts Default None. Recommend at least {(socket.IPPROTO_TCP, socket.TCP_NODELAY) : 1}
- heartbeat Default None (disabled). If 0, broker assigned. If >0, negotiated with broker.
- open_cb Default None. A no-arg method to be called after connection fully negotiated and pending frames written.
- close_cb Default None. A no-arg method to be called when connection closes due to protocol handshake or transport closure.
- login_method Defaults to "AMQPLAIN".
- locale Defaults to "en_US".
- client_properties A hash of properties to send in addition to { 'library' : ..., 'library_version' : ... }
- class_map Defaults to None. Optionally override the default mapping of AMQP class_id to the haigha ProtocolClass that implements the AMQP class.
- transport Defaults to "socket". If a string, maps ["socket","gevent","gevent_pool","event"] to SocketTransport, GeventTransport, GeventPoolTransport or EventTransport respectively. If a Transport object, uses it directly.
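A set of constructor arguments might be collected in a dictionary like the one below. This sketch only assembles the keyword arguments using the documented names and defaults, plus the recommended TCP_NODELAY socket option; the variable name connection_kwargs is hypothetical, and the final call is left as a comment because it requires haigha to be installed and a broker to be reachable.

```python
import socket

# Keyword arguments mirroring the documented defaults, with the
# recommended TCP_NODELAY socket option added.
connection_kwargs = {
    'user': 'guest',
    'password': 'guest',
    'host': 'localhost',
    'port': 5672,
    'vhost': '/',
    'connect_timeout': 5,
    'sock_opts': {(socket.IPPROTO_TCP, socket.TCP_NODELAY): 1},
    'transport': 'socket',
}

# With haigha installed and a broker running, these would be passed as:
# connection = Connection(**connection_kwargs)
```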
Messages are created with the Message class and sent via one of several publishing methods.
- channel.basic.publish The "standard" publish which is the publish command exposed by the BasicClass.
- channel.publish A convenience method that aliases basic.publish.
- channel.publish_synchronous A wrapper around transaction.select, basic.publish, transaction.commit. A callback argument will be called when the server acknowledges commit.
- channelpool.publish Publish using a pool of transaction-isolated channels. Will create a new channel if none are free. A callback argument will be called when the server acknowledges transaction commit.
The preferred mechanism for reading messages from an AMQP queue is to register a consumer via a basic.consume call. This will register a Python function to be called each time the client receives a message from a queue.
Footnotes
[1] | Your broker may support other types of exchanges, such as a deliver-once exchange. |
[2] | All synchronous methods will support callbacks by 0.4.0. |
[3] | Synchronous methods have more overhead, so some awareness and caution is recommended. |
[4] | Channel close callbacks will be supported by 0.4.0. |