Link EC Projects (Awards vocabulary) to EuroSciVoc subjects and participating organizations with data from CORDIS #399

Draft: wants to merge 5 commits into base: master
9 changes: 7 additions & 2 deletions invenio_vocabularies/cli.py
@@ -101,9 +101,14 @@ def update(vocabulary, filepath=None, origin=None):

    for w_conf in config["writers"]:
        if w_conf["type"] == "async":
-            w_conf["args"]["writer"]["args"]["update"] = True
+            w_conf_update = w_conf["args"]["writer"]
        else:
-            w_conf["args"]["update"] = True
+            w_conf_update = w_conf
+
+        if "args" in w_conf_update:
+            w_conf_update["args"]["update"] = True
+        else:
+            w_conf_update["args"] = {"update": True}

    success, errored, filtered = _process_vocab(config)
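# Illustrative sketch (not part of the diff) of the two writer-config shapes the
# loop above handles; the "type" values below are hypothetical:
plain_writer = {"type": "some-service"}
async_writer = {"type": "async", "args": {"writer": {"type": "some-service"}}}
# After the loop, the update flag lands on the innermost writer config:
#   plain_writer -> {"type": "some-service", "args": {"update": True}}
#   async_writer -> {"type": "async",
#                    "args": {"writer": {"type": "some-service", "args": {"update": True}}}}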

9 changes: 9 additions & 0 deletions invenio_vocabularies/config.py
@@ -43,8 +43,17 @@
}
""""Generic identifier schemes, usable by other vocabularies."""


def is_pic(val):
    """Test if argument is a Participant Identification Code (PIC)."""
    if len(val) != 9:
        return False
    return val.isdigit()
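# A quick sanity check of the validator above (illustrative values only):
assert is_pic("999999999") is True   # nine digits -> accepted
assert is_pic("12345") is False      # wrong length -> rejected
assert is_pic("12345678X") is False  # non-digit character -> rejected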


VOCABULARIES_AFFILIATION_SCHEMES = {
    **VOCABULARIES_IDENTIFIER_SCHEMES,
    "pic": {"label": _("PIC"), "validator": is_pic},
}
"""Affiliations allowed identifier schemes."""

95 changes: 94 additions & 1 deletion invenio_vocabularies/contrib/affiliations/datastreams.py
@@ -10,8 +10,9 @@
"""Affiliations datastreams, transformers, writers and readers."""

from flask import current_app
from invenio_i18n import lazy_gettext as _

from ...datastreams.errors import WriterError
from ...datastreams.transformers import BaseTransformer
from ...datastreams.writers import ServiceWriter
from ..common.ror.datastreams import RORTransformer

@@ -46,16 +47,77 @@ def __init__(
        )


class OpenAIREOrganizationTransformer(BaseTransformer):
    """OpenAIRE Organization Transformer."""

    def apply(self, stream_entry, **kwargs):
        """Applies the transformation to the stream entry."""
        record = stream_entry.entry

        organization = {"openaire_id": record["id"]}

        for pid in record["pid"]:
            if pid["scheme"] == "ROR":
                organization["id"] = pid["value"].removeprefix("https://ror.org/")
            elif pid["scheme"] == "PIC":
                organization["identifiers"] = [
                    {
                        "scheme": "pic",
                        "identifier": pid["value"],
                    }
                ]

        stream_entry.entry = organization
        return stream_entry
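# Illustrative input/output for the transformer above (all values are made up):
openaire_record = {
    "id": "openorgs____::0123456789abcdef",
    "pid": [
        {"scheme": "ROR", "value": "https://ror.org/01ggx4157"},
        {"scheme": "PIC", "value": "999999999"},
    ],
}
# Expected transformed entry:
#   {"openaire_id": "openorgs____::0123456789abcdef",
#    "id": "01ggx4157",
#    "identifiers": [{"scheme": "pic", "identifier": "999999999"}]}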


class OpenAIREAffiliationsServiceWriter(ServiceWriter):
    """OpenAIRE Affiliations service writer."""

    def __init__(self, *args, **kwargs):
        """Constructor."""
        service_or_name = kwargs.pop("service_or_name", "affiliations")
        # We only update and never insert, since OpenAIRE data is used to augment
        # existing affiliations (with PIC identifiers), not to create new affiliations.
        super().__init__(service_or_name=service_or_name, insert=False, *args, **kwargs)

    def _entry_id(self, entry):
        """Get the id from an entry."""
        return entry["id"]

    def write(self, stream_entry, *args, **kwargs):
        """Writes the input entry using a given service."""
        entry = stream_entry.entry

        if not entry["openaire_id"].startswith("openorgs____::"):
            raise WriterError([f"Invalid OpenAIRE OpenOrgs id for: {entry}"])
        del entry["openaire_id"]

        if "id" not in entry:
            raise WriterError([f"No id for: {entry}"])

        if "identifiers" not in entry:
            raise WriterError([f"No alternative identifiers for: {entry}"])

        return super().write(stream_entry, *args, **kwargs)

    def write_many(self, stream_entries, *args, **kwargs):
        """Writes the input entries using a given service."""
        return super().write_many(stream_entries, *args, **kwargs)


VOCABULARIES_DATASTREAM_READERS = {}
"""Affiliations datastream readers."""

VOCABULARIES_DATASTREAM_WRITERS = {
"affiliations-service": AffiliationsServiceWriter,
"openaire-affiliations-service": OpenAIREAffiliationsServiceWriter,
}
"""Affiliations datastream writers."""

VOCABULARIES_DATASTREAM_TRANSFORMERS = {
"ror-affiliations": AffiliationsRORTransformer,
"openaire-organization": OpenAIREOrganizationTransformer,
}
"""Affiliations datastream transformers."""

@@ -90,3 +152,34 @@ def __init__(

An origin is required for the reader.
"""

DATASTREAM_CONFIG_OPENAIRE = {
"readers": [
{"type": "openaire-http", "args": {"tar_href": "/organization.tar"}},
{
"type": "tar",
"args": {
"regex": "\\.json.gz$",
"mode": "r",
},
},
{"type": "gzip"},
{"type": "jsonl"},
],
"transformers": [
{
"type": "openaire-organization",
},
],
"writers": [
{
"type": "async",
"args": {
"writer": {
"type": "openaire-affiliations-service",
}
},
}
],
}
"""Alternative Data Stream configuration for OpenAIRE Affiliations."""
177 changes: 117 additions & 60 deletions invenio_vocabularies/contrib/awards/datastreams.py
@@ -8,73 +8,15 @@

"""Awards datastreams, transformers, writers and readers."""

import io

import requests
from invenio_access.permissions import system_identity
from invenio_i18n import lazy_gettext as _

from ...datastreams.errors import ReaderError, TransformerError
from ...datastreams.readers import BaseReader
from ...datastreams.errors import TransformerError
from ...datastreams.transformers import BaseTransformer
from ...datastreams.writers import ServiceWriter
from .config import awards_ec_ror_id, awards_openaire_funders_mapping


class OpenAIREProjectHTTPReader(BaseReader):
    """OpenAIRE Project HTTP Reader returning an in-memory binary stream of the latest OpenAIRE Graph Dataset project tar file."""

    def _iter(self, fp, *args, **kwargs):
        raise NotImplementedError(
            "OpenAIREProjectHTTPReader downloads one file and therefore does not iterate through items"
        )

    def read(self, item=None, *args, **kwargs):
        """Reads the latest OpenAIRE Graph Dataset project tar file from Zenodo and yields an in-memory binary stream of it."""
        if item:
            raise NotImplementedError(
                "OpenAIREProjectHTTPReader does not support being chained after another reader"
            )

        if self._origin == "full":
            # OpenAIRE Graph Dataset
            api_url = "https://zenodo.org/api/records/3516917"
        elif self._origin == "diff":
            # OpenAIRE Graph dataset: new collected projects
            api_url = "https://zenodo.org/api/records/6419021"
        else:
            raise ReaderError("The --origin option should be either 'full' or 'diff'")

        # Call the signposting `linkset+json` endpoint for the Concept DOI (i.e. latest version) of the OpenAIRE Graph Dataset.
        # See: https://github.com/inveniosoftware/rfcs/blob/master/rfcs/rdm-0071-signposting.md#provide-an-applicationlinksetjson-endpoint
        headers = {"Accept": "application/linkset+json"}
        api_resp = requests.get(api_url, headers=headers)
        api_resp.raise_for_status()

        # Extract the Landing page Link Set Object located as the first (index 0) item.
        landing_page_linkset = api_resp.json()["linkset"][0]

        # Extract the URL of the only project tar file linked to the record.
        landing_page_project_tar_items = [
            item
            for item in landing_page_linkset["item"]
            if item["type"] == "application/x-tar"
            and item["href"].endswith("/project.tar")
        ]
        if len(landing_page_project_tar_items) != 1:
            raise ReaderError(
                f"Expected 1 project tar item but got {len(landing_page_project_tar_items)}"
            )
        file_url = landing_page_project_tar_items[0]["href"]

        # Download the project tar file and fully load the response bytes content in memory.
        # The bytes content is then wrapped in a BytesIO to be a file-like object (as required by `tarfile.open`).
        # Using `file_resp.raw` directly is not possible since `tarfile.open` requires the file-like object to be seekable.
        file_resp = requests.get(file_url)
        file_resp.raise_for_status()
        yield io.BytesIO(file_resp.content)


class AwardsServiceWriter(ServiceWriter):
"""Funders service writer."""

@@ -172,20 +114,135 @@ def apply(self, stream_entry, **kwargs):
        return stream_entry


class CORDISProjectHTTPReader(BaseReader):
    """CORDIS Project HTTP Reader returning an in-memory binary stream of the latest CORDIS Horizon Europe project zip file."""

    def _iter(self, fp, *args, **kwargs):
        raise NotImplementedError(
            "CORDISProjectHTTPReader downloads one file and therefore does not iterate through items"
        )

    def read(self, item=None, *args, **kwargs):
        """Reads the latest CORDIS Horizon Europe project zip file and yields an in-memory binary stream of it."""
        if item:
            raise NotImplementedError(
                "CORDISProjectHTTPReader does not support being chained after another reader"
            )

        file_url = "https://cordis.europa.eu/data/cordis-HORIZONprojects-xml.zip"

        # Download the ZIP file and fully load the response bytes content in memory.
        # The bytes content is then wrapped in a BytesIO to be a file-like object (as required by `zipfile.ZipFile`).
        # Using `file_resp.raw` directly is not possible since `zipfile.ZipFile` requires the file-like object to be seekable.
        file_resp = requests.get(file_url)
        file_resp.raise_for_status()
        yield io.BytesIO(file_resp.content)
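# Illustrative downstream use of the stream yielded above (not part of the diff);
# the reader instantiation is hypothetical and only shows why a seekable BytesIO
# is needed before the zip reader can list its members:
import zipfile

for fp in CORDISProjectHTTPReader().read():
    with zipfile.ZipFile(fp) as archive:
        xml_names = [name for name in archive.namelist() if name.endswith(".xml")]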


class CORDISProjectTransformer(BaseTransformer):
    """Transforms a CORDIS project record into an award record."""

    def apply(self, stream_entry, **kwargs):
        """Applies the transformation to the stream entry."""
        record = stream_entry.entry
        award = {}

        # The id combines the EC funder id prefix with the CORDIS project id and
        # is used to attach this update to the existing award.
        award["id"] = f"00k4n6c32::{record['id']}"

        award["subjects"] = [
            {
                "scheme": "EuroSciVoc",
                # TODO: Subject titles are lowercase here, while they are
                # title-cased in the subjects vocabulary.
                "subject": category["title"],
            }
            for category in record["relations"]["categories"]["category"]
            if category["@classification"] == "euroSciVoc"
        ]

        organizations = record["relations"]["associations"]["organization"]
        # Projects with a single organization are not wrapped in a list,
        # so wrap them here to be able to iterate over them.
        organizations = (
            organizations if isinstance(organizations, list) else [organizations]
        )
        award["organizations"] = [
            {
                # TODO: The legal name is all uppercase here.
                "organization": organization["legalname"],
                "scheme": "pic",
                "id": organization["id"],
            }
            for organization in organizations
        ]

        stream_entry.entry = award
        return stream_entry
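# Illustrative shape of a parsed CORDIS record entering the transformer above
# (attribute keys like "@classification" come from the xml reader; all values
# below are made up):
cordis_record = {
    "id": "101012345",
    "relations": {
        "categories": {
            "category": [
                {"@classification": "euroSciVoc", "title": "natural sciences"},
            ]
        },
        "associations": {
            "organization": {"id": "999999999", "legalname": "EXAMPLE UNIVERSITY"},
        },
    },
}
# Expected award entry:
#   {"id": "00k4n6c32::101012345",
#    "subjects": [{"scheme": "EuroSciVoc", "subject": "natural sciences"}],
#    "organizations": [{"organization": "EXAMPLE UNIVERSITY", "scheme": "pic",
#                       "id": "999999999"}]}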


class CORDISAwardsServiceWriter(ServiceWriter):
    """CORDIS Awards service writer."""

    def __init__(self, *args, **kwargs):
        """Constructor."""
        service_or_name = kwargs.pop("service_or_name", "awards")
        # We only update and never insert, since CORDIS data is used to augment
        # existing awards (with subjects and organizations), not to create new awards.
        super().__init__(service_or_name=service_or_name, insert=False, *args, **kwargs)

    def _entry_id(self, entry):
        """Get the id from an entry."""
        return entry["id"]


VOCABULARIES_DATASTREAM_READERS = {
    "openaire-project-http": OpenAIREProjectHTTPReader,
    "cordis-project-http": CORDISProjectHTTPReader,
}

VOCABULARIES_DATASTREAM_TRANSFORMERS = {
    "openaire-award": OpenAIREProjectTransformer,
    "cordis-award": CORDISProjectTransformer,
}
"""Awards datastream transformers."""

VOCABULARIES_DATASTREAM_WRITERS = {
    "awards-service": AwardsServiceWriter,
    "cordis-awards-service": CORDISAwardsServiceWriter,
}
"""Awards datastream writers."""

DATASTREAM_CONFIG_CORDIS = {
"readers": [
# {"type": "cordis-project-http"},
# {
# "type": "zip",
# "args": {
# "regex": "\\.xml$",
# "mode": "r",
# },
# },
{
"type": "xml",
"args": {
"root_element": "project",
},
},
],
"transformers": [
{"type": "cordis-award"},
],
"writers": [
{
"type": "cordis-awards-service",
"args": {
"identity": system_identity,
},
}
],
}
"""Data Stream configuration.

An origin is required for the reader.
"""

DATASTREAM_CONFIG = {
"readers": [
{