With Django Datawatch you are able to implement arbitrary checks on data, review their status and even describe what to do to resolve them. Think of nagios/icinga for data.
The synchronous backend executes all tasks synchronously. This is not recommended, but it is the simplest way to get started.
The Celery backend executes the tasks asynchronously, using Celery as a task broker and executor. It requires Celery 5.0.0 or later.
Feel free to implement other task execution backends and send a pull request.
$ pip install django-datawatch
Add django_datawatch to your INSTALLED_APPS.
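As a minimal sketch, the relevant part of a settings module might look like the fragment below. The two django.contrib entries are only there for context and are an assumption about your project; the django_datawatch entry is the one that matters.

```python
# settings.py (fragment)
INSTALLED_APPS = [
    # Assumed to be present already in a typical project.
    "django.contrib.auth",
    "django.contrib.contenttypes",
    # Registers the datawatch app.
    "django_datawatch",
]
```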
If the datawatch scheduler should be run using the celery beat database scheduler, you need to install django_celery_beat.
Add django_datawatch.tasks.django_datawatch_scheduler to the CELERYBEAT_SCHEDULE of your app. This task should be executed every minute, e.g. crontab(minute='*/1'); see the example app.
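A schedule entry for this task could be sketched as follows. The entry name "django-datawatch-scheduler" is a hypothetical label. To keep the fragment free of third-party imports it uses a timedelta interval, which Celery beat also accepts; crontab(minute='*/1') from celery.schedules is the form named above and can be dropped in instead.

```python
# settings.py (fragment)
from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    # Hypothetical entry name; any unique string should do.
    "django-datawatch-scheduler": {
        "task": "django_datawatch.tasks.django_datawatch_scheduler",
        # Run once per minute. Swap in crontab(minute='*/1') if you
        # prefer the crontab form mentioned in the text.
        "schedule": timedelta(minutes=1),
    },
}
```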
Create checks.py inside your module.
from datetime import datetime

from celery.schedules import crontab

from django_datawatch.datawatch import datawatch
from django_datawatch.base import BaseCheck, CheckResponse
from django_datawatch.models import Result


@datawatch.register
class CheckTime(BaseCheck):
    run_every = crontab(minute='*/5')  # scheduler will execute this check every 5 minutes

    def generate(self):
        yield datetime.now()

    def check(self, payload):
        response = CheckResponse()
        if payload.hour <= 7:
            response.set_status(Result.STATUS.ok)
        elif payload.hour <= 12:
            response.set_status(Result.STATUS.warning)
        else:
            response.set_status(Result.STATUS.critical)
        return response

    def get_identifier(self, payload):
        # payload will be our datetime object that we are getting from generate method
        return payload

    def get_payload(self, identifier):
        # as get_identifier returns the object we don't need to process it
        # we can return identifier directly
        return identifier

    def user_forced_refresh_hook(self, payload):
        payload.do_something()
generate must yield the payloads to be checked. The check method is then called for every payload.
check must return an instance of CheckResponse.
get_identifier must return a unique identifier for the payload.
user_forced_refresh_hook is executed when a refresh is requested by a user through the ResultRefreshView.
This is used in checks that are purely based on triggers, e.g. when a field changes, the check gets executed.
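The threshold logic of CheckTime.check can be tried out without a Django project. The sketch below is a plain-Python stand-in: status_for_hour, run_check and the string statuses are hypothetical names used for illustration and are not part of django_datawatch.

```python
from datetime import datetime


def status_for_hour(hour: int) -> str:
    """Mirror the branches of CheckTime.check using plain strings."""
    if hour <= 7:
        return "ok"
    if hour <= 12:
        return "warning"
    return "critical"


def run_check(payloads):
    """Evaluate every payload once, keyed by the payload itself
    (CheckTime.get_identifier also returns the payload unchanged)."""
    return {payload: status_for_hour(payload.hour) for payload in payloads}


# Fixed datetimes stand in for what generate() would yield.
results = run_check([
    datetime(2024, 1, 1, 6, 30),
    datetime(2024, 1, 1, 12, 0),
    datetime(2024, 1, 1, 18, 15),
])
for identifier, status in results.items():
    print(identifier.isoformat(), status)
```

Note the boundaries: hour 7 still counts as ok and hour 12 still counts as warning, because both comparisons are inclusive.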
Check updates for individual payloads can also be triggered when related datasets are changed. The map for update triggers is defined in the Check class' trigger_update attribute.
trigger_update = dict(subproduct=models_customer.SubProduct)
The key is a slug that names your trigger, while the value is the model that issues the trigger when saved. You must implement a resolver function for each entry, named get_<slug>_payload, which returns the payload or multiple payloads (as a list) to check (the same datatype that .check would expect or .generate would yield).
def get_subproduct_payload(self, instance):
    return instance.product
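The naming convention for resolvers can be illustrated with a small stand-in. FakeSubProduct, FakeCheck and resolve_payloads are hypothetical and exist only for this sketch; it shows one way a get_<slug>_payload method could be looked up and its result normalised to a list, and makes no claim about how django_datawatch does this internally.

```python
class FakeSubProduct:
    """Stand-in for a model instance whose save would issue the trigger."""

    def __init__(self, product):
        self.product = product


class FakeCheck:
    # slug -> model that issues the trigger
    trigger_update = dict(subproduct=FakeSubProduct)

    def get_subproduct_payload(self, instance):
        # Return the payload (or a list of payloads) to re-check.
        return instance.product


def resolve_payloads(check, slug, instance):
    """Find get_<slug>_payload on the check and always return a list."""
    resolver = getattr(check, f"get_{slug}_payload")
    payloads = resolver(instance)
    return payloads if isinstance(payloads, list) else [payloads]


payloads = resolve_payloads(
    FakeCheck(), "subproduct", FakeSubProduct(product="product-1")
)
print(payloads)
```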
Raise this exception to skip the current check. The result will not appear in the check results.
A management command is provided to queue the execution of all checks based on their schedule. Add a crontab to run this command every minute and it will check if there's something to do.
$ ./manage.py datawatch_run_checks
$ ./manage.py datawatch_run_checks --slug=example.checks.UserHasEnoughBalance
A management command is provided to forcefully refresh all existing results for a check. This comes in handy if you change the logic of your check and don't want to wait until the periodic execution or an update trigger.
$ ./manage.py datawatch_refresh_results
$ ./manage.py datawatch_refresh_results --slug=example.checks.UserHasEnoughBalance
$ ./manage.py datawatch_list_checks
Remove the unnecessary check results and executions if you've removed the code for a check.
$ ./manage.py datawatch_clean_up
DJANGO_DATAWATCH_BACKEND = 'django_datawatch.backends.synchronous'
DJANGO_DATAWATCH_RUN_SIGNALS = True
You can choose the backend that runs the tasks. Supported are 'django_datawatch.backends.synchronous' and 'django_datawatch.backends.celery'.
Default: 'django_datawatch.backends.synchronous'
Use this setting to disable running post_save updates during unittests if required.
Default: True
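Both settings could be combined as in the fragment below, which switches to the Celery backend and turns off post_save updates while tests run. RUNNING_TESTS is a hypothetical helper, and detecting a test run through sys.argv assumes the tests are started with "manage.py test"; adapt it to your own test runner.

```python
# settings.py (fragment)
import sys

# Assumption: tests are started via "manage.py test".
RUNNING_TESTS = len(sys.argv) > 1 and sys.argv[1] == "test"

# Run checks asynchronously through Celery instead of synchronously.
DJANGO_DATAWATCH_BACKEND = "django_datawatch.backends.celery"

# Skip post_save-triggered check updates during unit tests.
DJANGO_DATAWATCH_RUN_SIGNALS = not RUNNING_TESTS
```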
Datawatch supported setting a specific queue in releases < 0.4.0.
With the switch to Celery 4, you should use task routing to define the queue for your tasks instead; see http://docs.celeryproject.org/en/latest/userguide/routing.html
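A routing rule might be sketched like this. Several things here are assumptions: the queue name "datawatch" is made up, the glob pattern presumes that the datawatch tasks live in the django_datawatch.tasks module, and the setting name CELERY_TASK_ROUTES presumes that your Celery app loads Django settings with the CELERY namespace (with old-style setting names the equivalent would be CELERY_ROUTES).

```python
# settings.py (fragment)
CELERY_TASK_ROUTES = {
    # Assumed task module and hypothetical queue name.
    "django_datawatch.tasks.*": {"queue": "datawatch"},
}
```

A worker then has to consume that queue, for example by starting it with the -Q datawatch option.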
- Docker (19.03.0 or later)
- Compose plugin for Docker
Please make sure that no other container is using port 8000, as this is the port your install gets exposed on: http://localhost:8000/
We've included an example app to show how django_datawatch works. Start by launching the included docker container.
docker compose up -d
Then setup the example app environment.
docker compose run --rm app migrate
docker compose run --rm app loaddata example
The installed superuser is "example" with password "datawatch".
Open http://localhost:8000/, log in and then go back to http://localhost:8000/. You'll be presented with an empty dashboard, because no checks have been run yet. Let's enqueue an update.
docker compose run --rm app datawatch_run_checks --force
The checks for the example app are run synchronously and should be updated immediately. If you decide to switch to the celery backend, you should now start a celery worker to process the checks.
docker compose run --rm --entrypoint celery app -A example worker -l DEBUG
To execute the celery beat scheduler which runs the datawatch scheduler every minute, just run:
docker compose run --rm --entrypoint celery app -A example beat --scheduler django_celery_beat.schedulers:DatabaseScheduler
You will now see some failed checks after you refresh the dashboard view.
docker compose run --rm app test
Check for upgradeable packages by running
docker compose up -d
docker compose exec app pip-check
Collect and compile translations for all registered locales
docker compose run --rm app makemessages --no-location --all
docker compose run --rm app compilemessages
bumpversion is used to manage releases.
Add your changes to the CHANGELOG, run
docker compose exec app bumpversion <major|minor|patch>
then push (including tags).