Skip to content

Commit

Permalink
some fixes to the server access control tests on windows
Browse files Browse the repository at this point in the history
  • Loading branch information
jef-n committed Nov 23, 2015
1 parent 7bcc935 commit b10e708
Showing 1 changed file with 43 additions and 17 deletions.
60 changes: 43 additions & 17 deletions tests/src/python/test_qgsserver_accesscontrol.py
Expand Up @@ -24,6 +24,7 @@
from qgis.core import QgsRenderChecker
from PyQt4.QtCore import QSize
import tempfile
import urllib


XML_NS = \
Expand All @@ -37,7 +38,7 @@
'xmlns:gml="http://www.opengis.net/gml" ' \
'xmlns:ows="http://www.opengis.net/ows" '

WFS_TRANSATION_INSERT = """<?xml version="1.0" encoding="UTF-8"?>
WFS_TRANSACTION_INSERT = """<?xml version="1.0" encoding="UTF-8"?>
<wfs:Transaction {xml_ns}>
<wfs:Insert idgen="GenerateNew">
<qgs:db_point>
Expand All @@ -52,7 +53,7 @@
</wfs:Insert>
</wfs:Transaction>""".format(x=1000, y=2000, name="test", color="{color}", xml_ns=XML_NS)

WFS_TRANSATION_UPDATE = """<?xml version="1.0" encoding="UTF-8"?>
WFS_TRANSACTION_UPDATE = """<?xml version="1.0" encoding="UTF-8"?>
<wfs:Transaction {xml_ns}>
<wfs:Update typeName="db_point">
<wfs:Property>
Expand Down Expand Up @@ -80,31 +81,31 @@ class RestrictedAccessControl(QgsAccessControlFilter):
""" Used to have restriction access """

# Be able to deactivate the access control to have a reference point
_acctive = False
_active = False

def __init__(self, server_iface):
super(QgsAccessControlFilter, self).__init__(server_iface)

def layerFilterExpression(self, layer):
""" Return an additional expression filter """

if not self._acctive:
if not self._active:
return super(RestrictedAccessControl, self).layerFilterExpression(layer)

return "$id = 1" if layer.name() == "Hello" else None

def layerFilterSubsetString(self, layer):
""" Return an additional subset string (typically SQL) filter """

if not self._acctive:
if not self._active:
return super(RestrictedAccessControl, self).layerFilterSubsetString(layer)

return "pk = 1" if layer.name() == "Hello_SubsetString" else None

def layerPermissions(self, layer):
""" Return the layer rights """

if not self._acctive:
if not self._active:
return super(RestrictedAccessControl, self).layerPermissions(layer)

rh = self.serverInterface().requestHandler()
Expand All @@ -125,7 +126,7 @@ def layerPermissions(self, layer):
def authorizedLayerAttributes(self, layer, attributes):
""" Return the authorised layer attributes """

if not self._acctive:
if not self._active:
return super(RestrictedAccessControl, self).authorizedLayerAttributes(layer, attributes)

if "colour" in attributes:
Expand All @@ -135,13 +136,13 @@ def authorizedLayerAttributes(self, layer, attributes):
def allowToEdit(self, layer, feature):
""" Are we authorise to modify the following geometry """

if not self._acctive:
if not self._active:
return super(RestrictedAccessControl, self).allowToEdit(layer, feature)

return feature.attribute("color") in ["red", "yellow"]

def cacheKey(self):
return "r" if self._acctive else "f"
return "r" if self._active else "f"


server = QgsServer()
Expand All @@ -158,10 +159,11 @@ def setUp(self):

copyfile(path.join(self.testdata_path, "helloworld.db"), path.join(self.testdata_path, "_helloworld.db"))

if "QUERY_STRING" in environ:
del environ["QUERY_STRING"]
for k in ["QUERY_STRING", "QGIS_PROJECT_FILE"]:
if k in environ:
del environ[k]

environ["QGIS_PROJECT_FILE"] = path.join(self.testdata_path, "project.qgs")
self.projectPath = urllib.quote( path.join(self.testdata_path, "project.qgs") )

def tearDown(self):
copyfile(path.join(self.testdata_path, "_helloworld.db"), path.join(self.testdata_path, "helloworld.db"))
Expand All @@ -170,6 +172,7 @@ def tearDown(self):

def test_wms_getcapabilities(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetCapabilities"
Expand All @@ -193,6 +196,7 @@ def test_wms_getcapabilities(self):

def test_wms_describelayer_hello(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "DescribeLayer",
Expand All @@ -212,6 +216,7 @@ def test_wms_describelayer_hello(self):

def test_wms_describelayer_country(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "DescribeLayer",
Expand All @@ -231,6 +236,7 @@ def test_wms_describelayer_country(self):

def test_wms_getlegendgraphic_hello(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetLegendGraphic",
Expand All @@ -246,6 +252,7 @@ def test_wms_getlegendgraphic_hello(self):

def test_wms_getlegendgraphic_country(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetLegendGraphic",
Expand All @@ -267,6 +274,7 @@ def test_wms_getlegendgraphic_country(self):

def test_wms_getmap(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetMap",
Expand All @@ -283,6 +291,7 @@ def test_wms_getmap(self):
self._img_diff_error(response, headers, "WMS_GetMap")

query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetMap",
Expand All @@ -298,6 +307,7 @@ def test_wms_getmap(self):
self._img_diff_error(response, headers, "Restricted_WMS_GetMap")

query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetMap",
Expand All @@ -320,6 +330,7 @@ def test_wms_getmap(self):

def test_wms_getfeatureinfo_hello(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetFeatureInfo",
Expand Down Expand Up @@ -358,6 +369,7 @@ def test_wms_getfeatureinfo_hello(self):

def test_wms_getfeatureinfo_hello2(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetFeatureInfo",
Expand Down Expand Up @@ -387,6 +399,7 @@ def test_wms_getfeatureinfo_hello2(self):

def test_wms_getfeatureinfo_country(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetFeatureInfo",
Expand Down Expand Up @@ -418,6 +431,7 @@ def test_wms_getfeatureinfo_country(self):

def test_wfs_getcapabilities(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WFS",
"VERSION": "1.1.0",
"REQUEST": "GetCapabilities"
Expand All @@ -441,6 +455,7 @@ def test_wfs_getcapabilities(self):

def test_wfs_describefeaturetype_hello(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WFS",
"VERSION": "1.1.0",
"REQUEST": "DescribeFeatureType",
Expand All @@ -459,6 +474,7 @@ def test_wfs_describefeaturetype_hello(self):

def test_wfs_describefeaturetype_country(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WFS",
"VERSION": "1.1.0",
"REQUEST": "DescribeFeatureType",
Expand Down Expand Up @@ -546,6 +562,7 @@ def test_wfs_getfeature_country(self):

def test_wcs_getcapabilities(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WCS",
"VERSION": "1.0.0",
"REQUEST": "GetCapabilities",
Expand All @@ -562,6 +579,7 @@ def test_wcs_getcapabilities(self):
"No dem layer in WCS/GetCapabilities\n%s" % response)

query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WCS",
"VERSION": "1.0.0",
"REQUEST": "GetCapabilities",
Expand All @@ -575,6 +593,7 @@ def test_wcs_getcapabilities(self):

def test_wcs_describecoverage(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WCS",
"VERSION": "1.0.0",
"REQUEST": "DescribeCoverage",
Expand All @@ -592,6 +611,7 @@ def test_wcs_describecoverage(self):
"No dem layer in DescribeCoverage\n%s" % response)

query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WCS",
"VERSION": "1.0.0",
"REQUEST": "DescribeCoverage",
Expand All @@ -606,6 +626,7 @@ def test_wcs_describecoverage(self):

def test_wcs_getcoverage(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WCS",
"VERSION": "1.0.0",
"REQUEST": "GetCoverage",
Expand Down Expand Up @@ -634,6 +655,7 @@ def test_wcs_getcoverage(self):
"Image for GetCoverage is wrong")

query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WCS",
"VERSION": "1.0.0",
"REQUEST": "GetCoverage",
Expand All @@ -658,7 +680,7 @@ def test_wcs_getcoverage(self):
# # WFS/Transactions # #

def test_wfstransaction_insert(self):
data = WFS_TRANSATION_INSERT.format(x=1000, y=2000, name="test", color="{color}", xml_ns=XML_NS)
data = WFS_TRANSACTION_INSERT.format(x=1000, y=2000, name="test", color="{color}", xml_ns=XML_NS)
self._test_colors({1: "blue"})

response, headers = self._post_fullaccess(data.format(color="red"))
Expand Down Expand Up @@ -698,7 +720,7 @@ def test_wfstransaction_insert(self):
self._test_colors({3: "yellow"})

def test_wfstransaction_update(self):
data = WFS_TRANSATION_UPDATE.format(id="1", color="{color}", xml_ns=XML_NS)
data = WFS_TRANSACTION_UPDATE.format(id="1", color="{color}", xml_ns=XML_NS)
self._test_colors({1: "blue"})

response, headers = self._post_restricted(data.format(color="yellow"))
Expand Down Expand Up @@ -774,7 +796,7 @@ def test_wfstransaction_delete_restricted(self):
'<ServiceException code="Security">Feature modify permission denied</ServiceException>') != -1,
"WFS/Transactions Delete succeed\n%s" % response)

data_update = WFS_TRANSATION_UPDATE.format(id="1", color="red", xml_ns=XML_NS)
data_update = WFS_TRANSACTION_UPDATE.format(id="1", color="red", xml_ns=XML_NS)
response, headers = self._post_fullaccess(data_update)
self._test_colors({1: "red"})

Expand All @@ -800,6 +822,7 @@ def test_wfstransaction_delete_restricted(self):
# # WMS # # WMS # # WMS # #
def test_wms_getmap_subsetstring(self):
query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetMap",
Expand All @@ -816,6 +839,7 @@ def test_wms_getmap_subsetstring(self):
self._img_diff_error(response, headers, "WMS_GetMap")

query_string = "&".join(["%s=%s" % i for i in {
"MAP": self.projectPath,
"SERVICE": "WMS",
"VERSION": "1.1.1",
"REQUEST": "GetMap",
Expand Down Expand Up @@ -949,7 +973,7 @@ def test_wfs_getfeature_subsetstring2(self):
"Unexpected result in GetFeature\n%s" % response)

def _handle_request(self, restricted, *args):
accesscontrol._acctive = restricted
accesscontrol._active = restricted
result = self._result(server.handleRequest(*args))
return result

Expand Down Expand Up @@ -1027,8 +1051,10 @@ def _geo_img_diff(self, image_1, image_2):
with open(path.join(tempfile.gettempdir(), image_2), "w") as f:
f.write(image_1)
image_1 = gdal.Open(path.join(tempfile.gettempdir(), image_2), GA_ReadOnly)
assert image_1, "No output image written: " + image_2

image_2 = gdal.Open(self.testdata_path + "/results/" + image_2, GA_ReadOnly)
image_2 = gdal.Open(path.join(self.testdata_path, "results", image_2), GA_ReadOnly)
assert image_1, "No expected image found:" + image_2

if image_1.RasterXSize != image_2.RasterXSize:
return 1000 # wrong size
Expand Down

0 comments on commit b10e708

Please sign in to comment.