diff --git a/tm_admin/users/users.py b/tm_admin/users/users.py index 69c4d549..33281683 100755 --- a/tm_admin/users/users.py +++ b/tm_admin/users/users.py @@ -30,20 +30,24 @@ from tm_admin.types_tm import Userrole, Mappinglevel, Teammemberfunctions import concurrent.futures from cpuinfo import get_cpu_info - +from atpbar import atpbar from tm_admin.dbsupport import DBSupport from tm_admin.users.users_class import UsersTable from osm_rawdata.postgres import uriParser, PostgresClient from tm_admin.types_tm import Userrole +from alive_progress import alive_bar +from tqdm import tqdm +from codetiming import Timer +import threading # Instantiate logger log = logging.getLogger(__name__) # The number of threads is based on the CPU cores info = get_cpu_info() -cores = info["count"] +cores = info["count"] * 2 -def updateThread( +def licensesThread( data: list, db: PostgresClient, ): @@ -53,8 +57,11 @@ def updateThread( data (list): The list of records to import db (PostgresClient): A database connection """ + array = "licenses" + column = "license" + for record in data: - sql = f" UPDATE users SET licenses = ARRAY[{record[0]['license']}] WHERE id={record[0]['user']}" + sql = f" UPDATE users SET {array} = ARRAY[{record[0]['{column}']}] WHERE id={record[0]['user']}" # print(sql) try: result = db.dbcursor.execute(f"{sql};") @@ -63,6 +70,33 @@ def updateThread( return True +def interestsThread( + interests: list, + db: PostgresClient, +): + """Thread to handle importing + + Args: + data (list): The list of records to import + db (PostgresClient): A database connection + """ + data = dict() + for record in interests: + entry = record[0] # there's only one item in the input data + if entry['user_id'] not in data: + data[entry['user_id']] = list() + data[entry['user_id']].append(entry['interest_id']) + + for uid, value in data.items(): + sql = f" UPDATE users SET interests = ARRAY{str(value)} WHERE id={uid}" + print(sql) + try: + result = db.dbcursor.execute(f"{sql};") + except: + return False + + return True + class UsersDB(DBSupport): def __init__(self, dburi: str = "localhost/tm_admin", @@ -83,10 +117,15 @@ def __init__(self, def mergeInterests(self): table = 'user_interests' - # FIXME: this shouldn't be hardcoded + log.info(f"Merging interests table...") + # One database connection per thread + tmpg = list() + for i in range(0, cores + 1): + # FIXME: this shouldn't be hardcoded + tmpg.append(PostgresClient('localhost/tm_admin')) pg = PostgresClient('localhost/tm4') sql = f"SELECT row_to_json({table}) as row FROM {table}" - print(sql) + # print(sql) try: result = pg.dbcursor.execute(sql) except: @@ -95,26 +134,37 @@ def mergeInterests(self): result = pg.dbcursor.fetchall() - data = dict() - for record in result: - entry = record[0] # there's only one item in the input data - if entry['user_id'] not in data: - data[entry['user_id']] = list() - data[entry['user_id']].append(entry['interest_id']) + entries = len(result) + log.debug(f"There are {entries} entries in {table}") + chunk = round(entries / cores) - for uid, value in data.items(): - sql = f" UPDATE users SET interests = ARRAY{str(value)} WHERE id={uid}" - print(sql) - try: - result = self.pg.dbcursor.execute(f"{sql};") - except: - return False + # if True: + # interestsThread(result, tmpg[0]) + + index = 0 + with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor: + # futures = list() + block = 0 + while block <= entries: + log.debug(f"Dispatching Block %d:%d" % (block, block + chunk)) + executor.submit(interestsThread, result[block : block + chunk], tmpg[index]) + # futures.append(result) + block += chunk + index += 1 + # for future in tqdm(futures, desc=f"Dispatching Block {block}:{block + chunk}", total=chunk): + # future.result() + executor.shutdown() + timer.stop return True def mergeLicenses(self): """Merge data from the TM user_licenses table into TM Admin.""" table = 'user_licenses' + log.info(f"Merging licenses table...") + timer = Timer(text="merging liceneses table took {seconds:.0f}s") + timer.start() + sql = f"SELECT row_to_json({table}) as row FROM {table}" # One database connection per thread tmpg = list() @@ -135,25 +185,33 @@ def mergeLicenses(self): chunk = round(entries / cores) # if True: - # importThread(data, tmpg[0]) + # licensesThread(data, tmpg[0]) index = 0 with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor: + # futures = list() block = 0 while block <= entries: log.debug("Dispatching Block %d:%d" % (block, block + chunk)) - result = executor.submit(updateThread, data[block : block + chunk], tmpg[index]) + result = executor.submit(licensesThread, data[block : block + chunk], tmpg[index]) + # futures.append(result) block += chunk index += 1 + # for future in tqdm(futures, desc=f"Dispatching Block {block}:{block + chunk}", total=chunk): + # future.result() executor.shutdown() + timer.stop return True def mergeTeam(self): table = 'team_members' # FIXME: this shouldn't be hardcoded! + log.info(f"Merging team members table...") + timer = Timer(text="merging team members table took {seconds:.0f}s") + timer.start() pg = PostgresClient('localhost/tm4') sql = f"SELECT row_to_json({table}) as row FROM {table}" - print(sql) + #print(sql) try: result = pg.dbcursor.execute(sql) except: @@ -165,7 +223,7 @@ def mergeTeam(self): func = record[0]['function'] tmfunc = Teammemberfunctions(func) sql = f"UPDATE {self.table} SET team_members.team={record[0]['team_id']}, team_members.active={record[0]['active']}, team_members.function='{tmfunc.name}' WHERE id={record[0]['user_id']}" - print(f"{sql};") + #print(f"{sql};") try: # FIXME: this fails to execute, but if I write the out to a file, # it works just fine. @@ -175,9 +233,15 @@ def mergeTeam(self): log.error(f"Couldn't execute query! '{sql}'") return False + timer.stop() + return True + def mergeFavorites(self): table = 'project_favorites' + log.info(f"Merging favorites table...") # FIXME: this shouldn't be hardcoded! + timer = Timer(text="merging favorites table took {seconds:.0f}s") + timer.start() pg = PostgresClient('localhost/tm4') sql = f"SELECT row_to_json({table}) as row FROM {table}" # print(sql) @@ -197,12 +261,12 @@ def mergeFavorites(self): for uid, value in data.items(): sql = f" UPDATE users SET favorite_projects = ARRAY{str(value)} WHERE id={uid}" - print(sql) + # print(sql) try: result = self.pg.dbcursor.execute(f"{sql};") except: return False - + timer.stop() return True # These are just convience wrappers to support the REST API. @@ -295,8 +359,8 @@ def main(): if user.mergeInterests(): log.info("UserDB.mergeInterests worked!") - #if user.mergeLicenses(): - # log.info("UserDB.mergeLicenses worked!") + if user.mergeLicenses(): + log.info("UserDB.mergeLicenses worked!") if user.mergeFavorites(): log.info("UserDB.mergeFavorites worked!")