Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Jul 16, 2024
1 parent 35c4f47 commit a6244ec
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 32 deletions.
18 changes: 6 additions & 12 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 10 additions & 5 deletions packages/socket.io-clustered-engine/lib/engine.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Server, type ServerOptions, Socket, type Transport } from "engine.io";
import { randomUUID } from "node:crypto";
import { randomBytes } from "node:crypto";
import { setTimeout, clearTimeout } from "node:timers";
import { type IncomingMessage } from "node:http";
import { type Packet } from "engine.io-parser";
Expand All @@ -18,7 +18,11 @@ type Brand<K, T> = K & { __brand: T };

type NodeId = Brand<string, "NodeId">;
type SessionId = Brand<string, "SessionId">;
type RequestId = Brand<string, "RequestId">;
type RequestId = Brand<number, "RequestId">;

function randomId() {
return randomBytes(3).toString("hex");
}

enum MessageType {
ACQUIRE_LOCK = 0,
Expand Down Expand Up @@ -149,9 +153,10 @@ interface ClusterEngineOptions {
// @ts-expect-error onWebSocket() method is private in parent class
export abstract class ClusterEngine extends Server {
private readonly _opts: Required<ClusterEngineOptions>;
protected readonly _nodeId = randomUUID() as NodeId;
protected readonly _nodeId = randomId() as NodeId;
private readonly _requests = new Map<RequestId, ClusterRequest>();
private readonly _remoteTransports = new Map<SessionId, Transport>();
private _requestCount = 0;

constructor(opts?: ServerOptions & ClusterEngineOptions) {
super(opts);
Expand Down Expand Up @@ -426,7 +431,7 @@ export abstract class ClusterEngine extends Server {
onSuccess: (senderId: NodeId) => void,
onError: () => void
) {
const requestId = randomUUID() as RequestId;
const requestId = ++this._requestCount as RequestId;

const timer = setTimeout(() => {
this._requests.delete(requestId);
Expand Down Expand Up @@ -571,7 +576,7 @@ export abstract class ClusterEngine extends Server {
debug("upgrade success");
this._hookTransport(sid, transport, "read", senderId);

const requestId = randomUUID() as RequestId;
const requestId = ++this._requestCount as RequestId;

const onSuccess = (takeOver: boolean, packets: Packet[]) => {
if (takeOver) {
Expand Down
8 changes: 4 additions & 4 deletions packages/socket.io-clustered-engine/lib/redis.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ClusterEngine, type Message } from "./engine";
import { encode, decode } from "notepack.io";
import { encode, decode } from "@msgpack/msgpack";
import { type ServerOptions } from "engine.io";
import cluster from "node:cluster";
import { randomUUID } from "node:crypto";
Expand Down Expand Up @@ -41,7 +41,7 @@ export function setupPrimaryWithRedis(
SUBSCRIBE(subClient, channels, (buffer: Buffer) => {
let message: Message & { _source?: string; _primaryId?: string };
try {
message = decode(buffer);
message = decode(buffer) as Message;
} catch (e) {
debug("ignore malformed buffer");
return;
Expand Down Expand Up @@ -147,7 +147,7 @@ export class RedisEngine extends ClusterEngine {
SUBSCRIBE(subClient, channels, (buffer: Buffer) => {
let message: Message & { _source?: string; _primaryId?: string };
try {
message = decode(buffer);
message = decode(buffer) as Message;
} catch (e) {
debug("ignore malformed buffer");
return;
Expand All @@ -170,7 +170,7 @@ export class RedisEngine extends ClusterEngine {
message._source = MESSAGE_SOURCE;

debug("publish message to channel %s", channel);
this._pubClient.publish(channel, encode(message));
this._pubClient.publish(channel, Buffer.from(encode(message)));
}
}

Expand Down
5 changes: 2 additions & 3 deletions packages/socket.io-clustered-engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
"license": "MIT",
"description": "",
"dependencies": {
"@msgpack/msgpack": "^3.0.0-beta2",
"@msgpack/msgpack": "~2.8.0",
"debug": "~4.3.3",
"engine.io": "~6.6.0",
"engine.io-parser": "~5.2.3",
"notepack.io": "^3.0.1"
"engine.io-parser": "~5.2.3"
},
"scripts": {
"compile": "rimraf ./dist && tsc",
Expand Down
11 changes: 10 additions & 1 deletion packages/socket.io-clustered-engine/test/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@ describe("cluster", () => {
});

afterEach((done) => {
cluster.disconnect(done);
for (const worker of Object.values(cluster.workers)) {
worker.kill();
}
function onExit() {
if (Object.keys(cluster.workers).length === 0) {
cluster.off("exit", onExit);
done();
}
}
cluster.on("exit", onExit);
});

it("should ping/pong", (done) => {
Expand Down
3 changes: 3 additions & 0 deletions packages/socket.io-clustered-engine/test/in-memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ describe("in-memory", () => {
engine2.close();
engine3.close();
httpServer1.close();
httpServer1.closeAllConnections();
httpServer2.close();
httpServer2.closeAllConnections();
httpServer3.close();
httpServer3.closeAllConnections();
});

it("should work (read)", (done) => {
Expand Down
18 changes: 11 additions & 7 deletions packages/socket.io-clustered-engine/test/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ describe("redis", () => {
engine2.close();
engine3.close();
httpServer1.close();
httpServer1.closeAllConnections();
httpServer2.close();
httpServer2.closeAllConnections();
httpServer3.close();
httpServer3.closeAllConnections();

return Promise.all([
pubClient.disconnect(),
Expand Down Expand Up @@ -143,20 +146,21 @@ describe("redis", () => {
engine3.attach(httpServer3);
httpServer3.listen(3002);

cleanup = () => {
cleanup = async () => {
engine1.close();
engine2.close();
engine3.close();
httpServer1.close();
httpServer1.closeAllConnections();
httpServer2.close();
httpServer2.closeAllConnections();
httpServer3.close();
httpServer3.closeAllConnections();

return Promise.all([
pubClient.disconnect(),
subClient1.disconnect(),
subClient2.disconnect(),
subClient3.disconnect(),
]).then();
pubClient.disconnect();
subClient1.disconnect();
subClient2.disconnect();
subClient3.disconnect();
};
});

Expand Down

0 comments on commit a6244ec

Please sign in to comment.