Merge pull request #2630 from zimmerman-team/develop
Official Release of automatic incremental parse
sylvanr authored Jun 28, 2021
2 parents ab255e9 + 0b9ba75 commit 1e14edd
Showing 9 changed files with 299 additions and 15 deletions.
18 changes: 9 additions & 9 deletions .circleci/config.yml
@@ -178,13 +178,13 @@ workflows:
test_and_deploy:
jobs:
- test
- deploy_staging:
requires:
- test
filters:
branches:
only:
- develop
# - deploy_staging:
# requires:
# - test
# filters:
# branches:
# only:
# - develop
- deploy_production_1:
requires:
- test
@@ -208,8 +208,8 @@ workflows:
only:
- master
- prerelease:
requires:
- deploy_staging
# requires:
# - deploy_staging
filters:
branches:
only:
8 changes: 6 additions & 2 deletions OIPA/iati_synchroniser/dataset_syncer.py
@@ -9,7 +9,8 @@
create_publisher_organisation
)
from iati_synchroniser.models import (
Dataset, DatasetFailedPickup, DatasetUpdateDates, Publisher
Dataset, DatasetDownloadsStarted, DatasetFailedPickup, DatasetUpdateDates,
Publisher
)
from task_queue.tasks import DatasetDownloadTask

@@ -136,7 +137,10 @@ def update_or_create_dataset(self, dataset):
if not len(dataset['resources']) or not dataset['organization']:
return

# Pass the data generated here to the
# Add a row to the DatasetDownloadsStarted table
DatasetDownloadsStarted.objects.create()

# Pass the data generated here to the DatasetDownloadTask
DatasetDownloadTask.delay(dataset_data=dataset)

def remove_deprecated(self):
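
The row created here is the 'started' half of the bookkeeping used by the new automatic_incremental_parse task further down: one DatasetDownloadsStarted row is written per queued DatasetDownloadTask, and one AsyncTasksFinished row is written when that task completes (see download.py below), so the two counts converge once every download has reported back. A minimal progress check built on those counts could look like this (a sketch; the project's own check lives in the await_async_subtasks helper, which is not included in this diff):

from iati_synchroniser.models import AsyncTasksFinished, DatasetDownloadsStarted


def download_progress():
    # One row per queued DatasetDownloadTask versus one row per finished task.
    started = DatasetDownloadsStarted.objects.count()
    finished = AsyncTasksFinished.objects.count()
    return finished, started


finished, started = download_progress()
print("%d/%d dataset downloads finished" % (finished, started))
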
27 changes: 27 additions & 0 deletions OIPA/iati_synchroniser/migrations/0024_datasetdownloadsfinished_datasetdownloadsstarted.py
@@ -0,0 +1,27 @@
# Generated by Django 2.0.13 on 2021-03-31 10:29

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('iati_synchroniser', '0023_auto_20210311_1111'),
]

operations = [
migrations.CreateModel(
name='DatasetDownloadsFinished',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('timestamp', models.DateTimeField(auto_now=True)),
],
),
migrations.CreateModel(
name='DatasetDownloadsStarted',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('timestamp', models.DateTimeField(auto_now=True)),
],
),
]
17 changes: 17 additions & 0 deletions OIPA/iati_synchroniser/migrations/0025_auto_20210331_1603.py
@@ -0,0 +1,17 @@
# Generated by Django 2.0.13 on 2021-03-31 16:03

from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
('iati_synchroniser', '0024_datasetdownloadsfinished_datasetdownloadsstarted'),
]

operations = [
migrations.RenameModel(
old_name='DatasetDownloadsFinished',
new_name='AsyncTasksFinished',
),
]
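
Both new migration files are plain schema changes, so rolling them out is the standard Django migrate step. A minimal sketch, assuming the project's usual manage.py/settings setup:

from django.core.management import call_command

# Applies 0024 (creates DatasetDownloadsStarted and DatasetDownloadsFinished)
# and 0025 (renames DatasetDownloadsFinished to AsyncTasksFinished).
call_command("migrate", "iati_synchroniser")
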
12 changes: 12 additions & 0 deletions OIPA/iati_synchroniser/models.py
@@ -234,6 +234,18 @@ class DatasetUpdateDates(models.Model):
null=False, blank=True, auto_now=False)


# This model is added for the automation of the incremental parsing procedure
class DatasetDownloadsStarted(models.Model):
timestamp = models.DateTimeField(
null=False, blank=True, auto_now=True)


# This model is added for the automation of the incremental parsing procedure
class AsyncTasksFinished(models.Model):
timestamp = models.DateTimeField(
null=False, blank=True, auto_now=True)


class Codelist(models.Model):
name = models.CharField(primary_key=True, max_length=100)
description = models.TextField(max_length=1000, blank=True, null=True)
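
Both models are nothing more than timestamped counters: each row's auto_now timestamp records when the event happened, and the row counts are what automatic_incremental_parse compares. The companion helper reset_automatic_incremental_parse_dbs, imported in tasks.py below but not included in this diff, presumably just empties both tables before a run starts; a minimal sketch under that assumption:

# task_queue/utils.py (sketch -- the committed implementation is not in this diff)
from iati_synchroniser.models import AsyncTasksFinished, DatasetDownloadsStarted


def reset_automatic_incremental_parse_dbs():
    # Clear the bookkeeping tables so a new run starts counting from zero.
    DatasetDownloadsStarted.objects.all().delete()
    AsyncTasksFinished.objects.all().delete()
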
5 changes: 4 additions & 1 deletion OIPA/task_queue/download.py
@@ -10,7 +10,8 @@
from django.utils.encoding import smart_text

from iati_synchroniser.models import (
Dataset, DatasetFailedPickup, Publisher, filetype_choices
AsyncTasksFinished, Dataset, DatasetFailedPickup, Publisher,
filetype_choices
)

# Get an instance of a logger
@@ -300,6 +301,8 @@ def run(self, dataset_data, *args, **kwargs):
'internal_url': internal_url
}
)
# Save a row to the AsyncTasksFinished table.
AsyncTasksFinished.objects.create()

# URL string to save as a Dataset attribute:
return os.path.join(main_download_dir, filename)
162 changes: 160 additions & 2 deletions OIPA/task_queue/tasks.py
@@ -28,7 +28,9 @@
from iati.models import Activity, Budget, Document, DocumentLink, Result
from iati.transaction.models import Transaction
from iati_organisation.models import Organisation
from iati_synchroniser.models import Dataset, DatasetNote, DatasetUpdateDates
from iati_synchroniser.models import (
AsyncTasksFinished, Dataset, DatasetNote, DatasetUpdateDates
)
from OIPA.celery import app
from solr.activity.tasks import ActivityTaskIndexing
from solr.activity.tasks import solr as solr_activity
@@ -40,7 +42,9 @@
from solr.transaction.tasks import solr as solr_transaction
from solr.transaction_sector.tasks import solr as solr_transaction_sector
from task_queue.download import DatasetDownloadTask
from task_queue.utils import Tasks
from task_queue.utils import (
Tasks, await_async_subtasks, reset_automatic_incremental_parse_dbs
)
from task_queue.validation import DatasetValidationTask

# Get an instance of a logger
Expand Down Expand Up @@ -82,6 +86,145 @@ def add_activity_to_solr(activity_id):
# All Registered Celery tasks that are actively used
# TODO: 25-02-2020 rename tasks
#
# This task 'automatically' does a complete parse and index,
# meaning the steps that were previously manual are all included.
@shared_task
def automatic_incremental_parse(start_at=1):
"""
There are several steps that need to be taken to complete an incremental
parse/index.
1. Import datasets task (async, not able to directly tell when finished)
2. Drop old datasets task (when is this finished?)
3. Dataset validation task (when does it start? when does it finish?)
4. Parse all datasets task (when does it finish?)
Solutions:
1 - When running the 'import datasets task', separate 'download dataset
tasks' are fired. To track whether or not all of them finished - we
add a table which gets filled from the 'update_or_create_dataset'
function which in turn queues a DatasetDownloadTask. We add a table
which gets filled when the DatasetDownloadTask finishes, leading to
eventually having two tables with an equal number of rows. To make sure
it is not a false positive, check three times that the size of both
tables remains the same and still matches (to catch cases like
'5 datasets found in the IATI registry and 5 datasets downloaded').
Also, we include a 'grace' filter to catch failed datasets: if the
number of finished downloads hasn't changed in 10 iterations, we assume
the remaining ones have failed. As we expect 0 failed downloads, but
historically have seen at most four DatasetDownloadTasks fail, we allow
a difference of TEN at maximum.
2 - Simple: just fire the task. It is not async.
3 - We check whether or not the dataset validation on the validation
endpoint has started by checking their API. If this API endpoint returns
empty, it means the validation endpoint is not running. The steps to
take to ensure the validation has completed are: Check if it has started
validating datasets. If it has not started yet, wait until it has started
If it has started, wait until it no longer returns any data. Then confirm
that it has stopped returning data by checking several times in a row.
Then, we need to actually do the validation step. Start off that task,
then re-use the logic from step one: we know the number of validation tasks,
as it is the same as the number of existing datasets.
Make the DatasetValidationTask update a table with a row, check if the
number of rows matches the number of existing datasets.
4 - We know that the parse all sources task queues a
parse_source_by_id_task for each of the existing datasets.
We can simply reuse the logic from the previous step to check which
of the datasets have been parsed, and when all async tasks finish, we have
completed the incremental parse/index. The process can then start anew.
"""
# Before starting, reset the databases we use for checks, in case they
# still contain data.
reset_automatic_incremental_parse_dbs()

# STEP ONE -- Import Datasets #
if start_at == 1:
# Start the task
get_new_sources_from_iati_api_task()

too_many_failed = await_async_subtasks(started_not_set=False)
if too_many_failed:
return "Too many dataset imports failed"
# STEP ONE -- End #

# STEP TWO -- DROP OLD DATASETS #
if start_at in (1, 2):
drop_old_datasets()
# STEP TWO -- End #

# STEP THREE -- DATASET VALIDATION TASK #
# Prepare checks
if start_at in (1, 2, 3):
check_validation_has_started = False
check_validation_is_active = False
check_empty_iteration_count = 0
check_empty_iteration_maximum = 3

while True:
url = "https://iativalidator.iatistandard.org/api/v1/queue/next"
response = requests.get(url, timeout=30)

# If the response is not 200, reset and check back later.
if response.status_code != 200:
check_validation_has_started = False
check_validation_is_active = False
time.sleep(60)
continue

check_content_is_empty = response.content.decode("utf-8") == ""

"""
Case 1: content empty - started = false - active = false
wait for the validator to start
Case 2: content has data - started = false - active = false
set started to true and active to true
Case 3: content has data - started = true - active = true
wait for the content to stop having data!
Case 4: content empty - started = true - active = true
with three iterations, confirm the content is actually empty!
set active to false.
"""
# if check_content_is_empty and not check_validation_has_started and not check_validation_is_active: # NOQA: E501
if not check_content_is_empty and not check_validation_has_started and not check_validation_is_active: # NOQA: E501
check_validation_has_started = True
check_validation_is_active = True
if not check_content_is_empty and check_validation_has_started and check_validation_is_active: # NOQA: E501
check_empty_iteration_count = 0
if check_content_is_empty and check_validation_has_started and check_validation_is_active: # NOQA: E501
if check_empty_iteration_count < check_empty_iteration_maximum:
check_empty_iteration_count += 1
else: # Validation has finished
break
time.sleep(60)

# Now that the "waiting for validator to finish" loop is over, we know
# the validator is finished. Run the task. To reduce complexity, reuse
# the AsyncTasksFinished table.
get_validation_results_task()

started = len(Dataset.objects.all())
await_async_subtasks(started)
# STEP THREE -- End #

# STEP FOUR -- PARSE ALL DATASETS #
if start_at in (1, 2, 3, 4):
# parse_all_existing_sources_task() does not actually run the parsing,
# so we reuse its code here.
for dataset in Dataset.objects.all().filter(filetype=2):
parse_source_by_id_task.delay(dataset_id=dataset.id,
force=False,
check_validation=True)
for dataset in Dataset.objects.all().filter(filetype=1):
parse_source_by_id_task.delay(dataset_id=dataset.id,
force=False,
check_validation=True)
await_async_subtasks(started)
# STEP FOUR -- End #

# Restart the automatic_incremental_parse asynchronously and end this task.
automatic_incremental_parse.apply_async()


# This task updates all of the currency exchange rates in the local database
@shared_task
def update_exchange_rates():
@@ -115,6 +258,8 @@ def get_validation_results_task():


# This task is used to parse a specific dataset by passing it an ID.
# In each of the different try/except branches, store an AsyncTasksFinished
# row for the automatic incremental parse procedure.
# TODO: 25-02-2020 document this function.
@shared_task(bind=True)
def parse_source_by_id_task(self, dataset_id, force=False,
@@ -126,14 +271,25 @@ def parse_source_by_id_task(self, dataset_id, force=False,
validation_status__critical__lte=0) # NOQA: E501
dataset = dataset.first()
dataset.process(force_reparse=force)

# Save a row to the AsyncTasksFinished table.
AsyncTasksFinished.objects.create()
except AttributeError:
print('no dataset found')

# Save a row to the AsyncTasksFinished table.
AsyncTasksFinished.objects.create()
pass
else:
try:
dataset = Dataset.objects.get(pk=dataset_id)
dataset.process(force_reparse=force)

# Save a row to the AsyncTasksFinished table.
AsyncTasksFinished.objects.create()
except Dataset.DoesNotExist:
# Save a row to the AsyncTasksFinished table.
AsyncTasksFinished.objects.create()
pass
except Exception as exc:
raise self.retry(kwargs={'dataset_id': dataset_id, 'force': True},
@@ -826,6 +982,8 @@ def download_file(d):
# print str(e)
doc.document_content = document_content.decode("latin-1")
doc.save()


#
#
# @shared_task
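
The other helper imported from task_queue.utils, await_async_subtasks, is also not part of this diff. Judging from its call sites and the docstring of automatic_incremental_parse (poll the started and finished counts, and apply the ten-iteration grace filter), it could look roughly like the sketch below; the signature, names and thresholds are assumptions, not the committed code:

# task_queue/utils.py (sketch only)
import time

from iati_synchroniser.models import AsyncTasksFinished, DatasetDownloadsStarted


def await_async_subtasks(started=None, started_not_set=True):
    """Wait until the finished-subtask count catches up with the started count.

    With started_not_set=False (step 1) the expected total is read from the
    DatasetDownloadsStarted table, which keeps growing while downloads are
    still being queued; the later steps pass the number of datasets directly.
    Returns True when too many subtasks appear to have failed.
    """
    unchanged_iterations = 0
    previous_finished = -1
    while True:
        if started_not_set:
            expected = started or 0
        else:
            expected = DatasetDownloadsStarted.objects.count()
        finished = AsyncTasksFinished.objects.count()
        if expected and finished >= expected:
            return False  # every subtask has reported back
        if finished == previous_finished:
            unchanged_iterations += 1
            if unchanged_iterations >= 10:
                # Grace filter: tolerate a handful of stalled subtasks,
                # but give up when the gap is larger than ten.
                return (expected - finished) > 10
        else:
            unchanged_iterations = 0
        previous_finished = finished
        time.sleep(60)

Once deployed, the pipeline only needs to be started once, because the task re-queues itself with apply_async() at the end of every run; the start_at argument resumes a run at a later step:

from task_queue.tasks import automatic_incremental_parse

automatic_incremental_parse.delay()              # full run, from step 1
automatic_incremental_parse.delay(start_at=3)    # resume at dataset validation
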
(Diffs for the remaining two changed files are not shown.)
