This PR builds on the work in the initial PR to move business rules to celery along with info learned deploying this.

Avoid filling the task queue with orchestration tasks and starving the workers.
===============================================================================

In the previous system there were about three layers of tasks that orchestrated other tasks
by using the .replace() API in each task.

Unfortunately, it was possible for celery workers to fill up with orchestration tasks,
leaving no room for the business rule tasks at the bottom to actually run.

This PR attempts two mitigations:

1. Use celery workflows instead of .replace()

This PR builds a celery workflow in check_workbasket using celery constructs such as chain and group.
In theory, since most of the work is laid out ahead of time, the system should have more awareness of the task structure, avoiding the starvation issue.

2. Cancel existing workbasket checks when a new check is requested.

When check_workbasket is started, it will attempt to revoke existing check_workbasket tasks for the same workbasket.

Treat intermediate data structures as ephemeral
===============================================

A celery task may execute at any time: right now, or when a system comes up tomorrow. Based on this assumption, models such as TrackedModelCheck (which stores the result of a business rule check on a TrackedModel) are no longer passed to celery tasks by ID. Instead, all the information needed to recreate the data is passed to the celery task, which means the system will still work even if developers delete these rows while it is running.

Reduce layers in business rule checking
=======================================

BusinessRuleChecker and LinkedModelsBusinessRuleChecker are now the only checkers. They take BusinessRule instances instead of being subclassed for each business rule.
While more parameters are passed when rules are checked, a conceptual layer has been removed, and the simplification is reflected in around 20 lines of code being removed from checks.py.
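
As a rough illustration of the idea (not the code in checks.py), the sketch below uses a single checker class that receives the rule as a parameter instead of one checker subclass per rule. The `BusinessRule` shape shown here (a name plus a `validate` callable), the `run` method and the dict-based model are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Tuple


@dataclass(frozen=True)
class BusinessRule:
    """Hypothetical rule: validate() returns an error message, or None on success."""

    name: str
    validate: Callable[[dict], Optional[str]]


class BusinessRuleChecker:
    """A single checker for every rule; the rule is passed in, not subclassed."""

    def run(self, rule: BusinessRule, model: dict) -> Tuple[bool, Optional[str]]:
        # Return (successful, message) for this rule applied to this model.
        message = rule.validate(model)
        return (message is None, message)


# Hypothetical rule used only for this example.
description_not_empty = BusinessRule(
    name="description_not_empty",
    validate=lambda model: None if model.get("description") else "Description is empty",
)
```

Adding a rule then means creating another `BusinessRule` value rather than another checker class, which is the layer this section describes removing.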

Celery flower is now much easier to read
========================================
Due to the changes above, the output in celery flower should correspond more closely to a user's intentions, showing the ids of models.

Content Checksums
=================

Result caching now validates using checksums of the content, which should reduce the amount of checking the system needs to do.

When a workbasket has been published, its content could invalidate some content in other unpublished workbaskets. By associating business rule checks with checksums of a model's content, any models that do not clash can be skipped.
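
A minimal sketch of how a stored result might be validated against a checksum before being reused. This is not the PR's implementation: the in-memory `store`, the `StoredCheck` tuple and the `check_with_cache` helper are hypothetical names, and the dictionary key only mirrors the unique (model, check_name) pair on TrackedModelCheck.

```python
from typing import Callable, Dict, NamedTuple, Optional, Tuple


class StoredCheck(NamedTuple):
    """Hypothetical stand-in for a stored check result."""

    successful: bool
    message: Optional[str]
    content_hash: bytes


# Keyed by (model pk, check name), mirroring a unique (model, check_name) pair.
ResultStore = Dict[Tuple[int, str], StoredCheck]


def check_with_cache(
    store: ResultStore,
    model_pk: int,
    check_name: str,
    content_hash: bytes,
    run_check: Callable[[], Tuple[bool, Optional[str]]],
) -> StoredCheck:
    """Reuse a stored result only if it was recorded for identical content."""
    cached = store.get((model_pk, check_name))
    if cached is not None and cached.content_hash == content_hash:
        return cached  # Content is unchanged, so the check is skipped.

    # No stored result, or the content changed: run the check and overwrite.
    successful, message = run_check()
    result = StoredCheck(successful, message, content_hash)
    store[(model_pk, check_name)] = result
    return result
```

The check only runs again when the hash passed in differs from the one stored alongside the previous result.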

Model checksums (generated by `.content_hash()`) are not currently stored in the database (though it may be desirable to store them on TrackedModels, as it would provide a mechanism to address any content in the system).
The checksumming scheme is a combination of the type and a sha256 of the fields in `.copyable_fields` (which should represent the fields a user can edit, but not fields such as pk).
Blake3 was tested, as it provides a fast hashing algorithm, but in practice it didn't provide much of a speedup over sha256.
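
To make the scheme concrete, here is a small sketch that hashes the type name together with the values of `copyable_fields` using sha256. Only `content_hash` and `copyable_fields` are names from this PR; the `Measure` class, its fields and the exact byte encoding are assumptions chosen for the example, and the real encoding may differ.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class Measure:
    """Hypothetical stand-in for a TrackedModel; not a class from this PR."""

    pk: int
    sid: int
    duty: str

    # Fields a user can edit; pk is deliberately left out.
    copyable_fields = ("sid", "duty")

    def content_hash(self) -> bytes:
        """Return a 32-byte sha256 digest of the type name and copyable fields."""
        hasher = hashlib.sha256()
        hasher.update(type(self).__name__.encode("utf-8"))
        for name in self.copyable_fields:
            # A separator and repr() keep adjacent values from running together.
            hasher.update(f"|{name}={getattr(self, name)!r}".encode("utf-8"))
        return hasher.digest()
```

Two objects with the same type and the same editable content produce the same digest even when their pks differ, and the 32-byte digest fits a `BinaryField(max_length=32)`.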

PK ranges
=========

Occasionally workbaskets with many items may need to be checked (the initial workbasket has 9 million items).
Based on the observation that the ID column of the contained TrackedModels is mostly contiguous, the system allows passing sequences of contiguous TrackedModels specified by tuples of (first_pk, last_pk).
This is relatively compact, suitable for passing over the network with celery, and readable in Celery flower.

This also enables chunking of tasks, further enabled by specifying a maximum number of items in each tuple.

On TrackedModelQueryset, `.as_pk_intervals` and `.from_pk_intervals` are provided to convert to and from this format.
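
The interval format can be sketched in plain Python as below. These are standalone functions working on lists of integers, not the queryset methods from this PR; the `chunk_size` parameter name and the exact splitting behaviour are assumptions. On a real queryset, `from_pk_intervals` would presumably become range filters rather than expanding every pk.

```python
from typing import Iterable, Iterator, List, Optional, Tuple

PkInterval = Tuple[int, int]  # (first_pk, last_pk), both inclusive


def as_pk_intervals(
    pks: Iterable[int],
    chunk_size: Optional[int] = None,
) -> List[PkInterval]:
    """Collapse pks into contiguous (first_pk, last_pk) runs.

    If chunk_size is given, no interval covers more than chunk_size pks.
    """
    intervals: List[PkInterval] = []
    first = last = None
    for pk in sorted(set(pks)):
        if first is None:
            first = last = pk
            continue
        run_is_full = chunk_size is not None and (last - first + 1) >= chunk_size
        if pk == last + 1 and not run_is_full:
            last = pk  # Extend the current run.
        else:
            intervals.append((first, last))
            first = last = pk  # Start a new run.
    if first is not None:
        intervals.append((first, last))
    return intervals


def from_pk_intervals(intervals: Iterable[PkInterval]) -> Iterator[int]:
    """Expand (first_pk, last_pk) tuples back into individual pks."""
    for first, last in intervals:
        yield from range(first, last + 1)
```

For example, the pks 1, 2, 3, 7, 8 and 10 collapse to `[(1, 3), (7, 8), (10, 10)]`, and with `chunk_size=2` the first run is split into `(1, 2)` and `(3, 3)`.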

Greets
======

This PR adapts changes and builds on the hard work done in the initial work to check the business rules with celery, thanks to Simon Worthington and the hard work of the other devs on the project.
stuaxo committed Aug 10, 2022
1 parent e59fc8a commit 392667e
Showing 35 changed files with 1,703 additions and 958 deletions.
341 changes: 178 additions & 163 deletions checks/checks.py

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions checks/migrations/0004_auto_20220718_1653.py
@@ -0,0 +1,31 @@
# Generated by Django 3.1.14 on 2022-07-18 16:53

from django.db import migrations


class Migration(migrations.Migration):

    dependencies = [
        ("checks", "0003_auto_20220525_1046"),
    ]

    operations = [
        migrations.RemoveField(
            model_name="transactioncheck",
            name="head_transaction",
        ),
        migrations.RemoveField(
            model_name="transactioncheck",
            name="latest_tracked_model",
        ),
        migrations.RemoveField(
            model_name="transactioncheck",
            name="transaction",
        ),
        migrations.DeleteModel(
            name="TrackedModelCheck",
        ),
        migrations.DeleteModel(
            name="TransactionCheck",
        ),
    ]
53 changes: 53 additions & 0 deletions checks/migrations/0005_trackedmodelcheck.py
@@ -0,0 +1,53 @@
# Generated by Django 3.1.14 on 2022-08-02 20:32

import django.db.models.deletion
from django.db import migrations
from django.db import models


class Migration(migrations.Migration):

    initial = True

    dependencies = [
        ("common", "0006_modelcelerytask_taskmodel"),
        ("checks", "0004_auto_20220718_1653"),
    ]

    operations = [
        migrations.CreateModel(
            name="TrackedModelCheck",
            fields=[
                (
                    "taskmodel_ptr",
                    models.OneToOneField(
                        auto_created=True,
                        on_delete=django.db.models.deletion.CASCADE,
                        parent_link=True,
                        primary_key=True,
                        serialize=False,
                        to="common.taskmodel",
                    ),
                ),
                ("created_at", models.DateTimeField(auto_now_add=True)),
                ("updated_at", models.DateTimeField(auto_now=True)),
                ("check_name", models.CharField(max_length=255)),
                ("successful", models.BooleanField()),
                ("message", models.TextField(null=True)),
                ("content_hash", models.BinaryField(max_length=32, null=True)),
                (
                    "model",
                    models.ForeignKey(
                        null=True,
                        on_delete=django.db.models.deletion.SET_NULL,
                        related_name="checks",
                        to="common.trackedmodel",
                    ),
                ),
            ],
            options={
                "unique_together": {("model", "check_name")},
            },
            bases=("common.taskmodel", models.Model),
        ),
    ]
143 changes: 26 additions & 117 deletions checks/models.py
@@ -1,136 +1,34 @@
import logging

from django.db import models
from django.db.models import fields
from polymorphic.managers import PolymorphicManager

from checks.querysets import TransactionCheckQueryset
from checks.querysets import TrackedModelCheckQueryset
from common.models import TimestampedMixin
from common.models.celerytask import TaskModel
from common.models.trackedmodel import TrackedModel
from common.models.transactions import Transaction


class TransactionCheck(models.Model):
    """
    Represents an in-progress or completed check of a transaction for
    correctness.
    The ``TransactionCheck`` gets created once the check starts and has a flag
    to track completeness.
    """

    transaction = models.ForeignKey(
        Transaction,
        on_delete=models.CASCADE,
        related_name="checks",
    )

    completed = fields.BooleanField(default=False)
    """True if all of the checks expected to be carried out against the models
    in this transaction have recorded any result."""

    successful = fields.BooleanField(null=True)
    """
    True if all of the checks carried out against the models in this
    transaction returned a positive result.
    This value will be null until ``completed`` is `True`.
    """

    head_transaction_id: int
    head_transaction = models.ForeignKey(
        Transaction,
        on_delete=models.CASCADE,
    )
    """
    The latest transaction in the stream of approved transactions (i.e. in the
    REVISION partition) at the moment this check was carried out.
logger = logging.getLogger(__name__)

    Once new transactions are commited and the head transaction is no longer the
    latest, this check will no longer be an accurate signal of correctness
    because the new transactions could include new data which would invalidate
    the checks. (Unless the checked transaction < head transaction, in which
    case it will always be correct.)
    """

    tracked_model_count = fields.PositiveSmallIntegerField()
class TrackedModelCheck(TimestampedMixin, TaskModel):
    """
    The number of tracked models in the transaction at the moment this check was
    carried out.
    Represents the result of running a single check against a single model.
    If something is removed from the transaction later, the number of tracked
    models will no longer match. This is used to detect if the check is now
    stale.
    Stores `content_hash`, a hash of the content for validity checking of the
    stored result.
    """

    latest_tracked_model = models.ForeignKey(
        TrackedModel,
        on_delete=models.CASCADE,
        null=True,
    )
    """
    The latest tracked model in the transaction at the moment this check was
    carried out.
    If some models are removed and subsequent ones added to the transaction, the
    count may be the same but the latest transaction will have a new primary
    key. This is used to detect if the check is now stale.
    """

    model_checks: models.QuerySet["TrackedModelCheck"]

    objects: TransactionCheckQueryset = models.Manager.from_queryset(
        TransactionCheckQueryset,
    )()

    def save(self, *args, **kwargs):
        """Computes the metadata we will need later to detect if the check is
        current and fresh."""
        if not self.head_transaction_id:
            self.head_transaction = Transaction.approved.last()

        self.tracked_model_count = self.transaction.tracked_models.count()
        self.latest_tracked_model = self.transaction.tracked_models.order_by(
            "pk",
        ).last()

        return super().save(*args, **kwargs)

    class Meta:
        ordering = (
            "transaction__partition",
            "transaction__order",
            "head_transaction__partition",
            "head_transaction__order",
        )

        constraints = (
            models.CheckConstraint(
                check=(
                    models.Q(completed=False, successful__isnull=True)
                    | models.Q(completed=True, successful__isnull=False)
                ),
                name="completed_checks_include_successfulness",
            ),
        )


class TrackedModelCheck(models.Model):
    """
    Represents the result of running a single check against a single model.
    The ``TrackedModelCheck`` only gets created once the check is complete, and
    hence success should always be known. The reason is that a single model
    check is atomic (i.e. there is no smaller structure) and so it's either done
    or not, and it can't be "resumed".
    """
        unique_together = ("model", "check_name")

    objects = PolymorphicManager.from_queryset(TrackedModelCheckQueryset)()
    model = models.ForeignKey(
        TrackedModel,
        related_name="checks",
        on_delete=models.CASCADE,
    )

    transaction_check = models.ForeignKey(
        TransactionCheck,
        on_delete=models.CASCADE,
        related_name="model_checks",
        on_delete=models.SET_NULL,
        null=True,
    )

    check_name = fields.CharField(max_length=255)
@@ -141,3 +39,14 @@ class TrackedModelCheck(models.Model):

    message = fields.TextField(null=True)
    """The text content returned by the check, if any."""

    content_hash = models.BinaryField(max_length=32, null=True)
    """
    Hash of the content ('copyable_fields') at the time the data was checked.
    """

    def __str__(self):
        if self.successful:
            return f"{self.model} {self.check_name} [Passed at {self.updated_at}]"

        return f"{self.model} {self.check_name} [Failed at {self.updated_at}, Message: {self.message}]"