From 25fd64552a5757a142b0f07a4774cab970333795 Mon Sep 17 00:00:00 2001 From: "patched.codes[bot]" <298395+patched.codes[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 07:55:09 +0000 Subject: [PATCH] Patched sqli/dao/user.py --- sqli/dao/user.py | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/sqli/dao/user.py b/sqli/dao/user.py index 96fefd04..8837fd75 100644 --- a/sqli/dao/user.py +++ b/sqli/dao/user.py @@ -1,4 +1,5 @@ from typing import NamedTuple, Optional +import os from aiopg import Connection from cryptography.hazmat.primitives.kdf.argon2 import Argon2id @@ -11,6 +12,7 @@ class User(NamedTuple): last_name: str username: str pwd_hash: str + pwd_salt: bytes # Added field for unique salt is_admin: bool @classmethod @@ -22,7 +24,7 @@ async def get(conn: Connection, id_: int): async with conn.cursor() as cur: await cur.execute( 'SELECT id, first_name, middle_name, last_name, ' - 'username, pwd_hash, is_admin FROM users WHERE id = %s', + 'username, pwd_hash, pwd_salt, is_admin FROM users WHERE id = %s', (id_,), ) return User.from_raw(await cur.fetchone()) @@ -32,12 +34,41 @@ async def get_by_username(conn: Connection, username: str): async with conn.cursor() as cur: await cur.execute( 'SELECT id, first_name, middle_name, last_name, ' - 'username, pwd_hash, is_admin FROM users WHERE username = %s', + 'username, pwd_hash, pwd_salt, is_admin FROM users WHERE username = %s', (username,), ) return User.from_raw(await cur.fetchone()) - def check_password(self, password: str): - kdf = Argon2id(salt=b'secure_salt') + @staticmethod + async def create(conn: Connection, first_name: str, middle_name: Optional[str], + last_name: str, username: str, password: str, is_admin: bool = False): + # Generate a unique salt for this user + salt = os.urandom(16) + # Hash the password with the unique salt + kdf = Argon2id(salt=salt) + pwd_hashed = kdf.derive(password.encode('utf-8')) + + async with conn.cursor() as cur: + await cur.execute( + 'INSERT INTO users (first_name, middle_name, last_name, ' + 'username, pwd_hash, pwd_salt, is_admin) ' + 'VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id', + (first_name, middle_name, last_name, username, + pwd_hashed, salt, is_admin), + ) + user_id = (await cur.fetchone())[0] + return User(user_id, first_name, middle_name, last_name, + username, pwd_hashed, salt, is_admin) + + def check_password(self, password: str) -> bool: + """Check if the provided password matches the stored hash. + + Args: + password: The password to verify + + Returns: + bool: True if password matches, False otherwise + """ + kdf = Argon2id(salt=self.pwd_salt) pwd_hashed = kdf.derive(password.encode('utf-8')) return self.pwd_hash == pwd_hashed