Skip to content

Commit

Permalink
Lint with isort and black
Browse files Browse the repository at this point in the history
  • Loading branch information
digitaldogsbody committed Mar 21, 2024
1 parent a6c0641 commit bbe9ad0
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 75 deletions.
6 changes: 4 additions & 2 deletions app/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
from sqlalchemy import MetaData

convention = {
"ix": 'ix_%(column_0_label)s',
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s"
"pk": "pk_%(table_name)s",
}

metadata = MetaData(naming_convention=convention)
Expand All @@ -15,6 +15,7 @@

class CRUDMixin(object):
"""Mixin that adds convenience methods for CRUD (create, read, update, delete) operations."""

@classmethod
def get(cls, id):
"""Read a record from the database by id"""
Expand Down Expand Up @@ -47,4 +48,5 @@ def delete(self, commit=True):

class Model(CRUDMixin, db.Model):
"""Base model class that includes CRUD convenience methods."""

__abstract__ = True
4 changes: 3 additions & 1 deletion app/emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ def send_email(to: str, subject: str, body: str) -> dict:
return response.json()


def send_confirmation_email(to: str, name: str, datafile: str, link_time: int, url: str, landing_page: str) -> dict:
def send_confirmation_email(
to: str, name: str, datafile: str, link_time: int, url: str, landing_page: str
) -> dict:
return send_email(
to=to,
subject=f"Your access link for the {datafile} data file",
Expand Down
74 changes: 54 additions & 20 deletions app/forms.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,43 @@
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField, BooleanField, TextAreaField, SelectMultipleField, widgets, RadioField, EmailField
from wtforms.validators import DataRequired, StopValidation, Email
from wtforms import (
BooleanField,
EmailField,
RadioField,
SelectMultipleField,
StringField,
SubmitField,
TextAreaField,
widgets,
)
from wtforms.validators import DataRequired, Email, StopValidation

use_choices = [
('research', 'Research: Analyze the dataset, identify trends, build new models.'),
('teaching', 'Teaching: Prepare lessons, create assignments, teach data analysis skills.'),
('software', 'Software Development: Build tools, create applications, integrate data sources.'),
('reporting', 'Reporting: Checking on external use of data.'),
('notsure', 'I’m not sure yet.'),
('other', 'Other (please specify below).')
("research", "Research: Analyze the dataset, identify trends, build new models."),
(
"teaching",
"Teaching: Prepare lessons, create assignments, teach data analysis skills.",
),
(
"software",
"Software Development: Build tools, create applications, integrate data sources.",
),
("reporting", "Reporting: Checking on external use of data."),
("notsure", "I’m not sure yet."),
("other", "Other (please specify below)."),
]


class MultiCheckboxField(SelectMultipleField):
widget = widgets.ListWidget(html_tag='ul', prefix_label=False)
widget = widgets.ListWidget(html_tag="ul", prefix_label=False)
option_widget = widgets.CheckboxInput()


class MultiCheckboxAtLeastOne:
def __call__(self, form, field):
if len(field.data) == 0:
raise StopValidation('At least one option must be selected.')
if 'other' in field.data and form.additional_info.data == '':
raise StopValidation('Please provide information about your usage below.')
raise StopValidation("At least one option must be selected.")
if "other" in field.data and form.additional_info.data == "":
raise StopValidation("Please provide information about your usage below.")


class EscapedLabel:
Expand All @@ -35,11 +50,30 @@ def __html__(self):

class RequestAccessForm(FlaskForm):
# form based on User properties
name = StringField('Name*', validators=[DataRequired()])
organisation = StringField('Organisational Affiliation*', validators=[DataRequired()])
email = EmailField('Email (to receive the link to the data file)*', validators=[DataRequired(), Email()])
contact = RadioField('Can we follow up with you at this email address to discuss your planned use of the data file?*', choices=[(True, 'Yes'), (False, 'No')], coerce=bool, validators=[DataRequired()])
primary_use = MultiCheckboxField('What is your planned use for the data? (check all that apply)*', choices=use_choices, validators=[MultiCheckboxAtLeastOne()])
additional_info = TextAreaField('Tell us more about how you plan to use the data!')
accept_terms = BooleanField(EscapedLabel('I acknowledge the <a href="https://support.datacite.org/docs/datacite-data-file-use-policy" target="_blank">DataCite Data File Use Policy</a>'), validators=[DataRequired()])
submit = SubmitField('Send link')
name = StringField("Name*", validators=[DataRequired()])
organisation = StringField(
"Organisational Affiliation*", validators=[DataRequired()]
)
email = EmailField(
"Email (to receive the link to the data file)*",
validators=[DataRequired(), Email()],
)
contact = RadioField(
"Can we follow up with you at this email address to discuss your planned use of the data file?*",
choices=[(True, "Yes"), (False, "No")],
coerce=bool,
validators=[DataRequired()],
)
primary_use = MultiCheckboxField(
"What is your planned use for the data? (check all that apply)*",
choices=use_choices,
validators=[MultiCheckboxAtLeastOne()],
)
additional_info = TextAreaField("Tell us more about how you plan to use the data!")
accept_terms = BooleanField(
EscapedLabel(
'I acknowledge the <a href="https://support.datacite.org/docs/datacite-data-file-use-policy" target="_blank">DataCite Data File Use Policy</a>'
),
validators=[DataRequired()],
)
submit = SubmitField("Send link")
103 changes: 65 additions & 38 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
from datetime import datetime
import re
from jinja2 import pass_eval_context
from markupsafe import Markup, escape
from flask import Flask, request, abort, redirect, render_template, url_for
from datetime import datetime
from os import getenv

from dotenv import load_dotenv
from flask import Flask, abort, redirect, render_template, request, url_for
from flask_bootstrap import Bootstrap5
from flask_jwt_extended import JWTManager, decode_token
from flask_jwt_extended.exceptions import JWTDecodeError
from httpx import HTTPError
from flask_bootstrap import Bootstrap5
from dotenv import load_dotenv
from markupsafe import Markup

from emails import send_confirmation_email
from models import User, Datafile
from database import db
from emails import send_confirmation_email
from forms import RequestAccessForm
from models import Datafile, User

load_dotenv()
app = Flask(__name__)

app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config["TEMPLATES_AUTO_RELOAD"] = True
app.config["SECRET_KEY"] = getenv("SECRET_KEY", "changeme")
app.config["SQLALCHEMY_DATABASE_URI"] = getenv("DATABASE_URL", "sqlite:///tesem-dev.db")
app.config["JWT_SECRET_KEY"] = getenv("JWT_SECRET_KEY", "changeme")
app.config["MAILGUN_API_KEY"] = getenv("MAILGUN_API_KEY", "changeme")
app.config["MAILGUN_ENDPOINT"] = getenv("MAILGUN_ENDPOINT", "https://api.mailgun.net/v3/mg.datacite.org")
app.config["MAILGUN_ENDPOINT"] = getenv(
"MAILGUN_ENDPOINT", "https://api.mailgun.net/v3/mg.datacite.org"
)
app.config["MAILGUN_DOMAIN"] = getenv("MAILGUN_DOMAIN", "mg.datacite.org")
app.config["EMAIL_FROM"] = getenv("EMAIL_FROM", "DataCite Data Files Service")
app.config["EMAIL_ADDRESS"] = getenv("EMAIL_ADDRESS", "[email protected]")
Expand All @@ -34,29 +36,23 @@


@app.template_filter()
# @pass_eval_context
def nl2br(value):
br = "<br>\n"

# if eval_ctx.autoescape:
# value = escape(value)
# br = Markup(br)

result = "\n\n".join(
f"<p>{br.join(p.splitlines())}</p>"
for p in re.split(r"(?:\r\n|\r(?!\n)|\n){2,}", value)
)
return Markup(result) #Markup(result) if eval_ctx.autoescape else result
return Markup(result)


@app.route('/')
@app.route('/datafiles')
@app.route("/")
@app.route("/datafiles")
def index():
datafiles = Datafile.query.all()
return render_template('index.html', datafiles=datafiles)
return render_template("index.html", datafiles=datafiles)


@app.route('/datafiles/<datafile_slug>', methods=['GET', 'POST'])
@app.route("/datafiles/<datafile_slug>", methods=["GET", "POST"])
def datafile(datafile_slug):
datafile = Datafile.query.filter_by(slug=datafile_slug).first()
if not datafile:
Expand All @@ -77,25 +73,43 @@ def datafile(datafile_slug):

token = u.generate_token()
try:
send_confirmation_email(u.email, u.name, datafile.name, 24,
url_for('download_datafile', datafile_slug=datafile.slug, token=token, _external=True), datafile.landing_page)
return render_template('success.html', datafile=datafile)
send_confirmation_email(
u.email,
u.name,
datafile.name,
24,
url_for(
"download_datafile",
datafile_slug=datafile.slug,
token=token,
_external=True,
),
datafile.landing_page,
)
return render_template("success.html", datafile=datafile)
except HTTPError as e:
abort(500, "Something went wrong - please contact [email protected]") # todo: error handling
abort(
500, "Something went wrong - please contact [email protected]"
) # todo: error handling

return render_template('datafile.html', datafile=datafile, form=form)
return render_template("datafile.html", datafile=datafile, form=form)


@app.route('/datafiles/<datafile_slug>/download')
@app.route("/datafiles/<datafile_slug>/download")
def download_datafile(datafile_slug):
token = request.args.get('token')
if not token or token == '':
abort(403, "Missing token - please check the link in your email and try again, making sure to include the ?token= parameter")
token = request.args.get("token")
if not token or token == "":
abort(
403,
"Missing token - please check the link in your email and try again, making sure to include the ?token= parameter",
)
try:
token_json = decode_token(token)
u = User.get(token_json['sub'])
u = User.get(token_json["sub"])
if not u:
abort(403, "Invalid token - please check the link in your email and try again")
abort(
403, "Invalid token - please check the link in your email and try again"
)

datafile = Datafile.query.filter_by(slug=datafile_slug).first()
if not datafile:
Expand All @@ -110,28 +124,41 @@ def download_datafile(datafile_slug):

@app.errorhandler(404)
def page_not_found(message):
return render_template('error.html', code=404, status="Page not found", message=message), 404
return (
render_template(
"error.html", code=404, status="Page not found", message=message
),
404,
)


@app.errorhandler(500)
def internal_server_error(message):
return render_template('error.html', code=500, status="Internal server error", message=message), 500
return (
render_template(
"error.html", code=500, status="Internal server error", message=message
),
500,
)


@app.errorhandler(403)
def forbidden(message):
return render_template('error.html', code=403, status="Forbidden", message=message), 403
return (
render_template("error.html", code=403, status="Forbidden", message=message),
403,
)


@app.route('/support')
@app.route("/support")
def support():
return redirect("https://support.datacite.org/", code=302)


@app.route('/contact')
@app.route("/contact")
def contact():
return redirect("https://datacite.org/contact", code=302)


if __name__ == '__main__':
if __name__ == "__main__":
app.run()
29 changes: 15 additions & 14 deletions app/models.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from datetime import timedelta

import boto3
from botocore.exceptions import ClientError
from flask import url_for, render_template

from database import db, Model
from flask import render_template, url_for
from flask_jwt_extended import create_access_token

from database import Model, db


class User(Model):
__tablename__ = 'users'
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(120), nullable=False)
name = db.Column(db.String(240), nullable=False)
Expand All @@ -18,15 +19,15 @@ class User(Model):
additional_info = db.Column(db.Text, nullable=True)
access_date = db.Column(db.DateTime, nullable=True)
requested_access_date = db.Column(db.DateTime, nullable=True)
datafile_id = db.Column(db.Integer, db.ForeignKey('datafiles.id'))
datafile = db.relationship('Datafile', lazy=True)
datafile_id = db.Column(db.Integer, db.ForeignKey("datafiles.id"))
datafile = db.relationship("Datafile", lazy=True)

def generate_token(self):
return create_access_token(identity=self.id, expires_delta=timedelta(days=1))


class Datafile(Model):
__tablename__ = 'datafiles'
__tablename__ = "datafiles"
id = db.Column(db.Integer, primary_key=True)
slug = db.Column(db.String(24), unique=True, nullable=False, index=True)
name = db.Column(db.String(120), nullable=False)
Expand All @@ -41,10 +42,7 @@ def generate_link(self):
try:
url = s3_client.generate_presigned_url(
ClientMethod="get_object",
Params={
"Bucket": "pidgraph-data-dumps",
"Key": self.filename
},
Params={"Bucket": "pidgraph-data-dumps", "Key": self.filename},
ExpiresIn=300,
)
return url
Expand All @@ -58,12 +56,15 @@ def landing_page(self):
if self.doi:
return f"https://doi.org/{self.doi}"
else:
return url_for('datafile', datafile_slug=self.slug, _external=True)
return url_for("datafile", datafile_slug=self.slug, _external=True)

@property
def access_button(self):
return render_template("components/access_button.html", link=url_for('datafile', datafile_slug=self.slug))
return render_template(
"components/access_button.html",
link=url_for("datafile", datafile_slug=self.slug),
)

@property
def short_description(self):
return self.description.split('\n')[0]
return self.description.split("\n")[0]

0 comments on commit bbe9ad0

Please sign in to comment.