Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring usermanager.py #1461

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 147 additions & 35 deletions mxcubeweb/core/components/user/usermanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
import json
import logging
import uuid
from typing import (
List,
Union,
)

import flask
import flask_security
Expand All @@ -21,14 +25,20 @@ class BaseUserManager(ComponentBase):
def __init__(self, app, config):
super().__init__(app, config)

def get_observers(self):
def get_observers(self) -> List[User]:
"""List users that are in observer mode.

Observer mode means user is logged in (authenticated and active) but not in
control of the application.
"""
return [
user
for user in User.query.all()
if ((not user.in_control) and user.is_authenticated and user.is_active)
]

def get_operator(self):
def get_operator(self) -> User:
"""Return user object that is controlling the beamline (operator)."""
user = None

for _u in User.query.all():
Expand All @@ -38,10 +48,16 @@ def get_operator(self):

return user

def is_operator(self):
def is_operator(self) -> bool:
"""Return True if the current_user is an operator."""
return getattr(current_user, "in_control", False)

def active_logged_in_users(self, exclude_inhouse=False):
def active_logged_in_users(self, exclude_inhouse: bool = False) -> List[User]:
"""List of active and logged in users.

Attributes:
exclude_inhouse (bool): exclude inhouse users from the list
"""
self.update_active_users()

if exclude_inhouse:
Expand All @@ -53,7 +69,8 @@ def active_logged_in_users(self, exclude_inhouse=False):

return users

def get_user(self, username):
def get_user(self, username: str) -> Union[User, None]:
"""Return user model instance based on username."""
user = None

for _u in User.query.all():
Expand All @@ -62,7 +79,8 @@ def get_user(self, username):

return user

def set_operator(self, username):
def set_operator(self, username: str) -> Union[User, None]:
"""Set the user with the given username to be an operator."""
user = None

for _u in User.query.all():
Expand All @@ -74,7 +92,12 @@ def set_operator(self, username):

return user

def update_active_users(self):
def update_active_users(self) -> None:
"""
Check if any user have been inactive for a period longer than the
session lifetime. If so, deactivate the user in datastore and emit
the relvant signals `userChanged` and `observersChanged` to the client.
"""
for _u in User.query.all():
if (
_u.active
Expand All @@ -92,7 +115,15 @@ def update_active_users(self):

self.app.server.emit("observersChanged", namespace="/hwr")

def update_operator(self, new_login=False):
def update_operator(self, new_login: bool = False) -> None:
"""
Set the operator based on the logged in users. If no user is currently
in control, the first logged in user is set. Additionally, proposal
is set based on the operator selected_proposal field.

Attributes:
new_login: True if method was invoked with new user login.
"""
active_in_control = False

for _u in User.query.all():
Expand Down Expand Up @@ -121,10 +152,15 @@ def update_operator(self, new_login=False):
if _u.is_authenticated and _u.in_control:
if HWR.beamline.lims.loginType.lower() != "user":
self.app.lims.select_proposal(self.app.lims.get_proposal(_u))
elif _u.selected_proposal is not None:
elif _u.selected_proposal:
self.app.lims.select_proposal(_u.selected_proposal)

def is_inhouse_user(self, user_id):
def is_inhouse_user(self, user_id: str) -> bool:
"""Retrun True if the user_id is in the in-house user list.

Attributes:
user_id: user id composed from code and number.
"""
user_id_list = [
"%s%s" % (code, number)
for (code, number) in HWR.beamline.session.in_house_users
Expand All @@ -133,10 +169,19 @@ def is_inhouse_user(self, user_id):
return user_id in user_id_list

# Abstract method to be implemented by concrete implementation
def _login(self, login_id, password):
def _login(self, login_id: str, password: str):
pass

def login(self, login_id: str, password: str):
def login(self, login_id: str, password: str) -> None:
"""
Create new session for the user if it does not exist. Activate user in
data store. If a sample is loaded in sample changer but not mounted,
mount it and update the smaple list. Try update the operator.

Attributes:
login_id: username.
password: password.
"""
try:
login_res = self._login(login_id, password)
except Exception:
Expand Down Expand Up @@ -172,6 +217,12 @@ def _signout(self):
pass

def signout(self):
"""Sign out the current user.

If the user was an operator, the queue and samples are restored to init values,
the session is cleared, the user is not an operator anymore. Log out and
deactivte the user, and emit 'observersChanged' signal.
"""
self._signout()
user = current_user

Expand All @@ -193,15 +244,22 @@ def signout(self):
msg = "User %s signed out" % user.username
logging.getLogger("MX3.HWR").info(msg)

# change current_user.active to False
self.app.server.user_datastore.deactivate_user(user)
flask_security.logout_user()

self.app.server.emit("observersChanged", namespace="/hwr")

def is_authenticated(self):
def is_authenticated(self) -> bool:
"""Return True whether the current user is authenticated."""
return current_user.is_authenticated()

def force_signout_user(self, username):
def force_signout_user(self, username: str) -> None:
"""Force signout of the annonymous or non operating user.

Attributes:
username: username of the user to be signed out.
"""
user = self.get_user(username)

if not user.in_control or current_user.is_anonymous:
Expand All @@ -210,7 +268,14 @@ def force_signout_user(self, username):
self.app.server.user_datastore.commit()
self.app.server.emit("forceSignout", room=socketio_sid, namespace="/hwr")

def login_info(self):
def login_info(self) -> dict:
"""Login information to be displayed in the application.

Infomration such as: synchrotron and beamline names, user infromation, proposals list, selected proposal etc.

Returns:
dictionary with login information.
"""
if not current_user.is_anonymous:
login_info = convert_to_dict(json.loads(current_user.limsdata))

Expand Down Expand Up @@ -248,11 +313,23 @@ def login_info(self):

return res

def update_user(self, user):
def update_user(self, user: User) -> None:
"""Update user information in datastore.

Attributes:
user: User model instance.
"""
self.app.server.user_datastore.put(user)
self.app.server.user_datastore.commit()

def _get_configured_roles(self, user):
def _get_configured_roles(self, user: str) -> List[str]:
"""Get the roles configured for the user.

Inhouse user is always assigned additionaly a staff role.

Attributes:
user: username.
"""
roles = set()

_ihs = ["%s%s" % prop for prop in HWR.beamline.session.in_house_users]
Expand All @@ -267,15 +344,30 @@ def _get_configured_roles(self, user):

return list(roles)

def db_create_user(self, user: str, password: str, lims_data: dict):
def db_create_user(self, user: str, password: str, lims_data: dict) -> User:
"""Create or update user in datastore.

If the user already exists, update the user information. If not create new one.
Assign roles to the user, prevoiusly making sure the roles of 'staff' and
'incontrol' existis in data store. If not create them also.

Attributes:
user: representation of username (eventually part of it). Also a nickname
for new users.
password (unused): password.
lims_data: dictionary with the lims data to be updated.

Returns:
User model instance existing in or added to datastore.
"""
sid = flask.session["sid"]
user_datastore = self.app.server.user_datastore
username = f"{user}-{str(uuid.uuid4())}"
if HWR.beamline.lims.loginType.lower() == "user":
username = f"{user}"
else:
username = f"{user}-{str(uuid.uuid4())}"

# Make sure that the roles staff and incontrol always
# exists
# Make sure that the roles staff and incontrol always exists
if not user_datastore.find_role("staff"):
user_datastore.create_role(name="staff")
user_datastore.create_role(name="incontrol")
Expand Down Expand Up @@ -306,7 +398,17 @@ def db_create_user(self, user: str, password: str, lims_data: dict):

return user_datastore.find_user(username=username)

def db_set_in_control(self, user, control):
def db_set_in_control(self, user: User, control: bool) -> None:
"""Update users (their in_control field) in the datastore.

If the passed user becomes an operator (control=True), the remaining users'
in_control fields are set to False. If passed user stops being an operator,
only its in_control field is set to False.

Attributes:
user: User model instance.
control: the user becomes an operator (Ture) or not (False).
"""
user_datastore = self.app.server.user_datastore

if control:
Expand All @@ -319,7 +421,7 @@ def db_set_in_control(self, user, control):
user_datastore.put(_u)
else:
_u = user_datastore.find_user(username=user.username)
_u.in_control = control
_u.in_control = False
user_datastore.put(_u)

self.app.server.user_datastore.commit()
Expand All @@ -329,14 +431,28 @@ class UserManager(BaseUserManager):
def __init__(self, app, config):
super().__init__(app, config)

def _login(self, login_id: str, password: str):
def _login(self, login_id: str, password: str) -> dict:
"""Check loging conditions

Login conditions are: active, anonymous, inhouse, local/remote login, within
existing session or not.

Attributes:
login_id: username
password: password

Returns:
dictionary with login response information.
"""
login_res = self.app.lims.lims_login(login_id, password, create_session=False)
inhouse = self.is_inhouse_user(login_id)
valid_login = self.app.lims.lims_valid_login(login_res)
existing_session = self.app.lims.lims_existing_session(login_res)

info = {
"valid": self.app.lims.lims_valid_login(login_res),
"valid": valid_login,
"local": is_local_host(),
"existing_session": self.app.lims.lims_existing_session(login_res),
"existing_session": existing_session,
"inhouse": inhouse,
}

Expand All @@ -356,7 +472,7 @@ def _login(self, login_id: str, password: str):
)

# Only allow in-house log-in from local host
if inhouse and not (inhouse and is_local_host()):
if inhouse and not is_local_host():
raise Exception("In-house only allowed from localhost")

non_inhouse_active_users = self.active_logged_in_users(exclude_inhouse=True)
Expand Down Expand Up @@ -385,21 +501,17 @@ def _login(self, login_id: str, password: str):
raise Exception("Remote access disabled")

# Only allow remote logins with existing sessions
if self.app.lims.lims_valid_login(login_res) and is_local_host():
if not self.app.lims.lims_existing_session(login_res):
if valid_login and is_local_host():
if not existing_session:
login_res = self.app.lims.create_lims_session(login_res)

msg = "[LOGIN] Valid login from local host (%s)" % str(info)
logging.getLogger("MX3.HWR").info(msg)
elif self.app.lims.lims_valid_login(
login_res
) and self.app.lims.lims_existing_session(login_res):
elif valid_login and existing_session:
msg = "[LOGIN] Valid remote login from %s with existing session (%s)"
msg += msg % (remote_addr(), str(info))
logging.getLogger("MX3.HWR").info(msg)
else:
logging.getLogger("MX3.HWR").info("Invalid login %s" % info)
raise Exception(str(info))
logging.getLogger("MX3.HWR").info(msg)

return login_res

Expand Down
Loading