Skip to content

Commit

Permalink
Support passwords stored as SCRAM-SHA-256
Browse files Browse the repository at this point in the history
This allows passing the password as plain text, a md5 hash or a
SCRAM-SHA-256 hash. The password will be compared to whatever hash is
stored in Postgres and updated accordingly.

The SCRAM-SHA-256 hash algorithm for password storage was introduced in
Postgres version 10 and made the default in Postgres version 14.
  • Loading branch information
johanfleury committed Sep 2, 2022
1 parent 54a2c4b commit 2c66ca1
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 11 deletions.
56 changes: 50 additions & 6 deletions pgbedrock/attributes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import copy
import datetime as dt
import hashlib
import hmac
import base64
import logging

import click
Expand Down Expand Up @@ -87,6 +89,14 @@ def create_md5_hash(rolename, value):
salted_input = (value + rolename).encode('utf-8')
return 'md5' + hashlib.md5(salted_input).hexdigest()

def create_scram_hash(password, salt, iterations):
digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
client_key = hmac.digest(digest, "Client Key".encode(), hashlib.sha256)
stored_key = base64.b64encode(hashlib.sha256(client_key).digest()).decode()
server_key = base64.b64encode(hmac.digest(digest, "Server Key".encode(), hashlib.sha256)).decode()

return "SCRAM-SHA-256${}:{}${}:{}".format(iterations, base64.b64encode(salt).decode(), stored_key, server_key)


def is_valid_forever(val):
if val is None or val == 'infinity':
Expand Down Expand Up @@ -194,13 +204,47 @@ def get_attribute_value(self, attribute):
return value

def is_same_password(self, value):
""" Convert the input value into a postgres rolname-salted md5 hash and compare
it with the currently stored hash """
if value is None:
return self.current_attributes.get('rolpassword') is None
""" Compare stored password hash to the new password.
Returns ``True`` if both ``value`` and the stored password are ``None`` (empty)
**or** ``value`` is a plain text password its hash matches the stored password
hash **or** ``value`` is a hashed password and matches the stored password hash
type and value. ``False`` in any other case. """
current_value = self.current_attributes.get('rolpassword')

if current_value == value:
return True

if value is None or current_value is None:
return False

if current_value.startswith("SCRAM-SHA-256$"):
if value.startswith("md5"):
return False

if value.startswith("SCRAM-SHA-256$"):
return current_value == value

hash_parts = current_value.split("$")

iterations, salt = hash_parts[1].split(":")
iterations = int(iterations)
salt = base64.b64decode(salt)

return current_value == create_scram_hash(value, salt, iterations)

if current_value.startswith("md5"):
if value.startswith("SCRAM-SHA-256$"):
return False

if value.startswith("md5"):
return current_value == value

return hmac.compare_digest(current_value, create_md5_hash(self.rolename, value))

md5_hash = create_md5_hash(self.rolename, value)
return self.current_attributes.get('rolpassword') == md5_hash
# There’s currently only two hash algorithm supported by Postgres (md5
# and SCRAM-SHA-256) so we should never reach this.
return False

def role_exists(self):
# If current_attributes is empty then the rolname wasn't in pg_authid, i.e. it doesn't exist
Expand Down
34 changes: 29 additions & 5 deletions tests/test_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,14 +382,38 @@ def test_set_attribute_value_valid_until(roleconf):
rolpassword=attr.create_md5_hash(ROLE1, 'supersecret'),
))
@pytest.mark.parametrize('desired_value, expected', [
('supersecret', True),
('incorrect_password', False)])
def test_is_same_password(roleconf, desired_value, expected):
('supersecret', True),
('md5c85aa4317b187e73a31e8ab775a10833', True),
('incorrect_password', False),
('SCRAM-SHA-256$4096:c3VwZXJzYWx0$E6lZT4K2olotsu19xYcF825iMPGdJQDYaklVS2mR6js=:Pe9DLNf8idnP59Q5l8Xmz3H+6LrTuiq//bcujQPGsRM=', False),
]
)
def test_is_same_password_md5(roleconf, desired_value, expected):
assert roleconf.is_same_password(desired_value) == expected


def test_is_same_password_if_empty(roleconf):
assert roleconf.is_same_password(None) is True
@nondefault_attributes(dict(
rolpassword=attr.create_scram_hash('supersecret', 'supersalt'.encode(), 4096),
))
@pytest.mark.parametrize('desired_value, expected', [
('supersecret', True),
('SCRAM-SHA-256$4096:c3VwZXJzYWx0$E6lZT4K2olotsu19xYcF825iMPGdJQDYaklVS2mR6js=:Pe9DLNf8idnP59Q5l8Xmz3H+6LrTuiq//bcujQPGsRM=', True),
('incorrect_password', False),
('md5c85aa4317b187e73a31e8ab775a10833', False),
]
)
def test_is_same_password_scram(roleconf, desired_value, expected):
assert roleconf.is_same_password(desired_value) == expected

@pytest.mark.parametrize('desired_value, expected', [
(None, True),
('incorrect_password', False),
('md5c85aa4317b187e73a31e8ab775a10833', False),
('SCRAM-SHA-256$4096:c3VwZXJzYWx0$E6lZT4K2olotsu19xYcF825iMPGdJQDYaklVS2mR6js=:Pe9DLNf8idnP59Q5l8Xmz3H+6LrTuiq//bcujQPGsRM=', False),
]
)
def test_is_same_password_if_empty(roleconf, desired_value, expected):
assert roleconf.is_same_password(desired_value) == expected


@nondefault_attributes(dict(
Expand Down

0 comments on commit 2c66ca1

Please sign in to comment.