-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
138 lines (110 loc) · 3.22 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
'use strict'
const RSMQ = require('rsmq')
/**
 * Message Queue Wrapper.
 *
 * Wraps a queue client, tracks whether that client is currently connected,
 * sends messages to the configured queue and delays messages whose signature
 * is listed as deferred in the supplied options.
 *
 * @param {Object} options - wrapper configuration; stored as `this.options`
 *   and read by the prototype methods (host, port, name, deferred).
 */
const QueueWrapper = function (options) {
  this.options = options

  // create the underlying queue client
  const queue = this.initialiseQueue()
  this.rsmq = queue

  // connection state: null until the client reports either event,
  // then true after 'connect' and false after 'disconnect'
  this.connected = null

  const track = (state) => () => {
    this.connected = state
  }
  queue.on('connect', track(true))
  queue.on('disconnect', track(false))
}
/**
 * Build the queue client used by this wrapper.
 *
 * @returns {RSMQ} a new RSMQ instance configured with the `host` and `port`
 *   taken from `this.options`.
 */
QueueWrapper.prototype.initialiseQueue = function () {
  const { host, port } = this.options
  return new RSMQ({ host, port })
}
/**
 * Send a message to the configured queue (`this.options.name`).
 *
 * Call as `send(address, done)` to send the bare address, or as
 * `send(address, data, done)` to attach a payload. A payload is serialised
 * (objects via JSON.stringify, everything else via String), base64-encoded and
 * appended to the address as `address:[[<base64>]]`. Passing `undefined` as
 * `data` is treated the same as sending no payload.
 *
 * The message delay is obtained from `getDelay` at the moment the message is
 * actually handed to the queue client, not when `send` is called.
 *
 * @param {string} address - message signature / address
 * @param {*} [data] - optional payload, or the callback when no payload is sent
 * @param {Function} [done] - node-style callback passed to the queue client's
 *   `sendMessage`; called with an Error when the connection was refused
 */
QueueWrapper.prototype.send = function (address, data, done) {
  const callback = typeof data === 'function' ? data : done
  const hasPayload = typeof data !== 'function' && data !== undefined

  const options = {
    qname: this.options.name,
    message: address
  }

  if (hasPayload) {
    // String() keeps numbers/booleans as text; the old `new Buffer(number)`
    // allocated that many bytes instead of encoding the value.
    const serializedData = typeof data === 'object' ? JSON.stringify(data) : String(data)
    const encodedData = Buffer.from(serializedData).toString('base64')
    options.message = `${address}:[[${encodedData}]]`
  }

  const send = () => {
    options.delay = this.getDelay(options.message)
    this.rsmq.sendMessage(options, callback)
  }

  const error = () => {
    callback(new Error('Queue server connection refused'))
  }

  this.request(send, error)
}
/**
 * Run a request against the queue depending on the connection state.
 *
 * - connected (`this.connected === true`): run `req` immediately
 * - disconnected (`this.connected === false`): run `err` immediately
 * - unknown (anything else, initially `null`): wait for the client's next
 *   'connect' or 'disconnect' event and run `req` or `err` respectively, once
 *
 * @param {Function} req - performs the request
 * @param {Function} err - reports that the connection is unavailable
 * @returns {*} the callback's return value when the state is already known,
 *   otherwise undefined
 */
QueueWrapper.prototype.request = function (req, err) {
  if (this.connected === true) {
    return req()
  }
  if (this.connected === false) {
    return err()
  }

  const rsmq = this.rsmq

  const removeListeners = () => {
    rsmq.removeListener('connect', onConnect)
    rsmq.removeListener('disconnect', onDisconnect)
  }

  // Detach before invoking the callback so a throwing callback cannot leave
  // the temporary listeners attached to the client.
  function onConnect () {
    removeListeners()
    req()
  }

  function onDisconnect () {
    removeListeners()
    err()
  }

  rsmq.on('connect', onConnect)
  rsmq.on('disconnect', onDisconnect)
}
/**
 * Work out how long a message should be delayed.
 *
 * @param {string} message - the full message string about to be sent
 * @returns {number} 0 when the message is not deferred; otherwise the value
 *   of `untilStart()` (milliseconds) divided by 1000
 */
QueueWrapper.prototype.getDelay = function (message) {
  if (!this.isDeferred(message)) {
    return 0
  }
  const milliseconds = this.untilStart()
  return milliseconds / 1000
}
/**
 * Check whether a message matches one of the configured deferred signatures.
 *
 * A message matches when it starts with any entry of
 * `this.options.deferred.messages`.
 *
 * @param {string} message - the full message string
 * @returns {boolean} false when no deferred config exists or its `messages`
 *   is not an array; otherwise whether any entry is a prefix of `message`
 */
QueueWrapper.prototype.isDeferred = function (message) {
  const deferred = this.options.deferred
  const signatures = deferred ? deferred.messages : undefined
  if (!Array.isArray(signatures)) {
    return false
  }
  return signatures.some((signature) => message.startsWith(signature))
}
/**
 * Milliseconds from now until the deferred-message window opens.
 *
 * The window is described by `this.options.deferred.start` and
 * `this.options.deferred.stop`, both converted with `parseTime`. A stop time
 * earlier than the start time is treated as a window that rolls over to the
 * next day.
 *
 * @returns {number|undefined} 0 while inside the window, otherwise the number
 *   of milliseconds until the next start; undefined when the current time
 *   compares neither >= nor < the parsed start (dates that are not comparable)
 */
QueueWrapper.prototype.untilStart = function () {
  const DAY = 24 * 60 * 60 * 1000
  const current = new Date()
  const opens = this.parseTime(this.options.deferred.start)
  const closes = this.parseTime(this.options.deferred.stop)
  const wraps = closes < opens // window rolls over into the next day

  if (current >= opens) {
    // at or past today's start: inside the window unless it already closed
    return (wraps || current < closes) ? 0 : DAY - (current - opens)
  }
  if (current < opens) {
    // before today's start: inside only for the tail of a rolled-over window
    return (wraps && current < closes) ? 0 : opens - current
  }
}
/**
 * Convert a configured "HH:MM" string into a Date on the current day.
 *
 * The hours and minutes are applied in UTC; seconds and milliseconds are
 * zeroed so the result marks the exact start of that minute and repeated calls
 * with the same string yield the same instant (the previous implementation
 * kept the seconds/milliseconds of the moment it was called).
 *
 * @param {string} string - time of day formatted as "HH:MM"
 * @returns {Date} today's date at the given UTC time; an invalid Date when
 *   the hour or minute part does not convert to a number
 */
QueueWrapper.prototype.parseTime = function (string) {
  const [hours, minutes] = string.split(':')
  const time = new Date()
  time.setUTCHours(Number(hours), Number(minutes), 0, 0)
  return time
}
// Expose the QueueWrapper constructor as this module's export
module.exports = QueueWrapper