diff --git a/install/upgrade/8.X.X.sql b/install/upgrade/8.X.X.sql index 0a6af1e59..b37603907 100644 --- a/install/upgrade/8.X.X.sql +++ b/install/upgrade/8.X.X.sql @@ -1,25 +1,31 @@ --- Table: web.user_tokens +-- Upgrade 8.0.x to 8.1.0 +-- Be sure to backup the database and read the upgrade notes before executing. + +BEGIN; + +-- UPDATE web.settings SET value = '8.0.0' WHERE name = 'database_version'; --- DROP TABLE IF EXISTS web.user_tokens; CREATE TABLE IF NOT EXISTS web.user_tokens ( id integer NOT NULL, - user_id integer, + user_id integer NOT NULL, + name text COLLATE pg_catalog."default", jit text COLLATE pg_catalog."default", valid_from timestamp without time zone, valid_until timestamp without time zone, - created timestamp without time zone, + created timestamp without time zone NOT NULL DEFAULT now(), modified timestamp without time zone, CONSTRAINT user_tokens_pkey PRIMARY KEY (id), - CONSTRAINT user_tokens_user_id_fkey FOREIGN KEY (user_id) - REFERENCES web."user" (id) MATCH SIMPLE - ON UPDATE NO ACTION - ON DELETE NO ACTION - NOT VALID -) + CONSTRAINT user_tokens_user_id_fkey FOREIGN KEY (user_id) REFERENCES web."user" (id) MATCH SIMPLE ON UPDATE CASCADE ON DELETE SET NULL NOT VALID +); + + +ALTER TABLE IF EXISTS web.user_tokens OWNER to openatlas; +CREATE SEQUENCE web.user_tokens_id_seq START WITH 1 INCREMENT BY 1 NO MINVALUE NO MAXVALUE CACHE 1; +ALTER TABLE web.user_tokens_id_seq OWNER TO openatlas; +ALTER SEQUENCE web.user_tokens_id_seq OWNED BY web.user_tokens.id; +ALTER TABLE ONLY web.user_tokens ALTER COLUMN id SET DEFAULT nextval('web.user_tokens_id_seq'::regclass); -TABLESPACE pg_default; -ALTER TABLE IF EXISTS web.user_tokens - OWNER to openatlas; +END; diff --git a/openatlas/database/user.py b/openatlas/database/user.py index 7bee19d84..2f8408e18 100644 --- a/openatlas/database/user.py +++ b/openatlas/database/user.py @@ -298,3 +298,12 @@ def get_user_entities(id_: int) -> list[int]: """, {'user_id': id_}) return [row['id'] for row in g.cursor.fetchall()] + + +def generate_token(data: dict[str, str]) -> None: + g.cursor.execute( + """ + INSERT INTO web.user_tokens(user_id, jit, valid_from, valid_until, name) + VALUES (%(user_id)s, %(jit)s, %(valid_from)s, %(valid_until)s, %(name)s); + """, data) + return None diff --git a/openatlas/forms/setting.py b/openatlas/forms/setting.py index 077b61ba7..5dcdcb38d 100644 --- a/openatlas/forms/setting.py +++ b/openatlas/forms/setting.py @@ -1,3 +1,5 @@ +from datetime import datetime + from flask_babel import lazy_gettext as _ from flask_wtf import FlaskForm from wtforms import ( @@ -169,5 +171,8 @@ class TokenForm(FlaskForm): _('expiration'), choices=[('0','One day'),('1','90 days'), ('2', 'no expiration date')], default='0') + token_name = StringField( + _('token name'), + default=f"Token_{datetime.today().strftime('%Y-%m-%d')}") token_text = StringField(_('token'), render_kw={'readonly': True}) save = SubmitField(_('generate')) diff --git a/openatlas/models/user.py b/openatlas/models/user.py index fb6f84575..98f80834f 100644 --- a/openatlas/models/user.py +++ b/openatlas/models/user.py @@ -6,6 +6,7 @@ from typing import Any, Optional from flask import g, session +from flask_jwt_extended import create_access_token, decode_token from flask_login import UserMixin, current_user from openatlas.database import user as db @@ -82,6 +83,27 @@ def get_notes_by_entity_id(self, entity_id: int) -> list[dict[str, Any]]: def get_entities(self) -> list[Entity]: return Entity.get_by_ids(db.get_user_entities(self.id), types=True) + def generate_token(self, expiration: str, token_name: str) -> None: + match expiration: + case '0': + expires_delta = timedelta(days=1) + case '1': + expires_delta = timedelta(days=90) + case '2' | _: + expires_delta = False + access_token = create_access_token( + identity=self.username, + additional_claims={'role': self.group}, + expires_delta=expires_delta) + decoded_token = decode_token(access_token, allow_expired=True ) + db.generate_token({ + 'jit':decoded_token['jti'], + 'user_id': self.id, + 'name': token_name, + 'valid_until': datetime.fromtimestamp(decoded_token.get('exp')), + 'valid_from': datetime.fromtimestamp(decoded_token['iat'])}) + return access_token + @staticmethod def get_all() -> list[User]: return [User(row) for row in db.get_all()] diff --git a/openatlas/views/profile.py b/openatlas/views/profile.py index 57660ce67..683cceb4b 100644 --- a/openatlas/views/profile.py +++ b/openatlas/views/profile.py @@ -84,9 +84,9 @@ def profile_index() -> str: button(_('edit'), url_for('profile_settings', category='profile')), button(_('change password'), url_for('profile_password'))] tabs['modules'].buttons.append( - button( - _('edit'), - url_for('profile_settings', category='modules'))) + button( + _('edit'), + url_for('profile_settings', category='modules'))) tabs['display'].buttons.append( button(_('edit'), url_for('profile_settings', category='display'))) tabs['token'].buttons.append( @@ -143,23 +143,22 @@ def profile_settings(category: str) -> str | Response: @app.route('/profile/generate_token', methods=['GET', 'POST']) @login_required def generate_token() -> str | Response: - expiration = request.args.get('expiration') form = TokenForm() if form.validate_on_submit(): expiration = form.expiration.data - return redirect(f"{url_for('generate_token', expiration=expiration)}") - if expiration: - match expiration: - case '0': - expires_delta = timedelta(days=1) - case '1': - expires_delta = timedelta(days=90) - case '2' | _: - expires_delta = False - form.token_text.data = create_access_token( - identity=current_user.username, - additional_claims={'role': current_user.group}, - expires_delta=expires_delta) + token_name = form.token_name.data + token = None + Transaction.begin() + try: + token = current_user.generate_token(expiration, token_name) + Transaction.commit() + flash(_('token stored'), 'info') + except Exception as e: # pragma: no cover + Transaction.rollback() + g.logger.log('error', 'database', 'transaction failed', e) + flash(_('error transaction'), 'error') + return redirect(f"{url_for('generate_token', token=token)}") + form.token_text.data = request.args.get('token') return render_template( 'content.html', content=display_form(form, manual_page='profile'), @@ -169,7 +168,6 @@ def generate_token() -> str | Response: _('token')]) - @app.route('/profile/password', methods=['GET', 'POST']) @login_required def profile_password() -> str | Response: