diff --git a/packages/bun-uws/src/AsyncSocket.h b/packages/bun-uws/src/AsyncSocket.h index b69a8312bde71..a01a08502b2bd 100644 --- a/packages/bun-uws/src/AsyncSocket.h +++ b/packages/bun-uws/src/AsyncSocket.h @@ -16,7 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -// clang-format off + #ifndef UWS_ASYNCSOCKET_H #define UWS_ASYNCSOCKET_H @@ -255,8 +255,10 @@ struct AsyncSocket { if (asyncSocketData->buffer.length()) { /* Write off as much as we can */ int written = us_socket_write(SSL, (us_socket_t *) this, asyncSocketData->buffer.data(), (int) asyncSocketData->buffer.length(), /*nextLength != 0 | */length); + /* On failure return, otherwise continue down the function */ if ((unsigned int) written < asyncSocketData->buffer.length()) { + /* Update buffering (todo: we can do better here if we keep track of what happens to this guy later on) */ asyncSocketData->buffer.erase((unsigned int) written); @@ -266,6 +268,7 @@ struct AsyncSocket { } else { /* This path is horrible and points towards erroneous usage */ asyncSocketData->buffer.append(src, (unsigned int) length); + return {length, true}; } } @@ -307,6 +310,7 @@ struct AsyncSocket { if (optionally) { return {written, true}; } + /* Fall back to worst possible case (should be very rare for HTTP) */ /* At least we can reserve room for next chunk if we know it up front */ if (nextLength) { @@ -340,7 +344,7 @@ struct AsyncSocket { auto [written, failed] = write(loopData->corkBuffer, (int) loopData->corkOffset, false, length); loopData->corkOffset = 0; - if (failed && optionally) { + if (failed) { /* We do not need to care for buffering here, write does that */ return {0, true}; } diff --git a/packages/bun-uws/src/HttpContext.h b/packages/bun-uws/src/HttpContext.h index 96373612047f5..7521953ecce7c 100644 --- a/packages/bun-uws/src/HttpContext.h +++ b/packages/bun-uws/src/HttpContext.h @@ -374,7 +374,9 @@ struct HttpContext { return s; } - /* We need to drain any remaining buffered data if success == true*/ + /* We don't want to fall through since we don't want to mess with timeout. + * It makes little sense to drain any backpressure when the user has registered onWritable. */ + return s; } /* Drain any socket buffer, this might empty our backpressure and thus finish the request */ diff --git a/packages/bun-uws/src/HttpResponse.h b/packages/bun-uws/src/HttpResponse.h index ea0658d5f1ae5..69eb1f7a8e8e3 100644 --- a/packages/bun-uws/src/HttpResponse.h +++ b/packages/bun-uws/src/HttpResponse.h @@ -573,14 +573,6 @@ struct HttpResponse : public AsyncSocket { return this; } - /* Remove handler for writable HTTP response */ - HttpResponse *clearOnWritable() { - HttpResponseData *httpResponseData = getHttpResponseData(); - - httpResponseData->onWritable = nullptr; - return this; - } - /* Attach handler for aborted HTTP request */ HttpResponse *onAborted(MoveOnlyFunction &&handler) { HttpResponseData *httpResponseData = getHttpResponseData(); diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 11cfadf62cbdb..29675c570e426 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1967,6 +1967,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { const amount = @as(Blob.SizeType, @truncate(amount1)); this.offset += amount; this.wrote += amount; + this.buffer.len -|= @as(u32, @truncate(amount)); if (this.offset >= this.buffer.len) { this.offset = 0; @@ -1986,9 +1987,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { fn hasBackpressure(this: *const @This()) bool { return this.has_backpressure; } - fn hasBackpressureAndIsTryEnd(this: *const @This()) bool { - return this.has_backpressure and this.end_len > 0; - } + fn sendWithoutAutoFlusher(this: *@This(), buf: []const u8) bool { bun.assert(!this.done); defer log("send: {d} bytes (backpressure: {any})", .{ buf.len, this.has_backpressure }); @@ -1997,29 +1996,29 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.handleFirstWriteIfNecessary(); const success = this.res.tryEnd(buf, this.end_len, false); this.has_backpressure = !success; - if (this.has_backpressure) { - this.res.onWritable(*@This(), onWritable, this); - } return success; } - // clean this so we know when its relevant or not - this.end_len = 0; - // we clear the onWritable handler so uWS can handle the backpressure for us - this.res.clearOnWritable(); - this.handleFirstWriteIfNecessary(); + // uWebSockets lacks a tryWrite() function // This means that backpressure will be handled by appending to an "infinite" memory buffer // It will do the backpressure handling for us // so in this scenario, we just append to the buffer // and report success if (this.requested_end) { + this.handleFirstWriteIfNecessary(); this.res.end(buf, false); this.has_backpressure = false; + return true; } else { + this.handleFirstWriteIfNecessary(); this.has_backpressure = !this.res.write(buf); + if (this.has_backpressure) { + this.res.onWritable(*@This(), onWritable, this); + } + return true; } - return true; + unreachable; } fn send(this: *@This(), buf: []const u8) bool { @@ -2028,52 +2027,39 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } fn readableSlice(this: *@This()) []const u8 { - return this.buffer.ptr[this.offset..this.buffer.len]; + return this.buffer.ptr[this.offset..this.buffer.cap][0..this.buffer.len]; } - pub fn onWritable(this: *@This(), write_offset: u64, _: *UWSResponse) callconv(.C) bool { - // write_offset is the amount of data that was written not how much we need to write + pub fn onWritable(this: *@This(), write_offset_: u64, _: *UWSResponse) callconv(.C) bool { + const write_offset: u64 = @as(u64, write_offset_); log("onWritable ({d})", .{write_offset}); - // onWritable reset backpressure state to allow flushing - this.has_backpressure = false; - if (this.aborted) { - this.res.clearOnWritable(); - this.signal.close(null); - this.flushPromise(); + + if (this.done) { + if (this.aborted == false) { + this.res.endStream(false); + } this.finalize(); return false; } - var total_written: u64 = 0; // do not write more than available // if we do, it will cause this to be delayed until the next call, each time - // TODO: should we break it in smaller chunks? - const to_write = @min(@as(Blob.SizeType, @truncate(write_offset)), @as(Blob.SizeType, this.buffer.len - 1)); - const chunk = this.readableSlice()[to_write..]; - // if we have nothing to write, we are done - if (chunk.len == 0) { - if (this.done) { - this.res.clearOnWritable(); - this.signal.close(null); - this.flushPromise(); - this.finalize(); - return true; - } - } else { - if (!this.send(chunk)) { - // if we were unable to send it, retry - return false; - } - this.handleWrote(@as(Blob.SizeType, @truncate(chunk.len))); - total_written = chunk.len; - - if (this.requested_end) { - this.res.clearOnWritable(); - this.signal.close(null); - this.flushPromise(); - this.finalize(); - return true; - } + const to_write = @min(@as(Blob.SizeType, @truncate(write_offset)), @as(Blob.SizeType, this.buffer.len)); + + // figure out how much data exactly to write + const readable = this.readableSlice()[0..to_write]; + if (!this.send(readable)) { + // if we were unable to send it, retry + this.res.onWritable(*@This(), onWritable, this); + return true; + } + + this.handleWrote(@as(Blob.SizeType, @truncate(readable.len))); + const initial_wrote = this.wrote; + + if (this.buffer.len > 0 and !this.done) { + this.res.onWritable(*@This(), onWritable, this); + return true; } // flush the javascript promise from calling .flush() @@ -2082,13 +2068,16 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { // pending_flush or callback could have caused another send() // so we check again if we should report readiness if (!this.done and !this.requested_end and !this.hasBackpressure()) { - // no pending and total_written > 0 - if (total_written > 0 and this.readableSlice().len == 0) { - this.signal.ready(@as(Blob.SizeType, @truncate(total_written)), null); + const pending = @as(Blob.SizeType, @truncate(write_offset)) -| to_write; + const written_after_flush = this.wrote - initial_wrote; + const to_report = pending - @min(written_after_flush, pending); + + if ((written_after_flush == initial_wrote and pending == 0) or to_report > 0) { + this.signal.ready(to_report, null); } } - return true; + return false; } pub fn start(this: *@This(), stream_start: StreamStart) JSC.Maybe(void) { @@ -2139,7 +2128,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { fn flushFromJSNoWait(this: *@This()) JSC.Maybe(JSValue) { log("flushFromJSNoWait", .{}); - if (this.hasBackpressureAndIsTryEnd() or this.done) { + if (this.hasBackpressure() or this.done) { return .{ .result = JSValue.jsNumberFromInt32(0) }; } @@ -2173,7 +2162,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumberFromInt32(0)) }; } - if (!this.hasBackpressureAndIsTryEnd()) { + if (!this.hasBackpressure()) { const slice = this.readableSlice(); assert(slice.len > 0); const success = this.send(slice); @@ -2181,6 +2170,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.handleWrote(@as(Blob.SizeType, @truncate(slice.len))); return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(slice.len)) }; } + + this.res.onWritable(*@This(), onWritable, this); } this.wrote_at_start_of_flush = this.wrote; this.pending_flush = JSC.JSPromise.create(globalThis); @@ -2228,8 +2219,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { _ = this.buffer.write(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; }; + this.registerAutoFlusher(); } else if (this.buffer.len + len >= this.highWaterMark) { - // TODO: attempt to write both in a corked buffer? _ = this.buffer.write(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; @@ -2237,16 +2228,21 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { const slice = this.readableSlice(); if (this.send(slice)) { this.handleWrote(slice.len); + this.buffer.len = 0; return .{ .owned = len }; } } else { - // queue the data wait until highWaterMark is reached or the auto flusher kicks in + // queue the data + // do not send it _ = this.buffer.write(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; }; + this.registerAutoFlusher(); + return .{ .owned = len }; } this.registerAutoFlusher(); + this.res.onWritable(*@This(), onWritable, this); return .{ .owned = len }; } @@ -2306,9 +2302,12 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { _ = this.buffer.writeLatin1(this.allocator, bytes) catch { return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) }; }; + this.registerAutoFlusher(); + return .{ .owned = len }; } this.registerAutoFlusher(); + this.res.onWritable(*@This(), onWritable, this); return .{ .owned = len }; } @@ -2334,11 +2333,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { }; const readable = this.readableSlice(); + if (readable.len >= this.highWaterMark or this.hasBackpressure()) { if (this.send(readable)) { this.handleWrote(readable.len); return .{ .owned = @as(Blob.SizeType, @intCast(written)) }; } + + this.res.onWritable(*@This(), onWritable, this); } this.registerAutoFlusher(); @@ -2377,6 +2379,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.finalize(); return .{ .result = {} }; } + return .{ .result = {} }; } @@ -2429,7 +2432,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.unregisterAutoFlusher(); this.aborted = true; - this.signal.close(null); this.flushPromise(); @@ -2455,13 +2457,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { const readable = this.readableSlice(); - if ((this.hasBackpressureAndIsTryEnd()) or readable.len == 0) { + if (this.hasBackpressure() or readable.len == 0) { this.auto_flusher.registered = false; return false; } if (!this.sendWithoutAutoFlusher(readable)) { this.auto_flusher.registered = true; + this.res.onWritable(*@This(), onWritable, this); return true; } @@ -2479,6 +2482,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { } this.unregisterAutoFlusher(); + this.allocator.destroy(this); } @@ -2490,7 +2494,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (!this.done) { this.done = true; this.unregisterAutoFlusher(); - this.res.clearOnWritable(); this.res.endStream(false); } diff --git a/src/deps/libuwsockets.cpp b/src/deps/libuwsockets.cpp index e744c8859a321..c5e742f85a0d2 100644 --- a/src/deps/libuwsockets.cpp +++ b/src/deps/libuwsockets.cpp @@ -1238,16 +1238,6 @@ extern "C" { return handler(res, a, opcional_data); }); } } - - void uws_res_clear_on_writable(int ssl, uws_res_t *res) { - if (ssl) { - uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; - uwsRes->clearOnWritable(); - } else { - uWS::HttpResponse *uwsRes = (uWS::HttpResponse *)res; - uwsRes->clearOnWritable(); - } - } void uws_res_on_aborted(int ssl, uws_res_t *res, void (*handler)(uws_res_t *res, void *opcional_data), diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 386934c50d2dd..7cf5a5fe6035a 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -2036,10 +2036,6 @@ pub fn NewApp(comptime ssl: bool) type { }; uws_res_on_writable(ssl_flag, res.downcast(), Wrapper.handle, user_data); } - - pub fn clearOnWritable(res: *Response) void { - uws_res_clear_on_writable(ssl_flag, res.downcast()); - } pub inline fn markNeedsMore(res: *Response) void { if (!ssl) { us_socket_mark_needs_more_not_ssl(res.downcast()); @@ -2382,7 +2378,6 @@ extern fn uws_res_get_write_offset(ssl: i32, res: *uws_res) u64; extern fn uws_res_override_write_offset(ssl: i32, res: *uws_res, u64) void; extern fn uws_res_has_responded(ssl: i32, res: *uws_res) bool; extern fn uws_res_on_writable(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, u64, ?*anyopaque) callconv(.C) bool, user_data: ?*anyopaque) void; -extern fn uws_res_clear_on_writable(ssl: i32, res: *uws_res) void; extern fn uws_res_on_aborted(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, ?*anyopaque) callconv(.C) void, opcional_data: ?*anyopaque) void; extern fn uws_res_on_data( ssl: i32,