Skip to content

Commit

Permalink
Fix socket/stream mutual destruction, remove setTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
boronine committed Nov 3, 2024
1 parent fb6cc45 commit f84a580
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 62 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# h2tunnel

![NPM Version](https://img.shields.io/npm/v/h2tunnel)
![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/boronine/h2tunnel/node.js.yml)
[![NPM Version](https://img.shields.io/npm/v/h2tunnel)](https://www.npmjs.com/package/h2tunnel)
[![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/boronine/h2tunnel/node.js.yml)](https://github.com/boronine/h2tunnel/actions/workflows/node.js.yml)

A CLI tool and Node.js library for a popular "tunneling" workflow, similar to the proprietary [ngrok](https://ngrok.com/)
or the openssh-based `ssh -L` solution. All in [less than 500 LOC](https://github.com/boronine/h2tunnel/blob/main/src/h2tunnel.ts)
Expand Down
65 changes: 38 additions & 27 deletions src/h2tunnel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
TunnelServer,
} from "./h2tunnel.js";
import net from "node:net";
import * as http2 from "node:http2";
import { strictEqual } from "node:assert";

// localhost HTTP1 server "python3 -m http.server"
const LOCAL_PORT = 14000;
Expand All @@ -20,6 +22,9 @@ const TUNNEL_PORT = 14005;
// remote HTTPS server that is piped through the tunnel to localhost
const MUX_PORT = 14006;

// Reduce this to make tests faster
const TIME_MULTIPLIER = 0.1;

const CLIENT_KEY = `-----BEGIN PRIVATE KEY-----
MIG2AgEAMBAGByqGSM49AgEGBSuBBAAiBIGeMIGbAgEBBDCDzcLnOqzvCrnUyd4P
1QcIG/Xi/VPpA5dVIwPVkutr9y/wZo3aJsYUX5xExQMsEeihZANiAAQfSPquV3P/
Expand Down Expand Up @@ -65,13 +70,13 @@ const clientOptions: ClientOptions = {
cert: CLIENT_CRT,
localHttpPort: LOCAL_PORT,
demuxListenPort: DEMUX_PORT,
tunnelRestartTimeout: 500,
tunnelRestartTimeout: 500 * TIME_MULTIPLIER,
};

type Conn = { clientSocket: net.Socket; originSocket: net.Socket };

async function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
return new Promise((resolve) => setTimeout(resolve, ms * TIME_MULTIPLIER));
}

async function createBadTlsServer(port: number): Promise<() => Promise<void>> {
Expand Down Expand Up @@ -294,9 +299,9 @@ async function testConn(
if (term === "FIN") {
await t.test(
`clean termination by ${by} FIN`,
{ plan: 12 },
{ plan: 12, timeout: 1000 },
(t: TestContext) =>
new Promise<void>((resolve) => {
new Promise<void>((resolve, reject) => {
let i = 0;
const done = () => i === 2 && resolve();
t.assert.strictEqual(socket2.readyState, "open");
Expand Down Expand Up @@ -332,7 +337,7 @@ async function testConn(
} else if (term == "RST") {
await t.test(
`clean reset by ${by} RST`,
{ plan: 8 },
{ plan: 8, timeout: 1000 },
(t: TestContext) =>
new Promise<void>((resolve) => {
let i = 0;
Expand Down Expand Up @@ -360,37 +365,43 @@ async function testConn(
}
}

await test("basic connection and termination", async (t) => {
await test.only("basic connection and termination", async (t) => {
const net = new NetworkEmulator(LOCAL_PORT, PROXY_TEST_PORT);
const server = new TunnelServer(serverOptions);
const client = new TunnelClient(clientOptions);
server.start();
client.start();
await server.waitUntilListening();
await client.waitUntilConnected();
await server.waitUntilConnected();
console.log(0, client.state);
await net.startAndWaitUntilReady();
for (const term of ["FIN", "RST"] satisfies ("FIN" | "RST")[]) {
for (const by of ["client", "server"] satisfies ("client" | "server")[]) {
for (const numBytes of [1, 4]) {
for (const proxyPort of [LOCAL_PORT, PROXY_TEST_PORT, PROXY_PORT]) {
const echoServer = new EchoServer(LOCAL_PORT, proxyPort);
await echoServer.startAndWaitUntilReady();
const strict = proxyPort !== PROXY_PORT;
// Test single
await testConn(t, echoServer, numBytes, term, by, 0, strict);
// Test double simultaneous
await Promise.all([
testConn(t, echoServer, numBytes, term, by, 0, strict),
testConn(t, echoServer, numBytes, term, by, 0, strict),
]);
// Test triple delayed
await Promise.all([
testConn(t, echoServer, numBytes, term, by, 0, strict),
testConn(t, echoServer, numBytes, term, by, 10, strict),
testConn(t, echoServer, numBytes, term, by, 100, strict),
]);
await echoServer.stopAndWaitUntilClosed();
}
for (const proxyPort of [LOCAL_PORT, PROXY_TEST_PORT, PROXY_PORT]) {
await t.test(
`clean termination by ${by} ${term} on ${proxyPort}`,
async (t) => {
const echoServer = new EchoServer(LOCAL_PORT, proxyPort);
await echoServer.startAndWaitUntilReady();
const strict = proxyPort !== PROXY_PORT;
// Test single
await testConn(t, echoServer, 1, term, by, 0, strict);
await testConn(t, echoServer, 4, term, by, 0, strict);
// Test double simultaneous
await Promise.all([
testConn(t, echoServer, 3, term, by, 0, strict),
testConn(t, echoServer, 3, term, by, 0, strict),
]);
// Test triple delayed
await Promise.all([
testConn(t, echoServer, 4, term, by, 0, strict),
testConn(t, echoServer, 4, term, by, 10, strict),
testConn(t, echoServer, 4, term, by, 100, strict),
]);
await echoServer.stopAndWaitUntilClosed();
},
);
}
}
}
Expand All @@ -400,7 +411,7 @@ await test("basic connection and termination", async (t) => {
await server.stop();
});

await test.only("happy-path", async (t) => {
await test("happy-path", async (t) => {
const echo = new EchoServer(LOCAL_PORT, PROXY_PORT);
await echo.startAndWaitUntilReady();

Expand Down
68 changes: 35 additions & 33 deletions src/h2tunnel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ export abstract class AbstractTunnel<

linkSocketsIfNecessary() {
if (this.tunnelSocket && !this.tunnelSocket.closed && this.muxSocket) {
this.tunnelSocket.on("data", (data) =>
this.log({ tunnelSocketBytes: data.length }),
);
this.muxSocket.on("data", (data) =>
this.log({ muxSocketBytes: data.length }),
);
this.tunnelSocket.pipe(this.muxSocket);
this.muxSocket.pipe(this.tunnelSocket);
this.log({ linked: true });
Expand Down Expand Up @@ -147,39 +141,47 @@ export abstract class AbstractTunnel<

addDemuxSocket(socket: net.Socket, stream: http2.Http2Stream): void {
this.log({ demuxSocket: "added", streamId: stream.id });
socket.on("data", (chunk) => stream.write(chunk));
stream.on("data", (chunk) => socket.write(chunk));
socket.on("error", (err) => {
this.log({ demuxSocket: "error", err });
if (!stream.closed) {
stream.close(http2.constants.NGHTTP2_CANCEL);
}
socket.on("data", (chunk) => {
this.log({ streamDataWrite: chunk.length, streamId: stream.id });
stream.write(chunk);
});
socket.on("end", () => {
this.log({ demuxSocket: "end" });
stream.end();
stream.on("data", (chunk) => {
this.log({ streamDataRead: chunk.length, streamId: stream.id });
socket.write(chunk);
});
stream.on("aborted", () => {
this.log({ demuxStream: "aborted" });
});
stream.on("error", (err) => {
this.log({ demuxStream: "error", err });
// Prevent error being logged, we are handling it during the "close" event
socket.on("error", () => {});
socket.on("close", () => {
this.log({
demuxSocket: "close",
streamId: stream.id,
streamError: stream.errored,
socketError: socket.errored,
});
if (!stream.destroyed) {
if (socket.errored) {
stream.close(http2.constants.NGHTTP2_INTERNAL_ERROR);
} else {
stream.destroy();
}
}
});
stream.on("end", () => {
this.log({ demuxStream: "end" });
// This is a hack to workaround Node.js behavior where "end"/"close" event is emitted before "aborted"/"error"
setTimeout(() => {
if (stream.aborted) {
this.log({
demuxStream: "actually-aborted",
socketAlreadyDestroyed: socket.destroyed,
});
// Prevent error being logged, we are handling it during the "close" event
stream.on("error", () => {});
stream.on("close", () => {
this.log({
demuxStream: "close",
streamId: stream.id,
streamError: stream.errored,
socketError: socket.errored,
});
if (!socket.destroyed) {
if (stream.errored) {
socket.resetAndDestroy();
} else {
this.log({ demuxStream: "actually-ended" });
socket.end();
socket.destroy();
}
}, 1);
}
});
}

Expand Down

0 comments on commit f84a580

Please sign in to comment.