diff --git a/schwarz/mailqueue/aliases_parser.py b/schwarz/mailqueue/aliases_parser.py new file mode 100644 index 0000000..96dc285 --- /dev/null +++ b/schwarz/mailqueue/aliases_parser.py @@ -0,0 +1,91 @@ +"""Parse /etc/aliases""" + +import os +import re +from pathlib import Path +from typing import Optional, Sequence, TextIO, Union + + +__all__ = ['lookup_address', 'lookup_adresses'] + +StrPath = Union[str, os.PathLike[str]] + +def lookup_address( + address: str, + _aliases: Optional[dict[str, list[str]]] = None, + ) -> Optional[tuple[str]]: + if _is_email_address(address): + return (address,) + aliases = _load_aliases(_aliases) + if not aliases: + return None + return _resolve_alias(address, aliases) + + +def lookup_adresses(recipients: Sequence[str], aliases: Optional[dict]) -> tuple[str]: + email_addresses = [] + for recipient in recipients: + targets = lookup_address(recipient, aliases) + email_addresses = _extend(email_addresses, targets) + return tuple(email_addresses) + + +def _resolve_alias(address: str, aliases) -> Optional[tuple[str]]: + if _is_email_address(address): + return (address,) + + targets = aliases.get(address) + if not targets: + return None + email_addresses = [] + for target in targets: + if _is_email_address(target): + email_addresses = _extend(email_addresses, [target]) + else: + _extra_adresses = _resolve_alias(target, aliases) + email_addresses = _extend(email_addresses, _extra_adresses) + return tuple(email_addresses) + + +def _is_email_address(address: Optional[str]) -> bool: + return address and ('@' in address) + + +def _extend(values, new_values): + _new = list(values) + if new_values: + for value in new_values: + if value not in _new: + _new.append(value) + return _new + + +def _load_aliases(aliases): + if not aliases: + system_aliases = Path('/etc/aliases') + if system_aliases.exists(): + return _parse_aliases(system_aliases) + return aliases + + +def _parse_aliases(src: Union[StrPath, TextIO]) -> dict[str, list[str]]: + if isinstance(src, (os.PathLike, str)): + with open(src) as aliases_fp: + aliases_str = aliases_fp.read() + else: + aliases_str = src.read() + + _aliases = {} + re_colon = re.compile(r'\s*:\s*') + re_items = re.compile(r'\s*,\s*') + for line_str in re.split(r'\n+', aliases_str): + alias_line = line_str.split('#', 1)[0].strip() + if not alias_line: + continue + if ':' not in alias_line: + # faulty alias line + continue + _alias, _target = re_colon.split(alias_line, 1) + _aliases[_alias] = list(re_items.split(_target)) + + return _aliases diff --git a/schwarz/mailqueue/cli/mq_sendmail.py b/schwarz/mailqueue/cli/mq_sendmail.py index 27dc96e..f7eb1a4 100644 --- a/schwarz/mailqueue/cli/mq_sendmail.py +++ b/schwarz/mailqueue/cli/mq_sendmail.py @@ -5,11 +5,12 @@ mq-sendmail [options] ... Options: - -C, --config= Path to the config file (default: /etc/mailqueue-runner.ini) - --set-date-header Add a "Date" header to the message (if not present) - --set-from-header Set the "From:" header in the outgoing mail based on the unix user - --set-msgid-header Add a "Message-ID" header to the message (if not present) - --verbose, -v more verbose program output + --aliases= Path to aliases [default: /etc/aliases] + -C, --config= Path to the config file + --set-date-header Add a "Date" header to the message (if not present) + --set-from-header Add "From:" header to the message (if not present) + --set-msgid-header Add a "Message-ID" header to the message (if not present) + --verbose, -v more verbose program output """ import os @@ -23,6 +24,7 @@ from docopt import docopt +from schwarz.mailqueue.aliases_parser import _parse_aliases, lookup_adresses from schwarz.mailqueue.app_helpers import guess_config_path, init_app, init_smtp_mailer from schwarz.mailqueue.message_handler import InMemoryMsg, MessageHandler @@ -32,13 +34,17 @@ def mq_sendmail_main(argv=sys.argv, return_rc_code=False): arguments = docopt(__doc__, argv=argv[1:]) config_path = guess_config_path(arguments['--config']) - recipients = arguments[''] + recipient_params = arguments[''] verbose = arguments['--verbose'] + aliases_fn = arguments['--aliases'] set_date_header = arguments['--set-date-header'] set_from_header = arguments['--set-from-header'] set_msgid_header = arguments['--set-msgid-header'] + aliases = _parse_aliases(aliases_fn) if aliases_fn else None + recipients = lookup_adresses(recipient_params, aliases) + msg_bytes = sys.stdin.buffer.read() try: username = os.getlogin() diff --git a/tests/alias_parser_test.py b/tests/alias_parser_test.py new file mode 100644 index 0000000..9316185 --- /dev/null +++ b/tests/alias_parser_test.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: MIT + +import textwrap +from io import StringIO + +import pytest + +from schwarz.mailqueue.aliases_parser import _parse_aliases, lookup_address + + +def test_lookup_adress_simple_match(): + result = lookup_address('foo', _aliases={'foo': ['foo@site.example']}) + assert result == ('foo@site.example',) + + +def test_lookup_adress_full_email_adress(): + # regular email address -> should not use aliases + result = lookup_address('foo@site.example', _aliases={'foo': ['bar@site.example']}) + assert result == ('foo@site.example',) + + +@pytest.mark.parametrize('aliases', [ + {}, + {'bar': ['bar@site.example']} +]) +def test_lookup_adress_no_match(aliases): + result = lookup_address('foo', _aliases=aliases) + assert result is None + + +def test_lookup_adress_recursive_lookup(): + aliases = { + 'foo': ['bar'], + 'bar': ['baz'], + 'baz': ['info@site.example'], + } + result = lookup_address('foo', _aliases=aliases) + assert result == ('info@site.example',) + + +def test_lookup_multiple_addresses(): + aliases = { + 'admins': ['foo', 'bar'], + 'foo': ['foo@site.example'], + 'bar': ['bar@site.example'], + } + result = lookup_address('admins', _aliases=aliases) + assert result == ('foo@site.example', 'bar@site.example') + + +def test_lookup_address_multiple_aliases_to_same_email_address(): + aliases = { + 'admins': ['foo', 'bar'], + 'foo': ['staff@site.example'], + 'bar': ['staff@site.example'], + } + result = lookup_address('admins', _aliases=aliases) + assert result == ('staff@site.example',) + + +def test_lookup_address_mixed_aliases(): + aliases = { + 'admins': ['foo', 'monitor@site.example'], + 'foo': ['staff'], + 'staff': ['staff@site.example'], + } + result = lookup_address('admins', _aliases=aliases) + assert set(result) == set(['monitor@site.example', 'staff@site.example']) + + +def test_parse_aliases_empty_file(): + assert _parse_aliases(StringIO('')) == {} + + +def test_parse_aliases_single_alias(): + aliases = 'foo: foo@site.example' + assert _parse_aliases(StringIO(aliases)) == {'foo': ['foo@site.example']} + + +def test_parse_aliases_comments_and_empty_lines(): + aliases = textwrap.dedent(''' + # comment + foo : \t bar + + \tbar: staff@site.example + baz: root@site.example # some comment + ''') + expected_aliases = { + 'foo': ['bar'], + 'bar': ['staff@site.example'], + 'baz': ['root@site.example'], + } + assert _parse_aliases(StringIO(aliases)) == expected_aliases + + +def test_parse_aliases_multiple_targets(): + aliases = 'foo: foo@site.example, monitor@site.example' + expected_aliases = { + 'foo': ['foo@site.example', 'monitor@site.example'], + } + assert _parse_aliases(StringIO(aliases)) == expected_aliases diff --git a/tests/mq_sendmail_test.py b/tests/mq_sendmail_test.py index c2ebdb1..724ad3b 100644 --- a/tests/mq_sendmail_test.py +++ b/tests/mq_sendmail_test.py @@ -31,17 +31,18 @@ def ctx(tmp_path): mta_helper.stop_mta() -def _example_message() -> str: - return textwrap.dedent(''' - To: baz@site.example +def _example_message(to) -> str: + base_msg = textwrap.dedent(''' Subject: Test message Mail body ''').strip() + to_line = f'To: {to}\n' if to else '' + return to_line + base_msg def test_mq_sendmail(ctx): - rfc_msg = _example_message() + rfc_msg = _example_message(to='baz@site.example') _mq_sendmail(['foo@site.example'], msg=rfc_msg, ctx=ctx) smtp_msg = _retrieve_sent_message(ctx.mta) @@ -59,7 +60,7 @@ def _retrieve_sent_message(mta): def test_mq_sendmail_can_add_headers(ctx): - sent_msg = _example_message() + sent_msg = _example_message(to='baz@site.example') cli_params = [ '--set-from-header', '--set-date-header', @@ -83,6 +84,32 @@ def _is_email_address(s): pattern = r'^\w+@[\w.\-]+$' return re.match(pattern, s) is not None + +def test_mq_sendmail_with_aliases(ctx, tmp_path): + aliases_path = _create_alias_file({'foo': 'staff@site.example'}, tmp_path) + + rfc_msg = _example_message(to='baz@site.example') + _mq_sendmail([f'--aliases={aliases_path}', 'foo'], msg=rfc_msg, ctx=ctx) + + smtp_msg = _retrieve_sent_message(ctx.mta) + expected_recipient = 'staff@site.example' + assert tuple(smtp_msg.smtp_to) == (expected_recipient,) + assert smtp_msg.msg_data == rfc_msg + msg = email.message_from_string(smtp_msg.msg_data) + # "From" header should not be changed by mq_sendmail + assert msg['To'] == 'baz@site.example' + + +def _create_alias_file(aliases, dir_path) -> str: + aliases_contents = '' + for alias, target in aliases.items(): + aliases_contents += f'{alias}: {target}\n' + + aliases_path = dir_path / 'aliases' + aliases_path.write_text(aliases_contents) + return str(aliases_path) + + def _mq_sendmail(cli_params, msg, *, ctx): cfg_dir = str(ctx.tmp_path) config_path = create_ini(ctx.hostname, ctx.listen_port, dir_path=cfg_dir) @@ -95,8 +122,12 @@ def _mq_sendmail(cli_params, msg, *, ctx): else: proc = subprocess.run(cmd, input=msg_bytes, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if proc.returncode != 0: + if proc.stdout: + sys.stderr.buffer.write(proc.stdout) + if proc.stderr: + sys.stderr.buffer.write(proc.stderr) + assert proc.returncode == 0 if proc.stderr: - # sys.stderr.buffer.write(proc.stderr) raise AssertionError(proc.stderr) assert not proc.stdout - assert proc.returncode == 0