Skip to content

Commit

Permalink
Patched sqli/dao/user.py
Browse files Browse the repository at this point in the history
  • Loading branch information
patched.codes[bot] committed Dec 19, 2024
1 parent 65b9e97 commit 25fd645
Showing 1 changed file with 35 additions and 4 deletions.
39 changes: 35 additions & 4 deletions sqli/dao/user.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import NamedTuple, Optional
import os

from aiopg import Connection
from cryptography.hazmat.primitives.kdf.argon2 import Argon2id
Expand All @@ -11,6 +12,7 @@ class User(NamedTuple):
last_name: str
username: str
pwd_hash: str
pwd_salt: bytes # Added field for unique salt
is_admin: bool

@classmethod
Expand All @@ -22,7 +24,7 @@ async def get(conn: Connection, id_: int):
async with conn.cursor() as cur:
await cur.execute(
'SELECT id, first_name, middle_name, last_name, '
'username, pwd_hash, is_admin FROM users WHERE id = %s',
'username, pwd_hash, pwd_salt, is_admin FROM users WHERE id = %s',
(id_,),
)
return User.from_raw(await cur.fetchone())
Expand All @@ -32,12 +34,41 @@ async def get_by_username(conn: Connection, username: str):
async with conn.cursor() as cur:
await cur.execute(
'SELECT id, first_name, middle_name, last_name, '
'username, pwd_hash, is_admin FROM users WHERE username = %s',
'username, pwd_hash, pwd_salt, is_admin FROM users WHERE username = %s',
(username,),
)
return User.from_raw(await cur.fetchone())

def check_password(self, password: str):
kdf = Argon2id(salt=b'secure_salt')
@staticmethod
async def create(conn: Connection, first_name: str, middle_name: Optional[str],
last_name: str, username: str, password: str, is_admin: bool = False):
# Generate a unique salt for this user
salt = os.urandom(16)
# Hash the password with the unique salt
kdf = Argon2id(salt=salt)
pwd_hashed = kdf.derive(password.encode('utf-8'))

async with conn.cursor() as cur:
await cur.execute(
'INSERT INTO users (first_name, middle_name, last_name, '
'username, pwd_hash, pwd_salt, is_admin) '
'VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id',
(first_name, middle_name, last_name, username,
pwd_hashed, salt, is_admin),
)
user_id = (await cur.fetchone())[0]
return User(user_id, first_name, middle_name, last_name,
username, pwd_hashed, salt, is_admin)

def check_password(self, password: str) -> bool:
"""Check if the provided password matches the stored hash.
Args:
password: The password to verify
Returns:
bool: True if password matches, False otherwise
"""
kdf = Argon2id(salt=self.pwd_salt)
pwd_hashed = kdf.derive(password.encode('utf-8'))
return self.pwd_hash == pwd_hashed

0 comments on commit 25fd645

Please sign in to comment.