Skip to content

Commit b23e29f

Browse files
authoredNov 3, 2016
Merge pull request #3716 from elpaso/auth_tests_more_2_18
More Authentication Tests
2 parents 0796ecb + 9c535b5 commit b23e29f

File tree

8 files changed

+531
-21
lines changed

8 files changed

+531
-21
lines changed
 

‎ci/travis/linux/qt5/blacklist.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ PyQgsMapUnitScale
66
PyQgsPalLabelingServer
77
PyQgsRelationEditWidget
88
PyQgsServer
9-
PyQgsAuthManagerEndpointTest
9+
PyQgsAuthManagerPasswordOWSTest
1010
PyQgsServerAccessControl
1111
PyQgsSipCoverage
1212
PyQgsSpatialiteProvider

‎tests/src/python/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,5 +150,7 @@ IF (WITH_SERVER)
150150
ADD_PYTHON_TEST(PyQgsServerAccessControl test_qgsserver_accesscontrol.py)
151151
ADD_PYTHON_TEST(PyQgsServerWFST test_qgsserver_wfst.py)
152152
ADD_PYTHON_TEST(PyQgsOfflineEditingWFS test_offline_editing_wfs.py)
153-
ADD_PYTHON_TEST(PyQgsAuthManagerEndpointTest test_authmanager_endpoint.py)
153+
ADD_PYTHON_TEST(PyQgsAuthManagerPasswordOWSTest test_authmanager_password_ows.py)
154+
#ADD_PYTHON_TEST(PyQgsAuthManagerPKIOWSTest test_authmanager_pki_ows.py)
155+
ADD_PYTHON_TEST(PyQgsAuthManagerPKIPostgresTest test_authmanager_pki_postgres.py)
154156
ENDIF (WITH_SERVER)

‎tests/src/python/qgis_wrapped_server.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,21 @@
1313
* QGIS_SERVER_USERNAME (default ="username")
1414
* QGIS_SERVER_PASSWORD (default ="password")
1515
16+
PKI authentication with HTTPS can be enabled with:
17+
18+
* QGIS_SERVER_PKI_CERTIFICATE (server certificate)
19+
* QGIS_SERVER_PKI_KEY (server private key)
20+
* QGIS_SERVER_PKI_AUTHORITY (root CA)
21+
* QGIS_SERVER_PKI_USERNAME (valid username)
22+
23+
Sample run:
24+
25+
QGIS_SERVER_PKI_USERNAME=Gerardus QGIS_SERVER_PORT=47547 QGIS_SERVER_HOST=localhost \
26+
QGIS_SERVER_PKI_KEY=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_key.pem \
27+
QGIS_SERVER_PKI_CERTIFICATE=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_cert.pem \
28+
QGIS_SERVER_PKI_AUTHORITY=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/chains_subissuer-issuer-root_issuer2-root2.pem \
29+
python /home/dev/QGIS/tests/src/python/qgis_wrapped_server.py
30+
1631
.. note:: This program is free software; you can redistribute it and/or modify
1732
it under the terms of the GNU General Public License as published by
1833
the Free Software Foundation; either version 2 of the License, or
@@ -31,12 +46,27 @@
3146

3247
import os
3348
import sys
49+
import ssl
3450
import urllib.parse
3551
from http.server import BaseHTTPRequestHandler, HTTPServer
3652
from qgis.server import QgsServer, QgsServerFilter
3753

3854
QGIS_SERVER_PORT = int(os.environ.get('QGIS_SERVER_PORT', '8081'))
3955
QGIS_SERVER_HOST = os.environ.get('QGIS_SERVER_HOST', '127.0.0.1')
56+
# PKI authentication
57+
QGIS_SERVER_PKI_CERTIFICATE = os.environ.get('QGIS_SERVER_PKI_CERTIFICATE')
58+
QGIS_SERVER_PKI_KEY = os.environ.get('QGIS_SERVER_PKI_KEY')
59+
QGIS_SERVER_PKI_AUTHORITY = os.environ.get('QGIS_SERVER_PKI_AUTHORITY')
60+
QGIS_SERVER_PKI_USERNAME = os.environ.get('QGIS_SERVER_PKI_USERNAME')
61+
62+
# Check if PKI - https is enabled
63+
https = (QGIS_SERVER_PKI_CERTIFICATE is not None and
64+
os.path.isfile(QGIS_SERVER_PKI_CERTIFICATE) and
65+
QGIS_SERVER_PKI_KEY is not None and
66+
os.path.isfile(QGIS_SERVER_PKI_KEY) and
67+
QGIS_SERVER_PKI_AUTHORITY is not None and
68+
os.path.isfile(QGIS_SERVER_PKI_AUTHORITY) and
69+
QGIS_SERVER_PKI_USERNAME)
4070

4171
qgs_server = QgsServer()
4272

@@ -66,8 +96,20 @@ def responseComplete(self):
6696
class Handler(BaseHTTPRequestHandler):
6797

6898
def do_GET(self):
99+
# For PKI: check the username from client certificate
100+
if https:
101+
try:
102+
ssl.match_hostname(self.connection.getpeercert(), QGIS_SERVER_PKI_USERNAME)
103+
except Exception as ex:
104+
print("SSL Exception %s" % ex)
105+
self.send_response(401)
106+
self.end_headers()
107+
self.wfile.write('UNAUTHORIZED')
108+
return
69109
# CGI vars:
70110
for k, v in self.headers.items():
111+
# Uncomment to print debug info about env vars passed into QGIS Server env
112+
#print('Setting ENV var %s to %s' % ('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v))
71113
qgs_server.putenv('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v)
72114
qgs_server.putenv('SERVER_PORT', str(self.server.server_port))
73115
qgs_server.putenv('SERVER_NAME', self.server.server_name)
@@ -96,7 +138,19 @@ def do_POST(self):
96138

97139
if __name__ == '__main__':
98140
server = HTTPServer((QGIS_SERVER_HOST, QGIS_SERVER_PORT), Handler)
99-
print('Starting server on %s:%s, use <Ctrl-C> to stop' %
100-
(QGIS_SERVER_HOST, server.server_port))
101-
sys.stdout.flush()
141+
if https:
142+
server.socket = ssl.wrap_socket(server.socket,
143+
certfile=QGIS_SERVER_PKI_CERTIFICATE,
144+
keyfile=QGIS_SERVER_PKI_KEY,
145+
ca_certs=QGIS_SERVER_PKI_AUTHORITY,
146+
cert_reqs=ssl.CERT_REQUIRED,
147+
server_side=True,
148+
ssl_version=ssl.PROTOCOL_TLSv1)
149+
message = 'Starting server on %s://%s:%s, use <Ctrl-C> to stop' % \
150+
('https' if https else 'http', QGIS_SERVER_HOST, server.server_port)
151+
try:
152+
print(message, flush=True)
153+
except:
154+
print(message)
155+
sys.stdout.flush()
102156
server.serve_forever()

‎tests/src/python/test_authmanager_endpoint.py renamed to ‎tests/src/python/test_authmanager_password_ows.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
configuration to access an HTTP Basic protected endpoint.
99
1010
11-
From build dir, run: ctest -R PyQgsAuthManagerEnpointTest -V
11+
From build dir, run: ctest -R PyQgsAuthManagerPasswordOWSTest -V
1212
1313
.. note:: This program is free software; you can redistribute it and/or modify
1414
it under the terms of the GNU General Public License as published by
@@ -22,7 +22,10 @@
2222
import tempfile
2323
import random
2424
import string
25-
import urllib
25+
try:
26+
from urllib.parse import quote
27+
except:
28+
from urllib import quote
2629

2730
__author__ = 'Alessandro Pasotti'
2831
__date__ = '18/09/2016'
@@ -47,7 +50,7 @@
4750
try:
4851
QGIS_SERVER_ENDPOINT_PORT = os.environ['QGIS_SERVER_ENDPOINT_PORT']
4952
except:
50-
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto
53+
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto
5154

5255

5356
QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()
@@ -84,11 +87,14 @@ def setUpClass(cls):
8487
cls.auth_config.setConfig('username', cls.username)
8588
cls.auth_config.setConfig('password', cls.password)
8689
assert (authm.storeAuthenticationConfig(cls.auth_config)[0])
90+
cls.hostname = '127.0.0.1'
91+
cls.protocol = 'http'
8792

8893
os.environ['QGIS_SERVER_HTTP_BASIC_AUTH'] = '1'
8994
os.environ['QGIS_SERVER_USERNAME'] = cls.username
9095
os.environ['QGIS_SERVER_PASSWORD'] = cls.password
9196
os.environ['QGIS_SERVER_PORT'] = str(cls.port)
97+
os.environ['QGIS_SERVER_HOST'] = cls.hostname
9298
server_path = os.path.dirname(os.path.realpath(__file__)) + \
9399
'/qgis_wrapped_server.py'
94100
cls.server = subprocess.Popen([sys.executable, server_path],
@@ -98,7 +104,7 @@ def setUpClass(cls):
98104
cls.port = int(re.findall(b':(\d+)', line)[0])
99105
assert cls.port != 0
100106
# Wait for the server process to start
101-
assert waitServer('http://127.0.0.1:%s' % cls.port), "Server is not responding! http://127.0.0.1:%s" % cls.port
107+
assert waitServer('%s://%s:%s' % (cls.protocol, cls.hostname, cls.port)), "Server is not responding! '%s://%s:%s" % (cls.protocol, cls.hostname, cls.port)
102108

103109
@classmethod
104110
def tearDownClass(cls):
@@ -125,13 +131,16 @@ def _getWFSLayer(cls, type_name, layer_name=None, authcfg=None):
125131
parms = {
126132
'srsname': 'EPSG:4326',
127133
'typename': type_name,
128-
'url': 'http://127.0.0.1:%s/?map=%s' % (cls.port, cls.project_path),
134+
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
129135
'version': 'auto',
130136
'table': '',
131137
}
132138
if authcfg is not None:
133139
parms.update({'authcfg': authcfg})
134-
uri = ' '.join([("%s='%s'" % (k, v.decode('utf-8'))) for k, v in list(parms.items())])
140+
try: # Py2
141+
uri = ' '.join([("%s='%s'" % (k, v.decode('utf-8'))) for k, v in list(parms.items())])
142+
except AttributeError: # Py3
143+
uri = ' '.join([("%s='%s'" % (k, v)) for k, v in list(parms.items())])
135144
wfs_layer = QgsVectorLayer(uri, layer_name, 'WFS')
136145
return wfs_layer
137146

@@ -144,11 +153,11 @@ def _getWMSLayer(cls, layers, layer_name=None, authcfg=None):
144153
layer_name = 'wms_' + layers.replace(',', '')
145154
parms = {
146155
'crs': 'EPSG:4326',
147-
'url': 'http://127.0.0.1:%s/?map=%s' % (cls.port, cls.project_path),
156+
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
148157
'format': 'image/png',
149158
# This is needed because of a really weird implementation in QGIS Server, that
150159
# replaces _ in the the real layer name with spaces
151-
'layers': urllib.quote(layers.replace('_', ' ')),
160+
'layers': quote(layers.replace('_', ' ')),
152161
'styles': '',
153162
'version': 'auto',
154163
#'sql': '',
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Tests for auth manager WMS/WFS using QGIS Server through PKI
4+
enabled qgis_wrapped_server.py.
5+
6+
This is an integration test for QGIS Desktop Auth Manager WFS and WMS provider
7+
and QGIS Server WFS/WMS that check if QGIS can use a stored auth manager auth
8+
configuration to access an HTTP Basic protected endpoint.
9+
10+
11+
From build dir, run: ctest -R PyQgsAuthManagerPKIOWSTest -V
12+
13+
.. note:: This program is free software; you can redistribute it and/or modify
14+
it under the terms of the GNU General Public License as published by
15+
the Free Software Foundation; either version 2 of the License, or
16+
(at your option) any later version.
17+
"""
18+
import os
19+
import sys
20+
import re
21+
import subprocess
22+
import tempfile
23+
import urllib
24+
import stat
25+
26+
__author__ = 'Alessandro Pasotti'
27+
__date__ = '25/10/2016'
28+
__copyright__ = 'Copyright 2016, The QGIS Project'
29+
# This will get replaced with a git SHA1 when you do a git archive
30+
__revision__ = '$Format:%H$'
31+
32+
from shutil import rmtree
33+
34+
from utilities import unitTestDataPath, waitServer
35+
from qgis.core import (
36+
QgsAuthManager,
37+
QgsAuthMethodConfig,
38+
QgsVectorLayer,
39+
QgsRasterLayer,
40+
)
41+
42+
from qgis.PyQt.QtNetwork import QSslCertificate
43+
44+
from qgis.testing import (
45+
start_app,
46+
unittest,
47+
)
48+
49+
try:
50+
QGIS_SERVER_ENDPOINT_PORT = os.environ['QGIS_SERVER_ENDPOINT_PORT']
51+
except:
52+
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto
53+
54+
55+
QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()
56+
57+
os.environ['QGIS_AUTH_DB_DIR_PATH'] = QGIS_AUTH_DB_DIR_PATH
58+
59+
qgis_app = start_app()
60+
61+
62+
class TestAuthManager(unittest.TestCase):
63+
64+
@classmethod
65+
def setUpAuth(cls):
66+
"""Run before all tests and set up authentication"""
67+
authm = QgsAuthManager.instance()
68+
assert (authm.setMasterPassword('masterpassword', True))
69+
cls.sslrootcert_path = os.path.join(cls.certsdata_path, 'chains_subissuer-issuer-root_issuer2-root2.pem')
70+
cls.sslcert = os.path.join(cls.certsdata_path, 'gerardus_cert.pem')
71+
cls.sslkey = os.path.join(cls.certsdata_path, 'gerardus_key.pem')
72+
assert os.path.isfile(cls.sslcert)
73+
assert os.path.isfile(cls.sslkey)
74+
assert os.path.isfile(cls.sslrootcert_path)
75+
os.chmod(cls.sslcert, stat.S_IRUSR)
76+
os.chmod(cls.sslkey, stat.S_IRUSR)
77+
os.chmod(cls.sslrootcert_path, stat.S_IRUSR)
78+
cls.auth_config = QgsAuthMethodConfig("PKI-Paths")
79+
cls.auth_config.setConfig('certpath', cls.sslcert)
80+
cls.auth_config.setConfig('keypath', cls.sslkey)
81+
cls.auth_config.setName('test_pki_auth_config')
82+
cls.username = 'Gerardus'
83+
cls.sslrootcert = QSslCertificate.fromPath(cls.sslrootcert_path)
84+
assert cls.sslrootcert is not None
85+
authm.storeCertAuthorities(cls.sslrootcert)
86+
authm.rebuildCaCertsCache()
87+
authm.rebuildTrustedCaCertsCache()
88+
assert (authm.storeAuthenticationConfig(cls.auth_config)[0])
89+
assert cls.auth_config.isValid()
90+
91+
cls.server_cert = os.path.join(cls.certsdata_path, 'localhost_ssl_cert.pem')
92+
cls.server_key = os.path.join(cls.certsdata_path, 'localhost_ssl_key.pem')
93+
cls.server_rootcert = cls.sslrootcert_path
94+
os.chmod(cls.server_cert, stat.S_IRUSR)
95+
os.chmod(cls.server_key, stat.S_IRUSR)
96+
os.chmod(cls.server_rootcert, stat.S_IRUSR)
97+
98+
os.environ['QGIS_SERVER_HOST'] = cls.hostname
99+
os.environ['QGIS_SERVER_PORT'] = str(cls.port)
100+
os.environ['QGIS_SERVER_PKI_KEY'] = cls.server_key
101+
os.environ['QGIS_SERVER_PKI_CERTIFICATE'] = cls.server_cert
102+
os.environ['QGIS_SERVER_PKI_USERNAME'] = cls.username
103+
os.environ['QGIS_SERVER_PKI_AUTHORITY'] = cls.server_rootcert
104+
105+
@classmethod
106+
def setUpClass(cls):
107+
"""Run before all tests:
108+
Creates an auth configuration"""
109+
cls.port = QGIS_SERVER_ENDPOINT_PORT
110+
# Clean env just to be sure
111+
env_vars = ['QUERY_STRING', 'QGIS_PROJECT_FILE']
112+
for ev in env_vars:
113+
try:
114+
del os.environ[ev]
115+
except KeyError:
116+
pass
117+
cls.testdata_path = unitTestDataPath('qgis_server')
118+
cls.certsdata_path = os.path.join(unitTestDataPath('auth_system'), 'certs_keys')
119+
cls.project_path = os.path.join(cls.testdata_path, "test_project.qgs")
120+
cls.hostname = 'localhost'
121+
cls.protocol = 'https'
122+
123+
cls.setUpAuth()
124+
125+
server_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
126+
'qgis_wrapped_server.py')
127+
cls.server = subprocess.Popen([sys.executable, server_path],
128+
env=os.environ, stdout=subprocess.PIPE)
129+
line = cls.server.stdout.readline()
130+
cls.port = int(re.findall(b':(\d+)', line)[0])
131+
assert cls.port != 0
132+
# Wait for the server process to start
133+
assert waitServer('%s://%s:%s' % (cls.protocol, cls.hostname, cls.port)), "Server is not responding! %s://%s:%s" % (cls.protocol, cls.hostname, cls.port)
134+
135+
@classmethod
136+
def tearDownClass(cls):
137+
"""Run after all tests"""
138+
cls.server.terminate()
139+
rmtree(QGIS_AUTH_DB_DIR_PATH)
140+
del cls.server
141+
142+
def setUp(self):
143+
"""Run before each test."""
144+
pass
145+
146+
def tearDown(self):
147+
"""Run after each test."""
148+
pass
149+
150+
@classmethod
151+
def _getWFSLayer(cls, type_name, layer_name=None, authcfg=None):
152+
"""
153+
WFS layer factory
154+
"""
155+
if layer_name is None:
156+
layer_name = 'wfs_' + type_name
157+
parms = {
158+
'srsname': 'EPSG:4326',
159+
'typename': type_name,
160+
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
161+
'version': 'auto',
162+
'table': '',
163+
}
164+
if authcfg is not None:
165+
parms.update({'authcfg': authcfg})
166+
try: # Py2
167+
uri = ' '.join([("%s='%s'" % (k, v.decode('utf-8'))) for k, v in list(parms.items())])
168+
except AttributeError: # Py3
169+
uri = ' '.join([("%s='%s'" % (k, v)) for k, v in list(parms.items())])
170+
wfs_layer = QgsVectorLayer(uri, layer_name, 'WFS')
171+
return wfs_layer
172+
173+
@classmethod
174+
def _getWMSLayer(cls, layers, layer_name=None, authcfg=None):
175+
"""
176+
WMS layer factory
177+
"""
178+
if layer_name is None:
179+
layer_name = 'wms_' + layers.replace(',', '')
180+
parms = {
181+
'crs': 'EPSG:4326',
182+
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
183+
'format': 'image/png',
184+
# This is needed because of a really weird implementation in QGIS Server, that
185+
# replaces _ in the the real layer name with spaces
186+
'layers': urllib.quote(layers.replace('_', ' ')),
187+
'styles': '',
188+
'version': 'auto',
189+
#'sql': '',
190+
}
191+
if authcfg is not None:
192+
parms.update({'authcfg': authcfg})
193+
uri = '&'.join([("%s=%s" % (k, v.replace('=', '%3D'))) for k, v in list(parms.items())])
194+
wms_layer = QgsRasterLayer(uri, layer_name, 'wms')
195+
return wms_layer
196+
197+
def testValidAuthAccess(self):
198+
"""
199+
Access the protected layer with valid credentials
200+
Note: cannot test invalid access in a separate test because
201+
it would fail the subsequent (valid) calls due to cached connections
202+
"""
203+
wfs_layer = self._getWFSLayer('testlayer_èé', authcfg=self.auth_config.id())
204+
self.assertTrue(wfs_layer.isValid())
205+
wms_layer = self._getWMSLayer('testlayer_èé', authcfg=self.auth_config.id())
206+
self.assertTrue(wms_layer.isValid())
207+
208+
209+
if __name__ == '__main__':
210+
unittest.main()
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Tests for auth manager PKI access to postgres.
4+
5+
This is an integration test for QGIS Desktop Auth Manager postgres provider that
6+
checks if QGIS can use a stored auth manager auth configuration to access
7+
a PKI protected postgres.
8+
9+
Configuration form the environment:
10+
11+
* QGIS_POSTGRES_SERVER_PORT (default: 55432)
12+
* QGIS_POSTGRES_EXECUTABLE_PATH (default: /usr/lib/postgresql/9.4/bin)
13+
14+
15+
From build dir, run: ctest -R PyQgsAuthManagerPKIPostgresTest -V
16+
17+
.. note:: This program is free software; you can redistribute it and/or modify
18+
it under the terms of the GNU General Public License as published by
19+
the Free Software Foundation; either version 2 of the License, or
20+
(at your option) any later version.
21+
"""
22+
import os
23+
import time
24+
import signal
25+
import stat
26+
import subprocess
27+
import tempfile
28+
29+
from shutil import rmtree
30+
31+
from utilities import unitTestDataPath
32+
from qgis.core import (
33+
QgsAuthManager,
34+
QgsAuthMethodConfig,
35+
QgsVectorLayer,
36+
QgsDataSourceURI,
37+
QgsWKBTypes,
38+
)
39+
40+
41+
from qgis.PyQt.QtNetwork import QSslCertificate
42+
43+
from qgis.testing import (
44+
start_app,
45+
unittest,
46+
)
47+
48+
49+
__author__ = 'Alessandro Pasotti'
50+
__date__ = '25/10/2016'
51+
__copyright__ = 'Copyright 2016, The QGIS Project'
52+
# This will get replaced with a git SHA1 when you do a git archive
53+
__revision__ = '$Format:%H$'
54+
55+
QGIS_POSTGRES_SERVER_PORT = os.environ.get('QGIS_POSTGRES_SERVER_PORT', '55432')
56+
QGIS_POSTGRES_EXECUTABLE_PATH = os.environ.get('QGIS_POSTGRES_EXECUTABLE_PATH', '/usr/lib/postgresql/9.4/bin')
57+
58+
assert os.path.exists(QGIS_POSTGRES_EXECUTABLE_PATH)
59+
60+
QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()
61+
62+
# Postgres test path
63+
QGIS_PG_TEST_PATH = tempfile.mkdtemp()
64+
65+
os.environ['QGIS_AUTH_DB_DIR_PATH'] = QGIS_AUTH_DB_DIR_PATH
66+
67+
qgis_app = start_app()
68+
69+
QGIS_POSTGRES_CONF_TEMPLATE = """
70+
hba_file = '%(tempfolder)s/pg_hba.conf'
71+
listen_addresses = '*'
72+
port = %(port)s
73+
max_connections = 100
74+
unix_socket_directories = '%(tempfolder)s'
75+
ssl = true
76+
ssl_ciphers = 'DEFAULT:!LOW:!EXP:!MD5:@STRENGTH' # allowed SSL ciphers
77+
ssl_cert_file = '%(server_cert)s'
78+
ssl_key_file = '%(server_key)s'
79+
ssl_ca_file = '%(sslrootcert_path)s'
80+
password_encryption = on
81+
"""
82+
83+
QGIS_POSTGRES_HBA_TEMPLATE = """
84+
hostssl all all 0.0.0.0/0 cert clientcert=1
85+
hostssl all all ::1/0 cert clientcert=1
86+
host all all 127.0.0.1/32 trust
87+
host all all ::1/32 trust
88+
"""
89+
90+
91+
class TestAuthManager(unittest.TestCase):
92+
93+
@classmethod
94+
def setUpAuth(cls):
95+
"""Run before all tests and set up authentication"""
96+
authm = QgsAuthManager.instance()
97+
assert (authm.setMasterPassword('masterpassword', True))
98+
cls.pg_conf = os.path.join(cls.tempfolder, 'postgresql.conf')
99+
cls.pg_hba = os.path.join(cls.tempfolder, 'pg_hba.conf')
100+
# Client side
101+
cls.sslrootcert_path = os.path.join(cls.certsdata_path, 'chains_subissuer-issuer-root_issuer2-root2.pem')
102+
cls.sslcert = os.path.join(cls.certsdata_path, 'gerardus_cert.pem')
103+
cls.sslkey = os.path.join(cls.certsdata_path, 'gerardus_key.pem')
104+
assert os.path.isfile(cls.sslcert)
105+
assert os.path.isfile(cls.sslkey)
106+
assert os.path.isfile(cls.sslrootcert_path)
107+
os.chmod(cls.sslcert, stat.S_IRUSR)
108+
os.chmod(cls.sslkey, stat.S_IRUSR)
109+
os.chmod(cls.sslrootcert_path, stat.S_IRUSR)
110+
cls.auth_config = QgsAuthMethodConfig("PKI-Paths")
111+
cls.auth_config.setConfig('certpath', cls.sslcert)
112+
cls.auth_config.setConfig('keypath', cls.sslkey)
113+
cls.auth_config.setName('test_pki_auth_config')
114+
cls.username = 'Gerardus'
115+
cls.sslrootcert = QSslCertificate.fromPath(cls.sslrootcert_path)
116+
assert cls.sslrootcert is not None
117+
authm.storeCertAuthorities(cls.sslrootcert)
118+
authm.rebuildCaCertsCache()
119+
authm.rebuildTrustedCaCertsCache()
120+
authm.rebuildCertTrustCache()
121+
assert (authm.storeAuthenticationConfig(cls.auth_config)[0])
122+
assert cls.auth_config.isValid()
123+
124+
# Server side
125+
cls.server_cert = os.path.join(cls.certsdata_path, 'localhost_ssl_cert.pem')
126+
cls.server_key = os.path.join(cls.certsdata_path, 'localhost_ssl_key.pem')
127+
cls.server_rootcert = cls.sslrootcert_path
128+
os.chmod(cls.server_cert, stat.S_IRUSR)
129+
os.chmod(cls.server_key, stat.S_IRUSR)
130+
os.chmod(cls.server_rootcert, stat.S_IRUSR)
131+
132+
# Place conf in the data folder
133+
with open(cls.pg_conf, 'w+') as f:
134+
f.write(QGIS_POSTGRES_CONF_TEMPLATE % {
135+
'port': cls.port,
136+
'tempfolder': cls.tempfolder,
137+
'server_cert': cls.server_cert,
138+
'server_key': cls.server_key,
139+
'sslrootcert_path': cls.sslrootcert_path,
140+
})
141+
142+
with open(cls.pg_hba, 'w+') as f:
143+
f.write(QGIS_POSTGRES_HBA_TEMPLATE)
144+
145+
@classmethod
146+
def setUpClass(cls):
147+
"""Run before all tests:
148+
Creates an auth configuration"""
149+
cls.port = QGIS_POSTGRES_SERVER_PORT
150+
cls.dbname = 'test_pki'
151+
cls.tempfolder = QGIS_PG_TEST_PATH
152+
cls.certsdata_path = os.path.join(unitTestDataPath('auth_system'), 'certs_keys')
153+
cls.hostname = 'localhost'
154+
cls.data_path = os.path.join(cls.tempfolder, 'data')
155+
os.mkdir(cls.data_path)
156+
157+
cls.setUpAuth()
158+
subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'initdb'), '-D', cls.data_path])
159+
160+
cls.server = subprocess.Popen([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'postgres'), '-D',
161+
cls.data_path, '-c',
162+
"config_file=%s" % cls.pg_conf],
163+
env=os.environ,
164+
stdout=subprocess.PIPE,
165+
stderr=subprocess.PIPE)
166+
# Wait max 10 secs for the server to start
167+
end = time.time() + 10
168+
while True:
169+
line = cls.server.stderr.readline()
170+
print(line)
171+
if line.find(b"database system is ready to accept") != -1:
172+
break
173+
if time.time() > end:
174+
raise Exception("Timeout connecting to postgresql")
175+
# Create a DB
176+
subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'createdb'), '-h', 'localhost', '-p', cls.port, 'test_pki'])
177+
# Inject test SQL from test path
178+
test_sql = os.path.join(unitTestDataPath('provider'), 'testdata_pg.sql')
179+
subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'psql'), '-h', 'localhost', '-p', cls.port, '-f', test_sql, cls.dbname])
180+
# Create a role
181+
subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'psql'), '-h', 'localhost', '-p', cls.port, '-c', 'CREATE ROLE "%s" WITH SUPERUSER LOGIN' % cls.username, cls.dbname])
182+
183+
@classmethod
184+
def tearDownClass(cls):
185+
"""Run after all tests"""
186+
cls.server.terminate()
187+
os.kill(cls.server.pid, signal.SIGABRT)
188+
del cls.server
189+
time.sleep(2)
190+
rmtree(QGIS_AUTH_DB_DIR_PATH)
191+
rmtree(cls.tempfolder)
192+
193+
def setUp(self):
194+
"""Run before each test."""
195+
pass
196+
197+
def tearDown(self):
198+
"""Run after each test."""
199+
pass
200+
201+
@classmethod
202+
def _getPostGISLayer(cls, type_name, layer_name=None, authcfg=None):
203+
"""
204+
PG layer factory
205+
"""
206+
if layer_name is None:
207+
layer_name = 'pg_' + type_name
208+
uri = QgsDataSourceURI()
209+
uri.setWkbType(QgsWKBTypes.Point)
210+
uri.setConnection("localhost", cls.port, cls.dbname, "", "", QgsDataSourceURI.SSLverifyFull, authcfg)
211+
uri.setKeyColumn('pk')
212+
uri.setSrid('EPSG:4326')
213+
uri.setDataSource('qgis_test', 'someData', "geom", "", "pk")
214+
# Note: do not expand here!
215+
layer = QgsVectorLayer(uri.uri(False), layer_name, 'postgres')
216+
return layer
217+
218+
def testValidAuthAccess(self):
219+
"""
220+
Access the protected layer with valid credentials
221+
"""
222+
pg_layer = self._getPostGISLayer('testlayer_èé', authcfg=self.auth_config.id())
223+
self.assertTrue(pg_layer.isValid())
224+
225+
def testInvalidAuthAccess(self):
226+
"""
227+
Access the protected layer with not valid credentials
228+
"""
229+
pg_layer = self._getPostGISLayer('testlayer_èé')
230+
self.assertFalse(pg_layer.isValid())
231+
232+
if __name__ == '__main__':
233+
unittest.main()

‎tests/src/python/test_qgsserver.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -459,11 +459,13 @@ def _img_diff_error(self, response, headers, image, max_diff=10, max_size_diff=Q
459459
report, encoded_rendered_file.strip(), tempfile.gettempdir(), image
460460
)
461461

462-
with open(os.path.join(tempfile.gettempdir(), image + "_result_diff.png"), "rb") as diff_file:
463-
encoded_diff_file = base64.b64encode(diff_file.read())
464-
message += "\nDiff:\necho '%s' | base64 -d > %s/%s_result_diff.png" % (
465-
encoded_diff_file.strip(), tempfile.gettempdir(), image
466-
)
462+
# If the failure is in image sizes the diff file will not exists.
463+
if os.path.exists(os.path.join(tempfile.gettempdir(), image + "_result_diff.png")):
464+
with open(os.path.join(tempfile.gettempdir(), image + "_result_diff.png"), "rb") as diff_file:
465+
encoded_diff_file = base64.b64encode(diff_file.read())
466+
message += "\nDiff:\necho '%s' | base64 -d > %s/%s_result_diff.png" % (
467+
encoded_diff_file.strip(), tempfile.gettempdir(), image
468+
)
467469

468470
self.assertTrue(test, message)
469471

‎tests/src/python/utilities.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import platform
2020
import tempfile
2121
try:
22-
from urllib2 import urlopen, HTTPError
22+
from urllib2 import urlopen, HTTPError, URLError
2323
except ImportError:
24-
from urllib.request import urlopen, HTTPError
24+
from urllib.request import urlopen, HTTPError, URLError
2525

2626
from qgis.PyQt.QtCore import QDir
2727

@@ -833,7 +833,7 @@ def waitServer(url, timeout=10):
833833
try:
834834
urlopen(url, timeout=1)
835835
return True
836-
except HTTPError:
836+
except (HTTPError, URLError):
837837
return True
838838
except Exception as e:
839839
if now() > end:

0 commit comments

Comments
 (0)
Please sign in to comment.