"""
Tests for auth manager password access to PostgreSQL.

This is an integration test for the QGIS Desktop Auth Manager postgres provider
that checks whether QGIS can use a stored auth manager authentication
configuration to access a password-protected PostgreSQL server.

Configuration from the environment:

    * QGIS_POSTGRES_SERVER_PORT (default: 55432)
    * QGIS_POSTGRES_EXECUTABLE_PATH (default: /usr/lib/postgresql/9.4/bin)


From build dir, run: ctest -R PyQgsAuthManagerPasswordPostgresTest -V

or, if your PostgreSQL path differs from the default:

QGIS_POSTGRES_EXECUTABLE_PATH=/usr/lib/postgresql/<your_version_goes_here>/bin \
    ctest -R PyQgsAuthManagerPasswordPostgresTest -V

.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
"""
import os
import signal
import stat
import subprocess
import tempfile
import time
from shutil import rmtree

from qgis.core import (
    QgsApplication,
    QgsAuthManager,
    QgsAuthMethodConfig,
    QgsVectorLayer,
    QgsDataSourceUri,
    QgsWkbTypes,
)
from qgis.PyQt.QtNetwork import QSslCertificate
from qgis.testing import (
    start_app,
    unittest,
)

from utilities import unitTestDataPath


__author__ = 'Alessandro Pasotti'
__date__ = '25/10/2016'
__copyright__ = 'Copyright 2016, The QGIS Project'
# '$Format:%H$' looks like a git export-subst placeholder (%H = commit hash);
# presumably expanded by `git archive` -- confirm against .gitattributes.
__revision__ = '$Format:%H$'

# Port and binary directory of the throw-away PostgreSQL instance started by
# the tests; both can be overridden from the environment.
QGIS_POSTGRES_SERVER_PORT = os.environ.get('QGIS_POSTGRES_SERVER_PORT', '55432')
QGIS_POSTGRES_EXECUTABLE_PATH = os.environ.get('QGIS_POSTGRES_EXECUTABLE_PATH', '/usr/lib/postgresql/9.4/bin')

# Fail at import time, with an actionable message, when the binaries are missing.
assert os.path.exists(QGIS_POSTGRES_EXECUTABLE_PATH), (
    "PostgreSQL executable path does not exist: %s "
    "(set QGIS_POSTGRES_EXECUTABLE_PATH)" % QGIS_POSTGRES_EXECUTABLE_PATH
)

# Scratch directory exported below as QGIS_AUTH_DB_DIR_PATH.
QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()

# Scratch directory holding the PostgreSQL data dir, socket and config files.
QGIS_PG_TEST_PATH = tempfile.mkdtemp()

# Exported before start_app() is called -- presumably so that the application
# creates its authentication database in the scratch directory; confirm.
os.environ['QGIS_AUTH_DB_DIR_PATH'] = QGIS_AUTH_DB_DIR_PATH

qgis_app = start_app()

# postgresql.conf template, filled in with %-formatting. Expected keys:
# tempfolder, port, server_cert, server_key, sslrootcert_path.
QGIS_POSTGRES_CONF_TEMPLATE = """
hba_file = '%(tempfolder)s/pg_hba.conf'
listen_addresses = '*'
port = %(port)s
max_connections = 100
unix_socket_directories = '%(tempfolder)s'
ssl = true
ssl_ciphers = 'DEFAULT:!LOW:!EXP:!MD5:@STRENGTH' # allowed SSL ciphers
ssl_cert_file = '%(server_cert)s'
ssl_key_file = '%(server_key)s'
ssl_ca_file = '%(sslrootcert_path)s'
password_encryption = on
"""

# pg_hba.conf contents (written verbatim, no substitution): SSL connections
# must authenticate with an md5 password, while plain TCP connections from the
# loopback addresses are trusted.
QGIS_POSTGRES_HBA_TEMPLATE = """
hostssl all all 0.0.0.0/0 md5
hostssl all all ::1/0 md5
host all all 127.0.0.1/32 trust
host all all ::1/32 trust
"""


class TestAuthManager(unittest.TestCase):
    """Integration tests for password (Basic) authentication against PostgreSQL.

    ``setUpClass`` initialises a PostgreSQL cluster in a temporary directory,
    starts a server for it with SSL enabled, loads the provider test data and
    creates a login role. The tests then build a ``postgres`` vector layer with
    and without the stored authentication configuration.
    """

    @staticmethod
    def _pgTool(name):
        """Return the full path of the PostgreSQL executable called *name*."""
        return os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, name)

    @classmethod
    def setUpAuth(cls):
        """Run before all tests and set up authentication.

        Sets the auth manager master password, stores a "Basic" configuration
        holding ``cls.username`` / ``cls.password``, stores the certificates
        read from the root chain file as certificate authorities, and writes
        ``postgresql.conf`` and ``pg_hba.conf`` into ``cls.tempfolder``.

        Requires ``cls.tempfolder``, ``cls.certsdata_path`` and ``cls.port``
        to be set beforehand (``setUpClass`` does that).
        """
        authm = QgsApplication.authManager()
        assert (authm.setMasterPassword('masterpassword', True))
        cls.pg_conf = os.path.join(cls.tempfolder, 'postgresql.conf')
        cls.pg_hba = os.path.join(cls.tempfolder, 'pg_hba.conf')

        # Certificate/key files used by the server.
        cls.sslrootcert_path = os.path.join(cls.certsdata_path, 'chains_subissuer-issuer-root_issuer2-root2.pem')
        assert os.path.isfile(cls.sslrootcert_path)
        cls.server_cert = os.path.join(cls.certsdata_path, 'localhost_ssl_cert.pem')
        cls.server_key = os.path.join(cls.certsdata_path, 'localhost_ssl_key.pem')
        cls.server_rootcert = cls.sslrootcert_path
        # Restrict the files to owner-read-only (each file once; the root
        # chain and server_rootcert are the same path).
        for path in (cls.sslrootcert_path, cls.server_cert, cls.server_key):
            os.chmod(path, stat.S_IRUSR)

        # Username/password configuration stored in the auth manager.
        cls.auth_config = QgsAuthMethodConfig("Basic")
        cls.username = 'username'
        cls.password = 'password'
        cls.auth_config.setConfig('username', cls.username)
        cls.auth_config.setConfig('password', cls.password)
        cls.auth_config.setName('test_password_auth_config')

        # fromPath() returns a list of certificates, never None, so check
        # that something was actually loaded.
        cls.sslrootcert = QSslCertificate.fromPath(cls.sslrootcert_path)
        assert cls.sslrootcert, "No certificates could be read from %s" % cls.sslrootcert_path
        authm.storeCertAuthorities(cls.sslrootcert)
        authm.rebuildCaCertsCache()
        authm.rebuildTrustedCaCertsCache()
        authm.rebuildCertTrustCache()
        assert (authm.storeAuthenticationConfig(cls.auth_config)[0])
        assert cls.auth_config.isValid()

        with open(cls.pg_conf, 'w') as f:
            f.write(QGIS_POSTGRES_CONF_TEMPLATE % {
                'port': cls.port,
                'tempfolder': cls.tempfolder,
                'server_cert': cls.server_cert,
                'server_key': cls.server_key,
                'sslrootcert_path': cls.sslrootcert_path,
            })

        with open(cls.pg_hba, 'w') as f:
            f.write(QGIS_POSTGRES_HBA_TEMPLATE)

    @classmethod
    def _waitForServer(cls, timeout=10):
        """Block until the server reports on stderr that it accepts connections.

        Every line read is echoed with ``print``. Raises ``Exception`` when
        the server's stderr reaches end-of-file first, or when *timeout*
        seconds have elapsed.

        NOTE: ``readline()`` blocks, so the timeout is only evaluated between
        two log lines.
        """
        deadline = time.time() + timeout
        while True:
            line = cls.server.stderr.readline()
            print(line)
            if b"database system is ready to accept" in line:
                return
            if not line:
                # End-of-file: nothing more will ever be logged, so fail now
                # instead of spinning on empty reads until the deadline.
                raise Exception("PostgreSQL server closed its log output before becoming ready "
                                "(exit code: %s)" % cls.server.poll())
            if time.time() > deadline:
                raise Exception("Timeout connecting to PostgreSQL")

    @classmethod
    def _stopServer(cls):
        """Shut down the PostgreSQL server (if any), reap it and close its pipes."""
        server = getattr(cls, 'server', None)
        if server is None:
            return
        del cls.server
        try:
            if server.poll() is None:
                # SIGINT asks PostgreSQL for a "fast shutdown", which does not
                # wait for clients to disconnect on their own.
                server.send_signal(signal.SIGINT)
                try:
                    server.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    server.kill()
                    server.wait()
        finally:
            for pipe in (server.stdout, server.stderr):
                if pipe is not None:
                    pipe.close()

    @classmethod
    def setUpClass(cls):
        """Run before all tests:
        Creates an auth configuration, then initialises and starts a temporary
        PostgreSQL server, creates the test database, loads the test data and
        creates the login role used by the auth configuration.

        If any step fails, the server is stopped and the temporary
        directories are removed before the error is re-raised.
        """
        cls.port = QGIS_POSTGRES_SERVER_PORT
        cls.dbname = 'test_password'
        cls.tempfolder = QGIS_PG_TEST_PATH
        cls.certsdata_path = os.path.join(unitTestDataPath('auth_system'), 'certs_keys')
        cls.hostname = 'localhost'
        cls.data_path = os.path.join(cls.tempfolder, 'data')

        # Environment for the server and the command line tools: they connect
        # without SSL, so only the 'host ... trust' rules of pg_hba.conf apply.
        env = dict(os.environ)
        env['PGSSLMODE'] = 'disable'

        try:
            os.mkdir(cls.data_path)
            cls.setUpAuth()
            subprocess.check_call([cls._pgTool('initdb'), '-D', cls.data_path])

            # NOTE: both pipes stay open for the lifetime of the server but
            # stderr is only read until the server is ready.
            cls.server = subprocess.Popen([cls._pgTool('postgres'), '-D',
                                           cls.data_path, '-c',
                                           "config_file=%s" % cls.pg_conf],
                                          env=env,
                                          stdout=subprocess.PIPE,
                                          stderr=subprocess.PIPE)
            cls._waitForServer()

            connection_args = ['-h', cls.hostname, '-p', cls.port]

            subprocess.check_call([cls._pgTool('createdb')] + connection_args + [cls.dbname], env=env)

            # Load the provider test data.
            test_sql = os.path.join(unitTestDataPath('provider'), 'testdata_pg.sql')
            subprocess.check_call([cls._pgTool('psql')] + connection_args + ['-f', test_sql, cls.dbname], env=env)

            # The role needs a password: pg_hba.conf requires md5 for SSL
            # connections, and a role without a stored password can never
            # pass password authentication.
            create_role = 'CREATE ROLE "%s" WITH SUPERUSER LOGIN PASSWORD \'%s\'' % (cls.username, cls.password)
            subprocess.check_call([cls._pgTool('psql')] + connection_args + ['-c', create_role, cls.dbname], env=env)
        except BaseException:
            # unittest does not call tearDownClass() when setUpClass() fails.
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Run after all tests: stop the server and remove the temporary directories."""
        try:
            cls._stopServer()
        finally:
            rmtree(QGIS_AUTH_DB_DIR_PATH)
            rmtree(cls.tempfolder)

    def setUp(self):
        """Run before each test."""
        pass

    def tearDown(self):
        """Run after each test."""
        pass

    @classmethod
    def _getPostGISLayer(cls, type_name, layer_name=None, authcfg=None):
        """
        PG layer factory

        Builds a ``postgres`` vector layer on ``qgis_test.someData`` of the
        test database, with full SSL verification requested.

        :param type_name: only used to derive the default layer name
        :param layer_name: layer name, defaults to ``'pg_' + type_name``
        :param authcfg: authentication configuration id passed to the
            connection, or None to connect without one
        :return: the QgsVectorLayer (validity is left to the caller to check)
        """
        if layer_name is None:
            layer_name = 'pg_' + type_name
        uri = QgsDataSourceUri()
        uri.setWkbType(QgsWkbTypes.Point)
        # Empty username/password: credentials can only come from authcfg.
        uri.setConnection(cls.hostname, cls.port, cls.dbname, "", "", QgsDataSourceUri.SslVerifyFull, authcfg)
        uri.setKeyColumn('pk')
        uri.setSrid('EPSG:4326')
        uri.setDataSource('qgis_test', 'someData', "geom", "", "pk")

        layer = QgsVectorLayer(uri.uri(False), layer_name, 'postgres')
        return layer

    def testValidAuthAccess(self):
        """
        Access the protected layer with valid credentials
        """
        pg_layer = self._getPostGISLayer('testlayer_èé', authcfg=self.auth_config.id())
        self.assertTrue(pg_layer.isValid())

    def testInvalidAuthAccess(self):
        """
        Access the protected layer with not valid credentials
        """
        pg_layer = self._getPostGISLayer('testlayer_èé')
        self.assertFalse(pg_layer.isValid())


# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()