// index.js — forked from strongloop/strong-pubsub-redis
// Copyright IBM Corp. 2015. All Rights Reserved.
// Node module: strong-pubsub-redis
// This file is licensed under the MIT License.
// License text available at https://opensource.org/licenses/MIT
module.exports = Adapter;
var redis = require("redis")
var EventEmitter = require('events').EventEmitter;
var inherits = require('util').inherits;
var debug = require('debug')('strong-pubsub:redis');
var defaults = require('lodash').defaults;
/**
 * Do-nothing function: takes no arguments and returns `undefined`.
 */
function noop() {}
/**
 * The **Redis** `Adapter`.
 *
 * Stores the owning `client` and that client's `options` on the instance.
 * No redis connection is opened here; that happens in `connect()`.
 *
 * @class
 * @param {Object} client The pubsub client that owns this adapter.
 * @param {Object} client.options Options kept as `adapter.options`.
 */
function Adapter(client) {
  this.client = client;
  this.options = client.options;
}

// Adapter instances are event emitters.
inherits(Adapter, EventEmitter);
/**
 * Open the two redis connections used by this adapter (one for publishing,
 * one for subscribing) and report the outcome through `cb`.
 *
 * When `options.createClient` is a function it is called once per connection;
 * otherwise each connection is created with
 * `redis.createClient(options.port, options.host, options.redis)`.
 *
 * The connections are stored as `adapter.redisPubClient` and
 * `adapter.redisSubClient`. Errors from either connection are re-emitted as
 * `error` on `adapter.clients`, and messages arriving on the subscriber
 * connection are re-emitted as `message` on the owning client.
 *
 * Once `cb` has fired, this method keeps no `error` listener on
 * `adapter.clients`; attach your own to handle later connection errors.
 *
 * @callback {Function} cb Called exactly once: with no arguments after both
 * connections have emitted `connect`, or with the error if one is reported first.
 * @param {Error} err The error reported before both connections were established.
 */
Adapter.prototype.connect = function(cb) {
  var client = this.client;
  var options = this.options;

  // Build one redis connection, preferring the user-supplied factory.
  function createRedisClient() {
    if (typeof options.createClient === 'function') {
      return options.createClient();
    }
    return redis.createClient(options.port, options.host, options.redis);
  }

  var pubClient = this.redisPubClient = createRedisClient();
  var subClient = this.redisSubClient = createRedisClient();

  // Aggregates the state of both connections into single events.
  var clients = this.clients = new EventEmitter();
  var connacks = 0;

  function onConnect() {
    connacks++;
    debug('redis connection established (%d of 2)', connacks);
    if (connacks === 2) {
      clients.emit('connect');
    }
  }

  subClient.once('connect', onConnect);
  pubClient.once('connect', onConnect);
  pubClient.on('error', clients.emit.bind(clients, 'error'));
  subClient.on('error', clients.emit.bind(clients, 'error'));

  // Whichever of these fires first disarms the other, so `cb` runs only once
  // even if the connections come up after an error has been reported.
  function onReady() {
    clients.removeListener('error', onFailure);
    cb();
  }

  function onFailure(err) {
    clients.removeListener('connect', onReady);
    cb(err);
  }

  clients.once('connect', onReady);
  clients.once('error', onFailure);

  subClient.on('message', function(topic, message, extra) {
    client.emit('message', topic, message, extra);
  });
};
/**
 * Close both redis connections opened by `connect()`.
 *
 * Calls `end()` on `redisPubClient` and `redisSubClient` (the properties that
 * `connect()` assigns). A connection that was never created is skipped, so
 * calling `end()` before `connect()` does not throw.
 *
 * @callback {Function} [cb] Called with no arguments after `end()` has been
 * invoked on the existing connections.
 */
Adapter.prototype.end = function(cb) {
  if (this.redisPubClient) {
    this.redisPubClient.end();
  }
  if (this.redisSubClient) {
    this.redisSubClient.end();
  }
  if (typeof cb === 'function') {
    cb();
  }
};
/**
 * Publish a `message` to the specified `topic`.
 *
 * Only `topic`, `message` and the callback are handed to the redis publisher
 * connection; `options` is accepted so the signature matches the documented
 * adapter interface, but it is not forwarded to redis.
 *
 * @param {String} topic The topic to publish to.
 * @param {String|Buffer} message The message to publish to the topic.
 * @param {Object} [options] Additional options that are not required for publishing a message.
 * May be omitted, in which case the callback is passed as the third argument.
 * @param {Number} [options.qos] **default: `0`** The **MQTT** QoS (Quality of Service) setting.
 *
 * **Supported Values**
 *
 * - `0` - Just as reliable as TCP. Adapter will not get any missed messages (while it was disconnected).
 * - `1` - Adapter receives missed messages at least once and sometimes more than once.
 * - `2` - Adapter receives missed messages only once.
 *
 * @param {Boolean} [options.retain] **default: `false`** The `MQTT` retain setting. Whether or not the message should be retained.
 *
 * @callback {Function} callback Called once the adapter has successfully finished publishing the message.
 * @param {Error} err An error object is included if an error was supplied by the adapter.
 */
Adapter.prototype.publish = function(topic, message, options, cb) {
  // Support publish(topic, message, cb): without this the callback would be
  // mistaken for `options` and never reach redis.
  var done = (typeof cb !== 'function' && typeof options === 'function') ? options : cb;
  this.redisPubClient.publish(topic, message, done);
};
/**
 * Subscribe to the specified `topic`.
 *
 * Only `topic` and the callback are handed to the redis subscriber connection;
 * `options` is accepted so the signature matches the documented adapter
 * interface, but it is not forwarded to redis.
 *
 * NOTE(review): this calls the redis client's `subscribe`, which presumably
 * matches channel names literally; glob-style **topic patterns** would need
 * `psubscribe` — confirm whether pattern support is expected here.
 *
 * @param {String} topic The topic to subscribe to.
 * @param {Object} [options] The MQTT specific options. May be omitted, in which
 * case the callback is passed as the second argument.
 * @param {Object} [options.qos] See `publish()` for `options.qos`.
 *
 * @callback {Function} callback Called once the adapter has finished subscribing.
 * The callback is given to redis as-is, so it receives whatever the redis
 * client passes to it.
 * @param {Error} err An error object is included if an error was supplied by the adapter.
 */
Adapter.prototype.subscribe = function(topic, options, cb) {
  // Support subscribe(topic, cb): without this the callback would be mistaken
  // for `options` and never reach redis.
  var done = (typeof cb !== 'function' && typeof options === 'function') ? options : cb;
  this.redisSubClient.subscribe(topic, done);
};
/**
 * Unsubscribe from the specified `topic`.
 *
 * Delegates to `unsubscribe` on the redis subscriber connection, passing
 * `topic` and `cb` through unchanged.
 *
 * @param {String} topic The topic to unsubscribe from.
 * @callback {Function} callback Called once the adapter has finished unsubscribing.
 * @param {Error} err An error object is included if an error was supplied by the adapter.
 */
Adapter.prototype.unsubscribe = function(topic, cb) {
  var subscriber = this.redisSubClient;
  subscriber.unsubscribe(topic, cb);
};