Skip to content

Commit

Permalink
Merge pull request #34 from citysciencelab/name_process_title
Browse files Browse the repository at this point in the history
Add process_title and name to job
  • Loading branch information
KaiVolland authored Aug 30, 2024
2 parents 52c0811 + ccbec9b commit f75f490
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 24 deletions.
2 changes: 1 addition & 1 deletion migrations/versions/1.0.0_add_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# revision identifiers, used by Alembic.
revision = '1.0.0'
down_revision = None
branch_labels = None
branch_labels = 'add_user'
depends_on = None


Expand Down
23 changes: 23 additions & 0 deletions migrations/versions/1.0.1_add_process_title_and_name.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Add process title
Revision ID: 1.0.1
Revises:
Create Date: 2024-08-29 11:14
"""
from alembic import op
from sqlalchemy import Column, String

# revision identifiers, used by Alembic.
revision = '1.0.1'
down_revision = '1.0.0'
branch_labels = 'add_process_title_and_name'
depends_on = '1.0.0'


def upgrade():
op.add_column('jobs', Column('process_title', String()))
op.add_column('jobs', Column('name', String()))

def downgrade():
pass
46 changes: 31 additions & 15 deletions src/ump/api/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Job:
"links",
"parameters",
"results_metadata",
"name",
"process_title"
]

SORTABLE_COLUMNS = [
Expand All @@ -54,6 +56,8 @@ def __init__(self, job_id = None, user = None):
self.updated = None
self.results_metadata = {}
self.user_id = None
self.name = None
self.process_title = None

if job_id and not self._init_from_db(job_id, user):
raise CustomException(f"Job could not be found!")
Expand All @@ -63,14 +67,18 @@ def create(
job_id=None,
remote_job_id=None,
process_id_with_prefix=None,
process_title=None,
name=None,
parameters={},
user=None
):
self._set_attributes(
job_id=job_id,
remote_job_id=remote_job_id,
process_id_with_prefix=process_id_with_prefix,
parameters=parameters,
job_id,
remote_job_id,
process_id_with_prefix,
process_title,
name,
parameters,
user_id = user
)

Expand All @@ -79,11 +87,11 @@ def create(
self.updated = datetime.utcnow()

query = """
INSERT INTO jobs
(job_id, remote_job_id, process_id, provider_prefix, provider_url, status, progress, parameters, message, created, started, finished, updated, user_id)
VALUES
(%(job_id)s, %(remote_job_id)s, %(process_id)s, %(provider_prefix)s, %(provider_url)s, %(status)s, %(progress)s, %(parameters)s, %(message)s, %(created)s, %(started)s, %(finished)s, %(updated)s, %(user_id)s)
"""
INSERT INTO jobs
(job_id, remote_job_id, process_id, provider_prefix, provider_url, status, progress, parameters, message, created, started, finished, updated, user_id, process_title, name)
VALUES
(%(job_id)s, %(remote_job_id)s, %(process_id)s, %(provider_prefix)s, %(provider_url)s, %(status)s, %(progress)s, %(parameters)s, %(message)s, %(created)s, %(started)s, %(finished)s, %(updated)s, %(user_id)s, %(process_title)s, %(name)s)
"""
with DBHandler() as db:
db.run_query(query, query_params=self._to_dict())

Expand All @@ -94,12 +102,16 @@ def _set_attributes(
job_id=None,
remote_job_id=None,
process_id_with_prefix=None,
process_title=None,
name=None,
parameters={},
user_id = None
):
self.job_id = job_id
self.remote_job_id = remote_job_id
self.user_id = user_id
self.process_title = process_title
self.name = name

if remote_job_id and not job_id:
self.job_id = f"job-{remote_job_id}"
Expand Down Expand Up @@ -160,6 +172,8 @@ def _init_from_dict(self, data):
self.parameters = data["parameters"]
self.results_metadata = data["results_metadata"]
self.user_id = data['user_id']
self.process_title = data['process_title']
self.name = data['name']

def _to_dict(self):
return {
Expand All @@ -175,6 +189,8 @@ def _to_dict(self):
"finished": self.finished,
"updated": self.updated,
"progress": self.progress,
"process_title": self.process_title,
"name": self.name,
"parameters": json.dumps(self.parameters),
"results_metadata": json.dumps(self.results_metadata),
"user_id": self.user_id,
Expand All @@ -184,12 +200,12 @@ def save(self):
self.updated = datetime.utcnow()

query = """
UPDATE jobs SET
(process_id, provider_prefix, provider_url, status, progress, parameters, message, created, started, finished, updated, results_metadata)
=
(%(process_id)s, %(provider_prefix)s, %(provider_url)s, %(status)s, %(progress)s, %(parameters)s, %(message)s, %(created)s, %(started)s, %(finished)s, %(updated)s, %(results_metadata)s)
WHERE job_id = %(job_id)s
"""
UPDATE jobs SET
(process_id, provider_prefix, provider_url, status, progress, parameters, message, created, started, finished, updated, results_metadata)
=
(%(process_id)s, %(provider_prefix)s, %(provider_url)s, %(status)s, %(progress)s, %(parameters)s, %(message)s, %(created)s, %(started)s, %(finished)s, %(updated)s, %(results_metadata)s)
WHERE job_id = %(job_id)s
"""
with DBHandler() as db:
db.run_query(query, query_params=self._to_dict())

Expand Down
46 changes: 38 additions & 8 deletions src/ump/api/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,19 @@ def __init__(self, process_id_with_prefix=None):

auth = g.get('auth_token')
role = f"{self.provider_prefix}_{self.process_id}"
if auth is None or \
not self.provider_prefix in auth['realm_access']['roles'] and \
not self.provider_prefix in auth['resource_access']['ump-client']['roles'] and \
not role in auth['realm_access']['roles'] and \
not role in auth['resource_access']['ump-client']['roles']:
raise InvalidUsage(
f"Process ID {self.process_id_with_prefix} is not known! Please check endpoint api/processes for a list of available processes."
)
restricted_access = "authentication" in providers.PROVIDERS[self.provider_prefix]

if restricted_access:
if (
auth is None or
self.provider_prefix not in auth['realm_access']['roles'] and
self.provider_prefix not in auth['resource_access']['ump-client']['roles'] and
role not in auth['realm_access']['roles'] and
role not in auth['resource_access']['ump-client']['roles']
):
raise InvalidUsage(
f"Process ID {self.process_id_with_prefix} is not known! Please check endpoint api/processes for a list of available processes."
)

asyncio.run(self.set_details())

Expand Down Expand Up @@ -83,6 +88,12 @@ def validate_params(self, parameters):
if not self.inputs:
return

if not "job_name" in parameters:
raise InvalidUsage(
f"Parameter job_name is required",
payload={"parameter_description": self.inputs["job_name"]},
)

for input in self.inputs:
try:
if not "schema" in self.inputs[input]:
Expand Down Expand Up @@ -193,10 +204,21 @@ async def start_process_execution(self, request_body, user):
request_body["mode"] = "async"
p = providers.PROVIDERS[self.provider_prefix]

# extract job_name from request_body
name = request_body.pop("job_name")

try:
auth = providers.authenticate_provider(p)

async with aiohttp.ClientSession() as session:
process_response = await session.get(
f"{p['url']}/processes/{self.process_id}",
auth=auth,
headers={
"Content-type": "application/json",
"Accept": "application/json",
},
)
response = await session.post(
f"{p['url']}/processes/{self.process_id}/execution",
json=request_body,
Expand All @@ -209,6 +231,12 @@ async def start_process_execution(self, request_body, user):
},
)

process_response.raise_for_status()

if process_response.ok:
process_details = await process_response.json()
self.process_title = process_details["title"]

response.raise_for_status()

if response.ok and response.headers:
Expand All @@ -226,6 +254,8 @@ async def start_process_execution(self, request_body, user):
job.create(
remote_job_id=remote_job_id,
process_id_with_prefix=self.process_id_with_prefix,
process_title=self.process_title,
name=name,
parameters=request_body,
user=user
)
Expand Down

0 comments on commit f75f490

Please sign in to comment.