Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agendar tarefa assíncrona para persistir requests bloqueadas #446

Merged
merged 15 commits into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ run_django:
run_rqworker:
python manage.py rqworker --sentry-dsn=""

run_scheduler:
python manage.py rqscheduler

run: clear_cache
make -j2 run_django run_rqworker
make -j3 run_django run_rqworker run_scheduler

.PHONY: black clear_cache run_django run_rqworker run
1 change: 1 addition & 0 deletions Procfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
web: bin/web.sh
worker: bin/worker.sh
scheduler: bin/scheduler.sh
release: bin/release.sh
1 change: 1 addition & 0 deletions bin/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ set -o pipefail
set -o nounset

python manage.py migrate --no-input
python manage.py schedule_traffic_control_jobs
7 changes: 7 additions & 0 deletions bin/scheduler.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

set -o errexit
set -o pipefail
set -o nounset

python manage.py rqscheduler
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ whitenoise==5.0.1
sorl-thumbnail==12.6.3
django-ratelimit==3.0.1
django-admin-rangefilter==0.6.2
rq-scheduler==0.10.0
53 changes: 53 additions & 0 deletions traffic_control/blocked_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import json
from collections import deque

from cached_property import cached_property
from django.conf import settings
from django_redis import get_redis_connection


class BlockedRequestList:
"""
Singleton indirection between our code and Redis to isolate how we're enqueing the request
data to be further read by the system. Doing this also allow us to unit test it without
relying on complicated mocking strategies. Import as:

from traffic_control.blocked_list import blocked_requests
"""

def __init__(self):
self._requests_data = deque()

@cached_property
def redis_conn(self):
if settings.RQ_BLOCKED_REQUESTS_LIST:
return get_redis_connection("default")

def lpush(self, request_data):
if self.redis_conn:
self.redis_conn.lpush(settings.RQ_BLOCKED_REQUESTS_LIST, json.dumps(request_data))
else:
self._requests_data.appendleft(request_data)
print(f"BLOCKED REQUEST - Response {request_data}")

def lpop(self):
if self.redis_conn:
return json.loads(self.redis_conn.lpop(settings.RQ_BLOCKED_REQUESTS_LIST))
else:
return self._requests_data.popleft()

def __len__(self):
if self.redis_conn:
return self.redis_conn.llen(settings.RQ_BLOCKED_REQUESTS_LIST)
else:
return len(self._requests_data)

def clear(self):
if self.redis_conn:
while len(self) > 0:
self.lpop()
else:
self._requests_data.clear()


blocked_requests = BlockedRequestList()
32 changes: 32 additions & 0 deletions traffic_control/commands.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,42 @@
from cached_property import cached_property
from django.conf import settings
from tqdm import tqdm

from traffic_control.blocked_list import blocked_requests
from traffic_control.cloudflare import Cloudflare
from traffic_control.models import BlockedRequest


class PersistBlockedRequestsCommand:
@classmethod
def execute(cls, batch_size=10):
self = cls()
requests, counter = [], 0

progress = tqdm("Reading requests...")
while len(blocked_requests):
requests.append(blocked_requests.lpop())
if len(requests) == batch_size:
self.persist_requests(requests)
counter += batch_size
requests = []
progress.update()

if requests:
self.persist_requests(requests)
counter += len(requests)
berinhard marked this conversation as resolved.
Show resolved Hide resolved
progress.update()
progress.close()

if counter:
print(f"New {counter} BlockedRequests were created!")
else:
print("There aren't new blocked requests.")

def persist_requests(self, requests):
BlockedRequest.objects.bulk_create([BlockedRequest.from_request_data(r) for r in requests])


class UpdateBlockedIPsCommand:
def __init__(self, account_name, rule_name):
self.cf = Cloudflare(settings.CLOUDFLARE_AUTH_EMAIL, settings.CLOUDFLARE_AUTH_KEY)
Expand Down
18 changes: 8 additions & 10 deletions traffic_control/logging.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import json
from traffic_control.blocked_list import blocked_requests

from django.conf import settings
from django_redis import get_redis_connection


def log_blocked_request(request, response_status_code):
def format_request(request, response_status_code):
user = getattr(request, "user", None)

request_data = {
Expand All @@ -16,8 +13,9 @@ def log_blocked_request(request, response_status_code):
"http": {key: value for key, value in request.META.items() if key.lower().startswith("http_")},
}
request_data["http"]["remote-addr"] = request.META.get("REMOTE_ADDR", "").strip()
if settings.RQ_BLOCKED_REQUESTS_LIST:
conn = get_redis_connection("default")
conn.lpush(settings.RQ_BLOCKED_REQUESTS_LIST, json.dumps(request_data))
else:
print(f"BLOCKED REQUEST - Response {response_status_code}: {request_data}")
return request_data


def log_blocked_request(request, response_status_code):
request_data = format_request(request, response_status_code)
blocked_requests.lpush(request_data)
21 changes: 2 additions & 19 deletions traffic_control/management/commands/persist_blocked_requests.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,10 @@
import json

from django.conf import settings
from django.core.management.base import BaseCommand
from django_redis import get_redis_connection
from tqdm import tqdm

from traffic_control.models import BlockedRequest
from traffic_control.commands import PersistBlockedRequestsCommand


class Command(BaseCommand):
help = "Read blocked requests from redis and persist them on potgres"

def handle(self, *args, **kwargs):
conn = get_redis_connection("default")
all_requests = []
cache_key = settings.RQ_BLOCKED_REQUESTS_LIST
progress = tqdm("Reading requests...")
while conn.llen(cache_key) > 0:
data = json.loads(conn.lpop(cache_key))
all_requests.append(data)
progress.update()
progress.close()

print(f"Bulk inserting {len(all_requests)} entries")
BlockedRequest.objects.bulk_create([BlockedRequest.from_request_data(request_data=req) for req in all_requests])
print("Done")
PersistBlockedRequestsCommand.execute()
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import django_rq
from django.core.management.base import BaseCommand
from django.utils import timezone

from traffic_control import tasks


class Command(BaseCommand):
help = "Schedule recurrent traffic control jobs"

def schedule(self, func, interval):
scheduler = django_rq.get_scheduler("default")
job = scheduler.schedule(
scheduled_time=timezone.now(), func=tasks.persist_blocked_requests_task, interval=interval, repeat=None,
)
print(f"Task {func.__name__} scheduled as {job}")

def handle(self, *args, **kwargs):
self.schedule(tasks.persist_blocked_requests_task, 300)
self.schedule(tasks.update_blocked_ips_task, 3600)
3 changes: 2 additions & 1 deletion traffic_control/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
from copy import deepcopy
from itertools import chain

from django.db import models
Expand Down Expand Up @@ -43,7 +44,7 @@ class BlockedRequest(models.Model):

@classmethod
def from_request_data(cls, request_data):
obj = cls(request_data=request_data)
obj = cls(request_data=deepcopy(request_data))
headers = {key.lower(): value for key, value in dict(request_data.pop("headers", [])).items()}
query_string = dict(request_data.pop("query_string", []))

Expand Down
13 changes: 13 additions & 0 deletions traffic_control/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from django_rq import job

from traffic_control.commands import PersistBlockedRequestsCommand, UpdateBlockedIPsCommand


@job
def persist_blocked_requests_task():
PersistBlockedRequestsCommand.execute()


@job
def update_blocked_ips_task():
UpdateBlockedIPsCommand.execute()
42 changes: 42 additions & 0 deletions traffic_control/tests/test_blocked_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import json
from unittest.mock import Mock, patch

from django.test import TestCase, override_settings
from redis.client import Redis

from traffic_control.blocked_list import blocked_requests


class BlockedRequestListTests(TestCase):
def test_in_memory_enqueueing(self):
blocked_requests.clear()

msg_1, msg_2 = {"path": "/"}, {"path": "/about/"}
blocked_requests.lpush(msg_1)
blocked_requests.lpush(msg_2)

assert 2 == len(blocked_requests)
assert msg_2 == blocked_requests.lpop()
assert msg_1 == blocked_requests.lpop()
assert 0 == len(blocked_requests)

@override_settings(RQ_BLOCKED_REQUESTS_LIST="blocked_list")
@patch("traffic_control.blocked_list.get_redis_connection")
def test_redis_enqueing(self, mocked_get_conn):
blocked_requests.__dict__.pop("redis_conn", None)
conn = Mock(Redis, autospec=True)
mocked_get_conn.return_value = conn
msg = {"path": "/"}
json_msg = json.dumps(msg)

blocked_requests.lpush(msg)

mocked_get_conn.assert_called_once_with("default")
conn.lpush.assert_called_once_with("blocked_list", json_msg)

conn.lpop.return_value = json_msg
data = blocked_requests.lpop()
assert msg == data

conn.lpop.assert_called_once_with("blocked_list")
blocked_requests.__dict__.pop("redis_conn", None)
25 changes: 25 additions & 0 deletions traffic_control/tests/test_commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from django.test import RequestFactory, TestCase

from traffic_control.blocked_list import blocked_requests
from traffic_control.commands import PersistBlockedRequestsCommand
from traffic_control.logging import format_request
from traffic_control.models import BlockedRequest


class PersistBlockedRequestsCommandTests(TestCase):
def setUp(self):
self.factory = RequestFactory()

def test_purge_all_blocked_requests(self):
assert not BlockedRequest.objects.all().exists()
blocked_requests._requests_data.clear()
req_1 = format_request(self.factory.get("/"), 429)
req_2 = format_request(self.factory.get("/login/"), 429)
blocked_requests.lpush(req_1)
blocked_requests.lpush(req_2)

PersistBlockedRequestsCommand.execute()

assert 2 == BlockedRequest.objects.count()
assert BlockedRequest.objects.filter(path="/").exists()
assert BlockedRequest.objects.filter(path="/login/").exists()
73 changes: 73 additions & 0 deletions traffic_control/tests/test_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from unittest.mock import Mock

import pytest
from django.contrib.auth import get_user_model
from django.test import RequestFactory

from traffic_control.blocked_list import blocked_requests
from traffic_control.logging import format_request, log_blocked_request


@pytest.fixture
def request_factory():
return RequestFactory()


def test_format_simplest_request(request_factory):
request = request_factory.get("/")
data = format_request(request, 200)

assert [] == data["query_string"]
assert "/" == data["path"]
assert [("Cookie", "")] == data["headers"]
assert 200 == data["response_status_code"]
assert data["user_id"] is None
assert {"remote-addr": "127.0.0.1", "HTTP_COOKIE": ""} == data["http"]


def test_format_request_query_string(request_factory):
request = request_factory.get("/", data={"arg1": "foo", "arg2": "bar"})
data = format_request(request, 200)

assert [("arg1", "foo"), ("arg2", "bar")] == data["query_string"]


def test_format_custom_headers(request_factory):
request = request_factory.get("/", HTTP_FOO=42, HTTP_BAR="data")
data = format_request(request, 200)
headers = data["headers"]

assert 3 == len(headers)
assert ("Foo", 42) in headers
assert ("Bar", "data") in headers
assert ("Cookie", "") in headers
assert 42 == data["http"]["HTTP_FOO"]
assert "data" == data["http"]["HTTP_BAR"]


def test_format_user_id_for_authenticated_request(request_factory):
request = request_factory.get("/")
request.user = Mock(get_user_model(), id=42)
data = format_request(request, 200)

assert 42 == data["user_id"]


def test_fail_safe_if_no_remote_addr(request_factory):
request = request_factory.get("/", HTTP_FOO=42, HTTP_BAR="data")
request.META.pop("REMOTE_ADDR")

data = format_request(request, 200)

assert "" == data["http"]["remote-addr"]


def test_logging_enqueue_message_to_be_processed(request_factory):
blocked_requests.clear()

request = request_factory.get("/", HTTP_FOO=42, HTTP_BAR="data")
log_blocked_request(request, 429)

assert 1 == len(blocked_requests)
assert format_request(request, 429) == blocked_requests.lpop()
assert 0 == len(blocked_requests)
Loading