Skip to content

Commit

Permalink
Merge pull request #60 from XENONnT/uconfig_for_api_token
Browse files Browse the repository at this point in the history
Use uconfig for api config management
  • Loading branch information
ershockley authored Apr 14, 2022
2 parents f6f47e8 + 7c9409a commit 3b42b74
Showing 1 changed file with 116 additions and 37 deletions.
153 changes: 116 additions & 37 deletions utilix/processing_requests.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
from multiprocessing import context
import os
from typing import Literal
import rframe
import uuid
import datetime
import getpass
import pydantic
import requests
from itertools import product
from warnings import warn
import utilix

CACHE = {}
DEFAULT_ENV = '2022.03.5'
ENV_PATH = "/cvmfs/singularity.opensciencegrid.org/xenonnt/base-environment:{tag}"

ENV_TAGS_URL = 'https://api.github.com/repos/xenonnt/base_environment/git/matching-refs/tags/'

API_URL = 'http://api.cmt.yossisprojects.com'

RSE = Literal['SURFSARA_USERDISK',
API_URL = 'https://api.xedocs.yossisprojects.com'

if utilix.uconfig is not None:
API_URL = utilix.uconfig.get('cmt2', 'api_url', fallback=API_URL)


RSE_TYPE = Literal['SURFSARA_USERDISK',
'SDSC_USERDISK',
'LNGS_USERDISK',
'UC_OSG_USERDISK',
Expand All @@ -22,7 +33,8 @@


def xeauth_user():
return os.environ.get('XEAUTH_USER', 'UNKNOWN')
return utilix.uconfig.get('cmt2', 'api_user', fallback='unknown')


def get_envs():
r = requests.get(ENV_TAGS_URL)
Expand All @@ -44,12 +56,12 @@ def default_env():
class ProcessingRequest(rframe.BaseSchema):
'''Schema definition for a processing request
'''
_NAME = 'processing_requests'
_ALIAS = 'processing_requests'

data_type: str = rframe.Index()
lineage_hash: str = rframe.Index()
run_id: str = rframe.Index()
destination: RSE = rframe.Index(default='UC_DALI_USERDISK')
destination: RSE_TYPE = rframe.Index(default='UC_DALI_USERDISK')
user: str = pydantic.Field(default_factory=xeauth_user)
request_date: datetime.datetime = pydantic.Field(default_factory=datetime.datetime.utcnow)

Expand All @@ -64,87 +76,154 @@ def pre_update(self, datasource, new):
if new.run_id != self.run_id:
raise ValueError(new.run)

@classmethod
def default_datasource(cls):
return processing_api()

def latest_context(self):
import utilix
import pymongo

contexts = utilix.xent_collection('contexts')
ctx = contexts.find_one({f'hashes.{self.data_type}': self.lineage_hash},
projection={'context': '$name', 'env': '$tag', '_id': 0},
sort=[('date_added', pymongo.DESCENDING)])
return dict(ctx)

def create_job(self):
kwargs = self.latest_context()
kwargs.update(self.dict())
kwargs['job_id'] = uuid.uuid4()
return ProcessingJob(**kwargs)


class ProcessingJob(rframe.BaseSchema):
_NAME = 'processing_jobs'
_ALIAS = 'processing_jobs'

job_id: str = rframe.Index()
location: RSE = rframe.Index()
destination: RSE = rframe.Index()
job_id: uuid.UUID = rframe.Index()
destination: RSE_TYPE = rframe.Index()
env: str = rframe.Index()
context: str = rframe.Index()
data_type: str = rframe.Index()
run_id: str = rframe.Index()
lineage_hash: str = rframe.Index()

submission_time: datetime.datetime = pydantic.Field(default_factory=datetime.datetime.utcnow)
location: RSE_TYPE = None
submission_time: datetime.datetime = None
completed: bool = False
progress: int = 0
error: str = ''

def create_workflow(self, **kwargs):
from outsource.Outsource import Outsource

image = ENV_PATH.format(tag=self.env)
wf = Outsource([int(self.run_id)],
context_name=self.context,
image=image,
wf_id=str(self.job_id),
**kwargs)
return wf

def submit(self, **kwargs):
wf = self.create_workflow(**kwargs)
return wf.submit_workflow()


def xeauth_login(readonly=True):
try:
import xeauth
scope = 'read:all' if readonly else 'write:all'
xetoken = xeauth.cmt_login(scope=scope)
username = xetoken.profile.get('name', None)
if username is not None:
os.environ['XEAUTH_USER'] = username

scopes = ['openid', 'profile', 'email', 'offline_access', 'read:all']
if not readonly:
scopes.append('write:all')
audience = utilix.uconfig.get('cmt2', 'api_audience',
fallback='https://api.cmt.xenonnt.org')

username = utilix.uconfig.get('cmt2', 'api_user', fallback=None)
password = utilix.uconfig.get('cmt2', 'api_password', fallback=None)

if username is None or password is None:
xetoken = xeauth.login(scopes=scopes, audience=audience)
utilix.uconfig.set('cmt2', 'api_user', xetoken.username)

else:
xetoken = xeauth.user_login(username,
password,
scopes=scopes)
return xetoken.access_token
except ImportError:
warn('xeauth not installed, cannot retrieve token automatically.')
except:
return None


def processing_api(token=None, readonly=True):
cache_key = f'api_token_readonly_{readonly}'
def valid_token(token, readonly=True):
if not token:
return False

if readonly:
scope = 'read:all'
else:
scope = 'write:all'

try:
import xeauth
claims = xeauth.certs.extract_verified_claims(token)
assert scope in claims.get('scope', '')
except:
return False

return True

def processing_api(token=None, readonly=False):

if token is None:
token = CACHE.get(cache_key, None)
if token is None:
token = os.environ.get('PROCESSING_API_TOKEN', None)
token = utilix.uconfig.get('cmt2', 'api_token', fallback=None)

if not valid_token(token, readonly=readonly):
token = None

if token is None:
token = xeauth_login(readonly=readonly)

if token is None:
token = getpass.getpass('API token: ')

headers = {}
if token:
headers['Authorization'] = f"Bearer {token}"
CACHE[cache_key] = token
utilix.uconfig.set('cmt2', 'api_token', token)

client = rframe.RestClient(f'{API_URL}/processing_requests',
headers=headers,)
return client


try:
import strax
import tqdm

@strax.Context.add_method
def request_processing(context, run_ids, data_type,
def request_processing(context,
run_ids,
data_types,
priority=-1, comments='',
destination='DALI',
destination='UC_DALI_USERDISK',
token=None, submit=True):
client = processing_api(token=token, readonly=False)

run_ids = strax.to_str_tuple(run_ids)
data_types = strax.to_str_tuple(data_types)

combinations = product(run_ids, data_types)

requests = []
for run_id in tqdm.tqdm(run_ids, desc='Requesting processing'):

for run_id, data_type in tqdm.tqdm(combinations, desc='Requesting processing'):
lineage_hash = context.key_for(run_id, data_type).lineage_hash

kwargs = dict(data_type=data_type,
lineage_hash=lineage_hash,
run_id=run_id,
priority=priority,
destination=destination,
comments=comments)
lineage_hash=lineage_hash,
run_id=run_id,
priority=priority,
destination=destination,
comments=comments)

request = ProcessingRequest(**kwargs)
requests.append(request)
Expand Down

0 comments on commit 3b42b74

Please sign in to comment.