diff --git a/.circleci/config.yml b/.circleci/config.yml
index f417887d5..fe8048827 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -178,13 +178,13 @@ workflows:
   test_and_deploy:
     jobs:
       - test
-      - deploy_staging:
-          requires:
-            - test
-          filters:
-            branches:
-              only:
-                - develop
+#      - deploy_staging:
+#          requires:
+#            - test
+#          filters:
+#            branches:
+#              only:
+#                - develop
       - deploy_production_1:
           requires:
             - test
@@ -208,8 +208,8 @@ workflows:
               only:
                 - master
       - prerelease:
-          requires:
-            - deploy_staging
+#          requires:
+#            - deploy_staging
           filters:
             branches:
               only:
diff --git a/OIPA/iati_synchroniser/dataset_syncer.py b/OIPA/iati_synchroniser/dataset_syncer.py
index 9b3d4ecda..a9f2e4141 100644
--- a/OIPA/iati_synchroniser/dataset_syncer.py
+++ b/OIPA/iati_synchroniser/dataset_syncer.py
@@ -9,7 +9,8 @@
     create_publisher_organisation
 )
 from iati_synchroniser.models import (
-    Dataset, DatasetFailedPickup, DatasetUpdateDates, Publisher
+    Dataset, DatasetDownloadsStarted, DatasetFailedPickup, DatasetUpdateDates,
+    Publisher
 )
 from task_queue.tasks import DatasetDownloadTask
 
@@ -136,7 +137,10 @@ def update_or_create_dataset(self, dataset):
         if not len(dataset['resources']) or not dataset['organization']:
             return
 
-        # Pass the data generated here to the
+        # Add a row to the DatasetDownloadsStarted table
+        DatasetDownloadsStarted.objects.create()
+
+        # Pass the data generated here to the DatasetDownloadTask
         DatasetDownloadTask.delay(dataset_data=dataset)
 
     def remove_deprecated(self):
diff --git a/OIPA/iati_synchroniser/migrations/0024_datasetdownloadsfinished_datasetdownloadsstarted.py b/OIPA/iati_synchroniser/migrations/0024_datasetdownloadsfinished_datasetdownloadsstarted.py
new file mode 100644
index 000000000..13c2c60d6
--- /dev/null
+++ b/OIPA/iati_synchroniser/migrations/0024_datasetdownloadsfinished_datasetdownloadsstarted.py
@@ -0,0 +1,27 @@
+# Generated by Django 2.0.13 on 2021-03-31 10:29
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('iati_synchroniser', '0023_auto_20210311_1111'),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='DatasetDownloadsFinished',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('timestamp', models.DateTimeField(auto_now=True)),
+            ],
+        ),
+        migrations.CreateModel(
+            name='DatasetDownloadsStarted',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('timestamp', models.DateTimeField(auto_now=True)),
+            ],
+        ),
+    ]
diff --git a/OIPA/iati_synchroniser/migrations/0025_auto_20210331_1603.py b/OIPA/iati_synchroniser/migrations/0025_auto_20210331_1603.py
new file mode 100644
index 000000000..d560c5679
--- /dev/null
+++ b/OIPA/iati_synchroniser/migrations/0025_auto_20210331_1603.py
@@ -0,0 +1,17 @@
+# Generated by Django 2.0.13 on 2021-03-31 16:03
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('iati_synchroniser', '0024_datasetdownloadsfinished_datasetdownloadsstarted'),
+    ]
+
+    operations = [
+        migrations.RenameModel(
+            old_name='DatasetDownloadsFinished',
+            new_name='AsyncTasksFinished',
+        ),
+    ]
diff --git a/OIPA/iati_synchroniser/models.py b/OIPA/iati_synchroniser/models.py
index eacbe698a..5deb72087 100644
--- a/OIPA/iati_synchroniser/models.py
+++ b/OIPA/iati_synchroniser/models.py
@@ -234,6 +234,18 @@ class DatasetUpdateDates(models.Model):
         null=False, blank=True, auto_now=False)
 
 
+# This model is added for the automation of the incremental parsing procedure
+class DatasetDownloadsStarted(models.Model):
+    timestamp = models.DateTimeField(
+        null=False, blank=True, auto_now=True)
+
+
+# This model is added for the automation of the incremental parsing procedure
+class AsyncTasksFinished(models.Model):
+    timestamp = models.DateTimeField(
+        null=False, blank=True, auto_now=True)
+
+
 class Codelist(models.Model):
     name = models.CharField(primary_key=True, max_length=100)
     description = models.TextField(max_length=1000, blank=True, null=True)
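The two models above are deliberately bare row counters: DatasetDownloadsStarted gets a row when a download task is queued, AsyncTasksFinished gets a row when an async task reports completion, and progress is measured purely by comparing counts. A minimal sketch of that pattern, assuming a configured Django environment for OIPA; the helper name all_downloads_finished is hypothetical and not part of this patch:

    from iati_synchroniser.models import (
        AsyncTasksFinished, DatasetDownloadsStarted
    )


    def all_downloads_finished():
        # Hypothetical helper: True once every queued download has reported
        # completion. Only meaningful after the tables were reset at the
        # start of a run (see reset_automatic_incremental_parse_dbs below).
        started = DatasetDownloadsStarted.objects.count()
        finished = AsyncTasksFinished.objects.count()
        return started > 0 and finished >= started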
diff --git a/OIPA/task_queue/download.py b/OIPA/task_queue/download.py
index 9c4b10996..71a9e0d41 100644
--- a/OIPA/task_queue/download.py
+++ b/OIPA/task_queue/download.py
@@ -10,7 +10,8 @@
 from django.utils.encoding import smart_text
 
 from iati_synchroniser.models import (
-    Dataset, DatasetFailedPickup, Publisher, filetype_choices
+    AsyncTasksFinished, Dataset, DatasetFailedPickup, Publisher,
+    filetype_choices
 )
 
 # Get an instance of a logger
@@ -300,6 +301,8 @@ def run(self, dataset_data, *args, **kwargs):
                 'internal_url': internal_url
             }
         )
+        # Save a row to the AsyncTasksFinished table.
+        AsyncTasksFinished.objects.create()
 
         # URL string to save as a Dataset attribute:
         return os.path.join(main_download_dir, filename)
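Note that DatasetDownloadTask only writes its AsyncTasksFinished row on the success path; a download that raises never reports completion, which is exactly the gap the 'grace' filter described in automatic_incremental_parse below is meant to absorb. A sketch (not part of the patch) of the alternative, writing the row on every path under the same bookkeeping model; tracked_download_task and do_download_work are hypothetical names. The trade-off: a task that Celery retries would write one row per attempt under try/finally, which is why the patch instead places the create() calls only on the branches it wants counted:

    from celery import shared_task

    from iati_synchroniser.models import AsyncTasksFinished


    @shared_task
    def tracked_download_task(dataset_data):
        try:
            do_download_work(dataset_data)  # hypothetical work function
        finally:
            # Runs on success and failure alike, so the finished count
            # always advances and the grace window is only needed for
            # hard crashes, not ordinary exceptions.
            AsyncTasksFinished.objects.create()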
diff --git a/OIPA/task_queue/tasks.py b/OIPA/task_queue/tasks.py
index 1166ee812..c09234010 100644
--- a/OIPA/task_queue/tasks.py
+++ b/OIPA/task_queue/tasks.py
@@ -28,7 +28,9 @@
 from iati.models import Activity, Budget, Document, DocumentLink, Result
 from iati.transaction.models import Transaction
 from iati_organisation.models import Organisation
-from iati_synchroniser.models import Dataset, DatasetNote, DatasetUpdateDates
+from iati_synchroniser.models import (
+    AsyncTasksFinished, Dataset, DatasetNote, DatasetUpdateDates
+)
 from OIPA.celery import app
 from solr.activity.tasks import ActivityTaskIndexing
 from solr.activity.tasks import solr as solr_activity
@@ -40,7 +42,9 @@
 from solr.transaction.tasks import solr as solr_transaction
 from solr.transaction_sector.tasks import solr as solr_transaction_sector
 from task_queue.download import DatasetDownloadTask
-from task_queue.utils import Tasks
+from task_queue.utils import (
+    Tasks, await_async_subtasks, reset_automatic_incremental_parse_dbs
+)
 from task_queue.validation import DatasetValidationTask
 
 # Get an instance of a logger
@@ -82,6 +86,145 @@ def add_activity_to_solr(activity_id):
 # All Registered Celery tasks that are actively used
 # TODO: 25-02-2020 rename tasks
 #
+# This task 'automatically' performs a complete parse and index, covering
+# the steps that previously had to be run manually.
+@shared_task
+def automatic_incremental_parse(start_at=1):
+    """
+    Several steps need to be taken to complete an incremental parse/index:
+    1. Import datasets task (async, not able to directly tell when finished)
+    2. Drop old datasets task (when is this finished?)
+    3. Dataset validation task (when does it start? when does it finish?)
+    4. Parse all datasets task (when does it finish?)
+
+    Solutions:
+    1 - When the 'import datasets task' runs, it fires a separate 'download
+    dataset task' per dataset. To track whether all of them have finished,
+    the 'update_or_create_dataset' function adds a row to the
+    DatasetDownloadsStarted table each time it queues a DatasetDownloadTask,
+    and each DatasetDownloadTask adds a row to the AsyncTasksFinished table
+    when it finishes, so eventually the two tables hold an equal number of
+    rows. To make sure that is not a false positive, we check three times
+    that the size of both tables remains the same and still matches (to
+    catch cases like '5 datasets found in the IATI registry and 5 datasets
+    downloaded'). We also include a 'grace' filter to catch failed
+    downloads: if the number of finished tasks has not changed for 10
+    iterations, we assume the missing ones have failed. We expect 0 failed
+    downloads, but have historically seen up to four DatasetDownloadTasks
+    fail, so we allow a difference of TEN at most.
+    2 - Simple: just fire the task. It is not async.
+    3 - We check whether the dataset validation has started by querying the
+    validation endpoint's API. If this API endpoint returns an empty
+    response, the validator is not running. To ensure the validation has
+    completed: check whether it has started validating datasets; if it has
+    not started yet, wait until it has. Once it has started, wait until it
+    no longer returns any data, then confirm that it has actually stopped
+    by checking several times in a row.
+
+    Then we need to fetch the actual validation results. Start that task,
+    then reuse the logic from step one: the number of validation tasks
+    equals the number of existing datasets. Each DatasetValidationTask adds
+    a row to the AsyncTasksFinished table, and we check whether the number
+    of rows matches the number of existing datasets.
+    4 - The parse all sources task queues a parse_source_by_id_task for
+    each of the existing datasets, so we reuse the logic from the previous
+    step to track which datasets have been parsed. When all async tasks
+    have finished, the incremental parse/index is complete and the process
+    can start anew.
+    """
+    # Before starting, reset the databases we use for checks, in case they
+    # still contain data.
+    reset_automatic_incremental_parse_dbs()
+
+    # STEP ONE -- Import Datasets #
+    if start_at == 1:
+        # Start the task
+        get_new_sources_from_iati_api_task()
+
+        too_many_failed = await_async_subtasks(started_not_set=False)
+        if too_many_failed:
+            return "Too many dataset imports failed"
+    # STEP ONE -- End #
+
+    # STEP TWO -- DROP OLD DATASETS #
+    if start_at in (1, 2):
+        drop_old_datasets()
+    # STEP TWO -- End #
+
+    # STEP THREE -- DATASET VALIDATION TASK #
+    # Prepare checks
+    if start_at in (1, 2, 3):
+        check_validation_has_started = False
+        check_validation_is_active = False
+        check_empty_iteration_count = 0
+        check_empty_iteration_maximum = 3
+
+        while True:
+            url = "https://iativalidator.iatistandard.org/api/v1/queue/next"
+            response = requests.get(url, timeout=30)
+
+            # If the response is not 200, reset and check back later.
+            if response.status_code != 200:
+                check_validation_has_started = False
+                check_validation_is_active = False
+                time.sleep(60)
+                continue
+
+            check_content_is_empty = response.content.decode("utf-8") == ""
+
+            """
+            Case 1: content empty - started = false - active = false
+                wait for the validator to start
+            Case 2: content has data - started = false - active = false
+                set started to true and active to true
+            Case 3: content has data - started = true - active = true
+                wait for the content to stop having data!
+            Case 4: content empty - started = true - active = true
+                with three iterations, confirm the content is actually empty!
+                set active to false.
+            """
+            if not check_content_is_empty and not check_validation_has_started and not check_validation_is_active:  # NOQA: E501
+                check_validation_has_started = True
+                check_validation_is_active = True
+            if not check_content_is_empty and check_validation_has_started and check_validation_is_active:  # NOQA: E501
+                check_empty_iteration_count = 0
+            if check_content_is_empty and check_validation_has_started and check_validation_is_active:  # NOQA: E501
+                if check_empty_iteration_count < check_empty_iteration_maximum:
+                    check_empty_iteration_count += 1
+                else:  # Validation has finished
+                    break
+            time.sleep(60)
+
+        # Now that the "waiting for validator to finish" loop is over, we
+        # know the validator is finished. Run the task. To reduce
+        # complexity, reuse the AsyncTasksFinished table.
+        get_validation_results_task()
+
+        started = Dataset.objects.count()
+        await_async_subtasks(started)
+    # STEP THREE -- End #
+
+    # STEP FOUR -- PARSE ALL DATASETS #
+    if start_at in (1, 2, 3, 4):
+        # parse_all_existing_sources_task() does not actually run the
+        # parsing itself, so its logic is reused here. Count the queued
+        # tasks so the wait below also works when start_at is 4.
+        started = 0
+        for dataset in Dataset.objects.filter(filetype=2):
+            parse_source_by_id_task.delay(dataset_id=dataset.id,
+                                          force=False,
+                                          check_validation=True)
+            started += 1
+        for dataset in Dataset.objects.filter(filetype=1):
+            parse_source_by_id_task.delay(dataset_id=dataset.id,
+                                          force=False,
+                                          check_validation=True)
+            started += 1
+        await_async_subtasks(started)
+    # STEP FOUR -- End #
+
+    # Restart the automatic_incremental_parse asynchronously and end this task.
+    automatic_incremental_parse.apply_async()
+
+
 # This task updates all of the currency exchange rates in the local database
 @shared_task
 def update_exchange_rates():
@@ -115,6 +258,8 @@ def get_validation_results_task():
 
 
 # This task is used to parse a specific dataset by passing it an ID.
+# In each of the try/except branches below, store an AsyncTasksFinished row
+# for the automatic incremental parse procedure.
 # TODO: 25-02-2020 document this function.
 @shared_task(bind=True)
 def parse_source_by_id_task(self, dataset_id, force=False,
@@ -126,14 +271,25 @@ def parse_source_by_id_task(self, dataset_id, force=False,
                 validation_status__critical__lte=0)  # NOQA: E501
             dataset = dataset.first()
             dataset.process(force_reparse=force)
+
+            # Save a row to the AsyncTasksFinished table.
+            AsyncTasksFinished.objects.create()
         except AttributeError:
             print('no dataset found')
+
+            # Save a row to the AsyncTasksFinished table.
+            AsyncTasksFinished.objects.create()
             pass
     else:
         try:
             dataset = Dataset.objects.get(pk=dataset_id)
             dataset.process(force_reparse=force)
+
+            # Save a row to the AsyncTasksFinished table.
+            AsyncTasksFinished.objects.create()
         except Dataset.DoesNotExist:
+            # Save a row to the AsyncTasksFinished table.
+            AsyncTasksFinished.objects.create()
             pass
         except Exception as exc:
             raise self.retry(kwargs={'dataset_id': dataset_id, 'force': True},
@@ -826,6 +982,8 @@ def download_file(d):
                 # print str(e)
                 doc.document_content = document_content.decode("latin-1")
                 doc.save()
+
+
 
 # # #
 
 
 @shared_task
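Because automatic_incremental_parse re-queues itself with apply_async() when it completes, a single invocation starts a perpetual import-validate-parse cycle. A usage sketch, assuming a running Celery worker with these tasks registered; the start_at values map to the four steps in the docstring (1 = import, 2 = drop old datasets, 3 = validation, 4 = parse):

    from task_queue.tasks import automatic_incremental_parse

    # Kick off the full pipeline: import, drop old datasets, validate, parse.
    automatic_incremental_parse.apply_async()

    # Resume from the validation step, e.g. after a previous run returned
    # "Too many dataset imports failed".
    automatic_incremental_parse.apply_async(kwargs={'start_at': 3})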
diff --git a/OIPA/task_queue/utils.py b/OIPA/task_queue/utils.py
index 4dfa183f0..36b133ec9 100644
--- a/OIPA/task_queue/utils.py
+++ b/OIPA/task_queue/utils.py
@@ -1,5 +1,11 @@
+import time
+
 from celery.task.control import inspect
 
+from iati_synchroniser.models import (
+    AsyncTasksFinished, DatasetDownloadsStarted
+)
+
 
 class Tasks:
     """
@@ -78,3 +84,57 @@ def extract(obj, arr, key):
 
     results = extract(obj, arr, key)
     return results
+
+
+def reset_automatic_incremental_parse_dbs():
+    # Clear the bookkeeping tables used by the automatic incremental parse.
+    DatasetDownloadsStarted.objects.all().delete()
+    AsyncTasksFinished.objects.all().delete()
+
+
+# Await asynchronous subtasks fired from other tasks. 'started' is the number
+# of rows expected to appear in the AsyncTasksFinished table. When
+# 'started_not_set' is False, that number is read from the
+# DatasetDownloadsStarted table on every iteration instead.
+def await_async_subtasks(started=-1, started_not_set=True):
+    check_iteration_count = 0
+    check_iteration_maximum = 3
+    check_previous_finished_length = 0
+    check_grace_iteration_count = 0
+    check_grace_iteration_maximum = 10
+    check_grace_maximum_disparity = 10
+    while True:
+        # Get the current counts of started and finished tasks
+        if not started_not_set:
+            started = DatasetDownloadsStarted.objects.count()
+        finished = AsyncTasksFinished.objects.count()
+
+        # Check if the grace should take effect.
+        # Grace is when the number of failed tasks is very small but the
+        # number of finished tasks no longer changes. This makes sure that
+        # the automatic parsing does not get stuck waiting for an unfinished
+        # async task.
+        if finished == check_previous_finished_length:
+            check_grace_iteration_count += 1
+            if check_grace_iteration_count == check_grace_iteration_maximum:
+                if started - finished < check_grace_maximum_disparity:
+                    break
+                else:  # More async tasks than expected failed,
+                    # exit automatic parsing
+                    return True
+        else:
+            check_grace_iteration_count = 0
+
+        # Check if the async tasks are done
+        if started == finished:
+            if finished == check_previous_finished_length:
+                check_iteration_count += 1
+                if check_iteration_count == check_iteration_maximum:
+                    break
+            else:
+                check_iteration_count = 0
+
+        # Wait a minute and check again
+        time.sleep(60)
+        check_previous_finished_length = finished
+    # After the while loop finishes, clear the bookkeeping tables
+    reset_automatic_incremental_parse_dbs()
diff --git a/OIPA/task_queue/validation.py b/OIPA/task_queue/validation.py
index 15f6f3e14..c4eec0bbb 100644
--- a/OIPA/task_queue/validation.py
+++ b/OIPA/task_queue/validation.py
@@ -9,7 +9,7 @@
 from django.conf import settings
 from requests.exceptions import RequestException
 
-from iati_synchroniser.models import Dataset
+from iati_synchroniser.models import AsyncTasksFinished, Dataset
 
 # Get an instance of a logger
 logger = logging.getLogger(__name__)
@@ -51,6 +51,9 @@ def run(self, dataset_id=None, *args, **kwargs):
         if self._check():
             self._updated()
 
+            # Save a row to the AsyncTasksFinished table.
+            AsyncTasksFinished.objects.create()
+
         # We don't do ad-hoc validation anymore
         # else:
         #     # Upload file
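The waiting logic in await_async_subtasks is easiest to reason about in isolation. Below is a standalone sketch of the same loop with the database reads and the one-minute sleep injected as parameters, so the normal-completion and grace paths can be exercised without Django or Celery; the table reset after the loop is omitted, and the return convention mirrors the original (True only when too many tasks appear to have failed):

    def await_async_subtasks_sim(poll_started, poll_finished,
                                 sleep=lambda: None):
        # poll_started/poll_finished stand in for the row counts of
        # DatasetDownloadsStarted and AsyncTasksFinished; sleep replaces
        # time.sleep(60).
        check_iteration_count = 0
        check_iteration_maximum = 3
        check_previous_finished_length = 0
        check_grace_iteration_count = 0
        check_grace_iteration_maximum = 10
        check_grace_maximum_disparity = 10
        while True:
            started = poll_started()
            finished = poll_finished()

            # Grace path: the finished count has stalled. Accept a small
            # disparity (a few failed tasks) or give up entirely.
            if finished == check_previous_finished_length:
                check_grace_iteration_count += 1
                if check_grace_iteration_count == check_grace_iteration_maximum:
                    if started - finished < check_grace_maximum_disparity:
                        break
                    return True  # too many tasks failed
            else:
                check_grace_iteration_count = 0

            # Normal path: counts are equal and stay stable for three checks.
            if started == finished:
                if finished == check_previous_finished_length:
                    check_iteration_count += 1
                    if check_iteration_count == check_iteration_maximum:
                        break
                else:
                    check_iteration_count = 0

            sleep()
            check_previous_finished_length = finished


    # Five tasks queued, five finished: exits via the normal path once the
    # count has been stable for three polls.
    assert await_async_subtasks_sim(lambda: 5, lambda: 5) is None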