Implement VTT DAG process, resolves #20 (#21)

Merged: 15 commits, Feb 2, 2021
Binary file added .DS_Store
Binary file not shown.
5 changes: 5 additions & 0 deletions .dtransfer.env.example
@@ -5,6 +5,11 @@ DATABASE_URI="mongodb://localhost:27017/"
SUPPORT_BASE_URL=""
SUPPORT_TOKEN=""

VTTSMA_AWS_ACCESSKEY=""
VTTSMA_AWS_SECRET_ACCESSKEY=""
VTTSMA_AWS_BUCKET_NAME=""
VTTSMA_GLOBAL_DEVICE_ID=""

DREEM_LOGIN_URL=""
DREEM_API_URL=""

6 changes: 6 additions & 0 deletions data_transfer/config.py
@@ -29,6 +29,12 @@ class Settings(BaseSettings):
support_base_url: str
support_token: str

# NOTE: VTT does not differentiate between study sites
vttsma_aws_accesskey: str
vttsma_aws_secret_accesskey: str
vttsma_aws_bucket_name: str
vttsma_global_device_id: str

dreem_login_url: str
dreem_api_url: str

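For context, a minimal sketch of how these new settings resolve at runtime, assuming pydantic's BaseSettings loads them from an env file (the env_file name below is an assumption based on .dtransfer.env.example):

    from pydantic import BaseSettings

    class Settings(BaseSettings):
        vttsma_aws_accesskey: str
        vttsma_aws_secret_accesskey: str
        vttsma_aws_bucket_name: str
        vttsma_global_device_id: str

        class Config:
            env_file = ".dtransfer.env"  # assumed; mirrors .dtransfer.env.example

    config = Settings()  # matches the VTTSMA_* env vars case-insensitively; raises if any are missing
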
5 changes: 3 additions & 2 deletions data_transfer/db/main.py
@@ -2,6 +2,7 @@
from pymongo import MongoClient
from data_transfer.config import config
from data_transfer.schemas.record import Record
from data_transfer.utils import DeviceType


client = MongoClient(config.database_uri)
@@ -35,6 +36,6 @@ def all_filenames() -> [str]:
return [doc['filename'] for doc in _db.records.find()]


-def records_not_downloaded() -> [Record]:
-    docs = _db.records.find({"is_downloaded": False})
+def records_not_downloaded(device_type: DeviceType) -> [Record]:
+    docs = _db.records.find({"is_downloaded": False, "device_type": device_type.name})
return [Record(**doc) for doc in docs]
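
For illustration, the query can now be exercised per device (a sketch; DeviceType is assumed to be an Enum with DRM and SMA members, as used in main.py):

    from data_transfer.db import records_not_downloaded
    from data_transfer.utils import DeviceType

    # Fetch only pending VTT-SMA records; Dreem records are excluded
    # by the new "device_type" filter.
    pending = records_not_downloaded(DeviceType.SMA)
    print([record.filename for record in pending])
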
1 change: 1 addition & 0 deletions data_transfer/devices/dreem.py
@@ -63,6 +63,7 @@ def download_metadata(self) -> None:
record = Record(
# NOTE: id is a unique uuid used to GET raw data from Dreem
filename=item['id'],
device_type=utils.DeviceType.DRM.name,
device_id=device_id,
patient_id=patient_id,
start_wear=start_wear,
107 changes: 107 additions & 0 deletions data_transfer/devices/vttsma.py
@@ -0,0 +1,107 @@
from data_transfer.config import config
from data_transfer.lib import vttsma as vttsma_api
from data_transfer.services import ucam
from data_transfer.schemas.record import Record
from data_transfer.db import create_record, \
read_record, update_record, all_filenames
from data_transfer import utils

from pathlib import Path
from datetime import datetime

class Vttsma:
def __init__(self):
"""
Set up a session to the S3 bucket for reuse across multiple steps
"""
self.bucket = self.authenticate()

def authenticate(self):
"""
Authenticate once when the object is created so the session is shared between requests
"""
credentials = dict(
aws_ak=config.vttsma_aws_accesskey,
aws_ask=config.vttsma_aws_secret_accesskey,
bucket_name=config.vttsma_aws_bucket_name,
)

bucket = vttsma_api.get_bucket(credentials)
return bucket

def download_metadata(self) -> None:
"""
Before downloading raw data we need to know which files to download.
VTT provides a weekly export in an S3 bucket, with a symbolic structure:
.
├── data_yyyy_mm_dd
│   ├── users.txt
│   ├── raw
│   │   ├── vtt_patient (hash)
│   │   └── vtt_patient (hash)
│   │       ├── vtt_patient (hash).zip
│   │       └── vtt_patient (hash).nfo
│   └── files
│       └── (audio files - unknown structure)

NOTE:
- users.txt contains the user hashes present in this export (equal to subfolders)
- .nfo files contain the time spans of the specific hash exports, e.g.:
- Start time : 2020-10-28T00:00:01Z
- End time : 2020-11-24T00:00:00.001Z

NOTE/TODO: will run as a BATCH job.
"""

# NOTE: currently downloads all exports (incl. historical); TODO: only those since the last run
all_records = vttsma_api.get_list(self.bucket)

# Only add records that are not known in the DB based on stored filename (id = VTT hash id)
unknown_records = [r for r in all_records if r['id'] not in set(all_filenames())]

# Aim: construct valid record (metadata) and add to DB
for item in unknown_records:

if patient_record := ucam.record_by_vtt(item['id']):
device_used = [r for r in patient_record.devices if r.vttsma_id == item['id']]

# Assuming that only one device (phone) is used for the VTT SMA
# TODO: re-evaluate once data from Newcastle is present on S3
device_used = device_used[0]

record = Record(
filename=device_used.vttsma_id,  # NOTE: id is the hashed ID provided by VTT
device_type=utils.DeviceType.SMA.name,
vttsma_export_date=item['exports'][0], # TODO: expect data across exports
device_id=config.vttsma_global_device_id, # All VTT-SMA share the same device ID
patient_id=patient_record.patient.id,
start_wear=device_used.start_wear,
end_wear=device_used.end_wear
)

create_record(record)

path = Path(config.storage_vol / f'{record.filename}-meta.json')
# Store metadata from memory to file
utils.write_json(path, item)

else:
# throw / log here
pass


def download_file(self, mongo_id: str) -> None:
"""
Downloads files and stores them in {config.storage_vol}

Tracking: {db.record.is_downloaded} indicates success

NOTE/TODO: is run as a task.
"""
record = read_record(mongo_id)
is_downloaded_success = vttsma_api.download_files(self.bucket, record.filename, record.vttsma_export_date)
if is_downloaded_success:
record.is_downloaded = is_downloaded_success
update_record(record)
# TODO: otherwise re-start task to try again
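
To make the metadata loop above concrete, each item returned by vttsma_api.get_list() has roughly this shape (illustrative values only; see lib/vttsma.py below):

    item = {
        "id": "ab12cd",                   # VTT patient hash, i.e. the subfolder name
        "exports": ["data_2020_11_24"],   # export folders in which this hash appears
    }
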
15 changes: 15 additions & 0 deletions data_transfer/jobs/vttsma.py
@@ -0,0 +1,15 @@

from data_transfer.devices.vttsma import Vttsma
from data_transfer.schemas.record import Record


def batch_metadata() -> [Record]:
"""
VTT exports the patient data weekly in an S3 bucket

This method stores the records we have not yet processed.

NOTE/TODO: this cron batch should be run weekly to accommodate the weekly VTT S3 export
"""
vttsma = Vttsma()
vttsma.download_metadata()
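
A sketch of how this job is intended to be triggered; the weekly schedule itself (e.g. via cron) is a deployment detail outside this PR:

    from data_transfer.jobs.vttsma import batch_metadata

    # Run once per week to match VTT's weekly S3 export cadence.
    batch_metadata()
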
67 changes: 67 additions & 0 deletions data_transfer/lib/vttsma.py
@@ -0,0 +1,67 @@
from data_transfer.config import config
from data_transfer.services import dmpy
from pathlib import Path

import boto3
from mypy_boto3_s3.service_resource import Bucket


def get_bucket(creds: dict) -> Bucket:
"""
Builds an S3 session bucket object to interface with the S3 bucket
"""
session = boto3.session.Session(
aws_access_key_id=creds['aws_ak'],
aws_secret_access_key=creds['aws_ask'],
)

s3 = session.resource('s3')
bucket = s3.Bucket(creds['bucket_name'])

return bucket


def get_list(bucket: Bucket) -> [dict]:
"""
GET all records (metadata) from the AWS S3 bucket

NOTE: S3 folder structure is symbolic. The 'key' (str) for each file object \
represents the path. See also `download_metadata()` in devices > vttsma.py
"""
objects = bucket.objects.all()
object_paths = [obj.key for obj in objects]

# ignore users.txt files - data already present in object key
split_paths = [p.split('/') for p in object_paths if 'users.txt' not in p]

# follows [export_date, raw/files, patienthash, patienthash.nfo/.zip/.audio?]
# remove duplicates via a set()
patients = set([p[2] for p in split_paths])

return [
dict(id=patient, exports=list(set([p[0] for p in split_paths if p[2] == patient])))
for patient in patients
]


def download_files(bucket: Bucket, patient_hash: str, export_date: str,) -> bool:
"""
GET all files associated with the known record.
NOTE: S3 folder association is symbolic, so we need to pull the data down through a nested loop.
"""
folder_path = Path(config.storage_vol) / f"{patient_hash}"

# 'raw' and 'files' are the two second-level folders within each export
for prefix in ['raw','files']:
sub_folder = folder_path/prefix
sub_folder.mkdir(parents=True, exist_ok=True)

# filter to limit returned results to just this patient
for obj in bucket.objects.filter(Prefix=f"{export_date}/{prefix}/{patient_hash}"):
file_name = obj.key.rsplit('/',1)[1]
bucket.download_file(obj.key, str(folder_path/prefix/file_name))

# added method to dmpy service
dmpy.zip_folder_and_rm_local(folder_path)

return True
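
A worked example of the key parsing in get_list, using hypothetical object keys (the audio layout under 'files' is an assumption, as it is still unknown):

    object_paths = [
        "data_2020_11_24/users.txt",                  # filtered out
        "data_2020_11_24/raw/ab12cd/ab12cd.zip",
        "data_2020_11_24/raw/ab12cd/ab12cd.nfo",
        "data_2020_11_24/files/ab12cd/audio_01.wav",  # assumed structure
    ]
    # After splitting on '/', index 0 is the export folder and index 2 the
    # patient hash, so get_list would return:
    # [{"id": "ab12cd", "exports": ["data_2020_11_24"]}]
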
36 changes: 31 additions & 5 deletions data_transfer/main.py
@@ -1,6 +1,11 @@
-from data_transfer.jobs import dreem as dreem_jobs, shared as shared_jobs
-from data_transfer.tasks import dreem as dreem_tasks, shared as shared_tasks
+from data_transfer.jobs import shared as shared_jobs, \
+    dreem as dreem_jobs, \
+    vttsma as vttsma_jobs
+from data_transfer.tasks import shared as shared_tasks, \
+    dreem as dreem_tasks, \
+    vttsma as vttsma_tasks
from data_transfer.db import records_not_downloaded
from data_transfer.utils import DeviceType


def dreem_dag(study_site):
@@ -19,16 +24,37 @@ def dreem_dag(study_site):

# NOTE: simulates initiation of tasks upon metadata download
# TODO: in practice the tasks should be invoked within the batch job.
-for record in records_not_downloaded():
+for record in records_not_downloaded(DeviceType.DRM):
# Each task should be idempotent. Returned values feed the subsequent task
mongoid = dreem_tasks.task_download_data(study_site, record.id)
mongoid = dreem_tasks.task_preprocess_data(mongoid)
# Data is finalised and moved to a folder in /uploading/
-shared_tasks.task_prepare_data("DRM", mongoid)
+shared_tasks.task_prepare_data(DeviceType.DRM, mongoid)

# All said folders FOR ALL DEVICES are uploaded once per day
# shared_jobs.batch_upload_data()

def vttsma_dag():
"""
Directed acyclic graph (DAG) representing the data_transfer pipeline as used for all devices
Note that VTT does not distinguish between study sites

NOTE/TODO: this method simulates the pipeline.
"""
vttsma_jobs.batch_metadata()

# NOTE: simulates initiation of tasks upon metadata download
# TODO: in practice the tasks should be invoked within the batch job.
for record in records_not_downloaded(DeviceType.SMA):
# Each task should be idempotent. Returned values feed the subsequent task
mongoid = vttsma_tasks.task_download_data(record.id)
mongoid = vttsma_tasks.task_preprocess_data(mongoid)
# Data is finalised and moved to a folder in /uploading/
shared_tasks.task_prepare_data(DeviceType.SMA, mongoid)

# All said folders FOR ALL DEVICES are uploaded once per day
# shared_jobs.batch_upload_data()

if __name__ == "__main__":
dreem_dag("newcastle")
# dreem_dag("munster")
vttsma_dag()
3 changes: 3 additions & 0 deletions data_transfer/schemas/record.py
@@ -10,11 +10,14 @@ class Record(BaseModel):
"""
id: Optional[Any] = Field(alias='_id')
filename: str
device_type: str
device_id: str
patient_id: str
start_wear: datetime
end_wear: datetime

vttsma_export_date: Optional[str]

# Each stage of the pipeline
is_downloaded: Optional[bool] = False
is_processed: Optional[bool] = False
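For illustration, a VTT record under the extended schema might be constructed like this (all values hypothetical):

    from datetime import datetime
    from data_transfer.schemas.record import Record

    record = Record(
        filename="ab12cd",                     # VTT patient hash
        device_type="SMA",
        device_id="vtt-sma-global",            # assumed value of VTTSMA_GLOBAL_DEVICE_ID
        patient_id="patient-001",
        start_wear=datetime(2020, 10, 28),
        end_wear=datetime(2020, 11, 24),
        vttsma_export_date="data_2020_11_24",  # only set for VTT records
    )
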
7 changes: 7 additions & 0 deletions data_transfer/services/dmpy.py
@@ -6,6 +6,13 @@ def zip_folder(path: Path) -> Path:
return Path(shutil.make_archive(path, 'zip', path))


def zip_folder_and_rm_local(path: Path) -> Path:
"""Zips folder and removes the original immediately"""
zip_path = zip_folder(path)
shutil.rmtree(path)
return zip_path


def upload(path: Path) -> bool:
return True

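Usage of the new helper, for reference (the folder path is hypothetical):

    from pathlib import Path
    from data_transfer.services.dmpy import zip_folder_and_rm_local

    zip_path = zip_folder_and_rm_local(Path("/storage_vol/ab12cd"))
    # /storage_vol/ab12cd.zip now exists and the folder itself has been removed.
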
32 changes: 25 additions & 7 deletions data_transfer/services/ucam.py
@@ -29,15 +29,12 @@ def __create_record(data: dict) -> Payload:

return [__create_record(d) for d in read_csv_from_cache(config.ucam_data)]


-def get_record(patient_id: str) -> Optional[PatientRecord]:
+def __serialise_records(patient_records: [Payload]) -> Optional[PatientRecord]:
    """
-    Transforms the payload for consistent access with Record
-
-    GET /patients/<patient_id>/
+    All records from the UCAM DB.
+
+    GET /patients/
    """
-    patient_records = [r for r in __get_patients() if r.patient_id == patient_id]
# No records exist for that patient,
# e.g., if a device was not worn or a staff member forgot to add the record
if len(patient_records) == 0:
@@ -65,6 +62,27 @@ def __device_from_record(device: Payload) -> Device:
return PatientRecord(patient, devices)


def get_record(patient_id: str) -> Optional[PatientRecord]:
"""
Transforms the payload for consistent access with Record

GET /patients/<patient_id>/
"""
patient_records = [r for r in __get_patients() if r.patient_id == patient_id]
return __serialise_records(patient_records)


def record_by_vtt(vtt_hash: str) -> Optional[PatientRecord]:
"""
Return a patient record based on the hashed ID provided by VTT.
The VTT hashes are unique to each patient.

GET /vtt/<vtt_hash>/
"""
patient_records = [r for r in __get_patients() if r.vttsma_id == vtt_hash]
return __serialise_records(patient_records)


def device_history(device_id: str) -> [Payload]:
"""
A device may be worn by many patients. This returns such history.
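A sketch of the new lookup in use, mirroring the call site in devices/vttsma.py (the hash is hypothetical; the walrus operator requires Python 3.8+):

    from data_transfer.services import ucam

    if patient_record := ucam.record_by_vtt("ab12cd"):
        # A PatientRecord whose devices carry the matching vttsma_id
        print(patient_record.patient.id)
    else:
        pass  # no UCAM entry for this hash: log or raise here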