Skip to content

Commit

Permalink
Merge branch 'master' into next
Browse files Browse the repository at this point in the history
  • Loading branch information
icebob committed Oct 29, 2023
2 parents 9f5f7c8 + da2aabc commit 72c369e
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 31 deletions.
54 changes: 54 additions & 0 deletions dev/issue-1241.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
const ServiceBroker = require("../src/service-broker");

const broker = new ServiceBroker({
middlewares: [
{
call(next) {
return (actionName, params, opts) => {
const p = next(actionName, params, opts);

const pp = p.then(res => {
return res;
});

pp.ctx = p.ctx;

return pp;
};
}
}
]
});

broker.createService({
name: "statusCodeTest",

actions: {
testNotFound: {
rest: "GET /testNotFound",
handler(ctx) {
ctx.meta.$statusCode = 404;
}
}
}
});

broker.createService({
name: "test",
actions: {
hello: {
async handler(ctx) {
await ctx.call("statusCodeTest.testNotFound");
this.logger.info("Context meta", ctx.meta);
}
}
}
});

broker.start().then(() => {
broker.repl();

broker.call("test.hello").then(res => {
console.log("Result:", res);
});
});
58 changes: 32 additions & 26 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -914,22 +914,23 @@ class Transit {
} else {
chunks.push(chunk);
}
for (const ch of chunks) {
const copy = { ...payload };
copy.seq = ++payload.seq;
copy.stream = true;
copy.params = ch;

this.logger.debug(
`=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}`
);

this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy)).catch(
publishCatch
);
}
stream.resume();
return;

return this.Promise.all(
chunks.map(ch => {
const copy = Object.assign({}, payload);
copy.seq = ++payload.seq;
copy.stream = true;
copy.params = ch;

this.logger.debug(
`=> Send stream chunk ${requestID}to ${nodeName} node. Seq: ${copy.seq}`
);

return this.publish(new Packet(P.PACKET_REQUEST, ctx.nodeID, copy));
})
)
.then(() => stream.resume())
.catch(publishCatch);
});

stream.on("end", () => {
Expand Down Expand Up @@ -1147,18 +1148,23 @@ class Transit {
} else {
chunks.push(chunk);
}
for (const ch of chunks) {
const copy = { ...payload };
copy.seq = ++payload.seq;
copy.stream = true;
copy.data = ch;

this.logger.debug(`=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}`);
return this.Promise.all(
chunks.map(ch => {
const copy = Object.assign({}, payload);
copy.seq = ++payload.seq;
copy.stream = true;
copy.data = ch;

this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy)).catch(publishCatch);
}
stream.resume();
return;
this.logger.debug(
`=> Send stream chunk to ${nodeID} node. Seq: ${copy.seq}`
);

return this.publish(new Packet(P.PACKET_RESPONSE, nodeID, copy));
})
)
.then(() => stream.resume())
.catch(publishCatch);
});

stream.on("end", () => {
Expand Down
26 changes: 21 additions & 5 deletions test/unit/transit.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1968,7 +1968,7 @@ describe("Test Transit._sendRequest", () => {
const reject = jest.fn();

beforeEach(() => {
transit.publish = jest.fn(() => Promise.resolve());
transit.publish = jest.fn(() => Promise.resolve().delay(40));
});

it("should send stream chunks", () => {
Expand Down Expand Up @@ -2115,8 +2115,11 @@ describe("Test Transit._sendRequest", () => {
transit.publish.mockClear();
stream.push(randomData);
})
.delay(100)
.delay(20)
.then(() => expect(stream.isPaused()).toBeTruthy())
.delay(80)
.then(() => {
expect(stream.isPaused()).toBeFalsy();
expect(transit.publish).toHaveBeenCalledTimes(
Math.ceil(randomData.length / transit.opts.maxChunkSize)
);
Expand Down Expand Up @@ -2211,8 +2214,11 @@ describe("Test Transit._sendRequest", () => {
});
transit.publish.mockClear();
})
.delay(100)
.delay(20)
.then(() => expect(stream.isPaused()).toBeTruthy())
.delay(80)
.then(() => {
expect(stream.isPaused()).toBeFalsy();
expect(transit.publish).toHaveBeenCalledTimes(
Math.ceil(randomData.length / transit.opts.maxChunkSize) + 1
);
Expand Down Expand Up @@ -2791,6 +2797,10 @@ describe("Test Transit.sendResponse", () => {
});

describe("with Stream", () => {
beforeEach(() => {
transit.publish = jest.fn(() => Promise.resolve().delay(40));
});

it("should send stream chunks", () => {
transit.publish.mockClear();

Expand Down Expand Up @@ -2900,8 +2910,11 @@ describe("Test Transit.sendResponse", () => {
transit.publish.mockClear();
stream.push("first chunk");
})
.delay(100)
.delay(20)
.then(() => expect(stream.isPaused()).toBeTruthy())
.delay(80)
.then(() => {
expect(stream.isPaused()).toBeFalsy();
expect(transit.publish).toHaveBeenCalledTimes(1);
expect(transit.publish).toHaveBeenCalledWith({
payload: {
Expand Down Expand Up @@ -2973,8 +2986,11 @@ describe("Test Transit.sendResponse", () => {
transit.publish.mockClear();
stream.push(randomData);
})
.delay(100)
.delay(20)
.then(() => expect(stream.isPaused()).toBeTruthy())
.delay(80)
.then(() => {
expect(stream.isPaused()).toBeFalsy();
expect(transit.publish).toHaveBeenCalledTimes(
Math.ceil(randomData.length / transit.opts.maxChunkSize)
);
Expand Down

0 comments on commit 72c369e

Please sign in to comment.