forked from omniti-labs/Net--RabbitMQ
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRabbitMQ.pm
346 lines (233 loc) · 9.85 KB
/
RabbitMQ.pm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
package Net::RabbitMQ;
require DynaLoader;
use strict;
use vars qw($VERSION @ISA);
$VERSION = "0.2.5";
@ISA = qw/DynaLoader/;
bootstrap Net::RabbitMQ $VERSION ;
use Scalar::Util qw(blessed);
=head1 NAME
Net::RabbitMQ - interact with RabbitMQ over AMQP using librabbitmq
=head1 SYNOPSIS
use Net::RabbitMQ;
my $mq = Net::RabbitMQ->new();
$mq->connect("localhost", { user => "guest", password => "guest" });
$mq->channel_open(1);
$mq->publish(1, "queuename", "Hi there!");
$mq->disconnect();
=head1 DESCRIPTION
C<Net::RabbitMQ> provides a simple wrapper around the librabbitmq library
that allows connecting, delcaring exchanges and queues, binding and unbinding
queues, publising, consuming and receiving events.
Error handling in this module is primarily achieve by Perl_croak (die). You
should be making good use of eval around these methods to ensure that you
appropriately catch the errors.
=head2 Methods
All methods, unless specifically stated, return nothing on success
and die on failure.
=over 4
=item new()
Creates a new Net::RabbitMQ object.
=item connect( $hostname, $options )
C<$hostname> is the host to which a connection will be attempted.
C<$options> is an optional hash respecting the following keys:
{
user => $user, #default 'guest'
password => $password, #default 'guest'
port => $port, #default 5672
vhost => $vhost, #default '/'
channel_max => $cmax, #default 0
frame_max => $fmax, #default 131072
heartbeat => $hearbeat, #default 0
timeout => $seconds #default undef (no timeout)
}
=item disconnect()
Causes the connection to RabbitMQ to be torn down.
=item channel_open($channel)
C<$channel> is a positive integer describing the channel you which to open.
=item channel_close($channel)
C<$channel> is a positive integer describing the channel you which to close.
=item get_channel_max()
Returns the maximum allowed channel number.
=item exchange_declare($channel, $exchange, $options)
C<$channel> is a channel that has been opened with C<channel_open>.
C<$exchange> is the name of the exchange to be instantiated.
C<$options> is an optional hash respecting the following keys:
{
exchange_type => $type, #default 'direct'
passive => $boolean, #default 0
durable => $boolean, #default 0
auto_delete => $boolean, #default 1
}
=item exchange_delete($channel, $exchange, $options)
C<$channel> is a channel that has been opened with C<channel_open>.
C<$exchange> is the name of the exchange to be deleted.
C<$options> is an optional hash respecting the following keys:
{
if_unused => $boolean, #default 1
nowait => $boolean, #default 0
}
=item queue_declare($channel, $queuename, $options)
C<$channel> is a channel that has been opened with C<channel_open>.
C<$queuename> is the name of the queuename to be instantiated. If
C<$queuename> is undef or an empty string, then an auto generated
queuename will be used.
C<$options> is an optional hash respecting the following keys:
{
passive => $boolean, #default 0
durable => $boolean, #default 0
exclusive => $boolean, #default 0
auto_delete => $boolean, #default 1
}
In scalar context, this method returns the queuename delcared
(important for retrieving the autogenerated queuename in the
event that one was requested).
In array context, this method returns three items: queuename,
the number of message waiting on the queue, and the number
of consumers bound to the queue.
=item queue_bind($channel, $queuename, $exchange, $routing_key)
C<$channel> is a channel that has been opened with C<channel_open>.
C<$queuename> is a previously declared queue, C<$exchange> is a
previously declared exchange, and C<$routing_key> is the routing
key that will bind the specified queue to the specified exchange.
=item queue_unbind($channel, $queuename, $exchange, $routing_key)
This is like the C<queue_bind> with respect to arguments. This command
unbinds the queue from the exchange.
=item publish($channel, $routing_key, $body, $options, $props)
C<$channel> is a channel that has been opened with C<channel_open>.
C<$routing_key> is the name of the routing key for this message.
C<$body> is the payload to enqueue.
C<$options> is an optional hash respecting the following keys:
{
exchange => $exchange, #default 'amq.direct'
mandatory => $boolean, #default 0
immediate => $boolean, #default 0
}
C<$props> is an optional hash (the AMQP 'props') respecting the following keys:
{
content_type => $string,
content_encoding => $string,
correlation_id => $string,
reply_to => $string,
expiration => $string,
message_id => $string,
type => $string,
user_id => $string,
app_id => $string,
delivery_mode => $integer,
priority => $integer,
timestamp => $integer,
headers => $href
}
=item consume($channel, $queuename, $options)
C<$channel> is a channel that has been opened with C<channel_open>.
C<$queuename> is the name of the queue from which we'd like to consume.
C<$options> is an optional hash respecting the following keys:
{
consumer_tag => $tag, #absent by default
no_local => $boolean, #default 0
no_ack => $boolean, #default 1
exclusive => $boolean, #default 0
}
The consumer_tag is returned. This command does B<not> return AMQP
frames, it simply notifies RabbitMQ that messages for this queue should
be delivered down the specified channel.
=item recv()
This command receives and reconstructs AMQP frames and returns a hash
containing the following information:
{
body => 'Magic Transient Payload', # the reconstructed body
routing_key => 'nr_test_q', # route the message took
exchange => 'nr_test_x', # exchange used
delivery_tag => 1, # (used for acks)
consumer_tag => 'c_tag', # tag from consume()
props => $props, # hashref sent in
}
C<$props> is the hash sent by publish() respecting the following keys:
{
content_type => $string,
content_encoding => $string,
correlation_id => $string,
reply_to => $string,
expiration => $string,
message_id => $string,
type => $string,
user_id => $string,
app_id => $string,
delivery_mode => $integer,
priority => $integer,
timestamp => $integer,
}
=item get($channel, $queuename, $options)
C<$channel> is a channel that has been opened with C<channel_open>.
C<$queuename> is the name of the queue from which we'd like to consume.
C<$options> is an optional hash respecting the following keys:
This command runs an amqp_basic_get which returns undef immediately
if no messages are available on the queue and returns a has as follows
if a message is available.
{
body => 'Magic Transient Payload', # the reconstructed body
routing_key => 'nr_test_q', # route the message took
exchange => 'nr_test_x', # exchange used
content_type => 'foo', # (only if specified)
delivery_tag => 1, # (used for acks)
redelivered => 0, # if message is redelivered
message_count => 0, # message count
}
=item ack($channel, $delivery_tag, $multiple = 0)
C<$channel> is a channel that has been opened with C<channel_open>.
C<$delivery_tag> the delivery tag seen from a returned frame from the
C<recv> method.
C<$multiple> specifies if multiple are to be acknowledged at once.
=item purge($channel, $queuename, $no_wait = 0)
C<$channel> is a channel that has been opened with C<channel_open>.
C<$queuename> is the queue to be purged.
C<$no_wait> a boolean specifying if the call should not wait for
the server to acknowledge the acknoledgement.
=item tx_select($channel)
C<$channel> is a channel that has been opened with C<channel_open>.
Start a server-side (tx) transaction over $channel.
=item tx_commit($channel)
C<$channel> is a channel that has been opened with C<channel_open>.
Commit a server-side (tx) transaction over $channel.
=item tx_rollback($channel)
C<$channel> is a channel that has been opened with C<channel_open>.
Rollback a server-side (tx) transaction over $channel.
=item basic_qos($channel, $options)
C<$channel> is a channel that has been opened with C<channel_open>.
C<$options> is an optional hash respecting the following keys:
{
prefetch_count => $cnt, #default 0
prefetch_size => $size, #default 0
global => $bool, #default 0
}
Set quality of service flags on the current $channel.
=item hearbeat()
Send a hearbeat frame. If you've connected with a heartbeat parameter,
you must send a heartbeat periodically matching connection parameter or
the server may snip the connection.
=item basic_return($subroutine)
C<$subroutine> is a perl coderef that takes two arguments:
$channel is the channel on which the message is returned.
$m the message which is a hash ref containing reply_code,
reply_text, exchange, and routing_key.
=item channel_close_cb($subroutine)
C<$subroutine> is a perl coderef that takes two arguments:
$channel is the channel on which the message is returned.
$m the message which is a hash ref containing reply_code,
reply_text, class_id, and method_id.
=back
=cut
sub publish {
my ($self, $channel, $routing_key, $body, $options, $props) = @_;
$options ||= {};
$props ||= {};
# Do a shallow clone to avoid modifying variable passed by caller
$props = { %$props };
# Convert blessed variables in headers to strings
if( $props->{headers} ) {
$props->{headers} = { map { blessed($_) ? "$_" : $_ } %{ $props->{headers} } };
}
$self->_publish($channel, $routing_key, $body, $options, $props);
}
1;