Skip to content

Commit

Permalink
Fix issue 603 (#606)
Browse files Browse the repository at this point in the history
* Add a QGIS server QgsAccessControlFilter for check the layer ACL.

* Fix test for layer acl QGIS server filter.

* Restyling code.

* Add tests for WFS service.

---------

Co-authored-by: wlorenzetti <[email protected]>
  • Loading branch information
wlorenzetti and wlorenzetti authored Sep 28, 2023
1 parent 6963b6a commit a311932
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 12 deletions.
2 changes: 2 additions & 0 deletions g3w-admin/caching/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ def test_tilestache_api(self):
client = Client()
layer = Layer.objects.get(project=self.project.instance, qgs_layer_id='spatialite_points20190604101052075')
assign_perm('view_project', self.anonymoususer, self.project.instance)
for l in self.project.instance.layer_set.all():
assign_perm("view_layer", self.anonymoususer, l)

# active caching for layer
cachinglayer = G3WCachingLayer.objects.create(app_name='qdjango', layer_id=layer.pk)
Expand Down
1 change: 1 addition & 0 deletions g3w-admin/qdjango/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def auth_request(self, **kwargs):
try:
ba = BasicAuthentication()
user, other = ba.authenticate(self.request)
self.request.user = user
return user.has_perm('qdjango.view_project', self.project)
except Exception as e:
print(e)
Expand Down
50 changes: 50 additions & 0 deletions g3w-admin/qdjango/server_filters/accesscontrol/layer_acl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# coding=utf-8
"""" Che layer acl
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the Mozilla Public License 2.0.
"""

__author__ = "[email protected]"
__date__ = "2023-09-25"
__copyright__ = "Copyright 2015 - 2023, Gis3w"
__license__ = "MPL 2.0"

from guardian.shortcuts import get_perms
from qgis.server import QgsAccessControlFilter
from qgis.core import QgsMessageLog, Qgis
from qdjango.apps import QGS_SERVER
from qdjango.models import Layer


class LayerAclAccessControlFilter(QgsAccessControlFilter):
"""Filter layer by ACL properties"""

def __init__(self, server_iface):
super().__init__(server_iface)

def layerPermissions(self, layer):

rights = QgsAccessControlFilter.LayerPermissions()

try:
qdjango_layer = Layer.objects.get(
project=QGS_SERVER.project, qgs_layer_id=layer.id())

# Check permission
perms = get_perms(QGS_SERVER.user, qdjango_layer)
rights.canRead = "view_layer" in perms
rights.canInsert = "add_layer" in perms
rights.canUpdate = "change_layer" in perms
rights.canDelete = "delete_layer" in perms

except Layer.DoesNotExist:
pass

return rights


# Register the filter, keep a reference because of the garbage collector
layeracl_filter = LayerAclAccessControlFilter(QGS_SERVER.serverInterface())
# Note: this should be the last filter, set the priority to 10000
QGS_SERVER.serverInterface().registerAccessControl(layeracl_filter, 10010)
Binary file modified g3w-admin/qdjango/tests/data/geodata/qgis_widget_test_data.gpkg
Binary file not shown.
63 changes: 58 additions & 5 deletions g3w-admin/qdjango/tests/test_constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,31 @@ def _check_subset_string(self, login=True):

return is_rome

def _check_wfs_getfeature(self, login=True):
"""Check for ROME in the returned content"""

ows_url = reverse('OWS:ows', kwargs={'group_slug': self.qdjango_project.group.slug,
'project_type': 'qdjango', 'project_id': self.qdjango_project.id})

c = Client()
if login:
self.assertTrue(c.login(username='admin01', password='admin01'))
response = c.get(ows_url, {
'REQUEST': 'GetFeature',
'SERVICE': 'WFS',
'VERSION': '1.1.0',
'TYPENAME': 'world'
})

is_rome = b"ROME" in response.content
# Now query another location to make sure the whole layer was not invalidated
assert b"BERLIN" in response.content

if login:
c.logout()

return is_rome


class SingleLayerSubsetStringConstraints(TestSingleLayerConstraintsBase):
"""Test single layer subset string constraints"""
Expand All @@ -172,6 +197,7 @@ def test_user_constraint(self):
"""Test model with user constraint"""

self.assertTrue(self._check_subset_string())
self.assertTrue(self._check_wfs_getfeature())

admin01 = self.test_user1
constraint = SingleLayerConstraint(layer=self.world, active=True)
Expand All @@ -196,6 +222,7 @@ def test_user_constraint(self):
admin01, self.world.pk), "(NAME != 'ITALY')")

self.assertFalse(self._check_subset_string())
self.assertFalse(self._check_wfs_getfeature())

self.assertEqual(constraint.layer_name, 'world')
self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525')
Expand All @@ -222,6 +249,7 @@ def test_user_constraint(self):
admin01, self.world.pk), "(NAME != 'ITALY')")

self.assertFalse(self._check_subset_string())
self.assertFalse(self._check_wfs_getfeature())

self.assertEqual(constraint.layer_name, 'world')
self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525')
Expand Down Expand Up @@ -252,12 +280,16 @@ def test_user_constraint(self):

# for OGC service only in v an ve context
self.assertTrue(self._check_subset_string())
self.assertTrue(self._check_wfs_getfeature())

def test_anonymoususer_constraint(self):
"""Test for anonymous user"""

# For AnonymousUser
assign_perm('view_project', get_anonymous_user(), self.qdjango_project)
for l in self.qdjango_project.layer_set.all():
assign_perm('view_layer', get_anonymous_user(), l)

self.assertTrue(self._check_subset_string(login=False))

constraint_anonymous = SingleLayerConstraint(layer=self.world, active=True)
Expand All @@ -267,11 +299,13 @@ def test_anonymoususer_constraint(self):
rule_anonymous.save()

self.assertFalse(self._check_subset_string(login=False))
self.assertFalse(self._check_wfs_getfeature(login=False))

def test_group_constraint(self):
"""Test model with group constraint"""

self.assertTrue(self._check_subset_string())
self.assertTrue(self._check_wfs_getfeature())

admin01 = self.test_user1
group1 = admin01.groups.all()[0]
Expand All @@ -298,6 +332,7 @@ def test_group_constraint(self):
admin01, world.pk), "(NAME != 'ITALY')")

self.assertFalse(self._check_subset_string())
self.assertFalse(self._check_wfs_getfeature())

@skipIf(IS_QGIS_3_10, "In QGIS 3.10 setSubsetString() always returns True")
def test_validate_sql(self):
Expand Down Expand Up @@ -395,6 +430,7 @@ def test_user_constraint(self):
"""Test model with user constraint"""

self.assertTrue(self._check_subset_string())
self.assertTrue(self._check_wfs_getfeature())

admin01 = self.test_user1
world = self.world
Expand All @@ -420,6 +456,7 @@ def test_user_constraint(self):
admin01, world.pk), "(NAME != 'ITALY')")

self.assertFalse(self._check_subset_string())
self.assertFalse(self._check_wfs_getfeature())

self.assertEqual(constraint.layer_name, 'world')
self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525')
Expand All @@ -438,6 +475,7 @@ def test_user_constraint(self):
admin01, world.pk, context='e'), "(NAME != 'ITALY')")

self.assertFalse(self._check_subset_string())
self.assertFalse(self._check_wfs_getfeature())

self.assertEqual(constraint.layer_name, 'world')
self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525')
Expand All @@ -458,6 +496,7 @@ def test_user_constraint(self):
admin01, world.pk, context='e'), "(NAME != 'ITALY')")

self.assertTrue(self._check_subset_string())
self.assertTrue(self._check_wfs_getfeature())

self.assertEqual(constraint.layer_name, 'world')
self.assertEqual(constraint.qgs_layer_id, 'world20181008111156525')
Expand All @@ -468,7 +507,11 @@ def test_anonymoususer_constraint(self):

# For AnonymousUser
assign_perm('view_project', get_anonymous_user(), self.qdjango_project)
for l in self.qdjango_project.layer_set.all():
assign_perm('view_layer', get_anonymous_user(), l)

self.assertTrue(self._check_subset_string(login=False))
self.assertTrue(self._check_wfs_getfeature(login=False))

constraint_anonymous = SingleLayerConstraint(layer=self.world, active=True)
constraint_anonymous.save()
Expand All @@ -477,11 +520,13 @@ def test_anonymoususer_constraint(self):
rule_anonymous.save()

self.assertFalse(self._check_subset_string(login=False))
self.assertFalse(self._check_wfs_getfeature(login=False))

def test_group_constraint(self):
"""Test model with group constraint"""

self.assertTrue(self._check_subset_string())
self.assertTrue(self._check_wfs_getfeature())

admin01 = self.test_user1
group1 = admin01.groups.all()[0]
Expand All @@ -508,6 +553,7 @@ def test_group_constraint(self):
admin01, world.pk), "(NAME != 'ITALY')")

self.assertFalse(self._check_subset_string())
self.assertFalse(self._check_wfs_getfeature())

# context view + editing ve
# =========================
Expand All @@ -529,6 +575,7 @@ def test_group_constraint(self):
admin01, world.pk), "(NAME != 'ITALY')")

self.assertFalse(self._check_subset_string())
self.assertFalse(self._check_wfs_getfeature())

# context editing e
# =========================
Expand All @@ -554,6 +601,7 @@ def test_group_constraint(self):

# for OWS service only for context v and ve
self.assertTrue(self._check_subset_string())
self.assertTrue(self._check_wfs_getfeature())


def test_validate_sql(self):
Expand Down Expand Up @@ -768,6 +816,8 @@ def test_shp_api(self):
# =============================

assign_perm('view_project', get_anonymous_user(), self.qdjango_project)
for l in self.qdjango_project.layer_set.all():
assign_perm('view_layer', get_anonymous_user(), l)

rule = ConstraintExpressionRule(
constraint=constraint, user=get_anonymous_user(), rule="NAME != 'ITALY'", anonymoususer=True)
Expand Down Expand Up @@ -982,6 +1032,8 @@ def test_xls_api(self):
# -----------------

assign_perm('view_project', get_anonymous_user(), self.qdjango_project)
for l in self.qdjango_project.layer_set.all():
assign_perm('view_layer', get_anonymous_user(), l)

rule = ConstraintExpressionRule(
constraint=constraint, user=get_anonymous_user(), rule="NAME != 'ITALY'", anonymoususer=True)
Expand Down Expand Up @@ -1614,6 +1666,7 @@ def test_bbox_filter(self):
rule="intersects_bbox( $geometry, geom_from_wkt( 'POLYGON((8 51, 11 51, 11 52, 11 52, 8 51))') )")
rule.save()
self.assertFalse(self._check_subset_string())
self.assertFalse(self._check_wfs_getfeature())

rule.delete()

Expand Down Expand Up @@ -1773,12 +1826,12 @@ def test_geoconstraint_filter(self):
constraint.save()

# assign permissions
assign_perm('view_project', self.test_viewer1, self.qdjango_project)
assign_perm('view_project', self.test_viewer1_3, self.qdjango_project)
assign_perm('view_project', self.test_gu_viewer1, self.qdjango_project)

# also to Anonymous user
assign_perm('view_project', get_anonymous_user(), self.qdjango_project)
for u in (self.test_viewer1, self.test_viewer1_3, self.test_gu_viewer1, get_anonymous_user()):
assign_perm('view_project', u, self.qdjango_project)
for l in self.qdjango_project.layer_set.all():
assign_perm("view_layer", u, l)


ows_url = reverse('OWS:ows', kwargs={'group_slug': self.qdjango_project.group.slug,
'project_type': 'qdjango', 'project_id': self.qdjango_project.id})
Expand Down
68 changes: 61 additions & 7 deletions g3w-admin/qdjango/tests/test_ows.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,19 @@ class OwsTest(QdjangoTestBase):
def setUpTestData(cls):

super().setUpTestData()
cls.qdjango_project = Project(
qgis_file=cls.project.qgisProjectFile,
title='Test qdjango project',
group=cls.project_group,
)
cls.qdjango_project.save()
#cls.qdjango_project = Project(
# qgis_file=cls.project.qgisProjectFile,
# title='Test qdjango project',
# group=cls.project_group,
#)
#cls.qdjango_project.save()

cls.project2 = QgisProject(cls.project.qgisProjectFile)
cls.project2.title = "Test qdjango project"
cls.project2.group = cls.project_group
cls.project2.save()

cls.qdjango_project = cls.project2.instance

qgis_project_file_widget = File(open('{}{}{}'.format(
CURRENT_PATH, TEST_BASE_PATH, QGS310_WIDGET_FILE), 'r'))
Expand Down Expand Up @@ -174,6 +181,8 @@ def test_authorizzer(self):

# give permission to user
assign_perm('view_project', self.test_viewer1, self.qdjango_project)
for l in self.qdjango_project.layer_set.all():
assign_perm("view_layer", self.test_viewer1, l)

response = c.get(ows_url, {
'REQUEST': 'GetCapabilities',
Expand All @@ -188,14 +197,59 @@ def test_authorizzer(self):
# try basic authentication
# for viewer1
c = Client(HTTP_AUTHORIZATION='Basic dmlld2VyMTp2aWV3ZXIx')
esponse = c.get(ows_url, {
response = c.get(ows_url, {
'REQUEST': 'GetCapabilities',
'SERVICE': 'WMS'
})

self.assertEqual(response.status_code, 200)
self.assertTrue(b'<Name>bluemarble</Name>' in response.content)

# Filter layer by user
for l in self.qdjango_project.layer_set.filter(name__in=['bluemarble', 'world']):
remove_perm("view_layer", self.test_viewer1, l)

response = c.get(ows_url, {
"REQUEST": "GetCapabilities",
"SERVICE": "WMS"
})

self.assertEqual(response.status_code, 200)
self.assertFalse(b'<Name>bluemarble</Name>' in response.content)
self.assertFalse(b"<Name>world</Name>" in response.content)
self.assertTrue(b"<Name>spatialite_points</Name>" in response.content)

# For WFS
response = c.get(ows_url, {
"REQUEST": "GetCapabilities",
"SERVICE": "WFS",
"VERSION": "1.1.0",
"TYPENAME": "world"
})

self.assertEqual(response.status_code, 200)
self.assertFalse(b"<Name>world</Name>" in response.content)
self.assertTrue(b"<Name>spatialite_points</Name>" in response.content)

response = c.get(ows_url, {
"REQUEST": "GetCapabilities",
"SERVICE": "WFS"
})

self.assertEqual(response.status_code, 200)

for l in self.qdjango_project.layer_set.filter(name='world'):
assign_perm("view_layer", self.test_viewer1, l)

response = c.get(ows_url, {
"REQUEST": "GetCapabilities",
"SERVICE": "WFS"
})

self.assertEqual(response.status_code, 200)
self.assertTrue(b"<Name>world</Name>" in response.content)


def test_get_getfeatureinfo(self):
"""Test GetFeatureInfo for QGIS widget"""

Expand Down

0 comments on commit a311932

Please sign in to comment.