Skip to content

Commit f1791ec

Browse files
committedNov 3, 2016
[tests] PKI auth tests on a PKI-enabled QGIS Server
1 parent d20b2a8 commit f1791ec

File tree

5 files changed

+260
-8
lines changed

5 files changed

+260
-8
lines changed
 

‎tests/src/python/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,5 +150,7 @@ IF (WITH_SERVER)
150150
ADD_PYTHON_TEST(PyQgsServerAccessControl test_qgsserver_accesscontrol.py)
151151
ADD_PYTHON_TEST(PyQgsServerWFST test_qgsserver_wfst.py)
152152
ADD_PYTHON_TEST(PyQgsOfflineEditingWFS test_offline_editing_wfs.py)
153-
ADD_PYTHON_TEST(PyQgsAuthManagerEndpointTest test_authmanager_endpoint.py)
153+
ADD_PYTHON_TEST(PyQgsAuthManagerPasswordEndpointTest test_authmanager_password_endpoint.py)
154+
ADD_PYTHON_TEST(PyQgsAuthManagerPKIEndpointTest test_authmanager_pki_endpoint.py)
155+
ADD_PYTHON_TEST(PyQgsAuthManagerPKIPostgresTest test_authmanager_pki_postgres.py)
154156
ENDIF (WITH_SERVER)

‎tests/src/python/qgis_wrapped_server.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,21 @@
1313
* QGIS_SERVER_USERNAME (default ="username")
1414
* QGIS_SERVER_PASSWORD (default ="password")
1515
16+
PKI authentication with HTTPS can be enabled with:
17+
18+
* QGIS_SERVER_PKI_CERTIFICATE (server certificate)
19+
* QGIS_SERVER_PKI_KEY (server private key)
20+
* QGIS_SERVER_PKI_AUTHORITY (root CA)
21+
* QGIS_SERVER_PKI_USERNAME (valid username)
22+
23+
Sample run:
24+
25+
QGIS_SERVER_PKI_USERNAME=Gerardus QGIS_SERVER_PORT=47547 QGIS_SERVER_HOST=localhost \
26+
QGIS_SERVER_PKI_KEY=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_key.pem \
27+
QGIS_SERVER_PKI_CERTIFICATE=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_cert.pem \
28+
QGIS_SERVER_PKI_AUTHORITY=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/chains_subissuer-issuer-root_issuer2-root2.pem \
29+
python /home/dev/QGIS/tests/src/python/qgis_wrapped_server.py
30+
1631
.. note:: This program is free software; you can redistribute it and/or modify
1732
it under the terms of the GNU General Public License as published by
1833
the Free Software Foundation; either version 2 of the License, or
@@ -31,12 +46,27 @@
3146

3247
import os
3348
import sys
49+
import ssl
3450
import urllib.parse
3551
from http.server import BaseHTTPRequestHandler, HTTPServer
3652
from qgis.server import QgsServer, QgsServerFilter
3753

3854
QGIS_SERVER_PORT = int(os.environ.get('QGIS_SERVER_PORT', '8081'))
3955
QGIS_SERVER_HOST = os.environ.get('QGIS_SERVER_HOST', '127.0.0.1')
56+
# PKI authentication
57+
QGIS_SERVER_PKI_CERTIFICATE = os.environ.get('QGIS_SERVER_PKI_CERTIFICATE')
58+
QGIS_SERVER_PKI_KEY = os.environ.get('QGIS_SERVER_PKI_KEY')
59+
QGIS_SERVER_PKI_AUTHORITY = os.environ.get('QGIS_SERVER_PKI_AUTHORITY')
60+
QGIS_SERVER_PKI_USERNAME = os.environ.get('QGIS_SERVER_PKI_USERNAME')
61+
62+
# Check if PKI - https is enabled
63+
https = (QGIS_SERVER_PKI_CERTIFICATE is not None and
64+
os.path.isfile(QGIS_SERVER_PKI_CERTIFICATE) and
65+
QGIS_SERVER_PKI_KEY is not None and
66+
os.path.isfile(QGIS_SERVER_PKI_KEY) and
67+
QGIS_SERVER_PKI_AUTHORITY is not None and
68+
os.path.isfile(QGIS_SERVER_PKI_AUTHORITY) and
69+
QGIS_SERVER_PKI_USERNAME)
4070

4171
qgs_server = QgsServer()
4272

@@ -66,8 +96,20 @@ def responseComplete(self):
6696
class Handler(BaseHTTPRequestHandler):
6797

6898
def do_GET(self):
99+
# For PKI: check the username from client certificate
100+
if https:
101+
try:
102+
ssl.match_hostname(self.connection.getpeercert(), QGIS_SERVER_PKI_USERNAME)
103+
except ssl.CertificateError as ex:
104+
print("SSL Exception %s" % ex)
105+
self.send_response(401)
106+
self.end_headers()
107+
self.wfile.write('UNAUTHORIZED')
108+
return
69109
# CGI vars:
70110
for k, v in self.headers.items():
111+
# Uncomment to print debug info about env vars passed into QGIS Server env
112+
#print('Setting ENV var %s to %s' % ('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v))
71113
qgs_server.putenv('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v)
72114
qgs_server.putenv('SERVER_PORT', str(self.server.server_port))
73115
qgs_server.putenv('SERVER_NAME', self.server.server_name)
@@ -96,7 +138,15 @@ def do_POST(self):
96138

97139
if __name__ == '__main__':
98140
server = HTTPServer((QGIS_SERVER_HOST, QGIS_SERVER_PORT), Handler)
99-
print('Starting server on %s:%s, use <Ctrl-C> to stop' %
100-
(QGIS_SERVER_HOST, server.server_port))
141+
if https:
142+
server.socket = ssl.wrap_socket(server.socket,
143+
certfile=QGIS_SERVER_PKI_CERTIFICATE,
144+
keyfile=QGIS_SERVER_PKI_KEY,
145+
ca_certs=QGIS_SERVER_PKI_AUTHORITY,
146+
cert_reqs=ssl.CERT_REQUIRED,
147+
server_side=True,
148+
ssl_version=ssl.PROTOCOL_TLSv1)
149+
print('Starting server on %s://%s:%s, use <Ctrl-C> to stop' %
150+
('https' if https else 'http', QGIS_SERVER_HOST, server.server_port))
101151
sys.stdout.flush()
102152
server.serve_forever()

‎tests/src/python/test_authmanager_endpoint.py renamed to ‎tests/src/python/test_authmanager_password_endpoint.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
configuration to access an HTTP Basic protected endpoint.
99
1010
11-
From build dir, run: ctest -R PyQgsAuthManagerEnpointTest -V
11+
From build dir, run: ctest -R PyQgsAuthManagerUsernamePasswordEnpointTest -V
1212
1313
.. note:: This program is free software; you can redistribute it and/or modify
1414
it under the terms of the GNU General Public License as published by
@@ -47,7 +47,7 @@
4747
try:
4848
QGIS_SERVER_ENDPOINT_PORT = os.environ['QGIS_SERVER_ENDPOINT_PORT']
4949
except:
50-
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto
50+
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto
5151

5252

5353
QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Tests for auth manager WMS/WFS using QGIS Server through PKI
4+
enabled qgis_wrapped_server.py.
5+
6+
This is an integration test for QGIS Desktop Auth Manager WFS and WMS provider
7+
and QGIS Server WFS/WMS that check if QGIS can use a stored auth manager auth
8+
configuration to access an HTTP Basic protected endpoint.
9+
10+
11+
From build dir, run: ctest -R PyQgsAuthManagerPKIEndpointTest -V
12+
13+
.. note:: This program is free software; you can redistribute it and/or modify
14+
it under the terms of the GNU General Public License as published by
15+
the Free Software Foundation; either version 2 of the License, or
16+
(at your option) any later version.
17+
"""
18+
import os
19+
import sys
20+
import re
21+
import subprocess
22+
import tempfile
23+
import urllib
24+
25+
__author__ = 'Alessandro Pasotti'
26+
__date__ = '25/10/2016'
27+
__copyright__ = 'Copyright 2016, The QGIS Project'
28+
# This will get replaced with a git SHA1 when you do a git archive
29+
__revision__ = '$Format:%H$'
30+
31+
from shutil import rmtree
32+
33+
from utilities import unitTestDataPath, waitServer
34+
from qgis.core import (
35+
QgsAuthManager,
36+
QgsAuthMethodConfig,
37+
QgsVectorLayer,
38+
QgsRasterLayer,
39+
)
40+
41+
from PyQt4.QtNetwork import QSslCertificate
42+
43+
from qgis.testing import (
44+
start_app,
45+
unittest,
46+
)
47+
48+
try:
49+
QGIS_SERVER_ENDPOINT_PORT = os.environ['QGIS_SERVER_ENDPOINT_PORT']
50+
except:
51+
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto
52+
53+
54+
QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()
55+
56+
os.environ['QGIS_AUTH_DB_DIR_PATH'] = QGIS_AUTH_DB_DIR_PATH
57+
58+
qgis_app = start_app()
59+
60+
61+
class TestAuthManager(unittest.TestCase):
62+
63+
@classmethod
64+
def setUpAuth(cls):
65+
"""Run before all tests and set up authentication"""
66+
authm = QgsAuthManager.instance()
67+
assert (authm.setMasterPassword('masterpassword', True))
68+
cls.sslrootcert_path = os.path.join(cls.certsdata_path, 'chains_subissuer-issuer-root_issuer2-root2.pem')
69+
cls.sslcert = os.path.join(cls.certsdata_path, 'gerardus_cert.pem')
70+
cls.sslkey = os.path.join(cls.certsdata_path, 'gerardus_key.pem')
71+
cls.auth_config = QgsAuthMethodConfig("PKI-Paths")
72+
cls.auth_config.setConfig('certpath', cls.sslcert)
73+
cls.auth_config.setConfig('keypath', cls.sslkey)
74+
cls.auth_config.setName('test_pki_auth_config')
75+
assert os.path.isfile(cls.sslcert)
76+
assert os.path.isfile(cls.sslkey)
77+
assert os.path.isfile(cls.sslrootcert_path)
78+
cls.username = 'Gerardus'
79+
cls.sslrootcert = QSslCertificate.fromPath(cls.sslrootcert_path)
80+
assert cls.sslrootcert is not None
81+
authm.storeCertAuthorities(cls.sslrootcert)
82+
authm.rebuildCaCertsCache()
83+
authm.rebuildTrustedCaCertsCache()
84+
assert (authm.storeAuthenticationConfig(cls.auth_config)[0])
85+
assert cls.auth_config.isValid()
86+
87+
cls.server_cert = os.path.join(cls.certsdata_path, 'localhost_ssl_cert.pem')
88+
cls.server_key = os.path.join(cls.certsdata_path, 'localhost_ssl_key.pem')
89+
cls.server_rootcert = cls.sslrootcert_path
90+
91+
os.environ['QGIS_SERVER_HOST'] = cls.hostname
92+
os.environ['QGIS_SERVER_PORT'] = str(cls.port)
93+
os.environ['QGIS_SERVER_PKI_KEY'] = cls.server_key
94+
os.environ['QGIS_SERVER_PKI_CERTIFICATE'] = cls.server_cert
95+
os.environ['QGIS_SERVER_PKI_USERNAME'] = cls.username
96+
os.environ['QGIS_SERVER_PKI_AUTHORITY'] = cls.server_rootcert
97+
98+
@classmethod
99+
def setUpClass(cls):
100+
"""Run before all tests:
101+
Creates an auth configuration"""
102+
cls.port = QGIS_SERVER_ENDPOINT_PORT
103+
# Clean env just to be sure
104+
env_vars = ['QUERY_STRING', 'QGIS_PROJECT_FILE']
105+
for ev in env_vars:
106+
try:
107+
del os.environ[ev]
108+
except KeyError:
109+
pass
110+
cls.testdata_path = unitTestDataPath('qgis_server')
111+
cls.certsdata_path = os.path.join(unitTestDataPath('auth_system'), 'certs_keys')
112+
cls.project_path = os.path.join(cls.testdata_path, "test_project.qgs")
113+
cls.hostname = 'localhost'
114+
cls.protocol = 'https'
115+
116+
cls.setUpAuth()
117+
118+
server_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
119+
'qgis_wrapped_server.py')
120+
cls.server = subprocess.Popen([sys.executable, server_path],
121+
env=os.environ, stdout=subprocess.PIPE)
122+
line = cls.server.stdout.readline()
123+
cls.port = int(re.findall(b':(\d+)', line)[0])
124+
assert cls.port != 0
125+
# Wait for the server process to start
126+
assert waitServer('%s://%s:%s' % (cls.protocol, cls.hostname, cls.port)), "Server is not responding! %s://%s:%s" % (cls.protocol, cls.hostname, cls.port)
127+
128+
@classmethod
129+
def tearDownClass(cls):
130+
"""Run after all tests"""
131+
cls.server.terminate()
132+
rmtree(QGIS_AUTH_DB_DIR_PATH)
133+
del cls.server
134+
135+
def setUp(self):
136+
"""Run before each test."""
137+
pass
138+
139+
def tearDown(self):
140+
"""Run after each test."""
141+
pass
142+
143+
@classmethod
144+
def _getWFSLayer(cls, type_name, layer_name=None, authcfg=None):
145+
"""
146+
WFS layer factory
147+
"""
148+
if layer_name is None:
149+
layer_name = 'wfs_' + type_name
150+
parms = {
151+
'srsname': 'EPSG:4326',
152+
'typename': type_name,
153+
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
154+
'version': 'auto',
155+
'table': '',
156+
}
157+
if authcfg is not None:
158+
parms.update({'authcfg': authcfg})
159+
uri = ' '.join([("%s='%s'" % (k, v.decode('utf-8'))) for k, v in list(parms.items())])
160+
wfs_layer = QgsVectorLayer(uri, layer_name, 'WFS')
161+
return wfs_layer
162+
163+
@classmethod
164+
def _getWMSLayer(cls, layers, layer_name=None, authcfg=None):
165+
"""
166+
WMS layer factory
167+
"""
168+
if layer_name is None:
169+
layer_name = 'wms_' + layers.replace(',', '')
170+
parms = {
171+
'crs': 'EPSG:4326',
172+
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
173+
'format': 'image/png',
174+
# This is needed because of a really weird implementation in QGIS Server, that
175+
# replaces _ in the the real layer name with spaces
176+
'layers': urllib.quote(layers.replace('_', ' ')),
177+
'styles': '',
178+
'version': 'auto',
179+
#'sql': '',
180+
}
181+
if authcfg is not None:
182+
parms.update({'authcfg': authcfg})
183+
uri = '&'.join([("%s=%s" % (k, v.replace('=', '%3D'))) for k, v in list(parms.items())])
184+
wms_layer = QgsRasterLayer(uri, layer_name, 'wms')
185+
return wms_layer
186+
187+
def testValidAuthAccess(self):
188+
"""
189+
Access the protected layer with valid credentials
190+
Note: cannot test invalid access in a separate test because
191+
it would fail the subsequent (valid) calls due to cached connections
192+
"""
193+
wfs_layer = self._getWFSLayer('testlayer_èé', authcfg=self.auth_config.id())
194+
self.assertTrue(wfs_layer.isValid())
195+
wms_layer = self._getWMSLayer('testlayer_èé', authcfg=self.auth_config.id())
196+
self.assertTrue(wms_layer.isValid())
197+
198+
199+
if __name__ == '__main__':
200+
unittest.main()

‎tests/src/python/utilities.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import platform
2020
import tempfile
2121
try:
22-
from urllib2 import urlopen, HTTPError
22+
from urllib2 import urlopen, HTTPError, URLError
2323
except ImportError:
24-
from urllib.request import urlopen, HTTPError
24+
from urllib.request import urlopen, HTTPError, URLError
2525

2626
from qgis.PyQt.QtCore import QDir
2727

@@ -833,7 +833,7 @@ def waitServer(url, timeout=10):
833833
try:
834834
urlopen(url, timeout=1)
835835
return True
836-
except HTTPError:
836+
except (HTTPError, URLError):
837837
return True
838838
except Exception as e:
839839
if now() > end:

0 commit comments

Comments
 (0)
Please sign in to comment.