Affiliations OpenAIRE OpenOrgs PIC #392

Closed
64 changes: 2 additions & 62 deletions invenio_vocabularies/contrib/awards/datastreams.py
@@ -8,73 +8,15 @@

"""Awards datastreams, transformers, writers and readers."""

import io

import requests
from invenio_access.permissions import system_identity
from invenio_i18n import lazy_gettext as _

from ...datastreams.errors import ReaderError, TransformerError
from ...datastreams.readers import BaseReader
from ...datastreams.errors import TransformerError
from ...datastreams.transformers import BaseTransformer
from ...datastreams.writers import ServiceWriter
from .config import awards_ec_ror_id, awards_openaire_funders_mapping


class OpenAIREProjectHTTPReader(BaseReader):
"""OpenAIRE Project HTTP Reader returning an in-memory binary stream of the latest OpenAIRE Graph Dataset project tar file."""

def _iter(self, fp, *args, **kwargs):
raise NotImplementedError(
"OpenAIREProjectHTTPReader downloads one file and therefore does not iterate through items"
)

def read(self, item=None, *args, **kwargs):
"""Reads the latest OpenAIRE Graph Dataset project tar file from Zenodo and yields an in-memory binary stream of it."""
if item:
raise NotImplementedError(
"OpenAIREProjectHTTPReader does not support being chained after another reader"
)

if self._origin == "full":
# OpenAIRE Graph Dataset
api_url = "https://zenodo.org/api/records/3516917"
elif self._origin == "diff":
# OpenAIRE Graph dataset: new collected projects
api_url = "https://zenodo.org/api/records/6419021"
else:
raise ReaderError("The --origin option should be either 'full' or 'diff'")

# Call the signposting `linkset+json` endpoint for the Concept DOI (i.e. latest version) of the OpenAIRE Graph Dataset.
# See: https://github.com/inveniosoftware/rfcs/blob/master/rfcs/rdm-0071-signposting.md#provide-an-applicationlinksetjson-endpoint
headers = {"Accept": "application/linkset+json"}
api_resp = requests.get(api_url, headers=headers)
api_resp.raise_for_status()

# Extract the Landing page Link Set Object located as the first (index 0) item.
landing_page_linkset = api_resp.json()["linkset"][0]

# Extract the URL of the only project tar file linked to the record.
landing_page_project_tar_items = [
item
for item in landing_page_linkset["item"]
if item["type"] == "application/x-tar"
and item["href"].endswith("/project.tar")
]
if len(landing_page_project_tar_items) != 1:
raise ReaderError(
f"Expected 1 project tar item but got {len(landing_page_project_tar_items)}"
)
file_url = landing_page_project_tar_items[0]["href"]

# Download the project tar file and fully load the response bytes content in memory.
# The bytes content are then wrapped by a BytesIO to be file-like object (as required by `tarfile.open`).
# Using directly `file_resp.raw` is not possible since `tarfile.open` requires the file-like object to be seekable.
file_resp = requests.get(file_url)
file_resp.raise_for_status()
yield io.BytesIO(file_resp.content)


class AwardsServiceWriter(ServiceWriter):
"""Funders service writer."""

@@ -172,9 +114,7 @@ def apply(self, stream_entry, **kwargs):
return stream_entry


VOCABULARIES_DATASTREAM_READERS = {
"openaire-project-http": OpenAIREProjectHTTPReader,
}
VOCABULARIES_DATASTREAM_READERS = {}

VOCABULARIES_DATASTREAM_TRANSFORMERS = {
"openaire-award": OpenAIREProjectTransformer,
9 changes: 9 additions & 0 deletions invenio_vocabularies/contrib/common/openaire/__init__.py
@@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2024 CERN.
#
# Invenio-Vocabularies is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""OpenAIRE-related module."""
84 changes: 84 additions & 0 deletions invenio_vocabularies/contrib/common/openaire/datastreams.py
@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2024 CERN.
[Contributor review comment] nitpick: maintain 2022-2024?

#
# Invenio-Vocabularies is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""OpenAIRE-related Datastreams Readers/Writers/Transformers module."""

import io

import requests

from invenio_vocabularies.datastreams.errors import ReaderError
from invenio_vocabularies.datastreams.readers import BaseReader


class OpenAIREHTTPReader(BaseReader):
"""OpenAIRE HTTP Reader returning an in-memory binary stream of the latest OpenAIRE Graph Dataset tar file of a given type."""

def __init__(self, origin=None, mode="r", tar_href=None, *args, **kwargs):
"""Constructor."""
self.tar_href = tar_href
super().__init__(origin, mode, *args, **kwargs)

def _iter(self, fp, *args, **kwargs):
raise NotImplementedError(
"OpenAIREHTTPReader downloads one file and therefore does not iterate through items"
)

def read(self, item=None, *args, **kwargs):
[Member Author review comment] Remark: this reader still does not compare the dataset's publication date against the date of the last successful run. The only reader implementing such logic so far is RORHTTPReader.
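A minimal sketch of what such a check could look like, assuming the caller passes in the date of the last successful run and that the record metadata (fetched from the plain Zenodo records API) exposes an `updated` timestamp; both are assumptions for illustration and not part of this PR:

from datetime import datetime

def _is_updated_since(record_json, last_run_date):
    """Return True if the record was updated after the last successful run."""
    if last_run_date is None:
        # No previous successful run recorded: always read.
        return True
    # Assumed: the record JSON carries an ISO 8601 "updated" timestamp.
    updated = datetime.fromisoformat(record_json["updated"].replace("Z", "+00:00"))
    return updated > last_run_date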

"""Reads the latest OpenAIRE Graph Dataset tar file of a given type from Zenodo and yields an in-memory binary stream of it."""
if item:
raise NotImplementedError(
"OpenAIREHTTPReader does not support being chained after another reader"
)

if self._origin == "full":
# OpenAIRE Graph Dataset
api_url = "https://zenodo.org/api/records/3516917"
elif self._origin == "diff":
# OpenAIRE Graph dataset: new collected projects
api_url = "https://zenodo.org/api/records/6419021"
else:
raise ReaderError("The --origin option should be either 'full' or 'diff'")

# Call the signposting `linkset+json` endpoint for the Concept DOI (i.e. latest version) of the OpenAIRE Graph Dataset.
# See: https://github.com/inveniosoftware/rfcs/blob/master/rfcs/rdm-0071-signposting.md#provide-an-applicationlinksetjson-endpoint
headers = {"Accept": "application/linkset+json"}
api_resp = requests.get(api_url, headers=headers)
api_resp.raise_for_status()

# Extract the Landing page Link Set Object located as the first (index 0) item.
landing_page_linkset = api_resp.json()["linkset"][0]

# Extract the URL of the only tar file matching `tar_href` linked to the record.
landing_page_matching_tar_items = [
item
for item in landing_page_linkset["item"]
if item["type"] == "application/x-tar"
and item["href"].endswith(self.tar_href)
]
if len(landing_page_matching_tar_items) != 1:
raise ReaderError(
f"Expected 1 tar item matching {self.tar_href} but got {len(landing_page_matching_tar_items)}"
)
file_url = landing_page_matching_tar_items[0]["href"]

# Download the matching tar file and fully load the response bytes content in memory.
# The bytes content are then wrapped by a BytesIO to be file-like object (as required by `tarfile.open`).
# Using directly `file_resp.raw` is not possible since `tarfile.open` requires the file-like object to be seekable.
file_resp = requests.get(file_url)
file_resp.raise_for_status()
yield io.BytesIO(file_resp.content)


VOCABULARIES_DATASTREAM_READERS = {
"openaire-http": OpenAIREHTTPReader,
}

VOCABULARIES_DATASTREAM_TRANSFORMERS = {}

VOCABULARIES_DATASTREAM_WRITERS = {}
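For orientation (not part of the diff), a hedged sketch of how the relocated reader might be wired into an awards datastream definition. The "openaire-http" reader and "openaire-award" transformer names come from this PR; the "tar", "gzip", "jsonl" and "awards-service" type names and the exact config shape are assumptions for illustration only:

AWARDS_DATASTREAM_SKETCH = {
    "readers": [
        # Generic OpenAIRE reader, parametrized to pick the project tar file.
        {"type": "openaire-http", "args": {"origin": "full", "tar_href": "/project.tar"}},
        # Downstream readers would unpack the in-memory, seekable tar stream
        # yielded above (tarfile.open requires a seekable file-like object).
        {"type": "tar", "args": {"regex": "\\.json\\.gz$"}},
        {"type": "gzip"},
        {"type": "jsonl"},
    ],
    "transformers": [{"type": "openaire-award"}],
    "writers": [{"type": "awards-service", "args": {"update": True}}],
}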
125 changes: 1 addition & 124 deletions tests/contrib/awards/test_awards_datastreams.py
@@ -8,21 +8,18 @@

"""Awards datastreams tests."""

import io
from copy import deepcopy
from unittest.mock import patch

import pytest
from invenio_access.permissions import system_identity

from invenio_vocabularies.contrib.awards.api import Award
from invenio_vocabularies.contrib.awards.datastreams import (
AwardsServiceWriter,
OpenAIREProjectHTTPReader,
OpenAIREProjectTransformer,
)
from invenio_vocabularies.datastreams import StreamEntry
from invenio_vocabularies.datastreams.errors import ReaderError, WriterError
from invenio_vocabularies.datastreams.errors import WriterError


@pytest.fixture(scope="function")
@@ -115,126 +112,6 @@ def expected_from_award_json_ec():
}


API_JSON_RESPONSE_CONTENT = {
"linkset": [
{
"anchor": "https://example.com/records/10488385",
"item": [
{
"href": "https://example.com/records/10488385/files/organization.tar",
"type": "application/x-tar",
},
{
"href": "https://example.com/records/10488385/files/project.tar",
"type": "application/x-tar",
},
],
},
{
"anchor": "https://example.com/api/records/10488385",
"describes": [
{"href": "https://example.com/records/10488385", "type": "text/html"}
],
"type": "application/dcat+xml",
},
]
}

API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR = {
"linkset": [
{
"anchor": "https://example.com/records/10488385",
"item": [
{
"href": "https://example.com/records/10488385/files/organization.tar",
"type": "application/x-tar",
},
{
"href": "https://example.com/records/10488385/files/project.tar",
"type": "application/x-tar",
},
{
"href": "https://example.com/another/project.tar",
"type": "application/x-tar",
},
],
},
{
"anchor": "https://example.com/api/records/10488385",
"describes": [
{"href": "https://example.com/records/10488385", "type": "text/html"}
],
"type": "application/dcat+xml",
},
]
}

DOWNLOAD_FILE_BYTES_CONTENT = b"The content of the file"


class MockResponse:
content = DOWNLOAD_FILE_BYTES_CONTENT

def __init__(self, api_json_response_content):
self.api_json_response_content = api_json_response_content

def json(self, **kwargs):
return self.api_json_response_content

def raise_for_status(self):
pass


@pytest.fixture(scope="function")
def download_file_bytes_content():
return DOWNLOAD_FILE_BYTES_CONTENT


@patch(
"requests.get",
side_effect=lambda url, headers=None: MockResponse(API_JSON_RESPONSE_CONTENT),
)
def test_openaire_project_http_reader(_, download_file_bytes_content):
reader = OpenAIREProjectHTTPReader(origin="full")
results = []
for entry in reader.read():
results.append(entry)

assert len(results) == 1
assert isinstance(results[0], io.BytesIO)
assert results[0].read() == download_file_bytes_content


@patch(
"requests.get",
side_effect=lambda url, headers=None: MockResponse(
API_JSON_RESPONSE_CONTENT_WRONG_NUMBER_PROJECT_TAR_ITEMS_ERROR
),
)
def test_openaire_project_http_reader_wrong_number_tar_items_error(_):
reader = OpenAIREProjectHTTPReader(origin="full")
with pytest.raises(ReaderError):
next(reader.read())


def test_openaire_project_http_reader_unsupported_origin_option():
reader = OpenAIREProjectHTTPReader(origin="unsupported_origin_option")
with pytest.raises(ReaderError):
next(reader.read())


def test_openaire_project_http_reader_item_not_implemented():
reader = OpenAIREProjectHTTPReader()
with pytest.raises(NotImplementedError):
next(reader.read("A fake item"))


def test_openaire_project_http_reader_iter_not_implemented():
reader = OpenAIREProjectHTTPReader()
with pytest.raises(NotImplementedError):
reader._iter("A fake file pointer")


def test_awards_transformer(app, dict_award_entry, expected_from_award_json):
transformer = OpenAIREProjectTransformer()
assert expected_from_award_json == transformer.apply(dict_award_entry).entry