From d1639456126f2dd1688747e54bd0e745811e0c9b Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Mon, 26 Aug 2024 22:05:03 -0700 Subject: [PATCH] server: process requests after EOF (#136) * server: process requests after EOF Co-Authored-By: Doug Patti Co-Authored-By: David House * add changelog * fix tests, add a new one --------- Co-authored-by: Doug Patti Co-authored-by: David House --- CHANGES.md | 2 + lib/reqd.ml | 3 +- lib/response_state.ml | 9 ++- lib/server_connection.ml | 2 +- lib_test/test_server_connection.ml | 97 +++++++++++++++++++++++++++--- 5 files changed, 100 insertions(+), 13 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 886d286..370561e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,8 @@ Unreleased - client: report exceptions before closing the response body ([#135](https://github.com/anmonteiro/httpun/pull/135)) +- server: process requests after EOF + ([#136](https://github.com/anmonteiro/httpun/pull/136)) 0.1.0 2024-06-08 -------------- diff --git a/lib/reqd.ml b/lib/reqd.ml index 8ef0d86..93eed26 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -264,7 +264,8 @@ let input_state t : Input_state.t = then Ready else Wait -let output_state t = Response_state.output_state t.response_state +let output_state { response_state; writer; _ } = + Response_state.output_state response_state ~writer let flush_request_body t = if Body.Reader.has_pending_output t.request_body diff --git a/lib/response_state.ml b/lib/response_state.ml index 586b7a5..92dfa38 100644 --- a/lib/response_state.ml +++ b/lib/response_state.ml @@ -4,12 +4,15 @@ type t = | Streaming of Response.t * Body.Writer.t | Upgrade of Response.t * (unit -> unit) -let output_state t : Output_state.t = +let output_state t ~writer : Output_state.t = match t with | Fixed _ -> Complete - | Waiting -> Waiting + | Waiting -> + if Serialize.Writer.is_closed writer then Complete + else Waiting | Streaming(_, response_body) -> - if Body.Writer.requires_output response_body + if Serialize.Writer.is_closed writer then Complete + else if Body.Writer.requires_output response_body then Ready else Complete | Upgrade _ -> Ready diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 37d20b3..f51c650 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -339,7 +339,7 @@ let rec _next_write_operation t = then shutdown t; Writer.next t.writer | Error { response_state; _ } -> - match Response_state.output_state response_state with + match Response_state.output_state response_state ~writer:t.writer with | Waiting -> `Yield | Ready -> flush_response_error_body response_state; diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index 2eb026d..bfd53b0 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -41,7 +41,7 @@ module Runtime : sig val report_exn : t -> exn -> unit - val do_force_read : t -> (Server_connection.t -> 'a) -> 'a + val with_server_connection : t -> (Server_connection.t -> 'a) -> 'a val shutdown : t -> unit val is_closed : t -> bool end = struct @@ -147,7 +147,7 @@ end = struct Read_operation.pp_hum op ;; - let do_force_read t f = f t.server_connection + let with_server_connection t f = f t.server_connection let do_write t f = match current_write_operation t with @@ -195,7 +195,7 @@ let read ?(eof=false) t str ~off ~len = let read_eof = read ~eof:true let force_read_eof t str ~off ~len = - do_force_read t (fun conn -> Server_connection.read_eof conn str ~off ~len) + with_server_connection t (fun conn -> Server_connection.read_eof conn str ~off ~len) ;; let feed_string ?eof t str = @@ -210,7 +210,7 @@ let read_string ?eof t str = ;; let force_read t str ~off ~len = - do_force_read t (fun conn -> Server_connection.read conn str ~off ~len) + with_server_connection t (fun conn -> Server_connection.read conn str ~off ~len) ;; let force_feed_string t str = @@ -1420,7 +1420,7 @@ let test_finish_response_after_read_eof () = Alcotest.(check int) "malformed chunked encoding read completely" len just_read; let (_ : Read_operation.t) = - do_force_read t (fun t -> Server_connection.next_read_operation t) + with_server_connection t (fun t -> Server_connection.next_read_operation t) in reader_errored t; Alcotest.(check bool) "Writer woken up" true !writer_woken_up; @@ -2060,9 +2060,10 @@ let test_flush_response_before_shutdown () = write_response t response; !continue (); shutdown t; - raises_writer_closed (fun () -> - write_string t "b\r\nhello world\r\n"; - connection_is_shutdown t); + + writer_ready t; + write_string t "b\r\nhello world\r\n"; + connection_is_shutdown t ;; let test_schedule_read_with_data_available () = @@ -2337,6 +2338,83 @@ let test_input_consumed_before_closing_req_body () = write_string t "0\r\n\r\n"; ;; +let test_can_read_more_requests_after_write_eof () = + let request = Request.create `GET "/" ~headers:(Headers.encoding_fixed 0) in + let reqd = ref None in + let request_handler reqd' = reqd := Some reqd' in + let t = create request_handler in + let response = Response.create `OK ~headers:Headers.encoding_chunked in + read_request t request; + Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true + |> (ignore : Body.Writer.t -> unit); + write_eof t; + (* In many runtimes, there are separate reader and writer threads that drive + the reading and writing from httpaf independently. So just because the + writer thread has told us that the socket is closed doesn't mean we won't + get a bunch more requests delivered to us from the reader thread. We + should be ready to receive them, and call the request handler for them, + even if there is no possibility of writing responses (e.g. those requests + might be side-effecting requests like POST requests). *) + reqd := None; + writer_closed ~unread:47 t; + reader_ready t; + read_request t request; + raises_writer_closed (fun () -> + Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true + |> (ignore : Body.Writer.t -> unit)); + Alcotest.(check bool) "request handler fired" true (Option.is_some !reqd) +;; + +let test_can_read_more_requests_after_write_eof_before_send_response () = + let request = Request.create `GET "/" ~headers:(Headers.encoding_fixed 0) in + let reqd = ref None in + let request_handler reqd' = reqd := Some reqd' in + let t = create request_handler in + let response = Response.create `OK ~headers:Headers.encoding_chunked in + read_request t request; + + let write_op = ref (current_write_operation t) in + with_server_connection t (fun t -> + Server_connection.report_write_result t `Closed; + write_op := Server_connection.next_write_operation t; + ); + Alcotest.check write_operation "Writer is closed" + (`Close 0) !write_op; + reader_ready t; + read_request t request; + raises_writer_closed (fun () -> + Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true + |> (ignore : Body.Writer.t -> unit)); + Alcotest.(check bool) "request handler fired" true (Option.is_some !reqd) +;; + +let test_write_response_after_read_eof () = + let reqd = ref None in + let t = create (fun _reqd -> reqd := Some _reqd) in + let request_line = "GET / HTTP/1.1\r\n" in + let single_header = "Links: /path/to/some/website\r\n" in + let r = + (* Each header is 30 bytes *) + request_line ^ single_header ^ single_header ^ "connection: close\r\n\r\n" + in + let bs = Bigstringaf.of_string r ~off:0 ~len:(String.length r) in + let c = read t bs ~off:0 ~len:30 in + Alcotest.(check int) "only reads the request line" (String.length request_line) c; + + let c' = + read t bs ~off:c ~len:(String.length single_header) + in + Alcotest.(check int) "parser can read a single header and commit" (String.length single_header) c'; + + let c'' = read_eof t bs ~off:(c + c') ~len:(String.length r - (c + c')) in + Alcotest.(check int) "read_eof with the rest of the input is accepted" (String.length r - (c + c')) c''; + writer_yielded t; + let body = Reqd.respond_with_streaming ~flush_headers_immediately:true (Option.get !reqd) (Response.create `OK); + in + write_response t (Response.create `OK); + Body.Writer.close body; + connection_is_shutdown t; +;; let tests = [ "initial reader state" , `Quick, test_initial_reader_state @@ -2417,4 +2495,7 @@ let tests = ; "multiple pipelined requests", `Quick, test_multiple_pipelined_requests ; "Body.Writer.flush waits for bytes to have been written to the wire", `Quick, test_body_flush_after_bytes_in_the_wire ; "input consumed before closing request body", `Quick, test_input_consumed_before_closing_req_body + ; "can read more requests after write eof", `Quick, test_can_read_more_requests_after_write_eof + ; "can read more requests after write eof (before response sent)", `Quick, test_can_read_more_requests_after_write_eof_before_send_response + ; "write response after reader EOF", `Quick,test_write_response_after_read_eof ]