Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to use header from if domain is aligned with envelope #2332

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion app/mailbox_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
email_can_be_used_as_mailbox,
send_email,
render,
get_email_domain_part,
)
from app.email_validation import is_valid_email
from app.log import LOG
from app.models import User, Mailbox, Job, MailboxActivation
from app.models import User, Mailbox, Job, MailboxActivation, Alias
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
from app.utils import canonicalize_email


@dataclasses.dataclass
Expand Down Expand Up @@ -328,3 +330,56 @@ def perform_mailbox_email_change(mailbox_id: int) -> MailboxEmailChangeResult:
message="Invalid link",
message_category="error",
)


def __get_alias_mailbox_from_email(
email_address: str, alias: Alias
) -> Optional[Mailbox]:
for mailbox in alias.mailboxes:
if mailbox.email == email_address:
return mailbox

for authorized_address in mailbox.authorized_addresses:
if authorized_address.email == email_address:
LOG.d(
"Found an authorized address for %s %s %s",
alias,
mailbox,
authorized_address,
)
return mailbox
return None


def __get_alias_mailbox_from_email_or_canonical_email(
email_address: str, alias: Alias
) -> Optional[Mailbox]:
# We need to first check for the uncanonicalized version because we still have users in the db with the
# email non canonicalized. So if it matches the already existing one use that, otherwise check the canonical one
mbox = __get_alias_mailbox_from_email(email_address, alias)
if mbox is not None:
return mbox
canonical_email = canonicalize_email(email_address)
if canonical_email != email_address:
return __get_alias_mailbox_from_email(canonical_email, alias)
return None


def get_mailbox_for_reply_phase(
envelope_mail_from: str, header_mail_from: str, alias
) -> Optional[Mailbox]:
"""return the corresponding mailbox given the mail_from and alias
Usually the mail_from=mailbox.email but it can also be one of the authorized address
"""
mbox = __get_alias_mailbox_from_email_or_canonical_email(envelope_mail_from, alias)
if mbox is not None:
return mbox
if not header_mail_from:
return None
envelope_from_domain = get_email_domain_part(envelope_mail_from)
header_from_domain = get_email_domain_part(header_mail_from)
if envelope_from_domain != header_from_domain:
return None
# For services that use VERP sending (envelope from has encoded data to account for bounces)
# if the domain is the same in the header from as the envelope from we can use the header from
return __get_alias_mailbox_from_email_or_canonical_email(header_mail_from, alias)
36 changes: 6 additions & 30 deletions email_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
from app.handler.unsubscribe_handler import UnsubscribeHandler
from app.log import LOG, set_message_id
from app.mail_sender import sl_sendmail
from app.mailbox_utils import get_mailbox_for_reply_phase
from app.message_utils import message_to_bytes
from app.models import (
Alias,
Expand All @@ -172,7 +173,7 @@
sign_data,
load_public_key_and_check,
)
from app.utils import sanitize_email, canonicalize_email
from app.utils import sanitize_email
from init_app import load_pgp_public_keys
from server import create_light_app

Expand Down Expand Up @@ -1008,7 +1009,6 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
return False, status.E503

user = alias.user
mail_from = envelope.mail_from

if not user.can_send_or_receive():
LOG.i(f"User {user} cannot send emails")
Expand All @@ -1022,13 +1022,15 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
return False, dmarc_delivery_status

# Anti-spoofing
mailbox = get_mailbox_from_mail_from(mail_from, alias)
mailbox = get_mailbox_for_reply_phase(
envelope.mail_from, get_header_unicode(msg[headers.FROM]), alias
)
if not mailbox:
if alias.disable_email_spoofing_check:
# ignore this error, use default alias mailbox
LOG.w(
"ignore unknown sender to reverse-alias %s: %s -> %s",
mail_from,
envelope.mail_from,
alias,
contact,
)
Expand Down Expand Up @@ -1367,32 +1369,6 @@ def replace_original_message_id(alias: Alias, email_log: EmailLog, msg: Message)
msg[headers.REFERENCES] = " ".join(new_message_ids)


def get_mailbox_from_mail_from(mail_from: str, alias) -> Optional[Mailbox]:
"""return the corresponding mailbox given the mail_from and alias
Usually the mail_from=mailbox.email but it can also be one of the authorized address
"""

def __check(email_address: str, alias: Alias) -> Optional[Mailbox]:
for mailbox in alias.mailboxes:
if mailbox.email == email_address:
return mailbox

for authorized_address in mailbox.authorized_addresses:
if authorized_address.email == email_address:
LOG.d(
"Found an authorized address for %s %s %s",
alias,
mailbox,
authorized_address,
)
return mailbox
return None

# We need to first check for the uncanonicalized version because we still have users in the db with the
# email non canonicalized. So if it matches the already existing one use that, otherwise check the canonical one
return __check(mail_from, alias) or __check(canonicalize_email(mail_from), alias)


def handle_unknown_mailbox(
envelope, msg, reply_email: str, user: User, alias: Alias, contact: Contact
):
Expand Down
24 changes: 0 additions & 24 deletions tests/test_email_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from app.mail_sender import mail_sender
from app.models import (
Alias,
AuthorizedAddress,
IgnoredEmail,
EmailLog,
Notification,
Expand All @@ -24,35 +23,12 @@
)
from app.utils import random_string, canonicalize_email
from email_handler import (
get_mailbox_from_mail_from,
should_ignore,
is_automatic_out_of_office,
)
from tests.utils import load_eml_file, create_new_user, random_email


def test_get_mailbox_from_mail_from(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()

mb = get_mailbox_from_mail_from(user.email, alias)
assert mb.email == user.email

mb = get_mailbox_from_mail_from("[email protected]", alias)
assert mb is None

# authorized address
AuthorizedAddress.create(
user_id=user.id,
mailbox_id=user.default_mailbox_id,
email="[email protected]",
commit=True,
)
mb = get_mailbox_from_mail_from("[email protected]", alias)
assert mb.email == user.email


def test_should_ignore(flask_client):
assert should_ignore("mail_from", []) is False

Expand Down
85 changes: 83 additions & 2 deletions tests/test_mailbox_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,18 @@
from app import mailbox_utils, config
from app.db import Session
from app.mail_sender import mail_sender
from app.mailbox_utils import MailboxEmailChangeError
from app.models import Mailbox, MailboxActivation, User, Job, UserAuditLog
from app.mailbox_utils import MailboxEmailChangeError, get_mailbox_for_reply_phase
from app.models import (
Mailbox,
MailboxActivation,
User,
Job,
UserAuditLog,
Alias,
AuthorizedAddress,
)
from app.user_audit_log_utils import UserAuditLogAction
from app.utils import random_string, canonicalize_email
from tests.utils import create_new_user, random_email


Expand Down Expand Up @@ -418,3 +427,75 @@ def test_perform_mailbox_email_change_success():
user_id=user.id, action=UserAuditLogAction.UpdateMailbox.value
).count()
assert audit_log_entries == 1


def test_get_mailbox_from_mail_from(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()

mb = get_mailbox_for_reply_phase(user.email, "", alias)
assert mb.email == user.email

mb = get_mailbox_for_reply_phase("[email protected]", "", alias)
assert mb is None

# authorized address
AuthorizedAddress.create(
user_id=user.id,
mailbox_id=user.default_mailbox_id,
email="[email protected]",
commit=True,
)
mb = get_mailbox_for_reply_phase("[email protected]", "", alias)
assert mb.email == user.email


def test_get_mailbox_from_mail_from_for_canonical_email(flask_client):
prefix = random_string(10)
email = f"{prefix}[email protected]"
canonical_email = canonicalize_email(email)
assert canonical_email != email

user = create_new_user()
mbox = Mailbox.create(
email=canonical_email, user_id=user.id, verified=True, flush=True
)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()

mb = get_mailbox_for_reply_phase(email, "", alias)
assert mb.email == canonical_email

mb = get_mailbox_for_reply_phase(canonical_email, "", alias)
assert mb.email == canonical_email


def test_get_mailbox_from_mail_from_coming_from_header_if_domain_is_aligned(
flask_client,
):
domain = f"{random_string(10)}.com"
envelope_from = f"envelope_verp@{domain}"
mail_from = f"mail_from@{domain}"
user = create_new_user()
mbox = Mailbox.create(email=mail_from, user_id=user.id, verified=True, flush=True)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()

mb = get_mailbox_for_reply_phase(envelope_from, mail_from, alias)
assert mb.email == mail_from


def test_get_mailbox_from_mail_from_coming_from_header_if_domain_is_not_aligned(
flask_client,
):
domain = f"{random_string(10)}.com"
envelope_from = f"envelope_verp@{domain}"
mail_from = f"mail_from@other_{domain}"
user = create_new_user()
mbox = Mailbox.create(email=mail_from, user_id=user.id, verified=True, flush=True)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()

mb = get_mailbox_for_reply_phase(envelope_from, mail_from, alias)
assert mb is None
Loading