Skip to content

Commit

Permalink
adapt to incoming httpun / httpun-ws / h2 APIs (#207)
Browse files Browse the repository at this point in the history
* adapt to incoming httpun / httpun-ws / h2 APIs

* make it build

* bring back some pins

* wip

* wip

* wip
  • Loading branch information
anmonteiro authored Aug 27, 2024
1 parent 1217ea2 commit cb0e8fa
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 63 deletions.
14 changes: 7 additions & 7 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 31 additions & 23 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ module Raw = struct
val write_string : t -> ?off:int -> ?len:int -> string -> unit
val write_bigstring : t -> ?off:int -> ?len:int -> Bigstringaf.t -> unit
val schedule_bigstring : t -> ?off:int -> ?len:int -> Bigstringaf.t -> unit
val flush : t -> (unit -> unit) -> unit
val flush : t -> ([ `Written | `Closed ] -> unit) -> unit
val close : t -> unit
val is_closed : t -> bool
end
Expand Down Expand Up @@ -438,36 +438,44 @@ module Raw = struct
to_t reader ?on_eof ~body_length ~body_error:incomplete_body_error body

let flush_and_close :
type a. (module Writer with type t = a) -> a -> (unit -> unit) -> unit
=
fun (module Writer) body f ->
Writer.close body;
Writer.flush body f

let stream_write_body :
type a.
(module Writer with type t = a)
-> a
-> Bigstringaf.t IOVec.t Stream.t
-> ([ `Written | `Closed ] -> unit)
-> unit
=
fun (module Writer) body stream ->
Stream.iter
~f:(fun { IOVec.buffer; off; len } ->
(* If the peer left abruptly the connection will be shutdown. Avoid
* crashing the server with exceptions related to the writer being
* closed. *)
if not (Writer.is_closed body)
then (
fun (module Writer) body f ->
Writer.close body;
Writer.flush body f

exception Local

let stream_write_body =
let stream_write_body :
type a.
(module Writer with type t = a)
-> a
-> Bigstringaf.t IOVec.t Stream.t
-> unit
=
fun (module Writer) body stream ->
Stream.iter
~f:(fun { IOVec.buffer; off; len } ->
(* If the peer left abruptly the connection will be shutdown. Avoid
* crashing the server with exceptions related to the writer being
* closed. *)
Writer.schedule_bigstring body ~off ~len buffer;
let p, u = Promise.create () in
Writer.flush body (fun () ->
Promise.resolve u ();
Writer.flush body (fun reason ->
Promise.resolve u reason;
Logs.debug (fun m -> m "Flushed output chunk of length %d" len));
Promise.await p)
else ())
stream;
flush_and_close (module Writer) body ignore
match Promise.await p with `Closed -> raise Local | `Written -> ())
stream;
flush_and_close (module Writer) body ignore
in

fun writer body stream ->
try stream_write_body writer body stream with Local -> ()
end

(* Traversal *)
Expand Down
14 changes: 6 additions & 8 deletions lib/http_impl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ let flush_and_close :
type a. (module Body.Raw.Writer with type t = a) -> a -> unit
=
fun b request_body ->
Body.Raw.flush_and_close b request_body (fun () ->
Logs.info (fun m ->
m "Request body has been completely and successfully uploaded"))
Body.Raw.flush_and_close b request_body (function
| `Closed -> Logs.warn (fun m -> m "Request body not completely written")
| `Written ->
Logs.info (fun m ->
m "Request body has been completely and successfully uploaded"))

let handle_response :
sw:Switch.t
Expand Down Expand Up @@ -215,12 +217,8 @@ let upgrade_connection :

Logs.info (fun m -> m "Upgrading connection to the Websocket protocol");
let ws_conn =
let error_handler _wsd error =
Promise.resolve notify_error (error :> Error.client)
in
Httpun_ws.Client_connection.create
~error_handler
(Ws.Handler.websocket_handler ~sw ~notify_wsd)
(Ws.Handler.websocket_handler ~sw ~notify_wsd ~notify_error)
in
let result =
Fiber.any
Expand Down
26 changes: 14 additions & 12 deletions lib/http_server_impl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,20 @@ let do_sendfile :
(* Flush everything to the wire before calling `sendfile`, as we're gonna
bypass the http/af runtime and write bytes to the file descriptor
directly. *)
Http.Body.Writer.flush response_body (fun () ->
match
Posix.sendfile
(module Http.Body.Writer)
~src_fd
~dst_fd:fd
response_body
with
| Ok () -> Http.Body.Writer.close response_body
| Error exn ->
Http.Body.Writer.close response_body;
report_exn exn))
Http.Body.Writer.flush response_body (function
| `Closed -> ()
| `Written ->
(match
Posix.sendfile
(module Http.Body.Writer)
~src_fd
~dst_fd:fd
response_body
with
| Ok () -> Http.Body.Writer.close response_body
| Error exn ->
Http.Body.Writer.close response_body;
report_exn exn)))

let handle_request :
type reqd writer.
Expand Down
8 changes: 2 additions & 6 deletions lib/response.ml
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,12 @@ module Upgrade = struct
| HTTP_1_1 ->
let upgrade_handler =
let wsd_received, notify_wsd = Promise.create () in
(* TODO(anmonteiro): not handling this error? *)
let _error_received, notify_error = Promise.create () in
fun ~sw upgrade ->
let error_handler _wsd error =
Promise.resolve notify_error (error :> Error.client)
in

let ws_conn =
Httpun_ws.Server_connection.create_websocket
~error_handler
(Ws.Handler.websocket_handler ~sw ~notify_wsd)
(Ws.Handler.websocket_handler ~sw ~notify_wsd ~notify_error)
in
Fiber.fork ~sw (fun () -> f (Promise.await wsd_received));
upgrade (Gluten.make (module Httpun_ws.Server_connection) ws_conn)
Expand Down
15 changes: 10 additions & 5 deletions lib/ws.ml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ end = struct
end

module Handler = struct
let websocket_handler ~sw ~notify_wsd wsd =
let websocket_handler ~sw ~notify_wsd ~notify_error wsd =
let frameq = Queue.create () in
let messages, push_to_messages = Stream.create 256 in
Promise.resolve notify_wsd (Descriptor.create ~messages wsd);
Expand Down Expand Up @@ -190,10 +190,15 @@ module Handler = struct
| `Other _ -> failwith "Custom WebSocket frame types not yet supported")
in

let eof () =
Logs.info (fun m -> m "Websocket connection EOF");
Httpun_ws.Wsd.close wsd;
push_to_messages None
let eof ?error () =
match error with
| Some error ->
Httpun_ws.Wsd.close wsd;
Promise.resolve notify_error (error :> Error.client)
| None ->
Logs.info (fun m -> m "Websocket connection EOF");
Httpun_ws.Wsd.close wsd;
push_to_messages None
in
{ Httpun_ws.Websocket_connection.frame; eof }
end
8 changes: 7 additions & 1 deletion piaf.opam
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ build: [
]
dev-repo: "git+https://github.com/anmonteiro/piaf.git"
pin-depends: [
[ "eio-ssl.dev" "git+https://github.com/anmonteiro/eio-ssl.git#0.3.0" ]
[ "httpun-types.dev" "git+https://github.com/anmonteiro/httpun.git" ]
[ "httpun.dev" "git+https://github.com/anmonteiro/httpun.git" ]
[ "httpun-eio.dev" "git+https://github.com/anmonteiro/httpun.git" ]
[ "hpack.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ]
[ "h2.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ]
[ "h2-eio.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ]
[ "httpun-ws.dev" "git+https://github.com/anmonteiro/httpun-ws.git" ]
[ "multipart_form.dev" "git+https://github.com/anmonteiro/multipart_form.git" ]
]
8 changes: 7 additions & 1 deletion piaf.opam.template
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
pin-depends: [
[ "eio-ssl.dev" "git+https://github.com/anmonteiro/eio-ssl.git#0.3.0" ]
[ "httpun-types.dev" "git+https://github.com/anmonteiro/httpun.git" ]
[ "httpun.dev" "git+https://github.com/anmonteiro/httpun.git" ]
[ "httpun-eio.dev" "git+https://github.com/anmonteiro/httpun.git" ]
[ "hpack.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ]
[ "h2.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ]
[ "h2-eio.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ]
[ "httpun-ws.dev" "git+https://github.com/anmonteiro/httpun-ws.git" ]
[ "multipart_form.dev" "git+https://github.com/anmonteiro/multipart_form.git" ]
]

0 comments on commit cb0e8fa

Please sign in to comment.