Skip to content

Commit

Permalink
datastreams: affiliations: OpenAIRE transformer and writer adding PIC…
Browse files Browse the repository at this point in the history
… identifier
  • Loading branch information
ptamarit committed Aug 29, 2024
1 parent 88ba835 commit 9d54a2a
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 2 deletions.
9 changes: 9 additions & 0 deletions invenio_vocabularies/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,17 @@
}
""""Generic identifier schemes, usable by other vocabularies."""


def is_pic(val):
"""Test if argument is a Participant Identification Code (PIC)."""
if len(val) != 9:
return False
return val.isdigit()


VOCABULARIES_AFFILIATION_SCHEMES = {
**VOCABULARIES_IDENTIFIER_SCHEMES,
"pic": {"label": _("PIC"), "validator": is_pic},
}
"""Affiliations allowed identifier schemes."""

Expand Down
95 changes: 94 additions & 1 deletion invenio_vocabularies/contrib/affiliations/datastreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
"""Affiliations datastreams, transformers, writers and readers."""

from flask import current_app
from invenio_i18n import lazy_gettext as _

from ...datastreams.errors import WriterError
from ...datastreams.transformers import BaseTransformer
from ...datastreams.writers import ServiceWriter
from ..common.ror.datastreams import RORTransformer

Expand Down Expand Up @@ -46,16 +47,77 @@ def __init__(
)


class OpenAIREOrganizationTransformer(BaseTransformer):
"""OpenAIRE Organization Transformer."""

def apply(self, stream_entry, **kwargs):
"""Applies the transformation to the stream entry."""
record = stream_entry.entry

organization = {"openaire_id": record["id"]}

for pid in record["pid"]:
if pid["scheme"] == "ROR":
organization["id"] = pid["value"].removeprefix("https://ror.org/")
elif pid["scheme"] == "PIC":
organization["identifiers"] = [
{
"scheme": "pic",
"identifier": pid["value"],
}
]

stream_entry.entry = organization
return stream_entry


class OpenAIREAffiliationsServiceWriter(ServiceWriter):
"""OpenAIRE Affiliations service writer."""

def __init__(self, *args, **kwargs):
"""Constructor."""
service_or_name = kwargs.pop("service_or_name", "affiliations")
# Here we only update and we do not insert, since OpenAIRE data is used to augment existing affiliations
# (with PIC identifiers) and is not used to create new affiliations.
super().__init__(service_or_name=service_or_name, insert=False, *args, **kwargs)

def _entry_id(self, entry):
"""Get the id from an entry."""
return entry["id"]

def write(self, stream_entry, *args, **kwargs):
"""Writes the input entry using a given service."""
entry = stream_entry.entry

if not entry["openaire_id"].startswith("openorgs____::"):
raise WriterError([f"Not valid OpenAIRE OpenOrgs id for: {entry}"])
del entry["openaire_id"]

if "id" not in entry:
raise WriterError([f"No id for: {entry}"])

if "identifiers" not in entry:
raise WriterError([f"No alternative identifiers for: {entry}"])

return super().write(stream_entry, *args, **kwargs)

def write_many(self, stream_entries, *args, **kwargs):
"""Writes the input entries using a given service."""
return super().write_many(stream_entries, *args, **kwargs)


VOCABULARIES_DATASTREAM_READERS = {}
"""Affiliations datastream readers."""

VOCABULARIES_DATASTREAM_WRITERS = {
"affiliations-service": AffiliationsServiceWriter,
"openaire-affiliations-service": OpenAIREAffiliationsServiceWriter,
}
"""Affiliations datastream writers."""

VOCABULARIES_DATASTREAM_TRANSFORMERS = {
"ror-affiliations": AffiliationsRORTransformer,
"openaire-organization": OpenAIREOrganizationTransformer,
}
"""Affiliations datastream transformers."""

Expand Down Expand Up @@ -90,3 +152,34 @@ def __init__(
An origin is required for the reader.
"""

DATASTREAM_CONFIG_OPENAIRE = {
"readers": [
{"type": "openaire-http", "args": {"tar_href": "/organization.tar"}},
{
"type": "tar",
"args": {
"regex": "\\.json.gz$",
"mode": "r",
},
},
{"type": "gzip"},
{"type": "jsonl"},
],
"transformers": [
{
"type": "openaire-organization",
},
],
"writers": [
{
"type": "async",
"args": {
"writer": {
"type": "openaire-affiliations-service",
}
},
}
],
}
"""Alternative Data Stream configuration for OpenAIRE Affiliations."""
18 changes: 17 additions & 1 deletion invenio_vocabularies/datastreams/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,23 @@ def _resolve(self, id_):
def _do_update(self, entry):
vocab_id = self._entry_id(entry)
current = self._resolve(vocab_id)
updated = dict(current.to_dict(), **entry)

# Merge the `current` dictionary with new data in the `entry` dictionary
# by appending to lists at the top level instead of overwriting the list.
updated = current.to_dict()
for key, value in entry.items():
if (
key in updated
and isinstance(updated[key], list)
and isinstance(value, list)
):
for value_item in value:
# TODO: If an identifier was wrong and is then corrected, this will cause duplicated entries.
if value_item not in updated[key]:
updated[key].append(value_item)
else:
updated[key] = value

return StreamEntry(self._service.update(self._identity, vocab_id, updated))

def write(self, stream_entry, *args, **kwargs):
Expand Down
15 changes: 15 additions & 0 deletions invenio_vocabularies/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from .contrib.affiliations.datastreams import (
DATASTREAM_CONFIG as affiliations_ds_config,
)
from .contrib.affiliations.datastreams import (
DATASTREAM_CONFIG_OPENAIRE as affiliations_openaire_ds_config,
)
from .contrib.awards.datastreams import DATASTREAM_CONFIG as awards_ds_config
from .contrib.funders.datastreams import DATASTREAM_CONFIG as funders_ds_config
from .contrib.names.datastreams import DATASTREAM_CONFIG as names_ds_config
Expand Down Expand Up @@ -95,13 +98,25 @@ def get_service(self):
raise NotImplementedError("Service not implemented for Affiliations")


class AffiliationsOpenAIREVocabularyConfig(VocabularyConfig):
"""OpenAIRE Affiliations Vocabulary Config."""

config = affiliations_openaire_ds_config
vocabulary_name = "affiliations:openaire"

def get_service(self):
"""Get the service for the vocabulary."""
raise NotImplementedError("Service not implemented for OpenAIRE Affiliations")


def get_vocabulary_config(vocabulary):
"""Factory function to get the appropriate Vocabulary Config."""
vocab_config = {
"names": NamesVocabularyConfig,
"funders": FundersVocabularyConfig,
"awards": AwardsVocabularyConfig,
"affiliations": AffiliationsVocabularyConfig,
"affiliations:openaire": AffiliationsOpenAIREVocabularyConfig,
"subjects": SubjectsVocabularyConfig,
}
return vocab_config.get(vocabulary, VocabularyConfig)()
10 changes: 10 additions & 0 deletions tests/contrib/affiliations/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ def affiliation_full_data():
}


@pytest.fixture(scope="function")
def openaire_affiliation_full_data():
"""Full OpenAIRE affiliation data."""
return {
"openaire_id": "openorgs____::47efb6602225236c0b207761a8b3a21c",
"id": "01ggx4157",
"identifiers": [{"identifier": "999988133", "scheme": "pic"}],
}


@pytest.fixture(scope="module")
def service():
"""Affiliations service object."""
Expand Down
133 changes: 133 additions & 0 deletions tests/contrib/affiliations/test_affiliations_datastreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from invenio_vocabularies.contrib.affiliations.config import affiliation_schemes
from invenio_vocabularies.contrib.affiliations.datastreams import (
AffiliationsServiceWriter,
OpenAIREAffiliationsServiceWriter,
OpenAIREOrganizationTransformer,
)
from invenio_vocabularies.contrib.common.ror.datastreams import RORTransformer
from invenio_vocabularies.datastreams import StreamEntry
Expand Down Expand Up @@ -118,3 +120,134 @@ def test_affiliations_service_writer_update_non_existing(

# not-ideal cleanup
affiliation_rec._record.delete(force=True)


@pytest.fixture()
def dict_openaire_organization_entry():
"""An example entry from OpenAIRE organization Data Dump."""
return StreamEntry(
{
"alternativeNames": [
"European Organization for Nuclear Research",
"Organisation européenne pour la recherche nucléaire",
"CERN",
],
"country": {"code": "CH", "label": "Switzerland"},
"id": "openorgs____::47efb6602225236c0b207761a8b3a21c",
"legalName": "European Organization for Nuclear Research",
"legalShortName": "CERN",
"pid": [
{"scheme": "mag_id", "value": "67311998"},
{"scheme": "ISNI", "value": "000000012156142X"},
{"scheme": "Wikidata", "value": "Q42944"},
{"scheme": "PIC", "value": "999988133"},
{"scheme": "ROR", "value": "https://ror.org/01ggx4157"},
{"scheme": "OrgReg", "value": "INT1011"},
{"scheme": "ISNI", "value": "000000012156142X"},
{"scheme": "FundRef", "value": "100012470"},
{"scheme": "GRID", "value": "grid.9132.9"},
{"scheme": "OrgRef", "value": "37351"},
],
"websiteUrl": "http://home.web.cern.ch/",
}
)


@pytest.fixture(scope="module")
def expected_from_openaire_json():
return {
"openaire_id": "openorgs____::47efb6602225236c0b207761a8b3a21c",
"id": "01ggx4157",
"identifiers": [{"identifier": "999988133", "scheme": "pic"}],
}


def test_openaire_organization_transformer(
app, dict_openaire_organization_entry, expected_from_openaire_json
):
transformer = OpenAIREOrganizationTransformer()
assert (
expected_from_openaire_json
== transformer.apply(dict_openaire_organization_entry).entry
)


def test_openaire_affiliations_service_writer(
app, search_clear, affiliation_full_data, openaire_affiliation_full_data, service
):
# create vocabulary with original service writer
orig_writer = AffiliationsServiceWriter()
orig_affiliation_rec = orig_writer.write(StreamEntry(affiliation_full_data))
orig_affiliation_dict = orig_affiliation_rec.entry.to_dict()
Affiliation.index.refresh() # refresh index to make changes live

# update vocabulary and check changes vocabulary with OpenAIRE service writer
writer = OpenAIREAffiliationsServiceWriter(update=True)
_ = writer.write(StreamEntry(openaire_affiliation_full_data))
Affiliation.index.refresh() # refresh index to make changes live
affiliation_rec = service.read(system_identity, orig_affiliation_rec.entry.id)
affiliation_dict = affiliation_rec.to_dict()

assert _.entry.id == orig_affiliation_rec.entry.id

# updating fields changing from one update to the other
orig_affiliation_dict["revision_id"] = affiliation_dict["revision_id"]
orig_affiliation_dict["updated"] = affiliation_dict["updated"]
# Adding the extra identifier coming from OpenAIRE
orig_affiliation_dict["identifiers"].extend(
openaire_affiliation_full_data["identifiers"]
)

assert dict(orig_affiliation_dict) == affiliation_dict

# not-ideal cleanup
affiliation_rec._record.delete(force=True)


def test_openaire_affiliations_service_writer_non_openorgs(
app, openaire_affiliation_full_data
):
writer = OpenAIREAffiliationsServiceWriter()

updated_openaire_affiliation = deepcopy(openaire_affiliation_full_data)
updated_openaire_affiliation["openaire_id"] = (
"pending_org_::627931d047132a4e20dbc4a882eb9a35"
)

with pytest.raises(WriterError) as err:
writer.write(StreamEntry(updated_openaire_affiliation))

expected_error = [
f"Not valid OpenAIRE OpenOrgs id for: {updated_openaire_affiliation}"
]
assert expected_error in err.value.args


def test_openaire_affiliations_service_writer_no_id(
app, openaire_affiliation_full_data
):
writer = OpenAIREAffiliationsServiceWriter()

updated_openaire_affiliation = deepcopy(openaire_affiliation_full_data)
del updated_openaire_affiliation["id"]

with pytest.raises(WriterError) as err:
writer.write(StreamEntry(updated_openaire_affiliation))

expected_error = [f"No id for: {updated_openaire_affiliation}"]
assert expected_error in err.value.args


def test_openaire_affiliations_service_writer_no_alternative_identifiers(
app, openaire_affiliation_full_data
):
writer = OpenAIREAffiliationsServiceWriter()

updated_openaire_affiliation = deepcopy(openaire_affiliation_full_data)
del updated_openaire_affiliation["identifiers"]

with pytest.raises(WriterError) as err:
writer.write(StreamEntry(updated_openaire_affiliation))

expected_error = [f"No alternative identifiers for: {updated_openaire_affiliation}"]
assert expected_error in err.value.args

0 comments on commit 9d54a2a

Please sign in to comment.