Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "fix(server) fix body-stream" #10205

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions packages/bun-uws/src/AsyncSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// clang-format off

#ifndef UWS_ASYNCSOCKET_H
#define UWS_ASYNCSOCKET_H

Expand Down Expand Up @@ -255,8 +255,10 @@ struct AsyncSocket {
if (asyncSocketData->buffer.length()) {
/* Write off as much as we can */
int written = us_socket_write(SSL, (us_socket_t *) this, asyncSocketData->buffer.data(), (int) asyncSocketData->buffer.length(), /*nextLength != 0 | */length);

/* On failure return, otherwise continue down the function */
if ((unsigned int) written < asyncSocketData->buffer.length()) {

/* Update buffering (todo: we can do better here if we keep track of what happens to this guy later on) */
asyncSocketData->buffer.erase((unsigned int) written);

Expand All @@ -266,6 +268,7 @@ struct AsyncSocket {
} else {
/* This path is horrible and points towards erroneous usage */
asyncSocketData->buffer.append(src, (unsigned int) length);

return {length, true};
}
}
Expand Down Expand Up @@ -307,6 +310,7 @@ struct AsyncSocket {
if (optionally) {
return {written, true};
}

/* Fall back to worst possible case (should be very rare for HTTP) */
/* At least we can reserve room for next chunk if we know it up front */
if (nextLength) {
Expand Down Expand Up @@ -340,7 +344,7 @@ struct AsyncSocket {
auto [written, failed] = write(loopData->corkBuffer, (int) loopData->corkOffset, false, length);
loopData->corkOffset = 0;

if (failed && optionally) {
if (failed) {
/* We do not need to care for buffering here, write does that */
return {0, true};
}
Expand Down
4 changes: 3 additions & 1 deletion packages/bun-uws/src/HttpContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,9 @@ struct HttpContext {
return s;
}

/* We need to drain any remaining buffered data if success == true*/
/* We don't want to fall through since we don't want to mess with timeout.
* It makes little sense to drain any backpressure when the user has registered onWritable. */
return s;
}

/* Drain any socket buffer, this might empty our backpressure and thus finish the request */
Expand Down
8 changes: 0 additions & 8 deletions packages/bun-uws/src/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -573,14 +573,6 @@ struct HttpResponse : public AsyncSocket<SSL> {
return this;
}

/* Remove handler for writable HTTP response */
HttpResponse *clearOnWritable() {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();

httpResponseData->onWritable = nullptr;
return this;
}

/* Attach handler for aborted HTTP request */
HttpResponse *onAborted(MoveOnlyFunction<void()> &&handler) {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
Expand Down
123 changes: 63 additions & 60 deletions src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
const amount = @as(Blob.SizeType, @truncate(amount1));
this.offset += amount;
this.wrote += amount;
this.buffer.len -|= @as(u32, @truncate(amount));

if (this.offset >= this.buffer.len) {
this.offset = 0;
Expand All @@ -1986,9 +1987,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
fn hasBackpressure(this: *const @This()) bool {
return this.has_backpressure;
}
fn hasBackpressureAndIsTryEnd(this: *const @This()) bool {
return this.has_backpressure and this.end_len > 0;
}

fn sendWithoutAutoFlusher(this: *@This(), buf: []const u8) bool {
bun.assert(!this.done);
defer log("send: {d} bytes (backpressure: {any})", .{ buf.len, this.has_backpressure });
Expand All @@ -1997,29 +1996,29 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.handleFirstWriteIfNecessary();
const success = this.res.tryEnd(buf, this.end_len, false);
this.has_backpressure = !success;
if (this.has_backpressure) {
this.res.onWritable(*@This(), onWritable, this);
}
return success;
}
// clean this so we know when its relevant or not
this.end_len = 0;
// we clear the onWritable handler so uWS can handle the backpressure for us
this.res.clearOnWritable();
this.handleFirstWriteIfNecessary();

// uWebSockets lacks a tryWrite() function
// This means that backpressure will be handled by appending to an "infinite" memory buffer
// It will do the backpressure handling for us
// so in this scenario, we just append to the buffer
// and report success
if (this.requested_end) {
this.handleFirstWriteIfNecessary();
this.res.end(buf, false);
this.has_backpressure = false;
return true;
} else {
this.handleFirstWriteIfNecessary();
this.has_backpressure = !this.res.write(buf);
if (this.has_backpressure) {
this.res.onWritable(*@This(), onWritable, this);
}
return true;
}

return true;
unreachable;
}

fn send(this: *@This(), buf: []const u8) bool {
Expand All @@ -2028,52 +2027,39 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}

fn readableSlice(this: *@This()) []const u8 {
return this.buffer.ptr[this.offset..this.buffer.len];
return this.buffer.ptr[this.offset..this.buffer.cap][0..this.buffer.len];
}

pub fn onWritable(this: *@This(), write_offset: u64, _: *UWSResponse) callconv(.C) bool {
// write_offset is the amount of data that was written not how much we need to write
pub fn onWritable(this: *@This(), write_offset_: u64, _: *UWSResponse) callconv(.C) bool {
const write_offset: u64 = @as(u64, write_offset_);
log("onWritable ({d})", .{write_offset});
// onWritable reset backpressure state to allow flushing
this.has_backpressure = false;
if (this.aborted) {
this.res.clearOnWritable();
this.signal.close(null);
this.flushPromise();

if (this.done) {
if (this.aborted == false) {
this.res.endStream(false);
}
this.finalize();
return false;
}
var total_written: u64 = 0;

// do not write more than available
// if we do, it will cause this to be delayed until the next call, each time
// TODO: should we break it in smaller chunks?
const to_write = @min(@as(Blob.SizeType, @truncate(write_offset)), @as(Blob.SizeType, this.buffer.len - 1));
const chunk = this.readableSlice()[to_write..];
// if we have nothing to write, we are done
if (chunk.len == 0) {
if (this.done) {
this.res.clearOnWritable();
this.signal.close(null);
this.flushPromise();
this.finalize();
return true;
}
} else {
if (!this.send(chunk)) {
// if we were unable to send it, retry
return false;
}
this.handleWrote(@as(Blob.SizeType, @truncate(chunk.len)));
total_written = chunk.len;

if (this.requested_end) {
this.res.clearOnWritable();
this.signal.close(null);
this.flushPromise();
this.finalize();
return true;
}
const to_write = @min(@as(Blob.SizeType, @truncate(write_offset)), @as(Blob.SizeType, this.buffer.len));

// figure out how much data exactly to write
const readable = this.readableSlice()[0..to_write];
if (!this.send(readable)) {
// if we were unable to send it, retry
this.res.onWritable(*@This(), onWritable, this);
return true;
}

this.handleWrote(@as(Blob.SizeType, @truncate(readable.len)));
const initial_wrote = this.wrote;

if (this.buffer.len > 0 and !this.done) {
this.res.onWritable(*@This(), onWritable, this);
return true;
}

// flush the javascript promise from calling .flush()
Expand All @@ -2082,13 +2068,16 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
// pending_flush or callback could have caused another send()
// so we check again if we should report readiness
if (!this.done and !this.requested_end and !this.hasBackpressure()) {
// no pending and total_written > 0
if (total_written > 0 and this.readableSlice().len == 0) {
this.signal.ready(@as(Blob.SizeType, @truncate(total_written)), null);
const pending = @as(Blob.SizeType, @truncate(write_offset)) -| to_write;
const written_after_flush = this.wrote - initial_wrote;
const to_report = pending - @min(written_after_flush, pending);

if ((written_after_flush == initial_wrote and pending == 0) or to_report > 0) {
this.signal.ready(to_report, null);
}
}

return true;
return false;
}

pub fn start(this: *@This(), stream_start: StreamStart) JSC.Maybe(void) {
Expand Down Expand Up @@ -2139,7 +2128,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {

fn flushFromJSNoWait(this: *@This()) JSC.Maybe(JSValue) {
log("flushFromJSNoWait", .{});
if (this.hasBackpressureAndIsTryEnd() or this.done) {
if (this.hasBackpressure() or this.done) {
return .{ .result = JSValue.jsNumberFromInt32(0) };
}

Expand Down Expand Up @@ -2173,14 +2162,16 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumberFromInt32(0)) };
}

if (!this.hasBackpressureAndIsTryEnd()) {
if (!this.hasBackpressure()) {
const slice = this.readableSlice();
assert(slice.len > 0);
const success = this.send(slice);
if (success) {
this.handleWrote(@as(Blob.SizeType, @truncate(slice.len)));
return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(slice.len)) };
}

this.res.onWritable(*@This(), onWritable, this);
}
this.wrote_at_start_of_flush = this.wrote;
this.pending_flush = JSC.JSPromise.create(globalThis);
Expand Down Expand Up @@ -2228,25 +2219,30 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
this.registerAutoFlusher();
} else if (this.buffer.len + len >= this.highWaterMark) {

// TODO: attempt to write both in a corked buffer?
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
const slice = this.readableSlice();
if (this.send(slice)) {
this.handleWrote(slice.len);
this.buffer.len = 0;
return .{ .owned = len };
}
} else {
// queue the data wait until highWaterMark is reached or the auto flusher kicks in
// queue the data
// do not send it
_ = this.buffer.write(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
this.registerAutoFlusher();
return .{ .owned = len };
}

this.registerAutoFlusher();
this.res.onWritable(*@This(), onWritable, this);

return .{ .owned = len };
}
Expand Down Expand Up @@ -2306,9 +2302,12 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
_ = this.buffer.writeLatin1(this.allocator, bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
this.registerAutoFlusher();
return .{ .owned = len };
}

this.registerAutoFlusher();
this.res.onWritable(*@This(), onWritable, this);

return .{ .owned = len };
}
Expand All @@ -2334,11 +2333,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
};

const readable = this.readableSlice();

if (readable.len >= this.highWaterMark or this.hasBackpressure()) {
if (this.send(readable)) {
this.handleWrote(readable.len);
return .{ .owned = @as(Blob.SizeType, @intCast(written)) };
}

this.res.onWritable(*@This(), onWritable, this);
}

this.registerAutoFlusher();
Expand Down Expand Up @@ -2377,6 +2379,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.finalize();
return .{ .result = {} };
}

return .{ .result = {} };
}

Expand Down Expand Up @@ -2429,7 +2432,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.unregisterAutoFlusher();

this.aborted = true;

this.signal.close(null);

this.flushPromise();
Expand All @@ -2455,13 +2457,14 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {

const readable = this.readableSlice();

if ((this.hasBackpressureAndIsTryEnd()) or readable.len == 0) {
if (this.hasBackpressure() or readable.len == 0) {
this.auto_flusher.registered = false;
return false;
}

if (!this.sendWithoutAutoFlusher(readable)) {
this.auto_flusher.registered = true;
this.res.onWritable(*@This(), onWritable, this);
return true;
}

Expand All @@ -2479,6 +2482,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}

this.unregisterAutoFlusher();

this.allocator.destroy(this);
}

Expand All @@ -2490,7 +2494,6 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (!this.done) {
this.done = true;
this.unregisterAutoFlusher();
this.res.clearOnWritable();
this.res.endStream(false);
}

Expand Down
10 changes: 0 additions & 10 deletions src/deps/libuwsockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1238,16 +1238,6 @@ extern "C"
{ return handler(res, a, opcional_data); });
}
}

void uws_res_clear_on_writable(int ssl, uws_res_t *res) {
if (ssl) {
uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res;
uwsRes->clearOnWritable();
} else {
uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res;
uwsRes->clearOnWritable();
}
}

void uws_res_on_aborted(int ssl, uws_res_t *res,
void (*handler)(uws_res_t *res, void *opcional_data),
Expand Down
5 changes: 0 additions & 5 deletions src/deps/uws.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2036,10 +2036,6 @@ pub fn NewApp(comptime ssl: bool) type {
};
uws_res_on_writable(ssl_flag, res.downcast(), Wrapper.handle, user_data);
}

pub fn clearOnWritable(res: *Response) void {
uws_res_clear_on_writable(ssl_flag, res.downcast());
}
pub inline fn markNeedsMore(res: *Response) void {
if (!ssl) {
us_socket_mark_needs_more_not_ssl(res.downcast());
Expand Down Expand Up @@ -2382,7 +2378,6 @@ extern fn uws_res_get_write_offset(ssl: i32, res: *uws_res) u64;
extern fn uws_res_override_write_offset(ssl: i32, res: *uws_res, u64) void;
extern fn uws_res_has_responded(ssl: i32, res: *uws_res) bool;
extern fn uws_res_on_writable(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, u64, ?*anyopaque) callconv(.C) bool, user_data: ?*anyopaque) void;
extern fn uws_res_clear_on_writable(ssl: i32, res: *uws_res) void;
extern fn uws_res_on_aborted(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, ?*anyopaque) callconv(.C) void, opcional_data: ?*anyopaque) void;
extern fn uws_res_on_data(
ssl: i32,
Expand Down
Loading