Skip to content

Commit

Permalink
names: add orcid public data sync
Browse files Browse the repository at this point in the history
* closes #353
  • Loading branch information
jrcastro2 committed Jul 11, 2024
1 parent 18c4cec commit 2e8e25b
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 10 deletions.
7 changes: 7 additions & 0 deletions invenio_vocabularies/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,10 @@
"yaml": YamlWriter,
}
"""Data Streams writers."""


# TODO: REMOVE KEYS!!!
# Credentials handed to the S3 client that performs the ORCiD public data sync.
VOCABULARIES_ORCID_ACCESS_KEY = "TODO"
VOCABULARIES_ORCID_SECRET_KEY = "TODO"
# Local folder in which the sync stores its ``last_ran.config`` timestamp file.
VOCABULARIES_ORCID_FOLDER = "/tmp/ORCID_public_data_files/"
# S3 bucket from which the per-ORCiD summary XML files are fetched.
VOCABULARIES_ORCID_SUMMARIES_BUCKET = "v3.0-summaries"
98 changes: 93 additions & 5 deletions invenio_vocabularies/contrib/names/datastreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,99 @@
from invenio_records.dictutils import dict_lookup

from ...datastreams.errors import TransformerError
from ...datastreams.readers import SimpleHTTPReader
from ...datastreams.readers import SimpleHTTPReader, BaseReader
from ...datastreams.transformers import BaseTransformer
from ...datastreams.writers import ServiceWriter
from invenio_vocabularies.datastreams.xml import etree_to_dict
import boto3
import logging
from flask import current_app
import subprocess
import os
from datetime import datetime
from datetime import timedelta
import xmltodict
import shutil
import tarfile
import io

class OrcidDataSyncReader(BaseReader):
    """ORCiD Data Sync Reader.

    Downloads the ORCiD "lambda file" (``last_modified.csv.tar``) from S3,
    collects the ORCiD iDs modified since the last sync and yields the raw
    content of the summary XML file of each of them.
    """

    _DATE_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
    _DATE_FORMAT_NO_MILLIS = "%Y-%m-%d %H:%M:%S"

    # TODO: replace with a configurable number of days and use it only as a
    # fallback, relying on the timestamp stored in ``last_ran.config``.
    # While this is not ``None`` it always takes precedence over that file.
    _MINUTES_TO_SYNC = 60 * 15

    def _iter(self, fp, *args, **kwargs):
        """Unsupported: this reader does not iterate over a file pointer.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError(
            "OrcidDataSyncReader downloads one file and therefore does not "
            "iterate through items"
        )

    def _parse_date(self, value):
        """Parse a lambda-file timestamp, with or without fractional seconds.

        :raises ValueError: if ``value`` matches neither supported format.
        """
        try:
            return datetime.strptime(value, self._DATE_FORMAT)
        except ValueError:
            return datetime.strptime(value, self._DATE_FORMAT_NO_MILLIS)

    def _last_sync(self, last_ran_path, now):
        """Return the datetime from which modified records must be synced.

        :param last_ran_path: path of the file holding the previous run time.
        :param now: the current (naive) datetime.
        """
        if self._MINUTES_TO_SYNC is not None:
            return now - timedelta(minutes=self._MINUTES_TO_SYNC)

        if os.path.isfile(last_ran_path):
            with open(last_ran_path, "r") as last_ran:
                stored = last_ran.readline().strip()
            try:
                return datetime.strptime(stored, self._DATE_FORMAT)
            except ValueError:
                # An empty or corrupted file must not block the sync forever;
                # fall through to the default window below.
                pass

        return now - timedelta(days=30)

    def _modified_orcids(self, tar_content, last_sync):
        """Yield the ORCiD iDs modified at or after ``last_sync``.

        :param tar_content: bytes of the lambda tar archive.
        :param last_sync: naive datetime; older entries are ignored.
        """
        with tarfile.open(fileobj=io.BytesIO(tar_content)) as tar:
            for member in tar.getmembers():
                extracted = tar.extractfile(member)
                if extracted is None:
                    # Not a regular file (e.g. a directory entry).
                    continue
                # Decode lazily: parsing stops early, so there is no need to
                # decode and split the whole (potentially huge) CSV up front.
                with io.TextIOWrapper(extracted, encoding="utf-8") as lines:
                    next(lines, None)  # skip the CSV header
                    for line in lines:
                        elements = line.rstrip("\r\n").split(",")
                        if len(elements) < 4:
                            # Blank or malformed line: nothing to sync.
                            continue
                        if self._parse_date(elements[3]) < last_sync:
                            # Since the lambda file is ordered by last_modified
                            # date descendant, no later line can match.
                            break
                        yield elements[0]

    def read(self, item=None, *args, **kwargs):
        """Yield the summary XML (as bytes) of every recently modified ORCiD.

        Downloads the ORCiD lambda file, records the current time in
        ``last_ran.config`` inside ``VOCABULARIES_ORCID_FOLDER`` and then
        fetches one ``<suffix>/<orcid>.xml`` object per selected ORCiD iD from
        the ``VOCABULARIES_ORCID_SUMMARIES_BUCKET`` bucket.

        :param item: unused; accepted for compatibility with other readers.
        :returns: a generator of ``bytes``, one entry per summary file.
        """
        config = current_app.config
        folder = config["VOCABULARIES_ORCID_FOLDER"]

        # Download the lambda file
        s3client = boto3.client(
            "s3",
            aws_access_key_id=config["VOCABULARIES_ORCID_ACCESS_KEY"],
            aws_secret_access_key=config["VOCABULARIES_ORCID_SECRET_KEY"],
        )
        response = s3client.get_object(
            Bucket="orcid-lambda-file", Key="last_modified.csv.tar"
        )
        tar_content = response["Body"].read()

        # NOTE(review): naive local time is compared with the timestamps of
        # the lambda file, which are presumably UTC -- confirm.
        now = datetime.now()
        last_ran_path = os.path.join(folder, "last_ran.config")
        last_sync = self._last_sync(last_ran_path, now)

        # The folder may not exist on a first run.
        if folder:
            os.makedirs(folder, exist_ok=True)
        with open(last_ran_path, "w") as last_ran:
            last_ran.write(now.strftime(self._DATE_FORMAT))

        orcids_to_sync = []
        for orcid in self._modified_orcids(tar_content, last_sync):
            orcids_to_sync.append(orcid)
            if len(orcids_to_sync) % 10000 == 0:
                current_app.logger.info(
                    "Orcids ready to sync so far: %s", len(orcids_to_sync)
                )

        bucket = config["VOCABULARIES_ORCID_SUMMARIES_BUCKET"]
        for orcid_to_sync in orcids_to_sync:
            # Summaries are stored under the last three characters of the iD.
            key = f"{orcid_to_sync[-3:]}/{orcid_to_sync}.xml"
            file_response = s3client.get_object(Bucket=bucket, Key=key)
            yield file_response["Body"].read()


class OrcidHTTPReader(SimpleHTTPReader):
Expand Down Expand Up @@ -89,6 +179,7 @@ def _entry_id(self, entry):

# Names datastream readers: maps each reader "type" name to its reader class.
VOCABULARIES_DATASTREAM_READERS = {
"orcid-http": OrcidHTTPReader,
"orcid-data-sync": OrcidDataSyncReader,
}


Expand All @@ -107,10 +198,7 @@ def _entry_id(self, entry):
DATASTREAM_CONFIG = {
"readers": [
{
"type": "tar",
"args": {
"regex": "\\.xml$",
},
"type": "orcid-data-sync",
},
{"type": "xml"},
],
Expand Down
4 changes: 2 additions & 2 deletions invenio_vocabularies/datastreams/datastreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def process(self, *args, **kwargs):
the reader, apply the transformations and yield the result of
writing it.
"""
for stream_entry in self.read():
for stream_entry in self.read(): # TODO: do it in batches
if stream_entry.errors:
yield stream_entry # reading errors
else:
Expand All @@ -58,7 +58,7 @@ def process(self, *args, **kwargs):
transformed_entry.filtered = True
yield transformed_entry
else:
yield self.write(transformed_entry)
yield self.write(transformed_entry) # TODO: Bulk update

def read(self):
"""Recursively read the entries."""
Expand Down
7 changes: 4 additions & 3 deletions invenio_vocabularies/datastreams/readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

import requests
import yaml
from lxml.html import parse as html_parse
from lxml.html import fromstring

from .errors import ReaderError
from .xml import etree_to_dict



class BaseReader(ABC):
"""Base reader."""

Expand Down Expand Up @@ -219,8 +220,8 @@ class XMLReader(BaseReader):
def _iter(self, fp, *args, **kwargs):
"""Read and parse an XML file to dict."""
# NOTE: We parse HTML, to skip XML validation and strip XML namespaces
xml_tree = html_parse(fp).getroot()
record = etree_to_dict(xml_tree)["html"]["body"].get("record")
xml_tree = fromstring(fp)
record = etree_to_dict(xml_tree).get("record")

if not record:
raise ReaderError(f"Record not found in XML entry.")
Expand Down
4 changes: 4 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ install_requires =
invenio-records-resources>=6.0.0,<7.0.0
lxml>=4.5.0
PyYAML>=5.4.1
awscli>=1.33.23
boto3>=1.12.6
botocore>=1.34.141
iso8601>=0.1.11

[options.extras_require]
tests =
Expand Down

0 comments on commit 2e8e25b

Please sign in to comment.