-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
mq-sendmail
: parse /etc/aliases to resolve recipient names like "root"
- Loading branch information
1 parent
3cfa48a
commit 2940663
Showing
4 changed files
with
243 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
"""Parse /etc/aliases""" | ||
|
||
import os | ||
import re | ||
from pathlib import Path | ||
from typing import Optional, Sequence, TextIO, Union | ||
|
||
|
||
__all__ = ['lookup_address', 'lookup_adresses'] | ||
|
||
StrPath = Union[str, os.PathLike[str]] | ||
|
||
def lookup_address( | ||
address: str, | ||
_aliases: Optional[dict[str, list[str]]] = None, | ||
) -> Optional[tuple[str]]: | ||
if _is_email_address(address): | ||
return (address,) | ||
aliases = _load_aliases(_aliases) | ||
if not aliases: | ||
return None | ||
return _resolve_alias(address, aliases) | ||
|
||
|
||
def lookup_adresses(recipients: Sequence[str], aliases: Optional[dict]) -> tuple[str]: | ||
email_addresses = [] | ||
for recipient in recipients: | ||
targets = lookup_address(recipient, aliases) | ||
email_addresses = _extend(email_addresses, targets) | ||
return tuple(email_addresses) | ||
|
||
|
||
def _resolve_alias(address: str, aliases) -> Optional[tuple[str]]: | ||
if _is_email_address(address): | ||
return (address,) | ||
|
||
targets = aliases.get(address) | ||
if not targets: | ||
return None | ||
email_addresses = [] | ||
for target in targets: | ||
if _is_email_address(target): | ||
email_addresses = _extend(email_addresses, [target]) | ||
else: | ||
_extra_adresses = _resolve_alias(target, aliases) | ||
email_addresses = _extend(email_addresses, _extra_adresses) | ||
return tuple(email_addresses) | ||
|
||
|
||
def _is_email_address(address: Optional[str]) -> bool: | ||
return address and ('@' in address) | ||
|
||
|
||
def _extend(values, new_values): | ||
_new = list(values) | ||
if new_values: | ||
for value in new_values: | ||
if value not in _new: | ||
_new.append(value) | ||
return _new | ||
|
||
|
||
def _load_aliases(aliases): | ||
if not aliases: | ||
system_aliases = Path('/etc/aliases') | ||
if system_aliases.exists(): | ||
return _parse_aliases(system_aliases) | ||
return aliases | ||
|
||
|
||
def _parse_aliases(src: Union[StrPath, TextIO]) -> dict[str, list[str]]: | ||
if isinstance(src, (os.PathLike, str)): | ||
with open(src) as aliases_fp: | ||
aliases_str = aliases_fp.read() | ||
else: | ||
aliases_str = src.read() | ||
|
||
_aliases = {} | ||
re_colon = re.compile(r'\s*:\s*') | ||
re_items = re.compile(r'\s*,\s*') | ||
for line_str in re.split(r'\n+', aliases_str): | ||
alias_line = line_str.split('#', 1)[0].strip() | ||
if not alias_line: | ||
continue | ||
if ':' not in alias_line: | ||
# faulty alias line | ||
continue | ||
_alias, _target = re_colon.split(alias_line, 1) | ||
_aliases[_alias] = list(re_items.split(_target)) | ||
|
||
return _aliases |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
# -*- coding: utf-8 -*- | ||
# SPDX-License-Identifier: MIT | ||
|
||
import textwrap | ||
from io import StringIO | ||
|
||
import pytest | ||
|
||
from schwarz.mailqueue.aliases_parser import _parse_aliases, lookup_address | ||
|
||
|
||
def test_lookup_adress_simple_match(): | ||
result = lookup_address('foo', _aliases={'foo': ['[email protected]']}) | ||
assert result == ('[email protected]',) | ||
|
||
|
||
def test_lookup_adress_full_email_adress(): | ||
# regular email address -> should not use aliases | ||
result = lookup_address('[email protected]', _aliases={'foo': ['[email protected]']}) | ||
assert result == ('[email protected]',) | ||
|
||
|
||
@pytest.mark.parametrize('aliases', [ | ||
{}, | ||
{'bar': ['[email protected]']} | ||
]) | ||
def test_lookup_adress_no_match(aliases): | ||
result = lookup_address('foo', _aliases=aliases) | ||
assert result is None | ||
|
||
|
||
def test_lookup_adress_recursive_lookup(): | ||
aliases = { | ||
'foo': ['bar'], | ||
'bar': ['baz'], | ||
'baz': ['[email protected]'], | ||
} | ||
result = lookup_address('foo', _aliases=aliases) | ||
assert result == ('[email protected]',) | ||
|
||
|
||
def test_lookup_multiple_addresses(): | ||
aliases = { | ||
'admins': ['foo', 'bar'], | ||
'foo': ['[email protected]'], | ||
'bar': ['[email protected]'], | ||
} | ||
result = lookup_address('admins', _aliases=aliases) | ||
assert result == ('[email protected]', '[email protected]') | ||
|
||
|
||
def test_lookup_address_multiple_aliases_to_same_email_address(): | ||
aliases = { | ||
'admins': ['foo', 'bar'], | ||
'foo': ['[email protected]'], | ||
'bar': ['[email protected]'], | ||
} | ||
result = lookup_address('admins', _aliases=aliases) | ||
assert result == ('[email protected]',) | ||
|
||
|
||
def test_lookup_address_mixed_aliases(): | ||
aliases = { | ||
'admins': ['foo', '[email protected]'], | ||
'foo': ['staff'], | ||
'staff': ['[email protected]'], | ||
} | ||
result = lookup_address('admins', _aliases=aliases) | ||
assert set(result) == set(['[email protected]', '[email protected]']) | ||
|
||
|
||
def test_parse_aliases_empty_file(): | ||
assert _parse_aliases(StringIO('')) == {} | ||
|
||
|
||
def test_parse_aliases_single_alias(): | ||
aliases = 'foo: [email protected]' | ||
assert _parse_aliases(StringIO(aliases)) == {'foo': ['[email protected]']} | ||
|
||
|
||
def test_parse_aliases_comments_and_empty_lines(): | ||
aliases = textwrap.dedent(''' | ||
# comment | ||
foo : \t bar | ||
\tbar: [email protected] | ||
baz: [email protected] # some comment | ||
''') | ||
expected_aliases = { | ||
'foo': ['bar'], | ||
'bar': ['[email protected]'], | ||
'baz': ['[email protected]'], | ||
} | ||
assert _parse_aliases(StringIO(aliases)) == expected_aliases | ||
|
||
|
||
def test_parse_aliases_multiple_targets(): | ||
aliases = 'foo: [email protected], [email protected]' | ||
expected_aliases = { | ||
'foo': ['[email protected]', '[email protected]'], | ||
} | ||
assert _parse_aliases(StringIO(aliases)) == expected_aliases |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,17 +31,18 @@ def ctx(tmp_path): | |
mta_helper.stop_mta() | ||
|
||
|
||
def _example_message() -> str: | ||
return textwrap.dedent(''' | ||
To: [email protected] | ||
def _example_message(to) -> str: | ||
base_msg = textwrap.dedent(''' | ||
Subject: Test message | ||
Mail body | ||
''').strip() | ||
to_line = f'To: {to}\n' if to else '' | ||
return to_line + base_msg | ||
|
||
|
||
def test_mq_sendmail(ctx): | ||
rfc_msg = _example_message() | ||
rfc_msg = _example_message(to='[email protected]') | ||
_mq_sendmail(['[email protected]'], msg=rfc_msg, ctx=ctx) | ||
|
||
smtp_msg = _retrieve_sent_message(ctx.mta) | ||
|
@@ -59,7 +60,7 @@ def _retrieve_sent_message(mta): | |
|
||
|
||
def test_mq_sendmail_can_add_headers(ctx): | ||
sent_msg = _example_message() | ||
sent_msg = _example_message(to='[email protected]') | ||
cli_params = [ | ||
'--set-from-header', | ||
'--set-date-header', | ||
|
@@ -83,6 +84,32 @@ def _is_email_address(s): | |
pattern = r'^\w+@[\w.\-]+$' | ||
return re.match(pattern, s) is not None | ||
|
||
|
||
def test_mq_sendmail_with_aliases(ctx, tmp_path): | ||
aliases_path = _create_alias_file({'foo': '[email protected]'}, tmp_path) | ||
|
||
rfc_msg = _example_message(to='[email protected]') | ||
_mq_sendmail([f'--aliases={aliases_path}', 'foo'], msg=rfc_msg, ctx=ctx) | ||
|
||
smtp_msg = _retrieve_sent_message(ctx.mta) | ||
expected_recipient = '[email protected]' | ||
assert tuple(smtp_msg.smtp_to) == (expected_recipient,) | ||
assert smtp_msg.msg_data == rfc_msg | ||
msg = email.message_from_string(smtp_msg.msg_data) | ||
# "From" header should not be changed by mq_sendmail | ||
assert msg['To'] == '[email protected]' | ||
|
||
|
||
def _create_alias_file(aliases, dir_path) -> str: | ||
aliases_contents = '' | ||
for alias, target in aliases.items(): | ||
aliases_contents += f'{alias}: {target}\n' | ||
|
||
aliases_path = dir_path / 'aliases' | ||
aliases_path.write_text(aliases_contents) | ||
return str(aliases_path) | ||
|
||
|
||
def _mq_sendmail(cli_params, msg, *, ctx): | ||
cfg_dir = str(ctx.tmp_path) | ||
config_path = create_ini(ctx.hostname, ctx.listen_port, dir_path=cfg_dir) | ||
|
@@ -95,8 +122,12 @@ def _mq_sendmail(cli_params, msg, *, ctx): | |
else: | ||
proc = subprocess.run(cmd, input=msg_bytes, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||
|
||
if proc.returncode != 0: | ||
if proc.stdout: | ||
sys.stderr.buffer.write(proc.stdout) | ||
if proc.stderr: | ||
sys.stderr.buffer.write(proc.stderr) | ||
assert proc.returncode == 0 | ||
if proc.stderr: | ||
# sys.stderr.buffer.write(proc.stderr) | ||
raise AssertionError(proc.stderr) | ||
assert not proc.stdout | ||
assert proc.returncode == 0 |