Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Affiliations OpenAIRE OpenOrgs PIC #392

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions invenio_vocabularies/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,17 @@
}
"""Generic identifier schemes, usable by other vocabularies."""


def is_pic(val):
    """Test if argument is a Participant Identification Code (PIC).

    A value is accepted when it is a string made of exactly nine ASCII
    decimal digits.

    :param val: Candidate identifier value.
    :returns: ``True`` if ``val`` is a nine-digit string, ``False`` otherwise.
    """
    # Non-string input (``None``, integers, ...) can never be a valid PIC;
    # report it as invalid instead of failing inside ``len()``.
    if not isinstance(val, str):
        return False
    # ``str.isdigit()`` on its own also accepts non-ASCII digit characters
    # (superscripts, Arabic-Indic digits, ...), so require ASCII first.
    return len(val) == 9 and val.isascii() and val.isdigit()
Comment on lines +47 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def is_pic(val):
"""Test if argument is a Participant Identification Code (PIC)."""
if len(val) != 9:
return False
return val.isdigit()
def is_pic(val):
"""Test if argument is a Participant Identification Code (PIC)."""
return len(val) == 9 and val.isdigit()



# Affiliations accept every scheme of VOCABULARIES_IDENTIFIER_SCHEMES plus the
# "pic" scheme, whose values are validated by ``is_pic``.
VOCABULARIES_AFFILIATION_SCHEMES = dict(
    VOCABULARIES_IDENTIFIER_SCHEMES,
    pic={"label": _("PIC"), "validator": is_pic},
)
"""Affiliations allowed identifier schemes."""

Expand Down
95 changes: 94 additions & 1 deletion invenio_vocabularies/contrib/affiliations/datastreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
"""Affiliations datastreams, transformers, writers and readers."""

from flask import current_app
from invenio_i18n import lazy_gettext as _

from ...datastreams.errors import WriterError
from ...datastreams.transformers import BaseTransformer
from ...datastreams.writers import ServiceWriter
from ..common.ror.datastreams import RORTransformer

Expand Down Expand Up @@ -46,16 +47,77 @@ def __init__(
)


class OpenAIREOrganizationTransformer(BaseTransformer):
    """Transform an OpenAIRE organization record into an affiliation entry.

    The produced entry keeps only the OpenAIRE id, the ROR id (when present)
    and the PIC identifiers (when present) of the organization.
    """

    def apply(self, stream_entry, **kwargs):
        """Apply the transformation to the stream entry.

        :param stream_entry: Stream entry whose ``entry`` attribute is the raw
            OpenAIRE organization dictionary.
        :returns: The same stream entry, with ``entry`` replaced by a
            dictionary holding ``openaire_id`` and, when found among the
            organization PIDs, ``id`` (the ROR value without the
            ``https://ror.org/`` prefix) and ``identifiers`` (one item per
            PIC).
        :raises KeyError: If the record has no ``id`` key.
        """
        record = stream_entry.entry

        organization = {"openaire_id": record["id"]}

        # An organization without a "pid" list yields an entry holding only
        # its OpenAIRE id instead of failing here with a ``KeyError``.
        for pid in record.get("pid") or []:
            scheme = pid["scheme"]
            if scheme == "ROR":
                # NOTE(review): a reviewer asked whether this key should have
                # a more explicit name (e.g. ror/pid). It is "id" here, which
                # is the key OpenAIREAffiliationsServiceWriter._entry_id reads.
                organization["id"] = pid["value"].removeprefix("https://ror.org/")
            elif scheme == "PIC":
                # Accumulate rather than assign, so that an organization
                # listing several PICs does not lose all but the last one.
                organization.setdefault("identifiers", []).append(
                    {
                        "scheme": "pic",
                        "identifier": pid["value"],
                    }
                )

        stream_entry.entry = organization
        return stream_entry


class OpenAIREAffiliationsServiceWriter(ServiceWriter):
    """OpenAIRE Affiliations service writer.

    Validates entries derived from OpenAIRE organizations before handing them
    to the parent ``ServiceWriter``.
    """

    def __init__(self, *args, **kwargs):
        """Constructor.

        ``service_or_name`` defaults to ``"affiliations"``; every other
        argument is forwarded to ``ServiceWriter``.
        """
        service_or_name = kwargs.pop("service_or_name", "affiliations")
        # Here we only update and we do not insert, since OpenAIRE data is
        # used to augment existing affiliations (with PIC identifiers) and is
        # not used to create new affiliations.
        super().__init__(
            *args,
            service_or_name=service_or_name,
            insert=False,
            **kwargs,
        )

    def _entry_id(self, entry):
        """Get the id from an entry."""
        return entry["id"]

    def write(self, stream_entry, *args, **kwargs):
        """Validate the input entry and write it using the parent writer.

        The ``openaire_id`` key is removed from the entry (in place) once it
        has been validated.

        :param stream_entry: Stream entry whose ``entry`` is a dictionary
            with ``openaire_id``, ``id`` and ``identifiers`` keys.
        :returns: Whatever the parent ``write`` returns.
        :raises WriterError: If ``openaire_id`` is missing or does not start
            with ``openorgs____::``, or if ``id`` or ``identifiers`` is
            missing.
        """
        entry = stream_entry.entry

        # A missing or non-string OpenAIRE id is reported as a WriterError
        # like any other invalid id, rather than escaping as a
        # KeyError/AttributeError.
        openaire_id = entry.get("openaire_id")
        if not isinstance(openaire_id, str) or not openaire_id.startswith(
            "openorgs____::"
        ):
            raise WriterError([f"Not valid OpenAIRE OpenOrgs id for: {entry}"])
        # NOTE(review): a reviewer asked why the id is validated and then
        # deleted, and whether this check belongs in the transformer instead.
        # Presumably the key must not reach the service - TODO confirm.
        del entry["openaire_id"]

        if "id" not in entry:
            raise WriterError([f"No id for: {entry}"])

        if "identifiers" not in entry:
            raise WriterError([f"No alternative identifiers for: {entry}"])

        return super().write(stream_entry, *args, **kwargs)

    def write_many(self, stream_entries, *args, **kwargs):
        """Writes the input entries using a given service."""
        # NOTE(review): this delegates straight to the parent, so the checks
        # done in ``write`` are not applied to these entries. The author
        # noted that the parent ``write_many`` does not really handle the
        # update flag logic and that reusing the validation here is still an
        # open question.
        return super().write_many(stream_entries, *args, **kwargs)


# No affiliation-specific readers are registered.
VOCABULARIES_DATASTREAM_READERS = {}
"""Affiliations datastream readers."""

# Writer classes keyed by the "type" name that datastream configurations use
# to refer to them.
VOCABULARIES_DATASTREAM_WRITERS = {
"affiliations-service": AffiliationsServiceWriter,
"openaire-affiliations-service": OpenAIREAffiliationsServiceWriter,
}
"""Affiliations datastream writers."""

# Transformer classes keyed by the "type" name that datastream configurations
# use to refer to them.
VOCABULARIES_DATASTREAM_TRANSFORMERS = {
"ror-affiliations": AffiliationsRORTransformer,
"openaire-organization": OpenAIREOrganizationTransformer,
}
"""Affiliations datastream transformers."""

Expand Down Expand Up @@ -90,3 +152,34 @@ def __init__(

An origin is required for the reader.
"""

# Reader chain: OpenAIRE HTTP source -> tar -> gzip -> JSON lines. Each entry
# goes through the "openaire-organization" transformer and is written by the
# "openaire-affiliations-service" writer wrapped in the "async" writer.
DATASTREAM_CONFIG_OPENAIRE = {
    "readers": [
        {"type": "openaire-http", "args": {"tar_href": "/organization.tar"}},
        {
            "type": "tar",
            "args": {
                # Both dots are escaped so the pattern only matches a literal
                # ".json.gz" suffix (an unescaped dot matches any character).
                "regex": "\\.json\\.gz$",
                "mode": "r",
            },
        },
        {"type": "gzip"},
        {"type": "jsonl"},
    ],
    "transformers": [
        {
            "type": "openaire-organization",
        },
    ],
    "writers": [
        {
            "type": "async",
            "args": {
                "writer": {
                    "type": "openaire-affiliations-service",
                }
            },
        }
    ],
}
"""Alternative Data Stream configuration for OpenAIRE Affiliations."""
18 changes: 17 additions & 1 deletion invenio_vocabularies/datastreams/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,23 @@ def _resolve(self, id_):
def _do_update(self, entry):
    """Update the stored record identified by ``entry`` with the entry data.

    Keys whose stored value and new value are both lists are merged: new
    items are appended unless an equal item is already stored. Any other key
    of ``entry`` replaces the stored value.

    :param entry: Dictionary with the new data; its id is obtained through
        ``self._entry_id``.
    :returns: A ``StreamEntry`` wrapping the result of the service update.
    """
    record_id = self._entry_id(entry)
    merged = self._resolve(record_id).to_dict()

    for field, incoming in entry.items():
        stored = merged.get(field)
        if not (isinstance(stored, list) and isinstance(incoming, list)):
            # Not a list on both sides: the new value simply wins.
            merged[field] = incoming
            continue

        # Top-level lists are extended instead of overwritten.
        # TODO: If an identifier was wrong and is then corrected, this will cause duplicated entries.
        # NOTE(review): avoiding that would require comparing the scheme of
        # each identifier, which would make this writer logic specific to a
        # given vocabulary; reviewers questioned whether duplicates are
        # actually a problem.
        for item in incoming:
            if item not in stored:
                stored.append(item)

    return StreamEntry(self._service.update(self._identity, record_id, merged))

def write(self, stream_entry, *args, **kwargs):
Expand Down
15 changes: 15 additions & 0 deletions invenio_vocabularies/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from .contrib.affiliations.datastreams import (
DATASTREAM_CONFIG as affiliations_ds_config,
)
from .contrib.affiliations.datastreams import (
DATASTREAM_CONFIG_OPENAIRE as affiliations_openaire_ds_config,
)
from .contrib.awards.datastreams import DATASTREAM_CONFIG as awards_ds_config
from .contrib.funders.datastreams import DATASTREAM_CONFIG as funders_ds_config
from .contrib.names.datastreams import DATASTREAM_CONFIG as names_ds_config
Expand Down Expand Up @@ -95,13 +98,25 @@ def get_service(self):
raise NotImplementedError("Service not implemented for Affiliations")


class AffiliationsOpenAIREVocabularyConfig(VocabularyConfig):
    """Vocabulary config for affiliations fed from the OpenAIRE datastream."""

    # Same string as the key this class is registered under in
    # get_vocabulary_config.
    vocabulary_name = "affiliations:openaire"
    config = affiliations_openaire_ds_config

    def get_service(self):
        """Get the service for the vocabulary.

        :raises NotImplementedError: Always; no service is provided here.
        """
        raise NotImplementedError("Service not implemented for OpenAIRE Affiliations")


def get_vocabulary_config(vocabulary):
    """Return a Vocabulary Config instance for the given vocabulary name.

    Names that are not registered fall back to the base ``VocabularyConfig``.
    """
    # NOTE(review): "affiliations:openaire" introduces the notion of several
    # data sources for one vocabulary type, using ":" as a kind of
    # namespacing.
    config_classes = {
        "names": NamesVocabularyConfig,
        "funders": FundersVocabularyConfig,
        "awards": AwardsVocabularyConfig,
        "affiliations": AffiliationsVocabularyConfig,
        "affiliations:openaire": AffiliationsOpenAIREVocabularyConfig,
        "subjects": SubjectsVocabularyConfig,
    }
    config_cls = config_classes.get(vocabulary, VocabularyConfig)
    return config_cls()
10 changes: 10 additions & 0 deletions tests/contrib/affiliations/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ def affiliation_full_data():
}


@pytest.fixture(scope="function")
def openaire_affiliation_full_data():
    """Affiliation entry carrying an OpenAIRE id, a ROR id and one PIC."""
    pic_identifier = {"identifier": "999988133", "scheme": "pic"}
    return {
        "openaire_id": "openorgs____::47efb6602225236c0b207761a8b3a21c",
        "id": "01ggx4157",
        "identifiers": [pic_identifier],
    }


@pytest.fixture(scope="module")
def service():
"""Affiliations service object."""
Expand Down
133 changes: 133 additions & 0 deletions tests/contrib/affiliations/test_affiliations_datastreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from invenio_vocabularies.contrib.affiliations.config import affiliation_schemes
from invenio_vocabularies.contrib.affiliations.datastreams import (
AffiliationsServiceWriter,
OpenAIREAffiliationsServiceWriter,
OpenAIREOrganizationTransformer,
)
from invenio_vocabularies.contrib.common.ror.datastreams import RORTransformer
from invenio_vocabularies.datastreams import StreamEntry
Expand Down Expand Up @@ -118,3 +120,134 @@ def test_affiliations_service_writer_update_non_existing(

# not-ideal cleanup
affiliation_rec._record.delete(force=True)


@pytest.fixture()
def dict_openaire_organization_entry():
    """Stream entry wrapping a sample record of the OpenAIRE organization dump."""
    # (scheme, value) pairs, in the order they appear in the sample record.
    pid_pairs = (
        ("mag_id", "67311998"),
        ("ISNI", "000000012156142X"),
        ("Wikidata", "Q42944"),
        ("PIC", "999988133"),
        ("ROR", "https://ror.org/01ggx4157"),
        ("OrgReg", "INT1011"),
        ("ISNI", "000000012156142X"),
        ("FundRef", "100012470"),
        ("GRID", "grid.9132.9"),
        ("OrgRef", "37351"),
    )
    organization = {
        "alternativeNames": [
            "European Organization for Nuclear Research",
            "Organisation européenne pour la recherche nucléaire",
            "CERN",
        ],
        "country": {"code": "CH", "label": "Switzerland"},
        "id": "openorgs____::47efb6602225236c0b207761a8b3a21c",
        "legalName": "European Organization for Nuclear Research",
        "legalShortName": "CERN",
        "pid": [{"scheme": scheme, "value": value} for scheme, value in pid_pairs],
        "websiteUrl": "http://home.web.cern.ch/",
    }
    return StreamEntry(organization)


@pytest.fixture(scope="module")
def expected_from_openaire_json():
    """Entry expected after transforming the sample OpenAIRE organization."""
    return dict(
        openaire_id="openorgs____::47efb6602225236c0b207761a8b3a21c",
        id="01ggx4157",
        identifiers=[dict(identifier="999988133", scheme="pic")],
    )


def test_openaire_organization_transformer(
    app, dict_openaire_organization_entry, expected_from_openaire_json
):
    """The transformer output matches the expected affiliation entry."""
    transformed = OpenAIREOrganizationTransformer().apply(
        dict_openaire_organization_entry
    )
    assert expected_from_openaire_json == transformed.entry


def test_openaire_affiliations_service_writer(
    app, search_clear, affiliation_full_data, openaire_affiliation_full_data, service
):
    """Writing an OpenAIRE entry adds its identifiers to an existing affiliation."""
    # Create the affiliation with the original service writer.
    created = AffiliationsServiceWriter().write(StreamEntry(affiliation_full_data))
    expected = created.entry.to_dict()
    Affiliation.index.refresh()  # refresh index to make changes live

    # Update the affiliation with the OpenAIRE service writer, then read it
    # back through the service.
    openaire_writer = OpenAIREAffiliationsServiceWriter(update=True)
    written = openaire_writer.write(StreamEntry(openaire_affiliation_full_data))
    Affiliation.index.refresh()  # refresh index to make changes live
    affiliation_rec = service.read(system_identity, created.entry.id)
    actual = affiliation_rec.to_dict()

    assert written.entry.id == created.entry.id

    # These fields change from one update to the other.
    for volatile_field in ("revision_id", "updated"):
        expected[volatile_field] = actual[volatile_field]
    # The extra identifier coming from OpenAIRE is expected as well.
    expected["identifiers"] += openaire_affiliation_full_data["identifiers"]

    assert dict(expected) == actual

    # not-ideal cleanup
    affiliation_rec._record.delete(force=True)


def test_openaire_affiliations_service_writer_non_openorgs(
    app, openaire_affiliation_full_data
):
    """An entry whose OpenAIRE id is not an OpenOrgs id is rejected."""
    bad_entry = deepcopy(openaire_affiliation_full_data)
    bad_entry["openaire_id"] = "pending_org_::627931d047132a4e20dbc4a882eb9a35"
    writer = OpenAIREAffiliationsServiceWriter()

    with pytest.raises(WriterError) as err:
        writer.write(StreamEntry(bad_entry))

    # The message is built after the call, from the same dictionary object
    # that was handed to the writer.
    assert [f"Not valid OpenAIRE OpenOrgs id for: {bad_entry}"] in err.value.args


def test_openaire_affiliations_service_writer_no_id(
    app, openaire_affiliation_full_data
):
    """An entry without an ``id`` key is rejected."""
    entry_without_id = deepcopy(openaire_affiliation_full_data)
    entry_without_id.pop("id")
    writer = OpenAIREAffiliationsServiceWriter()

    with pytest.raises(WriterError) as err:
        writer.write(StreamEntry(entry_without_id))

    # The message is built after the call, from the same dictionary object
    # that was handed to the writer.
    assert [f"No id for: {entry_without_id}"] in err.value.args


def test_openaire_affiliations_service_writer_no_alternative_identifiers(
    app, openaire_affiliation_full_data
):
    """An entry without an ``identifiers`` key is rejected."""
    entry_without_identifiers = deepcopy(openaire_affiliation_full_data)
    entry_without_identifiers.pop("identifiers")
    writer = OpenAIREAffiliationsServiceWriter()

    with pytest.raises(WriterError) as err:
        writer.write(StreamEntry(entry_without_identifiers))

    # The message is built after the call, from the same dictionary object
    # that was handed to the writer.
    expected_message = f"No alternative identifiers for: {entry_without_identifiers}"
    assert [expected_message] in err.value.args