Skip to content

Commit

Permalink
surface write errors through flush (#138)
Browse files Browse the repository at this point in the history
* surface write errors through `flush`

Co-Authored-By: David House <[email protected]>
Co-Authored-By: Doug Patti <[email protected]>

* add changelog entry

---------

Co-authored-by: David House <[email protected]>
Co-authored-by: Doug Patti <[email protected]>
  • Loading branch information
3 people authored Aug 27, 2024
1 parent 6438771 commit cebf3ac
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 60 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ Unreleased
([#135](https://github.com/anmonteiro/httpun/pull/135))
- server: process requests after EOF
([#136](https://github.com/anmonteiro/httpun/pull/136))
- surface (body) write errors through `flush`
([#138](https://github.com/anmonteiro/httpun/pull/138))
- `Body.Writer.flush` now takes a callback of the type
``([ `Written | ` Closed] -> unit)``, informing the caller whether the
previous writes have been written or whether the output channel was
closed.

0.1.0 2024-06-08
--------------
Expand Down
39 changes: 29 additions & 10 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -151,27 +151,41 @@ module Writer = struct
t

let write_char t c =
Faraday.write_char t.faraday c
if not (Faraday.is_closed t.faraday) then
Faraday.write_char t.faraday c

let write_string t ?off ?len s =
Faraday.write_string ?off ?len t.faraday s
if not (Faraday.is_closed t.faraday) then
Faraday.write_string ?off ?len t.faraday s

let write_bigstring t ?off ?len b =
Faraday.write_bigstring ?off ?len t.faraday b
if not (Faraday.is_closed t.faraday) then
Faraday.write_bigstring ?off ?len t.faraday b

let schedule_bigstring t ?off ?len (b:Bigstringaf.t) =
Faraday.schedule_bigstring ?off ?len t.faraday b
if not (Faraday.is_closed t.faraday) then
Faraday.schedule_bigstring ?off ?len t.faraday b

let ready_to_write t = Serialize.Writer.wakeup t.writer

let flush t kontinue =
Faraday.flush t.faraday (fun () ->
Serialize.Writer.flush t.writer kontinue);
ready_to_write t
if Serialize.Writer.is_closed t.writer then
kontinue `Closed
else begin
Faraday.flush_with_reason t.faraday (function
| Drain -> kontinue `Closed
| Nothing_pending | Shift -> Serialize.Writer.flush t.writer kontinue);
ready_to_write t
end

let is_closed t =
Faraday.is_closed t.faraday

let close_and_drain t =
Faraday.close t.faraday;
(* Resolve all pending flushes *)
ignore (Faraday.drain t.faraday : int)

let close t =
Serialize.Writer.unyield t.writer;
Faraday.close t.faraday;
Expand Down Expand Up @@ -202,6 +216,9 @@ module Writer = struct

let transfer_to_writer t =
let faraday = t.faraday in
if Serialize.Writer.is_closed t.writer then
close_and_drain t
else
begin match Faraday.operation faraday with
| `Yield -> ()
| `Close ->
Expand All @@ -222,9 +239,11 @@ module Writer = struct
| Identity -> Serialize.Writer.schedule_fixed t.writer iovecs
| Chunked _ -> Serialize.Writer.schedule_chunk t.writer iovecs
end;
Serialize.Writer.flush t.writer (fun () ->
Faraday.shift faraday lengthv;
t.buffered_bytes <- t.buffered_bytes - lengthv)
Serialize.Writer.flush t.writer (function
| `Closed -> close_and_drain t
| `Written ->
Faraday.shift faraday lengthv;
t.buffered_bytes <- t.buffered_bytes - lengthv)
end
end
end
13 changes: 8 additions & 5 deletions lib/httpun.mli
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ module Body : sig
modified until a subsequent call to {!flush} has successfully
completed. *)

val flush : t -> (unit -> unit) -> unit
(** [flush t f] makes all bytes in [t] available for writing to the awaiting
output channel. Once those bytes have reached that output channel, [f]
val flush : t -> ([ `Written | `Closed ] -> unit) -> unit
(** [flush t f] makes all bytes in [t] available for writing to the
awaiting output channel. Once those bytes have reached that output
channel, [f `Written] will be called. If instead, the output channel is
closed before all of those bytes are successfully written, [f `Closed]
will be called.
The type of the output channel is runtime-dependent, as are guarantees
Expand All @@ -112,8 +114,9 @@ module Body : sig
to the output channel. *)

val is_closed : t -> bool
(** [is_closed t] is [true] if {!close} has been called on [t] and [false]
otherwise. A closed [t] may still have pending output. *)
(** [is_closed t] is [true] if {!close} has been called on [t], or if the
attached output channel is closed (e.g. because [report_write_result
`Closed] has been called). A closed [t] may still have pending output. *)
end

end
Expand Down
4 changes: 3 additions & 1 deletion lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ let unsafe_respond_with_upgrade t headers upgrade_handler =
if t.persistent then
t.persistent <- Response.persistent_connection response;
t.response_state <- Upgrade (response, upgrade_handler);
Writer.flush t.writer upgrade_handler;
Writer.flush t.writer (fun _reason ->
(* TODO(anmonteiro): probably need to check `Closed here? *)
upgrade_handler ());
Body.Reader.close t.request_body;
Writer.wakeup t.writer
| Streaming _ | Upgrade _ ->
Expand Down
10 changes: 8 additions & 2 deletions lib/serialize.ml
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,19 @@ module Writer = struct
;;

let flush t f =
flush t.encoder f
flush_with_reason t.encoder (fun reason ->
let result =
match reason with
| Nothing_pending | Shift -> `Written
| Drain -> `Closed
in
f result)

let unyield t =
(* This would be better implemented by a function that just takes the
encoder out of a yielded state if it's in that state. Requires a change
to the faraday library. *)
flush t (fun () -> ())
flush t (fun _reason -> ())

let yield t =
Faraday.yield t.encoder
Expand Down
2 changes: 1 addition & 1 deletion lib_test/test_client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1770,7 +1770,7 @@ let test_flush_response_before_shutdown () =
(Body.Writer.is_closed body);

writer_closed t;
connection_is_shutdown t;
connection_is_shutdown t
;;

let test_report_exn_during_body_read () =
Expand Down
104 changes: 63 additions & 41 deletions lib_test/test_server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,9 @@ let echo_handler response reqd =
let response_body = Reqd.respond_with_streaming reqd response in
let rec on_read buffer ~off ~len =
Body.Writer.write_string response_body (Bigstringaf.substring ~off ~len buffer);
Body.Writer.flush response_body (fun () ->
Body.Reader.schedule_read request_body ~on_eof ~on_read)
Body.Writer.flush response_body (function
| `Closed -> assert false
| `Written -> Body.Reader.schedule_read request_body ~on_eof ~on_read)
and on_eof () =
print_endline "echo handler eof";
Body.Writer.close response_body
Expand All @@ -340,17 +341,20 @@ let streaming_handler ?(flush=false) ?(error=false) response writes reqd =
let request_body = Reqd.request_body reqd in
Body.Reader.close request_body;
let body = Reqd.respond_with_streaming ~flush_headers_immediately:flush reqd response in
let rec write () =
match !writes with
| [] -> (match error with
| false -> Body.Writer.close body
| true -> Reqd.report_exn reqd (Failure "exn"))
| w :: ws ->
Body.Writer.write_string body w;
writes := ws;
Body.Writer.flush body write
in
write ();
let rec write reason =
match reason with
| `Closed -> assert false
| `Written ->
match !writes with
| [] -> (match error with
| false -> Body.Writer.close body
| true -> Reqd.report_exn reqd (Failure "exn"))
| w :: ws ->
Body.Writer.write_string body w;
writes := ws;
Body.Writer.flush body write
in
write `Written;
;;

let synchronous_raise reqd =
Expand Down Expand Up @@ -875,9 +879,11 @@ let test_chunked_encoding () =
let response = Response.create `OK ~headers:Headers.encoding_chunked in
let resp_body = Reqd.respond_with_streaming reqd response in
Body.Writer.write_string resp_body "First chunk";
Body.Writer.flush resp_body (fun () ->
Body.Writer.write_string resp_body "Second chunk";
Body.Writer.close resp_body);
Body.Writer.flush resp_body (function
| `Closed -> assert false
| `Written ->
Body.Writer.write_string resp_body "Second chunk";
Body.Writer.close resp_body);
in
let t = create ~error_handler request_handler in
writer_yielded t;
Expand All @@ -903,9 +909,11 @@ let test_chunked_encoding_for_error () =
`Bad_request error;
let body = start_response Headers.encoding_chunked in
Body.Writer.write_string body "Bad";
Body.Writer.flush body (fun () ->
Body.Writer.write_string body " request";
Body.Writer.close body);
Body.Writer.flush body (function
| `Closed -> assert false
| `Written ->
Body.Writer.write_string body " request";
Body.Writer.close body);
in
let t = create ~error_handler (fun _ -> assert false) in
let c = feed_string t " X\r\n\r\n" in
Expand Down Expand Up @@ -1079,10 +1087,12 @@ let streaming_error_handler
let resp_body = start_response headers in
continue_error := (fun () ->
Body.Writer.write_string resp_body "got an error\n";
Body.Writer.flush resp_body (fun () ->
continue_error := (fun () ->
Body.Writer.write_string resp_body "more output";
Body.Writer.close resp_body)))
Body.Writer.flush resp_body (function
| `Closed -> assert false
| `Written ->
continue_error := (fun () ->
Body.Writer.write_string resp_body "more output";
Body.Writer.close resp_body)))
;;

let test_malformed_request_streaming_error_response () =
Expand Down Expand Up @@ -1119,13 +1129,17 @@ let chunked_error_handler continue_error ?request:_ _error start_response =
start_response (Headers.of_list ["transfer-encoding", "chunked"])
in
Body.Writer.write_string resp_body "chunk 1\n";
Body.Writer.flush resp_body (fun () ->
continue_error := (fun () ->
Body.Writer.write_string resp_body "chunk 2\n";
Body.Writer.flush resp_body (fun () ->
continue_error := (fun () ->
Body.Writer.write_string resp_body "chunk 3\n";
Body.Writer.close resp_body))))
Body.Writer.flush resp_body (function
| `Closed -> assert false
| `Written ->
continue_error := (fun () ->
Body.Writer.write_string resp_body "chunk 2\n";
Body.Writer.flush resp_body (function
| `Closed -> assert false
| `Written ->
continue_error := (fun () ->
Body.Writer.write_string resp_body "chunk 3\n";
Body.Writer.close resp_body))))
;;

let test_malformed_request_chunked_error_response () =
Expand Down Expand Up @@ -1475,9 +1489,11 @@ let test_streaming_response_before_reading_entire_body_no_error () =
let resp_body = Reqd.respond_with_streaming reqd response in
continue_response := (fun () ->
Body.Writer.write_string resp_body "hello";
Body.Writer.flush resp_body (fun () ->
continue_response := (fun () ->
Body.Writer.close resp_body))))
Body.Writer.flush resp_body (function
| `Closed -> assert false
| `Written ->
continue_response := (fun () ->
Body.Writer.close resp_body))))
in
let error_handler ?request:_ _error _start_response = assert false in
let t = create ~error_handler request_handler in
Expand Down Expand Up @@ -1748,9 +1764,11 @@ let test_race_condition_writer_issues_yield_after_reader_eof () =
~on_eof:(fun () ->
let resp_body = Reqd.respond_with_streaming reqd response in
Body.Writer.write_string resp_body (String.make 10 'a');
Body.Writer.flush resp_body (fun () ->
continue_response := (fun () ->
Body.Writer.close resp_body))))
Body.Writer.flush resp_body (function
| `Closed -> assert false
| `Written ->
continue_response := (fun () ->
Body.Writer.close resp_body))))
in
let t = create ~error_handler response_handler in
let request =
Expand Down Expand Up @@ -1870,9 +1888,11 @@ let test_errored_chunked_streaming_response_async () =
Body.Reader.close request_body;
let body = Reqd.respond_with_streaming reqd response in
Body.Writer.write_string body "hello";
Body.Writer.flush body (fun () ->
continue := (fun () ->
Reqd.report_exn reqd (Failure "heh")))
Body.Writer.flush body (function
| `Closed -> assert false
| `Written ->
continue := (fun () ->
Reqd.report_exn reqd (Failure "heh")))
in

let t = create request_handler in
Expand Down Expand Up @@ -2061,7 +2081,6 @@ let test_flush_response_before_shutdown () =
!continue ();
shutdown t;

writer_ready t;
write_string t "b\r\nhello world\r\n";
connection_is_shutdown t
;;
Expand Down Expand Up @@ -2287,7 +2306,10 @@ let test_body_flush_after_bytes_in_the_wire () =
Response.create ~headers:(Headers.of_list ["content-length", "5"]) `OK
in
let callback_called = ref false in
let callback () = callback_called := true in
let callback = function
| `Closed -> assert false
| `Written -> callback_called := true
in
let request_handler ~flush_headers_immediately reqd =
let response_body =
Reqd.respond_with_streaming
Expand Down

0 comments on commit cebf3ac

Please sign in to comment.