support sending supplementary metadata to trove

aaxelb committed Oct 3, 2024
1 parent cc1e078, commit b23ec90
Showing 9 changed files with 121 additions and 38 deletions.
45 changes: 31 additions & 14 deletions api/share/utils.py
@@ -70,14 +70,14 @@ def _enqueue_update_share(osfresource):


 @celery_app.task(bind=True, max_retries=4, acks_late=True)
-def task__update_share(self, guid: str, is_backfill=False):
+def task__update_share(self, guid: str, is_backfill=False, is_supplementary=False):
     """
     This function updates SHARE; it takes Preprints, Projects and Registrations.
     :param self:
     :param guid:
     :return:
     """
-    resp = _do_update_share(guid, is_backfill=is_backfill)
+    resp = _do_update_share(guid, is_backfill=is_backfill, is_supplementary=is_supplementary)
     try:
         resp.raise_for_status()
     except Exception as e:
@@ -93,22 +93,31 @@ def task__update_share(self, guid: str, is_backfill=False):
             log_exception(e)
         else:
             log_exception(e)
+    else:  # succeeded -- enqueue followup task for supplementary metadata
+        if not is_supplementary:
+            task__update_share.delay(guid, is_backfill=is_backfill, is_supplementary=True)

     return resp


-def pls_send_trove_indexcard(osf_item, *, is_backfill=False):
+def pls_send_trove_indexcard(osf_item, *, is_backfill=False, is_supplementary=False):
     try:
         _iri = osf_item.get_semantic_iri()
     except (AttributeError, ValueError):
         raise ValueError(f'could not get iri for {osf_item}')
-    _metadata_record = pls_gather_metadata_file(osf_item, 'turtle')
+    _metadata_record = pls_gather_metadata_file(
+        osf_item,
+        format_key='turtle',
+        serializer_config={'is_supplementary': is_supplementary},
+    )
     _queryparams = {
         'focus_iri': _iri,
-        'record_identifier': _shtrove_record_identifier(osf_item),
+        'record_identifier': _shtrove_record_identifier(osf_item, is_supplementary=is_supplementary),
     }
     if is_backfill:
         _queryparams['nonurgent'] = True
+    if is_supplementary:
+        _queryparams['is_supplementary'] = True
     return requests.post(
         shtrove_ingest_url(),
         params=_queryparams,
@@ -120,32 +129,40 @@ def pls_send_trove_indexcard(osf_item, *, is_backfill=False):
     )


-def pls_delete_trove_indexcard(osf_item):
+def pls_delete_trove_indexcard(osf_item, *, is_supplementary=False):
+    _queryparams = {
+        'record_identifier': _shtrove_record_identifier(osf_item, is_supplementary=is_supplementary),
+    }
+    if is_supplementary:
+        _queryparams['is_supplementary'] = True
     return requests.delete(
         shtrove_ingest_url(),
-        params={
-            'record_identifier': _shtrove_record_identifier(osf_item),
-        },
+        params=_queryparams,
         headers=_shtrove_auth_headers(osf_item),
     )


-def _do_update_share(osfguid: str, *, is_backfill=False):
+def _do_update_share(osfguid: str, *, is_backfill=False, is_supplementary=False):
     logger.debug('%s._do_update_share("%s", is_backfill=%s)', __name__, osfguid, is_backfill)
     _guid_instance = apps.get_model('osf.Guid').load(osfguid)
     if _guid_instance is None:
         raise ValueError(f'unknown osfguid "{osfguid}"')
     _resource = _guid_instance.referent
     _response = (
-        pls_delete_trove_indexcard(_resource)
+        pls_delete_trove_indexcard(_resource, is_supplementary=is_supplementary)
         if _should_delete_indexcard(_resource)
-        else pls_send_trove_indexcard(_resource, is_backfill=is_backfill)
+        else pls_send_trove_indexcard(
+            _resource,
+            is_backfill=is_backfill,
+            is_supplementary=is_supplementary,
+        )
     )
     return _response


-def _shtrove_record_identifier(osf_item):
-    return osf_item.guids.values_list('_id', flat=True).first()
+def _shtrove_record_identifier(osf_item, *, is_supplementary=False):
+    _id = osf_item.guids.values_list('_id', flat=True).first()
+    return (f'{_id}/supplement' if is_supplementary else _id)


 def _shtrove_auth_headers(osf_item):
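Note: the net effect of this file's changes is a two-pass push. A successful primary ingest re-enqueues the same task with is_supplementary=True, and the supplementary indexcard gets a distinct record identifier. A minimal sketch of the identifier convention (illustrative, not part of the commit):

```python
# Illustrative sketch of the record-identifier convention above: the primary
# indexcard keeps the bare osfguid; the supplementary card appends
# '/supplement' so trove can track the two records independently.
def record_identifier(osfguid: str, *, is_supplementary: bool = False) -> str:
    return f'{osfguid}/supplement' if is_supplementary else osfguid

assert record_identifier('abcde') == 'abcde'
assert record_identifier('abcde', is_supplementary=True) == 'abcde/supplement'
```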
43 changes: 33 additions & 10 deletions api_tests/share/_utils.py
@@ -40,36 +40,58 @@ def mock_update_share():


 @contextlib.contextmanager
-def expect_ingest_request(mock_share_responses, osfguid, *, token=None, delete=False, count=1):
+def expect_ingest_request(mock_share_responses, osfguid, *, token=None, delete=False, count=1, with_supplementary=True):
     mock_share_responses._calls.reset()
     yield
-    _double_count = count * 2  # pushing to share two ways
-    assert len(mock_share_responses.calls) == _double_count, (
-        f'expected {_double_count} call(s), got {len(mock_share_responses.calls)}: {list(mock_share_responses.calls)}'
+    _total_count = (
+        count * 3  # (1) legacy (2) trove (3) trove supplementary
+        if with_supplementary
+        else count * 2
     )
+    assert len(mock_share_responses.calls) == _total_count, (
+        f'expected {_total_count} call(s), got {len(mock_share_responses.calls)}: {list(mock_share_responses.calls)}'
+    )
+    _trove_ingest_calls = []
+    _trove_supp_ingest_calls = []
+    _legacy_push_calls = []
     for _call in mock_share_responses.calls:
         if _call.request.url.startswith(shtrove_ingest_url()):
-            assert_ingest_request(_call.request, osfguid, token=token, delete=delete)
+            if 'is_supplementary' in _call.request.url:
+                _trove_supp_ingest_calls.append(_call)
+            else:
+                _trove_ingest_calls.append(_call)
         else:
-            assert _call.request.url.startswith(sharev2_push_url())
+            _legacy_push_calls.append(_call)
+    assert len(_trove_ingest_calls) == count
+    assert len(_trove_supp_ingest_calls) == (count if with_supplementary else 0)
+    assert len(_legacy_push_calls) == count
+    for _call in _trove_ingest_calls:
+        assert_ingest_request(_call.request, osfguid, token=token, delete=delete)
+    for _call in _trove_supp_ingest_calls:
+        assert_ingest_request(_call.request, osfguid, token=token, delete=delete, supp=True)
+    for _call in _legacy_push_calls:
+        assert _call.request.url.startswith(sharev2_push_url())


-def assert_ingest_request(request, expected_osfguid, *, token=None, delete=False):
+def assert_ingest_request(request, expected_osfguid, *, token=None, delete=False, supp=False):
     _querydict = QueryDict(urlsplit(request.path_url).query)
-    assert _querydict['record_identifier'] == expected_osfguid
+    assert _querydict['record_identifier'] == (
+        f'{expected_osfguid}/supplement' if supp else expected_osfguid
+    )
     if delete:
         assert request.method == 'DELETE'
     else:
         assert request.method == 'POST'
         _focus_iri = _querydict['focus_iri']
         assert _focus_iri == f'{website_settings.DOMAIN}{expected_osfguid}'
-        assert _focus_iri in request.body.decode('utf-8')
+        _request_body = request.body.decode('utf-8')
+        assert (_focus_iri in _request_body) or (supp and not _request_body.strip())
     _token = token or website_settings.SHARE_API_TOKEN
     assert request.headers['Authorization'] == f'Bearer {_token}'


 @contextlib.contextmanager
-def expect_preprint_ingest_request(mock_share_responses, preprint, *, delete=False, count=1):
+def expect_preprint_ingest_request(mock_share_responses, preprint, *, delete=False, count=1, with_supplementary=True):
     # same as expect_ingest_request, but with convenience for preprint specifics
     # and postcommit-task handling (so on_preprint_updated actually runs)
     with expect_ingest_request(
@@ -78,6 +100,7 @@ def expect_preprint_ingest_request(mock_share_responses, preprint, *, delete=Fal
         token=preprint.provider.access_token,
         delete=delete,
         count=count,
+        with_supplementary=with_supplementary,
     ):
         # clear out postcommit tasks from factories
         postcommit_queue().clear()
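The helper's call-count arithmetic, restated as a standalone sketch (illustrative only): each update now produces a legacy sharev2 push, a trove ingest, and, unless suppressed, a supplementary trove ingest.

```python
# Illustrative restatement of the expected-call arithmetic asserted above.
def expected_call_total(count: int, with_supplementary: bool = True) -> int:
    # (1) legacy sharev2 push, (2) trove ingest, (3) trove supplementary ingest
    return count * 3 if with_supplementary else count * 2

assert expected_call_total(1) == 3
assert expected_call_total(2, with_supplementary=False) == 4
```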
2 changes: 1 addition & 1 deletion api_tests/share/test_share_preprint.py
@@ -133,7 +133,7 @@ def test_no_call_async_update_on_400_failure(self, mock_share_responses, preprin
         mock_share_responses.replace(responses.POST, shtrove_ingest_url(), status=400)
         mock_share_responses.replace(responses.POST, sharev2_push_url(), status=400)
         preprint.set_published(True, auth=auth, save=True)
-        with expect_preprint_ingest_request(mock_share_responses, preprint, count=1):
+        with expect_preprint_ingest_request(mock_share_responses, preprint, count=1, with_supplementary=False):
             preprint.update_search()

     def test_delete_from_share(self, mock_share_responses):
19 changes: 12 additions & 7 deletions osf/metadata/gather/basket.py
@@ -19,15 +19,14 @@ class Basket:
     def __init__(self, focus: Focus):
         assert isinstance(focus, Focus)
         self.focus = focus
-        self.reset()  # start with an empty basket (except the focus itself)
+        self.reset()  # start with an empty basket

     def reset(self):
         self._gathertasks_done = set()
-        self._known_focus_dict = {}
+        self._known_focus_dict = {self.focus.iri: {self.focus}}
         self.gathered_metadata = rdfutils.contextualized_graph()
-        self._add_focus_reference(self.focus)

-    def pls_gather(self, predicate_map):  # TODO: async
+    def pls_gather(self, predicate_map, *, include_defaults=True):  # TODO: async
         '''go gatherers, go!

         @predicate_map: dict with rdflib.URIRef keys
@@ -48,7 +47,7 @@ def pls_gather(self, predicate_map):  # TODO: async
             },
         })
         '''
-        self._do_gather(self.focus, predicate_map)
+        self._do_gather(self.focus, predicate_map, include_defaults=include_defaults)

     def __getitem__(self, slice_or_arg) -> typing.Iterable[rdflib.term.Node]:
         '''convenience for getting values from the basket
Expand Down Expand Up @@ -98,14 +97,20 @@ def _maybe_gather_for_predicate_map(self, iri_or_focus, predicate_map):
         else:
             raise ValueError(f'expected `iri_or_focus` to be Focus or URIRef (got {iri_or_focus})')

-    def _do_gather(self, focus, predicate_map):
+    def _do_gather(self, focus, predicate_map, *, include_defaults=True):
+        if include_defaults:
+            self._add_focus_reference(focus)
         if not isinstance(predicate_map, dict):
             # allow iterable of predicates with no deeper paths
             predicate_map = {
                 predicate_iri: None
                 for predicate_iri in predicate_map
             }
-        for gatherer in get_gatherers(focus.rdftype, predicate_map.keys()):
+        for gatherer in get_gatherers(
+            focus.rdftype,
+            predicate_map.keys(),
+            include_focustype_defaults=include_defaults,
+        ):
             for (subj, pred, obj) in self._do_a_gathertask(gatherer, focus):
                 if isinstance(obj, Focus):
                     self._add_focus_reference(obj)
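For orientation, a hedged sketch of how the new include_defaults keyword might be driven for a supplementary gather (OsfFocus and the import paths are assumptions based on the surrounding codebase; osf_item is a placeholder):

```python
# Illustrative sketch, not part of the commit: a supplementary gather skips
# both the focus-type default gatherers and the automatic focus-reference
# triples, leaving only what the supplement map explicitly asks for.
from osf.metadata import gather
from osf.metadata.osf_gathering import OsfFocus, osfmap_supplement_for_type  # paths assumed

def gather_supplementary_turtle(osf_item) -> str:
    _focus = OsfFocus(osf_item)  # osf_item: any gatherable object (preprint, project, ...)
    _basket = gather.Basket(_focus)
    _basket.pls_gather(
        osfmap_supplement_for_type(_focus.rdftype),
        include_defaults=False,
    )
    return _basket.gathered_metadata.serialize(format='turtle')
```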
9 changes: 7 additions & 2 deletions osf/metadata/gather/gatherer.py
@@ -61,11 +61,16 @@ def add_gatherer(gatherer, predicate_iris, focustype_iris):
     )


-def get_gatherers(focustype_iri, predicate_iris):
+def get_gatherers(focustype_iri, predicate_iris, *, include_focustype_defaults=True):
     gatherer_set = set()
     for focustype in (None, focustype_iri):
         for_focustype = __gatherer_registry.get(focustype, {})
-        for predicate in (None, *predicate_iris):
+        _predicates = (
+            (None, *predicate_iris)
+            if include_focustype_defaults
+            else predicate_iris
+        )
+        for predicate in _predicates:
             gatherer_set.update(for_focustype.get(predicate, ()))
     return gatherer_set

Expand Down
22 changes: 22 additions & 0 deletions osf/metadata/osf_gathering.py
@@ -54,6 +54,13 @@ def osfmap_for_type(rdftype_iri: str):
         raise ValueError(f'invalid OSFMAP type! expected one of {set(OSFMAP.keys())}, got {rdftype_iri}')


+def osfmap_supplement_for_type(rdftype_iri: str):
+    try:
+        return OSFMAP_SUPPLEMENT[rdftype_iri]
+    except KeyError:
+        return {}


 ##### END "public" api #####


Expand Down Expand Up @@ -211,6 +218,21 @@ def osfmap_for_type(rdftype_iri: str):
     },
 }

+OSFMAP_SUPPLEMENT = {
+    OSF.Project: {
+    },
+    OSF.ProjectComponent: {
+    },
+    OSF.Registration: {
+    },
+    OSF.RegistrationComponent: {
+    },
+    OSF.Preprint: {
+    },
+    OSF.File: {
+    },
+}
+
 OSF_ARTIFACT_PREDICATES = {
     ArtifactTypes.ANALYTIC_CODE: OSF.hasAnalyticCodeResource,
     ArtifactTypes.DATA: OSF.hasDataResource,
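Worth noting: every supplement map is empty in this commit, so a supplementary gather currently yields an empty record, which is why the test helper above tolerates an empty request body for supplementary ingests. A quick illustration (the OSF namespace import location is an assumption):

```python
# Illustrative: supplement maps all start empty, and unknown types fall back
# to {} instead of raising (unlike osfmap_for_type, which raises ValueError).
from osf.metadata.osf_gathering import OSF, osfmap_supplement_for_type  # OSF import location assumed

assert osfmap_supplement_for_type(OSF.Preprint) == {}
assert osfmap_supplement_for_type('not-an-osfmap-type') == {}
```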
10 changes: 8 additions & 2 deletions osf/metadata/serializers/turtle.py
@@ -1,4 +1,4 @@
-from osf.metadata.osf_gathering import osfmap_for_type
+from osf.metadata.osf_gathering import osfmap_for_type, osfmap_supplement_for_type
 from osf.metadata.serializers import _base


@@ -9,5 +9,11 @@ def filename_for_itemid(self, itemid: str):
         return f'{itemid}-metadata.ttl'

     def serialize(self) -> str:
-        self.basket.pls_gather(osfmap_for_type(self.basket.focus.rdftype))
+        if self.serializer_config.get('is_supplementary', False):
+            self.basket.pls_gather(
+                osfmap_supplement_for_type(self.basket.focus.rdftype),
+                include_defaults=False,
+            )
+        else:
+            self.basket.pls_gather(osfmap_for_type(self.basket.focus.rdftype))
         return self.basket.gathered_metadata.serialize(format='turtle')
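Tying it together: the serializer receives serializer_config from the same entry point the api/share/utils.py change calls. A hedged usage sketch (the osf.metadata.tools import path is an assumption; the diff shows only the call site):

```python
# Hedged sketch: requesting a supplementary turtle record the way
# api/share/utils.py does above; import path assumed, not shown in this diff.
from osf.metadata.tools import pls_gather_metadata_file

def supplementary_turtle_for(osf_item):
    return pls_gather_metadata_file(
        osf_item,
        format_key='turtle',
        serializer_config={'is_supplementary': True},
    )
```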
5 changes: 3 additions & 2 deletions osf_tests/metadata/test_basket.py
@@ -34,7 +34,7 @@ def test_goodbasket():
     basket = gather.Basket(focus)
     assert basket.focus == focus
     assert isinstance(basket.gathered_metadata, rdflib.Graph)
-    assert len(basket.gathered_metadata) == 1
+    assert len(basket.gathered_metadata) == 0
     assert len(basket._gathertasks_done) == 0
     assert len(basket._known_focus_dict) == 1
     # no repeat gathertasks:
@@ -78,5 +78,6 @@ def test_goodbasket():

     # reset
     basket.reset()
-    assert len(basket.gathered_metadata) == 1
+    assert len(basket.gathered_metadata) == 0
     assert len(basket._gathertasks_done) == 0
+    assert len(basket._known_focus_dict) == 1
4 changes: 4 additions & 0 deletions osf_tests/metadata/test_gatherer_registry.py
@@ -74,6 +74,10 @@ def gather_agent_name(focus):
         gather_preprint_or_project_creator,
         gather_special_preprint_creator,
     }
+    assert get_gatherers(BAZ.Preprint, [BAZ.creator], include_focustype_defaults=False) == {
+        gather_preprint_or_project_creator,
+        gather_special_preprint_creator,
+    }
     assert get_gatherers(BAZ.Agent, [FOO.name, FOO.identifier, FOO.unknown]) == {
         gather_agent_name,
         gather_identifiers,
