From 755ebd1c365158ae967101469fe70429dbee6eeb Mon Sep 17 00:00:00 2001 From: Pablo Tamarit Date: Tue, 27 Aug 2024 15:57:22 +0200 Subject: [PATCH 1/5] cli: make the update command work for writers without args --- invenio_vocabularies/cli.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/invenio_vocabularies/cli.py b/invenio_vocabularies/cli.py index d50a9d13..aded3193 100644 --- a/invenio_vocabularies/cli.py +++ b/invenio_vocabularies/cli.py @@ -101,9 +101,14 @@ def update(vocabulary, filepath=None, origin=None): for w_conf in config["writers"]: if w_conf["type"] == "async": - w_conf["args"]["writer"]["args"]["update"] = True + w_conf_update = w_conf["args"]["writer"] else: - w_conf["args"]["update"] = True + w_conf_update = w_conf + + if "args" in w_conf_update: + w_conf_update["args"]["update"] = True + else: + w_conf_update["args"] = {"update": True} success, errored, filtered = _process_vocab(config) From bc517a25683a768a1cc5ed2cf469c36c944dc981 Mon Sep 17 00:00:00 2001 From: Pablo Tamarit Date: Tue, 27 Aug 2024 16:07:14 +0200 Subject: [PATCH 2/5] datastreams: writers: add option to not insert --- invenio_vocabularies/datastreams/writers.py | 40 ++++++++++++++------- tests/datastreams/test_writers.py | 22 ++++++++++++ 2 files changed, 50 insertions(+), 12 deletions(-) diff --git a/invenio_vocabularies/datastreams/writers.py b/invenio_vocabularies/datastreams/writers.py index 84084599..f898c1ad 100644 --- a/invenio_vocabularies/datastreams/writers.py +++ b/invenio_vocabularies/datastreams/writers.py @@ -13,10 +13,11 @@ import yaml from invenio_access.permissions import system_identity -from invenio_pidstore.errors import PIDAlreadyExists +from invenio_pidstore.errors import PIDAlreadyExists, PIDDoesNotExistError from invenio_records.systemfields.relations.errors import InvalidRelationValue from invenio_records_resources.proxies import current_service_registry from marshmallow import ValidationError +from sqlalchemy.exc import 
NoResultFound from .datastreams import StreamEntry from .errors import WriterError @@ -55,12 +56,15 @@ def write_many(self, stream_entries, *args, **kwargs): class ServiceWriter(BaseWriter): """Writes the entries to an RDM instance using a Service object.""" - def __init__(self, service_or_name, *args, identity=None, update=False, **kwargs): + def __init__( + self, service_or_name, *args, identity=None, insert=True, update=False, **kwargs + ): """Constructor. :param service_or_name: a service instance or a key of the service registry. :param identity: access identity. + :param insert: if True it will insert records which do not exist. :param update: if True it will update records if they exist. """ if isinstance(service_or_name, str): @@ -68,6 +72,7 @@ def __init__(self, service_or_name, *args, identity=None, update=False, **kwargs self._service = service_or_name self._identity = identity or system_identity + self._insert = insert self._update = update super().__init__(*args, **kwargs) @@ -79,20 +84,31 @@ def _entry_id(self, entry): def _resolve(self, id_): return self._service.read(self._identity, id_) + def _do_update(self, entry): + vocab_id = self._entry_id(entry) + current = self._resolve(vocab_id) + updated = dict(current.to_dict(), **entry) + return StreamEntry(self._service.update(self._identity, vocab_id, updated)) + def write(self, stream_entry, *args, **kwargs): """Writes the input entry using a given service.""" entry = stream_entry.entry try: - try: - return StreamEntry(self._service.create(self._identity, entry)) - except PIDAlreadyExists: - if not self._update: - raise WriterError([f"Vocabulary entry already exists: {entry}"]) - vocab_id = self._entry_id(entry) - current = self._resolve(vocab_id) - updated = dict(current.to_dict(), **entry) - return StreamEntry( - self._service.update(self._identity, vocab_id, updated) + if self._insert: + try: + return StreamEntry(self._service.create(self._identity, entry)) + except PIDAlreadyExists: + if not 
self._update: + raise WriterError([f"Vocabulary entry already exists: {entry}"]) + return self._do_update(entry) + elif self._update: + try: + return self._do_update(entry) + except (NoResultFound, PIDDoesNotExistError): + raise WriterError([f"Vocabulary entry does not exist: {entry}"]) + else: + raise WriterError( + ["Writer wrongly configured to not insert and to not update"] ) except ValidationError as err: diff --git a/tests/datastreams/test_writers.py b/tests/datastreams/test_writers.py index 04a1b1eb..8456b22f 100644 --- a/tests/datastreams/test_writers.py +++ b/tests/datastreams/test_writers.py @@ -76,6 +76,28 @@ def test_service_writer_update_non_existing(lang_type, lang_data, service, ident assert dict(record, **updated_lang) == record +def test_writer_wrong_config_no_insert_no_update( + lang_type, lang_data, service, identity +): + writer = ServiceWriter(service, identity=identity, insert=False, update=False) + + with pytest.raises(WriterError) as err: + writer.write(stream_entry=StreamEntry(lang_data)) + + expected_error = ["Writer wrongly configured to not insert and to not update"] + assert expected_error in err.value.args + + +def test_writer_no_insert(lang_type, lang_data, service, identity): + writer = ServiceWriter(service, identity=identity, insert=False, update=True) + + with pytest.raises(WriterError) as err: + writer.write(stream_entry=StreamEntry(lang_data)) + + expected_error = [f"Vocabulary entry does not exist: {lang_data}"] + assert expected_error in err.value.args + + ## # YAML Writer ## From 335ca5a7ce43b4af0004d9767e228efae233647e Mon Sep 17 00:00:00 2001 From: Pablo Tamarit Date: Tue, 27 Aug 2024 16:13:48 +0200 Subject: [PATCH 3/5] datastreams: move OpenAIREProjectHTTPReader to generic OpenAIREHTTPReader --- .../contrib/awards/datastreams.py | 64 +-------- .../contrib/common/openaire/__init__.py | 9 ++ .../contrib/common/openaire/datastreams.py | 84 +++++++++++ .../contrib/awards/test_awards_datastreams.py | 125 +--------------- 
.../openaire/test_openaire_datastreams.py | 136 ++++++++++++++++++ 5 files changed, 232 insertions(+), 186 deletions(-) create mode 100644 invenio_vocabularies/contrib/common/openaire/__init__.py create mode 100644 invenio_vocabularies/contrib/common/openaire/datastreams.py create mode 100644 tests/contrib/common/openaire/test_openaire_datastreams.py diff --git a/invenio_vocabularies/contrib/awards/datastreams.py b/invenio_vocabularies/contrib/awards/datastreams.py index f4109bf0..22547f39 100644 --- a/invenio_vocabularies/contrib/awards/datastreams.py +++ b/invenio_vocabularies/contrib/awards/datastreams.py @@ -8,73 +8,15 @@ """Awards datastreams, transformers, writers and readers.""" -import io - -import requests from invenio_access.permissions import system_identity from invenio_i18n import lazy_gettext as _ -from ...datastreams.errors import ReaderError, TransformerError -from ...datastreams.readers import BaseReader +from ...datastreams.errors import TransformerError from ...datastreams.transformers import BaseTransformer from ...datastreams.writers import ServiceWriter from .config import awards_ec_ror_id, awards_openaire_funders_mapping -class OpenAIREProjectHTTPReader(BaseReader): - """OpenAIRE Project HTTP Reader returning an in-memory binary stream of the latest OpenAIRE Graph Dataset project tar file.""" - - def _iter(self, fp, *args, **kwargs): - raise NotImplementedError( - "OpenAIREProjectHTTPReader downloads one file and therefore does not iterate through items" - ) - - def read(self, item=None, *args, **kwargs): - """Reads the latest OpenAIRE Graph Dataset project tar file from Zenodo and yields an in-memory binary stream of it.""" - if item: - raise NotImplementedError( - "OpenAIREProjectHTTPReader does not support being chained after another reader" - ) - - if self._origin == "full": - # OpenAIRE Graph Dataset - api_url = "https://zenodo.org/api/records/3516917" - elif self._origin == "diff": - # OpenAIRE Graph dataset: new collected projects - 
api_url = "https://zenodo.org/api/records/6419021" - else: - raise ReaderError("The --origin option should be either 'full' or 'diff'") - - # Call the signposting `linkset+json` endpoint for the Concept DOI (i.e. latest version) of the OpenAIRE Graph Dataset. - # See: https://github.com/inveniosoftware/rfcs/blob/master/rfcs/rdm-0071-signposting.md#provide-an-applicationlinksetjson-endpoint - headers = {"Accept": "application/linkset+json"} - api_resp = requests.get(api_url, headers=headers) - api_resp.raise_for_status() - - # Extract the Landing page Link Set Object located as the first (index 0) item. - landing_page_linkset = api_resp.json()["linkset"][0] - - # Extract the URL of the only project tar file linked to the record. - landing_page_project_tar_items = [ - item - for item in landing_page_linkset["item"] - if item["type"] == "application/x-tar" - and item["href"].endswith("/project.tar") - ] - if len(landing_page_project_tar_items) != 1: - raise ReaderError( - f"Expected 1 project tar item but got {len(landing_page_project_tar_items)}" - ) - file_url = landing_page_project_tar_items[0]["href"] - - # Download the project tar file and fully load the response bytes content in memory. - # The bytes content are then wrapped by a BytesIO to be file-like object (as required by `tarfile.open`). - # Using directly `file_resp.raw` is not possible since `tarfile.open` requires the file-like object to be seekable. 
- file_resp = requests.get(file_url) - file_resp.raise_for_status() - yield io.BytesIO(file_resp.content) - - class AwardsServiceWriter(ServiceWriter): """Funders service writer.""" @@ -172,9 +114,7 @@ def apply(self, stream_entry, **kwargs): return stream_entry -VOCABULARIES_DATASTREAM_READERS = { - "openaire-project-http": OpenAIREProjectHTTPReader, -} +VOCABULARIES_DATASTREAM_READERS = {} VOCABULARIES_DATASTREAM_TRANSFORMERS = { "openaire-award": OpenAIREProjectTransformer, diff --git a/invenio_vocabularies/contrib/common/openaire/__init__.py b/invenio_vocabularies/contrib/common/openaire/__init__.py new file mode 100644 index 00000000..a1cf1934 --- /dev/null +++ b/invenio_vocabularies/contrib/common/openaire/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# Invenio-Vocabularies is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""OpenAIRE-related module.""" diff --git a/invenio_vocabularies/contrib/common/openaire/datastreams.py b/invenio_vocabularies/contrib/common/openaire/datastreams.py new file mode 100644 index 00000000..95dba01c --- /dev/null +++ b/invenio_vocabularies/contrib/common/openaire/datastreams.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# Invenio-Vocabularies is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. 
+ +"""OpenAIRE-related Datastreams Readers/Writers/Transformers module.""" + +import io + +import requests + +from invenio_vocabularies.datastreams.errors import ReaderError +from invenio_vocabularies.datastreams.readers import BaseReader + + +class OpenAIREHTTPReader(BaseReader): + """OpenAIRE HTTP Reader returning an in-memory binary stream of the latest OpenAIRE Graph Dataset tar file of a given type.""" + + def __init__(self, origin=None, mode="r", tar_href=None, *args, **kwargs): + """Constructor.""" + self.tar_href = tar_href + super().__init__(origin, mode, *args, **kwargs) + + def _iter(self, fp, *args, **kwargs): + raise NotImplementedError( + "OpenAIREHTTPReader downloads one file and therefore does not iterate through items" + ) + + def read(self, item=None, *args, **kwargs): + """Reads the latest OpenAIRE Graph Dataset tar file of a given type from Zenodo and yields an in-memory binary stream of it.""" + if item: + raise NotImplementedError( + "OpenAIREHTTPReader does not support being chained after another reader" + ) + + if self._origin == "full": + # OpenAIRE Graph Dataset + api_url = "https://zenodo.org/api/records/3516917" + elif self._origin == "diff": + # OpenAIRE Graph dataset: new collected projects + api_url = "https://zenodo.org/api/records/6419021" + else: + raise ReaderError("The --origin option should be either 'full' or 'diff'") + + # Call the signposting `linkset+json` endpoint for the Concept DOI (i.e. latest version) of the OpenAIRE Graph Dataset. + # See: https://github.com/inveniosoftware/rfcs/blob/master/rfcs/rdm-0071-signposting.md#provide-an-applicationlinksetjson-endpoint + headers = {"Accept": "application/linkset+json"} + api_resp = requests.get(api_url, headers=headers) + api_resp.raise_for_status() + + # Extract the Landing page Link Set Object located as the first (index 0) item. + landing_page_linkset = api_resp.json()["linkset"][0] + + # Extract the URL of the only tar file matching `tar_href` linked to the record. 
+ landing_page_matching_tar_items = [ + item + for item in landing_page_linkset["item"] + if item["type"] == "application/x-tar" + and item["href"].endswith(self.tar_href) + ] + if len(landing_page_matching_tar_items) != 1: + raise ReaderError( + f"Expected 1 tar item matching {self.tar_href} but got {len(landing_page_matching_tar_items)}" + ) + file_url = landing_page_matching_tar_items[0]["href"] + + # Download the matching tar file and fully load the response bytes content in memory. + # The bytes content are then wrapped by a BytesIO to be file-like object (as required by `tarfile.open`). + # Using directly `file_resp.raw` is not possible since `tarfile.open` requires the file-like object to be seekable. + file_resp = requests.get(file_url) + file_resp.raise_for_status() + yield io.BytesIO(file_resp.content) + + +VOCABULARIES_DATASTREAM_READERS = { + "openaire-http": OpenAIREHTTPReader, +} + +VOCABULARIES_DATASTREAM_TRANSFORMERS = {} + +VOCABULARIES_DATASTREAM_WRITERS = {} diff --git a/tests/contrib/awards/test_awards_datastreams.py b/tests/contrib/awards/test_awards_datastreams.py index b6555de9..0a43188c 100644 --- a/tests/contrib/awards/test_awards_datastreams.py +++ b/tests/contrib/awards/test_awards_datastreams.py @@ -8,9 +8,7 @@ """Awards datastreams tests.""" -import io from copy import deepcopy -from unittest.mock import patch import pytest from invenio_access.permissions import system_identity @@ -18,11 +16,10 @@ from invenio_vocabularies.contrib.awards.api import Award from invenio_vocabularies.contrib.awards.datastreams import ( AwardsServiceWriter, - OpenAIREProjectHTTPReader, OpenAIREProjectTransformer, ) from invenio_vocabularies.datastreams import StreamEntry -from invenio_vocabularies.datastreams.errors import ReaderError, WriterError +from invenio_vocabularies.datastreams.errors import WriterError @pytest.fixture(scope="function") @@ -115,126 +112,6 @@ def expected_from_award_json_ec(): } -API_JSON_RESPONSE_CONTENT = { - "linkset": [ - { - 
"anchor": "https://example.com/records/10488385", - "item": [ - { - "href": "https://example.com/records/10488385/files/organization.tar", - "type": "application/x-tar", - }, - { - "href": "https://example.com/records/10488385/files/project.tar", - "type": "application/x-tar", - }, - ], - }, - { - "anchor": "https://example.com/api/records/10488385", - "describes": [ - {"href": "https://example.com/records/10488385", "type": "text/html"} - ], - "type": "application/dcat+xml", - }, - ] -} - -API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR = { - "linkset": [ - { - "anchor": "https://example.com/records/10488385", - "item": [ - { - "href": "https://example.com/records/10488385/files/organization.tar", - "type": "application/x-tar", - }, - { - "href": "https://example.com/records/10488385/files/project.tar", - "type": "application/x-tar", - }, - { - "href": "https://example.com/another/project.tar", - "type": "application/x-tar", - }, - ], - }, - { - "anchor": "https://example.com/api/records/10488385", - "describes": [ - {"href": "https://example.com/records/10488385", "type": "text/html"} - ], - "type": "application/dcat+xml", - }, - ] -} - -DOWNLOAD_FILE_BYTES_CONTENT = b"The content of the file" - - -class MockResponse: - content = DOWNLOAD_FILE_BYTES_CONTENT - - def __init__(self, api_json_response_content): - self.api_json_response_content = api_json_response_content - - def json(self, **kwargs): - return self.api_json_response_content - - def raise_for_status(self): - pass - - -@pytest.fixture(scope="function") -def download_file_bytes_content(): - return DOWNLOAD_FILE_BYTES_CONTENT - - -@patch( - "requests.get", - side_effect=lambda url, headers=None: MockResponse(API_JSON_RESPONSE_CONTENT), -) -def test_openaire_project_http_reader(_, download_file_bytes_content): - reader = OpenAIREProjectHTTPReader(origin="full") - results = [] - for entry in reader.read(): - results.append(entry) - - assert len(results) == 1 - assert isinstance(results[0], 
io.BytesIO) - assert results[0].read() == download_file_bytes_content - - -@patch( - "requests.get", - side_effect=lambda url, headers=None: MockResponse( - API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR - ), -) -def test_openaire_project_http_reader_wrong_number_tar_items_error(_): - reader = OpenAIREProjectHTTPReader(origin="full") - with pytest.raises(ReaderError): - next(reader.read()) - - -def test_openaire_project_http_reader_unsupported_origin_option(): - reader = OpenAIREProjectHTTPReader(origin="unsupported_origin_option") - with pytest.raises(ReaderError): - next(reader.read()) - - -def test_openaire_project_http_reader_item_not_implemented(): - reader = OpenAIREProjectHTTPReader() - with pytest.raises(NotImplementedError): - next(reader.read("A fake item")) - - -def test_openaire_project_http_reader_iter_not_implemented(): - reader = OpenAIREProjectHTTPReader() - with pytest.raises(NotImplementedError): - reader._iter("A fake file pointer") - - def test_awards_transformer(app, dict_award_entry, expected_from_award_json): transformer = OpenAIREProjectTransformer() assert expected_from_award_json == transformer.apply(dict_award_entry).entry diff --git a/tests/contrib/common/openaire/test_openaire_datastreams.py b/tests/contrib/common/openaire/test_openaire_datastreams.py new file mode 100644 index 00000000..372b1dc6 --- /dev/null +++ b/tests/contrib/common/openaire/test_openaire_datastreams.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# Invenio-Vocabularies is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. 
+ +"""OpenAIRE-related Datastreams Readers/Writers/Transformers tests.""" + +import io +from unittest.mock import patch + +import pytest + +from invenio_vocabularies.contrib.common.openaire.datastreams import OpenAIREHTTPReader +from invenio_vocabularies.datastreams.errors import ReaderError + +API_JSON_RESPONSE_CONTENT = { + "linkset": [ + { + "anchor": "https://example.com/records/10488385", + "item": [ + { + "href": "https://example.com/records/10488385/files/organization.tar", + "type": "application/x-tar", + }, + { + "href": "https://example.com/records/10488385/files/project.tar", + "type": "application/x-tar", + }, + ], + }, + { + "anchor": "https://example.com/api/records/10488385", + "describes": [ + {"href": "https://example.com/records/10488385", "type": "text/html"} + ], + "type": "application/dcat+xml", + }, + ] +} + +API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR = { + "linkset": [ + { + "anchor": "https://example.com/records/10488385", + "item": [ + { + "href": "https://example.com/records/10488385/files/organization.tar", + "type": "application/x-tar", + }, + { + "href": "https://example.com/records/10488385/files/project.tar", + "type": "application/x-tar", + }, + { + "href": "https://example.com/another/project.tar", + "type": "application/x-tar", + }, + ], + }, + { + "anchor": "https://example.com/api/records/10488385", + "describes": [ + {"href": "https://example.com/records/10488385", "type": "text/html"} + ], + "type": "application/dcat+xml", + }, + ] +} + +DOWNLOAD_FILE_BYTES_CONTENT = b"The content of the file" + + +class MockResponse: + content = DOWNLOAD_FILE_BYTES_CONTENT + + def __init__(self, api_json_response_content): + self.api_json_response_content = api_json_response_content + + def json(self, **kwargs): + return self.api_json_response_content + + def raise_for_status(self): + pass + + +@pytest.fixture(scope="function") +def download_file_bytes_content(): + return DOWNLOAD_FILE_BYTES_CONTENT + + +@patch( + 
"requests.get", + side_effect=lambda url, headers=None: MockResponse(API_JSON_RESPONSE_CONTENT), +) +def test_openaire_http_reader(_, download_file_bytes_content): + reader = OpenAIREHTTPReader(origin="full", tar_href="/project.tar") + results = [] + for entry in reader.read(): + results.append(entry) + + assert len(results) == 1 + assert isinstance(results[0], io.BytesIO) + assert results[0].read() == download_file_bytes_content + + +@patch( + "requests.get", + side_effect=lambda url, headers=None: MockResponse( + API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR + ), +) +def test_openaire_http_reader_wrong_number_tar_items_error(_): + reader = OpenAIREHTTPReader(origin="full", tar_href="/project.tar") + with pytest.raises(ReaderError): + next(reader.read()) + + +def test_openaire_http_reader_unsupported_origin_option(): + reader = OpenAIREHTTPReader(origin="unsupported_origin_option") + with pytest.raises(ReaderError): + next(reader.read()) + + +def test_openaire_http_reader_item_not_implemented(): + reader = OpenAIREHTTPReader() + with pytest.raises(NotImplementedError): + next(reader.read("A fake item")) + + +def test_openaire_http_reader_iter_not_implemented(): + reader = OpenAIREHTTPReader() + with pytest.raises(NotImplementedError): + reader._iter("A fake file pointer") From 7055ed7aa065743e9d32f4a7fafef5c33a69a396 Mon Sep 17 00:00:00 2001 From: Pablo Tamarit Date: Tue, 27 Aug 2024 16:16:42 +0200 Subject: [PATCH 4/5] datastreams: affiliations: OpenAIRE transformer and writer adding PIC identifier --- invenio_vocabularies/config.py | 9 ++ .../contrib/affiliations/datastreams.py | 95 ++++++++++++- invenio_vocabularies/datastreams/writers.py | 18 ++- invenio_vocabularies/factories.py | 15 ++ tests/contrib/affiliations/conftest.py | 10 ++ .../test_affiliations_datastreams.py | 133 ++++++++++++++++++ 6 files changed, 278 insertions(+), 2 deletions(-) diff --git a/invenio_vocabularies/config.py b/invenio_vocabularies/config.py index ef121034..7e492f3c 
100644 --- a/invenio_vocabularies/config.py +++ b/invenio_vocabularies/config.py @@ -43,8 +43,17 @@ } """"Generic identifier schemes, usable by other vocabularies.""" + +def is_pic(val): + """Test if argument is a Participant Identification Code (PIC).""" + if len(val) != 9: + return False + return val.isdigit() + + VOCABULARIES_AFFILIATION_SCHEMES = { **VOCABULARIES_IDENTIFIER_SCHEMES, + "pic": {"label": _("PIC"), "validator": is_pic}, } """Affiliations allowed identifier schemes.""" diff --git a/invenio_vocabularies/contrib/affiliations/datastreams.py b/invenio_vocabularies/contrib/affiliations/datastreams.py index 381bce2b..cffe6855 100644 --- a/invenio_vocabularies/contrib/affiliations/datastreams.py +++ b/invenio_vocabularies/contrib/affiliations/datastreams.py @@ -10,8 +10,9 @@ """Affiliations datastreams, transformers, writers and readers.""" from flask import current_app -from invenio_i18n import lazy_gettext as _ +from ...datastreams.errors import WriterError +from ...datastreams.transformers import BaseTransformer from ...datastreams.writers import ServiceWriter from ..common.ror.datastreams import RORTransformer @@ -46,16 +47,77 @@ def __init__( ) +class OpenAIREOrganizationTransformer(BaseTransformer): + """OpenAIRE Organization Transformer.""" + + def apply(self, stream_entry, **kwargs): + """Applies the transformation to the stream entry.""" + record = stream_entry.entry + + organization = {"openaire_id": record["id"]} + + for pid in record["pid"]: + if pid["scheme"] == "ROR": + organization["id"] = pid["value"].removeprefix("https://ror.org/") + elif pid["scheme"] == "PIC": + organization["identifiers"] = [ + { + "scheme": "pic", + "identifier": pid["value"], + } + ] + + stream_entry.entry = organization + return stream_entry + + +class OpenAIREAffiliationsServiceWriter(ServiceWriter): + """OpenAIRE Affiliations service writer.""" + + def __init__(self, *args, **kwargs): + """Constructor.""" + service_or_name = kwargs.pop("service_or_name", 
"affiliations") + # Here we only update and we do not insert, since OpenAIRE data is used to augment existing affiliations + # (with PIC identifiers) and is not used to create new affiliations. + super().__init__(service_or_name=service_or_name, insert=False, *args, **kwargs) + + def _entry_id(self, entry): + """Get the id from an entry.""" + return entry["id"] + + def write(self, stream_entry, *args, **kwargs): + """Writes the input entry using a given service.""" + entry = stream_entry.entry + + if not entry["openaire_id"].startswith("openorgs____::"): + raise WriterError([f"Not valid OpenAIRE OpenOrgs id for: {entry}"]) + del entry["openaire_id"] + + if "id" not in entry: + raise WriterError([f"No id for: {entry}"]) + + if "identifiers" not in entry: + raise WriterError([f"No alternative identifiers for: {entry}"]) + + return super().write(stream_entry, *args, **kwargs) + + def write_many(self, stream_entries, *args, **kwargs): + """Writes the input entries using a given service.""" + return super().write_many(stream_entries, *args, **kwargs) + + VOCABULARIES_DATASTREAM_READERS = {} """Affiliations datastream readers.""" VOCABULARIES_DATASTREAM_WRITERS = { "affiliations-service": AffiliationsServiceWriter, + "openaire-affiliations-service": OpenAIREAffiliationsServiceWriter, } """Affiliations datastream writers.""" VOCABULARIES_DATASTREAM_TRANSFORMERS = { "ror-affiliations": AffiliationsRORTransformer, + "openaire-organization": OpenAIREOrganizationTransformer, } """Affiliations datastream transformers.""" @@ -90,3 +152,34 @@ def __init__( An origin is required for the reader. 
""" + +DATASTREAM_CONFIG_OPENAIRE = { + "readers": [ + {"type": "openaire-http", "args": {"tar_href": "/organization.tar"}}, + { + "type": "tar", + "args": { + "regex": "\\.json.gz$", + "mode": "r", + }, + }, + {"type": "gzip"}, + {"type": "jsonl"}, + ], + "transformers": [ + { + "type": "openaire-organization", + }, + ], + "writers": [ + { + "type": "async", + "args": { + "writer": { + "type": "openaire-affiliations-service", + } + }, + } + ], +} +"""Alternative Data Stream configuration for OpenAIRE Affiliations.""" diff --git a/invenio_vocabularies/datastreams/writers.py b/invenio_vocabularies/datastreams/writers.py index f898c1ad..090fc7d8 100644 --- a/invenio_vocabularies/datastreams/writers.py +++ b/invenio_vocabularies/datastreams/writers.py @@ -87,7 +87,23 @@ def _resolve(self, id_): def _do_update(self, entry): vocab_id = self._entry_id(entry) current = self._resolve(vocab_id) - updated = dict(current.to_dict(), **entry) + + # Merge the `current` dictionary with new data in the `entry` dictionary + # by appending to lists at the top level instead of overwriting the list. + updated = current.to_dict() + for key, value in entry.items(): + if ( + key in updated + and isinstance(updated[key], list) + and isinstance(value, list) + ): + for value_item in value: + # TODO: If an identifier was wrong and is then corrected, this will cause duplicated entries. 
+ if value_item not in updated[key]: + updated[key].append(value_item) + else: + updated[key] = value + return StreamEntry(self._service.update(self._identity, vocab_id, updated)) def write(self, stream_entry, *args, **kwargs): diff --git a/invenio_vocabularies/factories.py b/invenio_vocabularies/factories.py index 11135b1a..7d64d974 100644 --- a/invenio_vocabularies/factories.py +++ b/invenio_vocabularies/factories.py @@ -16,6 +16,9 @@ from .contrib.affiliations.datastreams import ( DATASTREAM_CONFIG as affiliations_ds_config, ) +from .contrib.affiliations.datastreams import ( + DATASTREAM_CONFIG_OPENAIRE as affiliations_openaire_ds_config, +) from .contrib.awards.datastreams import DATASTREAM_CONFIG as awards_ds_config from .contrib.funders.datastreams import DATASTREAM_CONFIG as funders_ds_config from .contrib.names.datastreams import DATASTREAM_CONFIG as names_ds_config @@ -95,6 +98,17 @@ def get_service(self): raise NotImplementedError("Service not implemented for Affiliations") +class AffiliationsOpenAIREVocabularyConfig(VocabularyConfig): + """OpenAIRE Affiliations Vocabulary Config.""" + + config = affiliations_openaire_ds_config + vocabulary_name = "affiliations:openaire" + + def get_service(self): + """Get the service for the vocabulary.""" + raise NotImplementedError("Service not implemented for OpenAIRE Affiliations") + + def get_vocabulary_config(vocabulary): """Factory function to get the appropriate Vocabulary Config.""" vocab_config = { @@ -102,6 +116,7 @@ def get_vocabulary_config(vocabulary): "funders": FundersVocabularyConfig, "awards": AwardsVocabularyConfig, "affiliations": AffiliationsVocabularyConfig, + "affiliations:openaire": AffiliationsOpenAIREVocabularyConfig, "subjects": SubjectsVocabularyConfig, } return vocab_config.get(vocabulary, VocabularyConfig)() diff --git a/tests/contrib/affiliations/conftest.py b/tests/contrib/affiliations/conftest.py index c428e297..0c0d6b2b 100644 --- a/tests/contrib/affiliations/conftest.py +++ 
b/tests/contrib/affiliations/conftest.py @@ -33,6 +33,16 @@ def affiliation_full_data(): } +@pytest.fixture(scope="function") +def openaire_affiliation_full_data(): + """Full OpenAIRE affiliation data.""" + return { + "openaire_id": "openorgs____::47efb6602225236c0b207761a8b3a21c", + "id": "01ggx4157", + "identifiers": [{"identifier": "999988133", "scheme": "pic"}], + } + + @pytest.fixture(scope="module") def service(): """Affiliations service object.""" diff --git a/tests/contrib/affiliations/test_affiliations_datastreams.py b/tests/contrib/affiliations/test_affiliations_datastreams.py index 243b540d..22c49e16 100644 --- a/tests/contrib/affiliations/test_affiliations_datastreams.py +++ b/tests/contrib/affiliations/test_affiliations_datastreams.py @@ -18,6 +18,8 @@ from invenio_vocabularies.contrib.affiliations.config import affiliation_schemes from invenio_vocabularies.contrib.affiliations.datastreams import ( AffiliationsServiceWriter, + OpenAIREAffiliationsServiceWriter, + OpenAIREOrganizationTransformer, ) from invenio_vocabularies.contrib.common.ror.datastreams import RORTransformer from invenio_vocabularies.datastreams import StreamEntry @@ -118,3 +120,134 @@ def test_affiliations_service_writer_update_non_existing( # not-ideal cleanup affiliation_rec._record.delete(force=True) + + +@pytest.fixture() +def dict_openaire_organization_entry(): + """An example entry from OpenAIRE organization Data Dump.""" + return StreamEntry( + { + "alternativeNames": [ + "European Organization for Nuclear Research", + "Organisation européenne pour la recherche nucléaire", + "CERN", + ], + "country": {"code": "CH", "label": "Switzerland"}, + "id": "openorgs____::47efb6602225236c0b207761a8b3a21c", + "legalName": "European Organization for Nuclear Research", + "legalShortName": "CERN", + "pid": [ + {"scheme": "mag_id", "value": "67311998"}, + {"scheme": "ISNI", "value": "000000012156142X"}, + {"scheme": "Wikidata", "value": "Q42944"}, + {"scheme": "PIC", "value": "999988133"}, + 
{"scheme": "ROR", "value": "https://ror.org/01ggx4157"}, + {"scheme": "OrgReg", "value": "INT1011"}, + {"scheme": "ISNI", "value": "000000012156142X"}, + {"scheme": "FundRef", "value": "100012470"}, + {"scheme": "GRID", "value": "grid.9132.9"}, + {"scheme": "OrgRef", "value": "37351"}, + ], + "websiteUrl": "http://home.web.cern.ch/", + } + ) + + +@pytest.fixture(scope="module") +def expected_from_openaire_json(): + return { + "openaire_id": "openorgs____::47efb6602225236c0b207761a8b3a21c", + "id": "01ggx4157", + "identifiers": [{"identifier": "999988133", "scheme": "pic"}], + } + + +def test_openaire_organization_transformer( + app, dict_openaire_organization_entry, expected_from_openaire_json +): + transformer = OpenAIREOrganizationTransformer() + assert ( + expected_from_openaire_json + == transformer.apply(dict_openaire_organization_entry).entry + ) + + +def test_openaire_affiliations_service_writer( + app, search_clear, affiliation_full_data, openaire_affiliation_full_data, service +): + # create vocabulary with original service writer + orig_writer = AffiliationsServiceWriter() + orig_affiliation_rec = orig_writer.write(StreamEntry(affiliation_full_data)) + orig_affiliation_dict = orig_affiliation_rec.entry.to_dict() + Affiliation.index.refresh() # refresh index to make changes live + + # update vocabulary and check changes vocabulary with OpenAIRE service writer + writer = OpenAIREAffiliationsServiceWriter(update=True) + _ = writer.write(StreamEntry(openaire_affiliation_full_data)) + Affiliation.index.refresh() # refresh index to make changes live + affiliation_rec = service.read(system_identity, orig_affiliation_rec.entry.id) + affiliation_dict = affiliation_rec.to_dict() + + assert _.entry.id == orig_affiliation_rec.entry.id + + # updating fields changing from one update to the other + orig_affiliation_dict["revision_id"] = affiliation_dict["revision_id"] + orig_affiliation_dict["updated"] = affiliation_dict["updated"] + # Adding the extra identifier coming 
from OpenAIRE + orig_affiliation_dict["identifiers"].extend( + openaire_affiliation_full_data["identifiers"] + ) + + assert dict(orig_affiliation_dict) == affiliation_dict + + # not-ideal cleanup + affiliation_rec._record.delete(force=True) + + +def test_openaire_affiliations_service_writer_non_openorgs( + app, openaire_affiliation_full_data +): + writer = OpenAIREAffiliationsServiceWriter() + + updated_openaire_affiliation = deepcopy(openaire_affiliation_full_data) + updated_openaire_affiliation["openaire_id"] = ( + "pending_org_::627931d047132a4e20dbc4a882eb9a35" + ) + + with pytest.raises(WriterError) as err: + writer.write(StreamEntry(updated_openaire_affiliation)) + + expected_error = [ + f"Not valid OpenAIRE OpenOrgs id for: {updated_openaire_affiliation}" + ] + assert expected_error in err.value.args + + +def test_openaire_affiliations_service_writer_no_id( + app, openaire_affiliation_full_data +): + writer = OpenAIREAffiliationsServiceWriter() + + updated_openaire_affiliation = deepcopy(openaire_affiliation_full_data) + del updated_openaire_affiliation["id"] + + with pytest.raises(WriterError) as err: + writer.write(StreamEntry(updated_openaire_affiliation)) + + expected_error = [f"No id for: {updated_openaire_affiliation}"] + assert expected_error in err.value.args + + +def test_openaire_affiliations_service_writer_no_alternative_identifiers( + app, openaire_affiliation_full_data +): + writer = OpenAIREAffiliationsServiceWriter() + + updated_openaire_affiliation = deepcopy(openaire_affiliation_full_data) + del updated_openaire_affiliation["identifiers"] + + with pytest.raises(WriterError) as err: + writer.write(StreamEntry(updated_openaire_affiliation)) + + expected_error = [f"No alternative identifiers for: {updated_openaire_affiliation}"] + assert expected_error in err.value.args From 4a3b0dd2e95c94a0c65e03fbb6952187ad8f2ffc Mon Sep 17 00:00:00 2001 From: Pablo Tamarit Date: Tue, 27 Aug 2024 17:03:20 +0200 Subject: [PATCH 5/5] wip: awards CORDIS 
datastreams --- .../contrib/awards/datastreams.py | 119 +++++++++++++++++- .../jsonschemas/awards/award-v1.0.0.json | 42 +++++++ .../mappings/os-v1/awards/award-v1.0.0.json | 26 ++++ .../mappings/os-v2/awards/award-v1.0.0.json | 26 ++++ .../mappings/v7/awards/award-v1.0.0.json | 26 ++++ invenio_vocabularies/contrib/awards/schema.py | 16 ++- .../contrib/awards/serializer.py | 1 + .../contrib/names/datastreams.py | 14 ++- invenio_vocabularies/datastreams/readers.py | 19 ++- .../datastreams/transformers.py | 19 ++- invenio_vocabularies/datastreams/writers.py | 4 + invenio_vocabularies/factories.py | 27 +++- 12 files changed, 326 insertions(+), 13 deletions(-) diff --git a/invenio_vocabularies/contrib/awards/datastreams.py b/invenio_vocabularies/contrib/awards/datastreams.py index 22547f39..e9f90126 100644 --- a/invenio_vocabularies/contrib/awards/datastreams.py +++ b/invenio_vocabularies/contrib/awards/datastreams.py @@ -114,18 +114,135 @@ def apply(self, stream_entry, **kwargs): return stream_entry -VOCABULARIES_DATASTREAM_READERS = {} +class CORDISProjectHTTPReader(BaseReader): + """CORDIS Project HTTP Reader returning an in-memory binary stream of the latest CORDIS Horizon Europe project zip file.""" + + def _iter(self, fp, *args, **kwargs): + raise NotImplementedError( + "CORDISProjectHTTPReader downloads one file and therefore does not iterate through items" + ) + + def read(self, item=None, *args, **kwargs): + """Reads the latest CORDIS Horizon Europe project zip file and yields an in-memory binary stream of it.""" + if item: + raise NotImplementedError( + "CORDISProjectHTTPReader does not support being chained after another reader" + ) + + file_url = "https://cordis.europa.eu/data/cordis-HORIZONprojects-xml.zip" + + # Download the ZIP file and fully load the response bytes content in memory. + # The bytes content are then wrapped by a BytesIO to be file-like object (as required by `zipfile.ZipFile`). 
+ # Using `file_resp.raw` directly is not possible since `zipfile.ZipFile` requires the file-like object to be seekable. + file_resp = requests.get(file_url) + file_resp.raise_for_status() + yield io.BytesIO(file_resp.content) + + +class CORDISProjectTransformer(BaseTransformer): + """Transforms a CORDIS project record into an award record.""" + + def apply(self, stream_entry, **kwargs): + """Applies the transformation to the stream entry.""" + record = stream_entry.entry + award = {} + + # Here `id` is the project ID, which will be used to attach the update to the existing project. + award["id"] = f"00k4n6c32::{record['id']}" + + award["subjects"] = [ + { + "scheme": "EuroSciVoc", + # TODO: The subject title is lowercase here, while it is title-cased in the subjects vocabulary. + "subject": category["title"], + } + for category in record["relations"]["categories"]["category"] + if category["@classification"] == "euroSciVoc" + ] + + organizations = record["relations"]["associations"]["organization"] + # Projects with a single organization are not wrapped in a list, + # so we do this here to be able to iterate over it. + organizations = organizations if isinstance(organizations, list) else [organizations] + award["organizations"] = [ + { + # TODO: The organization's legal name is uppercase here. + "organization": organization["legalname"], + "scheme": "pic", + "id": organization["id"], + } + for organization in organizations + ] + + stream_entry.entry = award + return stream_entry + + +class CORDISAwardsServiceWriter(ServiceWriter): + """CORDIS Awards service writer.""" + + def __init__(self, *args, **kwargs): + """Constructor.""" + service_or_name = kwargs.pop("service_or_name", "awards") + # Here we only update and we do not insert, since CORDIS data is used to augment existing awards + # (with subjects and organizations information) and is not used to create new awards.
+ super().__init__(service_or_name=service_or_name, insert=False, *args, **kwargs) + + def _entry_id(self, entry): + """Get the id from an entry.""" + return entry["id"] + + +VOCABULARIES_DATASTREAM_READERS = { + "cordis-project-http": CORDISProjectHTTPReader, +} VOCABULARIES_DATASTREAM_TRANSFORMERS = { "openaire-award": OpenAIREProjectTransformer, + "cordis-award": CORDISProjectTransformer, } """ORCiD Data Streams transformers.""" VOCABULARIES_DATASTREAM_WRITERS = { "awards-service": AwardsServiceWriter, + "cordis-awards-service": CORDISAwardsServiceWriter, } """ORCiD Data Streams transformers.""" +DATASTREAM_CONFIG_CORDIS = { + "readers": [ + # {"type": "cordis-project-http"}, + # { + # "type": "zip", + # "args": { + # "regex": "\\.xml$", + # "mode": "r", + # }, + # }, + { + "type": "xml", + "args": { + "root_element": "project", + }, + }, + ], + "transformers": [ + {"type": "cordis-award"}, + ], + "writers": [ + { + "type": "cordis-awards-service", + "args": { + "identity": system_identity, + }, + } + ], +} +"""Data Stream configuration. + +An origin is required for the reader. 
+""" + DATASTREAM_CONFIG = { "readers": [ { diff --git a/invenio_vocabularies/contrib/awards/jsonschemas/awards/award-v1.0.0.json b/invenio_vocabularies/contrib/awards/jsonschemas/awards/award-v1.0.0.json index bdc2cfa1..8c89e487 100644 --- a/invenio_vocabularies/contrib/awards/jsonschemas/awards/award-v1.0.0.json +++ b/invenio_vocabularies/contrib/awards/jsonschemas/awards/award-v1.0.0.json @@ -42,6 +42,48 @@ }, "program": { "type": "string" + }, + "subjects": { + "description": "Award's subjects.", + "type": "array", + "items": { + "type": "object", + "additionalProperties": false, + "properties": { + "scheme": { + "description": "Identifier of the subject scheme.", + "$ref": "local://definitions-v1.0.0.json#/identifier" + }, + "subject": { + "description": "Human readable label.", + "type": "string" + } + } + }, + "uniqueItems": true + }, + "organizations": { + "description": "Award's organizations.", + "type": "array", + "items": { + "type": "object", + "additionalProperties": false, + "properties": { + "scheme": { + "description": "Identifier of the organization scheme.", + "$ref": "local://definitions-v1.0.0.json#/identifier" + }, + "id": { + "description": "Identifier of the organization for the given scheme.", + "$ref": "local://definitions-v1.0.0.json#/identifier" + }, + "organization": { + "description": "Human readable label.", + "type": "string" + } + } + }, + "uniqueItems": true } } } diff --git a/invenio_vocabularies/contrib/awards/mappings/os-v1/awards/award-v1.0.0.json b/invenio_vocabularies/contrib/awards/mappings/os-v1/awards/award-v1.0.0.json index 90ec8049..86657803 100644 --- a/invenio_vocabularies/contrib/awards/mappings/os-v1/awards/award-v1.0.0.json +++ b/invenio_vocabularies/contrib/awards/mappings/os-v1/awards/award-v1.0.0.json @@ -64,6 +64,32 @@ "program": { "type": "keyword" }, + "subjects": { + "properties": { + "identifier": { + "type": "keyword" + }, + "scheme": { + "type": "keyword" + }, + "subject": { + "type": "keyword" + } + } + 
}, + "organizations": { + "properties": { + "scheme": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "organization": { + "type": "keyword" + } + } + }, "funder": { "type": "object", "properties": { diff --git a/invenio_vocabularies/contrib/awards/mappings/os-v2/awards/award-v1.0.0.json b/invenio_vocabularies/contrib/awards/mappings/os-v2/awards/award-v1.0.0.json index 90ec8049..86657803 100644 --- a/invenio_vocabularies/contrib/awards/mappings/os-v2/awards/award-v1.0.0.json +++ b/invenio_vocabularies/contrib/awards/mappings/os-v2/awards/award-v1.0.0.json @@ -64,6 +64,32 @@ "program": { "type": "keyword" }, + "subjects": { + "properties": { + "identifier": { + "type": "keyword" + }, + "scheme": { + "type": "keyword" + }, + "subject": { + "type": "keyword" + } + } + }, + "organizations": { + "properties": { + "scheme": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "organization": { + "type": "keyword" + } + } + }, "funder": { "type": "object", "properties": { diff --git a/invenio_vocabularies/contrib/awards/mappings/v7/awards/award-v1.0.0.json b/invenio_vocabularies/contrib/awards/mappings/v7/awards/award-v1.0.0.json index 90ec8049..86657803 100644 --- a/invenio_vocabularies/contrib/awards/mappings/v7/awards/award-v1.0.0.json +++ b/invenio_vocabularies/contrib/awards/mappings/v7/awards/award-v1.0.0.json @@ -64,6 +64,32 @@ "program": { "type": "keyword" }, + "subjects": { + "properties": { + "identifier": { + "type": "keyword" + }, + "scheme": { + "type": "keyword" + }, + "subject": { + "type": "keyword" + } + } + }, + "organizations": { + "properties": { + "scheme": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "organization": { + "type": "keyword" + } + } + }, "funder": { "type": "object", "properties": { diff --git a/invenio_vocabularies/contrib/awards/schema.py b/invenio_vocabularies/contrib/awards/schema.py index 487bafaf..f15453f5 100644 --- a/invenio_vocabularies/contrib/awards/schema.py +++ 
b/invenio_vocabularies/contrib/awards/schema.py @@ -15,15 +15,25 @@ from marshmallow_utils.fields import IdentifierSet, SanitizedUnicode from marshmallow_utils.schemas import IdentifierSchema +from ..subjects.schema import SubjectRelationSchema from ...services.schema import ( BaseVocabularySchema, ModePIDFieldVocabularyMixin, - i18n_strings, + i18n_strings, ContribVocabularyRelationSchema, ) from ..funders.schema import FunderRelationSchema from .config import award_schemes +class AwardOrganizationRelationSchema(ContribVocabularyRelationSchema): + """Schema to define an organization relation in an award.""" + + ftf_name = "organization" + parent_field_name = "organizations" + organization = SanitizedUnicode() + scheme = SanitizedUnicode() + + class AwardSchema(BaseVocabularySchema, ModePIDFieldVocabularyMixin): """Award schema.""" @@ -46,6 +56,10 @@ class AwardSchema(BaseVocabularySchema, ModePIDFieldVocabularyMixin): program = SanitizedUnicode() + subjects = fields.List(fields.Nested(SubjectRelationSchema)) + + organizations = fields.List(fields.Nested(AwardOrganizationRelationSchema)) + id = SanitizedUnicode( validate=validate.Length(min=1, error=_("PID cannot be blank.")) ) diff --git a/invenio_vocabularies/contrib/awards/serializer.py b/invenio_vocabularies/contrib/awards/serializer.py index 32aa1188..55e8a195 100644 --- a/invenio_vocabularies/contrib/awards/serializer.py +++ b/invenio_vocabularies/contrib/awards/serializer.py @@ -37,4 +37,5 @@ class AwardL10NItemSchema(Schema): acronym = fields.String(dump_only=True) program = fields.String(dump_only=True) funder = fields.Nested(FunderRelationSchema, dump_only=True) + # TODO: Add subjects and organizations here? 
identifiers = fields.List(fields.Nested(IdentifierSchema), dump_only=True) diff --git a/invenio_vocabularies/contrib/names/datastreams.py b/invenio_vocabularies/contrib/names/datastreams.py index d682526b..9929d173 100644 --- a/invenio_vocabularies/contrib/names/datastreams.py +++ b/invenio_vocabularies/contrib/names/datastreams.py @@ -243,7 +243,12 @@ def _entry_id(self, entry): "regex": "\\.xml$", }, }, - {"type": "xml"}, + { + "type": "xml", + "args": { + "root_element": "record", + }, + }, ], "transformers": [{"type": "orcid"}], "writers": [ @@ -266,7 +271,12 @@ def _entry_id(self, entry): { "type": "orcid-data-sync", }, - {"type": "xml"}, + { + "type": "xml", + "args": { + "root_element": "record", + }, + }, ], "transformers": [{"type": "orcid"}], "writers": [ diff --git a/invenio_vocabularies/datastreams/readers.py b/invenio_vocabularies/datastreams/readers.py index 4f017eb0..b559cf58 100644 --- a/invenio_vocabularies/datastreams/readers.py +++ b/invenio_vocabularies/datastreams/readers.py @@ -224,19 +224,30 @@ def _iter(self, fp, *args, **kwargs): class XMLReader(BaseReader): """XML reader.""" + def __init__(self, root_element=None, *args, **kwargs): + """Constructor.""" + # TODO: How to make root_element mandatory? 
+ self.root_element = root_element + super().__init__(*args, **kwargs) + + def _iter(self, fp, *args, **kwargs): """Read and parse an XML file to dict.""" # NOTE: We parse HTML, to skip XML validation and strip XML namespaces record = None try: xml_tree = fromstring(fp) - record = etree_to_dict(xml_tree).get("record") + xml_dict = etree_to_dict(xml_tree) except Exception as e: xml_tree = html_parse(fp).getroot() - record = etree_to_dict(xml_tree)["html"]["body"].get("record") + xml_dict = etree_to_dict(xml_tree)["html"]["body"] - if not record: - raise ReaderError(f"Record not found in XML entry.") + if self.root_element: + record = xml_dict.get(self.root_element) + if not record: + raise ReaderError(f"Root element '{self.root_element}' not found in XML entry.") + else: + record = xml_dict yield record diff --git a/invenio_vocabularies/datastreams/transformers.py b/invenio_vocabularies/datastreams/transformers.py index d4274a68..cb495f47 100644 --- a/invenio_vocabularies/datastreams/transformers.py +++ b/invenio_vocabularies/datastreams/transformers.py @@ -32,6 +32,13 @@ def apply(self, stream_entry, *args, **kwargs): class XMLTransformer(BaseTransformer): """XML transformer.""" + def __init__( + self, root_element=None, *args, **kwargs + ): + """Initializes the transformer.""" + self.root_element = root_element + super().__init__(*args, **kwargs) + @classmethod def _xml_to_etree(cls, xml): """Converts XML to a lxml etree.""" @@ -43,10 +50,14 @@ def apply(self, stream_entry, **kwargs): Requires the root element to be named "record". 
""" xml_tree = self._xml_to_etree(stream_entry.entry) - record = etree_to_dict(xml_tree)["html"]["body"].get("record") - - if not record: - raise TransformerError(f"Record not found in XML entry.") + xml_dict = etree_to_dict(xml_tree)["html"]["body"] + + if self.root_element: + record = xml_dict.get(self.root_element) + if not record: + raise TransformerError(f"Root element '{self.root_element}' not found in XML entry.") + else: + record = xml_dict stream_entry.entry = record return stream_entry diff --git a/invenio_vocabularies/datastreams/writers.py b/invenio_vocabularies/datastreams/writers.py index 090fc7d8..060fcfd5 100644 --- a/invenio_vocabularies/datastreams/writers.py +++ b/invenio_vocabularies/datastreams/writers.py @@ -88,6 +88,10 @@ def _do_update(self, entry): vocab_id = self._entry_id(entry) current = self._resolve(vocab_id) + # updated = dict(current.to_dict(), **entry) + # TODO: Try to use _record instead of to_dict() + # updated = dict(current._record, **entry) + # Merge the `current` dictionary with new data in the `entry` dictionary # by appending to lists at the top level instead of overwriting the list. 
updated = current.to_dict() diff --git a/invenio_vocabularies/factories.py b/invenio_vocabularies/factories.py index 7d64d974..aa73b142 100644 --- a/invenio_vocabularies/factories.py +++ b/invenio_vocabularies/factories.py @@ -19,10 +19,11 @@ from .contrib.affiliations.datastreams import ( DATASTREAM_CONFIG_OPENAIRE as affiliations_openaire_ds_config, ) -from .contrib.awards.datastreams import DATASTREAM_CONFIG as awards_ds_config +from .contrib.awards.datastreams import DATASTREAM_CONFIG as awards_ds_config, DATASTREAM_CONFIG_CORDIS as awards_cordis_ds_config from .contrib.funders.datastreams import DATASTREAM_CONFIG as funders_ds_config from .contrib.names.datastreams import DATASTREAM_CONFIG as names_ds_config from .contrib.subjects.datastreams import DATASTREAM_CONFIG as subjects_ds_config +from .contrib.projects.datastreams import DATASTREAM_CONFIG as projects_ds_config class VocabularyConfig: @@ -87,6 +88,17 @@ def get_service(self): raise NotImplementedError("Service not implemented for Awards") +class AwardsCordisVocabularyConfig(VocabularyConfig): + """Awards Vocabulary Config.""" + + config = awards_cordis_ds_config + vocabulary_name = "awards:cordis" + + def get_service(self): + """Get the service for the vocabulary.""" + raise NotImplementedError("Service not implemented for CORDIS Awards") + + class AffiliationsVocabularyConfig(VocabularyConfig): """Affiliations Vocabulary Config.""" @@ -109,14 +121,27 @@ def get_service(self): raise NotImplementedError("Service not implemented for OpenAIRE Affiliations") +class ProjectsVocabularyConfig(VocabularyConfig): # TODO: Delete this config + """Projects Vocabulary Config.""" + + config = projects_ds_config + vocabulary_name = "projects" + + def get_service(self): + """Get the service for the vocabulary.""" + raise NotImplementedError("Service not implemented for Projects") + + def get_vocabulary_config(vocabulary): """Factory function to get the appropriate Vocabulary Config.""" vocab_config = { "names": 
NamesVocabularyConfig, "funders": FundersVocabularyConfig, "awards": AwardsVocabularyConfig, + "awards:cordis": AwardsCordisVocabularyConfig, "affiliations": AffiliationsVocabularyConfig, "affiliations:openaire": AffiliationsOpenAIREVocabularyConfig, + "projects": ProjectsVocabularyConfig, "subjects": SubjectsVocabularyConfig, } return vocab_config.get(vocabulary, VocabularyConfig)()