Skip to content

Commit

Permalink
[Bugfix] Access control override filter expression instead of combine
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions[bot] authored and nyalldawson committed Dec 14, 2020
1 parent 9fb3e33 commit c595db7
Show file tree
Hide file tree
Showing 4 changed files with 2,085 additions and 1,242 deletions.
2 changes: 1 addition & 1 deletion src/server/qgsaccesscontrol.cpp
Expand Up @@ -75,7 +75,7 @@ void QgsAccessControl::filterFeatures( const QgsVectorLayer *layer, QgsFeatureRe

if ( !expression.isEmpty() )
{
featureRequest.setFilterExpression( expression );
featureRequest.combineFilterExpression( expression );
}
}

Expand Down
7 changes: 6 additions & 1 deletion tests/src/python/test_qgsserver_accesscontrol.py
Expand Up @@ -54,7 +54,12 @@ def layerFilterExpression(self, layer):
if not self._active:
return super(RestrictedAccessControl, self).layerFilterExpression(layer)

return "$id = 1" if layer.name() == "Hello" else None
if layer.name() == "Hello":
return "$id = 1"
elif layer.name() == "Hello_Filter":
return "pkuid = 6 or pkuid = 7"
else:
return None

def layerFilterSubsetString(self, layer):
""" Return an additional subset string (typically SQL) filter """
Expand Down
270 changes: 270 additions & 0 deletions tests/src/python/test_qgsserver_accesscontrol_wfs.py
Expand Up @@ -127,11 +127,67 @@ def test_wfs_getfeature_hello2(self):
self.assertTrue(
str(response).find("<qgs:pk>2</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

response, headers = self._post_restricted(data)
self.assertFalse(
str(response).find("<qgs:pk>2</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

def test_wfs_getfeature_filter(self):
data = """<?xml version="1.0" encoding="UTF-8"?>
<wfs:GetFeature {xml_ns}>
<wfs:Query typeName="Hello_Filter" srsName="EPSG:3857" xmlns:feature="http://www.qgis.org/gml">
<ogc:Filter xmlns:ogc="http://www.opengis.net/ogc"><ogc:PropertyIsEqualTo>
<ogc:PropertyName>pkuid</ogc:PropertyName>
<ogc:Literal>1</ogc:Literal>
</ogc:PropertyIsEqualTo></ogc:Filter></wfs:Query></wfs:GetFeature>""".format(xml_ns=XML_NS)

response, headers = self._post_fullaccess(data)
self.assertTrue(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

response, headers = self._post_restricted(data)
self.assertFalse(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

def test_wfs_getfeature_filter2(self):
data = """<?xml version="1.0" encoding="UTF-8"?>
<wfs:GetFeature {xml_ns}>
<wfs:Query typeName="Hello_Filter" srsName="EPSG:3857" xmlns:feature="http://www.qgis.org/gml">
<ogc:Filter xmlns:ogc="http://www.opengis.net/ogc"><ogc:PropertyIsEqualTo>
<ogc:PropertyName>pkuid</ogc:PropertyName>
<ogc:Literal>6</ogc:Literal>
</ogc:PropertyIsEqualTo></ogc:Filter></wfs:Query></wfs:GetFeature>""".format(xml_ns=XML_NS)

response, headers = self._post_fullaccess(data)
self.assertTrue(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>7</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

response, headers = self._post_restricted(data)
self.assertTrue(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>7</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

def test_wfs_getfeature_country(self):
data = """<?xml version="1.0" encoding="UTF-8"?>
Expand Down Expand Up @@ -284,6 +340,220 @@ def test_wfs_getfeature_project_subsetstring3(self):
str(response).find("<qgs:pk>") != -1,
"Project based layer subsetString not respected in GetFeature with restricted access\n%s" % response)

def test_wfs_getfeature_exp_filter_hello(self):
query_string = "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WFS",
"VERSION": "1.0.0",
"REQUEST": "GetFeature",
"TYPENAME": "Hello",
"EXP_FILTER": "pkuid = 1"
}.items())])

response, headers = self._get_fullaccess(query_string)
self.assertTrue(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertTrue(
str(response).find("<qgs:color>red</qgs:color>") != -1, # spellok
"No color in result of GetFeature\n%s" % response)

response, headers = self._get_restricted(query_string)
self.assertTrue(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:color>red</qgs:color>") != -1, # spellok
"Unexpected color in result of GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:color>NULL</qgs:color>") != -1, # spellok
"Unexpected color NULL in result of GetFeature\n%s" % response)

def test_wfs_getfeature_exp_filter_hello2(self):
query_string = "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WFS",
"VERSION": "1.0.0",
"REQUEST": "GetFeature",
"TYPENAME": "Hello",
"EXP_FILTER": "pkuid = 2"
}.items())])

response, headers = self._get_fullaccess(query_string)
self.assertTrue(
str(response).find("<qgs:pk>2</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

response, headers = self._get_restricted(query_string)
self.assertFalse(
str(response).find("<qgs:pk>2</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

def test_wfs_getfeature_exp_filter_hello_filter(self):
query_string = "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WFS",
"VERSION": "1.0.0",
"REQUEST": "GetFeature",
"TYPENAME": "Hello_Filter",
"EXP_FILTER": "pkuid = 1"
}.items())])

response, headers = self._get_fullaccess(query_string)
self.assertTrue(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

response, headers = self._get_restricted(query_string)
self.assertFalse(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

def test_wfs_getfeature_exp_filter_hello_filter2(self):
query_string = "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WFS",
"VERSION": "1.0.0",
"REQUEST": "GetFeature",
"TYPENAME": "Hello_Filter",
"EXP_FILTER": "pkuid = 6"
}.items())])

response, headers = self._get_fullaccess(query_string)
self.assertTrue(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>7</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

response, headers = self._get_restricted(query_string)
self.assertTrue(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>7</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

def test_wfs_getfeature_featureid_hello(self):
query_string = "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WFS",
"VERSION": "1.0.0",
"REQUEST": "GetFeature",
"TYPENAME": "Hello",
"FEATUREID": "Hello.1"
}.items())])

response, headers = self._get_fullaccess(query_string)
self.assertTrue(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertTrue(
str(response).find("<qgs:color>red</qgs:color>") != -1, # spellok
"No color in result of GetFeature\n%s" % response)

response, headers = self._get_restricted(query_string)
self.assertTrue(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:color>red</qgs:color>") != -1, # spellok
"Unexpected color in result of GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:color>NULL</qgs:color>") != -1, # spellok
"Unexpected color NULL in result of GetFeature\n%s" % response)

def test_wfs_getfeature_featureid_hello(self):
query_string = "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WFS",
"VERSION": "1.0.0",
"REQUEST": "GetFeature",
"TYPENAME": "Hello",
"FEATUREID": "Hello.2"
}.items())])

response, headers = self._get_fullaccess(query_string)
self.assertTrue(
str(response).find("<qgs:pk>2</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

response, headers = self._get_restricted(query_string)
self.assertFalse(
str(response).find("<qgs:pk>2</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

def test_wfs_getfeature_featureid_hello_filter(self):
query_string = "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WFS",
"VERSION": "1.0.0",
"REQUEST": "GetFeature",
"TYPENAME": "Hello_Filter",
"FEATUREID": "Hello_Filter.1"
}.items())])

response, headers = self._get_fullaccess(query_string)
self.assertTrue(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

response, headers = self._get_restricted(query_string)
self.assertFalse(
str(response).find("<qgs:pk>1</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

def test_wfs_getfeature_featureid_hello_filter2(self):
query_string = "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(self.projectPath),
"SERVICE": "WFS",
"VERSION": "1.0.0",
"REQUEST": "GetFeature",
"TYPENAME": "Hello_Filter",
"FEATUREID": "Hello_Filter.6"
}.items())])

response, headers = self._get_fullaccess(query_string)
self.assertTrue(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>7</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)

response, headers = self._get_restricted(query_string)
self.assertTrue(
str(response).find("<qgs:pk>6</qgs:pk>") != -1,
"No result in GetFeature\n%s" % response)
self.assertFalse(
str(response).find("<qgs:pk>7</qgs:pk>") != -1,
"Unexpected result in GetFeature\n%s" % response)


if __name__ == "__main__":
unittest.main()

0 comments on commit c595db7

Please sign in to comment.