Skip to content

Commit

Permalink
Fixes #4443 (#13596)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarred-Sumner authored Aug 29, 2024
1 parent 743f40b commit e48369d
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 31 deletions.
40 changes: 9 additions & 31 deletions src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4382,15 +4382,10 @@ pub const ServerWebSocket = struct {
return .zero;
}

if (this.isClosed() and !publish_to_self) {
// We can't access the socket context on a closed socket.
return JSValue.jsNumber(0);
}

if (message_value.asArrayBuffer(globalThis)) |array_buffer| {
const buffer = array_buffer.slice();

const result = if (!publish_to_self)
const result = if (!publish_to_self and !this.isClosed())
this.websocket().publish(topic_slice.slice(), buffer, .binary, compress)
else
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .binary, compress);
Expand All @@ -4408,7 +4403,7 @@ pub const ServerWebSocket = struct {

const buffer = string_slice.slice();

const result = if (!publish_to_self)
const result = if (!publish_to_self and !this.isClosed())
this.websocket().publish(topic_slice.slice(), buffer, .text, compress)
else
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .text, compress);
Expand Down Expand Up @@ -4469,17 +4464,12 @@ pub const ServerWebSocket = struct {
return .zero;
}

if (this.isClosed() and !publish_to_self) {
// Can't publish on a closed socket.
return JSValue.jsNumber(0);
}

var string_slice = message_value.toSlice(globalThis, bun.default_allocator);
defer string_slice.deinit();

const buffer = string_slice.slice();

const result = if (!publish_to_self)
const result = if (!publish_to_self and !this.isClosed())
this.websocket().publish(topic_slice.slice(), buffer, .text, compress)
else
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .text, compress);
Expand Down Expand Up @@ -4540,18 +4530,13 @@ pub const ServerWebSocket = struct {
return .zero;
}

if (this.isClosed() and !publish_to_self) {
// Can't publish on a closed socket.
return JSValue.jsNumber(0);
}

const array_buffer = message_value.asArrayBuffer(globalThis) orelse {
globalThis.throw("publishBinary expects an ArrayBufferView", .{});
return .zero;
};
const buffer = array_buffer.slice();

const result = if (!publish_to_self)
const result = if (!publish_to_self and !this.isClosed())
this.websocket().publish(topic_slice.slice(), buffer, .binary, compress)
else
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .binary, compress);
Expand Down Expand Up @@ -4591,12 +4576,7 @@ pub const ServerWebSocket = struct {
return JSC.JSValue.jsNumber(0);
}

if (this.isClosed() and !publish_to_self) {
// We can't access the socket context on a closed socket.
return JSValue.jsNumber(0);
}

const result = if (!publish_to_self)
const result = if (!publish_to_self and !this.isClosed())
this.websocket().publish(topic_slice.slice(), buffer, .binary, compress)
else
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .binary, compress);
Expand Down Expand Up @@ -4639,12 +4619,7 @@ pub const ServerWebSocket = struct {
return JSC.JSValue.jsNumber(0);
}

if (this.isClosed() and !publish_to_self) {
// We can't access the socket context on a closed socket.
return JSValue.jsNumber(0);
}

const result = if (!publish_to_self)
const result = if (!publish_to_self and !this.isClosed())
this.websocket().publish(topic_slice.slice(), buffer, .text, compress)
else
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .text, compress);
Expand Down Expand Up @@ -6052,6 +6027,9 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
if (!abrupt) {
listener.close();
} else if (!this.flags.terminated) {
if (this.config.websocket) |*ws| {
ws.handler.app = null;
}
this.flags.terminated = true;
this.app.close();
}
Expand Down
67 changes: 67 additions & 0 deletions test/js/bun/websocket/websocket-server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,73 @@ afterEach(() => {
}
});

// publish on a closed websocket
// connecct 2 websocket clients to one server
// wait for one to call close callback
// publish to the other client
// the other client should not receive the message
// the server should not crash
// https://github.com/oven-sh/bun/issues/4443
it("websocket/4443", async () => {
var serverSockets: ServerWebSocket<unknown>[] = [];
var onFirstConnected = Promise.withResolvers();
var onSecondMessageEchoedBack = Promise.withResolvers();
using server = Bun.serve({
port: 0,
websocket: {
open(ws) {
serverSockets.push(ws);
ws.subscribe("test");
if (serverSockets.length === 2) {
onFirstConnected.resolve();
}
},
message(ws, message) {
onSecondMessageEchoedBack.resolve();
ws.close();
},
close(ws) {
ws.publish("test", "close");
},
},
fetch(req, server) {
server.upgrade(req);
return new Response();
},
});

var clients = [];
var closedCount = 0;
var onClientsOpened = Promise.withResolvers();

var { promise, resolve } = Promise.withResolvers();
for (let i = 0; i < 2; i++) {
const ws = new WebSocket(`ws://${server.hostname}:${server.port}`);
ws.binaryType = "arraybuffer";

const clientSocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
clientSocket.binaryType = "arraybuffer";
clientSocket.onopen = () => {
clients.push(clientSocket);
if (clients.length === 2) {
onClientsOpened.resolve();
}
};
clientSocket.onmessage = e => {
clientSocket.send(e.data);
};
clientSocket.onclose = () => {
if (closedCount++ === 1) {
resolve();
}
};
}

await Promise.all([onFirstConnected.promise, onClientsOpened.promise]);
clients[0].close();
await promise;
});

describe("Server", () => {
test("subscribe", done => ({
open(ws) {
Expand Down

0 comments on commit e48369d

Please sign in to comment.