From a311932e4e092dfdc355d456bbeec91d53022c39 Mon Sep 17 00:00:00 2001 From: Walter Lorenzetti Date: Thu, 28 Sep 2023 09:16:32 +0200 Subject: [PATCH] Fix issue 603 (#606) * Add a QGIS server QgsAccessControlFilter for check the layer ACL. * Fix test for layer acl QGIS server filter. * Restyling code. * Add tests for WFS service. --------- Co-authored-by: wlorenzetti --- g3w-admin/caching/tests/test_api.py | 2 + g3w-admin/qdjango/auth.py | 1 + .../server_filters/accesscontrol/layer_acl.py | 50 +++++++++++++ .../data/geodata/qgis_widget_test_data.gpkg | Bin 122880 -> 122880 bytes g3w-admin/qdjango/tests/test_constraints.py | 63 ++++++++++++++-- g3w-admin/qdjango/tests/test_ows.py | 68 ++++++++++++++++-- 6 files changed, 172 insertions(+), 12 deletions(-) create mode 100644 g3w-admin/qdjango/server_filters/accesscontrol/layer_acl.py diff --git a/g3w-admin/caching/tests/test_api.py b/g3w-admin/caching/tests/test_api.py index a12af58cd..e54a9a6ed 100644 --- a/g3w-admin/caching/tests/test_api.py +++ b/g3w-admin/caching/tests/test_api.py @@ -80,6 +80,8 @@ def test_tilestache_api(self): client = Client() layer = Layer.objects.get(project=self.project.instance, qgs_layer_id='spatialite_points20190604101052075') assign_perm('view_project', self.anonymoususer, self.project.instance) + for l in self.project.instance.layer_set.all(): + assign_perm("view_layer", self.anonymoususer, l) # active caching for layer cachinglayer = G3WCachingLayer.objects.create(app_name='qdjango', layer_id=layer.pk) diff --git a/g3w-admin/qdjango/auth.py b/g3w-admin/qdjango/auth.py index 6ac45feee..0ab70f6b2 100644 --- a/g3w-admin/qdjango/auth.py +++ b/g3w-admin/qdjango/auth.py @@ -25,6 +25,7 @@ def auth_request(self, **kwargs): try: ba = BasicAuthentication() user, other = ba.authenticate(self.request) + self.request.user = user return user.has_perm('qdjango.view_project', self.project) except Exception as e: print(e) diff --git a/g3w-admin/qdjango/server_filters/accesscontrol/layer_acl.py b/g3w-admin/qdjango/server_filters/accesscontrol/layer_acl.py new file mode 100644 index 000000000..0aa90bbf6 --- /dev/null +++ b/g3w-admin/qdjango/server_filters/accesscontrol/layer_acl.py @@ -0,0 +1,50 @@ +# coding=utf-8 +"""" Che layer acl +.. note:: This program is free software; you can redistribute it and/or modify + it under the terms of the Mozilla Public License 2.0. + +""" + +__author__ = "lorenzetti@gis3w.it" +__date__ = "2023-09-25" +__copyright__ = "Copyright 2015 - 2023, Gis3w" +__license__ = "MPL 2.0" + +from guardian.shortcuts import get_perms +from qgis.server import QgsAccessControlFilter +from qgis.core import QgsMessageLog, Qgis +from qdjango.apps import QGS_SERVER +from qdjango.models import Layer + + +class LayerAclAccessControlFilter(QgsAccessControlFilter): + """Filter layer by ACL properties""" + + def __init__(self, server_iface): + super().__init__(server_iface) + + def layerPermissions(self, layer): + + rights = QgsAccessControlFilter.LayerPermissions() + + try: + qdjango_layer = Layer.objects.get( + project=QGS_SERVER.project, qgs_layer_id=layer.id()) + + # Check permission + perms = get_perms(QGS_SERVER.user, qdjango_layer) + rights.canRead = "view_layer" in perms + rights.canInsert = "add_layer" in perms + rights.canUpdate = "change_layer" in perms + rights.canDelete = "delete_layer" in perms + + except Layer.DoesNotExist: + pass + + return rights + + +# Register the filter, keep a reference because of the garbage collector +layeracl_filter = LayerAclAccessControlFilter(QGS_SERVER.serverInterface()) +# Note: this should be the last filter, set the priority to 10000 +QGS_SERVER.serverInterface().registerAccessControl(layeracl_filter, 10010) \ No newline at end of file diff --git a/g3w-admin/qdjango/tests/data/geodata/qgis_widget_test_data.gpkg b/g3w-admin/qdjango/tests/data/geodata/qgis_widget_test_data.gpkg index 0dac454a15cefe93cc49b0994e09c11aeae334b2..69b5fbbd8c9358c12465cc6aa480c2ab3d8c0f20 100644 GIT binary patch delta 47 zcmZoTz}|3xT_!ltC$l6~AuYcsH?c&)m_dMnk&(ecL4kpR`N>2XXT~Ru30o5w7u*K` DSd|W) delta 47 zcmZoTz}|3xT_!ltC$l6~AuYcsH?c&)m_dMniHX5ML4kpR`Rqg)XU4OQ30o5w7u*K` DR^1M9 diff --git a/g3w-admin/qdjango/tests/test_constraints.py b/g3w-admin/qdjango/tests/test_constraints.py index 5d6deacf7..f215193ca 100644 --- a/g3w-admin/qdjango/tests/test_constraints.py +++ b/g3w-admin/qdjango/tests/test_constraints.py @@ -164,6 +164,31 @@ def _check_subset_string(self, login=True): return is_rome + def _check_wfs_getfeature(self, login=True): + """Check for ROME in the returned content""" + + ows_url = reverse('OWS:ows', kwargs={'group_slug': self.qdjango_project.group.slug, + 'project_type': 'qdjango', 'project_id': self.qdjango_project.id}) + + c = Client() + if login: + self.assertTrue(c.login(username='admin01', password='admin01')) + response = c.get(ows_url, { + 'REQUEST': 'GetFeature', + 'SERVICE': 'WFS', + 'VERSION': '1.1.0', + 'TYPENAME': 'world' + }) + + is_rome = b"ROME" in response.content + # Now query another location to make sure the whole layer was not invalidated + assert b"BERLIN" in response.content + + if login: + c.logout() + + return is_rome + class SingleLayerSubsetStringConstraints(TestSingleLayerConstraintsBase): """Test single layer subset string constraints""" @@ -172,6 +197,7 @@ def test_user_constraint(self): """Test model with user constraint""" self.assertTrue(self._check_subset_string()) + self.assertTrue(self._check_wfs_getfeature()) admin01 = self.test_user1 constraint = SingleLayerConstraint(layer=self.world, active=True) @@ -196,6 +222,7 @@ def test_user_constraint(self): admin01, self.world.pk), "(NAME != 'ITALY')") self.assertFalse(self._check_subset_string()) + self.assertFalse(self._check_wfs_getfeature()) self.assertEqual(constraint.layer_name, 'world') self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525') @@ -222,6 +249,7 @@ def test_user_constraint(self): admin01, self.world.pk), "(NAME != 'ITALY')") self.assertFalse(self._check_subset_string()) + self.assertFalse(self._check_wfs_getfeature()) self.assertEqual(constraint.layer_name, 'world') self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525') @@ -252,12 +280,16 @@ def test_user_constraint(self): # for OGC service only in v an ve context self.assertTrue(self._check_subset_string()) + self.assertTrue(self._check_wfs_getfeature()) def test_anonymoususer_constraint(self): """Test for anonymous user""" # For AnonymousUser assign_perm('view_project', get_anonymous_user(), self.qdjango_project) + for l in self.qdjango_project.layer_set.all(): + assign_perm('view_layer', get_anonymous_user(), l) + self.assertTrue(self._check_subset_string(login=False)) constraint_anonymous = SingleLayerConstraint(layer=self.world, active=True) @@ -267,11 +299,13 @@ def test_anonymoususer_constraint(self): rule_anonymous.save() self.assertFalse(self._check_subset_string(login=False)) + self.assertFalse(self._check_wfs_getfeature(login=False)) def test_group_constraint(self): """Test model with group constraint""" self.assertTrue(self._check_subset_string()) + self.assertTrue(self._check_wfs_getfeature()) admin01 = self.test_user1 group1 = admin01.groups.all()[0] @@ -298,6 +332,7 @@ def test_group_constraint(self): admin01, world.pk), "(NAME != 'ITALY')") self.assertFalse(self._check_subset_string()) + self.assertFalse(self._check_wfs_getfeature()) @skipIf(IS_QGIS_3_10, "In QGIS 3.10 setSubsetString() always returns True") def test_validate_sql(self): @@ -395,6 +430,7 @@ def test_user_constraint(self): """Test model with user constraint""" self.assertTrue(self._check_subset_string()) + self.assertTrue(self._check_wfs_getfeature()) admin01 = self.test_user1 world = self.world @@ -420,6 +456,7 @@ def test_user_constraint(self): admin01, world.pk), "(NAME != 'ITALY')") self.assertFalse(self._check_subset_string()) + self.assertFalse(self._check_wfs_getfeature()) self.assertEqual(constraint.layer_name, 'world') self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525') @@ -438,6 +475,7 @@ def test_user_constraint(self): admin01, world.pk, context='e'), "(NAME != 'ITALY')") self.assertFalse(self._check_subset_string()) + self.assertFalse(self._check_wfs_getfeature()) self.assertEqual(constraint.layer_name, 'world') self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525') @@ -458,6 +496,7 @@ def test_user_constraint(self): admin01, world.pk, context='e'), "(NAME != 'ITALY')") self.assertTrue(self._check_subset_string()) + self.assertTrue(self._check_wfs_getfeature()) self.assertEqual(constraint.layer_name, 'world') self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525') @@ -468,7 +507,11 @@ def test_anonymoususer_constraint(self): # For AnonymousUser assign_perm('view_project', get_anonymous_user(), self.qdjango_project) + for l in self.qdjango_project.layer_set.all(): + assign_perm('view_layer', get_anonymous_user(), l) + self.assertTrue(self._check_subset_string(login=False)) + self.assertTrue(self._check_wfs_getfeature(login=False)) constraint_anonymous = SingleLayerConstraint(layer=self.world, active=True) constraint_anonymous.save() @@ -477,11 +520,13 @@ def test_anonymoususer_constraint(self): rule_anonymous.save() self.assertFalse(self._check_subset_string(login=False)) + self.assertFalse(self._check_wfs_getfeature(login=False)) def test_group_constraint(self): """Test model with group constraint""" self.assertTrue(self._check_subset_string()) + self.assertTrue(self._check_wfs_getfeature()) admin01 = self.test_user1 group1 = admin01.groups.all()[0] @@ -508,6 +553,7 @@ def test_group_constraint(self): admin01, world.pk), "(NAME != 'ITALY')") self.assertFalse(self._check_subset_string()) + self.assertFalse(self._check_wfs_getfeature()) # context view + editing ve # ========================= @@ -529,6 +575,7 @@ def test_group_constraint(self): admin01, world.pk), "(NAME != 'ITALY')") self.assertFalse(self._check_subset_string()) + self.assertFalse(self._check_wfs_getfeature()) # context editing e # ========================= @@ -554,6 +601,7 @@ def test_group_constraint(self): # for OWS service only for context v and ve self.assertTrue(self._check_subset_string()) + self.assertTrue(self._check_wfs_getfeature()) def test_validate_sql(self): @@ -768,6 +816,8 @@ def test_shp_api(self): # ============================= assign_perm('view_project', get_anonymous_user(), self.qdjango_project) + for l in self.qdjango_project.layer_set.all(): + assign_perm('view_layer', get_anonymous_user(), l) rule = ConstraintExpressionRule( constraint=constraint, user=get_anonymous_user(), rule="NAME != 'ITALY'", anonymoususer=True) @@ -982,6 +1032,8 @@ def test_xls_api(self): # ----------------- assign_perm('view_project', get_anonymous_user(), self.qdjango_project) + for l in self.qdjango_project.layer_set.all(): + assign_perm('view_layer', get_anonymous_user(), l) rule = ConstraintExpressionRule( constraint=constraint, user=get_anonymous_user(), rule="NAME != 'ITALY'", anonymoususer=True) @@ -1614,6 +1666,7 @@ def test_bbox_filter(self): rule="intersects_bbox( $geometry, geom_from_wkt( 'POLYGON((8 51, 11 51, 11 52, 11 52, 8 51))') )") rule.save() self.assertFalse(self._check_subset_string()) + self.assertFalse(self._check_wfs_getfeature()) rule.delete() @@ -1773,12 +1826,12 @@ def test_geoconstraint_filter(self): constraint.save() # assign permissions - assign_perm('view_project', self.test_viewer1, self.qdjango_project) - assign_perm('view_project', self.test_viewer1_3, self.qdjango_project) - assign_perm('view_project', self.test_gu_viewer1, self.qdjango_project) - # also to Anonymous user - assign_perm('view_project', get_anonymous_user(), self.qdjango_project) + for u in (self.test_viewer1, self.test_viewer1_3, self.test_gu_viewer1, get_anonymous_user()): + assign_perm('view_project', u, self.qdjango_project) + for l in self.qdjango_project.layer_set.all(): + assign_perm("view_layer", u, l) + ows_url = reverse('OWS:ows', kwargs={'group_slug': self.qdjango_project.group.slug, 'project_type': 'qdjango', 'project_id': self.qdjango_project.id}) diff --git a/g3w-admin/qdjango/tests/test_ows.py b/g3w-admin/qdjango/tests/test_ows.py index a0071dd11..b3ee298ed 100644 --- a/g3w-admin/qdjango/tests/test_ows.py +++ b/g3w-admin/qdjango/tests/test_ows.py @@ -50,12 +50,19 @@ class OwsTest(QdjangoTestBase): def setUpTestData(cls): super().setUpTestData() - cls.qdjango_project = Project( - qgis_file=cls.project.qgisProjectFile, - title='Test qdjango project', - group=cls.project_group, - ) - cls.qdjango_project.save() + #cls.qdjango_project = Project( + # qgis_file=cls.project.qgisProjectFile, + # title='Test qdjango project', + # group=cls.project_group, + #) + #cls.qdjango_project.save() + + cls.project2 = QgisProject(cls.project.qgisProjectFile) + cls.project2.title = "Test qdjango project" + cls.project2.group = cls.project_group + cls.project2.save() + + cls.qdjango_project = cls.project2.instance qgis_project_file_widget = File(open('{}{}{}'.format( CURRENT_PATH, TEST_BASE_PATH, QGS310_WIDGET_FILE), 'r')) @@ -174,6 +181,8 @@ def test_authorizzer(self): # give permission to user assign_perm('view_project', self.test_viewer1, self.qdjango_project) + for l in self.qdjango_project.layer_set.all(): + assign_perm("view_layer", self.test_viewer1, l) response = c.get(ows_url, { 'REQUEST': 'GetCapabilities', @@ -188,7 +197,7 @@ def test_authorizzer(self): # try basic authentication # for viewer1 c = Client(HTTP_AUTHORIZATION='Basic dmlld2VyMTp2aWV3ZXIx') - esponse = c.get(ows_url, { + response = c.get(ows_url, { 'REQUEST': 'GetCapabilities', 'SERVICE': 'WMS' }) @@ -196,6 +205,51 @@ def test_authorizzer(self): self.assertEqual(response.status_code, 200) self.assertTrue(b'bluemarble' in response.content) + # Filter layer by user + for l in self.qdjango_project.layer_set.filter(name__in=['bluemarble', 'world']): + remove_perm("view_layer", self.test_viewer1, l) + + response = c.get(ows_url, { + "REQUEST": "GetCapabilities", + "SERVICE": "WMS" + }) + + self.assertEqual(response.status_code, 200) + self.assertFalse(b'bluemarble' in response.content) + self.assertFalse(b"world" in response.content) + self.assertTrue(b"spatialite_points" in response.content) + + # For WFS + response = c.get(ows_url, { + "REQUEST": "GetCapabilities", + "SERVICE": "WFS", + "VERSION": "1.1.0", + "TYPENAME": "world" + }) + + self.assertEqual(response.status_code, 200) + self.assertFalse(b"world" in response.content) + self.assertTrue(b"spatialite_points" in response.content) + + response = c.get(ows_url, { + "REQUEST": "GetCapabilities", + "SERVICE": "WFS" + }) + + self.assertEqual(response.status_code, 200) + + for l in self.qdjango_project.layer_set.filter(name='world'): + assign_perm("view_layer", self.test_viewer1, l) + + response = c.get(ows_url, { + "REQUEST": "GetCapabilities", + "SERVICE": "WFS" + }) + + self.assertEqual(response.status_code, 200) + self.assertTrue(b"world" in response.content) + + def test_get_getfeatureinfo(self): """Test GetFeatureInfo for QGIS widget"""