diff --git a/amstrax/__init__.py b/amstrax/__init__.py index 18bd7d95..99703e94 100644 --- a/amstrax/__init__.py +++ b/amstrax/__init__.py @@ -2,6 +2,7 @@ from .common import * from .rundb import * +from .logging_utils import * from . import plugins from .plugins import * diff --git a/amstrax/auto_processing/auto_processing_stoomboot.py b/amstrax/auto_processing/auto_processing_stoomboot.py index 6b1d7d5a..8f169ee3 100644 --- a/amstrax/auto_processing/auto_processing_stoomboot.py +++ b/amstrax/auto_processing/auto_processing_stoomboot.py @@ -4,7 +4,6 @@ import subprocess from datetime import datetime, timedelta import logging -from logging.handlers import TimedRotatingFileHandler import sys @@ -43,7 +42,7 @@ def parse_args(): help="Memory per CPU") parser.add_argument( '--logs_path', - default='/data/xenon/xams_v2/logs/auto_processing', + default='/data/xenon/xams_v2/logs/', help="Path where to save logs") parser.add_argument( '--production', @@ -56,42 +55,13 @@ def parse_args(): return parser.parse_args() - -def setup_logging(logs_path): - """ - Setup logging configuration with daily log rotation. - """ - if not os.path.exists(logs_path): - os.makedirs(logs_path) - - log_file = os.path.join(logs_path, 'copying.log') - - log_formatter = logging.Formatter("%(asctime)s | %(levelname)-5.5s | %(message)s") - logger = logging.getLogger() - - logger.setLevel(logging.INFO) - - # Setup file handler with daily rotation - file_handler = TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=7) - file_handler.setFormatter(log_formatter) - file_handler.suffix = "%Y%m%d" - logger.addHandler(file_handler) - - # Setup console handler - console_handler = logging.StreamHandler() - console_handler.setFormatter(log_formatter) - logger.addHandler(console_handler) - def main(args): """ Main function to handle auto-processing of xams data. 
""" - - # Import custom modules - import amstrax version = '2.1.0' - logging.info(f'Starting autoprocess version {version}...') + log.info(f'Starting autoprocess version {version}...') # Get configuration and setup amstrax_dir = amstrax.amstrax_dir @@ -99,7 +69,7 @@ def main(args): output_folder = args.output_folder targets = " ".join(args.target) runs_col = amstrax.get_mongo_collection() - logging.info('Correctly connected, starting loop') + log.info('Correctly connected, starting loop') amstrax_dir = amstrax.amstrax_dir client = amstrax.get_mongo_client() @@ -117,17 +87,17 @@ def main(args): handle_running_jobs(runs_col, production=args.production) if not run_docs_to_do: - logging.info(f'I found no runs to process, time to take a nap for {nap_time} seconds') + log.info(f'I found no runs to process, time to take a nap for {nap_time} seconds') time.sleep(nap_time) continue # Submit new jobs if under max limit submit_new_jobs(args, runs_col, run_docs_to_do, amstrax_dir=amstrax_dir) - logging.info(f"Waiting {nap_time} seconds before rechecking, press Ctrl+C to quit...") + log.info(f"Waiting {nap_time} seconds before rechecking, press Ctrl+C to quit...") time.sleep(nap_time) - logging.info('Done!') + log.info('Done!') # Define additional functions to modularize the script @@ -181,8 +151,8 @@ def update_task_list(args, runs_col, auto_processing_on): # Log the found runs if run_docs_to_do: - logging.info(f'I found {len(run_docs_to_do)} runs to process, time to get to work!') - logging.info(f'Run numbers: {[run_doc["number"] for run_doc in run_docs_to_do]}') + log.info(f'I found {len(run_docs_to_do)} runs to process, time to get to work!') + log.info(f'Run numbers: {[run_doc["number"] for run_doc in run_docs_to_do]}') return run_docs_to_do @@ -208,7 +178,7 @@ def handle_running_jobs(runs_col, production=False): if processing_status['status'] in ['running', 'submitted']: if processing_status['time'] < datetime.now() - timedelta(hours=0, minutes=30): new_status = 'failed' - logging.info(f'Run {run_number} has a job {processing_status["status"]} for more than 1 hour, marking as {new_status}') + log.info(f'Run {run_number} has a job {processing_status["status"]} for more than 1 hour, marking as {new_status}') if production: runs_col.update_one( @@ -218,7 +188,7 @@ def handle_running_jobs(runs_col, production=False): ) else: - logging.info(f'Would have updated run {run_number} to {new_status} in the database') + log.info(f'Would have updated run {run_number} to {new_status} in the database') def submit_new_jobs(args, runs_col, run_docs_to_do, amstrax_dir): @@ -233,20 +203,20 @@ def submit_new_jobs(args, runs_col, run_docs_to_do, amstrax_dir): run_docs_running = list(runs_col.find(query, projection).sort(sort)) num_running_jobs = len(run_docs_running) - logging.info(f'Found {num_running_jobs} runs that are running or submitted') + log.info(f'Found {num_running_jobs} runs that are running or submitted') for run_doc in run_docs_running: - logging.info(f'Run {run_doc["number"]} is in ststus {run_doc["processing_status"]["status"]}') + log.info(f'Run {run_doc["number"]} is in ststus {run_doc["processing_status"]["status"]}') if num_running_jobs >= args.max_jobs: - logging.info(f'The number of running jobs ({num_running_jobs}) reached the limit ({args.max_jobs})') + log.info(f'The number of running jobs ({num_running_jobs}) reached the limit ({args.max_jobs})') return # Submit new jobs max_jobs_to_submit = args.max_jobs - num_running_jobs will_do_run_ids = [int(run_doc['number']) for run_doc in 
run_docs_to_do[:max_jobs_to_submit]] - logging.info(f'Will submit jobs for runs: {will_do_run_ids}') + log.info(f'Will submit jobs for runs: {will_do_run_ids}') for run_doc in run_docs_to_do[:max_jobs_to_submit]: run_id = f'{int(run_doc["number"]):06}' @@ -287,12 +257,30 @@ def submit_new_jobs(args, runs_col, run_docs_to_do, amstrax_dir): } ) else: - logging.info(f'Would have submitted job for run {run_id}') - logging.info(f'Would have updated run {run_doc["number"]} to submitted in the database') + log.info(f'Would have submitted job for run {run_id}') + log.info(f'Would have updated run {run_doc["number"]} to submitted in the database') # Ensure the script is run as the main program if __name__ == '__main__': args = parse_args() - setup_logging(args.logs_path) + + log_name = "auto_processing_stoomboot" + + import amstrax + + versions = amstrax.print_versions( + modules="strax amstrax numpy numba".split(), + include_git=False, + return_string=True, + ) + + log = amstrax.get_daq_logger( + log_name, + log_name, + level=logging.DEBUG, + opening_message=f"I am processing with these software versions: {versions}", + logdir=args.logs_path, + ) + main(args) diff --git a/amstrax/auto_processing/copy_live.py b/amstrax/auto_processing/copy_live.py index ecad0875..f334e870 100644 --- a/amstrax/auto_processing/copy_live.py +++ b/amstrax/auto_processing/copy_live.py @@ -6,9 +6,7 @@ import subprocess import time import pymongo -import amstrax import logging -from logging.handlers import TimedRotatingFileHandler import os # Define a dictionary for storage locations @@ -39,32 +37,6 @@ def parse_args(): return parser.parse_args() -def setup_logging(logs_path): - """ - Setup logging configuration with daily log rotation. - """ - if not os.path.exists(logs_path): - os.makedirs(logs_path) - - log_file = os.path.join(logs_path, 'copying.log') - - log_formatter = logging.Formatter("%(asctime)s | %(levelname)-5.5s | %(message)s") - logger = logging.getLogger() - - logger.setLevel(logging.INFO) - - # Setup file handler with daily rotation - file_handler = TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=7) - file_handler.setFormatter(log_formatter) - file_handler.suffix = "%Y%m%d" - logger.addHandler(file_handler) - - # Setup console handler - console_handler = logging.StreamHandler() - console_handler.setFormatter(log_formatter) - logger.addHandler(console_handler) - - def get_rundocs(runsdb, args): """ Retrieve run documents from MongoDB collection based on specific criteria. @@ -138,7 +110,7 @@ def copy_data(run_id, live_data_path, location, hostname, production, ssh_host): Copy data to the specified location using rsync. 
""" if not os.path.exists(live_data_path): - logging.error(f"Could not find the data for run {run_id} at {live_data_path}, marking run as abandon") + log.error(f"Could not find the data for run {run_id} at {live_data_path}, marking run as abandon") # add a tag to the tags array in the database, marking the run as abandon runsdb.update_one( {'number': int(run_id)}, @@ -147,16 +119,16 @@ def copy_data(run_id, live_data_path, location, hostname, production, ssh_host): return - logging.info(f"Copying run {run_id} to {location}") + log.info(f"Copying run {run_id} to {location}") copy_cmd = ['rsync', '-av', live_data_path, f'{ssh_host}:{location}'] copy = subprocess.run(copy_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if copy.returncode != 0: - logging.error(f"Something went wrong copying run {run_id} to {location}") - logging.error(copy.stderr.decode()) + log.error(f"Something went wrong copying run {run_id} to {location}") + log.error(copy.stderr.decode()) else: - logging.info(f"Successfully copied run {run_id} to {location}") - logging.info(copy.stdout.decode()) + log.info(f"Successfully copied run {run_id} to {location}") + log.info(copy.stdout.decode()) if production: runsdb.update_one( @@ -164,7 +136,7 @@ def copy_data(run_id, live_data_path, location, hostname, production, ssh_host): {'$push': {'data': {'type': 'live', 'host': hostname, 'location': location, 'by': 'copy_live', 'time': datetime.datetime.now()}}} ) - logging.info(f"Successfully updated the database for run {run_id}") + log.info(f"Successfully updated the database for run {run_id}") return copy.returncode @@ -176,7 +148,7 @@ def handle_runs(rundocs, args): try: path = next(d['location'] for d in rd['data'] if d['type'] == 'live' and d['host'] == 'daq') except StopIteration: - logging.error(f"Could not find the DB entry for live data of run {rd['number']}") + log.error(f"Could not find the DB entry for live data of run {rd['number']}") continue live_data_path = os.path.join(path, run_id) @@ -184,8 +156,10 @@ def handle_runs(rundocs, args): # Check if data is on stoomboot and copy if not copied_stomboot = False if not any(d['type'] == 'live' and d['host'] == 'stbc' for d in rd['data']): - copied_stomboot = copy_data(run_id, live_data_path, STORAGE_PATHS['stbc'], 'stbc', args.production, args.ssh_host) + exit_code = copy_data(run_id, live_data_path, STORAGE_PATHS['stbc'], 'stbc', args.production, args.ssh_host) + copied_stomboot = (exit_code == 0) else: + # it was already on stoomboot copied_stomboot = True if copied_stomboot: @@ -200,22 +174,41 @@ def main(args): """ Main function to automate copying of new runs. 
""" - logging.info('Starting to copy new runs...') + + log.info('Starting to copy new runs...') rundocs = get_rundocs(runsdb, args) print(f"Found {len(rundocs)} runs to copy") runs_copied = handle_runs(rundocs, args) - logging.info('Finished copying new runs.') + log.info('Finished copying new runs.') if __name__ == '__main__': args = parse_args() - setup_logging(args.logs_path) + + log_name = "copy_live" + + import amstrax + + versions = amstrax.print_versions( + modules="strax amstrax numpy numba".split(), + include_git=False, + return_string=True, + ) + + log = amstrax.get_daq_logger( + log_name, + log_name, + level=logging.DEBUG, + opening_message=f"I am processing with these software versions: {versions}", + logdir=args.logs_path, + ) + runsdb = amstrax.get_mongo_collection() if args.loop_infinite: while True: runs_copied = main(args) sleep_time = 1 if runs_copied else args.sleep_time - logging.info(f"Sleeping for {args.sleep_time} seconds...") + log.info(f"Sleeping for {args.sleep_time} seconds...") time.sleep(args.sleep_time) else: main(args) diff --git a/amstrax/auto_processing/delete_live.py b/amstrax/auto_processing/delete_live.py index 095fd913..3e576e9a 100644 --- a/amstrax/auto_processing/delete_live.py +++ b/amstrax/auto_processing/delete_live.py @@ -6,9 +6,7 @@ import subprocess import time import pymongo -import amstrax import logging -from logging.handlers import TimedRotatingFileHandler import os def parse_args(): @@ -26,31 +24,6 @@ def parse_args(): parser.add_argument('--sleep_time', type=int, default=60, help='Sleep time between runs') return parser.parse_args() -def setup_logging(logs_path): - """ - Setup logging configuration with daily log rotation. - """ - if not os.path.exists(logs_path): - os.makedirs(logs_path) - - log_file = os.path.join(logs_path, 'delete_live.log') - - log_formatter = logging.Formatter("%(asctime)s | %(levelname)-5.5s | %(message)s") - logger = logging.getLogger() - - logger.setLevel(logging.INFO) - - # Setup file handler with daily rotation - file_handler = TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=7) - file_handler.setFormatter(log_formatter) - file_handler.suffix = "%Y%m%d" - logger.addHandler(file_handler) - - # Setup console handler - console_handler = logging.StreamHandler() - console_handler.setFormatter(log_formatter) - logger.addHandler(console_handler) - def get_old_runs(runsdb, days, args): """ Retrieve run documents where the data is older than specified days and exists in three locations @@ -104,22 +77,22 @@ def check_data_safety(run_doc, ssh_host, args): for host in hosts_to_check: path = next((d['location'] for d in run_doc['data'] if d['host'] == host), None) if not path: - logging.warning(f"Missing data path for run {run_id} on host {host}") + log.warning(f"Missing data path for run {run_id} on host {host}") return False # Get number of files in the directory num_files = count_files_in_directory(path, run_id, is_remote=(host != 'daq'), ssh_host=ssh_host) - logging.info(f"Found {num_files} files in {path} on {host} for run {run_id}") + log.info(f"Found {num_files} files in {path} on {host} for run {run_id}") result[host] = num_files # Check if the file counts match if (result['stbc'] != result.get('dcache', -9) and not args.only_stoomboot) or result['daq'] != result['stbc']: - logging.warning(f"Mismatch in file count for run {run_id}") + log.warning(f"Mismatch in file count for run {run_id}") return False num_files_daq = result['daq'] - logging.info(f"File count is {num_files_daq} for run 
{run_id} on all hosts, safe to delete") + log.info(f"File count is {num_files_daq} for run {run_id} on all hosts, safe to delete") return True @@ -134,7 +107,7 @@ def count_files_in_directory(path, run_id, is_remote=False, ssh_host=None): ssh_cmd = ["ssh", ssh_host, f"ls -1 {full_path}/* | wc -l"] result = subprocess.run(ssh_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode != 0: - logging.error(f"SSH command failed: {result.stderr}") + log.error(f"SSH command failed: {result.stderr}") return 0 result = int(result.stdout.strip()) else: @@ -142,7 +115,7 @@ def count_files_in_directory(path, run_id, is_remote=False, ssh_host=None): cmd = f"ls -1 {full_path}/* | wc -l" result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=True) if result.returncode != 0: - logging.error(f"Command failed: {result.stderr}") + log.error(f"Command failed: {result.stderr}") return 0 result = int(result.stdout.strip()) return result @@ -158,22 +131,22 @@ def delete_data(runsdb, run_doc, production, we_are_really_sure): daq_path = next(d['location'] for d in run_doc['data'] if d['host'] == 'daq') daq_path = os.path.join(daq_path, run_id) if not production: - logging.info(f"[Dry Run] Would delete data for run {run_id} at {daq_path}") + log.info(f"[Dry Run] Would delete data for run {run_id} at {daq_path}") else: # we actually do it - logging.info(f"Deleting data for run {run_id} at {daq_path}") + log.info(f"Deleting data for run {run_id} at {daq_path}") if we_are_really_sure: # check that the path ends with the run number if not daq_path.endswith(run_id): - logging.error(f"Path {daq_path} does not end with run number {run_id}") + log.error(f"Path {daq_path} does not end with run number {run_id}") return # check that the path exists if not os.path.exists(daq_path): - logging.error(f"Path {daq_path} does not exist, eliminating it from database") + log.error(f"Path {daq_path} does not exist, eliminating it from database") else: - logging.info(f"Deleting {daq_path}") + log.info(f"Deleting {daq_path}") # delete the directory daq_path os.system(f"rm -rf {daq_path}") @@ -193,48 +166,67 @@ def delete_data(runsdb, run_doc, production, we_are_really_sure): } } ) - logging.info(f"Moved DAQ data entry for run {run_id} to 'deleted_data'") + log.info(f"Moved DAQ data entry for run {run_id} to 'deleted_data'") else: - logging.info(f"[Not Really Sure] Would delete data for run {run_id} at {daq_path}") + log.info(f"[Not Really Sure] Would delete data for run {run_id} at {daq_path}") except Exception as e: - logging.error(f"Error in deleting data for run {run_id}: {e}") + log.error(f"Error in deleting data for run {run_id}: {e}") def main(args): runsdb = amstrax.get_mongo_collection() old_runs = get_old_runs(runsdb, args.days_old, args) - logging.info(f"Found {len(old_runs)} runs with data older than {args.days_old} days or with abandon tag") + log.info(f"Found {len(old_runs)} runs with data older than {args.days_old} days or with abandon tag") for run_doc in old_runs: # if abandoned, delete immediately (tags.name == 'abandon') if 'abandon' in [tag['name'] for tag in run_doc.get('tags', [])]: - logging.info(f"Deleting abandoned run {run_doc['number']}") + log.info(f"Deleting abandoned run {run_doc['number']}") delete_data(runsdb, run_doc, args.production, args.we_are_really_sure) continue else: # do some checks - logging.info(f"Checking safety for run {run_doc['number']}") + log.info(f"Checking safety for run {run_doc['number']}") if check_data_safety(run_doc, 
args.ssh_host, args): delete_data(runsdb, run_doc, args.production, args.we_are_really_sure) else: - logging.warning(f"Skipping deletion for run {run_doc['number']} due to safety check failure") + log.warning(f"Skipping deletion for run {run_doc['number']} due to safety check failure") return len(old_runs) if __name__ == '__main__': args = parse_args() - setup_logging(args.logs_path) + + # Set up logging + log_name = "delete_live" + + import amstrax + + versions = amstrax.print_versions( + modules="strax amstrax numpy numba".split(), + include_git=False, + return_string=True, + ) + + log = amstrax.get_daq_logger( + log_name, + log_name, + level=logging.DEBUG, + opening_message=f"I am processing with these software versions: {versions}", + logdir=args.logs_path, + ) + if not args.production: - logging.info("Performing a dry run. No data will be deleted.") + log.info("Performing a dry run. No data will be deleted.") if not args.we_are_really_sure: - logging.info("We are not really sure. No data will be deleted.") + log.info("We are not really sure. No data will be deleted.") if args.loop_infinite: while True: runs_deleted = main(args) sleep_time = 1 if runs_deleted else args.sleep_time - logging.info(f"Sleeping for {args.sleep_time} seconds...") + log.info(f"Sleeping for {args.sleep_time} seconds...") time.sleep(args.sleep_time) else: main(args) diff --git a/amstrax/logging_utils.py b/amstrax/logging_utils.py new file mode 100644 index 00000000..4c6b79cb --- /dev/null +++ b/amstrax/logging_utils.py @@ -0,0 +1,179 @@ +import datetime +import logging +import os +from git import Repo +from importlib import import_module + +__all__ = ['get_daq_logger', 'DAQLogHandler', 'get_git_hash'] + +""" +This module contains the common logger for all DAQ software. It is +responsible for writing log messages to disk and to the console. +Mainly copied from Joran's work on the DAQ of XENONnT. +""" + +def get_daq_logger(name, + process_name=None, + level=logging.INFO, + opening_message=None, + **kwargs): + """ + Common logger for DAQ-software + :param name: name to be displayed in the logs, e.g. "main" + :param process_name: name of the process, e.g. "bootstrax". Under + this name the log will be written. If None is provided, use the + name argument (but don't do name="main" for obvious reasons). + :param level: logging.DEBUG or logging.INFO etc, or "DEBUG" or "INFO" etc. Default INFO + :param opening_message: Each time a new log file is opened (because + it's a new day), add this message via log.info + :param **kwargs: directly passed to DAQLogHandler + :return: logger + """ + if process_name is None: + if name == 'main': + raise ValueError('Be a bit more informative, we don\'t want to be ' + 'logging to "main.log" whatever that means.') + process_name = name + logger = logging.getLogger(name) + logger.addHandler(DAQLogHandler(process_name, opening_message=opening_message, **kwargs)) + if isinstance(level, str): + level = getattr(logging, level) + logger.setLevel(level) + return logger + + +def get_git_hash(module: str) -> str: + """ + Get the latest git hash from a module + :param module: Name of the module to load, e.g.
"daqnt" + :return: the latest commit hash + """ + mod = import_module(module) + path = mod.__path__[0] + repo = Repo(path, search_parent_directories=True) + return repo.head.object.hexsha + + +class DAQLogHandler(logging.Handler): + """Common logger logic for all DAQ software""" + + def __init__(self, + process_name: str, + mc=None, + opening_message=None, + logdir=os.path.join(os.environ['HOME'], 'daq', 'logs'), + ): + logging.Handler.__init__(self) + self.opening_message=opening_message + self.process_name = process_name + now = datetime.datetime.utcnow() + self.today = datetime.date(now.year, now.month, now.day) + if not os.path.exists(logdir): + raise OSError(f'Are you on the DAQ? {logdir} does not exist..') + self.logdir = logdir + self.Rotate(self.today) + self.count = 0 + self.mc = mc + + def close(self): + if hasattr(self, 'f') and not self.f.closed: + self.f.flush() + self.f.close() + + def __del__(self): + self.close() + + def emit(self, record): + """ + This function is responsible for sending log messages to their + output destinations, in this case the console and disk + + :param record: logging.record, the log message + :returns: None + """ + msg_datetime = datetime.datetime.utcfromtimestamp(record.created) + msg_today = datetime.date(msg_datetime.year, msg_datetime.month, msg_datetime.day) + + if msg_today != self.today: + # if the log message is not from the same day as the logfile, rotate + self.Rotate(msg_today) + m = self.FormattedMessage(msg_datetime, record.levelname, record.funcName, record.lineno, record.getMessage()) + self.f.write(m) + print(m[:-1]) # strip \n + self.count += 1 + if self.count > 2: + # a lot of implementations don't routinely flush data to disk + # and we want to ensure that logs are readily available on disk, not sitting + # around in a buffer somewhere. Redax does this with a timer because + # logging happens from many threads, things logging via this module + # generally don't + self.f.flush() + self.count = 0 + # if this is bad enough, push to the db/website + if record.levelno >= logging.CRITICAL and self.mc is not None: + try: + self.mc.daq.log.insert_one( + {'user': self.process_name, + 'message': record.getMessage(), + 'priority': 4, 'runid': -1}) + except Exception as e: + print(f'Database issue? Cannot log? {type(e)}, {e}') + + def Rotate(self, when): + """ + This function makes sure that the currently-opened file has "today's" date. 
If "today" + doesn't match the filename, open a new one + + :param when: datetime.date, the file-date that should be opened + :returns: None + """ + if hasattr(self, 'f'): + self.f.close() + + self.f = open(self.FullFilename(when), 'a') + self.today = datetime.date.today() + m = self.FormattedMessage(datetime.datetime.utcnow(), "init", "logger", 0, "Opening a new file") + self.f.write(m) + if self.opening_message is not None: + m = self.FormattedMessage(datetime.datetime.utcnow(), + "init", + "logger", + 0, + self.opening_message + ) + self.f.write(m) + + def FullFilename(self, when): + """ + Returns the path and filename + + :param when: datetime.date, the file-date that should be opened + :returns: os.path, the full path to the new file + """ + day_dir = os.path.join(self.logdir, f"{when.year:04d}", f"{when.month:02d}.{when.day:02d}") + os.makedirs(day_dir, exist_ok=True) + return os.path.join(day_dir, self.Filename(when)) + + def Filename(self, when): + """ + The name of the file to log to + + :param when: datetime.date, unused + :returns: str, name of the file (without any path information) + """ + return f"{self.process_name}.log" + + def FormattedMessage(self, when, level, func_name, lineno, msg): + """ + Formats the message for output: | | (L ) | | + + :param when: datetime.datetime, when the message was created + :param level: str, the name of the level of this message + :param msg: str, the actual message + :param func_name: name of the calling function + :param lineno: line number + :returns: str, the formatted message + """ + func_line = f'{func_name} (L{lineno})' + return f"{when.isoformat(sep=' ')} | {str(level).upper():8} | {func_line:20} | {msg}\n" + \ No newline at end of file