diff --git a/autosubmit_api/autosubmit_legacy/job/job_list.py b/autosubmit_api/autosubmit_legacy/job/job_list.py index 828f2f78..c66e0ba6 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_list.py +++ b/autosubmit_api/autosubmit_legacy/job/job_list.py @@ -113,8 +113,7 @@ def get_tree_structured_from_previous_run(expid, BasicConfig, run_id, chunk_unit chunk_size = experiment_run.chunk_size else: raise Exception("Autosubmit couldn't fin the experiment header information necessary to complete this request.") - job_list = job_data_structure.get_current_job_data( - run_id, all_states=True) + job_list = job_data_structure.get_current_job_data(run_id) if not job_list: return [], [], {} else: diff --git a/autosubmit_api/database/common.py b/autosubmit_api/database/common.py index defe7056..8b99d96b 100644 --- a/autosubmit_api/database/common.py +++ b/autosubmit_api/database/common.py @@ -55,14 +55,19 @@ def create_main_db_conn() -> Connection: return builder.product +def create_sqlite_db_engine(db_path: str) -> Engine: + """ + Create an engine for a SQLite DDBB. + """ + return create_engine(f"sqlite:///{ os.path.abspath(db_path)}", poolclass=NullPool) + + def create_autosubmit_db_engine() -> Engine: """ Create an engine for the autosubmit DDBB. 
Usually named autosubmit.db """ APIBasicConfig.read() - return create_engine( - f"sqlite:///{ os.path.abspath(APIBasicConfig.DB_PATH)}", poolclass=NullPool - ) + return create_sqlite_db_engine(APIBasicConfig.DB_PATH) def create_as_times_db_engine() -> Engine: @@ -71,7 +76,7 @@ def create_as_times_db_engine() -> Engine: """ APIBasicConfig.read() db_path = os.path.join(APIBasicConfig.DB_DIR, APIBasicConfig.AS_TIMES_DB) - return create_engine(f"sqlite:///{ os.path.abspath(db_path)}", poolclass=NullPool) + return create_sqlite_db_engine(db_path) def execute_with_limit_offset( diff --git a/autosubmit_api/database/db_jobdata.py b/autosubmit_api/database/db_jobdata.py index 321bdf59..08ff8bf8 100644 --- a/autosubmit_api/database/db_jobdata.py +++ b/autosubmit_api/database/db_jobdata.py @@ -19,26 +19,33 @@ import os import time -import textwrap import traceback -import sqlite3 import collections +from typing import List, Optional, Tuple import portalocker from datetime import datetime, timedelta from json import loads -from time import mktime +from autosubmit_api.logger import logger from autosubmit_api.components.jobs.utils import generate_job_html_title + # from networkx import DiGraph from autosubmit_api.config.basicConfig import APIBasicConfig from autosubmit_api.monitor.monitor import Monitor from autosubmit_api.performance.utils import calculate_ASYPD_perjob from autosubmit_api.components.jobs.job_factory import SimJob -from autosubmit_api.common.utils import get_jobs_with_no_outliers, Status, datechunk_to_year +from autosubmit_api.common.utils import ( + get_jobs_with_no_outliers, + Status, + datechunk_to_year, +) + # from autosubmitAPIwu.job.job_list # import autosubmitAPIwu.experiment.common_db_requests as DbRequests from bscearth.utils.date import Log -from autosubmit_api.persistance.experiment import ExperimentPaths +from autosubmit_api.repositories.experiment_run import create_experiment_run_repository +from autosubmit_api.repositories.graph_layout import 
create_exp_graph_layout_repository +from autosubmit_api.repositories.job_data import create_experiment_job_data_repository # Version 15 includes out err MaxRSS AveRSS and rowstatus @@ -425,137 +432,17 @@ def energy(self, energy): self._energy = energy if energy else 0 -class JobStepExtraData(): - def __init__(self, key, dict_data): - self.key = key - if isinstance(dict_data, dict): - # dict_data["ncpus"] if dict_data and "ncpus" in dict_data.keys( - self.ncpus = dict_data.get("ncpus", 0) if dict_data else 0 - # ) else 0 - self.nnodes = dict_data.get( - "nnodes", 0) if dict_data else 0 # and "nnodes" in dict_data.keys( - # ) else 0 - self.submit = int(mktime(datetime.strptime(dict_data["submit"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "submit" in list(dict_data.keys( - )) else 0 - self.start = int(mktime(datetime.strptime(dict_data["start"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "start" in list(dict_data.keys( - )) else 0 - self.finish = int(mktime(datetime.strptime(dict_data["finish"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "finish" in list(dict_data.keys( - )) and dict_data["finish"] != "Unknown" else 0 - self.energy = parse_output_number(dict_data["energy"]) if dict_data and "energy" in list(dict_data.keys( - )) else 0 - # if dict_data and "MaxRSS" in dict_data.keys( - self.maxRSS = dict_data.get("MaxRSS", 0) - # ) else 0 - # if dict_data and "AveRSS" in dict_data.keys( - self.aveRSS = dict_data.get("AveRSS", 0) - # ) else 0 - else: - self.ncpus = 0 - self.nnodes = 0 - self.submit = 0 - self.start = 0 - self.finish = 0 - self.energy = 0 - self.maxRSS = 0 - self.aveRSS = 0 - - -class MainDataBase(): - def __init__(self, expid): - self.expid = expid - self.conn = None - self.conn_ec = None - self.create_table_query = None - self.db_version = None - - def create_connection(self, db_file): - """ - Create a database connection to the SQLite database specified by db_file. 
- :param db_file: database file name - :return: Connection object or None - """ - try: - conn = sqlite3.connect(db_file) - return conn - except Exception: - return None - - def create_table(self): - """ create a table from the create_table_sql statement - :param conn: Connection object - :param create_table_sql: a CREATE TABLE statement - :return: - """ - try: - if self.conn: - c = self.conn.cursor() - c.execute(self.create_table_query) - self.conn.commit() - else: - raise IOError("Not a valid connection") - except IOError as exp: - Log.warning(exp) - return None - except sqlite3.Error as e: - if _debug is True: - Log.info(traceback.format_exc()) - Log.warning("Error on create table : " + str(type(e).__name__)) - return None - - def create_index(self): - """ Creates index from statement defined in child class - """ - try: - if self.conn: - c = self.conn.cursor() - c.execute(self.create_index_query) - self.conn.commit() - else: - raise IOError("Not a valid connection") - except IOError as exp: - Log.warning(exp) - return None - except sqlite3.Error as e: - if _debug is True: - Log.info(traceback.format_exc()) - Log.debug(str(type(e).__name__)) - Log.warning("Error on create index . create_index") - return None - - -class ExperimentGraphDrawing(MainDataBase): - def __init__(self, expid): +class ExperimentGraphDrawing: + def __init__(self, expid: str): """ Sets and validates graph drawing. 
+ :param expid: Name of experiment - :type expid: str - :param allJobs: list of all jobs objects (usually from job_list) - :type allJobs: list() """ - MainDataBase.__init__(self, expid) APIBasicConfig.read() self.expid = expid - exp_paths = ExperimentPaths(expid) self.folder_path = APIBasicConfig.LOCAL_ROOT_DIR - self.database_path = exp_paths.graph_data_db - self.create_table_query = textwrap.dedent( - '''CREATE TABLE - IF NOT EXISTS experiment_graph_draw ( - id INTEGER PRIMARY KEY, - job_name text NOT NULL, - x INTEGER NOT NULL, - y INTEGER NOT NULL - );''') - - if not os.path.exists(self.database_path): - os.umask(0) - if not os.path.exists(os.path.dirname(self.database_path)): - os.makedirs(os.path.dirname(self.database_path)) - os.open(self.database_path, os.O_WRONLY | os.O_CREAT, 0o777) - self.conn = self.create_connection(self.database_path) - self.create_table() - else: - self.conn = self.create_connection(self.database_path) + self.graph_data_repository = create_exp_graph_layout_repository(expid) self.lock_name = "calculation_in_progress.lock" self.current_position_dictionary = None self.current_jobs_set = set() @@ -607,7 +494,6 @@ def calculate_drawing(self, allJobs, independent=False, num_chunks=48, job_dicti lock_path_file = os.path.join(self.folder_path, lock_name) try: with portalocker.Lock(lock_path_file, timeout=1) as fh: - self.conn = self.create_connection(self.database_path) monitor = Monitor() graph = monitor.create_tree_list( self.expid, allJobs, None, dict(), False, job_dictionary) @@ -671,46 +557,35 @@ def set_current_position(self): self.current_position_dictionary = {row[1]: (row[2], row[3]) for row in current_table} self.current_jobs_set = set(self.current_position_dictionary.keys()) - def _get_current_position(self): + def _get_current_position(self) -> List[Tuple[int, str, int, int]]: """ Get all registers from experiment_graph_draw.\n :return: row content: id, job_name, x, y :rtype: 4-tuple (int, str, int, int) """ try: - if self.conn: 
- # conn = create_connection(DB_FILE_AS_TIMES) - self.conn.text_factory = str - cur = self.conn.cursor() - cur.execute( - "SELECT id, job_name, x, y FROM experiment_graph_draw") - rows = cur.fetchall() - return rows - return None + result = self.graph_data_repository.get_all() + return [ + (item.id, item.job_name, item.x, item.y) + for item in result + ] except Exception as exp: print((traceback.format_exc())) print((str(exp))) return None - def _insert_many_graph_coordinates(self, values): + def _insert_many_graph_coordinates( + self, values: List[Tuple[str, int, int]] + ) -> Optional[int]: """ Create many graph coordinates - :param conn: - :param details: - :return: """ try: - if self.conn: - # exp_id = self._get_id_db() - # conn = create_connection(DB_FILE_AS_TIMES) - # creation_date = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') - sql = ''' INSERT INTO experiment_graph_draw(job_name, x, y) VALUES(?,?,?) ''' - # print(row_content) - cur = self.conn.cursor() - cur.executemany(sql, values) - # print(cur) - self.conn.commit() - return cur.lastrowid + _vals = [ + {"job_name": item[0], "x": item[1], "y": item[2]} for item in values + ] + logger.debug(_vals) + return self.graph_data_repository.insert_many(_vals) except Exception as exp: print((traceback.format_exc())) Log.warning( @@ -722,310 +597,144 @@ def _clear_graph_database(self): Clear all content from graph drawing database """ try: - if self.conn: - # conn = create_connection(DB_FILE_AS_TIMES) - # modified_date = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') - sql = ''' DELETE FROM experiment_graph_draw ''' - cur = self.conn.cursor() - cur.execute(sql, ) - self.conn.commit() - return True - return False + self.graph_data_repository.delete_all() except Exception as exp: print((traceback.format_exc())) print(("Error on Database clear: {}".format(str(exp)))) return False + return True -class JobDataStructure(MainDataBase): - +class JobDataStructure: def __init__(self, expid: str, basic_config: 
APIBasicConfig): """Initializes the object based on the unique identifier of the experiment. Args: expid (str): Experiment identifier """ - MainDataBase.__init__(self, expid) - # BasicConfig.read() - # self.expid = expid - self.folder_path = basic_config.JOBDATA_DIR - exp_paths = ExperimentPaths(expid) - self.database_path = exp_paths.job_data_db - # self.conn = None - self.db_version = None - # self.jobdata_list = JobDataList(self.expid) - self.create_index_query = textwrap.dedent(''' - CREATE INDEX IF NOT EXISTS ID_JOB_NAME ON job_data(job_name); - ''') - if not os.path.exists(self.database_path): - self.conn = None - else: - self.conn = self.create_connection(self.database_path) - self.db_version = self._select_pragma_version() - # self.query_job_historic = None - # Historic only working on DB 12 now - self.query_job_historic = "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id FROM job_data WHERE job_name=? ORDER BY counter DESC" - - if self.db_version < DB_VERSION_SCHEMA_CHANGES: - try: - self.create_index() - except Exception as exp: - print(exp) - pass + self.expid = expid + self.experiment_run_data_repository = create_experiment_run_repository(expid) + self.experiment_job_data_repository = create_experiment_job_data_repository( + expid + ) def __str__(self): - return '{} {}'.format("Data structure. Version:", self.db_version) + return f"Run and job data of experiment {self.expid}" - def get_max_id_experiment_run(self): + def get_max_id_experiment_run(self) -> Optional[ExperimentRun]: """ Get last (max) experiment run object. 
:return: ExperimentRun data :rtype: ExperimentRun object """ try: - # expe = list() - if not os.path.exists(self.database_path): - raise Exception("Job data folder not found {0} or the database version is outdated.".format(str(self.database_path))) - if self.db_version < DB_VERSION_SCHEMA_CHANGES: - print(("Job database version {0} outdated.".format(str(self.db_version)))) - if os.path.exists(self.database_path) and self.db_version >= DB_VERSION_SCHEMA_CHANGES: - modified_time = int(os.stat(self.database_path).st_mtime) - current_experiment_run = self._get_max_id_experiment_run() - if current_experiment_run: - exprun_item = ExperimentRunItem_14( - *current_experiment_run) if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else ExperimentRunItem(*current_experiment_run) - return ExperimentRun(exprun_item.run_id, exprun_item.created, exprun_item.start, exprun_item.finish, exprun_item.chunk_unit, exprun_item.chunk_size, exprun_item.completed, exprun_item.total, exprun_item.failed, exprun_item.queuing, exprun_item.running, exprun_item.submitted, exprun_item.suspended if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else 0, exprun_item.metadata if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else "", modified_time) - else: - return None - else: - raise Exception("Job data folder not found {0} or the database version is outdated.".format( - str(self.database_path))) - except Exception as exp: - print((str(exp))) + current_experiment_run = self.experiment_run_data_repository.get_last_run() + return ExperimentRun( + current_experiment_run.run_id, + current_experiment_run.created, + current_experiment_run.start, + current_experiment_run.finish, + current_experiment_run.chunk_unit, + current_experiment_run.chunk_size, + current_experiment_run.completed, + current_experiment_run.total, + current_experiment_run.failed, + current_experiment_run.queuing, + current_experiment_run.running, + current_experiment_run.submitted, + 
current_experiment_run.suspended, + current_experiment_run.metadata, + current_experiment_run.modified, + ) + except Exception as exc: + print((str(exc))) print((traceback.format_exc())) return None - def get_experiment_run_by_id(self, run_id): + def get_experiment_run_by_id(self, run_id: int) -> Optional[ExperimentRun]: """ Get experiment run stored in database by run_id """ try: - # expe = list() - if os.path.exists(self.folder_path) and self.db_version >= DB_VERSION_SCHEMA_CHANGES: - result = None - current_experiment_run = self._get_experiment_run_by_id(run_id) - if current_experiment_run: - # for run in current_experiment_run: - exprun_item = ExperimentRunItem_14( - *current_experiment_run) if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else ExperimentRunItem(*current_experiment_run) - result = ExperimentRun(exprun_item.run_id, exprun_item.created, exprun_item.start, exprun_item.finish, exprun_item.chunk_unit, exprun_item.chunk_size, exprun_item.completed, exprun_item.total, exprun_item.failed, exprun_item.queuing, - exprun_item.running, exprun_item.submitted, exprun_item.suspended if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else 0, exprun_item.metadata if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES else "") - return result - else: - return None - else: - raise Exception("Job data folder not found {0} or the database version is outdated.".format( - str(self.database_path))) - except Exception as exp: + current_experiment_run = self.experiment_run_data_repository.get_run_by_id( + run_id + ) + return ExperimentRun( + current_experiment_run.run_id, + current_experiment_run.created, + current_experiment_run.start, + current_experiment_run.finish, + current_experiment_run.chunk_unit, + current_experiment_run.chunk_size, + current_experiment_run.completed, + current_experiment_run.total, + current_experiment_run.failed, + current_experiment_run.queuing, + current_experiment_run.running, + current_experiment_run.submitted, + 
current_experiment_run.suspended, + current_experiment_run.metadata, + current_experiment_run.modified, + ) + except Exception as exc: if _debug is True: Log.info(traceback.format_exc()) Log.debug(traceback.format_exc()) Log.warning( - "Autosubmit couldn't retrieve experiment run. get_experiment_run_by_id. Exception {0}".format(str(exp))) + "Autosubmit couldn't retrieve experiment run. get_experiment_run_by_id. Exception {0}".format( + str(exc) + ) + ) return None - def get_current_job_data(self, run_id, all_states=False): + def get_current_job_data(self, run_id: int) -> Optional[List[JobData]]: """ Gets the job historical data for a run_id. :param run_id: Run identifier :type run_id: int - :param all_states: False if only last=1 should be included, otherwise all rows - :return: List of jobdata rows - :rtype: list() of JobData objects """ try: + current_job_data = ( + self.experiment_job_data_repository.get_last_job_data_by_run_id(run_id) + ) + current_collection = [] - if self.db_version < DB_VERSION_SCHEMA_CHANGES: - raise Exception("This function requieres a newer DB version.") - if os.path.exists(self.folder_path): - current_job_data = self._get_current_job_data( - run_id, all_states) - if current_job_data: - for job_data in current_job_data: - if self.db_version >= CURRENT_DB_VERSION: - jobitem = JobItem_15(*job_data) - current_collection.append(JobData(jobitem.id, jobitem.counter, jobitem.job_name, jobitem.created, jobitem.modified, jobitem.submit, jobitem.start, jobitem.finish, jobitem.status, jobitem.rowtype, jobitem.ncpus, - jobitem.wallclock, jobitem.qos, jobitem.energy, jobitem.date, jobitem.section, jobitem.member, jobitem.chunk, jobitem.last, jobitem.platform, jobitem.job_id, jobitem.extra_data, jobitem.nnodes, jobitem.run_id, jobitem.MaxRSS, jobitem.AveRSS, jobitem.out, jobitem.err, jobitem.rowstatus)) - else: - jobitem = JobItem_12(*job_data) - current_collection.append(JobData(jobitem.id, jobitem.counter, jobitem.job_name, jobitem.created, 
jobitem.modified, jobitem.submit, jobitem.start, jobitem.finish, jobitem.status, jobitem.rowtype, jobitem.ncpus, - jobitem.wallclock, jobitem.qos, jobitem.energy, jobitem.date, jobitem.section, jobitem.member, jobitem.chunk, jobitem.last, jobitem.platform, jobitem.job_id, jobitem.extra_data, jobitem.nnodes, jobitem.run_id)) - return current_collection - return None + for job_data in current_job_data: + current_collection.append( + JobData( + _id=job_data.id, + counter=job_data.counter, + job_name=job_data.job_name, + created=job_data.created, + modified=job_data.modified, + submit=job_data.submit, + start=job_data.start, + finish=job_data.finish, + status=job_data.status, + rowtype=job_data.rowtype, + ncpus=job_data.ncpus, + wallclock=job_data.wallclock, + qos=job_data.qos, + energy=job_data.energy, + date=job_data.date, + section=job_data.section, + member=job_data.member, + chunk=job_data.chunk, + last=job_data.last, + platform=job_data.platform, + job_id=job_data.job_id, + extra_data=job_data.extra_data, + nnodes=job_data.nnodes, + run_id=job_data.run_id, + MaxRSS=job_data.MaxRSS, + AveRSS=job_data.AveRSS, + out=job_data.out, + err=job_data.err, + rowstatus=job_data.rowstatus, + ) + ) + + return current_collection except Exception: print((traceback.format_exc())) - print(( - "Error on returning current job data. run_id {0}".format(run_id))) - return None - - def _get_experiment_run_by_id(self, run_id): - """ - :param run_id: Run Identifier - :type run_id: int - :return: First row that matches the run_id - :rtype: Row as Tuple - """ - try: - if self.conn: - self.conn.text_factory = str - cur = self.conn.cursor() - if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES: - cur.execute( - "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted,suspended, metadata FROM experiment_run WHERE run_id=? 
and total > 0 ORDER BY run_id DESC", (run_id,)) - else: - cur.execute( - "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted FROM experiment_run WHERE run_id=? and total > 0 ORDER BY run_id DESC", (run_id,)) - rows = cur.fetchall() - if len(rows) > 0: - return rows[0] - else: - return None - else: - raise Exception("Not a valid connection.") - except sqlite3.Error: - if _debug is True: - print((traceback.format_exc())) - print(("Error while retrieving run {0} information. {1}".format( - run_id, "_get_experiment_run_by_id"))) - return None - - def _select_pragma_version(self): - """ Retrieves user_version from database - """ - try: - if self.conn: - self.conn.text_factory = str - cur = self.conn.cursor() - cur.execute("pragma user_version;") - rows = cur.fetchall() - # print("Result {0}".format(str(rows))) - if len(rows) > 0: - # print(rows) - # print("Row " + str(rows[0])) - result, = rows[0] - # print(result) - return int(result) if result >= 0 else None - else: - # Starting value - return None - except sqlite3.Error as e: - if _debug is True: - Log.info(traceback.format_exc()) - Log.debug(traceback.format_exc()) - Log.warning("Error while retrieving version: " + - str(type(e).__name__)) - return None - - def _get_max_id_experiment_run(self): - """Return the max id from experiment_run - - :return: max run_id, None - :rtype: int, None - """ - try: - if self.conn: - self.conn.text_factory = str - cur = self.conn.cursor() - if self.db_version >= DB_EXPERIMENT_HEADER_SCHEMA_CHANGES: - cur.execute( - "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted,suspended, metadata from experiment_run ORDER BY run_id DESC LIMIT 0, 1") - else: - cur.execute( - "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted from experiment_run ORDER BY run_id DESC LIMIT 0, 1") - rows = cur.fetchall() - if len(rows) > 0: - return 
rows[0] - else: - return None - return None - except sqlite3.Error as e: - if _debug is True: - Log.info(traceback.format_exc()) - Log.debug(traceback.format_exc()) - Log.warning("Error on select max run_id : " + - str(type(e).__name__)) - return None - - def _get_current_job_data(self, run_id, all_states=False): - """ - Get JobData by run_id. - :param run_id: Run Identifier - :type run_id: int - :param all_states: False if only last=1, True all - :type all_states: bool - """ - try: - if self.conn: - # print("Run {0} states {1} db {2}".format( - # run_id, all_states, self.db_version)) - self.conn.text_factory = str - cur = self.conn.cursor() - request_string = "" - if all_states is False: - if self.db_version >= CURRENT_DB_VERSION: - request_string = "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus from job_data WHERE run_id=? and last=1 and finish > 0 and rowtype >= 2 ORDER BY id" - else: - request_string = "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id from job_data WHERE run_id=? and last=1 and finish > 0 and rowtype >= 2 ORDER BY id" - - else: - if self.db_version >= CURRENT_DB_VERSION: - request_string = "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus from job_data WHERE run_id=? 
and rowtype >= 2 ORDER BY id" - else: - request_string = "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id from job_data WHERE run_id=? and rowtype >= 2 ORDER BY id" - - cur.execute(request_string, (run_id,)) - rows = cur.fetchall() - # print(rows) - if len(rows) > 0: - return rows - else: - return None - except sqlite3.Error as e: - if _debug is True: - print((traceback.format_exc())) - print(("Error on select job data: {0}".format( - str(type(e).__name__)))) + print(("Error on returning current job data. run_id {0}".format(run_id))) return None - - -def parse_output_number(string_number): - """ - Parses number in format 1.0K 1.0M 1.0G - - :param string_number: String representation of number - :type string_number: str - :return: number in float format - :rtype: float - """ - number = 0.0 - if (string_number): - if string_number == "NA": - return 0.0 - last_letter = string_number.strip()[-1] - multiplier = 1.0 - if last_letter == "G": - multiplier = 1000000000.0 - number = string_number[:-1] - elif last_letter == "M": - multiplier = 1000000.0 - number = string_number[:-1] - elif last_letter == "K": - multiplier = 1000.0 - number = string_number[:-1] - else: - number = string_number - try: - number = float(number) * multiplier - except Exception: - number = 0.0 - pass - return number diff --git a/autosubmit_api/database/tables.py b/autosubmit_api/database/tables.py index a1fd39dc..510a99eb 100644 --- a/autosubmit_api/database/tables.py +++ b/autosubmit_api/database/tables.py @@ -1,4 +1,13 @@ -from sqlalchemy import MetaData, Integer, String, Text, Table +from sqlalchemy import ( + Column, + Float, + MetaData, + Integer, + String, + Text, + Table, + UniqueConstraint, +) from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped @@ -52,18 +61,16 @@ class ExperimentStatusTable(BaseTable): modified: Mapped[str] = 
mapped_column(Text, nullable=False) -class GraphDataTable(BaseTable): - """ - Stores the coordinates and it is used exclusively to speed up the process - of generating the graph layout - """ - - __tablename__ = "experiment_graph_draw" - - id: Mapped[int] = mapped_column(Integer, primary_key=True) - job_name: Mapped[str] = mapped_column(Text, nullable=False) - x: Mapped[int] = mapped_column(Integer, nullable=False) - y: Mapped[int] = mapped_column(Integer, nullable=False) +GraphDataTable = Table( + "experiment_graph_draw", + metadata_obj, + Column("id", Integer, primary_key=True), + Column("job_name", Text, nullable=False), + Column("x", Integer, nullable=False), + Column("y", Integer, nullable=False), +) +"""Stores the coordinates and it is used exclusively +to speed up the process of generating the graph layout""" class JobPackageTable(BaseTable): @@ -103,8 +110,65 @@ class WrapperJobPackageTable(BaseTable): experiment_status_table: Table = ExperimentStatusTable.__table__ # Graph Data TABLES -graph_data_table: Table = GraphDataTable.__table__ +graph_data_table: Table = GraphDataTable # Job package TABLES job_package_table: Table = JobPackageTable.__table__ -wrapper_job_package_table: Table = WrapperJobPackageTable.__table__ \ No newline at end of file +wrapper_job_package_table: Table = WrapperJobPackageTable.__table__ + +ExperimentRunTable = Table( + "experiment_run", + metadata_obj, + Column("run_id", Integer, primary_key=True), + Column("created", Text, nullable=False), + Column("modified", Text, nullable=True), + Column("start", Integer, nullable=False), + Column("finish", Integer), + Column("chunk_unit", Text, nullable=False), + Column("chunk_size", Integer, nullable=False), + Column("completed", Integer, nullable=False), + Column("total", Integer, nullable=False), + Column("failed", Integer, nullable=False), + Column("queuing", Integer, nullable=False), + Column("running", Integer, nullable=False), + Column("submitted", Integer, nullable=False), + 
Column("suspended", Integer, nullable=False, default=0), + Column("metadata", Text), +) + +JobDataTable = Table( + "job_data", + metadata_obj, + Column("id", Integer, nullable=False, primary_key=True), + Column("counter", Integer, nullable=False), + Column("job_name", Text, nullable=False, index=True), + Column("created", Text, nullable=False), + Column("modified", Text, nullable=False), + Column("submit", Integer, nullable=False), + Column("start", Integer, nullable=False), + Column("finish", Integer, nullable=False), + Column("status", Text, nullable=False), + Column("rowtype", Integer, nullable=False), + Column("ncpus", Integer, nullable=False), + Column("wallclock", Text, nullable=False), + Column("qos", Text, nullable=False), + Column("energy", Integer, nullable=False), + Column("date", Text, nullable=False), + Column("section", Text, nullable=False), + Column("member", Text, nullable=False), + Column("chunk", Integer, nullable=False), + Column("last", Integer, nullable=False), + Column("platform", Text, nullable=False), + Column("job_id", Integer, nullable=False), + Column("extra_data", Text, nullable=False), + Column("nnodes", Integer, nullable=False, default=0), + Column("run_id", Integer), + Column("MaxRSS", Float, nullable=False, default=0.0), + Column("AveRSS", Float, nullable=False, default=0.0), + Column("out", Text, nullable=False), + Column("err", Text, nullable=False), + Column("rowstatus", Integer, nullable=False, default=0), + Column("children", Text, nullable=True), + Column("platform_output", Text, nullable=True), + UniqueConstraint("counter", "job_name", name="unique_counter_and_job_name"), +) diff --git a/autosubmit_api/repositories/__init__.py b/autosubmit_api/repositories/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/autosubmit_api/repositories/experiment.py b/autosubmit_api/repositories/experiment.py new file mode 100644 index 00000000..f2053c0c --- /dev/null +++ b/autosubmit_api/repositories/experiment.py @@ -0,0 
+1,73 @@ +from abc import ABC, abstractmethod +from typing import List, Optional +from pydantic import BaseModel +from sqlalchemy import Engine, Table +from autosubmit_api.database import tables +from autosubmit_api.database.common import create_autosubmit_db_engine + + +class ExperimentModel(BaseModel): + id: int + name: str + description: Optional[str] = None + autosubmit_version: Optional[str] = None + + +class ExperimentRepository(ABC): + @abstractmethod + def get_all(self) -> List[ExperimentModel]: + """ + Get all the experiments + + :return experiments: The list of experiments + """ + pass + + @abstractmethod + def get_by_expid(self, expid: str) -> ExperimentModel: + """ + Get the experiment by expid + + :param expid: The experiment id + :return experiment: The experiment + :raises ValueError: If the experiment is not found + """ + pass + + +class ExperimentSQLRepository(ExperimentRepository): + def __init__(self, engine: Engine, table: Table): + self.engine = engine + self.table = table + + def get_all(self): + with self.engine.connect() as conn: + statement = self.table.select() + result = conn.execute(statement).all() + return [ + ExperimentModel( + id=row.id, + name=row.name, + description=row.description, + autosubmit_version=row.autosubmit_version, + ) + for row in result + ] + + def get_by_expid(self, expid: str): + with self.engine.connect() as conn: + statement = self.table.select().where(self.table.c.name == expid) + result = conn.execute(statement).first() + if result is None: + raise ValueError(f"Experiment with id {expid} not found") + return ExperimentModel( + id=result.id, + name=result.name, + description=result.description, + autosubmit_version=result.autosubmit_version, + ) + + +def create_experiment_repository() -> ExperimentRepository: + engine = create_autosubmit_db_engine() + return ExperimentSQLRepository(engine, tables.experiment_table) diff --git a/autosubmit_api/repositories/experiment_run.py 
from abc import ABC, abstractmethod
from typing import Any, List
from pydantic import BaseModel
from sqlalchemy import Engine, Table
from sqlalchemy.schema import CreateTable
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_sqlite_db_engine
from autosubmit_api.persistance.experiment import ExperimentPaths


class ExperimentRunModel(BaseModel):
    # Mirrors the columns of the experiment_run table; fields stay untyped (Any)
    # because the historical SQLite schema is loose about column affinities.
    run_id: Any
    created: Any
    modified: Any
    start: Any
    finish: Any
    chunk_unit: Any
    chunk_size: Any
    completed: Any
    total: Any
    failed: Any
    queuing: Any
    running: Any
    submitted: Any
    suspended: Any
    metadata: Any


class ExperimentRunRepository(ABC):
    @abstractmethod
    def get_all(self) -> List[ExperimentRunModel]:
        """
        Gets all runs of the experiment
        """

    @abstractmethod
    def get_last_run(self) -> ExperimentRunModel:
        """
        Gets last run of the experiment. Raises ValueError if no runs found.
        """

    @abstractmethod
    def get_run_by_id(self, run_id: int) -> ExperimentRunModel:
        """
        Gets run by id. Raises ValueError if run not found.
        """


class ExperimentRunSQLRepository(ExperimentRunRepository):
    """SQLAlchemy-backed implementation over the experiment's job_data DDBB."""

    def __init__(self, expid: str, engine: Engine, table: Table):
        self.engine = engine
        self.table = table
        self.expid = expid

        # Make sure the backing table exists before any query is issued.
        with self.engine.connect() as conn:
            conn.execute(CreateTable(self.table, if_not_exists=True))
            conn.commit()

    @staticmethod
    def _to_model(row) -> ExperimentRunModel:
        # Single conversion point from DB row to pydantic model.
        return ExperimentRunModel.model_validate(row, from_attributes=True)

    def get_all(self):
        with self.engine.connect() as conn:
            rows = conn.execute(self.table.select()).all()
        return [self._to_model(row) for row in rows]

    def get_last_run(self):
        # Highest run_id is the most recent run.
        query = self.table.select().order_by(self.table.c.run_id.desc())
        with self.engine.connect() as conn:
            row = conn.execute(query).first()
        if row is None:
            raise ValueError(f"No runs found for experiment {self.expid}")
        return self._to_model(row)

    def get_run_by_id(self, run_id: int):
        query = self.table.select().where(self.table.c.run_id == run_id)
        with self.engine.connect() as conn:
            row = conn.execute(query).first()
        if row is None:
            raise ValueError(
                f"Run with id {run_id} not found for experiment {self.expid}"
            )
        return self._to_model(row)


def create_experiment_run_repository(expid: str):
    """Build a run repository over the experiment's job_data SQLite file."""
    engine = create_sqlite_db_engine(ExperimentPaths(expid).job_data_db)
    return ExperimentRunSQLRepository(expid, engine, tables.ExperimentRunTable)
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Union
from pydantic import BaseModel
from sqlalchemy import Engine, Table
from sqlalchemy.schema import CreateTable
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_sqlite_db_engine
from autosubmit_api.persistance.experiment import ExperimentPaths


class ExpGraphLayoutModel(BaseModel):
    # One node of the precomputed graph drawing: job name plus x/y coordinates.
    id: Union[int, Any]
    job_name: Union[str, Any]
    x: Union[float, Any]
    y: Union[float, Any]


class ExpGraphLayoutRepository(ABC):
    @abstractmethod
    def get_all(self) -> List[ExpGraphLayoutModel]:
        """
        Get all the graph layout data.
        """

    # FIX: delete_all and insert_many previously lacked @abstractmethod, so a
    # subclass could be instantiated without implementing them and calls would
    # silently return None instead of failing at construction time.
    @abstractmethod
    def delete_all(self) -> int:
        """
        Delete all the graph layout data.

        :return: number of deleted rows
        """

    @abstractmethod
    def insert_many(self, values: List[Dict[str, Any]]) -> int:
        """
        Insert many graph layout data.

        :param values: list of row dicts with id, job_name, x, y keys
        :return: number of inserted rows
        """


class ExpGraphLayoutSQLRepository(ExpGraphLayoutRepository):
    """SQLAlchemy-backed implementation over the experiment's graph_data DDBB."""

    def __init__(self, expid: str, engine: Engine, table: Table):
        self.expid = expid
        self.engine = engine
        self.table = table

        # Make sure the backing table exists before any query is issued.
        with self.engine.connect() as conn:
            conn.execute(CreateTable(self.table, if_not_exists=True))
            conn.commit()

    def get_all(self) -> List[ExpGraphLayoutModel]:
        with self.engine.connect() as conn:
            statement = self.table.select()
            result = conn.execute(statement).all()
            return [
                ExpGraphLayoutModel(id=row.id, job_name=row.job_name, x=row.x, y=row.y)
                for row in result
            ]

    def delete_all(self) -> int:
        with self.engine.connect() as conn:
            statement = self.table.delete()
            result = conn.execute(statement)
            conn.commit()
            return result.rowcount

    def insert_many(self, values: List[Dict[str, Any]]) -> int:
        with self.engine.connect() as conn:
            statement = self.table.insert()
            result = conn.execute(statement, values)
            conn.commit()
            return result.rowcount


def create_exp_graph_layout_repository(expid: str) -> ExpGraphLayoutRepository:
    """Build a graph-layout repository over the experiment's graph_data SQLite file."""
    engine = create_sqlite_db_engine(ExperimentPaths(expid).graph_data_db)
    return ExpGraphLayoutSQLRepository(expid, engine, tables.GraphDataTable)
from abc import ABC, abstractmethod
from typing import Any, List
from pydantic import BaseModel
from sqlalchemy import Engine, Table, or_, Index
from sqlalchemy.schema import CreateTable
from autosubmit_api.database import tables
from autosubmit_api.database.common import create_sqlite_db_engine
from autosubmit_api.persistance.experiment import ExperimentPaths


class ExperimentJobDataModel(BaseModel):
    # Mirrors the columns of the job_data table; fields stay untyped (Any)
    # because the historical SQLite schema is loose about column affinities.
    id: Any
    counter: Any
    job_name: Any
    created: Any
    modified: Any
    submit: Any
    start: Any
    finish: Any
    status: Any
    rowtype: Any
    ncpus: Any
    wallclock: Any
    qos: Any
    energy: Any
    date: Any
    section: Any
    member: Any
    chunk: Any
    last: Any
    platform: Any
    job_id: Any
    extra_data: Any
    nnodes: Any
    run_id: Any
    MaxRSS: Any
    AveRSS: Any
    out: Any
    err: Any
    rowstatus: Any
    children: Any


class ExperimentJobDataRepository(ABC):
    @abstractmethod
    def get_last_job_data_by_run_id(self, run_id: int) -> List[ExperimentJobDataModel]:
        """
        Gets last job data of a specific run id
        """

    @abstractmethod
    def get_last_job_data(self) -> List[ExperimentJobDataModel]:
        """
        Gets last job data
        """

    @abstractmethod
    def get_jobs_by_name(self, job_name: str) -> List[ExperimentJobDataModel]:
        """
        Gets historical job data by job_name
        """

    @abstractmethod
    def get_all(self) -> List[ExperimentJobDataModel]:
        """
        Gets all job data
        """

    @abstractmethod
    def get_job_data_COMPLETED_by_rowtype_run_id(
        self, rowtype: int, run_id: int
    ) -> List[ExperimentJobDataModel]:
        """
        Gets COMPLETED job data by rowtype and run id
        """

    @abstractmethod
    def get_job_data_COMPLETD_by_section(
        self, section: str
    ) -> List[ExperimentJobDataModel]:
        """
        Gets COMPLETED job data by section (or member).

        NOTE: the name keeps the historical "COMPLETD" typo for backward
        compatibility with existing callers; new code should prefer
        get_job_data_COMPLETED_by_section.
        """

    def get_job_data_COMPLETED_by_section(
        self, section: str
    ) -> List[ExperimentJobDataModel]:
        """
        Correctly-spelled alias of get_job_data_COMPLETD_by_section.
        """
        return self.get_job_data_COMPLETD_by_section(section)


class ExperimentJobDataSQLRepository(ExperimentJobDataRepository):
    """SQLAlchemy-backed implementation over the experiment's job_data DDBB."""

    def __init__(self, expid: str, engine: Engine, table: Table):
        self.engine = engine
        self.table = table
        self.expid = expid

        # Make sure the backing table and the job_name index exist before use.
        with self.engine.connect() as conn:
            conn.execute(CreateTable(self.table, if_not_exists=True))
            Index("ID_JOB_NAME", self.table.c.job_name).create(conn, checkfirst=True)
            conn.commit()

    @staticmethod
    def _to_models(rows) -> List[ExperimentJobDataModel]:
        # Single conversion point from DB rows to pydantic models.
        return [
            ExperimentJobDataModel.model_validate(row, from_attributes=True)
            for row in rows
        ]

    def get_last_job_data_by_run_id(self, run_id: int):
        with self.engine.connect() as conn:
            statement = (
                self.table.select()
                .where(
                    (self.table.c.run_id == run_id),
                    # rowtype >= 2 filters out header/metadata rows.
                    (self.table.c.rowtype >= 2),
                )
                .order_by(self.table.c.id.desc())
            )
            result = conn.execute(statement).all()
        return self._to_models(result)

    def get_last_job_data(self):
        with self.engine.connect() as conn:
            statement = self.table.select().where(
                # last == 1 marks the most recent record of each job.
                (self.table.c.last == 1),
                (self.table.c.rowtype >= 2),
            )
            result = conn.execute(statement).all()
        return self._to_models(result)

    def get_jobs_by_name(self, job_name: str):
        with self.engine.connect() as conn:
            statement = (
                self.table.select()
                .where(self.table.c.job_name == job_name)
                # Newest retry (highest counter) first.
                .order_by(self.table.c.counter.desc())
            )
            result = conn.execute(statement).all()
        return self._to_models(result)

    def get_all(self):
        with self.engine.connect() as conn:
            statement = (
                self.table.select().where(self.table.c.id > 0).order_by(self.table.c.id)
            )
            result = conn.execute(statement).all()
        return self._to_models(result)

    def get_job_data_COMPLETED_by_rowtype_run_id(self, rowtype: int, run_id: int):
        with self.engine.connect() as conn:
            statement = (
                self.table.select()
                .where(
                    (self.table.c.rowtype == rowtype),
                    (self.table.c.run_id == run_id),
                    (self.table.c.status == "COMPLETED"),
                )
                .order_by(self.table.c.id)
            )
            result = conn.execute(statement).all()
        return self._to_models(result)

    def get_job_data_COMPLETD_by_section(self, section: str):
        with self.engine.connect() as conn:
            statement = (
                self.table.select()
                .where(
                    (self.table.c.status == "COMPLETED"),
                    # "section" may match either the section or the member column.
                    or_(
                        (self.table.c.section == section),
                        (self.table.c.member == section),
                    ),
                )
                .order_by(self.table.c.id)
            )
            result = conn.execute(statement).all()
        return self._to_models(result)


def create_experiment_job_data_repository(expid: str):
    """Build a job-data repository over the experiment's job_data SQLite file."""
    engine = create_sqlite_db_engine(ExperimentPaths(expid).job_data_db)
    return ExperimentJobDataSQLRepository(expid, engine, tables.JobDataTable)
from autosubmit_api.database.db_jobdata import JobDataStructure, ExperimentRun


class TestJobDataStructure:
    def test_valid_operations(self, fixture_mock_basic_config):
        # Known experiment in the test fixtures: 3 runs, 8 jobs per run.
        expid = "a003"
        store = JobDataStructure(expid, None)

        latest = store.get_max_id_experiment_run()
        assert isinstance(latest, ExperimentRun)
        assert latest.run_id == 3
        assert latest.total == 8

        second = store.get_experiment_run_by_id(2)
        assert isinstance(second, ExperimentRun)
        assert second.run_id == 2
        assert second.total == 8

        # A run id greater than the last one yields None, not an error.
        assert store.get_experiment_run_by_id(4) is None

        jobs = store.get_current_job_data(3)
        assert isinstance(jobs, list)
        assert len(jobs) == 8

    def test_invalid_operations(self, fixture_mock_basic_config):
        # Non-existent experiment: lookups degrade gracefully to None.
        store = JobDataStructure("404", None)

        assert store.get_max_id_experiment_run() is None
        assert store.get_experiment_run_by_id(2) is None
from sqlalchemy import inspect
from autosubmit_api.repositories.experiment import create_experiment_repository
from autosubmit_api.repositories.job_data import create_experiment_job_data_repository
from autosubmit_api.repositories.graph_layout import create_exp_graph_layout_repository


class TestExperimentRepository:
    def test_operations(self, fixture_mock_basic_config):
        experiment_db = create_experiment_repository()

        EXPIDS = ["a003", "a007", "a3tb", "a6zj"]

        # get_all returns at least the fixture experiments.
        rows = experiment_db.get_all()
        assert len(rows) >= 4
        names = [row.name for row in rows]
        for expid in EXPIDS:
            assert expid in names

        # get_by_expid resolves each fixture experiment by name.
        for expid in EXPIDS:
            assert experiment_db.get_by_expid(expid).name == expid


class TestExpGraphLayoutRepository:
    def test_operations(self, fixture_mock_basic_config):
        graph_draw_db = create_exp_graph_layout_repository("g001")

        # Table is created on construction and starts empty.
        assert graph_draw_db.get_all() == []

        data = [
            {"id": 1, "job_name": "job1", "x": 1, "y": 2},
            {"id": 2, "job_name": "job2", "x": 2, "y": 3},
        ]

        # Insert reports the number of affected rows.
        assert graph_draw_db.insert_many(data) == len(data)

        # Round-trip: what we read back equals what we inserted.
        assert [x.model_dump() for x in graph_draw_db.get_all()] == data

        # Delete reports the number of removed rows and empties the table.
        assert graph_draw_db.delete_all() == len(data)
        assert [x.model_dump() for x in graph_draw_db.get_all()] == []


class TestExperimentJobDataRepository:
    def test_sql_init(self, fixture_mock_basic_config):
        repo = create_experiment_job_data_repository("any")

        # The constructor must create the ID_JOB_NAME index on job_name.
        indexes = inspect(repo.engine).get_indexes(repo.table.name)
        assert len(indexes) == 1
        assert indexes[0]["name"] == "ID_JOB_NAME"
        assert indexes[0]["column_names"] == ["job_name"]