From ef5b009c50a07d3e7cc85c85ee81bcf6a0fd841a Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Mon, 14 Oct 2024 15:53:36 -0400 Subject: [PATCH] allow raw data to expire - add `RawDatum.expiration_date` model field (with index) - add "expiration_date" query param to `/trove/ingest` - update digestive tract to appropriately handle expired data - add periodic task to expel expired data daily --- how-to/use-the-api.md | 1 + project/settings.py | 4 + .../0075_rawdatum_expiration_date.py | 18 ++ ..._rawdatum_share_rawdatum_expiration_idx.py | 17 ++ share/models/ingest.py | 26 ++- tests/trove/digestive_tract/test_expel.py | 207 ++++++++++++++++++ tests/trove/digestive_tract/test_extract.py | 12 + tests/trove/digestive_tract/test_swallow.py | 45 ++++ tests/trove/views/test_ingest.py | 42 ++++ trove/digestive_tract.py | 66 ++++-- trove/exceptions.py | 4 + trove/models/indexcard.py | 2 + trove/views/ingest.py | 10 + 13 files changed, 439 insertions(+), 15 deletions(-) create mode 100644 share/migrations/0075_rawdatum_expiration_date.py create mode 100644 share/migrations/0076_rawdatum_share_rawdatum_expiration_idx.py create mode 100644 tests/trove/digestive_tract/test_expel.py diff --git a/how-to/use-the-api.md b/how-to/use-the-api.md index 0af873993..2a220615b 100644 --- a/how-to/use-the-api.md +++ b/how-to/use-the-api.md @@ -20,6 +20,7 @@ query params: - `record_identifier` (required): a source-specific identifier for the metadata record (no format restrictions) -- sending another record with the same `record_identifier` is considered a full update (only the most recent is used) - `nonurgent`: if present (regardless of value), ingestion may be given a lower priority -- recommended for bulk or background operations - `is_supplementary`: if present (regardless of value), this record's metadata will be added to all pre-existing index-cards from the same user with the same `focus_iri` (if any), but will not get an index-card of its own nor affect the last-updated timestamp (e.g. 
in OAI-PMH) of the index-cards it supplements +- `expiration_date`: optional date (in format `YYYY-MM-DD`) when the record is no longer valid and should be removed ## Deleting index-cards diff --git a/project/settings.py b/project/settings.py index c8da60aee..df3a0500b 100644 --- a/project/settings.py +++ b/project/settings.py @@ -385,6 +385,10 @@ def split(string, delim): 'task': 'share.tasks.harvest', 'schedule': 120, }, + 'Expel expired data': { + 'task': 'trove.digestive_tract.task__expel_expired_data', + 'schedule': crontab(hour=0, minute=0), # every day at midnight UTC + }, } if not DEBUG: diff --git a/share/migrations/0075_rawdatum_expiration_date.py b/share/migrations/0075_rawdatum_expiration_date.py new file mode 100644 index 000000000..11cd2d285 --- /dev/null +++ b/share/migrations/0075_rawdatum_expiration_date.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2.25 on 2024-10-14 15:52 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('share', '0074_sourceuniqueidentifier_is_supplementary'), + ] + + operations = [ + migrations.AddField( + model_name='rawdatum', + name='expiration_date', + field=models.DateField(help_text='An (optional) date after which this datum is no longer valid.', null=True), + ), + ] diff --git a/share/migrations/0076_rawdatum_share_rawdatum_expiration_idx.py b/share/migrations/0076_rawdatum_share_rawdatum_expiration_idx.py new file mode 100644 index 000000000..590c2153c --- /dev/null +++ b/share/migrations/0076_rawdatum_share_rawdatum_expiration_idx.py @@ -0,0 +1,17 @@ +from django.db import migrations, models +from django.contrib.postgres.operations import AddIndexConcurrently + + +class Migration(migrations.Migration): + atomic = False # allow adding indexes concurrently (without locking tables) + + dependencies = [ + ('share', '0075_rawdatum_expiration_date'), + ] + + operations = [ + AddIndexConcurrently( + model_name='rawdatum', + index=models.Index(fields=['expiration_date'], name='share_rawdatum_expiration_idx'), + ), + ] diff --git a/share/models/ingest.py b/share/models/ingest.py index 9823782fd..58bb22d6f 100644 --- a/share/models/ingest.py +++ b/share/models/ingest.py @@ -355,7 +355,15 @@ def store_data(self, config, fetch_result): return rd - def store_datum_for_suid(self, *, suid, datum: str, mediatype, datestamp: datetime.datetime): + def store_datum_for_suid( + self, + *, + suid, + datum: str, + mediatype: str | None, # `None` indicates sharev2-legacy ingestion + datestamp: datetime.datetime, + expiration_date: datetime.date | None = None, + ): _raw, _raw_created = self.get_or_create( suid=suid, sha256=hashlib.sha256(datum.encode()).hexdigest(), @@ -363,6 +371,7 @@ def store_datum_for_suid(self, *, suid, datum: str, mediatype, datestamp: dateti 'datum': datum, 'mediatype': mediatype, 'datestamp': datestamp, + 'expiration_date': expiration_date, }, ) if not _raw_created: @@ -371,10 +380,11 @@ def store_datum_for_suid(self, *, suid, datum: str, mediatype, datestamp: dateti logger.critical(_msg) sentry_sdk.capture_message(_msg) _raw.mediatype = mediatype + _raw.expiration_date = expiration_date # keep the latest datestamp if (not _raw.datestamp) or (datestamp > _raw.datestamp): _raw.datestamp = datestamp - _raw.save(update_fields=('mediatype', 'datestamp')) + _raw.save(update_fields=('mediatype', 'datestamp', 'expiration_date')) return _raw def latest_by_suid_id(self, suid_id) -> models.QuerySet: @@ -420,6 +430,10 @@ class RawDatum(models.Model): 'This may be, but is not limited to, a 
deletion, modification, publication, or creation datestamp. ' 'Ideally, this datetime should be appropriate for determining the chronological order its data will be applied.' )) + expiration_date = models.DateField( + null=True, + help_text='An (optional) date after which this datum is no longer valid.', + ) date_modified = models.DateTimeField(auto_now=True, editable=False) date_created = models.DateTimeField(auto_now_add=True, editable=False) @@ -447,11 +461,19 @@ def is_latest(self): .exists() ) + @property + def is_expired(self) -> bool: + return ( + self.expiration_date is not None + and self.expiration_date <= datetime.date.today() + ) + class Meta: unique_together = ('suid', 'sha256') verbose_name_plural = 'Raw Data' indexes = [ models.Index(fields=['no_output'], name='share_rawda_no_outp_f0330f_idx'), + models.Index(fields=['expiration_date'], name='share_rawdatum_expiration_idx'), ] class JSONAPIMeta(BaseJSONAPIMeta): diff --git a/tests/trove/digestive_tract/test_expel.py b/tests/trove/digestive_tract/test_expel.py new file mode 100644 index 000000000..2517172ea --- /dev/null +++ b/tests/trove/digestive_tract/test_expel.py @@ -0,0 +1,207 @@ +import datetime +from unittest import mock + +from django.test import TestCase +from primitive_metadata import primitive_rdf as rdf + +from share import models as share_db +from tests import factories +from trove import digestive_tract +from trove import models as trove_db + + +_BLARG = rdf.IriNamespace('https://blarg.example/') + + +class TestDigestiveTractExpel(TestCase): + @classmethod + def setUpTestData(cls): + cls.focus_1 = _BLARG.this1 + cls.focus_2 = _BLARG.this2 + cls.raw_1, cls.indexcard_1 = _setup_ingested(cls.focus_1) + cls.raw_2, cls.indexcard_2 = _setup_ingested(cls.focus_2) + cls.raw_supp = _setup_supplementary(cls.focus_1, cls.raw_1.suid, cls.indexcard_1) + + def setUp(self): + super().setUp() + self.notified_indexcard_ids = set() + self.enterContext(mock.patch( + 'share.search.index_messenger.IndexMessenger.notify_indexcard_update', + new=self._replacement_notify_indexcard_update, + )) + self.mock_derive_task = self.enterContext(mock.patch('trove.digestive_tract.task__derive')) + + def _replacement_notify_indexcard_update(self, indexcards, **kwargs): + self.notified_indexcard_ids.update(_card.id for _card in indexcards) + + def enterContext(self, context_manager): + # TestCase.enterContext added in python3.11 -- implementing here until then + result = context_manager.__enter__() + self.addCleanup(lambda: context_manager.__exit__(None, None, None)) + return result + + def test_setup(self): + self.indexcard_1.refresh_from_db() + self.indexcard_2.refresh_from_db() + self.assertIsNone(self.indexcard_1.deleted) + self.assertIsNone(self.indexcard_2.deleted) + self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3) + self.assertEqual(share_db.RawDatum.objects.count(), 3) + self.assertIsNotNone(self.indexcard_1.latest_rdf) + self.assertIsNotNone(self.indexcard_2.latest_rdf) + self.assertEqual(self.indexcard_1.archived_rdf_set.count(), 1) + self.assertEqual(self.indexcard_2.archived_rdf_set.count(), 1) + self.assertEqual(self.indexcard_1.supplementary_rdf_set.count(), 1) + self.assertEqual(self.indexcard_2.supplementary_rdf_set.count(), 0) + self.assertEqual(self.indexcard_1.derived_indexcard_set.count(), 1) + self.assertEqual(self.indexcard_2.derived_indexcard_set.count(), 1) + # neither notified indexes nor enqueued re-derive + self.assertEqual(self.notified_indexcard_ids, set()) + 
self.mock_derive_task.delay.assert_not_called() + + def test_expel(self): + with mock.patch('trove.digestive_tract.expel_suid') as _mock_expel_suid: + _user = self.raw_1.suid.source_config.source.user + digestive_tract.expel(from_user=_user, record_identifier=self.raw_1.suid.identifier) + _mock_expel_suid.assert_called_once_with(self.raw_1.suid) + + def test_expel_suid(self): + digestive_tract.expel_suid(self.raw_1.suid) + self.indexcard_1.refresh_from_db() + self.indexcard_2.refresh_from_db() + self.assertIsNotNone(self.indexcard_1.deleted) + self.assertIsNone(self.indexcard_2.deleted) + self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3) + self.assertEqual(share_db.RawDatum.objects.count(), 3) + with self.assertRaises(trove_db.LatestIndexcardRdf.DoesNotExist): + self.indexcard_1.latest_rdf # deleted + self.assertIsNotNone(self.indexcard_2.latest_rdf) + self.assertEqual(self.indexcard_1.archived_rdf_set.count(), 1) # not deleted + self.assertEqual(self.indexcard_2.archived_rdf_set.count(), 1) + self.assertEqual(self.indexcard_1.supplementary_rdf_set.count(), 1) # not deleted + self.assertEqual(self.indexcard_2.supplementary_rdf_set.count(), 0) + self.assertEqual(self.indexcard_1.derived_indexcard_set.count(), 0) # deleted + self.assertEqual(self.indexcard_2.derived_indexcard_set.count(), 1) + # notified indexes of update; did not enqueue re-derive + self.assertEqual(self.notified_indexcard_ids, {self.indexcard_1.id}) + self.mock_derive_task.delay.assert_not_called() + + def test_expel_supplementary_suid(self): + digestive_tract.expel_suid(self.raw_supp.suid) + self.indexcard_1.refresh_from_db() + self.indexcard_2.refresh_from_db() + self.assertIsNone(self.indexcard_1.deleted) + self.assertIsNone(self.indexcard_2.deleted) + self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3) + self.assertEqual(share_db.RawDatum.objects.count(), 3) + self.assertIsNotNone(self.indexcard_1.latest_rdf) + self.assertIsNotNone(self.indexcard_2.latest_rdf) + self.assertEqual(self.indexcard_1.archived_rdf_set.count(), 1) + self.assertEqual(self.indexcard_2.archived_rdf_set.count(), 1) + self.assertEqual(self.indexcard_1.supplementary_rdf_set.count(), 0) # deleted + self.assertEqual(self.indexcard_2.supplementary_rdf_set.count(), 0) + self.assertEqual(self.indexcard_1.derived_indexcard_set.count(), 1) + self.assertEqual(self.indexcard_2.derived_indexcard_set.count(), 1) + # did not notify indexes of update; did enqueue re-derive + self.assertEqual(self.notified_indexcard_ids, set()) + self.mock_derive_task.delay.assert_called_once_with(self.indexcard_1.id) + + def test_expel_expired_task(self): + with mock.patch('trove.digestive_tract.expel_expired_data') as _mock_expel_expired: + digestive_tract.task__expel_expired_data.apply() + _mock_expel_expired.assert_called_once_with(datetime.date.today()) + + def test_expel_expired(self): + _today = datetime.date.today() + self.raw_2.expiration_date = _today + self.raw_2.save() + digestive_tract.expel_expired_data(_today) + self.indexcard_1.refresh_from_db() + self.indexcard_2.refresh_from_db() + self.assertIsNone(self.indexcard_1.deleted) + self.assertIsNotNone(self.indexcard_2.deleted) # marked deleted + self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3) + self.assertEqual(share_db.RawDatum.objects.count(), 3) + self.assertIsNotNone(self.indexcard_1.latest_rdf) + with self.assertRaises(trove_db.LatestIndexcardRdf.DoesNotExist): + self.indexcard_2.latest_rdf # deleted + 
self.assertEqual(self.indexcard_1.archived_rdf_set.count(), 1) + self.assertEqual(self.indexcard_2.archived_rdf_set.count(), 1) # not deleted + self.assertEqual(self.indexcard_1.supplementary_rdf_set.count(), 1) + self.assertEqual(self.indexcard_2.supplementary_rdf_set.count(), 0) # deleted + self.assertEqual(self.indexcard_1.derived_indexcard_set.count(), 1) + self.assertEqual(self.indexcard_2.derived_indexcard_set.count(), 0) # deleted + # notified indexes of update; did not enqueue re-derive + self.assertEqual(self.notified_indexcard_ids, {self.indexcard_2.id}) + self.mock_derive_task.delay.assert_not_called() + + def test_expel_expired_supplement(self): + _today = datetime.date.today() + self.raw_supp.expiration_date = _today + self.raw_supp.save() + digestive_tract.expel_expired_data(_today) + self.indexcard_1.refresh_from_db() + self.indexcard_2.refresh_from_db() + self.assertIsNone(self.indexcard_1.deleted) + self.assertIsNone(self.indexcard_2.deleted) + self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3) + self.assertEqual(share_db.RawDatum.objects.count(), 3) + self.assertIsNotNone(self.indexcard_1.latest_rdf) + self.assertIsNotNone(self.indexcard_2.latest_rdf) + self.assertEqual(self.indexcard_1.archived_rdf_set.count(), 1) + self.assertEqual(self.indexcard_2.archived_rdf_set.count(), 1) + self.assertEqual(self.indexcard_1.supplementary_rdf_set.count(), 0) # deleted + self.assertEqual(self.indexcard_2.supplementary_rdf_set.count(), 0) + self.assertEqual(self.indexcard_1.derived_indexcard_set.count(), 1) + self.assertEqual(self.indexcard_2.derived_indexcard_set.count(), 1) + # did not notify indexes of update; did enqueue re-derive + self.assertEqual(self.notified_indexcard_ids, set()) + self.mock_derive_task.delay.assert_called_once_with(self.indexcard_1.id) + + +def _setup_ingested(focus_iri: str): + _focus_ident = trove_db.ResourceIdentifier.objects.get_or_create_for_iri(focus_iri) + _suid = factories.SourceUniqueIdentifierFactory( + focus_identifier=_focus_ident, + ) + _raw = factories.RawDatumFactory(suid=_suid) + _indexcard = trove_db.Indexcard.objects.create(source_record_suid=_raw.suid) + _indexcard.focus_identifier_set.add(_focus_ident) + _latest_rdf = trove_db.LatestIndexcardRdf.objects.create( + indexcard=_indexcard, + from_raw_datum=_raw, + focus_iri=focus_iri, + rdf_as_turtle='...', + ) + trove_db.ArchivedIndexcardRdf.objects.create( + indexcard=_indexcard, + from_raw_datum=_raw, + focus_iri=focus_iri, + rdf_as_turtle=_latest_rdf.rdf_as_turtle, + ) + _deriver_iri = _BLARG.deriver + _deriver_ident = trove_db.ResourceIdentifier.objects.get_or_create_for_iri(_deriver_iri) + trove_db.DerivedIndexcard.objects.create( + upriver_indexcard=_indexcard, + deriver_identifier=_deriver_ident, + derived_checksum_iri='...', + derived_text='...', + ) + return _raw, _indexcard + + +def _setup_supplementary(focus_iri, main_suid, indexcard): + _supp_suid = factories.SourceUniqueIdentifierFactory( + focus_identifier=main_suid.focus_identifier, + source_config=main_suid.source_config, + is_supplementary=True, + ) + _supp_raw = factories.RawDatumFactory(suid=_supp_suid) + trove_db.SupplementaryIndexcardRdf.objects.create( + indexcard=indexcard, + from_raw_datum=_supp_raw, + supplementary_suid=_supp_suid, + focus_iri=focus_iri, + rdf_as_turtle='...', + ) + return _supp_raw diff --git a/tests/trove/digestive_tract/test_extract.py b/tests/trove/digestive_tract/test_extract.py index a40b1b62e..64f975e34 100644 --- a/tests/trove/digestive_tract/test_extract.py +++ 
b/tests/trove/digestive_tract/test_extract.py @@ -1,8 +1,10 @@ +import datetime from django.test import TestCase from primitive_metadata import primitive_rdf as rdf from tests import factories from trove import digestive_tract +from trove import exceptions as trove_exceptions from trove import models as trove_db @@ -128,3 +130,13 @@ def test_extract_empty_supplementary(self): (_indexcard,) = digestive_tract.extract(_empty_raw) self.assertEqual(_indexcard.id, _orig_indexcard.id) self.assertFalse(_orig_indexcard.supplementary_rdf_set.exists()) + + def test_extract_expired(self): + self.raw.expiration_date = datetime.date.today() + with self.assertRaises(trove_exceptions.CannotDigestExpiredDatum): + digestive_tract.extract(self.raw) + + def test_extract_expired_supplement(self): + self.supplementary_raw.expiration_date = datetime.date.today() + with self.assertRaises(trove_exceptions.CannotDigestExpiredDatum): + digestive_tract.extract(self.supplementary_raw) diff --git a/tests/trove/digestive_tract/test_swallow.py b/tests/trove/digestive_tract/test_swallow.py index 62a81309e..096f26ae2 100644 --- a/tests/trove/digestive_tract/test_swallow.py +++ b/tests/trove/digestive_tract/test_swallow.py @@ -1,3 +1,4 @@ +import datetime from unittest import mock from django.test import TestCase @@ -32,6 +33,7 @@ def test_swallow(self): (_raw,) = share_db.RawDatum.objects.all() self.assertEqual(_raw.datum, self.turtle) self.assertEqual(_raw.mediatype, 'text/turtle') + self.assertIsNone(_raw.expiration_date) self.assertEqual(_raw.suid.identifier, 'blarg') self.assertEqual(_raw.suid.focus_identifier.sufficiently_unique_iri, '://blarg.example/this') self.assertEqual(_raw.suid.source_config.source.user_id, self.user.id) @@ -51,6 +53,7 @@ def test_swallow_urgent(self): (_raw,) = share_db.RawDatum.objects.all() self.assertEqual(_raw.datum, self.turtle) self.assertEqual(_raw.mediatype, 'text/turtle') + self.assertIsNone(_raw.expiration_date) self.assertEqual(_raw.suid.identifier, 'blarg') self.assertEqual(_raw.suid.focus_identifier.sufficiently_unique_iri, '://blarg.example/this') self.assertEqual(_raw.suid.source_config.source.user_id, self.user.id) @@ -70,6 +73,48 @@ def test_swallow_supplementary(self): (_raw,) = share_db.RawDatum.objects.all() self.assertEqual(_raw.datum, self.turtle) self.assertEqual(_raw.mediatype, 'text/turtle') + self.assertIsNone(_raw.expiration_date) + self.assertEqual(_raw.suid.identifier, 'blarg') + self.assertEqual(_raw.suid.focus_identifier.sufficiently_unique_iri, '://blarg.example/this') + self.assertEqual(_raw.suid.source_config.source.user_id, self.user.id) + self.assertTrue(_raw.suid.is_supplementary) + _mock_task.delay.assert_called_once_with(_raw.id, urgent=False) + + def test_swallow_with_expiration(self): + with mock.patch('trove.digestive_tract.task__extract_and_derive') as _mock_task: + digestive_tract.swallow( + from_user=self.user, + record=self.turtle, + record_identifier='blarg', + record_mediatype='text/turtle', + focus_iri='https://blarg.example/this', + expiration_date=datetime.date(2048, 1, 3), + ) + (_raw,) = share_db.RawDatum.objects.all() + self.assertEqual(_raw.datum, self.turtle) + self.assertEqual(_raw.mediatype, 'text/turtle') + self.assertEqual(_raw.expiration_date, datetime.date(2048, 1, 3)) + self.assertEqual(_raw.suid.identifier, 'blarg') + self.assertEqual(_raw.suid.focus_identifier.sufficiently_unique_iri, '://blarg.example/this') + self.assertEqual(_raw.suid.source_config.source.user_id, self.user.id) + self.assertFalse(_raw.suid.is_supplementary) + 
_mock_task.delay.assert_called_once_with(_raw.id, urgent=False) + + def test_swallow_supplementary_with_expiration(self): + with mock.patch('trove.digestive_tract.task__extract_and_derive') as _mock_task: + digestive_tract.swallow( + from_user=self.user, + record=self.turtle, + record_identifier='blarg', + record_mediatype='text/turtle', + focus_iri='https://blarg.example/this', + is_supplementary=True, + expiration_date=datetime.date(2047, 1, 3), + ) + (_raw,) = share_db.RawDatum.objects.all() + self.assertEqual(_raw.datum, self.turtle) + self.assertEqual(_raw.mediatype, 'text/turtle') + self.assertEqual(_raw.expiration_date, datetime.date(2047, 1, 3)) self.assertEqual(_raw.suid.identifier, 'blarg') self.assertEqual(_raw.suid.focus_identifier.sufficiently_unique_iri, '://blarg.example/this') self.assertEqual(_raw.suid.source_config.source.user_id, self.user.id) diff --git a/tests/trove/views/test_ingest.py b/tests/trove/views/test_ingest.py index 2b828f399..f0cf1c24f 100644 --- a/tests/trove/views/test_ingest.py +++ b/tests/trove/views/test_ingest.py @@ -1,3 +1,4 @@ +import datetime from http import HTTPStatus from unittest import mock from urllib.parse import urlencode @@ -80,6 +81,31 @@ def test_post_supplementary(self): is_supplementary=True, ) + def test_post_with_expiration(self): + with mock.patch('trove.views.ingest.digestive_tract') as _mock_tract: + _resp = self.client.post( + '/trove/ingest?' + urlencode({ + 'focus_iri': 'https://foo.example/blarg', + 'record_identifier': 'blarg', + 'is_supplementary': '', + 'expiration_date': '2055-05-05', + }), + content_type='text/turtle', + data='turtleturtleturtle', + HTTP_AUTHORIZATION=self.user.authorization(), + ) + self.assertEqual(_resp.status_code, HTTPStatus.CREATED) + _mock_tract.swallow.assert_called_once_with( + from_user=self.user, + record='turtleturtleturtle', + record_identifier='blarg', + record_mediatype='text/turtle', + focus_iri='https://foo.example/blarg', + urgent=True, + is_supplementary=True, + expiration_date=datetime.date(2055, 5, 5), + ) + def test_delete(self): with mock.patch('trove.views.ingest.digestive_tract') as _mock_tract: _resp = self.client.delete( @@ -137,3 +163,19 @@ def test_nontrusted_delete(self): ) self.assertEqual(_resp.status_code, HTTPStatus.FORBIDDEN) self.assertFalse(_mock_tract.expel.called) + + def test_invalid_expiration_date(self): + with mock.patch('trove.views.ingest.digestive_tract') as _mock_tract: + _resp = self.client.post( + '/trove/ingest?' 
+ urlencode({ + 'focus_iri': 'https://foo.example/blarg', + 'record_identifier': 'blarg', + 'is_supplementary': '', + 'expiration_date': '05-05-2055', + }), + content_type='text/turtle', + data='turtleturtleturtle', + HTTP_AUTHORIZATION=self.user.authorization(), + ) + self.assertEqual(_resp.status_code, HTTPStatus.BAD_REQUEST) + self.assertFalse(_mock_tract.swallow.called) diff --git a/trove/digestive_tract.py b/trove/digestive_tract.py index 7b78dcc00..9be767642 100644 --- a/trove/digestive_tract.py +++ b/trove/digestive_tract.py @@ -21,7 +21,7 @@ from share.search import IndexMessenger from share.util.checksum_iri import ChecksumIri from trove import models as trove_db -from trove.exceptions import DigestiveError +from trove.exceptions import DigestiveError, CannotDigestExpiredDatum from trove.extract import get_rdf_extractor_class from trove.derive import get_deriver_classes from trove.vocab.namespaces import RDFS, RDF, OWL @@ -36,11 +36,12 @@ def swallow( from_user: share_db.ShareUser, record: str, record_identifier: str, - record_mediatype: typing.Optional[str], # passing None indicates sharev2 backcompat + record_mediatype: str | None, # passing None indicates sharev2 backcompat focus_iri: str, - datestamp=None, # default "now" - urgent=False, - is_supplementary=False, + datestamp: datetime.datetime | None = None, # default "now" + expiration_date: datetime.date | None = None, + urgent: bool = False, + is_supplementary: bool = False, ): '''swallow: store a given record by checksum; queue for extraction @@ -74,6 +75,7 @@ def swallow( datum=record, mediatype=record_mediatype, datestamp=(datestamp or datetime.datetime.now(tz=datetime.timezone.utc)), + expiration_date=expiration_date, ) _task = task__extract_and_derive.delay(_raw.id, urgent=urgent) return _task.id @@ -120,6 +122,8 @@ def extract(raw: share_db.RawDatum, *, undelete_indexcards=False) -> list[trove_ LatestIndexcardRdf (previously extracted from the record, but no longer present) ''' assert raw.mediatype is not None, 'raw datum has no mediatype -- did you mean to call extract_legacy?' 
+ if raw.is_expired: + raise CannotDigestExpiredDatum(raw) _tripledicts_by_focus_iri = {} _extractor = get_rdf_extractor_class(raw.mediatype)(raw.suid.source_config) # TODO normalize (or just validate) tripledict: @@ -206,14 +210,37 @@ def expel(from_user: share_db.ShareUser, record_identifier: str): expel_suid(_suid) -def expel_suid(suid: share_db.SourceUniqueIdentifier): - ( - trove_db.SupplementaryIndexcardRdf.objects - .filter(supplementary_suid=suid) - .delete() - ) +def expel_suid(suid: share_db.SourceUniqueIdentifier) -> None: for _indexcard in trove_db.Indexcard.objects.filter(source_record_suid=suid): _indexcard.pls_delete() + _expel_supplementary_rdf( + trove_db.SupplementaryIndexcardRdf.objects.filter(supplementary_suid=suid), + ) + + +def expel_expired_data(today: datetime.date) -> None: + # mark indexcards deleted if their latest update has now expired + for _indexcard in trove_db.Indexcard.objects.filter( + trove_latestindexcardrdf_set__from_raw_datum__expiration_date__lte=today, + ): + _indexcard.pls_delete() + # delete expired supplementary metadata + _expel_supplementary_rdf( + trove_db.SupplementaryIndexcardRdf.objects.filter( + from_raw_datum__expiration_date__lte=today, + ), + ) + + +def _expel_supplementary_rdf(supplementary_rdf_queryset) -> None: + # delete expired supplementary metadata + _affected_indexcards = set() + for _supplementary_rdf in supplementary_rdf_queryset.select_related('indexcard'): + if not _supplementary_rdf.indexcard.deleted: + _affected_indexcards.add(_supplementary_rdf.indexcard) + _supplementary_rdf.delete() + for _indexcard in _affected_indexcards: + task__derive.delay(_indexcard.id) ### BEGIN celery tasks @@ -241,9 +268,17 @@ def task__extract_and_derive(task: celery.Task, raw_id: int, urgent=False): @celery.shared_task(acks_late=True, bind=True) -def task__derive(task: celery.Task, indexcard_id: int, deriver_iri: str, notify_index=True): +def task__derive( + task: celery.Task, + indexcard_id: int, + deriver_iri: str | None = None, + notify_index=True, +): _indexcard = trove_db.Indexcard.objects.get(id=indexcard_id) - derive(_indexcard, deriver_iris=[deriver_iri]) + derive( + _indexcard, + deriver_iris=(None if deriver_iri is None else [deriver_iri]), + ) # TODO: avoid unnecessary work; let IndexStrategy subscribe to a specific # IndexcardDeriver (perhaps by deriver-specific MessageType?) if notify_index: @@ -276,6 +311,11 @@ def task__schedule_all_for_deriver(deriver_iri: str, notify_index=False): task__derive.apply_async((_indexcard_id, deriver_iri, notify_index)) +@celery.shared_task(acks_late=True) +def task__expel_expired_data(): + expel_expired_data(datetime.date.today()) + + # TODO: remove legacy ingest def _sharev2_legacy_ingest(raw, urgent: bool): assert raw.mediatype is None, 'raw datum has a mediatype -- did you mean to call non-legacy extract?' 
diff --git a/trove/exceptions.py b/trove/exceptions.py index 6f68b0f20..516f6c200 100644 --- a/trove/exceptions.py +++ b/trove/exceptions.py @@ -27,6 +27,10 @@ class CannotDigestDateValue(DigestiveError): pass +class CannotDigestExpiredDatum(DigestiveError): + pass + + ### # parsing a request diff --git a/trove/models/indexcard.py b/trove/models/indexcard.py index 091c67f4a..d6e46c7c7 100644 --- a/trove/models/indexcard.py +++ b/trove/models/indexcard.py @@ -64,6 +64,7 @@ def supplement_indexcards_from_tripledicts( rdf_tripledicts_by_focus_iri: dict[str, rdf.RdfTripleDictionary], ) -> list[Indexcard]: assert from_raw_datum.suid.is_supplementary + assert not from_raw_datum.is_expired from_raw_datum.no_output = (not rdf_tripledicts_by_focus_iri) from_raw_datum.save(update_fields=['no_output']) if not from_raw_datum.is_latest(): @@ -96,6 +97,7 @@ def save_indexcard_from_tripledict( undelete: bool = False, ): assert not from_raw_datum.suid.is_supplementary + assert not from_raw_datum.is_expired _focus_identifier_set = ( ResourceIdentifier.objects .save_equivalent_identifier_set(rdf_tripledict, focus_iri) diff --git a/trove/views/ingest.py b/trove/views/ingest.py index 3f4dc654b..315684b80 100644 --- a/trove/views/ingest.py +++ b/trove/views/ingest.py @@ -1,3 +1,4 @@ +import datetime from http import HTTPStatus import logging @@ -30,6 +31,14 @@ def post(self, request): _record_identifier = request.GET.get('record_identifier') if not _record_identifier: return http.HttpResponse('record_identifier queryparam required', status=HTTPStatus.BAD_REQUEST) + _expiration_date_str = request.GET.get('expiration_date') + if _expiration_date_str is None: + _expiration_date = None + else: + try: + _expiration_date = datetime.date.fromisoformat(_expiration_date_str) + except ValueError: + return http.HttpResponse('expiration_date queryparam must be in ISO-8601 date format (YYYY-MM-DD)', status=HTTPStatus.BAD_REQUEST) try: digestive_tract.swallow( from_user=request.user, @@ -39,6 +48,7 @@ def post(self, request): focus_iri=_focus_iri, urgent=(request.GET.get('nonurgent') is None), is_supplementary=(request.GET.get('is_supplementary') is not None), + expiration_date=_expiration_date, ) except exceptions.IngestError as e: logger.exception(str(e))
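
For illustration, a minimal client-side sketch of supplying the new `expiration_date` query param when pushing a record to `/trove/ingest`. The host, the auth token, and the use of the `requests` library are assumptions for the sketch, not part of this changeset; the query params, `text/turtle` mediatype, and 201 response follow the behavior exercised in tests/trove/views/test_ingest.py and documented in how-to/use-the-api.md.

    import requests  # assumed HTTP client; any client works

    _resp = requests.post(
        'https://share.example/trove/ingest',  # hypothetical deployment
        params={
            'focus_iri': 'https://blarg.example/this',
            'record_identifier': 'blarg',
            'expiration_date': '2055-05-05',  # YYYY-MM-DD; omit to keep the record indefinitely
        },
        headers={
            'Authorization': '...',  # auth token elided; see how-to/use-the-api.md
            'Content-Type': 'text/turtle',
        },
        data='<https://blarg.example/this> a <https://blarg.example/Thing> .',
    )
    assert _resp.status_code == 201  # once the expiration_date passes, the daily task expels the record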