Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
dinosaure committed Oct 22, 2024
1 parent 05ab393 commit 5982d29
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 62 deletions.
70 changes: 48 additions & 22 deletions lib/elit.ml
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,33 @@ struct
Lwt_stream.get ic >>= function
| None -> oc None; Lwt.return_unit
| Some (key, stream, wk) ->
let sender = fst (Ptt.Msgd.from key) in
let recipients = Ptt.Msgd.recipients key in
let recipients = List.map fst recipients in
let recipients = Ptt_map.expand ~info map recipients in
let recipients = Ptt_aggregate.to_recipients ~info recipients in
let id = Ptt_common.id_to_messageID ~info (Ptt.Msgd.id key) in
let elts = List.map (fun recipients ->
{ Ptt_sendmail.sender
; recipients
; data= Lwt_stream.clone stream
; policies= []
; id }) recipients in
List.iter (oc $ Option.some) elts;
Lwt.wakeup_later wk `Ok;
Lwt.pause () >>= go in
Lwt.catch
(fun () ->
let sender = fst (Ptt.Msgd.from key) in
let recipients = Ptt.Msgd.recipients key in
let recipients = List.map fst recipients in
let recipients = Ptt_map.expand ~info map recipients in
let recipients = Ptt_aggregate.to_recipients ~info recipients in
let id = Ptt_common.id_to_messageID ~info (Ptt.Msgd.id key) in
Log.debug (fun m -> m "%a submitted a new email %a."
Colombe.Reverse_path.pp sender Mrmime.MessageID.pp id);
let elts = List.map (fun recipients ->
{ Ptt_sendmail.sender
; recipients
; data= Lwt_stream.clone stream
; policies= []
; id }) recipients in
Log.debug (fun m -> m "Notice the SMTP server that everything is ok for %a from %a."
Colombe.Reverse_path.pp sender Mrmime.MessageID.pp id);
Lwt.wakeup_later wk `Ok;
Log.debug (fun m -> m "Send the incoming email %a to our destination."
Mrmime.MessageID.pp id);
List.iter (oc $ Option.some) elts;
Lwt.return_unit)
(fun exn ->
Log.err (fun m -> m "Got an error into the submission logic: %S" (Printexc.to_string exn));
Lwt.return_unit)
>>= Lwt.pause >>= go in
go ()

let job ?(limit = 20) ?stop ~locals ?port ~tls ~info ~destination
Expand Down Expand Up @@ -117,8 +129,9 @@ struct
Dns_client.gethostbyname6 dns domain_name
>|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in
Lwt.all [ ipv4; ipv6 ] >|= function
| [ _; (Ok _ as ipv6) ] -> ipv6
| [ (Ok _ as ipv4); Error _ ] -> ipv4
| [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ]
| [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ]
| [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ]
| [ (Error _ as err); _ ] -> err
| [] | [_] | _ :: _ :: _ -> assert false in
{ getmxbyname; gethostbyname }
Expand Down Expand Up @@ -194,10 +207,16 @@ struct
let real_recipients = Ptt_map.expand ~info map fake_recipients in
let real_recipients = Ptt_aggregate.to_recipients ~info real_recipients in
let id = Ptt_common.id_to_messageID ~info (Ptt.Msgd.id key) in
verify ~info
~sender:(fst (Ptt.Msgd.from key))
~ipaddr:(Ptt.Msgd.ipaddr key) dns stream >>= function
begin
if forward_granted (Ptt.Msgd.ipaddr key) allowed_to_forward
then Lwt.return (`Ok stream)
else verify ~info
~sender:(fst (Ptt.Msgd.from key))
~ipaddr:(Ptt.Msgd.ipaddr key) dns stream end >>= function
| #Ptt.Msgd.error as err ->
Log.warn (fun m -> m "Can verify SPF informations from %a for %a, discard it!"
Colombe.Reverse_path.pp (fst (Ptt.Msgd.from key))
Mrmime.MessageID.pp id);
Lwt.wakeup_later wk err;
Lwt.pause () >>= go
| `Ok stream ->
Expand All @@ -212,8 +231,15 @@ struct
|| only_registered_recipients ~info map fake_recipients
then begin
List.iter (oc $ Option.some) elts;
Log.debug (fun m -> m "Notice the SMTP server that everything is ok for %a from %a."
Colombe.Reverse_path.pp sender Mrmime.MessageID.pp id);
Lwt.wakeup_later wk `Ok
end else Lwt.wakeup_later wk (`Requested_action_not_taken `Permanent);
end else begin
Log.warn (fun m -> m "Email %a to unknown users (%a), discard it!"
Mrmime.MessageID.pp id
Fmt.(Dump.list Colombe.Forward_path.pp) fake_recipients);
Lwt.wakeup_later wk (`Requested_action_not_taken `Permanent)
end;
Lwt.pause () >>= go in
go ()

Expand Down Expand Up @@ -277,7 +303,7 @@ struct
let submission = { info with Ptt_common.tls= submission } in
let relay = { info with Ptt_common.tls= relay } in
Lwt.join
[ Local.job ?stop ~locals:t.locals ~tls:t.tls ~info:submission ~destination:t.destination
[ Local.job ?stop ~locals:t.locals ~tls:t.tls ~info:submission ~destination:[ t.destination ]
stack he t.random t.hash t.authentication t.mechanisms
; Out.job ?stop ~locals:t.locals ~tls:t.tls ~info:relay ~forward_granted:t.forward_granted
stack dns he ]
Expand Down
5 changes: 3 additions & 2 deletions lib/hm.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ struct
Dns_client.gethostbyname6 dns domain_name
>|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in
Lwt.all [ ipv4; ipv6 ] >|= function
| [ _; (Ok _ as ipv6) ] -> ipv6
| [ (Ok _ as ipv4); Error _ ] -> ipv4
| [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ]
| [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ]
| [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ]
| [ (Error _ as err); _ ] -> err
| [] | [_] | _ :: _ :: _ -> assert false in
{ getmxbyname; gethostbyname }
Expand Down
5 changes: 3 additions & 2 deletions lib/lipap.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ struct
Dns_client.gethostbyname6 dns domain_name
>|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in
Lwt.all [ ipv4; ipv6 ] >|= function
| [ _; (Ok _ as ipv6) ] -> ipv6
| [ (Ok _ as ipv4); Error _ ] -> ipv4
| [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ]
| [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ]
| [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ]
| [ (Error _ as err); _ ] -> err
| [] | [_] | _ :: _ :: _ -> assert false in
{ getmxbyname; gethostbyname }
Expand Down
5 changes: 3 additions & 2 deletions lib/mti_gf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ struct
Dns_client.gethostbyname6 dns domain_name
>|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in
Lwt.all [ ipv4; ipv6 ] >|= function
| [ _; (Ok _ as ipv6) ] -> ipv6
| [ (Ok _ as ipv4); Error _ ] -> ipv4
| [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ]
| [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ]
| [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ]
| [ (Error _ as err); _ ] -> err
| [] | [_] | _ :: _ :: _ -> assert false in
{ getmxbyname; gethostbyname }
Expand Down
4 changes: 2 additions & 2 deletions lib/mxs.mli
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ val pp_key : key Fmt.t

include Map.S with type key := key

val v : preference:int -> domain:[ `host ] Domain_name.t -> Ipaddr.t -> Ipaddr.t t
val vs : (Dns.Mx.t * Ipaddr.t) list -> Ipaddr.t t
val v : preference:int -> domain:[ `host ] Domain_name.t -> Ipaddr.t list -> Ipaddr.t list t
val vs : (Dns.Mx.t * Ipaddr.t list) list -> Ipaddr.t list t
5 changes: 3 additions & 2 deletions lib/nec.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ struct
Dns_client.gethostbyname6 dns domain_name
>|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in
Lwt.all [ ipv4; ipv6 ] >|= function
| [ _; (Ok _ as ipv6) ] -> ipv6
| [ (Ok _ as ipv4); Error _ ] -> ipv4
| [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ]
| [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ]
| [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ]
| [ (Error _ as err); _ ] -> err
| [] | [_] | _ :: _ :: _ -> assert false in
{ getmxbyname; gethostbyname }
Expand Down
2 changes: 1 addition & 1 deletion lib/ptt_common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type ('dns, 'a) getmxbyname =
type ('dns, 'a) gethostbyname =
'dns
-> [ `host ] Domain_name.t
-> (Ipaddr.t, [> `Msg of string ] as 'a) result Lwt.t
-> (Ipaddr.t list, [> `Msg of string ] as 'a) result Lwt.t

type 'dns resolver =
{ getmxbyname : 'a. ('dns, 'a) getmxbyname
Expand Down
30 changes: 20 additions & 10 deletions lib/ptt_sendmail.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ and policy = [ `Ignore ]

[@@@warning "+30"]

let pp_recipients ppf { domain; locals } =
let pp_domain ppf = function
| `Ipaddr (Ipaddr.V4 ipv4) -> Fmt.pf ppf "[%a]" Ipaddr.V4.pp ipv4
| `Ipaddr (Ipaddr.V6 ipv6) -> Fmt.pf ppf "[IPv6:%a]" Ipaddr.V6.pp ipv6
| `Domain domain_name -> Domain_name.pp ppf domain_name in
match locals with
| `All -> Fmt.pf ppf "<%a>" pp_domain domain
| `Postmaster -> Fmt.pf ppf "Postmaster@%a" pp_domain domain
| `Some locals -> Fmt.pf ppf "%a@%a" Fmt.(Dump.list Emile.pp_local) locals pp_domain domain

let warn_about_an_unreachable_mail_exchange ~domain ~mail_exchange msg =
Log.warn @@ fun m -> m "Impossible to resolve %a, a mail exchange server for %a: %s"
Domain_name.pp mail_exchange Domain_name.pp domain msg
Expand Down Expand Up @@ -93,9 +103,9 @@ module Make
we must use an IP address as a destination to avoid the resolution mechanism
of happy-eyeballs! *)

let sendmail ?(last_option= false) he t ~ipaddr elt =
let sendmail ?(last_option= false) he t ~ipaddrs elt =
let ( let* ) = Lwt.bind in
let destination = Ipaddr.to_string ipaddr in
let destination = `Ipaddrs ipaddrs in
let backup = Lwt_stream.clone elt.data in
let consumed, stream = to_stream elt.data in
let recipients = recipients_to_forward_paths elt.recipients in
Expand Down Expand Up @@ -136,7 +146,7 @@ module Make
Fmt.(list ~sep:(any ",") Colombe.Forward_path.pp) recipients
Colombe.Reverse_path.pp elt.sender);
Lwt.return_unit
| Some _forward_path -> assert false (* TODO *)
| Some _forward_path -> Lwt.return_unit (* TODO *)

let pp_error ppf = function
| #Sendmail_with_starttls.error as err ->
Expand All @@ -154,15 +164,15 @@ module Make
Colombe.Reverse_path.pp elt.sender
pp_error err);
Lwt.return_unit
| Some _forward_path -> assert false (* TODO *)
| Some _forward_path -> Lwt.return_unit (* TODO *)

let sendmail dns he t elt =
let ( let* ) = Lwt.bind in
let open Ptt_common in
begin match elt.recipients.domain with
| `Ipaddr ipaddr ->
let domain = Ipaddr.to_domain_name ipaddr in
Lwt.return_ok Mxs.(v ~preference:0 ~domain ipaddr)
Lwt.return_ok Mxs.(v ~preference:0 ~domain [ ipaddr ])
| `Domain domain ->
let* r = t.resolver.getmxbyname dns domain in
match r with
Expand All @@ -171,7 +181,7 @@ module Make
begin fun acc ({ Dns.Mx.mail_exchange; _ } as mx) ->
let* r = t.resolver.gethostbyname dns mail_exchange in
match r with
| Ok ipaddr -> Lwt.return ((mx, ipaddr) :: acc)
| Ok ipaddrs -> Lwt.return ((mx, ipaddrs) :: acc)
| Error (`Msg msg) ->
warn_about_an_unreachable_mail_exchange ~domain ~mail_exchange msg;
Lwt.return acc end in
Expand All @@ -190,13 +200,13 @@ module Make
of [mxs] which does not do the recursion. This case should
never occur. *)
assert false
| [ _mx, ipaddr ] ->
let* result = sendmail ~last_option:true he t ~ipaddr elt in
| [ _mx, ipaddrs ] ->
let* result = sendmail ~last_option:true he t ~ipaddrs elt in
begin match result with
| `Retry | `Ok -> Lwt.return_unit
| `Errored value -> error_while_sending_email elt value end
| (_mx, ipaddr) :: mxs ->
let* result = sendmail he t ~ipaddr elt in
| (_mx, ipaddrs) :: mxs ->
let* result = sendmail he t ~ipaddrs elt in
match result with
| `Ok -> Lwt.return_unit
| `Retry -> go mxs
Expand Down
2 changes: 2 additions & 0 deletions lib/ptt_sendmail.mli
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ and resource = bytes * bytes * (char, Bigarray.int8_unsigned_elt) Ke.Rke.t
and 'a push = 'a option -> unit
and policy = [ `Ignore ]

val pp_recipients : recipients Fmt.t

module Make
(Clock : Mirage_clock.PCLOCK)
(Stack : Tcpip.Stack.V4V6)
Expand Down
16 changes: 8 additions & 8 deletions lib/relay.ml
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,20 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct

let dot = ".\r\n"

let receive_mail ?(limit = 0x100000) flow ctx m bounded_stream =
let receive_mail ?(limit = 0x100000) flow ctx m push =
let rec go count () =
if count >= limit
then Lwt.return_error `Too_big_data
then begin push None; Lwt.return_error `Too_big_data end
(* NOTE(dinosaure): [552] will be returned later. *)
else
run flow (m ctx) >>? function
| ".." -> bounded_stream#push dot >>= go (count + 3)
| "." -> bounded_stream#close; Lwt.return_ok ()
| ".." -> push (Some dot); go (count + 3) ()
| "." -> push None; Lwt.return_ok ()
| str ->
let len = String.length str in
let str = str ^ "\r\n" in
bounded_stream#push str >>=
go (count + len + 2)
push (Some str);
go (count + len + 2) ()
in
go 0 ()

Expand Down Expand Up @@ -100,7 +100,7 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct
| true ->
let id = succ server in
let key = Msgd.key ~domain_from ~from ~recipients ~ipaddr id in
let stream, bounded_stream = Lwt_stream.create_bounded 0x7ff in
let stream, push = Lwt_stream.create () in
let th, wk = Lwt.task () in
server.push (Some (key, stream, wk));
let m = SMTP.m_mail ctx in
Expand All @@ -109,7 +109,7 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct
~limit:(Int64.to_int server.info.size)
flow ctx
SMTP.(fun ctx -> Monad.recv ctx Value.Payload)
bounded_stream
push
>>= fun result ->
th >>= fun result' ->
let m = SMTP.m_end (merge result result') ctx in
Expand Down
5 changes: 3 additions & 2 deletions lib/spartacus.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ struct
Dns_client.gethostbyname6 dns domain_name
>|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in
Lwt.all [ ipv4; ipv6 ] >|= function
| [ _; (Ok _ as ipv6) ] -> ipv6
| [ (Ok _ as ipv4); Error _ ] -> ipv4
| [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ]
| [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ]
| [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ]
| [ (Error _ as err); _ ] -> err
| [] | [_] | _ :: _ :: _ -> assert false in
{ getmxbyname; gethostbyname }
Expand Down
18 changes: 10 additions & 8 deletions lib/submission.ml
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,19 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct

let dot = ".\r\n"

let receive_mail ?(limit = 0x100000) (Runner { run; flow}) ctx m bounded_stream =
let receive_mail ?(limit = 0x100000) (Runner { run; flow}) ctx m push =
let rec go count () =
if count >= limit then Lwt.return_error `Too_big
if count >= limit
then begin push None; Lwt.return_error `Too_big end
else
run flow (m ctx) >>? function
| ".." -> bounded_stream#push dot >>= go (count + 3)
| "." -> bounded_stream#close; Lwt.return_ok ()
| ".." -> push (Some dot); go (count + 3) ()
| "." -> push None; Lwt.return_ok ()
| str ->
let len = String.length str in
let str = str ^ "\r\n" in
bounded_stream#push str >>=
go (count + len + 2)
push (Some str);
go (count + len + 2) ()
in
go 0 ()

Expand Down Expand Up @@ -212,7 +213,7 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct
let sender = Colombe.Path.{ local= user; domain= server.info.SSMTP.domain; rest= [] } in
Some sender, snd from in
let key = Msgd.key ~domain_from ~from ~recipients ~ipaddr id in
let stream, bounded_stream = Lwt_stream.create_bounded 0x7ff in
let stream, push = Lwt_stream.create () in
let th, wk = Lwt.task () in
server.push (Some (key, stream, wk));
let m = SSMTP.m_mail ctx in
Expand All @@ -221,8 +222,9 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct
~limit:(Int64.to_int server.info.size)
runner ctx
SSMTP.(fun ctx -> Monad.recv ctx Value.Payload)
bounded_stream
push
>>= fun result ->
Log.debug (fun m -> m "Email received, waiting result from the logic");
th >>= fun result' ->
let m = SSMTP.m_end (merge result result') ctx in
run flow m >>? fun `Quit ->
Expand Down
2 changes: 1 addition & 1 deletion test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ module Sendmail = Sendmail_mirage.Make

let sendmail he ipaddr port ~domain sender recipients contents =
let open Lwt.Infix in
let destination = Fmt.str "%a" Ipaddr.pp ipaddr in
let destination = `Ipaddrs [ ipaddr ] in
let stream = Lwt_stream.of_list contents in
let stream = Lwt_stream.map (fun str -> str ^ "\r\n") stream in
let mail () =
Expand Down

0 comments on commit 5982d29

Please sign in to comment.