diff --git a/invenio_vocabularies/config.py b/invenio_vocabularies/config.py
index d2203667..976df71a 100644
--- a/invenio_vocabularies/config.py
+++ b/invenio_vocabularies/config.py
@@ -124,3 +124,10 @@
     "yaml": YamlWriter,
 }
 """Data Streams writers."""
+
+
+# TODO: REMOVE KEYS!!!
+VOCABULARIES_ORCID_ACCESS_KEY="TODO"
+VOCABULARIES_ORCID_SECRET_KEY="TODO"
+VOCABULARIES_ORCID_FOLDER="/tmp/ORCID_public_data_files/"
+VOCABULARIES_ORCID_SUMMARIES_BUCKET="v3.0-summaries"
\ No newline at end of file
diff --git a/invenio_vocabularies/contrib/names/datastreams.py b/invenio_vocabularies/contrib/names/datastreams.py
index ce050f3e..62fe5abb 100644
--- a/invenio_vocabularies/contrib/names/datastreams.py
+++ b/invenio_vocabularies/contrib/names/datastreams.py
@@ -12,9 +12,99 @@
 from invenio_records.dictutils import dict_lookup
 
 from ...datastreams.errors import TransformerError
-from ...datastreams.readers import SimpleHTTPReader
+from ...datastreams.readers import SimpleHTTPReader, BaseReader
 from ...datastreams.transformers import BaseTransformer
 from ...datastreams.writers import ServiceWriter
+from invenio_vocabularies.datastreams.xml import etree_to_dict
+import boto3
+import logging
+from flask import current_app
+import subprocess
+import os
+from datetime import datetime
+from datetime import timedelta
+import xmltodict
+import shutil
+import tarfile
+import io
+
+class OrcidDataSyncReader(BaseReader):
+    """ORCiD Data Sync Reader."""
+
+    def _iter(self, fp, *args, **kwargs):
+        """."""
+        raise NotImplementedError(
+            "OrcidDataSyncReader downloads one file and therefore does not iterate through items"
+        )
+
+    def read(self, item=None, *args, **kwargs):
+        """Download the ORCiD lambda file and yield an in-memory binary stream of it."""
+        path = current_app.config["VOCABULARIES_ORCID_FOLDER"]
+        date_format = '%Y-%m-%d %H:%M:%S.%f'
+        date_format_no_millis = '%Y-%m-%d %H:%M:%S'
+
+        orcids_to_sync = []
+
+        # Download the lambda file
+        s3client = boto3.client('s3', aws_access_key_id=current_app.config["VOCABULARIES_ORCID_ACCESS_KEY"], aws_secret_access_key=current_app.config["VOCABULARIES_ORCID_SECRET_KEY"])
+        response = s3client.get_object(Bucket='orcid-lambda-file', Key='last_modified.csv.tar')
+
+        # Stream the tar file content
+        tar_content = response['Body'].read()
+
+        days_to_sync = 1  # TODO: move to a config variable
+        minutes_to_sync = 60*15  # TODO: remove and use days_to_sync
+        last_sync = None
+        last_ran_path = path + 'last_ran.config'
+        if minutes_to_sync is not None:  # TODO: use this as fallback and rely on last_ran_path
+            last_sync = (datetime.now() - timedelta(minutes=minutes_to_sync))
+        elif os.path.isfile(last_ran_path):
+            f = open(last_ran_path, 'r')
+            date_string = f.readline()
+            last_sync = datetime.strptime(date_string, date_format)
+        else:
+            last_sync = (datetime.now() - timedelta(days=30))
+
+        with open(last_ran_path, 'w') as f:
+            f.write(datetime.now().strftime(date_format))
+
+
+        # Open the tar file from the streamed content
+        with tarfile.open(fileobj=io.BytesIO(tar_content)) as tar:
+            for member in tar.getmembers():
+                f = tar.extractfile(member)
+                if f:
+                    file_content = f.read().decode('utf-8')  # Decode bytes to string
+                    is_first_line = True
+                    for line in file_content.splitlines():
+                        if is_first_line:
+                            is_first_line = False
+                            continue
+                        line = line.rstrip('\n')
+                        elements = line.split(',')
+                        orcid = elements[0]
+
+                        last_modified_str = elements[3]
+                        try:
+                            last_modified_date = datetime.strptime(last_modified_str, date_format)
+                        except ValueError:
+                            last_modified_date = datetime.strptime(last_modified_str, date_format_no_millis)
+
+                        if last_modified_date >= last_sync:
+                            orcids_to_sync.append(orcid)
+                            if len(orcids_to_sync) % 10000 == 0:
+                                current_app.logger.info("Orcids ready to sync so far: %s" % len(orcids_to_sync))
+                        else:
+                            # Since the lambda file is ordered by last_modified date descending,
+                            # when last_modified_date < last_sync we don't need to parse any more lines
+                            break
+
+        for orcid_to_sync in orcids_to_sync:
+            suffix = orcid_to_sync[-3:]
+            key = suffix + '/' + orcid_to_sync + '.xml'
+            file_response = s3client.get_object(Bucket=current_app.config["VOCABULARIES_ORCID_SUMMARIES_BUCKET"], Key=key)
+            yield file_response['Body'].read()
 
 
 class OrcidHTTPReader(SimpleHTTPReader):
@@ -89,6 +179,7 @@ def _entry_id(self, entry):
 
 VOCABULARIES_DATASTREAM_READERS = {
     "orcid-http": OrcidHTTPReader,
+    "orcid-data-sync": OrcidDataSyncReader,
 }
 
 
@@ -107,10 +198,7 @@ def _entry_id(self, entry):
 DATASTREAM_CONFIG = {
     "readers": [
         {
-            "type": "tar",
-            "args": {
-                "regex": "\\.xml$",
-            },
+            "type": "orcid-data-sync",
         },
         {"type": "xml"},
     ],
diff --git a/invenio_vocabularies/datastreams/datastreams.py b/invenio_vocabularies/datastreams/datastreams.py
index 3fc2d1e4..caecc413 100644
--- a/invenio_vocabularies/datastreams/datastreams.py
+++ b/invenio_vocabularies/datastreams/datastreams.py
@@ -47,7 +47,7 @@ def process(self, *args, **kwargs):
         the reader, apply the transformations and yield the result of
         writing it.
         """
-        for stream_entry in self.read():
+        for stream_entry in self.read():  # TODO: do it in batches
             if stream_entry.errors:
                 yield stream_entry  # reading errors
             else:
@@ -58,7 +58,7 @@ def process(self, *args, **kwargs):
                     transformed_entry.filtered = True
                     yield transformed_entry
                 else:
-                    yield self.write(transformed_entry)
+                    yield self.write(transformed_entry)  # TODO: Bulk update
 
     def read(self):
         """Recursively read the entries."""
diff --git a/invenio_vocabularies/datastreams/readers.py b/invenio_vocabularies/datastreams/readers.py
index 313d0de0..402287b8 100644
--- a/invenio_vocabularies/datastreams/readers.py
+++ b/invenio_vocabularies/datastreams/readers.py
@@ -19,12 +19,13 @@
 import requests
 import yaml
-from lxml.html import parse as html_parse
+from lxml.html import fromstring
 
 from .errors import ReaderError
 from .xml import etree_to_dict
 
+
 class BaseReader(ABC):
     """Base reader."""
@@ -219,8 +220,8 @@ class XMLReader(BaseReader):
     def _iter(self, fp, *args, **kwargs):
         """Read and parse an XML file to dict."""
         # NOTE: We parse HTML, to skip XML validation and strip XML namespaces
-        xml_tree = html_parse(fp).getroot()
-        record = etree_to_dict(xml_tree)["html"]["body"].get("record")
+        xml_tree = fromstring(fp)
+        record = etree_to_dict(xml_tree).get("record")
 
         if not record:
             raise ReaderError(f"Record not found in XML entry.")
diff --git a/setup.cfg b/setup.cfg
index 5fd098c5..ae3adbb8 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -31,6 +31,10 @@ install_requires =
     invenio-records-resources>=6.0.0,<7.0.0
    lxml>=4.5.0
     PyYAML>=5.4.1
+    awscli>=1.33.23
+    boto3>=1.12.6
+    botocore>=1.34.141
+    iso8601>=0.1.11
 
 [options.extras_require]
 tests =
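
The new reader can be exercised on its own, outside the full datastream pipeline, to sanity-check the S3 access and the yielded payloads. The following is a minimal sketch and not part of the patch: it assumes a running Flask/Invenio application context with real values for the VOCABULARIES_ORCID_* settings introduced in config.py, that OrcidDataSyncReader can be constructed with BaseReader's default arguments, and it uses xmltodict (imported by the new module) only to inspect the yielded XML.

    # Hypothetical smoke test for OrcidDataSyncReader: requires an application
    # context and real ORCiD S3 credentials in the VOCABULARIES_ORCID_* keys.
    import xmltodict

    from invenio_vocabularies.contrib.names.datastreams import OrcidDataSyncReader

    reader = OrcidDataSyncReader()
    for raw_xml in reader.read():
        # Each yielded item is one ORCiD summary record as raw XML bytes; in the
        # datastream pipeline it is handed next to the "xml" reader and the
        # names transformer.
        summary = xmltodict.parse(raw_xml)
        print(list(summary.keys()))
        break  # inspect only the first record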