diff --git a/dev/issue-1241.js b/dev/issue-1241.js new file mode 100644 index 000000000..0954331ac --- /dev/null +++ b/dev/issue-1241.js @@ -0,0 +1,54 @@ +const ServiceBroker = require("../src/service-broker"); + +const broker = new ServiceBroker({ + middlewares: [ + { + call(next) { + return (actionName, params, opts) => { + const p = next(actionName, params, opts); + + const pp = p.then(res => { + return res; + }); + + pp.ctx = p.ctx; + + return pp; + }; + } + } + ] +}); + +broker.createService({ + name: "statusCodeTest", + + actions: { + testNotFound: { + rest: "GET /testNotFound", + handler(ctx) { + ctx.meta.$statusCode = 404; + } + } + } +}); + +broker.createService({ + name: "test", + actions: { + hello: { + async handler(ctx) { + await ctx.call("statusCodeTest.testNotFound"); + this.logger.info("Context meta", ctx.meta); + } + } + } +}); + +broker.start().then(() => { + broker.repl(); + + broker.call("test.hello").then(res => { + console.log("Result:", res); + }); +}); diff --git a/src/transit.js b/src/transit.js index a5c03df15..51474eb23 100644 --- a/src/transit.js +++ b/src/transit.js @@ -914,22 +914,23 @@ class Transit { } else { chunks.push(chunk); } - for (const ch of chunks) { - const copy = { ...payload }; - copy.seq = ++payload.seq; - copy.stream = true; - copy.params = ch; - - this.logger.debug( - `=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}` - ); - - this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)).catch( - publishCatch - ); - } - stream.resume(); - return; + + return this.Promise.all( + chunks.map(ch => { + const copy = Object.assign({}, payload); + copy.seq = ++payload.seq; + copy.stream = true; + copy.params = ch; + + this.logger.debug( + `=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}` + ); + + return this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)); + }) + ) + .then(() => stream.resume()) + .catch(publishCatch); }); stream.on("end", () => { @@ -1147,18 +1148,23 @@ class Transit { } else { chunks.push(chunk); } - for (const ch of chunks) { - const copy = { ...payload }; - copy.seq = ++payload.seq; - copy.stream = true; - copy.data = ch; - this.logger.debug(`=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}`); + return this.Promise.all( + chunks.map(ch => { + const copy = Object.assign({}, payload); + copy.seq = ++payload.seq; + copy.stream = true; + copy.data = ch; - this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)).catch(publishCatch); - } - stream.resume(); - return; + this.logger.debug( + `=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}` + ); + + return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)); + }) + ) + .then(() => stream.resume()) + .catch(publishCatch); }); stream.on("end", () => { diff --git a/test/unit/transit.spec.js b/test/unit/transit.spec.js index 311d4208a..2cd4b33be 100644 --- a/test/unit/transit.spec.js +++ b/test/unit/transit.spec.js @@ -1968,7 +1968,7 @@ describe("Test Transit._sendRequest", () => { const reject = jest.fn(); beforeEach(() => { - transit.publish = jest.fn(() => Promise.resolve()); + transit.publish = jest.fn(() => Promise.resolve().delay(40)); }); it("should send stream chunks", () => { @@ -2115,8 +2115,11 @@ describe("Test Transit._sendRequest", () => { transit.publish.mockClear(); stream.push(randomData); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes( Math.ceil(randomData.length / transit.opts.maxChunkSize) ); @@ -2211,8 +2214,11 @@ describe("Test Transit._sendRequest", () => { }); transit.publish.mockClear(); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes( Math.ceil(randomData.length / transit.opts.maxChunkSize) + 1 ); @@ -2791,6 +2797,10 @@ describe("Test Transit.sendResponse", () => { }); describe("with Stream", () => { + beforeEach(() => { + transit.publish = jest.fn(() => Promise.resolve().delay(40)); + }); + it("should send stream chunks", () => { transit.publish.mockClear(); @@ -2900,8 +2910,11 @@ describe("Test Transit.sendResponse", () => { transit.publish.mockClear(); stream.push("first chunk"); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes(1); expect(transit.publish).toHaveBeenCalledWith({ payload: { @@ -2973,8 +2986,11 @@ describe("Test Transit.sendResponse", () => { transit.publish.mockClear(); stream.push(randomData); }) - .delay(100) + .delay(20) + .then(() => expect(stream.isPaused()).toBeTruthy()) + .delay(80) .then(() => { + expect(stream.isPaused()).toBeFalsy(); expect(transit.publish).toHaveBeenCalledTimes( Math.ceil(randomData.length / transit.opts.maxChunkSize) );