Skip to content

Commit

Permalink
server: process requests after EOF (#136)
Browse files Browse the repository at this point in the history
* server: process requests after EOF

Co-Authored-By: Doug Patti <[email protected]>
Co-Authored-By: David House <[email protected]>

* add changelog

* fix tests, add a new one

---------

Co-authored-by: Doug Patti <[email protected]>
Co-authored-by: David House <[email protected]>
  • Loading branch information
3 people authored Aug 27, 2024
1 parent 02d006c commit d163945
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ Unreleased

- client: report exceptions before closing the response body
([#135](https://github.com/anmonteiro/httpun/pull/135))
- server: process requests after EOF
([#136](https://github.com/anmonteiro/httpun/pull/136))

0.1.0 2024-06-08
--------------
Expand Down
3 changes: 2 additions & 1 deletion lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ let input_state t : Input_state.t =
then Ready
else Wait

let output_state t = Response_state.output_state t.response_state
let output_state { response_state; writer; _ } =
Response_state.output_state response_state ~writer

let flush_request_body t =
if Body.Reader.has_pending_output t.request_body
Expand Down
9 changes: 6 additions & 3 deletions lib/response_state.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ type t =
| Streaming of Response.t * Body.Writer.t
| Upgrade of Response.t * (unit -> unit)

let output_state t : Output_state.t =
let output_state t ~writer : Output_state.t =
match t with
| Fixed _ -> Complete
| Waiting -> Waiting
| Waiting ->
if Serialize.Writer.is_closed writer then Complete
else Waiting
| Streaming(_, response_body) ->
if Body.Writer.requires_output response_body
if Serialize.Writer.is_closed writer then Complete
else if Body.Writer.requires_output response_body
then Ready
else Complete
| Upgrade _ -> Ready
Expand Down
2 changes: 1 addition & 1 deletion lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ let rec _next_write_operation t =
then shutdown t;
Writer.next t.writer
| Error { response_state; _ } ->
match Response_state.output_state response_state with
match Response_state.output_state response_state ~writer:t.writer with
| Waiting -> `Yield
| Ready ->
flush_response_error_body response_state;
Expand Down
97 changes: 89 additions & 8 deletions lib_test/test_server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ module Runtime : sig

val report_exn : t -> exn -> unit

val do_force_read : t -> (Server_connection.t -> 'a) -> 'a
val with_server_connection : t -> (Server_connection.t -> 'a) -> 'a
val shutdown : t -> unit
val is_closed : t -> bool
end = struct
Expand Down Expand Up @@ -147,7 +147,7 @@ end = struct
Read_operation.pp_hum op
;;

let do_force_read t f = f t.server_connection
let with_server_connection t f = f t.server_connection

let do_write t f =
match current_write_operation t with
Expand Down Expand Up @@ -195,7 +195,7 @@ let read ?(eof=false) t str ~off ~len =
let read_eof = read ~eof:true

let force_read_eof t str ~off ~len =
do_force_read t (fun conn -> Server_connection.read_eof conn str ~off ~len)
with_server_connection t (fun conn -> Server_connection.read_eof conn str ~off ~len)
;;

let feed_string ?eof t str =
Expand All @@ -210,7 +210,7 @@ let read_string ?eof t str =
;;

let force_read t str ~off ~len =
do_force_read t (fun conn -> Server_connection.read conn str ~off ~len)
with_server_connection t (fun conn -> Server_connection.read conn str ~off ~len)
;;

let force_feed_string t str =
Expand Down Expand Up @@ -1420,7 +1420,7 @@ let test_finish_response_after_read_eof () =
Alcotest.(check int) "malformed chunked encoding read completely" len just_read;

let (_ : Read_operation.t) =
do_force_read t (fun t -> Server_connection.next_read_operation t)
with_server_connection t (fun t -> Server_connection.next_read_operation t)
in
reader_errored t;
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
Expand Down Expand Up @@ -2060,9 +2060,10 @@ let test_flush_response_before_shutdown () =
write_response t response;
!continue ();
shutdown t;
raises_writer_closed (fun () ->
write_string t "b\r\nhello world\r\n";
connection_is_shutdown t);

writer_ready t;
write_string t "b\r\nhello world\r\n";
connection_is_shutdown t
;;

let test_schedule_read_with_data_available () =
Expand Down Expand Up @@ -2337,6 +2338,83 @@ let test_input_consumed_before_closing_req_body () =
write_string t "0\r\n\r\n";
;;

let test_can_read_more_requests_after_write_eof () =
let request = Request.create `GET "/" ~headers:(Headers.encoding_fixed 0) in
let reqd = ref None in
let request_handler reqd' = reqd := Some reqd' in
let t = create request_handler in
let response = Response.create `OK ~headers:Headers.encoding_chunked in
read_request t request;
Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true
|> (ignore : Body.Writer.t -> unit);
write_eof t;
(* In many runtimes, there are separate reader and writer threads that drive
the reading and writing from httpaf independently. So just because the
writer thread has told us that the socket is closed doesn't mean we won't
get a bunch more requests delivered to us from the reader thread. We
should be ready to receive them, and call the request handler for them,
even if there is no possibility of writing responses (e.g. those requests
might be side-effecting requests like POST requests). *)
reqd := None;
writer_closed ~unread:47 t;
reader_ready t;
read_request t request;
raises_writer_closed (fun () ->
Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true
|> (ignore : Body.Writer.t -> unit));
Alcotest.(check bool) "request handler fired" true (Option.is_some !reqd)
;;

let test_can_read_more_requests_after_write_eof_before_send_response () =
let request = Request.create `GET "/" ~headers:(Headers.encoding_fixed 0) in
let reqd = ref None in
let request_handler reqd' = reqd := Some reqd' in
let t = create request_handler in
let response = Response.create `OK ~headers:Headers.encoding_chunked in
read_request t request;

let write_op = ref (current_write_operation t) in
with_server_connection t (fun t ->
Server_connection.report_write_result t `Closed;
write_op := Server_connection.next_write_operation t;
);
Alcotest.check write_operation "Writer is closed"
(`Close 0) !write_op;
reader_ready t;
read_request t request;
raises_writer_closed (fun () ->
Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true
|> (ignore : Body.Writer.t -> unit));
Alcotest.(check bool) "request handler fired" true (Option.is_some !reqd)
;;

let test_write_response_after_read_eof () =
let reqd = ref None in
let t = create (fun _reqd -> reqd := Some _reqd) in
let request_line = "GET / HTTP/1.1\r\n" in
let single_header = "Links: /path/to/some/website\r\n" in
let r =
(* Each header is 30 bytes *)
request_line ^ single_header ^ single_header ^ "connection: close\r\n\r\n"
in
let bs = Bigstringaf.of_string r ~off:0 ~len:(String.length r) in
let c = read t bs ~off:0 ~len:30 in
Alcotest.(check int) "only reads the request line" (String.length request_line) c;

let c' =
read t bs ~off:c ~len:(String.length single_header)
in
Alcotest.(check int) "parser can read a single header and commit" (String.length single_header) c';

let c'' = read_eof t bs ~off:(c + c') ~len:(String.length r - (c + c')) in
Alcotest.(check int) "read_eof with the rest of the input is accepted" (String.length r - (c + c')) c'';
writer_yielded t;
let body = Reqd.respond_with_streaming ~flush_headers_immediately:true (Option.get !reqd) (Response.create `OK);
in
write_response t (Response.create `OK);
Body.Writer.close body;
connection_is_shutdown t;
;;

let tests =
[ "initial reader state" , `Quick, test_initial_reader_state
Expand Down Expand Up @@ -2417,4 +2495,7 @@ let tests =
; "multiple pipelined requests", `Quick, test_multiple_pipelined_requests
; "Body.Writer.flush waits for bytes to have been written to the wire", `Quick, test_body_flush_after_bytes_in_the_wire
; "input consumed before closing request body", `Quick, test_input_consumed_before_closing_req_body
; "can read more requests after write eof", `Quick, test_can_read_more_requests_after_write_eof
; "can read more requests after write eof (before response sent)", `Quick, test_can_read_more_requests_after_write_eof_before_send_response
; "write response after reader EOF", `Quick,test_write_response_after_read_eof
]

0 comments on commit d163945

Please sign in to comment.