diff --git a/invenio_vocabularies/cli.py b/invenio_vocabularies/cli.py index d50a9d13..aded3193 100644 --- a/invenio_vocabularies/cli.py +++ b/invenio_vocabularies/cli.py @@ -101,9 +101,14 @@ def update(vocabulary, filepath=None, origin=None): for w_conf in config["writers"]: if w_conf["type"] == "async": - w_conf["args"]["writer"]["args"]["update"] = True + w_conf_update = w_conf["args"]["writer"] else: - w_conf["args"]["update"] = True + w_conf_update = w_conf + + if "args" in w_conf_update: + w_conf_update["args"]["update"] = True + else: + w_conf_update["args"] = {"update": True} success, errored, filtered = _process_vocab(config) diff --git a/invenio_vocabularies/config.py b/invenio_vocabularies/config.py index ef121034..7e492f3c 100644 --- a/invenio_vocabularies/config.py +++ b/invenio_vocabularies/config.py @@ -43,8 +43,17 @@ } """"Generic identifier schemes, usable by other vocabularies.""" + +def is_pic(val): + """Test if argument is a Participant Identification Code (PIC).""" + if len(val) != 9: + return False + return val.isdigit() + + VOCABULARIES_AFFILIATION_SCHEMES = { **VOCABULARIES_IDENTIFIER_SCHEMES, + "pic": {"label": _("PIC"), "validator": is_pic}, } """Affiliations allowed identifier schemes.""" diff --git a/invenio_vocabularies/contrib/affiliations/datastreams.py b/invenio_vocabularies/contrib/affiliations/datastreams.py index 381bce2b..cffe6855 100644 --- a/invenio_vocabularies/contrib/affiliations/datastreams.py +++ b/invenio_vocabularies/contrib/affiliations/datastreams.py @@ -10,8 +10,9 @@ """Affiliations datastreams, transformers, writers and readers.""" from flask import current_app -from invenio_i18n import lazy_gettext as _ +from ...datastreams.errors import WriterError +from ...datastreams.transformers import BaseTransformer from ...datastreams.writers import ServiceWriter from ..common.ror.datastreams import RORTransformer @@ -46,16 +47,77 @@ def __init__( ) +class OpenAIREOrganizationTransformer(BaseTransformer): + """OpenAIRE Organization Transformer.""" + + def apply(self, stream_entry, **kwargs): + """Applies the transformation to the stream entry.""" + record = stream_entry.entry + + organization = {"openaire_id": record["id"]} + + for pid in record["pid"]: + if pid["scheme"] == "ROR": + organization["id"] = pid["value"].removeprefix("https://ror.org/") + elif pid["scheme"] == "PIC": + organization["identifiers"] = [ + { + "scheme": "pic", + "identifier": pid["value"], + } + ] + + stream_entry.entry = organization + return stream_entry + + +class OpenAIREAffiliationsServiceWriter(ServiceWriter): + """OpenAIRE Affiliations service writer.""" + + def __init__(self, *args, **kwargs): + """Constructor.""" + service_or_name = kwargs.pop("service_or_name", "affiliations") + # Here we only update and we do not insert, since OpenAIRE data is used to augment existing affiliations + # (with PIC identifiers) and is not used to create new affiliations. + super().__init__(service_or_name=service_or_name, insert=False, *args, **kwargs) + + def _entry_id(self, entry): + """Get the id from an entry.""" + return entry["id"] + + def write(self, stream_entry, *args, **kwargs): + """Writes the input entry using a given service.""" + entry = stream_entry.entry + + if not entry["openaire_id"].startswith("openorgs____::"): + raise WriterError([f"Not valid OpenAIRE OpenOrgs id for: {entry}"]) + del entry["openaire_id"] + + if "id" not in entry: + raise WriterError([f"No id for: {entry}"]) + + if "identifiers" not in entry: + raise WriterError([f"No alternative identifiers for: {entry}"]) + + return super().write(stream_entry, *args, **kwargs) + + def write_many(self, stream_entries, *args, **kwargs): + """Writes the input entries using a given service.""" + return super().write_many(stream_entries, *args, **kwargs) + + VOCABULARIES_DATASTREAM_READERS = {} """Affiliations datastream readers.""" VOCABULARIES_DATASTREAM_WRITERS = { "affiliations-service": AffiliationsServiceWriter, + "openaire-affiliations-service": OpenAIREAffiliationsServiceWriter, } """Affiliations datastream writers.""" VOCABULARIES_DATASTREAM_TRANSFORMERS = { "ror-affiliations": AffiliationsRORTransformer, + "openaire-organization": OpenAIREOrganizationTransformer, } """Affiliations datastream transformers.""" @@ -90,3 +152,34 @@ def __init__( An origin is required for the reader. """ + +DATASTREAM_CONFIG_OPENAIRE = { + "readers": [ + {"type": "openaire-http", "args": {"tar_href": "/organization.tar"}}, + { + "type": "tar", + "args": { + "regex": "\\.json.gz$", + "mode": "r", + }, + }, + {"type": "gzip"}, + {"type": "jsonl"}, + ], + "transformers": [ + { + "type": "openaire-organization", + }, + ], + "writers": [ + { + "type": "async", + "args": { + "writer": { + "type": "openaire-affiliations-service", + } + }, + } + ], +} +"""Alternative Data Stream configuration for OpenAIRE Affiliations.""" diff --git a/invenio_vocabularies/contrib/awards/datastreams.py b/invenio_vocabularies/contrib/awards/datastreams.py index f4109bf0..22547f39 100644 --- a/invenio_vocabularies/contrib/awards/datastreams.py +++ b/invenio_vocabularies/contrib/awards/datastreams.py @@ -8,73 +8,15 @@ """Awards datastreams, transformers, writers and readers.""" -import io - -import requests from invenio_access.permissions import system_identity from invenio_i18n import lazy_gettext as _ -from ...datastreams.errors import ReaderError, TransformerError -from ...datastreams.readers import BaseReader +from ...datastreams.errors import TransformerError from ...datastreams.transformers import BaseTransformer from ...datastreams.writers import ServiceWriter from .config import awards_ec_ror_id, awards_openaire_funders_mapping -class OpenAIREProjectHTTPReader(BaseReader): - """OpenAIRE Project HTTP Reader returning an in-memory binary stream of the latest OpenAIRE Graph Dataset project tar file.""" - - def _iter(self, fp, *args, **kwargs): - raise NotImplementedError( - "OpenAIREProjectHTTPReader downloads one file and therefore does not iterate through items" - ) - - def read(self, item=None, *args, **kwargs): - """Reads the latest OpenAIRE Graph Dataset project tar file from Zenodo and yields an in-memory binary stream of it.""" - if item: - raise NotImplementedError( - "OpenAIREProjectHTTPReader does not support being chained after another reader" - ) - - if self._origin == "full": - # OpenAIRE Graph Dataset - api_url = "https://zenodo.org/api/records/3516917" - elif self._origin == "diff": - # OpenAIRE Graph dataset: new collected projects - api_url = "https://zenodo.org/api/records/6419021" - else: - raise ReaderError("The --origin option should be either 'full' or 'diff'") - - # Call the signposting `linkset+json` endpoint for the Concept DOI (i.e. latest version) of the OpenAIRE Graph Dataset. - # See: https://github.com/inveniosoftware/rfcs/blob/master/rfcs/rdm-0071-signposting.md#provide-an-applicationlinksetjson-endpoint - headers = {"Accept": "application/linkset+json"} - api_resp = requests.get(api_url, headers=headers) - api_resp.raise_for_status() - - # Extract the Landing page Link Set Object located as the first (index 0) item. - landing_page_linkset = api_resp.json()["linkset"][0] - - # Extract the URL of the only project tar file linked to the record. - landing_page_project_tar_items = [ - item - for item in landing_page_linkset["item"] - if item["type"] == "application/x-tar" - and item["href"].endswith("/project.tar") - ] - if len(landing_page_project_tar_items) != 1: - raise ReaderError( - f"Expected 1 project tar item but got {len(landing_page_project_tar_items)}" - ) - file_url = landing_page_project_tar_items[0]["href"] - - # Download the project tar file and fully load the response bytes content in memory. - # The bytes content are then wrapped by a BytesIO to be file-like object (as required by `tarfile.open`). - # Using directly `file_resp.raw` is not possible since `tarfile.open` requires the file-like object to be seekable. - file_resp = requests.get(file_url) - file_resp.raise_for_status() - yield io.BytesIO(file_resp.content) - - class AwardsServiceWriter(ServiceWriter): """Funders service writer.""" @@ -172,9 +114,7 @@ def apply(self, stream_entry, **kwargs): return stream_entry -VOCABULARIES_DATASTREAM_READERS = { - "openaire-project-http": OpenAIREProjectHTTPReader, -} +VOCABULARIES_DATASTREAM_READERS = {} VOCABULARIES_DATASTREAM_TRANSFORMERS = { "openaire-award": OpenAIREProjectTransformer, diff --git a/invenio_vocabularies/contrib/common/openaire/__init__.py b/invenio_vocabularies/contrib/common/openaire/__init__.py new file mode 100644 index 00000000..a1cf1934 --- /dev/null +++ b/invenio_vocabularies/contrib/common/openaire/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# Invenio-Vocabularies is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""OpenAIRE-related module.""" diff --git a/invenio_vocabularies/contrib/common/openaire/datastreams.py b/invenio_vocabularies/contrib/common/openaire/datastreams.py new file mode 100644 index 00000000..95dba01c --- /dev/null +++ b/invenio_vocabularies/contrib/common/openaire/datastreams.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# Invenio-Vocabularies is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""OpenAIRE-related Datastreams Readers/Writers/Transformers module.""" + +import io + +import requests + +from invenio_vocabularies.datastreams.errors import ReaderError +from invenio_vocabularies.datastreams.readers import BaseReader + + +class OpenAIREHTTPReader(BaseReader): + """OpenAIRE HTTP Reader returning an in-memory binary stream of the latest OpenAIRE Graph Dataset tar file of a given type.""" + + def __init__(self, origin=None, mode="r", tar_href=None, *args, **kwargs): + """Constructor.""" + self.tar_href = tar_href + super().__init__(origin, mode, *args, **kwargs) + + def _iter(self, fp, *args, **kwargs): + raise NotImplementedError( + "OpenAIREHTTPReader downloads one file and therefore does not iterate through items" + ) + + def read(self, item=None, *args, **kwargs): + """Reads the latest OpenAIRE Graph Dataset tar file of a given type from Zenodo and yields an in-memory binary stream of it.""" + if item: + raise NotImplementedError( + "OpenAIREHTTPReader does not support being chained after another reader" + ) + + if self._origin == "full": + # OpenAIRE Graph Dataset + api_url = "https://zenodo.org/api/records/3516917" + elif self._origin == "diff": + # OpenAIRE Graph dataset: new collected projects + api_url = "https://zenodo.org/api/records/6419021" + else: + raise ReaderError("The --origin option should be either 'full' or 'diff'") + + # Call the signposting `linkset+json` endpoint for the Concept DOI (i.e. latest version) of the OpenAIRE Graph Dataset. + # See: https://github.com/inveniosoftware/rfcs/blob/master/rfcs/rdm-0071-signposting.md#provide-an-applicationlinksetjson-endpoint + headers = {"Accept": "application/linkset+json"} + api_resp = requests.get(api_url, headers=headers) + api_resp.raise_for_status() + + # Extract the Landing page Link Set Object located as the first (index 0) item. + landing_page_linkset = api_resp.json()["linkset"][0] + + # Extract the URL of the only tar file matching `tar_href` linked to the record. + landing_page_matching_tar_items = [ + item + for item in landing_page_linkset["item"] + if item["type"] == "application/x-tar" + and item["href"].endswith(self.tar_href) + ] + if len(landing_page_matching_tar_items) != 1: + raise ReaderError( + f"Expected 1 tar item matching {self.tar_href} but got {len(landing_page_matching_tar_items)}" + ) + file_url = landing_page_matching_tar_items[0]["href"] + + # Download the matching tar file and fully load the response bytes content in memory. + # The bytes content are then wrapped by a BytesIO to be file-like object (as required by `tarfile.open`). + # Using directly `file_resp.raw` is not possible since `tarfile.open` requires the file-like object to be seekable. + file_resp = requests.get(file_url) + file_resp.raise_for_status() + yield io.BytesIO(file_resp.content) + + +VOCABULARIES_DATASTREAM_READERS = { + "openaire-http": OpenAIREHTTPReader, +} + +VOCABULARIES_DATASTREAM_TRANSFORMERS = {} + +VOCABULARIES_DATASTREAM_WRITERS = {} diff --git a/invenio_vocabularies/datastreams/writers.py b/invenio_vocabularies/datastreams/writers.py index 84084599..090fc7d8 100644 --- a/invenio_vocabularies/datastreams/writers.py +++ b/invenio_vocabularies/datastreams/writers.py @@ -13,10 +13,11 @@ import yaml from invenio_access.permissions import system_identity -from invenio_pidstore.errors import PIDAlreadyExists +from invenio_pidstore.errors import PIDAlreadyExists, PIDDoesNotExistError from invenio_records.systemfields.relations.errors import InvalidRelationValue from invenio_records_resources.proxies import current_service_registry from marshmallow import ValidationError +from sqlalchemy.exc import NoResultFound from .datastreams import StreamEntry from .errors import WriterError @@ -55,12 +56,15 @@ def write_many(self, stream_entries, *args, **kwargs): class ServiceWriter(BaseWriter): """Writes the entries to an RDM instance using a Service object.""" - def __init__(self, service_or_name, *args, identity=None, update=False, **kwargs): + def __init__( + self, service_or_name, *args, identity=None, insert=True, update=False, **kwargs + ): """Constructor. :param service_or_name: a service instance or a key of the service registry. :param identity: access identity. + :param insert: if True it will insert records which do not exist. :param update: if True it will update records if they exist. """ if isinstance(service_or_name, str): @@ -68,6 +72,7 @@ def __init__(self, service_or_name, *args, identity=None, update=False, **kwargs self._service = service_or_name self._identity = identity or system_identity + self._insert = insert self._update = update super().__init__(*args, **kwargs) @@ -79,20 +84,47 @@ def _entry_id(self, entry): def _resolve(self, id_): return self._service.read(self._identity, id_) + def _do_update(self, entry): + vocab_id = self._entry_id(entry) + current = self._resolve(vocab_id) + + # Merge the `current` dictionary with new data in the `entry` dictionary + # by appending to lists at the top level instead of overwriting the list. + updated = current.to_dict() + for key, value in entry.items(): + if ( + key in updated + and isinstance(updated[key], list) + and isinstance(value, list) + ): + for value_item in value: + # TODO: If an identifier was wrong and is then corrected, this will cause duplicated entries. + if value_item not in updated[key]: + updated[key].append(value_item) + else: + updated[key] = value + + return StreamEntry(self._service.update(self._identity, vocab_id, updated)) + def write(self, stream_entry, *args, **kwargs): """Writes the input entry using a given service.""" entry = stream_entry.entry try: - try: - return StreamEntry(self._service.create(self._identity, entry)) - except PIDAlreadyExists: - if not self._update: - raise WriterError([f"Vocabulary entry already exists: {entry}"]) - vocab_id = self._entry_id(entry) - current = self._resolve(vocab_id) - updated = dict(current.to_dict(), **entry) - return StreamEntry( - self._service.update(self._identity, vocab_id, updated) + if self._insert: + try: + return StreamEntry(self._service.create(self._identity, entry)) + except PIDAlreadyExists: + if not self._update: + raise WriterError([f"Vocabulary entry already exists: {entry}"]) + return self._do_update(entry) + elif self._update: + try: + return self._do_update(entry) + except (NoResultFound, PIDDoesNotExistError): + raise WriterError([f"Vocabulary entry does not exist: {entry}"]) + else: + raise WriterError( + ["Writer wrongly configured to not insert and to not update"] ) except ValidationError as err: diff --git a/invenio_vocabularies/factories.py b/invenio_vocabularies/factories.py index 11135b1a..7d64d974 100644 --- a/invenio_vocabularies/factories.py +++ b/invenio_vocabularies/factories.py @@ -16,6 +16,9 @@ from .contrib.affiliations.datastreams import ( DATASTREAM_CONFIG as affiliations_ds_config, ) +from .contrib.affiliations.datastreams import ( + DATASTREAM_CONFIG_OPENAIRE as affiliations_openaire_ds_config, +) from .contrib.awards.datastreams import DATASTREAM_CONFIG as awards_ds_config from .contrib.funders.datastreams import DATASTREAM_CONFIG as funders_ds_config from .contrib.names.datastreams import DATASTREAM_CONFIG as names_ds_config @@ -95,6 +98,17 @@ def get_service(self): raise NotImplementedError("Service not implemented for Affiliations") +class AffiliationsOpenAIREVocabularyConfig(VocabularyConfig): + """OpenAIRE Affiliations Vocabulary Config.""" + + config = affiliations_openaire_ds_config + vocabulary_name = "affiliations:openaire" + + def get_service(self): + """Get the service for the vocabulary.""" + raise NotImplementedError("Service not implemented for OpenAIRE Affiliations") + + def get_vocabulary_config(vocabulary): """Factory function to get the appropriate Vocabulary Config.""" vocab_config = { @@ -102,6 +116,7 @@ def get_vocabulary_config(vocabulary): "funders": FundersVocabularyConfig, "awards": AwardsVocabularyConfig, "affiliations": AffiliationsVocabularyConfig, + "affiliations:openaire": AffiliationsOpenAIREVocabularyConfig, "subjects": SubjectsVocabularyConfig, } return vocab_config.get(vocabulary, VocabularyConfig)() diff --git a/tests/contrib/affiliations/conftest.py b/tests/contrib/affiliations/conftest.py index c428e297..0c0d6b2b 100644 --- a/tests/contrib/affiliations/conftest.py +++ b/tests/contrib/affiliations/conftest.py @@ -33,6 +33,16 @@ def affiliation_full_data(): } +@pytest.fixture(scope="function") +def openaire_affiliation_full_data(): + """Full OpenAIRE affiliation data.""" + return { + "openaire_id": "openorgs____::47efb6602225236c0b207761a8b3a21c", + "id": "01ggx4157", + "identifiers": [{"identifier": "999988133", "scheme": "pic"}], + } + + @pytest.fixture(scope="module") def service(): """Affiliations service object.""" diff --git a/tests/contrib/affiliations/test_affiliations_datastreams.py b/tests/contrib/affiliations/test_affiliations_datastreams.py index 243b540d..22c49e16 100644 --- a/tests/contrib/affiliations/test_affiliations_datastreams.py +++ b/tests/contrib/affiliations/test_affiliations_datastreams.py @@ -18,6 +18,8 @@ from invenio_vocabularies.contrib.affiliations.config import affiliation_schemes from invenio_vocabularies.contrib.affiliations.datastreams import ( AffiliationsServiceWriter, + OpenAIREAffiliationsServiceWriter, + OpenAIREOrganizationTransformer, ) from invenio_vocabularies.contrib.common.ror.datastreams import RORTransformer from invenio_vocabularies.datastreams import StreamEntry @@ -118,3 +120,134 @@ def test_affiliations_service_writer_update_non_existing( # not-ideal cleanup affiliation_rec._record.delete(force=True) + + +@pytest.fixture() +def dict_openaire_organization_entry(): + """An example entry from OpenAIRE organization Data Dump.""" + return StreamEntry( + { + "alternativeNames": [ + "European Organization for Nuclear Research", + "Organisation européenne pour la recherche nucléaire", + "CERN", + ], + "country": {"code": "CH", "label": "Switzerland"}, + "id": "openorgs____::47efb6602225236c0b207761a8b3a21c", + "legalName": "European Organization for Nuclear Research", + "legalShortName": "CERN", + "pid": [ + {"scheme": "mag_id", "value": "67311998"}, + {"scheme": "ISNI", "value": "000000012156142X"}, + {"scheme": "Wikidata", "value": "Q42944"}, + {"scheme": "PIC", "value": "999988133"}, + {"scheme": "ROR", "value": "https://ror.org/01ggx4157"}, + {"scheme": "OrgReg", "value": "INT1011"}, + {"scheme": "ISNI", "value": "000000012156142X"}, + {"scheme": "FundRef", "value": "100012470"}, + {"scheme": "GRID", "value": "grid.9132.9"}, + {"scheme": "OrgRef", "value": "37351"}, + ], + "websiteUrl": "http://home.web.cern.ch/", + } + ) + + +@pytest.fixture(scope="module") +def expected_from_openaire_json(): + return { + "openaire_id": "openorgs____::47efb6602225236c0b207761a8b3a21c", + "id": "01ggx4157", + "identifiers": [{"identifier": "999988133", "scheme": "pic"}], + } + + +def test_openaire_organization_transformer( + app, dict_openaire_organization_entry, expected_from_openaire_json +): + transformer = OpenAIREOrganizationTransformer() + assert ( + expected_from_openaire_json + == transformer.apply(dict_openaire_organization_entry).entry + ) + + +def test_openaire_affiliations_service_writer( + app, search_clear, affiliation_full_data, openaire_affiliation_full_data, service +): + # create vocabulary with original service writer + orig_writer = AffiliationsServiceWriter() + orig_affiliation_rec = orig_writer.write(StreamEntry(affiliation_full_data)) + orig_affiliation_dict = orig_affiliation_rec.entry.to_dict() + Affiliation.index.refresh() # refresh index to make changes live + + # update vocabulary and check changes vocabulary with OpenAIRE service writer + writer = OpenAIREAffiliationsServiceWriter(update=True) + _ = writer.write(StreamEntry(openaire_affiliation_full_data)) + Affiliation.index.refresh() # refresh index to make changes live + affiliation_rec = service.read(system_identity, orig_affiliation_rec.entry.id) + affiliation_dict = affiliation_rec.to_dict() + + assert _.entry.id == orig_affiliation_rec.entry.id + + # updating fields changing from one update to the other + orig_affiliation_dict["revision_id"] = affiliation_dict["revision_id"] + orig_affiliation_dict["updated"] = affiliation_dict["updated"] + # Adding the extra identifier coming from OpenAIRE + orig_affiliation_dict["identifiers"].extend( + openaire_affiliation_full_data["identifiers"] + ) + + assert dict(orig_affiliation_dict) == affiliation_dict + + # not-ideal cleanup + affiliation_rec._record.delete(force=True) + + +def test_openaire_affiliations_service_writer_non_openorgs( + app, openaire_affiliation_full_data +): + writer = OpenAIREAffiliationsServiceWriter() + + updated_openaire_affiliation = deepcopy(openaire_affiliation_full_data) + updated_openaire_affiliation["openaire_id"] = ( + "pending_org_::627931d047132a4e20dbc4a882eb9a35" + ) + + with pytest.raises(WriterError) as err: + writer.write(StreamEntry(updated_openaire_affiliation)) + + expected_error = [ + f"Not valid OpenAIRE OpenOrgs id for: {updated_openaire_affiliation}" + ] + assert expected_error in err.value.args + + +def test_openaire_affiliations_service_writer_no_id( + app, openaire_affiliation_full_data +): + writer = OpenAIREAffiliationsServiceWriter() + + updated_openaire_affiliation = deepcopy(openaire_affiliation_full_data) + del updated_openaire_affiliation["id"] + + with pytest.raises(WriterError) as err: + writer.write(StreamEntry(updated_openaire_affiliation)) + + expected_error = [f"No id for: {updated_openaire_affiliation}"] + assert expected_error in err.value.args + + +def test_openaire_affiliations_service_writer_no_alternative_identifiers( + app, openaire_affiliation_full_data +): + writer = OpenAIREAffiliationsServiceWriter() + + updated_openaire_affiliation = deepcopy(openaire_affiliation_full_data) + del updated_openaire_affiliation["identifiers"] + + with pytest.raises(WriterError) as err: + writer.write(StreamEntry(updated_openaire_affiliation)) + + expected_error = [f"No alternative identifiers for: {updated_openaire_affiliation}"] + assert expected_error in err.value.args diff --git a/tests/contrib/awards/test_awards_datastreams.py b/tests/contrib/awards/test_awards_datastreams.py index b6555de9..0a43188c 100644 --- a/tests/contrib/awards/test_awards_datastreams.py +++ b/tests/contrib/awards/test_awards_datastreams.py @@ -8,9 +8,7 @@ """Awards datastreams tests.""" -import io from copy import deepcopy -from unittest.mock import patch import pytest from invenio_access.permissions import system_identity @@ -18,11 +16,10 @@ from invenio_vocabularies.contrib.awards.api import Award from invenio_vocabularies.contrib.awards.datastreams import ( AwardsServiceWriter, - OpenAIREProjectHTTPReader, OpenAIREProjectTransformer, ) from invenio_vocabularies.datastreams import StreamEntry -from invenio_vocabularies.datastreams.errors import ReaderError, WriterError +from invenio_vocabularies.datastreams.errors import WriterError @pytest.fixture(scope="function") @@ -115,126 +112,6 @@ def expected_from_award_json_ec(): } -API_JSON_RESPONSE_CONTENT = { - "linkset": [ - { - "anchor": "https://example.com/records/10488385", - "item": [ - { - "href": "https://example.com/records/10488385/files/organization.tar", - "type": "application/x-tar", - }, - { - "href": "https://example.com/records/10488385/files/project.tar", - "type": "application/x-tar", - }, - ], - }, - { - "anchor": "https://example.com/api/records/10488385", - "describes": [ - {"href": "https://example.com/records/10488385", "type": "text/html"} - ], - "type": "application/dcat+xml", - }, - ] -} - -API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR = { - "linkset": [ - { - "anchor": "https://example.com/records/10488385", - "item": [ - { - "href": "https://example.com/records/10488385/files/organization.tar", - "type": "application/x-tar", - }, - { - "href": "https://example.com/records/10488385/files/project.tar", - "type": "application/x-tar", - }, - { - "href": "https://example.com/another/project.tar", - "type": "application/x-tar", - }, - ], - }, - { - "anchor": "https://example.com/api/records/10488385", - "describes": [ - {"href": "https://example.com/records/10488385", "type": "text/html"} - ], - "type": "application/dcat+xml", - }, - ] -} - -DOWNLOAD_FILE_BYTES_CONTENT = b"The content of the file" - - -class MockResponse: - content = DOWNLOAD_FILE_BYTES_CONTENT - - def __init__(self, api_json_response_content): - self.api_json_response_content = api_json_response_content - - def json(self, **kwargs): - return self.api_json_response_content - - def raise_for_status(self): - pass - - -@pytest.fixture(scope="function") -def download_file_bytes_content(): - return DOWNLOAD_FILE_BYTES_CONTENT - - -@patch( - "requests.get", - side_effect=lambda url, headers=None: MockResponse(API_JSON_RESPONSE_CONTENT), -) -def test_openaire_project_http_reader(_, download_file_bytes_content): - reader = OpenAIREProjectHTTPReader(origin="full") - results = [] - for entry in reader.read(): - results.append(entry) - - assert len(results) == 1 - assert isinstance(results[0], io.BytesIO) - assert results[0].read() == download_file_bytes_content - - -@patch( - "requests.get", - side_effect=lambda url, headers=None: MockResponse( - API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR - ), -) -def test_openaire_project_http_reader_wrong_number_tar_items_error(_): - reader = OpenAIREProjectHTTPReader(origin="full") - with pytest.raises(ReaderError): - next(reader.read()) - - -def test_openaire_project_http_reader_unsupported_origin_option(): - reader = OpenAIREProjectHTTPReader(origin="unsupported_origin_option") - with pytest.raises(ReaderError): - next(reader.read()) - - -def test_openaire_project_http_reader_item_not_implemented(): - reader = OpenAIREProjectHTTPReader() - with pytest.raises(NotImplementedError): - next(reader.read("A fake item")) - - -def test_openaire_project_http_reader_iter_not_implemented(): - reader = OpenAIREProjectHTTPReader() - with pytest.raises(NotImplementedError): - reader._iter("A fake file pointer") - - def test_awards_transformer(app, dict_award_entry, expected_from_award_json): transformer = OpenAIREProjectTransformer() assert expected_from_award_json == transformer.apply(dict_award_entry).entry diff --git a/tests/contrib/common/openaire/test_openaire_datastreams.py b/tests/contrib/common/openaire/test_openaire_datastreams.py new file mode 100644 index 00000000..372b1dc6 --- /dev/null +++ b/tests/contrib/common/openaire/test_openaire_datastreams.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# Invenio-Vocabularies is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""OpenAIRE-related Datastreams Readers/Writers/Transformers tests.""" + +import io +from unittest.mock import patch + +import pytest + +from invenio_vocabularies.contrib.common.openaire.datastreams import OpenAIREHTTPReader +from invenio_vocabularies.datastreams.errors import ReaderError + +API_JSON_RESPONSE_CONTENT = { + "linkset": [ + { + "anchor": "https://example.com/records/10488385", + "item": [ + { + "href": "https://example.com/records/10488385/files/organization.tar", + "type": "application/x-tar", + }, + { + "href": "https://example.com/records/10488385/files/project.tar", + "type": "application/x-tar", + }, + ], + }, + { + "anchor": "https://example.com/api/records/10488385", + "describes": [ + {"href": "https://example.com/records/10488385", "type": "text/html"} + ], + "type": "application/dcat+xml", + }, + ] +} + +API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR = { + "linkset": [ + { + "anchor": "https://example.com/records/10488385", + "item": [ + { + "href": "https://example.com/records/10488385/files/organization.tar", + "type": "application/x-tar", + }, + { + "href": "https://example.com/records/10488385/files/project.tar", + "type": "application/x-tar", + }, + { + "href": "https://example.com/another/project.tar", + "type": "application/x-tar", + }, + ], + }, + { + "anchor": "https://example.com/api/records/10488385", + "describes": [ + {"href": "https://example.com/records/10488385", "type": "text/html"} + ], + "type": "application/dcat+xml", + }, + ] +} + +DOWNLOAD_FILE_BYTES_CONTENT = b"The content of the file" + + +class MockResponse: + content = DOWNLOAD_FILE_BYTES_CONTENT + + def __init__(self, api_json_response_content): + self.api_json_response_content = api_json_response_content + + def json(self, **kwargs): + return self.api_json_response_content + + def raise_for_status(self): + pass + + +@pytest.fixture(scope="function") +def download_file_bytes_content(): + return DOWNLOAD_FILE_BYTES_CONTENT + + +@patch( + "requests.get", + side_effect=lambda url, headers=None: MockResponse(API_JSON_RESPONSE_CONTENT), +) +def test_openaire_http_reader(_, download_file_bytes_content): + reader = OpenAIREHTTPReader(origin="full", tar_href="/project.tar") + results = [] + for entry in reader.read(): + results.append(entry) + + assert len(results) == 1 + assert isinstance(results[0], io.BytesIO) + assert results[0].read() == download_file_bytes_content + + +@patch( + "requests.get", + side_effect=lambda url, headers=None: MockResponse( + API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR + ), +) +def test_openaire_http_reader_wrong_number_tar_items_error(_): + reader = OpenAIREHTTPReader(origin="full", tar_href="/project.tar") + with pytest.raises(ReaderError): + next(reader.read()) + + +def test_openaire_http_reader_unsupported_origin_option(): + reader = OpenAIREHTTPReader(origin="unsupported_origin_option") + with pytest.raises(ReaderError): + next(reader.read()) + + +def test_openaire_http_reader_item_not_implemented(): + reader = OpenAIREHTTPReader() + with pytest.raises(NotImplementedError): + next(reader.read("A fake item")) + + +def test_openaire_http_reader_iter_not_implemented(): + reader = OpenAIREHTTPReader() + with pytest.raises(NotImplementedError): + reader._iter("A fake file pointer") diff --git a/tests/datastreams/test_writers.py b/tests/datastreams/test_writers.py index 04a1b1eb..8456b22f 100644 --- a/tests/datastreams/test_writers.py +++ b/tests/datastreams/test_writers.py @@ -76,6 +76,28 @@ def test_service_writer_update_non_existing(lang_type, lang_data, service, ident assert dict(record, **updated_lang) == record +def test_writer_wrong_config_no_insert_no_update( + lang_type, lang_data, service, identity +): + writer = ServiceWriter(service, identity=identity, insert=False, update=False) + + with pytest.raises(WriterError) as err: + writer.write(stream_entry=StreamEntry(lang_data)) + + expected_error = ["Writer wrongly configured to not insert and to not update"] + assert expected_error in err.value.args + + +def test_writer_no_insert(lang_type, lang_data, service, identity): + writer = ServiceWriter(service, identity=identity, insert=False, update=True) + + with pytest.raises(WriterError) as err: + writer.write(stream_entry=StreamEntry(lang_data)) + + expected_error = [f"Vocabulary entry does not exist: {lang_data}"] + assert expected_error in err.value.args + + ## # YAML Writer ##