From a1fc3f4f5683d2a9f25fe5022db714b4c71e8cfa Mon Sep 17 00:00:00 2001 From: Calascibetta Romain Date: Tue, 15 Oct 2024 16:47:16 +0200 Subject: [PATCH] New SMTP server: elit, which does not submission and the relay concurrently --- lib/dune | 6 ++ lib/elit.ml | 223 +++++++++++++++++++++++++++++++++++++++++++++ lib/elit.mli | 29 ++++++ lib/lipap.ml | 4 +- lib/logic.ml | 12 ++- lib/submission.ml | 2 +- lib/submission.mli | 2 +- 7 files changed, 270 insertions(+), 8 deletions(-) create mode 100644 lib/elit.ml create mode 100644 lib/elit.mli diff --git a/lib/dune b/lib/dune index 4d91bea..ce1249e 100644 --- a/lib/dune +++ b/lib/dune @@ -72,6 +72,12 @@ (modules hm) (libraries mirage-time mirage-clock mirage-random ptt ptt.map ptt.server uspf-mirage)) +(library + (name elit) + (public_name ptt.elit) + (modules elit) + (libraries mirage-time mirage-clock mirage-random ptt ptt.map ptt.server dns-client-mirage)) + (library (name ptt_value) (public_name ptt.value) diff --git a/lib/elit.ml b/lib/elit.ml new file mode 100644 index 0000000..b9c388f --- /dev/null +++ b/lib/elit.ml @@ -0,0 +1,223 @@ +open Rresult +open Lwt.Infix + +let src = Logs.Src.create "ptt.elit" + +module Log : Logs.LOG = (val Logs.src_log src) + +let ( $ ) f g = fun x -> f (g x) + +module Make + (Time : Mirage_time.S) + (Mclock : Mirage_clock.MCLOCK) + (Pclock : Mirage_clock.PCLOCK) + (Stack : Tcpip.Stack.V4V6) + (Dns_client : Dns_client_mirage.S + (Happy_eyeballs : Happy_eyeballs_mirage.S with type flow = Stack.TCP.flow) = +struct + module Server = Ptt_server.Make (Time) (Stack) + module Sendmail = Ptt_sendmail.Make (Pclock) (Stack) (Happy_eyeballs) + module Nss = Ca_certs_nss.Make (Pclock) + + module Local = struct + module Submission = Ptt.Submission.Make (Stack) + + let submission_resolver = + let open Ptt_common in + let getmxbyname _ipaddr mail_exchange = + Dns.Rr_map.Mx_set.(singleton { Dns.Mx.preference= 0; mail_exchange }) + |> Lwt.return_ok in + let gethostbyname ipaddr _domain_name = + Lwt.return_ok ipaddr in + { getmxbyname; gethostbyname } + + let submission_job ~pool ?stop ?(port= 465) ~destination + random hash stack server close = + let handler flow = + let ipaddr, port = Stack.TCP.dst flow in + Lwt.finalize + (fun () -> + Lwt_pool.use pool @@ fun (encoder, decoder, _) -> + Submission.accept_without_starttls + ~encoder:(Fun.const encoder) ~decoder:(Fun.const decoder) ~ipaddr + flow destination submission_resolver + random hash server + >|= R.reword_error (R.msgf "%a" Submission.pp_error)) + (fun () -> Stack.TCP.close flow) + >>= function + | Ok () -> Lwt.return () + | Error (`Msg err) -> + Log.err (fun m -> m "<%a:%d> raised an error: %s" Ipaddr.pp ipaddr port err); + Lwt.return () in + Server.init ~port stack >>= fun service -> + Server.serve_when_ready ?stop ~handler service + |> fun (`Initialized job) -> + let job = job >|= close in job + + let submission_logic_job ~info map (ic, oc) = + let rec go () = + Lwt_stream.get ic >>= function + | None -> oc None; Lwt.return_unit + | Some (key, stream) -> + let sender = fst (Ptt.Messaged.from key) in + let recipients = Ptt.Messaged.recipients key in + let recipients = List.map fst recipients in + let recipients = Ptt_map.expand ~info map recipients in + let recipients = Ptt_aggregate.to_recipients ~info recipients in + let id = Ptt_common.id_to_messageID ~info (Ptt.Messaged.id key) in + let elts = List.map (fun recipients -> + { Ptt_sendmail.sender + ; recipients + ; data= Lwt_stream.clone stream + ; policies= [] + ; id }) recipients in + List.iter (oc $ Option.some) elts; + Lwt.pause () >>= go in + go () + + let job ?(limit = 20) ?stop ~locals ?port ~tls ~info ~destination + stack he random hash authenticator mechanisms = + let pool0 = + Lwt_pool.create limit @@ fun () -> + let encoder = Bytes.create 0x7ff in + let decoder = Bytes.create 0x7ff in + let queue = Ke.Rke.create ~capacity:0x800 Bigarray.char in + Lwt.return (encoder, decoder, queue) in + let pool1 = + Lwt_pool.create limit @@ fun () -> + let encoder = Bytes.create 0x7ff in + let decoder = Bytes.create 0x7ff in + let queue = Ke.Rke.create ~capacity:0x800 Bigarray.char in + Lwt.return (encoder, decoder, queue) in + let pool1 = + { Ptt_sendmail.pool= fun fn -> Lwt_pool.use pool1 fn } in + let ic_server, stream0, close0 = Submission.create ~info ~authenticator mechanisms in + let oc_server, push0 = Sendmail.v ~resolver:submission_resolver ~pool:pool1 ~info tls in + Lwt.join + [ submission_job ~pool:pool0 ?stop ?port ~destination random hash stack ic_server close0 + ; submission_logic_job ~info locals (stream0, push0) + ; Sendmail.job destination he oc_server ] + end + + module Out = struct + module Relay = Ptt.Relay.Make (Stack) + + let mail_exchange_resolver = + let open Ptt_common in + let getmxbyname dns domain_name = + Dns_client.getaddrinfo dns Dns.Rr_map.Mx domain_name + >|= Result.map snd in + let gethostbyname dns domain_name = + let ipv4 = + Dns_client.gethostbyname dns domain_name + >|= Result.map (fun ipv4 -> Ipaddr.V4 ipv4) in + let ipv6 = + Dns_client.gethostbyname6 dns domain_name + >|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in + Lwt.all [ ipv4; ipv6 ] >|= function + | [ _; (Ok _ as ipv6) ] -> ipv6 + | [ (Ok _ as ipv4); Error _ ] -> ipv4 + | [ (Error _ as err); _ ] -> err + | [] | [_] | _ :: _ :: _ -> assert false in + { getmxbyname; gethostbyname } + + let mail_exchange_job ~pool ?stop ?(port= 25) stack dns server close = + let handler flow = + let ipaddr, port = Stack.TCP.dst flow in + Lwt.finalize + (fun () -> + Lwt_pool.use pool @@ fun (encoder, decoder, queue) -> + Relay.accept ~encoder:(Fun.const encoder) ~decoder:(Fun.const decoder) + ~queue:(Fun.const queue) ~ipaddr flow dns mail_exchange_resolver server + >|= R.reword_error (R.msgf "%a" Relay.pp_error)) + (fun () -> Stack.TCP.close flow) + >>= function + | Ok () -> Lwt.return () + | Error (`Msg err) -> + Log.err (fun m -> m "<%a:%d> raised an error: %s" Ipaddr.pp ipaddr port err); + Lwt.return () in + Server.init ~port stack >>= fun service -> + Server.serve_when_ready ?stop ~handler service + |> fun (`Initialized job) -> + let job = job >|= close in job + + let mail_exchange_logic_job ~info map (ic, oc) = + let sender = + let local = `Dot_string [ "ptt"; "mti-gf" ] in + Some (Colombe.Path.{ local; domain= info.Ptt_common.domain; rest= [] }) in + let rec go () = + Lwt_stream.get ic >>= function + | None -> oc None; Lwt.return_unit + | Some (key, stream) -> + let recipients = Ptt.Messaged.recipients key in + let recipients = List.map fst recipients in + let recipients = Ptt_map.expand ~info map recipients in + let recipients = Ptt_aggregate.to_recipients ~info recipients in + let id = Ptt_common.id_to_messageID ~info (Ptt.Messaged.id key) in + let elts = List.map (fun recipients -> + { Ptt_sendmail.sender + ; recipients + ; data= Lwt_stream.clone stream + ; policies= [] + ; id }) recipients in + List.iter (oc $ Option.some) elts; + Lwt.pause () >>= go in + go () + + let job ?(limit = 20) ?stop ~locals ?port ~tls ~info stack dns he = + let pool0 = + Lwt_pool.create limit @@ fun () -> + let encoder = Bytes.create 0x7ff in + let decoder = Bytes.create 0x7ff in + let queue = Ke.Rke.create ~capacity:0x800 Bigarray.char in + Lwt.return (encoder, decoder, queue) in + let pool1 = + Lwt_pool.create limit @@ fun () -> + let encoder = Bytes.create 0x7ff in + let decoder = Bytes.create 0x7ff in + let queue = Ke.Rke.create ~capacity:0x800 Bigarray.char in + Lwt.return (encoder, decoder, queue) in + let pool1 = + { Ptt_sendmail.pool= fun fn -> Lwt_pool.use pool1 fn } in + let ic_server, stream0, close0 = Relay.create ~info in + let oc_server, push0 = Sendmail.v ~resolver:mail_exchange_resolver ~pool:pool1 ~info tls in + Lwt.join + [ mail_exchange_job ~pool:pool0 ?stop ?port stack dns ic_server close0 + ; mail_exchange_logic_job ~info locals (stream0, push0) + ; Sendmail.job dns he oc_server ] + end + + type 'k t = + { locals : Ptt_map.t + ; tls : Tls.Config.client + ; random : Mirage_crypto_rng.g option + ; hash : 'k Digestif.hash + ; authentication : 'k Ptt.Authentication.t + ; mechanisms : Ptt.Mechanism.t list + ; destination : Ipaddr.t } + + type 'k iter = (Ptt_map.local -> 'k Digestif.t -> Emile.mailbox list -> unit Lwt.t) -> unit Lwt.t + + let v ?g ?(mechanisms= [ Ptt.Mechanism.PLAIN ]) ~postmaster hash iter destination = + let authenticator = R.failwith_error_msg (Nss.authenticator ()) in + let tls = Rresult.R.failwith_error_msg (Tls.Config.client ~authenticator ()) in + let locals = Ptt_map.empty ~postmaster in + let passwds = Hashtbl.create 0x100 in + let add local passwd dsts = + List.iter (fun dst -> Ptt_map.add ~local dst locals) dsts; + Hashtbl.add passwds local passwd; + Lwt.return_unit in + iter add >|= fun () -> + let authentication local passwd' = + match Hashtbl.find_opt passwds local with + | Some passwd -> Lwt.return (Digestif.equal hash passwd passwd') + | None -> Lwt.return false in + let authentication = Ptt.Authentication.v authentication in + { locals; tls; random= g; hash; authentication; mechanisms; destination } + + let job ?stop t ~info stack dns he = + Lwt.join + [ Local.job ?stop ~locals:t.locals ~tls:t.tls ~info ~destination:t.destination + stack he t.random t.hash t.authentication t.mechanisms + ; Out.job ?stop ~locals:t.locals ~tls:t.tls ~info stack dns he ] +end diff --git a/lib/elit.mli b/lib/elit.mli new file mode 100644 index 0000000..e0f161d --- /dev/null +++ b/lib/elit.mli @@ -0,0 +1,29 @@ +module Make + (Time : Mirage_time.S) + (Mclock : Mirage_clock.MCLOCK) + (Pclock : Mirage_clock.PCLOCK) + (Stack : Tcpip.Stack.V4V6) + (Dns_client : Dns_client_mirage.S) + (Happy_eyeballs : Happy_eyeballs_mirage.S with type flow = Stack.TCP.flow) : +sig + type 'k t + type 'k iter = (Ptt_map.local -> 'k Digestif.t -> Emile.mailbox list -> unit Lwt.t) -> unit Lwt.t + + val v : + ?g:Mirage_crypto_rng.g + -> ?mechanisms:Ptt.Mechanism.t list + -> postmaster:Emile.mailbox + -> 'k Digestif.hash + -> 'k iter + -> Ipaddr.t + -> 'k t Lwt.t + + val job : + ?stop:Lwt_switch.t + -> 'k t + -> info:Ptt_common.info + -> Stack.TCP.t + -> Dns_client.t + -> Happy_eyeballs.t + -> unit Lwt.t +end diff --git a/lib/lipap.ml b/lib/lipap.ml index 0d7420c..ee6ba00 100644 --- a/lib/lipap.ml +++ b/lib/lipap.ml @@ -44,7 +44,7 @@ struct Lwt.finalize (fun () -> Lwt_pool.use pool @@ fun (encoder, decoder, _) -> - Submission.accept ~encoder:(Fun.const encoder) + Submission.accept_without_starttls ~encoder:(Fun.const encoder) ~decoder:(Fun.const decoder) ~ipaddr flow dns resolver random hash server >|= R.reword_error (R.msgf "%a" Submission.pp_error)) @@ -85,7 +85,7 @@ struct Lwt.pause () >>= go in go () - let job ?(limit = 20) ?stop ~locals ~port ~tls ~info + let job ?(limit = 20) ?stop ~locals ~port ~tls ~info random hash stack dns he authenticator mechanisms = let pool0 = diff --git a/lib/logic.ml b/lib/logic.ml index 9990909..31876c0 100644 --- a/lib/logic.ml +++ b/lib/logic.ml @@ -280,8 +280,10 @@ module Make (Monad : MONAD) = struct let* () = send ctx Value.PP_250 [ - politely ~domain:info.Ptt_common.domain ~ipaddr:info.Ptt_common.ipaddr; "8BITMIME" - ; "SMTPUTF8"; Fmt.str "SIZE %Ld" info.Ptt_common.size + politely ~domain:info.Ptt_common.domain ~ipaddr:info.Ptt_common.ipaddr + ; "8BITMIME" + ; "SMTPUTF8" + ; Fmt.str "SIZE %Ld" info.Ptt_common.size ] in m_relay ctx ~domain_from @@ -291,8 +293,10 @@ module Make (Monad : MONAD) = struct let* () = send ctx Value.PP_250 [ - politely ~domain:info.Ptt_common.domain ~ipaddr:info.Ptt_common.ipaddr; "8BITMIME" - ; "SMTPUTF8"; Fmt.str "SIZE %Ld" info.Ptt_common.size + politely ~domain:info.Ptt_common.domain ~ipaddr:info.Ptt_common.ipaddr + ; "8BITMIME" + ; "SMTPUTF8" + ; Fmt.str "SIZE %Ld" info.Ptt_common.size ; Fmt.str "AUTH %a" Fmt.(list ~sep:(const string " ") Mechanism.pp) ms ] in m_submission ctx ~domain_from ms diff --git a/lib/submission.ml b/lib/submission.ml index cebc75f..3e81e67 100644 --- a/lib/submission.ml +++ b/lib/submission.ml @@ -155,7 +155,7 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct in go 0 () - let accept : + let accept_without_starttls : ?encoder:(unit -> bytes) -> ?decoder:(unit -> bytes) -> ipaddr:Ipaddr.t diff --git a/lib/submission.mli b/lib/submission.mli index ccb9716..fc112a5 100644 --- a/lib/submission.mli +++ b/lib/submission.mli @@ -22,7 +22,7 @@ module Make (Stack : Tcpip.Stack.V4V6) : sig -> Mechanism.t list -> 'k server * (Messaged.key * string Lwt_stream.t) Lwt_stream.t * (unit -> unit) - val accept : + val accept_without_starttls : ?encoder:(unit -> bytes) -> ?decoder:(unit -> bytes) -> ipaddr:Ipaddr.t