From 9f5053f2e7e4dbcdf55897264de122f93d7e314d Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Wed, 19 Jun 2024 18:36:55 +0200 Subject: [PATCH] Fix #3402: Prevent invalid timestamps to reach the database --- kinto/core/resource/__init__.py | 8 ++++++-- tests/core/resource/test_filter.py | 9 +++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/kinto/core/resource/__init__.py b/kinto/core/resource/__init__.py index 994f43cf4..f8aaa7b15 100644 --- a/kinto/core/resource/__init__.py +++ b/kinto/core/resource/__init__.py @@ -1058,6 +1058,10 @@ def _extract_limit(self): def _extract_filters(self): """Extracts filters from QueryString parameters.""" + + def is_valid_timestamp(value): + return isinstance(value, int) or re.match(r'^"?\d+"?$', str(value)) + queryparams = self.request.validated["querystring"] filters = [] @@ -1090,7 +1094,7 @@ def _extract_filters(self): send_alert(self.request, message, url) operator = COMPARISON.LT - if value == "" or not isinstance(value, (int, str, type(None))): + if value is not None and not is_valid_timestamp(value): raise_invalid(self.request, **error_details) filters.append(Filter(self.model.modified_field, value, operator)) @@ -1127,7 +1131,7 @@ def _extract_filters(self): error_details["description"] = "Invalid character 0x00" raise_invalid(self.request, **error_details) - if field == self.model.modified_field and value == "": + if field == self.model.modified_field and not is_valid_timestamp(value): raise_invalid(self.request, **error_details) filters.append(Filter(field, value, operator)) diff --git a/tests/core/resource/test_filter.py b/tests/core/resource/test_filter.py index ba8603c90..d34eb8f0a 100644 --- a/tests/core/resource/test_filter.py +++ b/tests/core/resource/test_filter.py @@ -83,6 +83,15 @@ def test_filter_raises_error_if_last_modified_value_is_empty(self): self.validated["querystring"] = {"lt_last_modified": ""} self.assertRaises(httpexceptions.HTTPBadRequest, self.resource.plural_get) + def test_filter_raises_error_if_last_modified_value_is_not_int(self): + bad_value = "171103608603432920249' or '7127'='7127" + self.validated["querystring"] = {"last_modified": bad_value} + self.assertRaises(httpexceptions.HTTPBadRequest, self.resource.plural_get) + self.validated["querystring"] = {"_since": bad_value} + self.assertRaises(httpexceptions.HTTPBadRequest, self.resource.plural_get) + self.validated["querystring"] = {"lt_last_modified": bad_value} + self.assertRaises(httpexceptions.HTTPBadRequest, self.resource.plural_get) + def test_filter_works_with_since_none(self): self.validated["querystring"] = {"_since": None} result = self.resource.plural_get()