diff --git a/lib/transport/receiver/jsonp.js b/lib/transport/receiver/jsonp.js index 365fecc2..c4dae1e0 100644 --- a/lib/transport/receiver/jsonp.js +++ b/lib/transport/receiver/jsonp.js @@ -55,12 +55,16 @@ JsonpReceiver.prototype._callback = function(data) { return; } - if (data) { - debug('message', data); - this.emit('message', data); + try { + if (data) { + debug('message', data); + this.emit('message', data); + } + } finally { + // Must clean up even if 'message' event handlers throw + this.emit('close', null, 'network'); + this.removeAllListeners(); } - this.emit('close', null, 'network'); - this.removeAllListeners(); }; JsonpReceiver.prototype._abort = function(err) { diff --git a/lib/transport/receiver/xhr.js b/lib/transport/receiver/xhr.js index 8ec7bc84..9672c017 100644 --- a/lib/transport/receiver/xhr.js +++ b/lib/transport/receiver/xhr.js @@ -37,16 +37,23 @@ XhrReceiver.prototype._chunkHandler = function(status, text) { return; } - for (var idx = -1; ; this.bufferPosition += idx + 1) { + for (var idx = -1; ; ) { var buf = text.slice(this.bufferPosition); idx = buf.indexOf('\n'); if (idx === -1) { break; } var msg = buf.slice(0, idx); - if (msg) { - debug('message', msg); - this.emit('message', msg); + try { + if (msg) { + debug('message', msg); + this.emit('message', msg); + } + } finally { + // The bufferPosition must advance even if 'message' handler + // throws an exception, otherwise the same message will be + // replayed the next time _chunkHandler is called. + this.bufferPosition += idx + 1; } } }; diff --git a/tests/lib/receivers.js b/tests/lib/receivers.js index 823c9698..8281cef5 100644 --- a/tests/lib/receivers.js +++ b/tests/lib/receivers.js @@ -55,6 +55,32 @@ describe('Receivers', function () { }); }); + it('survives errors in handlers', function (done) { + var test = this.runnable(); + JsonpReceiver.prototype._createScript = function () { + var self = this; + setTimeout(function () { + try { + global[utils.WPrefix][self.id]('datadata'); + } catch (e) { + if (!(test.timedOut || test.duration)) { + done(new Error('close event not fired')); + } + } + }, 5); + }; + var jpr = new JsonpReceiver('test'); + jpr.on('close', function () { + if (test.timedOut || test.duration) { + return; + } + done(); + }); + jpr.on('message', function () { + throw new Error('boom'); + }); + }); + it('will timeout', function (done) { this.timeout(500); var test = this.runnable(); @@ -226,6 +252,7 @@ describe('Receivers', function () { return; } try { + expect(i).to.equal(3); expect(reason).to.equal('network'); } catch (e) { done(e); @@ -233,7 +260,7 @@ describe('Receivers', function () { } done(); }); - xhr._chunkHandler(200, 'test\nmultiple\nlines'); + xhr._chunkHandler(200, 'test\nmultiple\nlines\n'); }); it('emits no messages for an empty string response', function (done) { @@ -287,5 +314,41 @@ describe('Receivers', function () { }); xhr.abort(); }); + + it('survives errors in message handlers', function (done) { + var test = this.runnable(); + var xhr = new XhrReceiver('test', XhrFake); + var messages = []; + xhr.on('message', function (msg) { + messages.push(msg); + if (msg === 'one') { + throw new Error('boom'); + } + }); + xhr.on('close', function (code, reason) { + if (test.timedOut || test.duration) { + return; + } + try { + expect(messages).to.eql(['one', 'two', 'three']); + } catch (e) { + done(e); + return; + } + done(); + }); + try { + xhr._chunkHandler(200, 'one\n'); + } catch (e) { + // This is expected + } + try { + xhr._chunkHandler(200, 'one\ntwo\nthree\n'); + } catch (e) { + // This is NOT expected, but let the error be reported in the close handler + // instead of here + } + xhr.abort(); + }); }); });