From 8aa1b7c7710eddd8856b9507173e321c3e38e958 Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sat, 25 Sep 2021 12:30:00 +0200 Subject: [PATCH] refactor: replace prototypes with classes (#273) * refactor(agent): convert prototype to class * refactor(cache): convert prototype to class * refactor(incoming_message): convert prototype to class * refactor(observe_read_stream): convert prototype to class * refactor(observe_write_stream): convert prototype to class * refactor(server): convert prototypes to classes * refactor(segmentation): convert prototype to class * refactor(retry_send): convert prototype to class * refactor(outgoing_message): convert prototype to class --- index.js | 4 +- lib/agent.js | 763 +++++++++++++++--------------- lib/cache.js | 100 ++-- lib/incoming_message.js | 39 +- lib/observe_read_stream.js | 71 ++- lib/observe_write_stream.js | 108 ++--- lib/outgoing_message.js | 136 +++--- lib/retry_send.js | 108 +++-- lib/segmentation.js | 150 +++--- lib/server.js | 904 ++++++++++++++++++------------------ test/agent.js | 4 +- test/retry_send.js | 4 +- test/segmentation.js | 4 +- 13 files changed, 1193 insertions(+), 1202 deletions(-) diff --git a/index.js b/index.js index 21f3c19a..01350f2d 100644 --- a/index.js +++ b/index.js @@ -40,7 +40,9 @@ module.exports.request = function (url) { return agent.request(url) } -module.exports.createServer = Server +module.exports.createServer = function (options, listener) { + return new Server(options, listener) +} module.exports.Agent = Agent module.exports.globalAgent = globalAgent diff --git a/lib/agent.js b/lib/agent.js index 06d95b9d..e74916c0 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -8,7 +8,6 @@ * See the included LICENSE file for more details. */ -const util = require('util') const crypto = require('crypto') const events = require('events') const dgram = require('dgram') @@ -28,501 +27,499 @@ const pf = require('./polyfill') const SegmentedTransmission = require('./segmentation').SegmentedTransmission const parseBlockOption = require('./block').parseBlockOption -function Agent (opts) { - if (!(this instanceof Agent)) { - return new Agent(opts) - } - - if (!opts) { - opts = {} - } +class Agent extends events.EventEmitter { + constructor (opts) { + super() - if (!opts.type) { - opts.type = 'udp4' - } - - if (opts.socket) { - opts.type = opts.socket.type - delete opts.port - } - - this._opts = opts + if (!opts) { + opts = {} + } - this._init(opts.socket) -} + if (!opts.type) { + opts.type = 'udp4' + } -util.inherits(Agent, events.EventEmitter) + if (opts.socket) { + opts.type = opts.socket.type + delete opts.port + } -Agent.prototype._init = function initSock (socket) { - this._closing = false + this._opts = opts - if (this._sock) { - return + this._init(opts.socket) } - const that = this - this._sock = socket || dgram.createSocket(this._opts.type) - this._sock.on('message', function (msg, rsinfo) { - let packet - try { - packet = parse(msg) - } catch (err) { - return - } + _init (socket) { + this._closing = false - if (packet.code[0] === '0' && packet.code !== '0.00') { - // ignore this packet since it's not a response. + if (this._sock) { return } - const outSocket = that._sock.address() - that._handle(msg, rsinfo, outSocket) - }) + const that = this + this._sock = socket || dgram.createSocket(this._opts.type) + this._sock.on('message', function (msg, rsinfo) { + let packet + try { + packet = parse(msg) + } catch (err) { + return + } - if (this._opts.port) { - this._sock.bind(this._opts.port) - } + if (packet.code[0] === '0' && packet.code !== '0.00') { + // ignore this packet since it's not a response. + return + } - this._sock.on('error', function (err) { - that.emit('error', err) - }) + const outSocket = that._sock.address() + that._handle(msg, rsinfo, outSocket) + }) - this._msgIdToReq = {} - this._tkToReq = {} - this._tkToMulticastResAddr = {} + if (this._opts.port) { + this._sock.bind(this._opts.port) + } - this._lastToken = Math.floor(Math.random() * (maxToken - 1)) - this._lastMessageId = Math.floor(Math.random() * (maxMessageId - 1)) + this._sock.on('error', function (err) { + that.emit('error', err) + }) - this._msgInFlight = 0 - this._requests = 0 -} + this._msgIdToReq = {} + this._tkToReq = {} + this._tkToMulticastResAddr = {} -Agent.prototype._cleanUp = function cleanUp () { - if (--this._requests !== 0) { - return - } + this._lastToken = Math.floor(Math.random() * (maxToken - 1)) + this._lastMessageId = Math.floor(Math.random() * (maxMessageId - 1)) - if (!this._opts.socket) { - this._closing = true + this._msgInFlight = 0 + this._requests = 0 } - if (this._msgInFlight !== 0) { - return - } + _cleanUp () { + if (--this._requests !== 0) { + return + } - this._doClose() -} + if (!this._opts.socket) { + this._closing = true + } -Agent.prototype._doClose = function () { - for (const k in this._msgIdToReq) { - this._msgIdToReq[k].sender.reset() - } + if (this._msgInFlight !== 0) { + return + } - if (this._opts.socket) { - return + this._doClose() } - this._sock.close() - this._sock = null -} - -Agent.prototype._handle = function handle (msg, rsinfo, outSocket) { - const packet = parse(msg) - let buf - let response - const that = this - let req = this._msgIdToReq[packet.messageId] - const ackSent = function (err) { - if (err && req) { - req.emit('error', err) + _doClose () { + for (const k in this._msgIdToReq) { + this._msgIdToReq[k].sender.reset() } - that._msgInFlight-- - if (that._closing && that._msgInFlight === 0) { - that._doClose() + if (this._opts.socket) { + return } + + this._sock.close() + this._sock = null } - if (!req) { - if (packet.token.length > 0) { - req = this._tkToReq[packet.token.toString('hex')] + + _handle (msg, rsinfo, outSocket) { + const packet = parse(msg) + let buf + let response + const that = this + let req = this._msgIdToReq[packet.messageId] + const ackSent = function (err) { + if (err && req) { + req.emit('error', err) + } + + that._msgInFlight-- + if (that._closing && that._msgInFlight === 0) { + that._doClose() + } } + if (!req) { + if (packet.token.length > 0) { + req = this._tkToReq[packet.token.toString('hex')] + } - if ((packet.ack || packet.reset) && !req) { - // Nothing to do on unknown or duplicate ACK/RST packet - return + if ((packet.ack || packet.reset) && !req) { + // Nothing to do on unknown or duplicate ACK/RST packet + return + } + + if (!req) { + buf = generate({ + code: '0.00', + reset: true, + messageId: packet.messageId + }) + + this._msgInFlight++ + this._sock.send(buf, 0, buf.length, rsinfo.port, rsinfo.address, ackSent) + return + } } - if (!req) { + if (packet.confirmable) { buf = generate({ code: '0.00', - reset: true, + ack: true, messageId: packet.messageId }) this._msgInFlight++ this._sock.send(buf, 0, buf.length, rsinfo.port, rsinfo.address, ackSent) - return } - } - - if (packet.confirmable) { - buf = generate({ - code: '0.00', - ack: true, - messageId: packet.messageId - }) - this._msgInFlight++ - this._sock.send(buf, 0, buf.length, rsinfo.port, rsinfo.address, ackSent) - } - - if (packet.code !== '0.00' && (req._packet.token.length !== packet.token.length || pf.compareBuffers(req._packet.token, packet.token) !== 0)) { - // The tokens don't match, ignore the message since it is a malformed response - return - } + if (packet.code !== '0.00' && (req._packet.token.length !== packet.token.length || pf.compareBuffers(req._packet.token, packet.token) !== 0)) { + // The tokens don't match, ignore the message since it is a malformed response + return + } - const block1Buff = getOption(packet.options, 'Block1') - let block1 - if (block1Buff) { - block1 = parseBlockOption(block1Buff) - // check for error - if (!block1) { - req.sender.reset() - return req.emit('error', new Error('Failed to parse block1')) + const block1Buff = getOption(packet.options, 'Block1') + let block1 + if (block1Buff) { + block1 = parseBlockOption(block1Buff) + // check for error + if (!block1) { + req.sender.reset() + return req.emit('error', new Error('Failed to parse block1')) + } } - } - req.sender.reset() - - if (block1 && packet.ack) { - // var initialRequest = this._msgIdToReq[packet.messageId]; - // If the client takes too long to respond then the retry sender will send - // another packet with the previous messageId, which we've already removed. - const segmentedSender = req.segmentedSender - if (segmentedSender) { - // If there's more to send/receive, then carry on! - if (segmentedSender.remaining() > 0) { - if (segmentedSender.isCorrectACK(packet, block1)) { - delete this._msgIdToReq[req._packet.messageId] - req._packet.messageId = that._nextMessageId() - this._msgIdToReq[req._packet.messageId] = req - segmentedSender.receiveACK(packet, block1) + req.sender.reset() + + if (block1 && packet.ack) { + // var initialRequest = this._msgIdToReq[packet.messageId]; + // If the client takes too long to respond then the retry sender will send + // another packet with the previous messageId, which we've already removed. + const segmentedSender = req.segmentedSender + if (segmentedSender) { + // If there's more to send/receive, then carry on! + if (segmentedSender.remaining() > 0) { + if (segmentedSender.isCorrectACK(packet, block1)) { + delete this._msgIdToReq[req._packet.messageId] + req._packet.messageId = that._nextMessageId() + this._msgIdToReq[req._packet.messageId] = req + segmentedSender.receiveACK(packet, block1) + } else { + segmentedSender.resendPreviousPacket() + } + return } else { - segmentedSender.resendPreviousPacket() + // console.log("Packet received done"); + removeOption(req._packet.options, 'Block1') + delete req.segmentedSender } - return - } else { - // console.log("Packet received done"); - removeOption(req._packet.options, 'Block1') - delete req.segmentedSender } } - } - - if (!packet.confirmable && !req.multicast) { - delete this._msgIdToReq[packet.messageId] - } - // Drop empty messages (ACKs), but process RST - if (packet.code === '0.00' && !packet.reset) { - return - } + if (!packet.confirmable && !req.multicast) { + delete this._msgIdToReq[packet.messageId] + } - const block2Buff = getOption(packet.options, 'Block2') - let block2 - // if we got blockwise (2) response - if (block2Buff) { - block2 = parseBlock2(block2Buff) - // check for error - if (!block2) { - req.sender.reset() - return req.emit('error', new Error('failed to parse block2')) + // Drop empty messages (ACKs), but process RST + if (packet.code === '0.00' && !packet.reset) { + return } - } - if (block2) { - if (req.multicast) { - req = this._convertMulticastToUnicastRequest(req, rsinfo) - if (!req) { - return + + const block2Buff = getOption(packet.options, 'Block2') + let block2 + // if we got blockwise (2) response + if (block2Buff) { + block2 = parseBlock2(block2Buff) + // check for error + if (!block2) { + req.sender.reset() + return req.emit('error', new Error('failed to parse block2')) } } + if (block2) { + if (req.multicast) { + req = this._convertMulticastToUnicastRequest(req, rsinfo) + if (!req) { + return + } + } - // accumulate payload - req._totalPayload = Buffer.concat([req._totalPayload, packet.payload]) + // accumulate payload + req._totalPayload = Buffer.concat([req._totalPayload, packet.payload]) + + if (block2.moreBlock2) { + // increase message id for next request + delete this._msgIdToReq[req._packet.messageId] + req._packet.messageId = that._nextMessageId() + this._msgIdToReq[req._packet.messageId] = req + + // next block2 request + const block2Val = createBlock2({ + moreBlock2: false, + num: block2.num + 1, + size: block2.size + }) + if (!block2Val) { + req.sender.reset() + return req.emit('error', new Error('failed to create block2')) + } + req.setOption('Block2', block2Val) + req._packet.payload = null + req.sender.send(generate(req._packet)) - if (block2.moreBlock2) { - // increase message id for next request - delete this._msgIdToReq[req._packet.messageId] - req._packet.messageId = that._nextMessageId() - this._msgIdToReq[req._packet.messageId] = req + return + } else { + // get full payload + packet.payload = req._totalPayload + // clear the payload incase of block2 + req._totalPayload = Buffer.alloc(0) + } + } - // next block2 request - const block2Val = createBlock2({ - moreBlock2: false, - num: block2.num + 1, - size: block2.size - }) - if (!block2Val) { - req.sender.reset() - return req.emit('error', new Error('failed to create block2')) + if (req.response) { + if (req.response.append) { + // it is an observe request + // and we are already streaming + return req.response.append(packet) + } else { + // TODO There is a previous response but is not an ObserveStream ! + return } - req.setOption('Block2', block2Val) - req._packet.payload = null - req.sender.send(generate(req._packet)) + } else if (block2) { + delete that._tkToReq[req._packet.token.toString('hex')] + } else if (!req.url.observe && !req.multicast) { + // it is not, so delete the token + delete that._tkToReq[packet.token.toString('hex')] + } - return + if (req.url.observe && packet.code !== '4.04') { + response = new ObserveStream(packet, rsinfo, outSocket) + response.on('close', function () { + delete that._tkToReq[packet.token.toString('hex')] + that._cleanUp() + }) + response.on('deregister', function () { + const deregisterUrl = Object.assign({}, req.url) + deregisterUrl.observe = 1 + deregisterUrl.token = req._packet.token + + const deregisterReq = that.request(deregisterUrl) + // If the request fails, we'll deal with it with a RST message anyway. + deregisterReq.on('error', function () {}) + deregisterReq.end() + }) } else { - // get full payload - packet.payload = req._totalPayload - // clear the payload incase of block2 - req._totalPayload = Buffer.alloc(0) + response = new IncomingMessage(packet, rsinfo, outSocket) } - } - if (req.response) { - if (req.response.append) { - // it is an observe request - // and we are already streaming - return req.response.append(packet) - } else { - // TODO There is a previous response but is not an ObserveStream ! - return + if (!req.multicast) { + req.response = response } - } else if (block2) { - delete that._tkToReq[req._packet.token.toString('hex')] - } else if (!req.url.observe && !req.multicast) { - // it is not, so delete the token - delete that._tkToReq[packet.token.toString('hex')] - } - if (req.url.observe && packet.code !== '4.04') { - response = new ObserveStream(packet, rsinfo, outSocket) - response.on('close', function () { - delete that._tkToReq[packet.token.toString('hex')] - that._cleanUp() - }) - response.on('deregister', function () { - const deregisterUrl = Object.assign({}, req.url) - deregisterUrl.observe = 1 - deregisterUrl.token = req._packet.token - - const deregisterReq = that.request(deregisterUrl) - // If the request fails, we'll deal with it with a RST message anyway. - deregisterReq.on('error', function () {}) - deregisterReq.end() - }) - } else { - response = new IncomingMessage(packet, rsinfo, outSocket) + req.emit('response', response) } - if (!req.multicast) { - req.response = response - } + _nextToken () { + const buf = Buffer.alloc(8) - req.emit('response', response) -} + if (++this._lastToken === maxToken) { + this._lastToken = 0 + } -Agent.prototype._nextToken = function nextToken () { - const buf = Buffer.alloc(8) + buf.writeUInt32BE(this._lastToken, 0) + crypto.randomBytes(4).copy(buf, 4) - if (++this._lastToken === maxToken) { - this._lastToken = 0 + return buf } - buf.writeUInt32BE(this._lastToken, 0) - crypto.randomBytes(4).copy(buf, 4) - - return buf -} + _nextMessageId () { + if (++this._lastMessageId === maxMessageId) { + this._lastMessageId = 1 + } -Agent.prototype._nextMessageId = function nextToken () { - if (++this._lastMessageId === maxMessageId) { - this._lastMessageId = 1 + return this._lastMessageId } - return this._lastMessageId -} - -/** - * Entry point for a new client-side request. - * @param {*} url A String representing a CoAP URL, or an object with the appropriate parameters. - */ -Agent.prototype.request = function request (url) { - this._init() + /** + * Entry point for a new client-side request. + * @param {*} url A String representing a CoAP URL, or an object with the appropriate parameters. + */ + request (url) { + this._init() - const options = url.options || url.headers - let option - const that = this - const multicastTimeout = url.multicastTimeout !== undefined ? parseInt(url.multicastTimeout) : 20000 + const options = url.options || url.headers + let option + const that = this + const multicastTimeout = url.multicastTimeout !== undefined ? parseInt(url.multicastTimeout) : 20000 - const req = new OutgoingMessage({}, function (req, packet) { - let buf + const req = new OutgoingMessage({}, function (req, packet) { + let buf - if (url.confirmable !== false) { - packet.confirmable = true - } + if (url.confirmable !== false) { + packet.confirmable = true + } - // multicast message should be forced non-confirmable - if (url.multicast === true) { - req.multicast = true - packet.confirmable = false - } + // multicast message should be forced non-confirmable + if (url.multicast === true) { + req.multicast = true + packet.confirmable = false + } - let token - if (!(packet.ack || packet.reset)) { - packet.messageId = that._nextMessageId() - if ((url.token instanceof Buffer) && (url.token.length > 0)) { - if (url.token.length > 8) { - return req.emit('error', new Error('Token may be no longer than 8 bytes.')) + let token + if (!(packet.ack || packet.reset)) { + packet.messageId = that._nextMessageId() + if ((url.token instanceof Buffer) && (url.token.length > 0)) { + if (url.token.length > 8) { + return req.emit('error', new Error('Token may be no longer than 8 bytes.')) + } + packet.token = url.token + } else { + packet.token = that._nextToken() } - packet.token = url.token - } else { - packet.token = that._nextToken() - } - token = packet.token.toString('hex') - that._tkToMulticastResAddr[token] = [] - if (req.multicast) { + token = packet.token.toString('hex') that._tkToMulticastResAddr[token] = [] + if (req.multicast) { + that._tkToMulticastResAddr[token] = [] + } } - } - that._msgIdToReq[packet.messageId] = req - if (token) { - that._tkToReq[token] = req - } + that._msgIdToReq[packet.messageId] = req + if (token) { + that._tkToReq[token] = req + } - const block1Buff = getOption(packet.options, 'Block1') - if (block1Buff) { - // Setup for a segmented transmission - req.segmentedSender = new SegmentedTransmission(block1Buff[0], req, packet) - req.segmentedSender.sendNext() - } else { - try { - buf = generate(packet) - } catch (err) { - req.sender.reset() - return req.emit('error', err) + const block1Buff = getOption(packet.options, 'Block1') + if (block1Buff) { + // Setup for a segmented transmission + req.segmentedSender = new SegmentedTransmission(block1Buff[0], req, packet) + req.segmentedSender.sendNext() + } else { + try { + buf = generate(packet) + } catch (err) { + req.sender.reset() + return req.emit('error', err) + } + req.sender.send(buf, !packet.confirmable) } - req.sender.send(buf, !packet.confirmable) - } - }) + }) - req.sender = new RetrySend(this._sock, url.port, url.hostname || url.host, url.retrySend) + req.sender = new RetrySend(this._sock, url.port, url.hostname || url.host, url.retrySend) - req.url = url + req.url = url - req.statusCode = url.method || 'GET' + req.statusCode = url.method || 'GET' - urlPropertyToPacketOption(url, req, 'pathname', 'Uri-Path', '/') - urlPropertyToPacketOption(url, req, 'query', 'Uri-Query', '&') + this.urlPropertyToPacketOption(url, req, 'pathname', 'Uri-Path', '/') + this.urlPropertyToPacketOption(url, req, 'query', 'Uri-Query', '&') - if (options) { - for (option in options) { - if (Object.prototype.hasOwnProperty.call(options, option)) { - req.setOption(option, options[option]) + if (options) { + for (option in options) { + if (Object.prototype.hasOwnProperty.call(options, option)) { + req.setOption(option, options[option]) + } } } - } - if (url.proxyUri) { - req.setOption('Proxy-Uri', url.proxyUri) - } + if (url.proxyUri) { + req.setOption('Proxy-Uri', url.proxyUri) + } - req.sender.on('error', req.emit.bind(req, 'error')) + req.sender.on('error', req.emit.bind(req, 'error')) - req.sender.on('sending', function () { - that._msgInFlight++ - }) + req.sender.on('sending', function () { + that._msgInFlight++ + }) - req.sender.on('timeout', function (err) { - req.emit('timeout', err) - that.abort(req) - }) + req.sender.on('timeout', function (err) { + req.emit('timeout', err) + that.abort(req) + }) - req.sender.on('sent', function () { - if (req.multicast) return + req.sender.on('sent', function () { + if (req.multicast) return - that._msgInFlight-- - if (that._closing && that._msgInFlight === 0) { - that._doClose() - } - }) - - // Start multicast monitoring timer in case of multicast request - if (url.multicast === true) { - req.multicastTimer = setTimeout(function () { - if (req._packet.token) { - const token = req._packet.token.toString('hex') - delete that._tkToReq[token] - delete that._tkToMulticastResAddr[token] - } - delete that._msgIdToReq[req._packet.messageId] that._msgInFlight-- - if (that._msgInFlight === 0 && that._closing) { + if (that._closing && that._msgInFlight === 0) { that._doClose() } - }, multicastTimeout) - } + }) - if (typeof (url.observe) === 'number') { - req.setOption('Observe', url.observe) - } else if (url.observe) { - req.setOption('Observe', null) - } else { - req.on('response', this._cleanUp.bind(this)) - } + // Start multicast monitoring timer in case of multicast request + if (url.multicast === true) { + req.multicastTimer = setTimeout(function () { + if (req._packet.token) { + const token = req._packet.token.toString('hex') + delete that._tkToReq[token] + delete that._tkToMulticastResAddr[token] + } + delete that._msgIdToReq[req._packet.messageId] + that._msgInFlight-- + if (that._msgInFlight === 0 && that._closing) { + that._doClose() + } + }, multicastTimeout) + } - this._requests++ + if (typeof (url.observe) === 'number') { + req.setOption('Observe', url.observe) + } else if (url.observe) { + req.setOption('Observe', null) + } else { + req.on('response', this._cleanUp.bind(this)) + } - req._totalPayload = Buffer.alloc(0) + this._requests++ - return req -} + req._totalPayload = Buffer.alloc(0) -Agent.prototype.abort = function (req) { - req.sender.removeAllListeners() - req.sender.reset() - this._cleanUp() - delete this._msgIdToReq[req._packet.messageId] - if (req._packet.token) { - delete this._tkToReq[req._packet.token.toString('hex')] + return req } -} -function urlPropertyToPacketOption (url, req, property, option, separator) { - if (url[property]) { - req.setOption(option, url[property].normalize('NFC').split(separator) - .filter(function (part) { return part !== '' }) - .map(function (part) { - const buf = Buffer.alloc(Buffer.byteLength(part)) - buf.write(part) - return buf - })) + abort (req) { + req.sender.removeAllListeners() + req.sender.reset() + this._cleanUp() + delete this._msgIdToReq[req._packet.messageId] + if (req._packet.token) { + delete this._tkToReq[req._packet.token.toString('hex')] + } } -} -Agent.prototype._convertMulticastToUnicastRequest = function (req, rsinfo) { - const unicastReq = this.request(req.url) - const unicastAddress = rsinfo.address.split('%')[0] - const token = req._packet.token.toString('hex') - if (this._tkToMulticastResAddr[token].includes(unicastAddress)) { - return null + urlPropertyToPacketOption (url, req, property, option, separator) { + if (url[property]) { + req.setOption(option, url[property].normalize('NFC').split(separator) + .filter(function (part) { return part !== '' }) + .map(function (part) { + const buf = Buffer.alloc(Buffer.byteLength(part)) + buf.write(part) + return buf + })) + } } - unicastReq.url.host = unicastAddress - unicastReq.sender._host = unicastAddress - clearTimeout(unicastReq.multicastTimer) - unicastReq.url.multicast = false - req.eventNames().forEach(eventName => { - req.listeners(eventName).forEach(listener => { - unicastReq.on(eventName, listener) + _convertMulticastToUnicastRequest (req, rsinfo) { + const unicastReq = this.request(req.url) + const unicastAddress = rsinfo.address.split('%')[0] + const token = req._packet.token.toString('hex') + if (this._tkToMulticastResAddr[token].includes(unicastAddress)) { + return null + } + + unicastReq.url.host = unicastAddress + unicastReq.sender._host = unicastAddress + clearTimeout(unicastReq.multicastTimer) + unicastReq.url.multicast = false + req.eventNames().forEach(eventName => { + req.listeners(eventName).forEach(listener => { + unicastReq.on(eventName, listener) + }) }) - }) - this._tkToMulticastResAddr[token].push(unicastAddress) - unicastReq._packet.token = this._nextToken() - this._requests++ - return unicastReq + this._tkToMulticastResAddr[token].push(unicastAddress) + unicastReq._packet.token = this._nextToken() + this._requests++ + return unicastReq + } } module.exports = Agent diff --git a/lib/cache.js b/lib/cache.js index c41ab212..3b8efdb0 100644 --- a/lib/cache.js +++ b/lib/cache.js @@ -20,64 +20,66 @@ function expiry (cache, k) { * @param {number} retentionPeriod * @param {()=>T} factory Function which returns new cache objects */ -function BlockCache (retentionPeriod, factory) { - /** @type {{[k:string]:{payload:T,timeoutId:NodeJS.Timeout}}} */ - this._cache = {} - this._retentionPeriod = retentionPeriod - debug('Created cache with ' + (this._retentionPeriod / 1000) + 's retention period') - this._factory = factory -} - -BlockCache.prototype.reset = function () { - for (const k in this._cache) { - debug('clean-up cache expiry timer, key:', k) - clearTimeout(this._cache[k].timeoutId) - delete this._cache[k] +class BlockCache { + constructor (retentionPeriod, factory) { + /** @type {{[k:string]:{payload:T,timeoutId:NodeJS.Timeout}}} */ + this._cache = {} + this._retentionPeriod = retentionPeriod + debug('Created cache with ' + (this._retentionPeriod / 1000) + 's retention period') + this._factory = factory } -} -/** - * @param {string} key - * @param {T} payload - */ -BlockCache.prototype.add = function (key, payload) { - if (Object.prototype.hasOwnProperty.call(this._cache, key)) { - debug('reuse old cache entry, key:', key) - clearTimeout(this._cache[key].timeoutId) - this._cache[key].payload = payload - } else { - debug('add payload to cache, key:', key) - this._cache[key] = { payload: payload } + reset () { + for (const k in this._cache) { + debug('clean-up cache expiry timer, key:', k) + clearTimeout(this._cache[k].timeoutId) + delete this._cache[k] + } } - // setup new expiry timer - this._cache[key].timeoutId = setTimeout(expiry, this._retentionPeriod, this._cache, key) -} -BlockCache.prototype.remove = function (key) { - if (Object.prototype.hasOwnProperty.call(this._cache, key)) { - debug('remove cache entry, key:', key) - clearTimeout(this._cache[key].timeoutId) - delete this._cache[key] - return true + /** + * @param {string} key + * @param {T} payload + */ + add (key, payload) { + if (Object.prototype.hasOwnProperty.call(this._cache, key)) { + debug('reuse old cache entry, key:', key) + clearTimeout(this._cache[key].timeoutId) + this._cache[key].payload = payload + } else { + debug('add payload to cache, key:', key) + this._cache[key] = { payload: payload } + } + // setup new expiry timer + this._cache[key].timeoutId = setTimeout(expiry, this._retentionPeriod, this._cache, key) } - return false -} -BlockCache.prototype.contains = function (key) { - return Object.prototype.hasOwnProperty.call(this._cache, key) -} + remove (key) { + if (Object.prototype.hasOwnProperty.call(this._cache, key)) { + debug('remove cache entry, key:', key) + clearTimeout(this._cache[key].timeoutId) + delete this._cache[key] + return true + } + return false + } -BlockCache.prototype.get = function (key) { - return this._cache[key].payload -} + contains (key) { + return Object.prototype.hasOwnProperty.call(this._cache, key) + } -BlockCache.prototype.getWithDefaultInsert = function (key) { - if (this.contains(key)) { + get (key) { return this._cache[key].payload - } else { - const def = this._factory() - this.add(key, def) - return def + } + + getWithDefaultInsert (key) { + if (this.contains(key)) { + return this._cache[key].payload + } else { + const def = this._factory() + this.add(key, def) + return def + } } } diff --git a/lib/incoming_message.js b/lib/incoming_message.js index b4370e6c..6f3e0033 100644 --- a/lib/incoming_message.js +++ b/lib/incoming_message.js @@ -9,35 +9,34 @@ */ const Readable = require('readable-stream').Readable -const util = require('util') const pktToMsg = require('./helpers').packetToMessage -function IncomingMessage (packet, rsinfo, outSocket) { - Readable.call(this) +class IncomingMessage extends Readable { + constructor (packet, rsinfo, outSocket) { + super() - pktToMsg(this, packet) + pktToMsg(this, packet) - this.rsinfo = rsinfo - this.outSocket = outSocket + this.rsinfo = rsinfo + this.outSocket = outSocket - this._packet = packet - this._payloadIndex = 0 -} + this._packet = packet + this._payloadIndex = 0 + } -util.inherits(IncomingMessage, Readable) + _read (size) { + const end = this._payloadIndex + size + const start = this._payloadIndex + const payload = this._packet.payload + let buf = null -IncomingMessage.prototype._read = function (size) { - const end = this._payloadIndex + size - const start = this._payloadIndex - const payload = this._packet.payload - let buf = null + if (start < payload.length) { + buf = payload.slice(start, end) + } - if (start < payload.length) { - buf = payload.slice(start, end) + this._payloadIndex = end + this.push(buf) } - - this._payloadIndex = end - this.push(buf) } module.exports = IncomingMessage diff --git a/lib/observe_read_stream.js b/lib/observe_read_stream.js index 6a02f39d..9326e566 100644 --- a/lib/observe_read_stream.js +++ b/lib/observe_read_stream.js @@ -9,54 +9,53 @@ */ const Readable = require('readable-stream').Readable -const util = require('util') const pktToMsg = require('./helpers').packetToMessage -function ObserveReadStream (packet, rsinfo, outSocket) { - Readable.call(this, { objectMode: true }) +class ObserveReadStream extends Readable { + constructor (packet, rsinfo, outSocket) { + super({ objectMode: true }) - this.rsinfo = rsinfo - this.outSocket = outSocket + this.rsinfo = rsinfo + this.outSocket = outSocket - this._lastId = undefined - this._lastTime = 0 - this._disableFiltering = false - this.append(packet) -} - -util.inherits(ObserveReadStream, Readable) - -ObserveReadStream.prototype.append = function (packet) { - if (!this.readable) { - return + this._lastId = undefined + this._lastTime = 0 + this._disableFiltering = false + this.append(packet) } - pktToMsg(this, packet) + append (packet) { + if (!this.readable) { + return + } - // First notification - if (this._lastId === undefined) { - this._lastId = this.headers.Observe - 1 - } + pktToMsg(this, packet) - const dseq = (this.headers.Observe - this._lastId) & 0xffffff - const dtime = Date.now() - this._lastTime + // First notification + if (this._lastId === undefined) { + this._lastId = this.headers.Observe - 1 + } - if (this._disableFiltering || (dseq > 0 && dseq < (1 << 23)) || dtime > 128 * 1000) { - this._lastId = this.headers.Observe - this._lastTime = Date.now() - this.push(packet.payload) + const dseq = (this.headers.Observe - this._lastId) & 0xffffff + const dtime = Date.now() - this._lastTime + + if (this._disableFiltering || (dseq > 0 && dseq < (1 << 23)) || dtime > 128 * 1000) { + this._lastId = this.headers.Observe + this._lastTime = Date.now() + this.push(packet.payload) + } } -} -ObserveReadStream.prototype.close = function (eagerDeregister) { - this.push(null) - this.emit('close') - if (eagerDeregister) { - this.emit('deregister') + close (eagerDeregister) { + this.push(null) + this.emit('close') + if (eagerDeregister) { + this.emit('deregister') + } } -} -// nothing to do, data will be pushed from the server -ObserveReadStream.prototype._read = function () {} + // nothing to do, data will be pushed from the server + _read () {} +} module.exports = ObserveReadStream diff --git a/lib/observe_write_stream.js b/lib/observe_write_stream.js index 94883f2f..495e8b7e 100644 --- a/lib/observe_write_stream.js +++ b/lib/observe_write_stream.js @@ -9,75 +9,75 @@ */ const Writable = require('readable-stream').Writable -const util = require('util') const helpers = require('./helpers') -function ObserveWriteStream (request, send) { - Writable.call(this) +class ObserveWriteStream extends Writable { + constructor (request, send) { + super() + + this._packet = { + token: request.token, + messageId: request.messageId, + options: [], + confirmable: false, + ack: request.confirmable, + reset: false + } - this._packet = { - token: request.token, - messageId: request.messageId, - options: [], - confirmable: false, - ack: request.confirmable, - reset: false - } + this._request = request + this._send = send + this.statusCode = '' - this._request = request - this._send = send - this.statusCode = '' + this._counter = 0 - this._counter = 0 + const that = this + this.on('finish', function () { + if (that._counter === 0) { // we have sent no messages + that._doSend(null) + } + }) + } - const that = this - this.on('finish', function () { - if (that._counter === 0) { // we have sent no messages - that._doSend(null) - } - }) -} + _write (data, encoding, done) { + this.setOption('Observe', ++this._counter) -util.inherits(ObserveWriteStream, Writable) -helpers.addSetOption(ObserveWriteStream) + if (this._counter === 16777215) { + this._counter = 1 + } -ObserveWriteStream.prototype._write = function write (data, encoding, done) { - this.setOption('Observe', ++this._counter) + this._doSend(data) - if (this._counter === 16777215) { - this._counter = 1 + done() } - this._doSend(data) + _doSend (data) { + const packet = this._packet + packet.code = this.statusCode + packet.payload = data + this._send(this, packet) - done() -} - -ObserveWriteStream.prototype._doSend = function doSend (data) { - const packet = this._packet - packet.code = this.statusCode - packet.payload = data - this._send(this, packet) - - this._packet.confirmable = this._request.confirmable - this._packet.ack = !this._request.confirmable - delete this._packet.messageId - delete this._packet.payload -} + this._packet.confirmable = this._request.confirmable + this._packet.ack = !this._request.confirmable + delete this._packet.messageId + delete this._packet.payload + } -ObserveWriteStream.prototype.reset = function reset () { - const packet = this._packet - packet.code = '0.00' - packet.payload = '' - packet.reset = true - packet.ack = false - packet.token = Buffer.alloc(0) + reset () { + const packet = this._packet + packet.code = '0.00' + packet.payload = '' + packet.reset = true + packet.ack = false + packet.token = Buffer.alloc(0) - this._send(this, packet) + this._send(this, packet) - this._packet.confirmable = this._request.confirmable - delete this._packet.messageId - delete this._packet.payload + this._packet.confirmable = this._request.confirmable + delete this._packet.messageId + delete this._packet.payload + } } +helpers.addSetOption(ObserveWriteStream) + module.exports = ObserveWriteStream diff --git a/lib/outgoing_message.js b/lib/outgoing_message.js index 8aa2729d..0519d432 100644 --- a/lib/outgoing_message.js +++ b/lib/outgoing_message.js @@ -9,104 +9,104 @@ */ const BufferList = require('bl') -const util = require('util') const helpers = require('./helpers') const toCode = helpers.toCode -function OutgoingMessage (request, send) { - BufferList.call(this) +class OutgoingMessage extends BufferList { + constructor (request, send) { + super() + + this._packet = { + messageId: request.messageId, + token: request.token, + options: [], + confirmable: false, + ack: false, + reset: false + } - this._packet = { - messageId: request.messageId, - token: request.token, - options: [], - confirmable: false, - ack: false, - reset: false - } + const that = this - const that = this + if (request.confirmable) { + // replying in piggyback + this._packet.ack = true - if (request.confirmable) { - // replying in piggyback - this._packet.ack = true + this._ackTimer = setTimeout(function () { + send(that, helpers.genAck(request)) - this._ackTimer = setTimeout(function () { - send(that, helpers.genAck(request)) + // we are no more in piggyback + that._packet.confirmable = true + that._packet.ack = false - // we are no more in piggyback - that._packet.confirmable = true - that._packet.ack = false + // we need a new messageId for the CON + // reply + delete that._packet.messageId + + that._ackTimer = null + }, request.piggybackReplyMs) + } - // we need a new messageId for the CON - // reply - delete that._packet.messageId + this._send = send - that._ackTimer = null - }, request.piggybackReplyMs) + this.statusCode = '' + this.code = '' } - this._send = send + end (a, b) { + super.end(a, b) - this.statusCode = '' - this.code = '' -} + const packet = this._packet -util.inherits(OutgoingMessage, BufferList) -helpers.addSetOption(OutgoingMessage) + packet.code = toCode(this.code || this.statusCode) + packet.payload = this -OutgoingMessage.prototype.end = function (a, b) { - BufferList.prototype.end.call(this, a, b) + if (this._ackTimer) { + clearTimeout(this._ackTimer) + } - const packet = this._packet + this._send(this, packet) - packet.code = toCode(this.code || this.statusCode) - packet.payload = this + // easy clean up after generating the packet + delete this._packet.payload - if (this._ackTimer) { - clearTimeout(this._ackTimer) + return this } - this._send(this, packet) + reset () { + super.end() - // easy clean up after generating the packet - delete this._packet.payload + const packet = this._packet - return this -} + packet.code = '0.00' + packet.payload = '' + packet.reset = true + packet.ack = false + packet.token = '' -OutgoingMessage.prototype.reset = function () { - BufferList.prototype.end.call(this) + if (this._ackTimer) { + clearTimeout(this._ackTimer) + } - const packet = this._packet + this._send(this, packet) - packet.code = '0.00' - packet.payload = '' - packet.reset = true - packet.ack = false - packet.token = '' + // easy clean up after generating the packet + delete this._packet.payload - if (this._ackTimer) { - clearTimeout(this._ackTimer) + return this } - this._send(this, packet) - - // easy clean up after generating the packet - delete this._packet.payload - - return this -} - -OutgoingMessage.prototype.writeHead = function (code, headers) { - const packet = this._packet - let header - packet.code = String(code).replace(/(^\d[^.])/, '$1.') - for (header in headers) { - if (Object.prototype.hasOwnProperty.call(headers, header)) { - this.setOption(header, headers[header]) + writeHead (code, headers) { + const packet = this._packet + let header + packet.code = String(code).replace(/(^\d[^.])/, '$1.') + for (header in headers) { + if (Object.prototype.hasOwnProperty.call(headers, header)) { + this.setOption(header, headers[header]) + } } } } +helpers.addSetOption(OutgoingMessage) + module.exports = OutgoingMessage diff --git a/lib/retry_send.js b/lib/retry_send.js index da93c8cf..3db7982d 100644 --- a/lib/retry_send.js +++ b/lib/retry_send.js @@ -9,80 +9,76 @@ */ const parameters = require('./parameters') -const util = require('util') const EventEmitter = require('events').EventEmitter const parse = require('coap-packet').parse -function RetrySend (sock, port, host, maxRetransmit) { - if (!(this instanceof RetrySend)) { - return new RetrySend(sock, port, host, maxRetransmit) - } +class RetrySend extends EventEmitter { + constructor (sock, port, host, maxRetransmit) { + super() + const that = this + + this._sock = sock - const that = this + this._port = port || parameters.coapPort - this._sock = sock + this._host = host - this._port = port || parameters.coapPort + this._maxRetransmit = maxRetransmit || parameters.maxRetransmit + this._sendAttemp = 0 + this._lastMessageId = -1 + this._currentTime = parameters.ackTimeout * (1 + (parameters.ackRandomFactor - 1) * Math.random()) * 1000 + + this._bOff = function () { + that._currentTime = that._currentTime * 2 + that._send() + } + } - this._host = host + _send (avoidBackoff) { + const that = this + + this._sock.send(this._message, 0, this._message.length, + this._port, this._host, function (err, bytes) { + that.emit('sent', err, bytes) + if (err) { + that.emit('error', err) + } + }) + + const messageId = parse(this._message).messageId + if (messageId !== this._lastMessageId) { + this._lastMessageId = messageId + this._sendAttemp = 0 + } - this._maxRetransmit = maxRetransmit || parameters.maxRetransmit - this._sendAttemp = 0 - this._lastMessageId = -1 - this._currentTime = parameters.ackTimeout * (1 + (parameters.ackRandomFactor - 1) * Math.random()) * 1000 + if (!avoidBackoff && ++this._sendAttemp <= this._maxRetransmit) { + this._bOffTimer = setTimeout(this._bOff, this._currentTime) + } - this._bOff = function () { - that._currentTime = that._currentTime * 2 - that._send() + this.emit('sending', this._message) } -} -util.inherits(RetrySend, EventEmitter) + send (message, avoidBackoff) { + const that = this -RetrySend.prototype._send = function (avoidBackoff) { - const that = this + this._message = message + this._send(avoidBackoff) - this._sock.send(this._message, 0, this._message.length, - this._port, this._host, function (err, bytes) { - that.emit('sent', err, bytes) - if (err) { + const timeout = avoidBackoff ? parameters.maxRTT : parameters.exchangeLifetime + this._timer = setTimeout(function () { + const err = new Error('No reply in ' + timeout + 's') + err.retransmitTimeout = timeout + if (!avoidBackoff) { that.emit('error', err) } - }) - - const messageId = parse(this._message).messageId - if (messageId !== this._lastMessageId) { - this._lastMessageId = messageId - this._sendAttemp = 0 + that.emit('timeout', err) + }, timeout * 1000) } - if (!avoidBackoff && ++this._sendAttemp <= this._maxRetransmit) { - this._bOffTimer = setTimeout(this._bOff, this._currentTime) + reset () { + clearTimeout(this._timer) + clearTimeout(this._bOffTimer) } - - this.emit('sending', this._message) -} - -RetrySend.prototype.send = function (message, avoidBackoff) { - const that = this - - this._message = message - this._send(avoidBackoff) - - const timeout = avoidBackoff ? parameters.maxRTT : parameters.exchangeLifetime - this._timer = setTimeout(function () { - const err = new Error('No reply in ' + timeout + 's') - err.retransmitTimeout = timeout - if (!avoidBackoff) { - that.emit('error', err) - } - that.emit('timeout', err) - }, timeout * 1000) -} - -RetrySend.prototype.reset = function () { - clearTimeout(this._timer) - clearTimeout(this._bOffTimer) } module.exports = RetrySend diff --git a/lib/segmentation.js b/lib/segmentation.js index 5c62653e..91a269a3 100644 --- a/lib/segmentation.js +++ b/lib/segmentation.js @@ -12,102 +12,104 @@ const generate = require('coap-packet').generate const generateBlockOption = require('./block').generateBlockOption const exponentToByteSize = require('./block').exponentToByteSize -function SegmentedTransmission (blockSize, req, packet) { - if (blockSize < 0 || blockSize > 6) { - throw new Error('invalid block size ' + blockSize) - } +class SegmentedTransmission { + constructor (blockSize, req, packet) { + if (blockSize < 0 || blockSize > 6) { + throw new Error('invalid block size ' + blockSize) + } - this.blockState = { - sequenceNumber: 0, - moreBlocks: false, - blockSize: 0 - } + this.blockState = { + sequenceNumber: 0, + moreBlocks: false, + blockSize: 0 + } - this.setBlockSizeExp(blockSize) + this.setBlockSizeExp(blockSize) - this.totalLength = packet.payload.length - this.currentByte = 0 - this.lastByte = 0 + this.totalLength = packet.payload.length + this.currentByte = 0 + this.lastByte = 0 - this.req = req - this.payload = packet.payload - this.packet = packet + this.req = req + this.payload = packet.payload + this.packet = packet - this.packet.payload = null - this.resendCount = 0 -} + this.packet.payload = null + this.resendCount = 0 + } -SegmentedTransmission.prototype.setBlockSizeExp = function setBlockSizeExp (blockSizeExp) { - this.blockState.blockSize = blockSizeExp - this.byteSize = exponentToByteSize(blockSizeExp) -} + setBlockSizeExp (blockSizeExp) { + this.blockState.blockSize = blockSizeExp + this.byteSize = exponentToByteSize(blockSizeExp) + } -SegmentedTransmission.prototype.updateBlockState = function updateBlockState () { - this.blockState.sequenceNumber = this.currentByte / this.byteSize - this.blockState.moreBlocks = ((this.currentByte + this.byteSize) < this.totalLength) ? 1 : 0 + updateBlockState () { + this.blockState.sequenceNumber = this.currentByte / this.byteSize + this.blockState.moreBlocks = ((this.currentByte + this.byteSize) < this.totalLength) ? 1 : 0 - this.req.setOption('Block1', generateBlockOption(this.blockState)) -} + this.req.setOption('Block1', generateBlockOption(this.blockState)) + } -SegmentedTransmission.prototype.isCorrectACK = function isCorrectACK (packet, retBlockState) { - return retBlockState.sequenceNumber === this.blockState.sequenceNumber// && packet.code == "2.31" -} + isCorrectACK (packet, retBlockState) { + return retBlockState.sequenceNumber === this.blockState.sequenceNumber// && packet.code == "2.31" + } -SegmentedTransmission.prototype.resendPreviousPacket = function resendPreviousPacket () { - if (this.resendCount < 5) { - this.currentByte = this.lastByte - if (this.remaining() > 0) { - this.sendNext() + resendPreviousPacket () { + if (this.resendCount < 5) { + this.currentByte = this.lastByte + if (this.remaining() > 0) { + this.sendNext() + } + this.resendCount++ + } else { + throw new Error('Too many block re-transfers') } - this.resendCount++ - } else { - throw new Error('Too many block re-transfers') } -} -/** - * - * @param {Packet} packet The packet received which contained the ack - * @param {Object} retBlockState The received block state from the other end - * @returns {Boolean} Returns true if the ACK was for the correct block. - */ -SegmentedTransmission.prototype.receiveACK = function receiveACK (packet, retBlockState) { - if (this.blockState.blockSize !== retBlockState.blockSize) { - this.setBlockSizeExp(retBlockState.blockSize) - } + /** + * + * @param {Packet} packet The packet received which contained the ack + * @param {Object} retBlockState The received block state from the other end + * @returns {Boolean} Returns true if the ACK was for the correct block. + */ + receiveACK (packet, retBlockState) { + if (this.blockState.blockSize !== retBlockState.blockSize) { + this.setBlockSizeExp(retBlockState.blockSize) + } - if (this.remaining() > 0) { - this.sendNext() + if (this.remaining() > 0) { + this.sendNext() + } + this.resendCount = 0 } - this.resendCount = 0 -} -SegmentedTransmission.prototype.remaining = function remaining () { - return this.totalLength - this.currentByte -} + remaining () { + return this.totalLength - this.currentByte + } -SegmentedTransmission.prototype.sendNext = function sendNext () { - const blockLength = Math.min(this.totalLength - this.currentByte, this.byteSize) - const subBuffer = this.payload.slice(this.currentByte, this.currentByte + blockLength) - this.updateBlockState() + sendNext () { + const blockLength = Math.min(this.totalLength - this.currentByte, this.byteSize) + const subBuffer = this.payload.slice(this.currentByte, this.currentByte + blockLength) + this.updateBlockState() - this.packet.ack = false - this.packet.reset = false - this.packet.confirmable = true + this.packet.ack = false + this.packet.reset = false + this.packet.confirmable = true - this.packet.payload = subBuffer + this.packet.payload = subBuffer - this.lastByte = this.currentByte - this.currentByte += blockLength - let buf + this.lastByte = this.currentByte + this.currentByte += blockLength + let buf - try { - buf = generate(this.packet) - } catch (err) { - this.req.sender.reset() - return this.req.emit('error', err) + try { + buf = generate(this.packet) + } catch (err) { + this.req.sender.reset() + return this.req.emit('error', err) + } + this.req.sender.send(buf, !this.packet.confirmable) } - this.req.sender.send(buf, !this.packet.confirmable) } module.exports.SegmentedTransmission = SegmentedTransmission diff --git a/lib/server.js b/lib/server.js index 30c6c475..b1426c41 100644 --- a/lib/server.js +++ b/lib/server.js @@ -13,7 +13,6 @@ require('./polyfill') const dgram = require('dgram') const os = require('os') const net = require('net') -const util = require('util') const series = require('fastseries') const events = require('events') const LRU = require('lru-cache') @@ -44,112 +43,6 @@ function handleEnding (err) { } } -function CoAPServer (options, listener) { - if (!(this instanceof CoAPServer)) { - return new CoAPServer(options, listener) - } - - if (typeof options === 'function') { - listener = options - options = null - } - - if (!options) options = {} - - this._options = options - this._proxiedRequests = {} - - this._middlewares = [middlewares.parseRequest] - - if (options.proxy) { - this._middlewares.push(middlewares.proxyRequest) - this._middlewares.push(middlewares.handleProxyResponse) - } - - if (typeof options.clientIdentifier === 'function') { - this._clientIdentifier = options.clientIdentifier - } else { - this._clientIdentifier = (request) => { - return `${request.rsinfo.address}:${request.rsinfo.port}` - } - } - - if ( - !this._options.piggybackReplyMs || - !isNumeric(this._options.piggybackReplyMs) - ) { - this._options.piggybackReplyMs = parameters.piggybackReplyMs - } - - if (!isBoolean(this._options.sendAcksForNonConfirmablePackets)) { - this._options.sendAcksForNonConfirmablePackets = - parameters.sendAcksForNonConfirmablePackets - } - this._middlewares.push(middlewares.handleServerRequest) - - // Multicast settings - this._multicastAddress = options.multicastAddress - ? options.multicastAddress - : null - this._multicastInterface = options.multicastInterface - ? options.multicastInterface - : null - - // We use an LRU cache for the responses to avoid - // DDOS problems. - // max packet size is 1280 - // 32 MB / 1280 = 26214 - // The max lifetime is roughly 200s per packet. - // Which gave us 131 packets/second guarantee - let max = 32768 * 1024 - - if (typeof options.cacheSize === 'number' && options.cacheSize >= 0) { - max = options.cacheSize - } - - this._lru = new LRU({ - max, - length: function (n) { - return n.buffer.byteLength - }, - maxAge: parameters.exchangeLifetime * 1000, - dispose: function (key, value) { - if (value.sender) value.sender.reset() - } - }) - - this._series = series() - - this._block1Cache = new BlockCache( - parameters.exchangeLifetime * 1000, - () => { - return {} - } - ) - this._block2Cache = new BlockCache( - parameters.exchangeLifetime * 1000, - () => { - return null - } - ) - - if (listener) this.on('request', listener) - debug('initialized') -} - -util.inherits(CoAPServer, events.EventEmitter) - -CoAPServer.prototype._sendError = function (payload, rsinfo, packet) { - const message = generate({ - code: '5.00', - payload: payload, - messageId: packet ? packet.messageId : undefined, - token: packet ? packet.token : undefined - }) - - this._sock.send(message, 0, message.length, rsinfo.port) -} - function removeProxyOptions (packet) { const cleanOptions = [] @@ -167,23 +60,6 @@ function removeProxyOptions (packet) { return packet } -CoAPServer.prototype._sendProxied = function (packet, proxyUri, callback) { - const url = require('url').parse(proxyUri) // eslint-disable-line node/no-deprecated-api - const host = url.hostname - const port = url.port - const message = generate(removeProxyOptions(packet)) - - this._sock.send(message, 0, message.length, port, host, callback) -} - -CoAPServer.prototype._sendReverseProxied = function (packet, rsinfo, callback) { - const host = rsinfo.address - const port = rsinfo.port - const message = generate(packet) - - this._sock.send(message, 0, message.length, port, host, callback) -} - function handleRequest (server) { return function (msg, rsinfo) { const request = { @@ -220,326 +96,455 @@ function allAddresses (type) { return addresses } -CoAPServer.prototype.listen = function (port, address, done) { - const that = this +class CoAPServer extends events.EventEmitter { + constructor (options, listener) { + super() + if (typeof options === 'function') { + listener = options + options = null + } + + if (!options) options = {} + + this._options = options + this._proxiedRequests = {} + + this._middlewares = [middlewares.parseRequest] + + if (options.proxy) { + this._middlewares.push(middlewares.proxyRequest) + this._middlewares.push(middlewares.handleProxyResponse) + } + + if (typeof options.clientIdentifier === 'function') { + this._clientIdentifier = options.clientIdentifier + } else { + this._clientIdentifier = (request) => { + return `${request.rsinfo.address}:${request.rsinfo.port}` + } + } - if (port == null) { - port = parameters.coapPort + if ( + !this._options.piggybackReplyMs || + !isNumeric(this._options.piggybackReplyMs) + ) { + this._options.piggybackReplyMs = parameters.piggybackReplyMs + } + + if (!isBoolean(this._options.sendAcksForNonConfirmablePackets)) { + this._options.sendAcksForNonConfirmablePackets = + parameters.sendAcksForNonConfirmablePackets + } + this._middlewares.push(middlewares.handleServerRequest) + + // Multicast settings + this._multicastAddress = options.multicastAddress + ? options.multicastAddress + : null + this._multicastInterface = options.multicastInterface + ? options.multicastInterface + : null + + // We use an LRU cache for the responses to avoid + // DDOS problems. + // max packet size is 1280 + // 32 MB / 1280 = 26214 + // The max lifetime is roughly 200s per packet. + // Which gave us 131 packets/second guarantee + let max = 32768 * 1024 + + if (typeof options.cacheSize === 'number' && options.cacheSize >= 0) { + max = options.cacheSize + } + + this._lru = new LRU({ + max, + length: function (n) { + return n.buffer.byteLength + }, + maxAge: parameters.exchangeLifetime * 1000, + dispose: function (key, value) { + if (value.sender) value.sender.reset() + } + }) + + this._series = series() + + this._block1Cache = new BlockCache( + parameters.exchangeLifetime * 1000, + () => { + return {} + } + ) + this._block2Cache = new BlockCache( + parameters.exchangeLifetime * 1000, + () => { + return null + } + ) + + if (listener) this.on('request', listener) + debug('initialized') } - if (typeof port === 'function') { - done = port - port = parameters.coapPort + _sendError (payload, rsinfo, packet) { + const message = generate({ + code: '5.00', + payload: payload, + messageId: packet ? packet.messageId : undefined, + token: packet ? packet.token : undefined + }) + + this._sock.send(message, 0, message.length, rsinfo.port) } - if (typeof address === 'function') { - done = address - address = null + _sendProxied (packet, proxyUri, callback) { + const url = require('url').parse(proxyUri) // eslint-disable-line node/no-deprecated-api + const host = url.hostname + const port = url.port + const message = generate(removeProxyOptions(packet)) + + this._sock.send(message, 0, message.length, port, host, callback) } - if (this._sock) { - if (done) done(new Error('Already listening')) - else throw new Error('Already listening') + _sendReverseProxied (packet, rsinfo, callback) { + const host = rsinfo.address + const port = rsinfo.port + const message = generate(packet) - return this + this._sock.send(message, 0, message.length, port, host, callback) } - if (address && net.isIPv6(address)) this._options.type = 'udp6' + listen (port, address, done) { + const that = this + + if (port == null) { + port = parameters.coapPort + } - if (!this._options.type) this._options.type = 'udp4' + if (typeof port === 'function') { + done = port + port = parameters.coapPort + } - if (port instanceof events.EventEmitter) { - this._sock = port - if (done) setImmediate(done) - } else { - this._internal_socket = true - this._sock = dgram.createSocket({ - type: this._options.type, - reuseAddr: true - }) + if (typeof address === 'function') { + done = address + address = null + } - this._sock.bind(port, address || null, function () { - try { - if (that._multicastAddress) { - that._sock.setMulticastLoopback(true) + if (this._sock) { + if (done) done(new Error('Already listening')) + else throw new Error('Already listening') - if (that._multicastInterface) { - that._sock.addMembership( - that._multicastAddress, - that._multicastInterface - ) - } else { - allAddresses(that._options.type).forEach(function ( - _interface - ) { + return this + } + + if (address && net.isIPv6(address)) this._options.type = 'udp6' + + if (!this._options.type) this._options.type = 'udp4' + + if (port instanceof events.EventEmitter) { + this._sock = port + if (done) setImmediate(done) + } else { + this._internal_socket = true + this._sock = dgram.createSocket({ + type: this._options.type, + reuseAddr: true + }) + + this._sock.bind(port, address || null, function () { + try { + if (that._multicastAddress) { + that._sock.setMulticastLoopback(true) + + if (that._multicastInterface) { that._sock.addMembership( that._multicastAddress, - _interface + that._multicastInterface ) - }) + } else { + allAddresses(that._options.type).forEach(function ( + _interface + ) { + that._sock.addMembership( + that._multicastAddress, + _interface + ) + }) + } } + } catch (err) { + if (done) return done(err) + else throw err } - } catch (err) { - if (done) return done(err) - else throw err - } - if (done) return done() - }) - } + if (done) return done() + }) + } - this._sock.on('message', handleRequest(this)) + this._sock.on('message', handleRequest(this)) - this._sock.on('error', function (error) { - that.emit('error', error) - }) + this._sock.on('error', function (error) { + that.emit('error', error) + }) - if (parameters.pruneTimerPeriod) { - // Start LRU pruning timer - this._lru.pruneTimer = setInterval(function () { - that._lru.prune() - }, parameters.pruneTimerPeriod * 1000) - if (this._lru.pruneTimer.unref) { - this._lru.pruneTimer.unref() + if (parameters.pruneTimerPeriod) { + // Start LRU pruning timer + this._lru.pruneTimer = setInterval(function () { + that._lru.prune() + }, parameters.pruneTimerPeriod * 1000) + if (this._lru.pruneTimer.unref) { + this._lru.pruneTimer.unref() + } } - } - - return this -} - -CoAPServer.prototype.close = function (done) { - if (done) { - setImmediate(done) - } - if (this._lru.pruneTimer) { - clearInterval(this._lru.pruneTimer) + return this } - if (this._sock) { - if (this._internal_socket) { - this._sock.close() + close (done) { + if (done) { + setImmediate(done) } - this._lru.reset() - this._sock = null - this.emit('close') - } else { - this._lru.reset() - } - this._block2Cache.reset() - this._block1Cache.reset() + if (this._lru.pruneTimer) { + clearInterval(this._lru.pruneTimer) + } - return this -} + if (this._sock) { + if (this._internal_socket) { + this._sock.close() + } + this._lru.reset() + this._sock = null + this.emit('close') + } else { + this._lru.reset() + } -/** - * Entry point for a new datagram from the client. - * @param {Packet} packet The packet that was sent from the client. - * @param {Object} rsinfo Connection info - */ -CoAPServer.prototype._handle = function (packet, rsinfo) { - if (packet.code[0] !== '0') { - // According to RFC7252 Section 4.2 receiving a confirmable messages - // that can't be processed, should be rejected by ignoring it AND - // sending a reset. In this case confirmable response message would - // be silently ignored, which is not exactly as stated in the standard. - // However, sending a reset would interfere with a coap client which is - // re-using a socket (see pull-request #131). - return - } + this._block2Cache.reset() + this._block1Cache.reset() - const sock = this._sock - const lru = this._lru - let Message = OutMessage - const that = this - const request = new IncomingMessage(packet, rsinfo) - const cached = lru.peek(this._toKey(request, packet, true)) - - if (cached && !packet.ack && !packet.reset) { - return sock.send(cached, 0, cached.length, rsinfo.port, rsinfo.address) - } else if (cached && (packet.ack || packet.reset)) { - if (cached.response && packet.reset) { - cached.response.end() - } - return lru.del(this._toKey(request, packet, false)) - } else if (packet.ack || packet.reset) { - return // nothing to do, ignoring silently + return this } - if (request.headers.Observe === 0) { - Message = ObserveStream - if (packet.code !== '0.01') { - // it is not a GET - return this._sendError( - Buffer.from('Observe can only be present with a GET'), - rsinfo - ) + /** + * Entry point for a new datagram from the client. + * @param {Packet} packet The packet that was sent from the client. + * @param {Object} rsinfo Connection info + */ + _handle (packet, rsinfo) { + if (packet.code[0] !== '0') { + // According to RFC7252 Section 4.2 receiving a confirmable messages + // that can't be processed, should be rejected by ignoring it AND + // sending a reset. In this case confirmable response message would + // be silently ignored, which is not exactly as stated in the standard. + // However, sending a reset would interfere with a coap client which is + // re-using a socket (see pull-request #131). + return } - } - const cacheKey = this._toCacheKey(request, packet) - - packet.piggybackReplyMs = this._options.piggybackReplyMs - const generateResponse = function () { - const response = new Message(packet, function (response, packet) { - let buf - const sender = new RetrySend(sock, rsinfo.port, rsinfo.address) - - try { - buf = generate(packet) - } catch (err) { - return response.emit('error', err) - } - if (Message === OutMessage) { - sender.on('error', response.emit.bind(response, 'error')) - } else { - buf.response = response - sender.on('error', function () { - response.end() - }) + const sock = this._sock + const lru = this._lru + let Message = OutMessage + const that = this + const request = new IncomingMessage(packet, rsinfo) + const cached = lru.peek(this._toKey(request, packet, true)) + + if (cached && !packet.ack && !packet.reset) { + return sock.send(cached, 0, cached.length, rsinfo.port, rsinfo.address) + } else if (cached && (packet.ack || packet.reset)) { + if (cached.response && packet.reset) { + cached.response.end() } + return lru.del(this._toKey(request, packet, false)) + } else if (packet.ack || packet.reset) { + return // nothing to do, ignoring silently + } - const key = that._toKey( - request, - packet, - packet.ack || !packet.confirmable - ) - lru.set(key, buf) - buf.sender = sender - - if ( - that._options.sendAcksForNonConfirmablePackets || - packet.confirmable - ) { - sender.send( - buf, - packet.ack || packet.reset || packet.confirmable === false + if (request.headers.Observe === 0) { + Message = ObserveStream + if (packet.code !== '0.01') { + // it is not a GET + return this._sendError( + Buffer.from('Observe can only be present with a GET'), + rsinfo ) - } else { - debug('OMIT ACK PACKAGE') } - }) + } - response.statusCode = '2.05' - response._request = request._packet - response._cachekey = cacheKey + const cacheKey = this._toCacheKey(request, packet) - // inject this function so the response can add an entry to the cache - response._addCacheEntry = that._block2Cache.add.bind(that._block2Cache) + packet.piggybackReplyMs = this._options.piggybackReplyMs + const generateResponse = function () { + const response = new Message(packet, function (response, packet) { + let buf + const sender = new RetrySend(sock, rsinfo.port, rsinfo.address) - return response - } + try { + buf = generate(packet) + } catch (err) { + return response.emit('error', err) + } + if (Message === OutMessage) { + sender.on('error', response.emit.bind(response, 'error')) + } else { + buf.response = response + sender.on('error', function () { + response.end() + }) + } - const response = generateResponse() - request.rsinfo = rsinfo + const key = that._toKey( + request, + packet, + packet.ack || !packet.confirmable + ) + lru.set(key, buf) + buf.sender = sender + + if ( + that._options.sendAcksForNonConfirmablePackets || + packet.confirmable + ) { + sender.send( + buf, + packet.ack || packet.reset || packet.confirmable === false + ) + } else { + debug('OMIT ACK PACKAGE') + } + }) - if (packet.token && packet.token.length > 0) { - // return cached value only if this request is not the first block request - const block2Buff = getOption(packet.options, 'Block2') - let requestedBlockOption - if (block2Buff) { - requestedBlockOption = parseBlock2(block2Buff) - } - if (!requestedBlockOption) { - requestedBlockOption = { num: 0 } + response.statusCode = '2.05' + response._request = request._packet + response._cachekey = cacheKey + + // inject this function so the response can add an entry to the cache + response._addCacheEntry = that._block2Cache.add.bind(that._block2Cache) + + return response } - if (requestedBlockOption.num < 1) { - if (this._block2Cache.remove(cacheKey)) { - debug('first block2 request, removed old entry from cache') + const response = generateResponse() + request.rsinfo = rsinfo + + if (packet.token && packet.token.length > 0) { + // return cached value only if this request is not the first block request + const block2Buff = getOption(packet.options, 'Block2') + let requestedBlockOption + if (block2Buff) { + requestedBlockOption = parseBlock2(block2Buff) } - } else { - debug('check if packet token is in cache, key:', cacheKey) - if (this._block2Cache.contains(cacheKey)) { - debug('found cached payload, key:', cacheKey) - response.end(this._block2Cache.get(cacheKey)) - return + if (!requestedBlockOption) { + requestedBlockOption = { num: 0 } + } + + if (requestedBlockOption.num < 1) { + if (this._block2Cache.remove(cacheKey)) { + debug('first block2 request, removed old entry from cache') + } + } else { + debug('check if packet token is in cache, key:', cacheKey) + if (this._block2Cache.contains(cacheKey)) { + debug('found cached payload, key:', cacheKey) + response.end(this._block2Cache.get(cacheKey)) + return + } } } - } - // else goes here? - { - const block1Buff = getOption(packet.options, 'Block1') - if (block1Buff) { - const blockState = parseBlockOption(block1Buff) - - if (blockState) { - /** @type {{[k:string]:Buffer}} */ - const cachedData = - this._block1Cache.getWithDefaultInsert(cacheKey) - const blockByteSize = Math.pow(2, 4 + blockState.blockSize) - const incomingByteIndex = - blockState.sequenceNumber * blockByteSize - // Store in the cache object, use the byte index as the key - cachedData[incomingByteIndex] = request.payload - - if (!blockState.moreBlocks) { - // Last block - const byteOffsets = Object.keys(cachedData) - .map((str) => { - return parseInt(str) - }) - .sort((a, b) => { - return a - b - }) - const byteTotalSum = - incomingByteIndex + request.payload.length - let next = 0 - const concat = Buffer.alloc(byteTotalSum) - for (let i = 0; i < byteOffsets.length; i++) { - if (byteOffsets[i] === next) { - const buff = cachedData[byteOffsets[i]] - buff.copy(concat, next, 0, buff.length) - next += buff.length + // else goes here? + { + const block1Buff = getOption(packet.options, 'Block1') + if (block1Buff) { + const blockState = parseBlockOption(block1Buff) + + if (blockState) { + /** @type {{[k:string]:Buffer}} */ + const cachedData = + this._block1Cache.getWithDefaultInsert(cacheKey) + const blockByteSize = Math.pow(2, 4 + blockState.blockSize) + const incomingByteIndex = + blockState.sequenceNumber * blockByteSize + // Store in the cache object, use the byte index as the key + cachedData[incomingByteIndex] = request.payload + + if (!blockState.moreBlocks) { + // Last block + const byteOffsets = Object.keys(cachedData) + .map((str) => { + return parseInt(str) + }) + .sort((a, b) => { + return a - b + }) + const byteTotalSum = + incomingByteIndex + request.payload.length + let next = 0 + const concat = Buffer.alloc(byteTotalSum) + for (let i = 0; i < byteOffsets.length; i++) { + if (byteOffsets[i] === next) { + const buff = cachedData[byteOffsets[i]] + buff.copy(concat, next, 0, buff.length) + next += buff.length + } else { + throw new Error( + 'Byte offset not the next in line...' + ) + } + } + + this._block1Cache.remove(cacheKey) + + if (next === concat.length) { + request.payload = concat } else { throw new Error( - 'Byte offset not the next in line...' + 'Last byte index is not equal to the concat buffer length!' ) } - } - - this._block1Cache.remove(cacheKey) - - if (next === concat.length) { - request.payload = concat } else { - throw new Error( - 'Last byte index is not equal to the concat buffer length!' - ) + // More blocks to come. ACK this block + response.code = '2.31' + response.setOption('Block1', block1Buff) + response.end() + return } } else { - // More blocks to come. ACK this block - response.code = '2.31' - response.setOption('Block1', block1Buff) - response.end() - return + throw new Error('Invalid block state' + blockState) } - } else { - throw new Error('Invalid block state' + blockState) } } + + this.emit('request', request, response) } - this.emit('request', request, response) -} + _toCacheKey (request, packet) { + if (packet.token && packet.token.length > 0) { + return `${packet.token.toString('hex')}/${this._clientIdentifier( + request + )}` + } -CoAPServer.prototype._toCacheKey = function toCacheKey (request, packet) { - if (packet.token && packet.token.length > 0) { - return `${packet.token.toString('hex')}/${this._clientIdentifier( - request - )}` + return null } - return null -} + _toKey (request, packet, appendToken) { + let result = `${this._clientIdentifier(request)}/${packet.messageId}` -CoAPServer.prototype._toKey = function toKey (request, packet, appendToken) { - let result = `${this._clientIdentifier(request)}/${packet.messageId}` + if (appendToken) result += packet.token.toString('hex') - if (appendToken) result += packet.token.toString('hex') + return result + } +} - return result +// maxBlock2 is in formular 2**(i+4), and must <= 2**(6+4) +let maxBlock2 = Math.pow( + 2, + Math.floor(Math.log(parameters.maxPacketSize) / Math.log(2)) +) +if (maxBlock2 > Math.pow(2, 6 + 4)) { + maxBlock2 = Math.pow(2, 6 + 4) } /* @@ -547,95 +552,84 @@ new out message inherit from OutgoingMessage to handle cached answer and blockwise (2) */ -function OutMessage () { - OutgoingMessage.apply(this, Array.prototype.slice.call(arguments)) -} -util.inherits(OutMessage, OutgoingMessage) +class OutMessage extends OutgoingMessage { + /** + * Entry point for a response from the server + * @param {Buffer} payload A buffer-like object containing data to send back to the client. + */ + end (payload) { + const that = this + + // removeOption(this._request.options, 'Block1'); + // add logic for Block1 sending + + const block2Buff = getOption(this._request.options, 'Block2') + let requestedBlockOption + // if we got blockwise (2) request + if (block2Buff) { + requestedBlockOption = parseBlock2(block2Buff) + // bad option + if (!requestedBlockOption) { + that.statusCode = '4.02' + return super.end() + } + } -// maxBlock2 is in formular 2**(i+4), and must <= 2**(6+4) -let maxBlock2 = Math.pow( - 2, - Math.floor(Math.log(parameters.maxPacketSize) / Math.log(2)) -) -if (maxBlock2 > Math.pow(2, 6 + 4)) maxBlock2 = Math.pow(2, 6 + 4) + // if payload is suitable for ONE message, shoot it out + if ( + !payload || + (!requestedBlockOption && payload.length < parameters.maxPacketSize) + ) { + return super.end(payload) + } -/** - * Entry point for a response from the server - * @param {Buffer} payload A buffer-like object containing data to send back to the client. - */ -OutMessage.prototype.end = function (payload) { - const that = this - - // removeOption(this._request.options, 'Block1'); - // add logic for Block1 sending - - const block2Buff = getOption(this._request.options, 'Block2') - let requestedBlockOption - // if we got blockwise (2) request - if (block2Buff) { - requestedBlockOption = parseBlock2(block2Buff) - // bad option + // for the first request, block2 option may be missed if (!requestedBlockOption) { - that.statusCode = '4.02' - return OutgoingMessage.prototype.end.call(that) + requestedBlockOption = { + size: maxBlock2, + num: 0 + } } - } - // if payload is suitable for ONE message, shoot it out - if ( - !payload || - (!requestedBlockOption && payload.length < parameters.maxPacketSize) - ) { - return OutgoingMessage.prototype.end.call(this, payload) - } + // block2 size should not bigger than maxBlock2 + if (requestedBlockOption.size > maxBlock2) { requestedBlockOption.size = maxBlock2 } - // for the first request, block2 option may be missed - if (!requestedBlockOption) { - requestedBlockOption = { - size: maxBlock2, - num: 0 + // block number should have limit + const lastBlockNum = + Math.ceil(payload.length / requestedBlockOption.size) - 1 + if (requestedBlockOption.num > lastBlockNum) { + // precondition fail, may request for out of range block + that.statusCode = '4.02' + return super.end() } - } - - // block2 size should not bigger than maxBlock2 - if (requestedBlockOption.size > maxBlock2) { requestedBlockOption.size = maxBlock2 } + // check if requested block is the last + const moreFlag = requestedBlockOption.num < lastBlockNum - // block number should have limit - const lastBlockNum = - Math.ceil(payload.length / requestedBlockOption.size) - 1 - if (requestedBlockOption.num > lastBlockNum) { - // precondition fail, may request for out of range block - that.statusCode = '4.02' - return OutgoingMessage.prototype.end.call(that) - } - // check if requested block is the last - const moreFlag = requestedBlockOption.num < lastBlockNum - - const block2 = createBlock2({ - moreBlock2: moreFlag, - num: requestedBlockOption.num, - size: requestedBlockOption.size - }) - if (!block2) { - // this catch never be match, - // since we're gentleman, just handle it - that.statusCode = '4.02' - return OutgoingMessage.prototype.end.call(that) - } - this.setOption('Block2', block2) - this.setOption('ETag', _toETag(payload)) + const block2 = createBlock2({ + moreBlock2: moreFlag, + num: requestedBlockOption.num, + size: requestedBlockOption.size + }) + if (!block2) { + // this catch never be match, + // since we're gentleman, just handle it + that.statusCode = '4.02' + super.end() + } + this.setOption('Block2', block2) + this.setOption('ETag', _toETag(payload)) - // cache it - if (this._request.token && this._request.token.length > 0) { - this._addCacheEntry(this._cachekey, payload) - } - OutgoingMessage.prototype.end.call( - this, - payload.slice( - requestedBlockOption.num * requestedBlockOption.size, - (requestedBlockOption.num + 1) * requestedBlockOption.size + // cache it + if (this._request.token && this._request.token.length > 0) { + this._addCacheEntry(this._cachekey, payload) + } + super.end( + payload.slice( + requestedBlockOption.num * requestedBlockOption.size, + (requestedBlockOption.num + 1) * requestedBlockOption.size + ) ) - ) + } } /* diff --git a/test/agent.js b/test/agent.js index 552e09fa..3cb8d8c7 100644 --- a/test/agent.js +++ b/test/agent.js @@ -16,7 +16,7 @@ const request = coap.request describe('Agent config', function () { it('should get agent instance through custom config', function (done) { - const agent = coap.Agent({ type: 'udp4', port: 62754 }) + const agent = new coap.Agent({ type: 'udp4', port: 62754 }) expect(agent._sock.type).to.eql('udp4') expect(agent._sock._bindState).to.eql(1) done() @@ -24,7 +24,7 @@ describe('Agent config', function () { it('should get agent instance through custom socket', function (done) { const socket = dgram.createSocket('udp6') - const agent = coap.Agent({ socket, type: 'udp4', port: 62754 }) + const agent = new coap.Agent({ socket, type: 'udp4', port: 62754 }) expect(agent._opts.type).to.eql('udp6') expect(agent._sock.type).to.eql('udp6') expect(agent._sock._bindState).to.eql(0) diff --git a/test/retry_send.js b/test/retry_send.js index 9838d1bc..3565ad8a 100644 --- a/test/retry_send.js +++ b/test/retry_send.js @@ -22,12 +22,12 @@ describe('RetrySend', function () { }) it('should use default retry count, using the retry_send factory method', function () { - const result = RetrySend({}, 1234, 'localhost') + const result = new RetrySend({}, 1234, 'localhost') expect(result._maxRetransmit).to.eql(parameters.maxRetransmit) }) it('should use a custom retry count, using the retry_send factory method', function () { - const result = RetrySend({}, 1234, 'localhost', 55) + const result = new RetrySend({}, 1234, 'localhost', 55) expect(result._maxRetransmit).to.eql(55) }) }) diff --git a/test/segmentation.js b/test/segmentation.js index dde21eb3..7de7633b 100644 --- a/test/segmentation.js +++ b/test/segmentation.js @@ -12,10 +12,10 @@ describe('Segmentation', () => { describe('Segmented Transmission', () => { it('Should throw invalid block size error', (done) => { expect(() => { - segment.SegmentedTransmission(-1, 0, 0) + new segment.SegmentedTransmission(-1, 0, 0) // eslint-disable-line no-new }).to.throw('invalid block size -1') expect(() => { - segment.SegmentedTransmission(7, 0, 0) + new segment.SegmentedTransmission(7, 0, 0) // eslint-disable-line no-new }).to.throw('invalid block size 7') setImmediate(done) })