From 2c8858a3e4a925dc62fcb9ec7289101d26d37f95 Mon Sep 17 00:00:00 2001 From: David Date: Thu, 28 Jan 2021 13:43:35 +0000 Subject: [PATCH 01/15] initial mirror for vtt based on Dreem implementation --- .DS_Store | Bin 0 -> 6148 bytes data_transfer/config.py | 2 + data_transfer/devices/vtt.py | 96 +++++++++++++++++++ data_transfer/jobs/vtt.py | 15 +++ data_transfer/lib/vtt.py | 172 +++++++++++++++++++++++++++++++++++ data_transfer/main.py | 37 +++++++- data_transfer/tasks/vtt.py | 0 7 files changed, 319 insertions(+), 3 deletions(-) create mode 100644 .DS_Store create mode 100644 data_transfer/devices/vtt.py create mode 100644 data_transfer/jobs/vtt.py create mode 100644 data_transfer/lib/vtt.py create mode 100644 data_transfer/tasks/vtt.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..7b9d33741ed04ce8ac5299ff580d34195ae35713 GIT binary patch literal 6148 zcmeH~F^XlKsY>SV?}7 zi2>N|ciI9YfHmETFAp;_<^v|2amRW3`MzFmx2qRvA06ne-@9Kng5F0slS}y0a!*XM8#sVgw)umczJ?S%NHHAZxO9vO=?*9xPie z#t^SZJ6ZC&nrxlD9hSp~<(h|QedXQs^_Dh{|EYi^Z%?xsT7a` zZ>E3^`{RDcm&&vC>GiyR%Brs$os7#F9)1Ft_))x}hjG97f~?8b$qG$B0wIHf6nLou Ee?2S`p8x;= literal 0 HcmV?d00001 diff --git a/data_transfer/config.py b/data_transfer/config.py index 8c3c330..253e060 100644 --- a/data_transfer/config.py +++ b/data_transfer/config.py @@ -32,6 +32,8 @@ class Settings(BaseSettings): dreem_login_url: str dreem_api_url: str + # TODO: add VTT url, api and .csv (temporary until UCAM data), etc. + # Hardcoded as this data structure is not # supported unless JSON is stored in .env dreem: dict = { diff --git a/data_transfer/devices/vtt.py b/data_transfer/devices/vtt.py new file mode 100644 index 0000000..beefd71 --- /dev/null +++ b/data_transfer/devices/vtt.py @@ -0,0 +1,96 @@ +from data_transfer.config import config +from data_transfer.lib import vtt as vtt_api +from data_transfer.services import inventory +from data_transfer.schemas.record import Record +from data_transfer.db import create_record, \ + read_record, update_record, all_filenames +from data_transfer import utils + +from pathlib import Path +from datetime import datetime +import json + + +class Vtt: + def __init__(self, study_site: str): + """ + Use study_site name to build auth as there are multiple sites/credentials. + """ + self.study_site = study_site + self.user_id, self.session = self.authenticate() + + def authenticate(self): + """ + Authenticate once when object created to share session between requests + """ + # credentials = config.dreem[self.study_site] + # token, user_id = dreem_api.get_token(credentials) + # session = dreem_api.get_session(token) + # return user_id, session + # TODO: update to reflect VTT / S3 setup + + def download_metadata(self) -> None: + """ + Before downloading raw data we need to know which files to download. + VTT provides a weekly dump in an S3 bucket + + TODO: understand VTT data structure in the bucket and how to extract \ + metadata from these files + + This method downloads and stores the metadata as file, and stores most + relevant metadata as a Record in the database in preparation for next stages. + + NOTE/TODO: will run as BATCH job. 
+ """ + # Note: includes metadata for ALL data records, therefore we must filter them + # all_records = dreem_api.get_restricted_list(self.session, self.user_id) + + # Only add records that are not known in the DB based on stored filename (ID and filename in dreem) + unknown_records = [r for r in all_records if r['id'] not in set(all_filenames())] + + # Aim: construct valid record (metadata) and add to DB + for item in unknown_records: + + # device_serial = dreem_api.serial_by_device(item['device']) + device_id = inventory.device_id_by_serial(device_serial) + + # NOTE: lookup Patient ID by email: if None (e.g. personal email used), then use inventory + # patient_id = dreem_api.patient_id_by_user(item['user']) or inventory.patient_id_by_device_id(device_id) + + # NOTE: using inventory to determine intended wear time period. + # Useful for historical data, but (TODO) should be replaced with UCAM API. + history = inventory.device_history(device_id)[patient_id] + + start_wear = utils.format_inventory_weartime(history['checkout']) + # NOTE: if device not returned the current day is used. + end_wear = utils.format_inventory_weartime(history['checkin']) + + record = Record( + # NOTE: id is a unique uuid used to GET raw data from Dreem + filename=item['id'], + device_id=device_id, + patient_id=patient_id, + start_wear=start_wear, + end_wear=end_wear + ) + + create_record(record) + + path = Path(config.storage_vol / f'{record.filename}-meta.json') + # Store metadata from memory to file + utils.write_json(path, item) + + def download_file(self, mongo_id: str) -> None: + """ + Downloads files and store them to {config.storage_vol} + + Tracking: {db.record.is_downloaded} indicates success + + NOTE/TODO: is run as a task. + """ + record = read_record(mongo_id) + # is_downloaded_success = dreem_api.download_file(self.session, record.filename) + if is_downloaded_success: + record.is_downloaded = is_downloaded_success + update_record(record) + # TODO: otherwise re-start task to try again \ No newline at end of file diff --git a/data_transfer/jobs/vtt.py b/data_transfer/jobs/vtt.py new file mode 100644 index 0000000..363e5d9 --- /dev/null +++ b/data_transfer/jobs/vtt.py @@ -0,0 +1,15 @@ + +from data_transfer.devices.vtt import Vtt +from data_transfer.schemas.record import Record + + +def batch_metadata(study_site: str) -> [Record]: + """ + VTT dumps the patient data weekly in an S3 bucket (Per study site??) + + This method stores the records we have not yet processed. + + NOTE/TODO: this cron batch should be run weekly to accomodate the weekly VTT S3 dump + """ + vtt = Vtt(study_site) + vtt.download_metadata() diff --git a/data_transfer/lib/vtt.py b/data_transfer/lib/vtt.py new file mode 100644 index 0000000..48ae0a1 --- /dev/null +++ b/data_transfer/lib/vtt.py @@ -0,0 +1,172 @@ +# NOTE: some methods exist for convience and will be advanced +# for the CVS, e.g. serial_by_device_uuid might to hit a live +# endpoint rather than pull from a CSV file. 
+ +from data_transfer.config import config +from pathlib import Path +from dataclasses import dataclass +from typing import Optional +from functools import lru_cache # note: @cache exists in py >= 3.9 +import requests +import csv + + +@dataclass +class VttFileDownload: + """Use as CLI arguments for Dreem's library.""" + directory: Path = config.storage_vol + # TODO: Check VTT's data file type and format and choose appopriately + # ftype: str = "h5" + + +# Define location and filetype to download +args = VttFileDownload() + + +def get_token(creds: dict) -> (str, str): + """ + Generates a JWT token with the credentials for API access + """ + # TODO: mimick for S3 bucket + # res = requests.post(f"{config.dreem_login_url}/token/", auth=creds) + # TODO: catch/log exception + res.raise_for_status() + resp = res.json() + return (resp["token"], resp["user_id"]) + + +def get_session(token: str) -> requests.Session: + """ + Builds a requests session object with the required header + """ + # TODO: check if still needed / adapt for VTT + session = requests.Session() + session.headers.update({"Authorization": f"Bearer {token}"}) + return session + + +def get_restricted_list(session: requests.Session, user_id: str) -> [dict]: + """ + GET all records (metadata) associated with a restricted account (e.g. study site) + TODO: adapt for VTT approach + """ + url = f"{config.dreem_api_url}/dreem/algorythm/restricted_list/{user_id}/record/" + results = [] + + while url: + response = session.get(url) + # TODO: catch/log exception + response.raise_for_status() + response = response.json() + url = response["next"] + results.extend(response["results"]) + return results + + +def download_file(session: requests.Session, record_id: str) -> bool: + """ + GET specified file based on known record + # TODO: adapt for VTT approach + """ + file_type = args.ftype + url = __build_url(file_type, record_id) + + response = session.get(url) + # TODO: catch/log exception + response.raise_for_status() + response = response.json() + + # Used to lookup the download URL + key = "url" if file_type == "raw" else "data_url" + file_url = response[key] + + # NOTE: file_url may be empty if a file is unavailable: + # (1): file is on dreem headband but not uploaded + # (2): file is being processed by dreem's algorithms + if not file_url: + return False + # TODO: for now, assumes that this method never throws ... + __download_file(file_url, record_id) + return True + + +def serial_by_device(uuid: str) -> Optional[str]: + """ + Lookup Device ID by dreem headband serial + # TODO: adapt for VTT approach + """ + serial = __key_by_value(config.dreem_devices, uuid) + return serial + + +def patient_id_by_user(uuid: str) -> Optional[str]: + """ + Lookup Patient ID by dreem email hash. + # TODO: adapt for VTT approach + """ + email = __key_by_value(config.dreem_users, uuid) + if email: + # Initially, gmail accounts were used for dreem based on SamsungA40 ID. + # We want to skip this record and determine PatientID elsewhere. + if 'gmail' in email: + return None + email = email.replace("ideafast.", "") + email = email.split("@")[0] + # Create consistency in Patient ID format + if email[1] != "-": + email = f"{email[0]}-{email[1:]}" + # TODO: what if personal email used (e.g., in CVS)? 
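        # Hypothetical walk-through of the reformatting above (the address is
        # illustrative only, not taken from the source data):
        #   "ideafast.kabc123@example.com" -> "kabc123" -> "k-abc123"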
+ return email + + +def __key_by_value(filename: Path, needle: str) -> Optional[str]: + """ + Helper method to find key in CSV by value (needle) + """ + data = __load_csv(filename) + for pair in data: + key, value = pair + if needle == value: + return key + return None + + +def __download_file(url: str, record_id: str) -> None: + """ + Builds the target filename and starts downloading the file to disk + # NOTE: this approach is specific for Dreem + # TODO: adapt for VTT approach + """ + # NOTE: can be simplified once we agree on specific file type + ext = 'tar.gz' if args.ftype == 'raw' else args.ftype + file_path = Path(config.storage_vol) / f"{record_id}.{ext}" + response = requests.get(url, stream=True) + + with open(file_path, "wb") as output_file: + for chunk in response.iter_content(chunk_size=1024): + if chunk: + output_file.write(chunk) + + +def __build_url(file_type: str, record_id: str) -> (str, str): + """ + Build URL based on file info. This varied by filetype, e.g., raw/EDF/H5. + # TODO: adapt for VTT approach + """ + # TODO: can be simplified once we determine if we will download only H5 data. + if file_type == "raw": + url = f"{config.dreem_api_url}/dreem/dataupload/data/{record_id}" + else: + url = f"{config.dreem_api_url}/dreem/algorythm/record/{record_id}/{file_type}/" + return url + + +@lru_cache(maxsize=None) +def __load_csv(path: Path) -> [tuple]: + """ + Load full CSV into memory for quick lookup + """ + with open(path) as csv_file: + csv_reader = csv.reader(csv_file, delimiter=',') + data = [(row[0], row[1]) for row in csv_reader] + return data \ No newline at end of file diff --git a/data_transfer/main.py b/data_transfer/main.py index 0e23b24..6dc76cd 100644 --- a/data_transfer/main.py +++ b/data_transfer/main.py @@ -1,5 +1,9 @@ -from data_transfer.jobs import dreem as dreem_jobs, shared as shared_jobs -from data_transfer.tasks import dreem as dreem_tasks, shared as shared_tasks +from data_transfer.jobs import shared as shared_jobs, \ + dreem as dreem_jobs, \ + vtt as vtt_jobs +from data_transfer.tasks import shared as shared_tasks, \ + dreem as dreem_tasks, \ + vtt as vtt_tasks from data_transfer.db import records_not_downloaded @@ -29,6 +33,33 @@ def dreem_dag(study_site): # All said folders FOR ALL DEVICES are uploaded once per day # shared_jobs.batch_upload_data() +def vtt_dag(study_site): + """ + Directed acyclic graph (DAG) representing dreem data pipeline: + + batch_metadata + ->task_download_data + ->task_preprocess_data + ->task_prepare_data + ->batch_upload_data + + NOTE/TODO: this method simulates the pipeline. + """ + # TODO: adjust for VTT + # dreem_jobs.batch_metadata(study_site) + + # NOTE: simulates initiation of tasks upon metadata download + # TODO: in practice the tasks should be invoked within the batch job. + for record in records_not_downloaded(): + # Each task should be idempotent. 
Returned values feeds subsequent task + # mongoid = dreem_tasks.task_download_data(study_site, record.id) + # mongoid = dreem_tasks.task_preprocess_data(mongoid) + # Data is finalised and moved to a folder in /uploading/ + shared_tasks.task_prepare_data("DRM", mongoid) + + # All said folders FOR ALL DEVICES are uploaded once per day + # shared_jobs.batch_upload_data() if __name__ == "__main__": - dreem_dag("newcastle") \ No newline at end of file + # dreem_dag("munster") + vtt_dag("munster") \ No newline at end of file diff --git a/data_transfer/tasks/vtt.py b/data_transfer/tasks/vtt.py new file mode 100644 index 0000000..e69de29 From 1331cdcde4beb8edab9bcf10021068b5d1c7f323 Mon Sep 17 00:00:00 2001 From: David Date: Fri, 29 Jan 2021 14:36:08 +0000 Subject: [PATCH 02/15] establish connection with S3 bucket and pull metadata --- .dtransfer.env.example | 4 + data_transfer/config.py | 7 +- data_transfer/devices/vtt.py | 73 +++++++---- data_transfer/jobs/vtt.py | 6 +- data_transfer/lib/vtt.py | 116 +++++++++-------- data_transfer/main.py | 27 ++-- poetry.lock | 246 +++++++++++++++++++++++------------ pyproject.toml | 1 + 8 files changed, 298 insertions(+), 182 deletions(-) diff --git a/.dtransfer.env.example b/.dtransfer.env.example index 4ea3741..5d61288 100644 --- a/.dtransfer.env.example +++ b/.dtransfer.env.example @@ -5,6 +5,10 @@ DATABASE_URI="mongodb://localhost:27017/" SUPPORT_BASE_URL="" SUPPORT_TOKEN="" +VTT_AWS_ACCESSKEY="" +VTT_AWS_SECRET_ACCESSKEY="" +VTT_AWS_BUCKET_NAME="" + DREEM_LOGIN_URL="" DREEM_API_URL="" diff --git a/data_transfer/config.py b/data_transfer/config.py index 253e060..1d7e36d 100644 --- a/data_transfer/config.py +++ b/data_transfer/config.py @@ -29,11 +29,14 @@ class Settings(BaseSettings): support_base_url: str support_token: str + # NOTE: VTT does not differentiate between study sites + vtt_aws_accesskey: str + vtt_aws_secret_accesskey: str + vtt_aws_bucket_name: str + dreem_login_url: str dreem_api_url: str - # TODO: add VTT url, api and .csv (temporary until UCAM data), etc. - # Hardcoded as this data structure is not # supported unless JSON is stored in .env dreem: dict = { diff --git a/data_transfer/devices/vtt.py b/data_transfer/devices/vtt.py index beefd71..30d3a1e 100644 --- a/data_transfer/devices/vtt.py +++ b/data_transfer/devices/vtt.py @@ -10,41 +10,56 @@ from datetime import datetime import json - class Vtt: - def __init__(self, study_site: str): + def __init__(self): """ - Use study_site name to build auth as there are multiple sites/credentials. + Set up a session to the s3 bucket to use in multiple steps """ - self.study_site = study_site - self.user_id, self.session = self.authenticate() + # self.bucket = self.authenticate() + self.bucket = "" def authenticate(self): """ Authenticate once when object created to share session between requests """ - # credentials = config.dreem[self.study_site] - # token, user_id = dreem_api.get_token(credentials) - # session = dreem_api.get_session(token) - # return user_id, session - # TODO: update to reflect VTT / S3 setup + credentials = dict( + aws_ak=config.vtt_aws_accesskey, + aws_ask=config.vtt_aws_secret_accesskey, + bucket_name=config.vtt_aws_bucket_name, + ) + + bucket = vtt_api.get_bucket(credentials) + return bucket def download_metadata(self) -> None: """ Before downloading raw data we need to know which files to download. 
- VTT provides a weekly dump in an S3 bucket - - TODO: understand VTT data structure in the bucket and how to extract \ - metadata from these files - - This method downloads and stores the metadata as file, and stores most - relevant metadata as a Record in the database in preparation for next stages. + VTT provides a weekly dump in an S3 bucket, with the structure: + . + ├── data_yyyy_mm_dd + │ ├── users.txt + │ ├── raw + │ | ├── vtt_patient (hash) + │ | └── vtt_patient (hash) + │ | ├── vtt_patient (hash).zip + | | └── vtt_patient (hash).nfo + | └── files + | (audio files - unknown structure) + . + + NOTE: + - users.txt contains the user hashes present in this dump (equal to subfolders) + - .nfo files contain the time spans of the specific hash dumps, e.g.: + - Start time : 2020-10-28T00:00:01Z + - End time : 2020-11-24T00:00:00.001Z NOTE/TODO: will run as BATCH job. """ - # Note: includes metadata for ALL data records, therefore we must filter them - # all_records = dreem_api.get_restricted_list(self.session, self.user_id) - + + # NOTE: includes metadata for ALL data records, therefore we must filter them + # NOTE: currently downloads all dumps (inc. historical) TODO: only since last run + all_records = vtt_api.get_list(self.bucket) + # Only add records that are not known in the DB based on stored filename (ID and filename in dreem) unknown_records = [r for r in all_records if r['id'] not in set(all_filenames())] @@ -52,21 +67,25 @@ def download_metadata(self) -> None: for item in unknown_records: # device_serial = dreem_api.serial_by_device(item['device']) - device_id = inventory.device_id_by_serial(device_serial) + # device_id = inventory.device_id_by_serial(device_serial) + device_id = 'x' # NOTE: lookup Patient ID by email: if None (e.g. personal email used), then use inventory # patient_id = dreem_api.patient_id_by_user(item['user']) or inventory.patient_id_by_device_id(device_id) + patient_id = 'x' # NOTE: using inventory to determine intended wear time period. # Useful for historical data, but (TODO) should be replaced with UCAM API. - history = inventory.device_history(device_id)[patient_id] + # history = inventory.device_history(device_id)[patient_id] - start_wear = utils.format_inventory_weartime(history['checkout']) + # start_wear = utils.format_inventory_weartime(history['checkout']) # NOTE: if device not returned the current day is used. - end_wear = utils.format_inventory_weartime(history['checkin']) + # end_wear = utils.format_inventory_weartime(history['checkin']) + start_wear = utils.format_inventory_weartime("2020-01-01 01:01:01") + end_wear = utils.format_inventory_weartime("2020-01-01 01:01:01") record = Record( - # NOTE: id is a unique uuid used to GET raw data from Dreem + # NOTE: id is the hashedID provided by VTT filename=item['id'], device_id=device_id, patient_id=patient_id, @@ -79,7 +98,7 @@ def download_metadata(self) -> None: path = Path(config.storage_vol / f'{record.filename}-meta.json') # Store metadata from memory to file utils.write_json(path, item) - + def download_file(self, mongo_id: str) -> None: """ Downloads files and store them to {config.storage_vol} @@ -89,7 +108,7 @@ def download_file(self, mongo_id: str) -> None: NOTE/TODO: is run as a task. 
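        The .nfo time spans described in the bucket layout above are not parsed anywhere
        in this series yet, although they could eventually replace the hardcoded
        start_wear/end_wear values. A minimal sketch, assuming each .nfo really contains
        plain lines such as "Start time : 2020-10-28T00:00:01Z" (the parser name and exact
        layout are assumptions, not part of the patch):

            from datetime import datetime
            from typing import Dict

            def parse_nfo_span(nfo_text: str) -> Dict[str, datetime]:
                """Extract the start/end wear times from an .nfo dump description."""
                span: Dict[str, datetime] = {}
                for line in nfo_text.splitlines():
                    if ":" not in line:
                        continue
                    # split on the first colon only; the timestamp itself contains colons
                    key, _, value = line.partition(":")
                    key = key.strip().lower()
                    if key in ("start time", "end time"):
                        # "2020-10-28T00:00:01Z" -> timezone-aware datetime
                        span[key.split()[0]] = datetime.fromisoformat(
                            value.strip().replace("Z", "+00:00")
                        )
                return span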
""" record = read_record(mongo_id) - # is_downloaded_success = dreem_api.download_file(self.session, record.filename) + is_downloaded_success = vtt_api.download_file(self.session, record.filename) if is_downloaded_success: record.is_downloaded = is_downloaded_success update_record(record) diff --git a/data_transfer/jobs/vtt.py b/data_transfer/jobs/vtt.py index 363e5d9..720ebd0 100644 --- a/data_transfer/jobs/vtt.py +++ b/data_transfer/jobs/vtt.py @@ -3,13 +3,13 @@ from data_transfer.schemas.record import Record -def batch_metadata(study_site: str) -> [Record]: +def batch_metadata() -> [Record]: """ - VTT dumps the patient data weekly in an S3 bucket (Per study site??) + VTT dumps the patient data weekly in an S3 bucket This method stores the records we have not yet processed. NOTE/TODO: this cron batch should be run weekly to accomodate the weekly VTT S3 dump """ - vtt = Vtt(study_site) + vtt = Vtt() vtt.download_metadata() diff --git a/data_transfer/lib/vtt.py b/data_transfer/lib/vtt.py index 48ae0a1..e5156ab 100644 --- a/data_transfer/lib/vtt.py +++ b/data_transfer/lib/vtt.py @@ -7,87 +7,95 @@ from dataclasses import dataclass from typing import Optional from functools import lru_cache # note: @cache exists in py >= 3.9 -import requests import csv +import boto3 @dataclass class VttFileDownload: """Use as CLI arguments for Dreem's library.""" + # TODO: how does this relate to VTT? directory: Path = config.storage_vol # TODO: Check VTT's data file type and format and choose appopriately - # ftype: str = "h5" + ftype: str = "zip" # Define location and filetype to download args = VttFileDownload() - -def get_token(creds: dict) -> (str, str): +def get_bucket(creds: dict): # TODO: add typing from https://pypi.org/project/mypy-boto3-s3/ """ - Generates a JWT token with the credentials for API access + Builds a S3 session bucket object to interface with the S3 bucket + # NOTE: https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/s3.html#bucket """ - # TODO: mimick for S3 bucket - # res = requests.post(f"{config.dreem_login_url}/token/", auth=creds) - # TODO: catch/log exception - res.raise_for_status() - resp = res.json() - return (resp["token"], resp["user_id"]) + session = boto3.session.Session( + aws_access_key_id=creds['aws_ak'], + aws_secret_access_key=creds['aws_ask'], + ) + + s3 = session.resource('s3') + bucket = s3.Bucket(creds['bucket_name']) + + return bucket -def get_session(token: str) -> requests.Session: +def get_list(bucket) -> [dict]: # TODO: add typing """ - Builds a requests session object with the required header + GET all records (metadata) from the AWS S3 bucket """ - # TODO: check if still needed / adapt for VTT - session = requests.Session() - session.headers.update({"Authorization": f"Bearer {token}"}) - return session + # using boto3 Resource instead of Client for a readable + # object oriented approach. Not 100% coverage of AWS API functionality + + # returns a list of s3.ObjectSummary() objects containing keys + # contains metadata, such as .last_modified + + # objects = bucket.objects.all() + # object_paths = [obj.key for obj in summary] + + # ignore users.txt files - will deduct from folders + split_paths = [p.split('/') for p in object_paths if p.find('users.txt') == -1] -def get_restricted_list(session: requests.Session, user_id: str) -> [dict]: - """ - GET all records (metadata) associated with a restricted account (e.g. 
study site) - TODO: adapt for VTT approach - """ - url = f"{config.dreem_api_url}/dreem/algorythm/restricted_list/{user_id}/record/" - results = [] + # generally follows [dump_date, raw/files, patienthash, patienthash.file (.nfo or .zip)] + # transform list to set to list to remove duplicates due to each object having a full path + patients = dict() + for path in split_paths: + if path[2] in patients: + patients[path[2]].add() + patients.add(path[2]) - while url: - response = session.get(url) - # TODO: catch/log exception - response.raise_for_status() - response = response.json() - url = response["next"] - results.extend(response["results"]) - return results + patients = list(set(p[2] for p in split_paths)) + patients = [dict(id=p) for p in patients] + # dump_dates = set(d[0] for d in split_paths) + return patients -def download_file(session: requests.Session, record_id: str) -> bool: - """ - GET specified file based on known record - # TODO: adapt for VTT approach - """ - file_type = args.ftype - url = __build_url(file_type, record_id) - response = session.get(url) - # TODO: catch/log exception - response.raise_for_status() - response = response.json() +# def download_file(session: requests.Session, record_id: str) -> bool: +# """ +# GET specified file based on known record +# # TODO: adapt for VTT approach +# """ +# file_type = args.ftype +# url = __build_url(file_type, record_id) + +# response = session.get(url) +# # TODO: catch/log exception +# response.raise_for_status() +# response = response.json() - # Used to lookup the download URL - key = "url" if file_type == "raw" else "data_url" - file_url = response[key] +# # Used to lookup the download URL +# key = "url" if file_type == "raw" else "data_url" +# file_url = response[key] - # NOTE: file_url may be empty if a file is unavailable: - # (1): file is on dreem headband but not uploaded - # (2): file is being processed by dreem's algorithms - if not file_url: - return False - # TODO: for now, assumes that this method never throws ... - __download_file(file_url, record_id) - return True +# # NOTE: file_url may be empty if a file is unavailable: +# # (1): file is on dreem headband but not uploaded +# # (2): file is being processed by dreem's algorithms +# if not file_url: +# return False +# # TODO: for now, assumes that this method never throws ... +# __download_file(file_url, record_id) +# return True def serial_by_device(uuid: str) -> Optional[str]: diff --git a/data_transfer/main.py b/data_transfer/main.py index 6dc76cd..a60a4c6 100644 --- a/data_transfer/main.py +++ b/data_transfer/main.py @@ -33,33 +33,28 @@ def dreem_dag(study_site): # All said folders FOR ALL DEVICES are uploaded once per day # shared_jobs.batch_upload_data() -def vtt_dag(study_site): +def vtt_dag(): """ - Directed acyclic graph (DAG) representing dreem data pipeline: - - batch_metadata - ->task_download_data - ->task_preprocess_data - ->task_prepare_data - ->batch_upload_data + Directed acyclic graph (DAG) representing data_transfer pipeline as used for all devices + Note that VTT does not distinguish between study sites NOTE/TODO: this method simulates the pipeline. """ # TODO: adjust for VTT - # dreem_jobs.batch_metadata(study_site) + vtt_jobs.batch_metadata() # NOTE: simulates initiation of tasks upon metadata download # TODO: in practice the tasks should be invoked within the batch job. - for record in records_not_downloaded(): - # Each task should be idempotent. 
Returned values feeds subsequent task - # mongoid = dreem_tasks.task_download_data(study_site, record.id) - # mongoid = dreem_tasks.task_preprocess_data(mongoid) - # Data is finalised and moved to a folder in /uploading/ - shared_tasks.task_prepare_data("DRM", mongoid) + # for record in records_not_downloaded(): + # # Each task should be idempotent. Returned values feeds subsequent task + # mongoid = vtt_tasks.task_download_data(study_site, record.id) + # mongoid = vtt_tasks.task_preprocess_data(mongoid) + # # Data is finalised and moved to a folder in /uploading/ + # # shared_tasks.task_prepare_data("DRM", mongoid) # All said folders FOR ALL DEVICES are uploaded once per day # shared_jobs.batch_upload_data() if __name__ == "__main__": # dreem_dag("munster") - vtt_dag("munster") \ No newline at end of file + vtt_dag() \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index b364e39..7e9db30 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,66 +1,90 @@ [[package]] -category = "dev" -description = "Atomic file writes." -marker = "sys_platform == \"win32\"" name = "atomicwrites" +version = "1.4.0" +description = "Atomic file writes." +category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" -version = "1.4.0" [[package]] -category = "dev" -description = "Classes Without Boilerplate" name = "attrs" +version = "20.3.0" +description = "Classes Without Boilerplate" +category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" -version = "20.3.0" [package.extras] -dev = ["coverage (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "zope.interface", "furo", "sphinx", "pre-commit"] +dev = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "zope.interface", "furo", "sphinx", "pre-commit"] docs = ["furo", "sphinx", "zope.interface"] -tests = ["coverage (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "zope.interface"] -tests_no_zope = ["coverage (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six"] +tests = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "zope.interface"] +tests_no_zope = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six"] [[package]] +name = "boto3" +version = "1.16.61" +description = "The AWS SDK for Python" category = "main" -description = "Python package for providing Mozilla's CA Bundle." -name = "certifi" optional = false python-versions = "*" -version = "2020.6.20" + +[package.dependencies] +botocore = ">=1.19.61,<1.20.0" +jmespath = ">=0.7.1,<1.0.0" +s3transfer = ">=0.3.0,<0.4.0" [[package]] +name = "botocore" +version = "1.19.61" +description = "Low-level, data-driven core of boto 3." category = "main" -description = "Universal encoding detector for Python 2 and 3" -name = "chardet" optional = false python-versions = "*" -version = "3.0.4" + +[package.dependencies] +jmespath = ">=0.7.1,<1.0.0" +python-dateutil = ">=2.1,<3.0.0" +urllib3 = {version = ">=1.25.4,<1.27", markers = "python_version != \"3.4\""} [[package]] +name = "certifi" +version = "2020.6.20" +description = "Python package for providing Mozilla's CA Bundle." 
category = "main" -description = "Composable command line interface toolkit" +optional = false +python-versions = "*" + +[[package]] +name = "chardet" +version = "3.0.4" +description = "Universal encoding detector for Python 2 and 3" +category = "main" +optional = false +python-versions = "*" + +[[package]] name = "click" +version = "7.1.2" +description = "Composable command line interface toolkit" +category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" -version = "7.1.2" [[package]] -category = "dev" -description = "Cross-platform colored terminal text." -marker = "sys_platform == \"win32\"" name = "colorama" +version = "0.4.4" +description = "Cross-platform colored terminal text." +category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" -version = "0.4.4" [[package]] -category = "main" -description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production" name = "fastapi" +version = "0.61.1" +description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production" +category = "main" optional = false python-versions = ">=3.6" -version = "0.61.1" [package.dependencies] pydantic = ">=1.0.0,<2.0.0" @@ -68,71 +92,79 @@ starlette = "0.13.6" [package.extras] all = ["requests (>=2.24.0,<3.0.0)", "aiofiles (>=0.5.0,<0.6.0)", "jinja2 (>=2.11.2,<3.0.0)", "python-multipart (>=0.0.5,<0.0.6)", "itsdangerous (>=1.1.0,<2.0.0)", "pyyaml (>=5.3.1,<6.0.0)", "graphene (>=2.1.8,<3.0.0)", "ujson (>=3.0.0,<4.0.0)", "orjson (>=3.2.1,<4.0.0)", "email_validator (>=1.1.1,<2.0.0)", "uvicorn (>=0.11.5,<0.12.0)", "async_exit_stack (>=1.0.1,<2.0.0)", "async_generator (>=1.10,<2.0.0)"] -dev = ["python-jose (>=3.1.0,<4.0.0)", "passlib (>=1.7.2,<2.0.0)", "autoflake (>=1.3.1,<2.0.0)", "flake8 (>=3.8.3,<4.0.0)", "uvicorn (>=0.11.5,<0.12.0)", "graphene (>=2.1.8,<3.0.0)"] +dev = ["python-jose[cryptography] (>=3.1.0,<4.0.0)", "passlib[bcrypt] (>=1.7.2,<2.0.0)", "autoflake (>=1.3.1,<2.0.0)", "flake8 (>=3.8.3,<4.0.0)", "uvicorn (>=0.11.5,<0.12.0)", "graphene (>=2.1.8,<3.0.0)"] doc = ["mkdocs (>=1.1.2,<2.0.0)", "mkdocs-material (>=5.5.0,<6.0.0)", "markdown-include (>=0.5.1,<0.6.0)", "mkdocs-markdownextradata-plugin (>=0.1.7,<0.2.0)", "typer (>=0.3.0,<0.4.0)", "typer-cli (>=0.0.9,<0.0.10)", "pyyaml (>=5.3.1,<6.0.0)"] -test = ["pytest (5.4.3)", "pytest-cov (2.10.0)", "pytest-asyncio (>=0.14.0,<0.15.0)", "mypy (0.782)", "flake8 (>=3.8.3,<4.0.0)", "black (19.10b0)", "isort (>=5.0.6,<6.0.0)", "requests (>=2.24.0,<3.0.0)", "httpx (>=0.14.0,<0.15.0)", "email_validator (>=1.1.1,<2.0.0)", "sqlalchemy (>=1.3.18,<2.0.0)", "peewee (>=3.13.3,<4.0.0)", "databases (>=0.3.2,<0.4.0)", "orjson (>=3.2.1,<4.0.0)", "async_exit_stack (>=1.0.1,<2.0.0)", "async_generator (>=1.10,<2.0.0)", "python-multipart (>=0.0.5,<0.0.6)", "aiofiles (>=0.5.0,<0.6.0)", "flask (>=1.1.2,<2.0.0)"] +test = ["pytest (==5.4.3)", "pytest-cov (==2.10.0)", "pytest-asyncio (>=0.14.0,<0.15.0)", "mypy (==0.782)", "flake8 (>=3.8.3,<4.0.0)", "black (==19.10b0)", "isort (>=5.0.6,<6.0.0)", "requests (>=2.24.0,<3.0.0)", "httpx (>=0.14.0,<0.15.0)", "email_validator (>=1.1.1,<2.0.0)", "sqlalchemy (>=1.3.18,<2.0.0)", "peewee (>=3.13.3,<4.0.0)", "databases[sqlite] (>=0.3.2,<0.4.0)", "orjson (>=3.2.1,<4.0.0)", "async_exit_stack (>=1.0.1,<2.0.0)", "async_generator (>=1.10,<2.0.0)", "python-multipart (>=0.0.5,<0.0.6)", "aiofiles (>=0.5.0,<0.6.0)", "flask (>=1.1.2,<2.0.0)"] [[package]] -category = "main" -description = "A 
pure-Python, bring-your-own-I/O implementation of HTTP/1.1" name = "h11" +version = "0.11.0" +description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" +category = "main" optional = false python-versions = "*" -version = "0.11.0" [[package]] -category = "main" -description = "Internationalized Domain Names in Applications (IDNA)" name = "idna" +version = "2.10" +description = "Internationalized Domain Names in Applications (IDNA)" +category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" -version = "2.10" [[package]] -category = "dev" -description = "iniconfig: brain-dead simple config-ini parsing" name = "iniconfig" +version = "1.1.1" +description = "iniconfig: brain-dead simple config-ini parsing" +category = "dev" optional = false python-versions = "*" -version = "1.1.1" [[package]] -category = "dev" -description = "Core utilities for Python packages" +name = "jmespath" +version = "0.10.0" +description = "JSON Matching Expressions" +category = "main" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" + +[[package]] name = "packaging" +version = "20.7" +description = "Core utilities for Python packages" +category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" -version = "20.7" [package.dependencies] pyparsing = ">=2.0.2" [[package]] -category = "dev" -description = "plugin and hook calling mechanisms for python" name = "pluggy" +version = "0.13.1" +description = "plugin and hook calling mechanisms for python" +category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" -version = "0.13.1" [package.extras] dev = ["pre-commit", "tox"] [[package]] -category = "dev" -description = "library with cross-python path, ini-parsing, io, code, log facilities" name = "py" +version = "1.9.0" +description = "library with cross-python path, ini-parsing, io, code, log facilities" +category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" -version = "1.9.0" [[package]] -category = "main" -description = "Data validation and settings management using python 3.6 type hinting" name = "pydantic" +version = "1.6.1" +description = "Data validation and settings management using python 3.6 type hinting" +category = "main" optional = false python-versions = ">=3.6" -version = "1.6.1" [package.extras] dotenv = ["python-dotenv (>=0.10.4)"] @@ -140,12 +172,12 @@ email = ["email-validator (>=1.0.3)"] typing_extensions = ["typing-extensions (>=3.7.2)"] [[package]] -category = "main" -description = "Python driver for MongoDB " name = "pymongo" +version = "3.11.2" +description = "Python driver for MongoDB " +category = "main" optional = false python-versions = "*" -version = "3.11.2" [package.extras] aws = ["pymongo-auth-aws (<2.0.0)"] @@ -158,25 +190,25 @@ tls = ["ipaddress"] zstd = ["zstandard"] [[package]] -category = "dev" -description = "Python parsing module" name = "pyparsing" +version = "2.4.7" +description = "Python parsing module" +category = "dev" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" -version = "2.4.7" [[package]] -category = "dev" -description = "pytest: simple powerful testing with Python" name = "pytest" +version = "6.1.2" +description = "pytest: simple powerful testing with Python" +category = "dev" optional = false python-versions = ">=3.5" -version = "6.1.2" [package.dependencies] -atomicwrites = ">=1.0" +atomicwrites = {version = ">=1.0", markers = "sys_platform == \"win32\""} 
attrs = ">=17.4.0" -colorama = "*" +colorama = {version = "*", markers = "sys_platform == \"win32\""} iniconfig = "*" packaging = "*" pluggy = ">=0.12,<1.0" @@ -184,27 +216,38 @@ py = ">=1.8.2" toml = "*" [package.extras] -checkqa_mypy = ["mypy (0.780)"] +checkqa_mypy = ["mypy (==0.780)"] testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xmlschema"] [[package]] +name = "python-dateutil" +version = "2.8.1" +description = "Extensions to the standard Python datetime module" category = "main" -description = "Add .env support to your django/flask apps in development and deployments" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" + +[package.dependencies] +six = ">=1.5" + +[[package]] name = "python-dotenv" +version = "0.14.0" +description = "Add .env support to your django/flask apps in development and deployments" +category = "main" optional = false python-versions = "*" -version = "0.14.0" [package.extras] cli = ["click (>=5.0)"] [[package]] -category = "main" -description = "Python HTTP for Humans." name = "requests" +version = "2.24.0" +description = "Python HTTP for Humans." +category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" -version = "2.24.0" [package.dependencies] certifi = ">=2017.4.17" @@ -214,47 +257,66 @@ urllib3 = ">=1.21.1,<1.25.0 || >1.25.0,<1.25.1 || >1.25.1,<1.26" [package.extras] security = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)"] -socks = ["PySocks (>=1.5.6,<1.5.7 || >1.5.7)", "win-inet-pton"] +socks = ["PySocks (>=1.5.6,!=1.5.7)", "win-inet-pton"] [[package]] +name = "s3transfer" +version = "0.3.4" +description = "An Amazon S3 Transfer Manager" category = "main" -description = "The little ASGI library that shines." +optional = false +python-versions = "*" + +[package.dependencies] +botocore = ">=1.12.36,<2.0a.0" + +[[package]] +name = "six" +version = "1.15.0" +description = "Python 2 and 3 compatibility utilities" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" + +[[package]] name = "starlette" +version = "0.13.6" +description = "The little ASGI library that shines." +category = "main" optional = false python-versions = ">=3.6" -version = "0.13.6" [package.extras] full = ["aiofiles", "graphene", "itsdangerous", "jinja2", "python-multipart", "pyyaml", "requests", "ujson"] [[package]] -category = "dev" -description = "Python Library for Tom's Obvious, Minimal Language" name = "toml" +version = "0.10.2" +description = "Python Library for Tom's Obvious, Minimal Language" +category = "dev" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" -version = "0.10.2" [[package]] -category = "main" -description = "HTTP library with thread-safe connection pooling, file post, and more." name = "urllib3" +version = "1.25.11" +description = "HTTP library with thread-safe connection pooling, file post, and more." +category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4" -version = "1.25.11" [package.extras] brotli = ["brotlipy (>=0.6.0)"] secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "certifi", "ipaddress"] -socks = ["PySocks (>=1.5.6,<1.5.7 || >1.5.7,<2.0)"] +socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] [[package]] -category = "main" -description = "The lightning-fast ASGI server." name = "uvicorn" +version = "0.12.2" +description = "The lightning-fast ASGI server." 
+category = "main" optional = false python-versions = "*" -version = "0.12.2" [package.dependencies] click = ">=7.0.0,<8.0.0" @@ -264,9 +326,9 @@ h11 = ">=0.8" standard = ["websockets (>=8.0.0,<9.0.0)", "watchgod (>=0.6,<0.7)", "python-dotenv (>=0.13)", "PyYAML (>=5.1)", "httptools (>=0.1.0,<0.2.0)", "uvloop (>=0.14.0)", "colorama (>=0.4)"] [metadata] -content-hash = "92039d76dc46bf5a8bcef2e3faef8f45eb74d88871ad3fc64a48d8933eea6044" -lock-version = "1.0" +lock-version = "1.1" python-versions = "^3.8" +content-hash = "471b26ac750f7f87cfc1f89b85bbf7930d25b197304a22e98330267b6970cc12" [metadata.files] atomicwrites = [ @@ -277,6 +339,14 @@ attrs = [ {file = "attrs-20.3.0-py2.py3-none-any.whl", hash = "sha256:31b2eced602aa8423c2aea9c76a724617ed67cf9513173fd3a4f03e3a929c7e6"}, {file = "attrs-20.3.0.tar.gz", hash = "sha256:832aa3cde19744e49938b91fea06d69ecb9e649c93ba974535d08ad92164f700"}, ] +boto3 = [ + {file = "boto3-1.16.61-py2.py3-none-any.whl", hash = "sha256:8343a8e87c07cfebd1ca26b21b841f0875f28622e9a08e8dd2d1085881f9a6fd"}, + {file = "boto3-1.16.61.tar.gz", hash = "sha256:38f503f0502aba4251dff4d19057c31a7b9dd0f54333df5521f8931ee4c65e26"}, +] +botocore = [ + {file = "botocore-1.19.61-py2.py3-none-any.whl", hash = "sha256:59da7b91ebd57362b482d3963392f2944296d6dc79877bf716e1c2671a208a74"}, + {file = "botocore-1.19.61.tar.gz", hash = "sha256:3245c9e996143bcfdea73d105145ca733fcd7d5afe744a8760612fc449c3f810"}, +] certifi = [ {file = "certifi-2020.6.20-py2.py3-none-any.whl", hash = "sha256:8fc0819f1f30ba15bdb34cceffb9ef04d99f420f68eb75d901e9560b8749fc41"}, {file = "certifi-2020.6.20.tar.gz", hash = "sha256:5930595817496dd21bb8dc35dad090f1c2cd0adfaf21204bf6732ca5d8ee34d3"}, @@ -309,6 +379,10 @@ iniconfig = [ {file = "iniconfig-1.1.1-py2.py3-none-any.whl", hash = "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3"}, {file = "iniconfig-1.1.1.tar.gz", hash = "sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32"}, ] +jmespath = [ + {file = "jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f"}, + {file = "jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9"}, +] packaging = [ {file = "packaging-20.7-py2.py3-none-any.whl", hash = "sha256:eb41423378682dadb7166144a4926e443093863024de508ca5c9737d6bc08376"}, {file = "packaging-20.7.tar.gz", hash = "sha256:05af3bb85d320377db281cf254ab050e1a7ebcbf5410685a9a407e18a1f81236"}, @@ -414,6 +488,10 @@ pytest = [ {file = "pytest-6.1.2-py3-none-any.whl", hash = "sha256:4288fed0d9153d9646bfcdf0c0428197dba1ecb27a33bb6e031d002fa88653fe"}, {file = "pytest-6.1.2.tar.gz", hash = "sha256:c0a7e94a8cdbc5422a51ccdad8e6f1024795939cc89159a0ae7f0b316ad3823e"}, ] +python-dateutil = [ + {file = "python-dateutil-2.8.1.tar.gz", hash = "sha256:73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c"}, + {file = "python_dateutil-2.8.1-py2.py3-none-any.whl", hash = "sha256:75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a"}, +] python-dotenv = [ {file = "python-dotenv-0.14.0.tar.gz", hash = "sha256:8c10c99a1b25d9a68058a1ad6f90381a62ba68230ca93966882a4dbc3bc9c33d"}, {file = "python_dotenv-0.14.0-py2.py3-none-any.whl", hash = "sha256:c10863aee750ad720f4f43436565e4c1698798d763b63234fb5021b6c616e423"}, @@ -422,6 +500,14 @@ requests = [ {file = "requests-2.24.0-py2.py3-none-any.whl", hash = "sha256:fe75cc94a9443b9246fc7049224f75604b113c36acb93f87b80ed42c44cbb898"}, {file = "requests-2.24.0.tar.gz", hash = 
"sha256:b3559a131db72c33ee969480840fff4bb6dd111de7dd27c8ee1f820f4f00231b"}, ] +s3transfer = [ + {file = "s3transfer-0.3.4-py2.py3-none-any.whl", hash = "sha256:1e28620e5b444652ed752cf87c7e0cb15b0e578972568c6609f0f18212f259ed"}, + {file = "s3transfer-0.3.4.tar.gz", hash = "sha256:7fdddb4f22275cf1d32129e21f056337fd2a80b6ccef1664528145b72c49e6d2"}, +] +six = [ + {file = "six-1.15.0-py2.py3-none-any.whl", hash = "sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced"}, + {file = "six-1.15.0.tar.gz", hash = "sha256:30639c035cdb23534cd4aa2dd52c3bf48f06e5f4a941509c8bafd8ce11080259"}, +] starlette = [ {file = "starlette-0.13.6-py3-none-any.whl", hash = "sha256:bd2ffe5e37fb75d014728511f8e68ebf2c80b0fa3d04ca1479f4dc752ae31ac9"}, {file = "starlette-0.13.6.tar.gz", hash = "sha256:ebe8ee08d9be96a3c9f31b2cb2a24dbdf845247b745664bd8a3f9bd0c977fdbc"}, diff --git a/pyproject.toml b/pyproject.toml index 205f424..f395067 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ uvicorn = "^0.12.2" requests = "^2.24.0" python-dotenv = "^0.14.0" pymongo = "^3.11.2" +boto3 = "^1.16.61" [tool.poetry.dev-dependencies] click = "^7.1.2" From 1321f6739c3d8307ab855ead345e2d53725da329 Mon Sep 17 00:00:00 2001 From: David Date: Fri, 29 Jan 2021 16:20:05 +0000 Subject: [PATCH 03/15] download meta AND download files working --- data_transfer/devices/vtt.py | 10 ++-- data_transfer/lib/vtt.py | 92 ++++++++------------------------- data_transfer/main.py | 12 ++--- data_transfer/schemas/record.py | 2 + data_transfer/tasks/vtt.py | 21 ++++++++ 5 files changed, 56 insertions(+), 81 deletions(-) diff --git a/data_transfer/devices/vtt.py b/data_transfer/devices/vtt.py index 30d3a1e..9c8fc1c 100644 --- a/data_transfer/devices/vtt.py +++ b/data_transfer/devices/vtt.py @@ -15,8 +15,8 @@ def __init__(self): """ Set up a session to the s3 bucket to use in multiple steps """ - # self.bucket = self.authenticate() - self.bucket = "" + self.bucket = self.authenticate() + # self.bucket = "" def authenticate(self): """ @@ -59,7 +59,7 @@ def download_metadata(self) -> None: # NOTE: includes metadata for ALL data records, therefore we must filter them # NOTE: currently downloads all dumps (inc. historical) TODO: only since last run all_records = vtt_api.get_list(self.bucket) - + # Only add records that are not known in the DB based on stored filename (ID and filename in dreem) unknown_records = [r for r in all_records if r['id'] not in set(all_filenames())] @@ -87,6 +87,7 @@ def download_metadata(self) -> None: record = Record( # NOTE: id is the hashedID provided by VTT filename=item['id'], + vtt_dump_date=item['dumps'][0], device_id=device_id, patient_id=patient_id, start_wear=start_wear, @@ -99,6 +100,7 @@ def download_metadata(self) -> None: # Store metadata from memory to file utils.write_json(path, item) + def download_file(self, mongo_id: str) -> None: """ Downloads files and store them to {config.storage_vol} @@ -108,7 +110,7 @@ def download_file(self, mongo_id: str) -> None: NOTE/TODO: is run as a task. 
""" record = read_record(mongo_id) - is_downloaded_success = vtt_api.download_file(self.session, record.filename) + is_downloaded_success = vtt_api.download_file(self.bucket, record.filename, record.vtt_dump_date) if is_downloaded_success: record.is_downloaded = is_downloaded_success update_record(record) diff --git a/data_transfer/lib/vtt.py b/data_transfer/lib/vtt.py index e5156ab..975c23c 100644 --- a/data_transfer/lib/vtt.py +++ b/data_transfer/lib/vtt.py @@ -49,53 +49,33 @@ def get_list(bucket) -> [dict]: # TODO: add typing # returns a list of s3.ObjectSummary() objects containing keys # contains metadata, such as .last_modified - # objects = bucket.objects.all() - # object_paths = [obj.key for obj in summary] - + objects = bucket.objects.all() + object_paths = [obj.key for obj in objects] # ignore users.txt files - will deduct from folders split_paths = [p.split('/') for p in object_paths if p.find('users.txt') == -1] # generally follows [dump_date, raw/files, patienthash, patienthash.file (.nfo or .zip)] - # transform list to set to list to remove duplicates due to each object having a full path - patients = dict() - for path in split_paths: - if path[2] in patients: - patients[path[2]].add() - patients.add(path[2]) - - patients = list(set(p[2] for p in split_paths)) - patients = [dict(id=p) for p in patients] - # dump_dates = set(d[0] for d in split_paths) - - return patients - - -# def download_file(session: requests.Session, record_id: str) -> bool: -# """ -# GET specified file based on known record -# # TODO: adapt for VTT approach -# """ -# file_type = args.ftype -# url = __build_url(file_type, record_id) - -# response = session.get(url) -# # TODO: catch/log exception -# response.raise_for_status() -# response = response.json() + patients = list(set([p[2] for p in split_paths])) -# # Used to lookup the download URL -# key = "url" if file_type == "raw" else "data_url" -# file_url = response[key] + result = [] + for patient in patients: + # TODO: only process dump dates of interest (i.e. since last run) + result.append(dict(id=patient, dumps=list(set([p[0] for p in split_paths if p[2] == patient])))) + + return result + + +def download_file(bucket, patient_hash: str, dump_date: str,) -> bool: # TODO: add typing + """ + GET specified file based on known record + """ + ext = 'zip' if args.ftype == 'raw' else args.ftype + file_path = Path(config.storage_vol) / f"{patient_hash}.{ext}" + download_path = f"{dump_date}/raw/{patient_hash}/{patient_hash}.zip" -# # NOTE: file_url may be empty if a file is unavailable: -# # (1): file is on dreem headband but not uploaded -# # (2): file is being processed by dreem's algorithms -# if not file_url: -# return False -# # TODO: for now, assumes that this method never throws ... 
-# __download_file(file_url, record_id) -# return True + bucket.download_file(download_path, str(file_path)) + return True def serial_by_device(uuid: str) -> Optional[str]: @@ -137,37 +117,7 @@ def __key_by_value(filename: Path, needle: str) -> Optional[str]: if needle == value: return key return None - - -def __download_file(url: str, record_id: str) -> None: - """ - Builds the target filename and starts downloading the file to disk - # NOTE: this approach is specific for Dreem - # TODO: adapt for VTT approach - """ - # NOTE: can be simplified once we agree on specific file type - ext = 'tar.gz' if args.ftype == 'raw' else args.ftype - file_path = Path(config.storage_vol) / f"{record_id}.{ext}" - response = requests.get(url, stream=True) - - with open(file_path, "wb") as output_file: - for chunk in response.iter_content(chunk_size=1024): - if chunk: - output_file.write(chunk) - - -def __build_url(file_type: str, record_id: str) -> (str, str): - """ - Build URL based on file info. This varied by filetype, e.g., raw/EDF/H5. - # TODO: adapt for VTT approach - """ - # TODO: can be simplified once we determine if we will download only H5 data. - if file_type == "raw": - url = f"{config.dreem_api_url}/dreem/dataupload/data/{record_id}" - else: - url = f"{config.dreem_api_url}/dreem/algorythm/record/{record_id}/{file_type}/" - return url - + @lru_cache(maxsize=None) def __load_csv(path: Path) -> [tuple]: diff --git a/data_transfer/main.py b/data_transfer/main.py index a60a4c6..c725f4c 100644 --- a/data_transfer/main.py +++ b/data_transfer/main.py @@ -45,12 +45,12 @@ def vtt_dag(): # NOTE: simulates initiation of tasks upon metadata download # TODO: in practice the tasks should be invoked within the batch job. - # for record in records_not_downloaded(): - # # Each task should be idempotent. Returned values feeds subsequent task - # mongoid = vtt_tasks.task_download_data(study_site, record.id) - # mongoid = vtt_tasks.task_preprocess_data(mongoid) - # # Data is finalised and moved to a folder in /uploading/ - # # shared_tasks.task_prepare_data("DRM", mongoid) + for record in records_not_downloaded(): + # Each task should be idempotent. Returned values feeds subsequent task + mongoid = vtt_tasks.task_download_data(record.id) + mongoid = vtt_tasks.task_preprocess_data(mongoid) + # Data is finalised and moved to a folder in /uploading/ + # shared_tasks.task_prepare_data("DRM", mongoid) # All said folders FOR ALL DEVICES are uploaded once per day # shared_jobs.batch_upload_data() diff --git a/data_transfer/schemas/record.py b/data_transfer/schemas/record.py index 482e4a2..12ba6d1 100644 --- a/data_transfer/schemas/record.py +++ b/data_transfer/schemas/record.py @@ -15,6 +15,8 @@ class Record(BaseModel): start_wear: datetime end_wear: datetime + vtt_dump_date: Optional[str] + # Each stage of the pipeline is_downloaded: Optional[bool] = False is_processed: Optional[bool] = False diff --git a/data_transfer/tasks/vtt.py b/data_transfer/tasks/vtt.py index e69de29..a099188 100644 --- a/data_transfer/tasks/vtt.py +++ b/data_transfer/tasks/vtt.py @@ -0,0 +1,21 @@ + +from data_transfer.db import read_record, update_record +from data_transfer.devices.vtt import Vtt + + +def task_download_data(mongoid: str) -> str: + """ + Download a datafile for the VTT dump device. + """ + Vtt().download_file(mongoid) + return mongoid + + +def task_preprocess_data(mongoid: str) -> str: + """ + Preprocessing tasks on dreem data. 
+ """ + record = read_record(mongoid) + record.is_processed = True + update_record(record) + return mongoid \ No newline at end of file From ca68ed72f41d034ff28c40b6f1e8845db92c6eb2 Mon Sep 17 00:00:00 2001 From: David Date: Mon, 1 Feb 2021 09:42:46 +0000 Subject: [PATCH 04/15] name change from vtt to vttsma --- data_transfer/config.py | 6 +++--- data_transfer/devices/{vtt.py => vttsma.py} | 18 +++++++++--------- data_transfer/jobs/{vtt.py => vttsma.py} | 6 +++--- data_transfer/lib/{vtt.py => vttsma.py} | 4 ++-- data_transfer/main.py | 15 +++++++-------- data_transfer/tasks/{vtt.py => vttsma.py} | 4 ++-- 6 files changed, 26 insertions(+), 27 deletions(-) rename data_transfer/devices/{vtt.py => vttsma.py} (89%) rename data_transfer/jobs/{vtt.py => vttsma.py} (75%) rename data_transfer/lib/{vtt.py => vttsma.py} (98%) rename data_transfer/tasks/{vtt.py => vttsma.py} (82%) diff --git a/data_transfer/config.py b/data_transfer/config.py index 1d7e36d..9d7d2fb 100644 --- a/data_transfer/config.py +++ b/data_transfer/config.py @@ -30,9 +30,9 @@ class Settings(BaseSettings): support_token: str # NOTE: VTT does not differentiate between study sites - vtt_aws_accesskey: str - vtt_aws_secret_accesskey: str - vtt_aws_bucket_name: str + vttsma_aws_accesskey: str + vttsma_aws_secret_accesskey: str + vttsma_aws_bucket_name: str dreem_login_url: str dreem_api_url: str diff --git a/data_transfer/devices/vtt.py b/data_transfer/devices/vttsma.py similarity index 89% rename from data_transfer/devices/vtt.py rename to data_transfer/devices/vttsma.py index 9c8fc1c..7ea435c 100644 --- a/data_transfer/devices/vtt.py +++ b/data_transfer/devices/vttsma.py @@ -1,5 +1,5 @@ from data_transfer.config import config -from data_transfer.lib import vtt as vtt_api +from data_transfer.lib import vttsma as vttsma_api from data_transfer.services import inventory from data_transfer.schemas.record import Record from data_transfer.db import create_record, \ @@ -10,7 +10,7 @@ from datetime import datetime import json -class Vtt: +class Vttsma: def __init__(self): """ Set up a session to the s3 bucket to use in multiple steps @@ -23,12 +23,12 @@ def authenticate(self): Authenticate once when object created to share session between requests """ credentials = dict( - aws_ak=config.vtt_aws_accesskey, - aws_ask=config.vtt_aws_secret_accesskey, - bucket_name=config.vtt_aws_bucket_name, + aws_ak=config.vttsma_aws_accesskey, + aws_ask=config.vttsma_aws_secret_accesskey, + bucket_name=config.vttsma_aws_bucket_name, ) - bucket = vtt_api.get_bucket(credentials) + bucket = vttsma_api.get_bucket(credentials) return bucket def download_metadata(self) -> None: @@ -58,7 +58,7 @@ def download_metadata(self) -> None: # NOTE: includes metadata for ALL data records, therefore we must filter them # NOTE: currently downloads all dumps (inc. historical) TODO: only since last run - all_records = vtt_api.get_list(self.bucket) + all_records = vttsma_api.get_list(self.bucket) # Only add records that are not known in the DB based on stored filename (ID and filename in dreem) unknown_records = [r for r in all_records if r['id'] not in set(all_filenames())] @@ -87,7 +87,7 @@ def download_metadata(self) -> None: record = Record( # NOTE: id is the hashedID provided by VTT filename=item['id'], - vtt_dump_date=item['dumps'][0], + vttsma_dump_date=item['dumps'][0], device_id=device_id, patient_id=patient_id, start_wear=start_wear, @@ -110,7 +110,7 @@ def download_file(self, mongo_id: str) -> None: NOTE/TODO: is run as a task. 
""" record = read_record(mongo_id) - is_downloaded_success = vtt_api.download_file(self.bucket, record.filename, record.vtt_dump_date) + is_downloaded_success = vttsma_api.download_file(self.bucket, record.filename, record.vttsma_dump_date) if is_downloaded_success: record.is_downloaded = is_downloaded_success update_record(record) diff --git a/data_transfer/jobs/vtt.py b/data_transfer/jobs/vttsma.py similarity index 75% rename from data_transfer/jobs/vtt.py rename to data_transfer/jobs/vttsma.py index 720ebd0..1531d08 100644 --- a/data_transfer/jobs/vtt.py +++ b/data_transfer/jobs/vttsma.py @@ -1,5 +1,5 @@ -from data_transfer.devices.vtt import Vtt +from data_transfer.devices.vttsma import Vttsma from data_transfer.schemas.record import Record @@ -11,5 +11,5 @@ def batch_metadata() -> [Record]: NOTE/TODO: this cron batch should be run weekly to accomodate the weekly VTT S3 dump """ - vtt = Vtt() - vtt.download_metadata() + vttsma = Vttsma() + vttsma.download_metadata() diff --git a/data_transfer/lib/vtt.py b/data_transfer/lib/vttsma.py similarity index 98% rename from data_transfer/lib/vtt.py rename to data_transfer/lib/vttsma.py index 975c23c..30ca440 100644 --- a/data_transfer/lib/vtt.py +++ b/data_transfer/lib/vttsma.py @@ -12,7 +12,7 @@ @dataclass -class VttFileDownload: +class VttsmaFileDownload: """Use as CLI arguments for Dreem's library.""" # TODO: how does this relate to VTT? directory: Path = config.storage_vol @@ -21,7 +21,7 @@ class VttFileDownload: # Define location and filetype to download -args = VttFileDownload() +args = VttsmaFileDownload() def get_bucket(creds: dict): # TODO: add typing from https://pypi.org/project/mypy-boto3-s3/ """ diff --git a/data_transfer/main.py b/data_transfer/main.py index c725f4c..eb6d150 100644 --- a/data_transfer/main.py +++ b/data_transfer/main.py @@ -1,9 +1,9 @@ from data_transfer.jobs import shared as shared_jobs, \ dreem as dreem_jobs, \ - vtt as vtt_jobs + vttsma as vttsma_jobs from data_transfer.tasks import shared as shared_tasks, \ dreem as dreem_tasks, \ - vtt as vtt_tasks + vttsma as vttsma_tasks from data_transfer.db import records_not_downloaded @@ -33,22 +33,21 @@ def dreem_dag(study_site): # All said folders FOR ALL DEVICES are uploaded once per day # shared_jobs.batch_upload_data() -def vtt_dag(): +def vttsma_dag(): """ Directed acyclic graph (DAG) representing data_transfer pipeline as used for all devices Note that VTT does not distinguish between study sites NOTE/TODO: this method simulates the pipeline. """ - # TODO: adjust for VTT - vtt_jobs.batch_metadata() + vttsma_jobs.batch_metadata() # NOTE: simulates initiation of tasks upon metadata download # TODO: in practice the tasks should be invoked within the batch job. for record in records_not_downloaded(): # Each task should be idempotent. 
Returned values feeds subsequent task - mongoid = vtt_tasks.task_download_data(record.id) - mongoid = vtt_tasks.task_preprocess_data(mongoid) + mongoid = vttsma_tasks.task_download_data(record.id) + mongoid = vttsma_tasks.task_preprocess_data(mongoid) # Data is finalised and moved to a folder in /uploading/ # shared_tasks.task_prepare_data("DRM", mongoid) @@ -57,4 +56,4 @@ def vtt_dag(): if __name__ == "__main__": # dreem_dag("munster") - vtt_dag() \ No newline at end of file + vttsma_dag() \ No newline at end of file diff --git a/data_transfer/tasks/vtt.py b/data_transfer/tasks/vttsma.py similarity index 82% rename from data_transfer/tasks/vtt.py rename to data_transfer/tasks/vttsma.py index a099188..20eb2d4 100644 --- a/data_transfer/tasks/vtt.py +++ b/data_transfer/tasks/vttsma.py @@ -1,13 +1,13 @@ from data_transfer.db import read_record, update_record -from data_transfer.devices.vtt import Vtt +from data_transfer.devices.vttsma import Vttsma def task_download_data(mongoid: str) -> str: """ Download a datafile for the VTT dump device. """ - Vtt().download_file(mongoid) + Vttsma().download_file(mongoid) return mongoid From 5b00198c3ade0cc5491792cf92e9acdc9dc29974 Mon Sep 17 00:00:00 2001 From: David Date: Mon, 1 Feb 2021 11:24:38 +0000 Subject: [PATCH 05/15] add typing for boto3 --- data_transfer/devices/vttsma.py | 5 ++--- data_transfer/lib/vttsma.py | 25 ++++++++++++------------- data_transfer/main.py | 2 +- poetry.lock | 14 +++++++++++++- pyproject.toml | 1 + 5 files changed, 29 insertions(+), 18 deletions(-) diff --git a/data_transfer/devices/vttsma.py b/data_transfer/devices/vttsma.py index 7ea435c..dd421a9 100644 --- a/data_transfer/devices/vttsma.py +++ b/data_transfer/devices/vttsma.py @@ -16,7 +16,6 @@ def __init__(self): Set up a session to the s3 bucket to use in multiple steps """ self.bucket = self.authenticate() - # self.bucket = "" def authenticate(self): """ @@ -34,7 +33,7 @@ def authenticate(self): def download_metadata(self) -> None: """ Before downloading raw data we need to know which files to download. - VTT provides a weekly dump in an S3 bucket, with the structure: + VTT provides a weekly dump in an S3 bucket, with a symbolic structure: . ├── data_yyyy_mm_dd │ ├── users.txt @@ -60,7 +59,7 @@ def download_metadata(self) -> None: # NOTE: currently downloads all dumps (inc. 
historical) TODO: only since last run all_records = vttsma_api.get_list(self.bucket) - # Only add records that are not known in the DB based on stored filename (ID and filename in dreem) + # Only add records that are not known in the DB based on stored filename (id = VTT hash id) unknown_records = [r for r in all_records if r['id'] not in set(all_filenames())] # Aim: construct valid record (metadata) and add to DB diff --git a/data_transfer/lib/vttsma.py b/data_transfer/lib/vttsma.py index 30ca440..b71cdd8 100644 --- a/data_transfer/lib/vttsma.py +++ b/data_transfer/lib/vttsma.py @@ -8,7 +8,9 @@ from typing import Optional from functools import lru_cache # note: @cache exists in py >= 3.9 import csv + import boto3 +from mypy_boto3_s3.service_resource import Bucket @dataclass @@ -23,10 +25,9 @@ class VttsmaFileDownload: # Define location and filetype to download args = VttsmaFileDownload() -def get_bucket(creds: dict): # TODO: add typing from https://pypi.org/project/mypy-boto3-s3/ +def get_bucket(creds: dict) -> Bucket: """ Builds a S3 session bucket object to interface with the S3 bucket - # NOTE: https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/s3.html#bucket """ session = boto3.session.Session( aws_access_key_id=creds['aws_ak'], @@ -39,23 +40,21 @@ def get_bucket(creds: dict): # TODO: add typing from https://pypi.org/project/my return bucket -def get_list(bucket) -> [dict]: # TODO: add typing +def get_list(bucket: Bucket) -> [dict]: """ GET all records (metadata) from the AWS S3 bucket - """ - # using boto3 Resource instead of Client for a readable - # object oriented approach. Not 100% coverage of AWS API functionality - - # returns a list of s3.ObjectSummary() objects containing keys - # contains metadata, such as .last_modified - + + NOTE: S3 folder structure is symbolic. The 'key' (str) for each file object \ + represents the path. 
See also `download_metadata()` in devices > vttsma.py \ + When split, results in [dump_date, raw/files, patienthash, patienthash.nfo/.zip] + """ objects = bucket.objects.all() object_paths = [obj.key for obj in objects] - # ignore users.txt files - will deduct from folders + # ignore users.txt files - duplicate data present in object key split_paths = [p.split('/') for p in object_paths if p.find('users.txt') == -1] - # generally follows [dump_date, raw/files, patienthash, patienthash.file (.nfo or .zip)] + # follows [dump_date, raw/files, patienthash, patienthash.file (.nfo or .zip)] patients = list(set([p[2] for p in split_paths])) result = [] @@ -66,7 +65,7 @@ def get_list(bucket) -> [dict]: # TODO: add typing return result -def download_file(bucket, patient_hash: str, dump_date: str,) -> bool: # TODO: add typing +def download_file(bucket: Bucket, patient_hash: str, dump_date: str,) -> bool: """ GET specified file based on known record """ diff --git a/data_transfer/main.py b/data_transfer/main.py index eb6d150..31eb9f1 100644 --- a/data_transfer/main.py +++ b/data_transfer/main.py @@ -49,7 +49,7 @@ def vttsma_dag(): mongoid = vttsma_tasks.task_download_data(record.id) mongoid = vttsma_tasks.task_preprocess_data(mongoid) # Data is finalised and moved to a folder in /uploading/ - # shared_tasks.task_prepare_data("DRM", mongoid) + # shared_tasks.task_prepare_data("SMA", mongoid) # All said folders FOR ALL DEVICES are uploaded once per day # shared_jobs.batch_upload_data() diff --git a/poetry.lock b/poetry.lock index 7e9db30..eab717b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -128,6 +128,14 @@ category = "main" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" +[[package]] +name = "mypy-boto3-s3" +version = "1.16.63.0" +description = "Type annotations for boto3.S3 1.16.63 service, generated by mypy-boto3-buider 4.3.1" +category = "main" +optional = false +python-versions = ">=3.6" + [[package]] name = "packaging" version = "20.7" @@ -328,7 +336,7 @@ standard = ["websockets (>=8.0.0,<9.0.0)", "watchgod (>=0.6,<0.7)", "python-dote [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "471b26ac750f7f87cfc1f89b85bbf7930d25b197304a22e98330267b6970cc12" +content-hash = "08957403c061293df3eff86a615cb35b5d965570b3cfbc3a6bc481d646d44ec3" [metadata.files] atomicwrites = [ @@ -383,6 +391,10 @@ jmespath = [ {file = "jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f"}, {file = "jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9"}, ] +mypy-boto3-s3 = [ + {file = "mypy-boto3-s3-1.16.63.0.tar.gz", hash = "sha256:b6fc2213e2a81bbd552c70aac666c807ca45d77a534349da63f92733275aa5cc"}, + {file = "mypy_boto3_s3-1.16.63.0-py3-none-any.whl", hash = "sha256:e7957ee6b21da24054fadde0a481b488e9ee0615a1e1c76b0aebbf25db04ef65"}, +] packaging = [ {file = "packaging-20.7-py2.py3-none-any.whl", hash = "sha256:eb41423378682dadb7166144a4926e443093863024de508ca5c9737d6bc08376"}, {file = "packaging-20.7.tar.gz", hash = "sha256:05af3bb85d320377db281cf254ab050e1a7ebcbf5410685a9a407e18a1f81236"}, diff --git a/pyproject.toml b/pyproject.toml index f395067..77bfc76 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ requests = "^2.24.0" python-dotenv = "^0.14.0" pymongo = "^3.11.2" boto3 = "^1.16.61" +mypy-boto3-s3 = "^1.16.63" [tool.poetry.dev-dependencies] click = "^7.1.2" From 1581e9a91a2ba391d608eeca494755cb0a84e3bf Mon Sep 17 00:00:00 2001 From: David 
Date: Mon, 1 Feb 2021 15:59:08 +0000 Subject: [PATCH 06/15] use UCAM api to retrieve patientID and wear times --- .dtransfer.env.example | 6 ++-- data_transfer/devices/vttsma.py | 51 +++++++++++++------------------ data_transfer/lib/vttsma.py | 54 +-------------------------------- data_transfer/main.py | 2 +- data_transfer/schemas/record.py | 2 +- data_transfer/services/ucam.py | 36 ++++++++++++++++++++++ data_transfer/tasks/shared.py | 3 +- 7 files changed, 65 insertions(+), 89 deletions(-) diff --git a/.dtransfer.env.example b/.dtransfer.env.example index 5d61288..a77e7a7 100644 --- a/.dtransfer.env.example +++ b/.dtransfer.env.example @@ -5,9 +5,9 @@ DATABASE_URI="mongodb://localhost:27017/" SUPPORT_BASE_URL="" SUPPORT_TOKEN="" -VTT_AWS_ACCESSKEY="" -VTT_AWS_SECRET_ACCESSKEY="" -VTT_AWS_BUCKET_NAME="" +VTTSMA_AWS_ACCESSKEY="" +VTTSMA_AWS_SECRET_ACCESSKEY="" +VTTSMA_AWS_BUCKET_NAME="" DREEM_LOGIN_URL="" DREEM_API_URL="" diff --git a/data_transfer/devices/vttsma.py b/data_transfer/devices/vttsma.py index dd421a9..e54a85b 100644 --- a/data_transfer/devices/vttsma.py +++ b/data_transfer/devices/vttsma.py @@ -1,6 +1,6 @@ from data_transfer.config import config from data_transfer.lib import vttsma as vttsma_api -from data_transfer.services import inventory +from data_transfer.services import inventory, ucam from data_transfer.schemas.record import Record from data_transfer.db import create_record, \ read_record, update_record, all_filenames @@ -65,39 +65,30 @@ def download_metadata(self) -> None: # Aim: construct valid record (metadata) and add to DB for item in unknown_records: - # device_serial = dreem_api.serial_by_device(item['device']) - # device_id = inventory.device_id_by_serial(device_serial) - device_id = 'x' + if patient_record := ucam.record_by_vtt(item['id']): + device_used = [r for r in patient_record.devices if r.vttsma_id == item['id']] - # NOTE: lookup Patient ID by email: if None (e.g. personal email used), then use inventory - # patient_id = dreem_api.patient_id_by_user(item['user']) or inventory.patient_id_by_device_id(device_id) - patient_id = 'x' - - # NOTE: using inventory to determine intended wear time period. - # Useful for historical data, but (TODO) should be replaced with UCAM API. - # history = inventory.device_history(device_id)[patient_id] - - # start_wear = utils.format_inventory_weartime(history['checkout']) - # NOTE: if device not returned the current day is used. 
- # end_wear = utils.format_inventory_weartime(history['checkin']) - start_wear = utils.format_inventory_weartime("2020-01-01 01:01:01") - end_wear = utils.format_inventory_weartime("2020-01-01 01:01:01") + # Assuming that only one device (phone) is used for the VTT SMA + device_used = device_used[0] - record = Record( - # NOTE: id is the hashedID provided by VTT - filename=item['id'], - vttsma_dump_date=item['dumps'][0], - device_id=device_id, - patient_id=patient_id, - start_wear=start_wear, - end_wear=end_wear - ) + record = Record( + filename=device_used.vttsma_id, # NOTE: id is the hashedID provided by VTT + vttsma_dump_date=item['dumps'][0], # TODO: expect data across dumps + device_id=device_used.vttsma_id, # Currently the VTT SMA hash represents the device, might change + patient_id=patient_record.patient.id, + start_wear=device_used.start_wear, + end_wear=device_used.end_wear + ) - create_record(record) + create_record(record) - path = Path(config.storage_vol / f'{record.filename}-meta.json') - # Store metadata from memory to file - utils.write_json(path, item) + path = Path(config.storage_vol / f'{record.filename}-meta.json') + # Store metadata from memory to file + utils.write_json(path, item) + + else: + # throw / log here + pass def download_file(self, mongo_id: str) -> None: diff --git a/data_transfer/lib/vttsma.py b/data_transfer/lib/vttsma.py index b71cdd8..5fa9d15 100644 --- a/data_transfer/lib/vttsma.py +++ b/data_transfer/lib/vttsma.py @@ -74,56 +74,4 @@ def download_file(bucket: Bucket, patient_hash: str, dump_date: str,) -> bool: download_path = f"{dump_date}/raw/{patient_hash}/{patient_hash}.zip" bucket.download_file(download_path, str(file_path)) - return True - - -def serial_by_device(uuid: str) -> Optional[str]: - """ - Lookup Device ID by dreem headband serial - # TODO: adapt for VTT approach - """ - serial = __key_by_value(config.dreem_devices, uuid) - return serial - - -def patient_id_by_user(uuid: str) -> Optional[str]: - """ - Lookup Patient ID by dreem email hash. - # TODO: adapt for VTT approach - """ - email = __key_by_value(config.dreem_users, uuid) - if email: - # Initially, gmail accounts were used for dreem based on SamsungA40 ID. - # We want to skip this record and determine PatientID elsewhere. - if 'gmail' in email: - return None - email = email.replace("ideafast.", "") - email = email.split("@")[0] - # Create consistency in Patient ID format - if email[1] != "-": - email = f"{email[0]}-{email[1:]}" - # TODO: what if personal email used (e.g., in CVS)? 
- return email - - -def __key_by_value(filename: Path, needle: str) -> Optional[str]: - """ - Helper method to find key in CSV by value (needle) - """ - data = __load_csv(filename) - for pair in data: - key, value = pair - if needle == value: - return key - return None - - -@lru_cache(maxsize=None) -def __load_csv(path: Path) -> [tuple]: - """ - Load full CSV into memory for quick lookup - """ - with open(path) as csv_file: - csv_reader = csv.reader(csv_file, delimiter=',') - data = [(row[0], row[1]) for row in csv_reader] - return data \ No newline at end of file + return True \ No newline at end of file diff --git a/data_transfer/main.py b/data_transfer/main.py index 31eb9f1..8726e71 100644 --- a/data_transfer/main.py +++ b/data_transfer/main.py @@ -49,7 +49,7 @@ def vttsma_dag(): mongoid = vttsma_tasks.task_download_data(record.id) mongoid = vttsma_tasks.task_preprocess_data(mongoid) # Data is finalised and moved to a folder in /uploading/ - # shared_tasks.task_prepare_data("SMA", mongoid) + shared_tasks.task_prepare_data("SMA", mongoid) # All said folders FOR ALL DEVICES are uploaded once per day # shared_jobs.batch_upload_data() diff --git a/data_transfer/schemas/record.py b/data_transfer/schemas/record.py index 12ba6d1..14ff290 100644 --- a/data_transfer/schemas/record.py +++ b/data_transfer/schemas/record.py @@ -15,7 +15,7 @@ class Record(BaseModel): start_wear: datetime end_wear: datetime - vtt_dump_date: Optional[str] + vttsma_dump_date: Optional[str] # Each stage of the pipeline is_downloaded: Optional[bool] = False diff --git a/data_transfer/services/ucam.py b/data_transfer/services/ucam.py index fa415bb..1896970 100644 --- a/data_transfer/services/ucam.py +++ b/data_transfer/services/ucam.py @@ -65,6 +65,42 @@ def __device_from_record(device: Payload) -> Device: return PatientRecord(patient, devices) +def record_by_vtt(vtt_hash: str) -> Optional[PatientRecord]: + """ + Return a patient record based on then Hashed ID provided by VTT + The VTT hashes are unique to each patient + + GET /vtt// + """ + patient_records = [r for r in __get_patients() if r.vttsma_id == vtt_hash] + + # No records exist for that patient, + # e.g., if a device was not worn or a staff member forget to add the record + if len(patient_records) == 0: + return None + + # Records returned from UCAM contain the patient ID in each row. + # Rather than duplicating this, we create it once, then associate + # all other rows (i.e., devices) below + patient = patient_records[0] + patient = Patient(patient.patient_id, patient.disease) + + def __device_from_record(device: Payload) -> Device: + """ + Convenient method to only store Device-specific metadata. + """ + return Device( + id=device.device_id, + vttsma_id=device.vttsma_id, + devitations=device.devitations, + start_wear=device.start_wear, + end_wear=device.end_wear + ) + + devices = [__device_from_record(r) for r in patient_records] + return PatientRecord(patient, devices) + + def device_history(device_id: str) -> [Payload]: """ A device may be worn by many patients. This returns such history. 
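For clarity, a minimal usage sketch of the record_by_vtt lookup added above. The hash value and the skip-on-miss handling are illustrative assumptions, not part of this patch:

    from data_transfer.services import ucam

    # Hypothetical VTT hash, as extracted from an S3 object key
    vtt_hash = "8f3a1c2e"

    if patient_record := ucam.record_by_vtt(vtt_hash):
        # One patient; possibly several device rows share the same VTT hash
        print(patient_record.patient.id)
        print([d.vttsma_id for d in patient_record.devices])
    else:
        # No UCAM entry, e.g. device not worn or record not yet added by staff
        pass
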
diff --git a/data_transfer/tasks/shared.py b/data_transfer/tasks/shared.py index 544e093..e39d965 100644 --- a/data_transfer/tasks/shared.py +++ b/data_transfer/tasks/shared.py @@ -3,7 +3,8 @@ FILE_TYPES = { - 'DRM': '.h5' + 'DRM': '.h5', + 'SMA': '.zip' } From 3c8f4c5fe983706bc2119a3332fe840f94a2adf4 Mon Sep 17 00:00:00 2001 From: David Date: Mon, 1 Feb 2021 16:36:00 +0000 Subject: [PATCH 07/15] add global VTT SMA Device ID --- .dtransfer.env.example | 1 + data_transfer/config.py | 1 + data_transfer/devices/vttsma.py | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.dtransfer.env.example b/.dtransfer.env.example index a77e7a7..73f73c8 100644 --- a/.dtransfer.env.example +++ b/.dtransfer.env.example @@ -8,6 +8,7 @@ SUPPORT_TOKEN="" VTTSMA_AWS_ACCESSKEY="" VTTSMA_AWS_SECRET_ACCESSKEY="" VTTSMA_AWS_BUCKET_NAME="" +VTTSMA_GLOBAL_DEVICE_ID="" DREEM_LOGIN_URL="" DREEM_API_URL="" diff --git a/data_transfer/config.py b/data_transfer/config.py index 9d7d2fb..4976e06 100644 --- a/data_transfer/config.py +++ b/data_transfer/config.py @@ -33,6 +33,7 @@ class Settings(BaseSettings): vttsma_aws_accesskey: str vttsma_aws_secret_accesskey: str vttsma_aws_bucket_name: str + vttsma_global_device_id: str dreem_login_url: str dreem_api_url: str diff --git a/data_transfer/devices/vttsma.py b/data_transfer/devices/vttsma.py index e54a85b..ad24f76 100644 --- a/data_transfer/devices/vttsma.py +++ b/data_transfer/devices/vttsma.py @@ -74,7 +74,7 @@ def download_metadata(self) -> None: record = Record( filename=device_used.vttsma_id, # NOTE: id is the hashedID provided by VTT vttsma_dump_date=item['dumps'][0], # TODO: expect data across dumps - device_id=device_used.vttsma_id, # Currently the VTT SMA hash represents the device, might change + device_id=config.vttsma_global_device_id, # All VTT-SMA share the same device ID patient_id=patient_record.patient.id, start_wear=device_used.start_wear, end_wear=device_used.end_wear From 6d95eae7d48d3d92373b79447eeccc470abd42f6 Mon Sep 17 00:00:00 2001 From: David Date: Mon, 1 Feb 2021 17:25:55 +0000 Subject: [PATCH 08/15] download original data from patient, including audio files and metadata. Zips all data together and removes folder. --- data_transfer/devices/vttsma.py | 2 +- data_transfer/lib/vttsma.py | 23 +++++++++++++++++------ data_transfer/services/dmpy.py | 6 ++++++ 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/data_transfer/devices/vttsma.py b/data_transfer/devices/vttsma.py index ad24f76..caffc77 100644 --- a/data_transfer/devices/vttsma.py +++ b/data_transfer/devices/vttsma.py @@ -100,7 +100,7 @@ def download_file(self, mongo_id: str) -> None: NOTE/TODO: is run as a task. """ record = read_record(mongo_id) - is_downloaded_success = vttsma_api.download_file(self.bucket, record.filename, record.vttsma_dump_date) + is_downloaded_success = vttsma_api.download_files(self.bucket, record.filename, record.vttsma_dump_date) if is_downloaded_success: record.is_downloaded = is_downloaded_success update_record(record) diff --git a/data_transfer/lib/vttsma.py b/data_transfer/lib/vttsma.py index 5fa9d15..3bcbdfd 100644 --- a/data_transfer/lib/vttsma.py +++ b/data_transfer/lib/vttsma.py @@ -3,6 +3,7 @@ # endpoint rather than pull from a CSV file. 
from data_transfer.config import config +from data_transfer.services import dmpy from pathlib import Path from dataclasses import dataclass from typing import Optional @@ -54,7 +55,7 @@ def get_list(bucket: Bucket) -> [dict]: # ignore users.txt files - duplicate data present in object key split_paths = [p.split('/') for p in object_paths if p.find('users.txt') == -1] - # follows [dump_date, raw/files, patienthash, patienthash.file (.nfo or .zip)] + # follows [dump_date, raw/files, patienthash, patienthash.nfo/.zip/.audio?)] patients = list(set([p[2] for p in split_paths])) result = [] @@ -65,13 +66,23 @@ def get_list(bucket: Bucket) -> [dict]: return result -def download_file(bucket: Bucket, patient_hash: str, dump_date: str,) -> bool: +def download_files(bucket: Bucket, patient_hash: str, dump_date: str,) -> bool: """ - GET specified file based on known record + GET all files associated with the known record. + NOTE: S3 folder association is symbolic, so a need to pull down data + through a nested loop. """ ext = 'zip' if args.ftype == 'raw' else args.ftype - file_path = Path(config.storage_vol) / f"{patient_hash}.{ext}" - download_path = f"{dump_date}/raw/{patient_hash}/{patient_hash}.zip" + folder_path = Path(config.storage_vol) / f"{patient_hash}" - bucket.download_file(download_path, str(file_path)) + for prefix in ['raw','files']: + sub_folder = folder_path/prefix + sub_folder.mkdir(parents=True, exist_ok=True) + for obj in bucket.objects.filter(Prefix=f"{dump_date}/{prefix}/{patient_hash}"): + file_name = obj.key.rsplit('/',1)[1] + bucket.download_file(obj.key, str(folder_path/prefix/file_name)) + + # added method to dmpy service + dmpy.zip_folder_and_rm_local(folder_path) + return True \ No newline at end of file diff --git a/data_transfer/services/dmpy.py b/data_transfer/services/dmpy.py index 6f34ebd..b398acc 100644 --- a/data_transfer/services/dmpy.py +++ b/data_transfer/services/dmpy.py @@ -6,6 +6,12 @@ def zip_folder(path: Path) -> Path: return Path(shutil.make_archive(path, 'zip', path)) +def zip_folder_and_rm_local(path: Path) -> Path: + zip_path = Path(shutil.make_archive(path, 'zip', path)) + shutil.rmtree(path) + return zip_path + + def upload(path: Path) -> bool: return True From 5f7e65ede5b8a4f8befd5d67373d82bc43a0e98e Mon Sep 17 00:00:00 2001 From: David Date: Tue, 2 Feb 2021 10:04:38 +0000 Subject: [PATCH 09/15] implemented device type, see also #20 --- data_transfer/db/main.py | 5 +++-- data_transfer/devices/dreem.py | 1 + data_transfer/devices/vttsma.py | 1 + data_transfer/main.py | 9 +++++---- data_transfer/schemas/record.py | 1 + data_transfer/tasks/shared.py | 6 ++++-- data_transfer/utils/__init__.py | 15 +++++++++++++++ 7 files changed, 30 insertions(+), 8 deletions(-) diff --git a/data_transfer/db/main.py b/data_transfer/db/main.py index 5c8d3b3..b5dd97f 100644 --- a/data_transfer/db/main.py +++ b/data_transfer/db/main.py @@ -2,6 +2,7 @@ from pymongo import MongoClient from data_transfer.config import config from data_transfer.schemas.record import Record +from data_transfer.utils import DeviceType client = MongoClient(config.database_uri) @@ -35,6 +36,6 @@ def all_filenames() -> [str]: return [doc['filename'] for doc in _db.records.find()] -def records_not_downloaded() -> [Record]: - docs = _db.records.find({"is_downloaded": False}) +def records_not_downloaded(device_type: DeviceType) -> [Record]: + docs = _db.records.find({"is_downloaded": False, "device_type": device_type.name}) return [Record(**doc) for doc in docs] \ No newline at end of file diff --git 
a/data_transfer/devices/dreem.py b/data_transfer/devices/dreem.py index c7fd481..9479ceb 100644 --- a/data_transfer/devices/dreem.py +++ b/data_transfer/devices/dreem.py @@ -63,6 +63,7 @@ def download_metadata(self) -> None: record = Record( # NOTE: id is a unique uuid used to GET raw data from Dreem filename=item['id'], + device_type=utils.DeviceType.DRM.name, device_id=device_id, patient_id=patient_id, start_wear=start_wear, diff --git a/data_transfer/devices/vttsma.py b/data_transfer/devices/vttsma.py index caffc77..e3ab9d7 100644 --- a/data_transfer/devices/vttsma.py +++ b/data_transfer/devices/vttsma.py @@ -73,6 +73,7 @@ def download_metadata(self) -> None: record = Record( filename=device_used.vttsma_id, # NOTE: id is the hashedID provided by VTT + device_type=utils.DeviceType.SMA.name, vttsma_dump_date=item['dumps'][0], # TODO: expect data across dumps device_id=config.vttsma_global_device_id, # All VTT-SMA share the same device ID patient_id=patient_record.patient.id, diff --git a/data_transfer/main.py b/data_transfer/main.py index 8726e71..4be5ec5 100644 --- a/data_transfer/main.py +++ b/data_transfer/main.py @@ -5,6 +5,7 @@ dreem as dreem_tasks, \ vttsma as vttsma_tasks from data_transfer.db import records_not_downloaded +from data_transfer.utils import DeviceType def dreem_dag(study_site): @@ -23,12 +24,12 @@ def dreem_dag(study_site): # NOTE: simulates initiation of tasks upon metadata download # TODO: in practice the tasks should be invoked within the batch job. - for record in records_not_downloaded(): + for record in records_not_downloaded(DeviceType.DRM): # Each task should be idempotent. Returned values feeds subsequent task mongoid = dreem_tasks.task_download_data(study_site, record.id) mongoid = dreem_tasks.task_preprocess_data(mongoid) # Data is finalised and moved to a folder in /uploading/ - shared_tasks.task_prepare_data("DRM", mongoid) + shared_tasks.task_prepare_data(DeviceType.DRM, mongoid) # All said folders FOR ALL DEVICES are uploaded once per day # shared_jobs.batch_upload_data() @@ -44,12 +45,12 @@ def vttsma_dag(): # NOTE: simulates initiation of tasks upon metadata download # TODO: in practice the tasks should be invoked within the batch job. - for record in records_not_downloaded(): + for record in records_not_downloaded(DeviceType.SMA): # Each task should be idempotent. 
Returned values feeds subsequent task mongoid = vttsma_tasks.task_download_data(record.id) mongoid = vttsma_tasks.task_preprocess_data(mongoid) # Data is finalised and moved to a folder in /uploading/ - shared_tasks.task_prepare_data("SMA", mongoid) + shared_tasks.task_prepare_data(DeviceType.SMA, mongoid) # All said folders FOR ALL DEVICES are uploaded once per day # shared_jobs.batch_upload_data() diff --git a/data_transfer/schemas/record.py b/data_transfer/schemas/record.py index 14ff290..f2e71a0 100644 --- a/data_transfer/schemas/record.py +++ b/data_transfer/schemas/record.py @@ -10,6 +10,7 @@ class Record(BaseModel): """ id: Optional[Any] = Field(alias='_id') filename: str + device_type: str device_id: str patient_id: str start_wear: datetime diff --git a/data_transfer/tasks/shared.py b/data_transfer/tasks/shared.py index e39d965..de7d891 100644 --- a/data_transfer/tasks/shared.py +++ b/data_transfer/tasks/shared.py @@ -1,14 +1,16 @@ from data_transfer.db import update_record, read_record from data_transfer.config import config +from data_transfer.utils import DeviceType FILE_TYPES = { + # Possibly move this to utils or embed with the enums 'DRM': '.h5', 'SMA': '.zip' } -def task_prepare_data(device_name: str, mongo_id: str) -> None: +def task_prepare_data(device_type: DeviceType, mongo_id: str) -> None: """ Moves all data (meta/raw/preprocessed) from /input/ into one folder by Device/Patient ID in /uploading of the format: @@ -37,7 +39,7 @@ def task_prepare_data(device_name: str, mongo_id: str) -> None: if not destination.exists(): destination.mkdir() - for extension in [FILE_TYPES[device_name], "-meta.json"]: + for extension in [FILE_TYPES[device_type.name], "-meta.json"]: fname = f'{record.filename}{extension}' old_path = data_input / fname diff --git a/data_transfer/utils/__init__.py b/data_transfer/utils/__init__.py index fb00482..325c993 100644 --- a/data_transfer/utils/__init__.py +++ b/data_transfer/utils/__init__.py @@ -1,3 +1,4 @@ +from enum import Enum from datetime import datetime from pathlib import Path from functools import lru_cache @@ -6,6 +7,20 @@ import json +class DeviceType(Enum): + AX6 = 1 # Axivity + BTF = 2 # Byteflies + DRM = 3 # Dreem + TFA = 4 # Think Fast + BVN = 5 # Everion + MMM = 6 # Move Monitor + SMP = 7 # Samsung Smartphone + SMA = 8 # Stress Monitor App + BED = 9 # EBedSensor + VTP = 10 # Vital Patch + YSM = 11 # ZKOne YOLI + + FORMATS = { 'ucam': "%d/%m/%Y", 'inventory': "%Y-%m-%d %H:%M:%S" From af50098ce4da4d2a34c4a5ae25c9ffdf383ce967 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 2 Feb 2021 11:00:58 +0000 Subject: [PATCH 10/15] slight comment cleanup --- data_transfer/devices/vttsma.py | 1 - data_transfer/lib/vttsma.py | 12 +++++++----- data_transfer/services/dmpy.py | 1 + 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/data_transfer/devices/vttsma.py b/data_transfer/devices/vttsma.py index e3ab9d7..7506965 100644 --- a/data_transfer/devices/vttsma.py +++ b/data_transfer/devices/vttsma.py @@ -55,7 +55,6 @@ def download_metadata(self) -> None: NOTE/TODO: will run as BATCH job. """ - # NOTE: includes metadata for ALL data records, therefore we must filter them # NOTE: currently downloads all dumps (inc. 
historical) TODO: only since last run all_records = vttsma_api.get_list(self.bucket) diff --git a/data_transfer/lib/vttsma.py b/data_transfer/lib/vttsma.py index 3bcbdfd..f95d22f 100644 --- a/data_transfer/lib/vttsma.py +++ b/data_transfer/lib/vttsma.py @@ -46,16 +46,16 @@ def get_list(bucket: Bucket) -> [dict]: GET all records (metadata) from the AWS S3 bucket NOTE: S3 folder structure is symbolic. The 'key' (str) for each file object \ - represents the path. See also `download_metadata()` in devices > vttsma.py \ - When split, results in [dump_date, raw/files, patienthash, patienthash.nfo/.zip] + represents the path. See also `download_metadata()` in devices > vttsma.py """ objects = bucket.objects.all() object_paths = [obj.key for obj in objects] - # ignore users.txt files - duplicate data present in object key + # ignore users.txt files - data already present in object key split_paths = [p.split('/') for p in object_paths if p.find('users.txt') == -1] # follows [dump_date, raw/files, patienthash, patienthash.nfo/.zip/.audio?)] + # remove duplicates via a set() patients = list(set([p[2] for p in split_paths])) result = [] @@ -69,15 +69,17 @@ def get_list(bucket: Bucket) -> [dict]: def download_files(bucket: Bucket, patient_hash: str, dump_date: str,) -> bool: """ GET all files associated with the known record. - NOTE: S3 folder association is symbolic, so a need to pull down data - through a nested loop. + NOTE: S3 folder association is symbolic, so a need to pull down data through a nested loop. """ ext = 'zip' if args.ftype == 'raw' else args.ftype folder_path = Path(config.storage_vol) / f"{patient_hash}" + # 'raw' and 'files' are 2nd level top folders for prefix in ['raw','files']: sub_folder = folder_path/prefix sub_folder.mkdir(parents=True, exist_ok=True) + + # filter to limit returned results to just this patient for obj in bucket.objects.filter(Prefix=f"{dump_date}/{prefix}/{patient_hash}"): file_name = obj.key.rsplit('/',1)[1] bucket.download_file(obj.key, str(folder_path/prefix/file_name)) diff --git a/data_transfer/services/dmpy.py b/data_transfer/services/dmpy.py index b398acc..375d72d 100644 --- a/data_transfer/services/dmpy.py +++ b/data_transfer/services/dmpy.py @@ -7,6 +7,7 @@ def zip_folder(path: Path) -> Path: def zip_folder_and_rm_local(path: Path) -> Path: + """Zips folder and removes the original immediately""" zip_path = Path(shutil.make_archive(path, 'zip', path)) shutil.rmtree(path) return zip_path From b725075d439ff2c3ac24574dc01bfe705b79f976 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 2 Feb 2021 14:47:25 +0000 Subject: [PATCH 11/15] reflect general PR comments --- data_transfer/devices/vttsma.py | 4 ++-- data_transfer/lib/vttsma.py | 32 +++++--------------------------- data_transfer/services/dmpy.py | 2 +- 3 files changed, 8 insertions(+), 30 deletions(-) diff --git a/data_transfer/devices/vttsma.py b/data_transfer/devices/vttsma.py index 7506965..2ff7484 100644 --- a/data_transfer/devices/vttsma.py +++ b/data_transfer/devices/vttsma.py @@ -1,6 +1,6 @@ from data_transfer.config import config from data_transfer.lib import vttsma as vttsma_api -from data_transfer.services import inventory, ucam +from data_transfer.services import ucam from data_transfer.schemas.record import Record from data_transfer.db import create_record, \ read_record, update_record, all_filenames @@ -8,7 +8,6 @@ from pathlib import Path from datetime import datetime -import json class Vttsma: def __init__(self): @@ -68,6 +67,7 @@ def download_metadata(self) -> None: device_used = [r for 
r in patient_record.devices if r.vttsma_id == item['id']] # Assuming that only one device (phone) is used for the VTT SMA + # TODO: re-evaluate once data from Newcastle is present on S3 device_used = device_used[0] record = Record( diff --git a/data_transfer/lib/vttsma.py b/data_transfer/lib/vttsma.py index f95d22f..377d489 100644 --- a/data_transfer/lib/vttsma.py +++ b/data_transfer/lib/vttsma.py @@ -1,31 +1,12 @@ -# NOTE: some methods exist for convience and will be advanced -# for the CVS, e.g. serial_by_device_uuid might to hit a live -# endpoint rather than pull from a CSV file. - from data_transfer.config import config from data_transfer.services import dmpy from pathlib import Path from dataclasses import dataclass -from typing import Optional -from functools import lru_cache # note: @cache exists in py >= 3.9 -import csv import boto3 from mypy_boto3_s3.service_resource import Bucket -@dataclass -class VttsmaFileDownload: - """Use as CLI arguments for Dreem's library.""" - # TODO: how does this relate to VTT? - directory: Path = config.storage_vol - # TODO: Check VTT's data file type and format and choose appopriately - ftype: str = "zip" - - -# Define location and filetype to download -args = VttsmaFileDownload() - def get_bucket(creds: dict) -> Bucket: """ Builds a S3 session bucket object to interface with the S3 bucket @@ -56,14 +37,12 @@ def get_list(bucket: Bucket) -> [dict]: # follows [dump_date, raw/files, patienthash, patienthash.nfo/.zip/.audio?)] # remove duplicates via a set() - patients = list(set([p[2] for p in split_paths])) + patients = set([p[2] for p in split_paths]) - result = [] - for patient in patients: - # TODO: only process dump dates of interest (i.e. since last run) - result.append(dict(id=patient, dumps=list(set([p[0] for p in split_paths if p[2] == patient])))) - - return result + return [ + dict(id=patient, dumps=list(set([p[0] for p in split_paths if p[2] == patient]))) + for patient in patients + ] def download_files(bucket: Bucket, patient_hash: str, dump_date: str,) -> bool: @@ -71,7 +50,6 @@ def download_files(bucket: Bucket, patient_hash: str, dump_date: str,) -> bool: GET all files associated with the known record. NOTE: S3 folder association is symbolic, so a need to pull down data through a nested loop. 
""" - ext = 'zip' if args.ftype == 'raw' else args.ftype folder_path = Path(config.storage_vol) / f"{patient_hash}" # 'raw' and 'files' are 2nd level top folders diff --git a/data_transfer/services/dmpy.py b/data_transfer/services/dmpy.py index 375d72d..830f1f9 100644 --- a/data_transfer/services/dmpy.py +++ b/data_transfer/services/dmpy.py @@ -8,7 +8,7 @@ def zip_folder(path: Path) -> Path: def zip_folder_and_rm_local(path: Path) -> Path: """Zips folder and removes the original immediately""" - zip_path = Path(shutil.make_archive(path, 'zip', path)) + zip_path = zip_folder(path) shutil.rmtree(path) return zip_path From cb462bc04765804a55acbfd9a5605f72315a2c96 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 2 Feb 2021 15:00:27 +0000 Subject: [PATCH 12/15] refactor get_record for abstraction --- data_transfer/services/ucam.py | 48 +++++++++++----------------------- 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/data_transfer/services/ucam.py b/data_transfer/services/ucam.py index 1896970..1b37faf 100644 --- a/data_transfer/services/ucam.py +++ b/data_transfer/services/ucam.py @@ -29,15 +29,12 @@ def __create_record(data: dict) -> Payload: return [__create_record(d) for d in read_csv_from_cache(config.ucam_data)] - -def get_record(patient_id: str) -> Optional[PatientRecord]: - """ - Transforms the payload for consistent access with Record - - GET /patients// +def __serialise_records(patient_records: [Payload]) -> Optional[PatientRecord]: """ - patient_records = [r for r in __get_patients() if r.patient_id == patient_id] + All records from the UCAM DB. + GET /patients/ + """ # No records exist for that patient, # e.g., if a device was not worn or a staff member forget to add the record if len(patient_records) == 0: @@ -65,6 +62,16 @@ def __device_from_record(device: Payload) -> Device: return PatientRecord(patient, devices) +def get_record(patient_id: str) -> Optional[PatientRecord]: + """ + Transforms the payload for consistent access with Record + + GET /patients// + """ + patient_records = [r for r in __get_patients() if r.patient_id == patient_id] + return __serialise_records(patient_records) + + def record_by_vtt(vtt_hash: str) -> Optional[PatientRecord]: """ Return a patient record based on then Hashed ID provided by VTT @@ -73,33 +80,8 @@ def record_by_vtt(vtt_hash: str) -> Optional[PatientRecord]: GET /vtt// """ patient_records = [r for r in __get_patients() if r.vttsma_id == vtt_hash] + return __serialise_records(patient_records) - # No records exist for that patient, - # e.g., if a device was not worn or a staff member forget to add the record - if len(patient_records) == 0: - return None - - # Records returned from UCAM contain the patient ID in each row. - # Rather than duplicating this, we create it once, then associate - # all other rows (i.e., devices) below - patient = patient_records[0] - patient = Patient(patient.patient_id, patient.disease) - - def __device_from_record(device: Payload) -> Device: - """ - Convenient method to only store Device-specific metadata. 
- """ - return Device( - id=device.device_id, - vttsma_id=device.vttsma_id, - devitations=device.devitations, - start_wear=device.start_wear, - end_wear=device.end_wear - ) - - devices = [__device_from_record(r) for r in patient_records] - return PatientRecord(patient, devices) - def device_history(device_id: str) -> [Payload]: """ From 67743387b35b5c3787211bf00731c00ae3690cc0 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 2 Feb 2021 15:02:16 +0000 Subject: [PATCH 13/15] remove unused dataclass import --- data_transfer/lib/vttsma.py | 1 - 1 file changed, 1 deletion(-) diff --git a/data_transfer/lib/vttsma.py b/data_transfer/lib/vttsma.py index 377d489..ad5a3c2 100644 --- a/data_transfer/lib/vttsma.py +++ b/data_transfer/lib/vttsma.py @@ -1,7 +1,6 @@ from data_transfer.config import config from data_transfer.services import dmpy from pathlib import Path -from dataclasses import dataclass import boto3 from mypy_boto3_s3.service_resource import Bucket From b86756abe8323a4804a673a875440b23dddf2843 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 2 Feb 2021 15:46:11 +0000 Subject: [PATCH 14/15] clean up list comprehensions --- data_transfer/lib/vttsma.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data_transfer/lib/vttsma.py b/data_transfer/lib/vttsma.py index ad5a3c2..40ee2a5 100644 --- a/data_transfer/lib/vttsma.py +++ b/data_transfer/lib/vttsma.py @@ -32,7 +32,7 @@ def get_list(bucket: Bucket) -> [dict]: object_paths = [obj.key for obj in objects] # ignore users.txt files - data already present in object key - split_paths = [p.split('/') for p in object_paths if p.find('users.txt') == -1] + split_paths = [p.split('/') for p in object_paths if 'users.txt' not in p] # follows [dump_date, raw/files, patienthash, patienthash.nfo/.zip/.audio?)] # remove duplicates via a set() From 4fdb7ed5f50cfb939fbd15d40344ad9bb017a14b Mon Sep 17 00:00:00 2001 From: David Date: Tue, 2 Feb 2021 15:54:27 +0000 Subject: [PATCH 15/15] rename dumps --> exports --- data_transfer/devices/vttsma.py | 12 ++++++------ data_transfer/jobs/vttsma.py | 4 ++-- data_transfer/lib/vttsma.py | 8 ++++---- data_transfer/schemas/record.py | 2 +- data_transfer/tasks/vttsma.py | 2 +- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/data_transfer/devices/vttsma.py b/data_transfer/devices/vttsma.py index 2ff7484..8ca38d8 100644 --- a/data_transfer/devices/vttsma.py +++ b/data_transfer/devices/vttsma.py @@ -32,7 +32,7 @@ def authenticate(self): def download_metadata(self) -> None: """ Before downloading raw data we need to know which files to download. - VTT provides a weekly dump in an S3 bucket, with a symbolic structure: + VTT provides a weekly export in an S3 bucket, with a symbolic structure: . ├── data_yyyy_mm_dd │ ├── users.txt @@ -46,15 +46,15 @@ def download_metadata(self) -> None: . NOTE: - - users.txt contains the user hashes present in this dump (equal to subfolders) - - .nfo files contain the time spans of the specific hash dumps, e.g.: + - users.txt contains the user hashes present in this export (equal to subfolders) + - .nfo files contain the time spans of the specific hash exports, e.g.: - Start time : 2020-10-28T00:00:01Z - End time : 2020-11-24T00:00:00.001Z NOTE/TODO: will run as BATCH job. """ - # NOTE: currently downloads all dumps (inc. historical) TODO: only since last run + # NOTE: currently downloads all exports (inc. 
historical) TODO: only since last run all_records = vttsma_api.get_list(self.bucket) # Only add records that are not known in the DB based on stored filename (id = VTT hash id) @@ -73,7 +73,7 @@ def download_metadata(self) -> None: record = Record( filename=device_used.vttsma_id, # NOTE: id is the hashedID provided by VTT device_type=utils.DeviceType.SMA.name, - vttsma_dump_date=item['dumps'][0], # TODO: expect data across dumps + vttsma_export_date=item['exports'][0], # TODO: expect data across exports device_id=config.vttsma_global_device_id, # All VTT-SMA share the same device ID patient_id=patient_record.patient.id, start_wear=device_used.start_wear, @@ -100,7 +100,7 @@ def download_file(self, mongo_id: str) -> None: NOTE/TODO: is run as a task. """ record = read_record(mongo_id) - is_downloaded_success = vttsma_api.download_files(self.bucket, record.filename, record.vttsma_dump_date) + is_downloaded_success = vttsma_api.download_files(self.bucket, record.filename, record.vttsma_export_date) if is_downloaded_success: record.is_downloaded = is_downloaded_success update_record(record) diff --git a/data_transfer/jobs/vttsma.py b/data_transfer/jobs/vttsma.py index 1531d08..8ee8674 100644 --- a/data_transfer/jobs/vttsma.py +++ b/data_transfer/jobs/vttsma.py @@ -5,11 +5,11 @@ def batch_metadata() -> [Record]: """ - VTT dumps the patient data weekly in an S3 bucket + VTT exports the patient data weekly in an S3 bucket This method stores the records we have not yet processed. - NOTE/TODO: this cron batch should be run weekly to accomodate the weekly VTT S3 dump + NOTE/TODO: this cron batch should be run weekly to accomodate the weekly VTT S3 export """ vttsma = Vttsma() vttsma.download_metadata() diff --git a/data_transfer/lib/vttsma.py b/data_transfer/lib/vttsma.py index 40ee2a5..d084af5 100644 --- a/data_transfer/lib/vttsma.py +++ b/data_transfer/lib/vttsma.py @@ -34,17 +34,17 @@ def get_list(bucket: Bucket) -> [dict]: # ignore users.txt files - data already present in object key split_paths = [p.split('/') for p in object_paths if 'users.txt' not in p] - # follows [dump_date, raw/files, patienthash, patienthash.nfo/.zip/.audio?)] + # follows [export_date, raw/files, patienthash, patienthash.nfo/.zip/.audio?)] # remove duplicates via a set() patients = set([p[2] for p in split_paths]) return [ - dict(id=patient, dumps=list(set([p[0] for p in split_paths if p[2] == patient]))) + dict(id=patient, exports=list(set([p[0] for p in split_paths if p[2] == patient]))) for patient in patients ] -def download_files(bucket: Bucket, patient_hash: str, dump_date: str,) -> bool: +def download_files(bucket: Bucket, patient_hash: str, export_date: str,) -> bool: """ GET all files associated with the known record. NOTE: S3 folder association is symbolic, so a need to pull down data through a nested loop. 
@@ -57,7 +57,7 @@ def download_files(bucket: Bucket, patient_hash: str, dump_date: str,) -> bool: sub_folder.mkdir(parents=True, exist_ok=True) # filter to limit returned results to just this patient - for obj in bucket.objects.filter(Prefix=f"{dump_date}/{prefix}/{patient_hash}"): + for obj in bucket.objects.filter(Prefix=f"{export_date}/{prefix}/{patient_hash}"): file_name = obj.key.rsplit('/',1)[1] bucket.download_file(obj.key, str(folder_path/prefix/file_name)) diff --git a/data_transfer/schemas/record.py b/data_transfer/schemas/record.py index f2e71a0..cdeab84 100644 --- a/data_transfer/schemas/record.py +++ b/data_transfer/schemas/record.py @@ -16,7 +16,7 @@ class Record(BaseModel): start_wear: datetime end_wear: datetime - vttsma_dump_date: Optional[str] + vttsma_export_date: Optional[str] # Each stage of the pipeline is_downloaded: Optional[bool] = False diff --git a/data_transfer/tasks/vttsma.py b/data_transfer/tasks/vttsma.py index 20eb2d4..cf505a0 100644 --- a/data_transfer/tasks/vttsma.py +++ b/data_transfer/tasks/vttsma.py @@ -5,7 +5,7 @@ def task_download_data(mongoid: str) -> str: """ - Download a datafile for the VTT dump device. + Download a datafile for the VTT export device. """ Vttsma().download_file(mongoid) return mongoid
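For reference, a standalone sketch of the key grouping that get_list in data_transfer/lib/vttsma.py performs on the weekly export. The object keys below are hypothetical examples of the export_date/raw-or-files/patient_hash/file layout, not real bucket contents:

    # Example S3 object keys from one weekly export
    object_paths = [
        "data_2021_01_25/raw/abc123/abc123.zip",
        "data_2021_01_25/raw/abc123/abc123.nfo",
        "data_2021_01_25/files/abc123/audio_1.wav",
        "data_2021_01_25/users.txt",
    ]

    # Drop users.txt entries, split the symbolic paths,
    # then collect the export dates seen per patient hash
    split_paths = [p.split("/") for p in object_paths if "users.txt" not in p]
    patients = set(p[2] for p in split_paths)
    records = [
        dict(id=patient, exports=list({p[0] for p in split_paths if p[2] == patient}))
        for patient in patients
    ]
    # records == [{"id": "abc123", "exports": ["data_2021_01_25"]}]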