diff --git a/pgbedrock/attributes.py b/pgbedrock/attributes.py index a53181b..39ad178 100644 --- a/pgbedrock/attributes.py +++ b/pgbedrock/attributes.py @@ -1,6 +1,8 @@ import copy import datetime as dt import hashlib +import hmac +import base64 import logging import click @@ -87,6 +89,14 @@ def create_md5_hash(rolename, value): salted_input = (value + rolename).encode('utf-8') return 'md5' + hashlib.md5(salted_input).hexdigest() +def create_scram_hash(password, salt, iterations): + digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations) + client_key = hmac.digest(digest, "Client Key".encode(), hashlib.sha256) + stored_key = base64.b64encode(hashlib.sha256(client_key).digest()).decode() + server_key = base64.b64encode(hmac.digest(digest, "Server Key".encode(), hashlib.sha256)).decode() + + return "SCRAM-SHA-256${}:{}${}:{}".format(iterations, base64.b64encode(salt).decode(), stored_key, server_key) + def is_valid_forever(val): if val is None or val == 'infinity': @@ -194,13 +204,47 @@ def get_attribute_value(self, attribute): return value def is_same_password(self, value): - """ Convert the input value into a postgres rolname-salted md5 hash and compare - it with the currently stored hash """ - if value is None: - return self.current_attributes.get('rolpassword') is None + """ Compare stored password hash to the new password. + + Returns ``True`` if both ``value`` and the stored password are ``None`` (empty) + **or** ``value`` is a plain text password its hash matches the stored password + hash **or** ``value`` is a hashed password and matches the stored password hash + type and value. ``False`` in any other case. """ + current_value = self.current_attributes.get('rolpassword') + + if value is None or current_value is None: + return value == current_value + + if hmac.compare_digest(current_value, value): + return True + + if current_value.startswith("SCRAM-SHA-256$"): + if value.startswith("md5"): + return False + + if value.startswith("SCRAM-SHA-256$"): + return hmac.compare_digest(current_value, value) + + hash_parts = current_value.split("$") + + iterations, salt = hash_parts[1].split(":") + iterations = int(iterations) + salt = base64.b64decode(salt) + + return hmac.compare_digest(current_value, create_scram_hash(value, salt, iterations)) + + if current_value.startswith("md5"): + if value.startswith("SCRAM-SHA-256$"): + return False + + if value.startswith("md5"): + return hmac.compare_digest(current_value, value) + + return hmac.compare_digest(current_value, create_md5_hash(self.rolename, value)) - md5_hash = create_md5_hash(self.rolename, value) - return self.current_attributes.get('rolpassword') == md5_hash + # There’s currently only two hash algorithm supported by Postgres (md5 + # and SCRAM-SHA-256) so we should never reach this. + return False def role_exists(self): # If current_attributes is empty then the rolname wasn't in pg_authid, i.e. it doesn't exist diff --git a/requirements-dev.txt b/requirements-dev.txt index 16b680f..ee3c7ef 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -2,3 +2,4 @@ pytest==3.1.3 pytest-cov==2.5.1 -r requirements-docs.txt wheel==0.33.6 +psycopg2==2.7.7 diff --git a/requirements.txt b/requirements.txt index 4fcc6a0..f808bfe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ Cerberus==1.1 click==6.7 Jinja2==2.10.1 -MarkupSafe==1.0 +MarkupSafe==1.1.1 psycopg2==2.7.3 PyYAML==5.2 diff --git a/tests/test_attributes.py b/tests/test_attributes.py index 7688786..921b46d 100644 --- a/tests/test_attributes.py +++ b/tests/test_attributes.py @@ -382,14 +382,38 @@ def test_set_attribute_value_valid_until(roleconf): rolpassword=attr.create_md5_hash(ROLE1, 'supersecret'), )) @pytest.mark.parametrize('desired_value, expected', [ - ('supersecret', True), - ('incorrect_password', False)]) -def test_is_same_password(roleconf, desired_value, expected): + ('supersecret', True), + ('md5c85aa4317b187e73a31e8ab775a10833', True), + ('incorrect_password', False), + ('SCRAM-SHA-256$4096:c3VwZXJzYWx0$E6lZT4K2olotsu19xYcF825iMPGdJQDYaklVS2mR6js=:Pe9DLNf8idnP59Q5l8Xmz3H+6LrTuiq//bcujQPGsRM=', False), + ] +) +def test_is_same_password_md5(roleconf, desired_value, expected): assert roleconf.is_same_password(desired_value) == expected -def test_is_same_password_if_empty(roleconf): - assert roleconf.is_same_password(None) is True +@nondefault_attributes(dict( + rolpassword=attr.create_scram_hash('supersecret', 'supersalt'.encode(), 4096), +)) +@pytest.mark.parametrize('desired_value, expected', [ + ('supersecret', True), + ('SCRAM-SHA-256$4096:c3VwZXJzYWx0$E6lZT4K2olotsu19xYcF825iMPGdJQDYaklVS2mR6js=:Pe9DLNf8idnP59Q5l8Xmz3H+6LrTuiq//bcujQPGsRM=', True), + ('incorrect_password', False), + ('md5c85aa4317b187e73a31e8ab775a10833', False), + ] +) +def test_is_same_password_scram(roleconf, desired_value, expected): + assert roleconf.is_same_password(desired_value) == expected + +@pytest.mark.parametrize('desired_value, expected', [ + (None, True), + ('incorrect_password', False), + ('md5c85aa4317b187e73a31e8ab775a10833', False), + ('SCRAM-SHA-256$4096:c3VwZXJzYWx0$E6lZT4K2olotsu19xYcF825iMPGdJQDYaklVS2mR6js=:Pe9DLNf8idnP59Q5l8Xmz3H+6LrTuiq//bcujQPGsRM=', False), + ] +) +def test_is_same_password_if_empty(roleconf, desired_value, expected): + assert roleconf.is_same_password(desired_value) == expected @nondefault_attributes(dict(