diff --git a/mxcubeweb/core/components/user/usermanager.py b/mxcubeweb/core/components/user/usermanager.py index aed0f567f..b587b5c6b 100644 --- a/mxcubeweb/core/components/user/usermanager.py +++ b/mxcubeweb/core/components/user/usermanager.py @@ -2,6 +2,10 @@ import json import logging import uuid +from typing import ( + List, + Union, +) import flask import flask_security @@ -21,14 +25,20 @@ class BaseUserManager(ComponentBase): def __init__(self, app, config): super().__init__(app, config) - def get_observers(self): + def get_observers(self) -> List[User]: + """List users that are in observer mode. + + Observer mode means user is logged in (authenticated and active) but not in + control of the application. + """ return [ user for user in User.query.all() if ((not user.in_control) and user.is_authenticated and user.is_active) ] - def get_operator(self): + def get_operator(self) -> User: + """Return user object that is controlling the beamline (operator).""" user = None for _u in User.query.all(): @@ -38,10 +48,16 @@ def get_operator(self): return user - def is_operator(self): + def is_operator(self) -> bool: + """Return True if the current_user is an operator.""" return getattr(current_user, "in_control", False) - def active_logged_in_users(self, exclude_inhouse=False): + def active_logged_in_users(self, exclude_inhouse: bool = False) -> List[User]: + """List of active and logged in users. + + Attributes: + exclude_inhouse (bool): exclude inhouse users from the list + """ self.update_active_users() if exclude_inhouse: @@ -53,7 +69,8 @@ def active_logged_in_users(self, exclude_inhouse=False): return users - def get_user(self, username): + def get_user(self, username: str) -> Union[User, None]: + """Return user model instance based on username.""" user = None for _u in User.query.all(): @@ -62,7 +79,8 @@ def get_user(self, username): return user - def set_operator(self, username): + def set_operator(self, username: str) -> Union[User, None]: + """Set the user with the given username to be an operator.""" user = None for _u in User.query.all(): @@ -74,7 +92,12 @@ def set_operator(self, username): return user - def update_active_users(self): + def update_active_users(self) -> None: + """ + Check if any user have been inactive for a period longer than the + session lifetime. If so, deactivate the user in datastore and emit + the relvant signals `userChanged` and `observersChanged` to the client. + """ for _u in User.query.all(): if ( _u.active @@ -92,7 +115,15 @@ def update_active_users(self): self.app.server.emit("observersChanged", namespace="/hwr") - def update_operator(self, new_login=False): + def update_operator(self, new_login: bool = False) -> None: + """ + Set the operator based on the logged in users. If no user is currently + in control, the first logged in user is set. Additionally, proposal + is set based on the operator selected_proposal field. + + Attributes: + new_login: True if method was invoked with new user login. + """ active_in_control = False for _u in User.query.all(): @@ -121,10 +152,15 @@ def update_operator(self, new_login=False): if _u.is_authenticated and _u.in_control: if HWR.beamline.lims.loginType.lower() != "user": self.app.lims.select_proposal(self.app.lims.get_proposal(_u)) - elif _u.selected_proposal is not None: + elif _u.selected_proposal: self.app.lims.select_proposal(_u.selected_proposal) - def is_inhouse_user(self, user_id): + def is_inhouse_user(self, user_id: str) -> bool: + """Retrun True if the user_id is in the in-house user list. + + Attributes: + user_id: user id composed from code and number. + """ user_id_list = [ "%s%s" % (code, number) for (code, number) in HWR.beamline.session.in_house_users @@ -133,10 +169,19 @@ def is_inhouse_user(self, user_id): return user_id in user_id_list # Abstract method to be implemented by concrete implementation - def _login(self, login_id, password): + def _login(self, login_id: str, password: str): pass - def login(self, login_id: str, password: str): + def login(self, login_id: str, password: str) -> None: + """ + Create new session for the user if it does not exist. Activate user in + data store. If a sample is loaded in sample changer but not mounted, + mount it and update the smaple list. Try update the operator. + + Attributes: + login_id: username. + password: password. + """ try: login_res = self._login(login_id, password) except Exception: @@ -172,6 +217,12 @@ def _signout(self): pass def signout(self): + """Sign out the current user. + + If the user was an operator, the queue and samples are restored to init values, + the session is cleared, the user is not an operator anymore. Log out and + deactivte the user, and emit 'observersChanged' signal. + """ self._signout() user = current_user @@ -193,15 +244,22 @@ def signout(self): msg = "User %s signed out" % user.username logging.getLogger("MX3.HWR").info(msg) + # change current_user.active to False self.app.server.user_datastore.deactivate_user(user) flask_security.logout_user() self.app.server.emit("observersChanged", namespace="/hwr") - def is_authenticated(self): + def is_authenticated(self) -> bool: + """Return True whether the current user is authenticated.""" return current_user.is_authenticated() - def force_signout_user(self, username): + def force_signout_user(self, username: str) -> None: + """Force signout of the annonymous or non operating user. + + Attributes: + username: username of the user to be signed out. + """ user = self.get_user(username) if not user.in_control or current_user.is_anonymous: @@ -210,7 +268,14 @@ def force_signout_user(self, username): self.app.server.user_datastore.commit() self.app.server.emit("forceSignout", room=socketio_sid, namespace="/hwr") - def login_info(self): + def login_info(self) -> dict: + """Login information to be displayed in the application. + + Infomration such as: synchrotron and beamline names, user infromation, proposals list, selected proposal etc. + + Returns: + dictionary with login information. + """ if not current_user.is_anonymous: login_info = convert_to_dict(json.loads(current_user.limsdata)) @@ -248,11 +313,23 @@ def login_info(self): return res - def update_user(self, user): + def update_user(self, user: User) -> None: + """Update user information in datastore. + + Attributes: + user: User model instance. + """ self.app.server.user_datastore.put(user) self.app.server.user_datastore.commit() - def _get_configured_roles(self, user): + def _get_configured_roles(self, user: str) -> List[str]: + """Get the roles configured for the user. + + Inhouse user is always assigned additionaly a staff role. + + Attributes: + user: username. + """ roles = set() _ihs = ["%s%s" % prop for prop in HWR.beamline.session.in_house_users] @@ -267,15 +344,30 @@ def _get_configured_roles(self, user): return list(roles) - def db_create_user(self, user: str, password: str, lims_data: dict): + def db_create_user(self, user: str, password: str, lims_data: dict) -> User: + """Create or update user in datastore. + + If the user already exists, update the user information. If not create new one. + Assign roles to the user, prevoiusly making sure the roles of 'staff' and + 'incontrol' existis in data store. If not create them also. + + Attributes: + user: representation of username (eventually part of it). Also a nickname + for new users. + password (unused): password. + lims_data: dictionary with the lims data to be updated. + + Returns: + User model instance existing in or added to datastore. + """ sid = flask.session["sid"] user_datastore = self.app.server.user_datastore - username = f"{user}-{str(uuid.uuid4())}" if HWR.beamline.lims.loginType.lower() == "user": username = f"{user}" + else: + username = f"{user}-{str(uuid.uuid4())}" - # Make sure that the roles staff and incontrol always - # exists + # Make sure that the roles staff and incontrol always exists if not user_datastore.find_role("staff"): user_datastore.create_role(name="staff") user_datastore.create_role(name="incontrol") @@ -306,7 +398,17 @@ def db_create_user(self, user: str, password: str, lims_data: dict): return user_datastore.find_user(username=username) - def db_set_in_control(self, user, control): + def db_set_in_control(self, user: User, control: bool) -> None: + """Update users (their in_control field) in the datastore. + + If the passed user becomes an operator (control=True), the remaining users' + in_control fields are set to False. If passed user stops being an operator, + only its in_control field is set to False. + + Attributes: + user: User model instance. + control: the user becomes an operator (Ture) or not (False). + """ user_datastore = self.app.server.user_datastore if control: @@ -319,7 +421,7 @@ def db_set_in_control(self, user, control): user_datastore.put(_u) else: _u = user_datastore.find_user(username=user.username) - _u.in_control = control + _u.in_control = False user_datastore.put(_u) self.app.server.user_datastore.commit() @@ -329,14 +431,28 @@ class UserManager(BaseUserManager): def __init__(self, app, config): super().__init__(app, config) - def _login(self, login_id: str, password: str): + def _login(self, login_id: str, password: str) -> dict: + """Check loging conditions + + Login conditions are: active, anonymous, inhouse, local/remote login, within + existing session or not. + + Attributes: + login_id: username + password: password + + Returns: + dictionary with login response information. + """ login_res = self.app.lims.lims_login(login_id, password, create_session=False) inhouse = self.is_inhouse_user(login_id) + valid_login = self.app.lims.lims_valid_login(login_res) + existing_session = self.app.lims.lims_existing_session(login_res) info = { - "valid": self.app.lims.lims_valid_login(login_res), + "valid": valid_login, "local": is_local_host(), - "existing_session": self.app.lims.lims_existing_session(login_res), + "existing_session": existing_session, "inhouse": inhouse, } @@ -356,7 +472,7 @@ def _login(self, login_id: str, password: str): ) # Only allow in-house log-in from local host - if inhouse and not (inhouse and is_local_host()): + if inhouse and not is_local_host(): raise Exception("In-house only allowed from localhost") non_inhouse_active_users = self.active_logged_in_users(exclude_inhouse=True) @@ -385,21 +501,17 @@ def _login(self, login_id: str, password: str): raise Exception("Remote access disabled") # Only allow remote logins with existing sessions - if self.app.lims.lims_valid_login(login_res) and is_local_host(): - if not self.app.lims.lims_existing_session(login_res): + if valid_login and is_local_host(): + if not existing_session: login_res = self.app.lims.create_lims_session(login_res) - msg = "[LOGIN] Valid login from local host (%s)" % str(info) - logging.getLogger("MX3.HWR").info(msg) - elif self.app.lims.lims_valid_login( - login_res - ) and self.app.lims.lims_existing_session(login_res): + elif valid_login and existing_session: msg = "[LOGIN] Valid remote login from %s with existing session (%s)" msg += msg % (remote_addr(), str(info)) - logging.getLogger("MX3.HWR").info(msg) else: logging.getLogger("MX3.HWR").info("Invalid login %s" % info) raise Exception(str(info)) + logging.getLogger("MX3.HWR").info(msg) return login_res