|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +""" |
| 3 | +Tests for auth manager Basic Auth access to postgres. |
| 4 | +
|
| 5 | +This is an integration test for QGIS Desktop Auth Manager postgres provider that |
| 6 | +checks if QGIS can use a stored auth manager auth configuration to access |
| 7 | +a username/password protected postgres. |
| 8 | +
|
| 9 | +Configuration from the environment: |
| 10 | +
|
| 11 | + * QGIS_POSTGRES_SERVER_PORT (default: 55432) |
| 12 | + * QGIS_POSTGRES_EXECUTABLE_PATH (default: /usr/lib/postgresql/9.4/bin) |
| 13 | +
|
| 14 | +
|
| 15 | +From build dir, run: ctest -R PyQgsAuthManagerOgrPostgresTest -V |
| 16 | +
|
| 17 | +or, if your PostgreSQL path differs from the default: |
| 18 | +
|
| 19 | +QGIS_POSTGRES_EXECUTABLE_PATH=/usr/lib/postgresql/<your_version_goes_here>/bin \ |
| 20 | + ctest -R PyQgsAuthManagerOgrPostgresTest -V |
| 21 | +
|
| 22 | +.. note:: This program is free software; you can redistribute it and/or modify |
| 23 | +it under the terms of the GNU General Public License as published by |
| 24 | +the Free Software Foundation; either version 2 of the License, or |
| 25 | +(at your option) any later version. |
| 26 | +""" |
| 27 | +import os |
| 28 | +import time |
| 29 | +import signal |
| 30 | +import stat |
| 31 | +import subprocess |
| 32 | +import tempfile |
| 33 | + |
| 34 | +from shutil import rmtree |
| 35 | + |
| 36 | +from utilities import unitTestDataPath |
| 37 | +from qgis.core import ( |
| 38 | + QgsApplication, |
| 39 | + QgsAuthManager, |
| 40 | + QgsAuthMethodConfig, |
| 41 | + QgsVectorLayer, |
| 42 | + QgsDataSourceUri, |
| 43 | + QgsWkbTypes, |
| 44 | +) |
| 45 | + |
| 46 | +from qgis.PyQt.QtNetwork import QSslCertificate |
| 47 | + |
| 48 | +from qgis.testing import ( |
| 49 | + start_app, |
| 50 | + unittest, |
| 51 | +) |
| 52 | + |
| 53 | + |
| 54 | +__author__ = 'Alessandro Pasotti' |
| 55 | +__date__ = '03/11/2017' |
| 56 | +__copyright__ = 'Copyright 2017, The QGIS Project' |
| 57 | +# This will get replaced with a git SHA1 when you do a git archive |
| 58 | +__revision__ = '$Format:%H$' |
| 59 | + |
| 60 | +QGIS_POSTGRES_SERVER_PORT = os.environ.get('QGIS_POSTGRES_SERVER_PORT', '55432') |
| 61 | +QGIS_POSTGRES_EXECUTABLE_PATH = os.environ.get('QGIS_POSTGRES_EXECUTABLE_PATH', '/usr/lib/postgresql/9.4/bin') |
| 62 | + |
| 63 | +assert os.path.exists(QGIS_POSTGRES_EXECUTABLE_PATH) |
| 64 | + |
| 65 | +QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp() |
| 66 | + |
| 67 | +# Postgres test path |
| 68 | +QGIS_PG_TEST_PATH = tempfile.mkdtemp() |
| 69 | + |
| 70 | +os.environ['QGIS_AUTH_DB_DIR_PATH'] = QGIS_AUTH_DB_DIR_PATH |
| 71 | + |
| 72 | +qgis_app = start_app() |
| 73 | + |
| 74 | +QGIS_POSTGRES_CONF_TEMPLATE = """ |
| 75 | +hba_file = '%(tempfolder)s/pg_hba.conf' |
| 76 | +listen_addresses = '*' |
| 77 | +port = %(port)s |
| 78 | +max_connections = 100 |
| 79 | +unix_socket_directories = '%(tempfolder)s' |
| 80 | +ssl = true |
| 81 | +ssl_ciphers = 'DEFAULT:!LOW:!EXP:!MD5:@STRENGTH' # allowed SSL ciphers |
| 82 | +ssl_cert_file = '%(server_cert)s' |
| 83 | +ssl_key_file = '%(server_key)s' |
| 84 | +ssl_ca_file = '%(sslrootcert_path)s' |
| 85 | +password_encryption = on |
| 86 | +""" |
| 87 | + |
| 88 | +QGIS_POSTGRES_HBA_TEMPLATE = """ |
| 89 | +hostssl all all 0.0.0.0/0 md5 |
| 90 | +hostssl all all ::1/0 md5 |
| 91 | +host all all 127.0.0.1/32 trust |
| 92 | +host all all ::1/32 trust |
| 93 | +
|
| 94 | +host all all 0.0.0.0/0 trust |
| 95 | +
|
| 96 | +""" |
| 97 | + |
| 98 | + |
| 99 | +class TestAuthManager(unittest.TestCase): |
| 100 | + |
| 101 | + @classmethod |
| 102 | + def setUpAuth(cls): |
| 103 | + """Run before all tests and set up authentication""" |
| 104 | + authm = QgsApplication.authManager() |
| 105 | + assert (authm.setMasterPassword('masterpassword', True)) |
| 106 | + cls.pg_conf = os.path.join(cls.tempfolder, 'postgresql.conf') |
| 107 | + cls.pg_hba = os.path.join(cls.tempfolder, 'pg_hba.conf') |
| 108 | + # Client side |
| 109 | + cls.sslrootcert_path = os.path.join(cls.certsdata_path, 'chains_subissuer-issuer-root_issuer2-root2.pem') |
| 110 | + assert os.path.isfile(cls.sslrootcert_path) |
| 111 | + os.chmod(cls.sslrootcert_path, stat.S_IRUSR) |
| 112 | + cls.auth_config = QgsAuthMethodConfig("Basic") |
| 113 | + cls.auth_config.setConfig('username', cls.username) |
| 114 | + cls.auth_config.setConfig('password', cls.password) |
| 115 | + cls.auth_config.setName('test_basic_auth_config') |
| 116 | + cls.sslrootcert = QSslCertificate.fromPath(cls.sslrootcert_path) |
| 117 | + assert cls.sslrootcert is not None |
| 118 | + authm.storeCertAuthorities(cls.sslrootcert) |
| 119 | + authm.rebuildCaCertsCache() |
| 120 | + authm.rebuildTrustedCaCertsCache() |
| 121 | + authm.rebuildCertTrustCache() |
| 122 | + assert (authm.storeAuthenticationConfig(cls.auth_config)[0]) |
| 123 | + assert cls.auth_config.isValid() |
| 124 | + cls.authcfg = cls.auth_config.id() |
| 125 | + |
| 126 | + # Server side |
| 127 | + cls.server_cert = os.path.join(cls.certsdata_path, 'localhost_ssl_cert.pem') |
| 128 | + cls.server_key = os.path.join(cls.certsdata_path, 'localhost_ssl_key.pem') |
| 129 | + cls.server_rootcert = cls.sslrootcert_path |
| 130 | + os.chmod(cls.server_cert, stat.S_IRUSR) |
| 131 | + os.chmod(cls.server_key, stat.S_IRUSR) |
| 132 | + os.chmod(cls.server_rootcert, stat.S_IRUSR) |
| 133 | + |
| 134 | + # Place conf in the data folder |
| 135 | + with open(cls.pg_conf, 'w+') as f: |
| 136 | + f.write(QGIS_POSTGRES_CONF_TEMPLATE % { |
| 137 | + 'port': cls.port, |
| 138 | + 'tempfolder': cls.tempfolder, |
| 139 | + 'server_cert': cls.server_cert, |
| 140 | + 'server_key': cls.server_key, |
| 141 | + 'sslrootcert_path': cls.sslrootcert_path, |
| 142 | + }) |
| 143 | + |
| 144 | + with open(cls.pg_hba, 'w+') as f: |
| 145 | + f.write(QGIS_POSTGRES_HBA_TEMPLATE) |
| 146 | + |
| 147 | + @classmethod |
| 148 | + def setUpClass(cls): |
| 149 | + """Run before all tests: |
| 150 | + Creates an auth configuration""" |
| 151 | + cls.port = QGIS_POSTGRES_SERVER_PORT |
| 152 | + cls.username = 'username' |
| 153 | + cls.password = 'password' |
| 154 | + cls.dbname = 'test_basic' |
| 155 | + cls.tempfolder = QGIS_PG_TEST_PATH |
| 156 | + cls.certsdata_path = os.path.join(unitTestDataPath('auth_system'), 'certs_keys') |
| 157 | + cls.hostname = 'localhost' |
| 158 | + cls.data_path = os.path.join(cls.tempfolder, 'data') |
| 159 | + os.mkdir(cls.data_path) |
| 160 | + |
| 161 | + cls.setUpAuth() |
| 162 | + subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'initdb'), '-D', cls.data_path]) |
| 163 | + |
| 164 | + # Disable SSL verification for setup operations |
| 165 | + env = dict(os.environ) |
| 166 | + env['PGSSLMODE'] = 'disable' |
| 167 | + |
| 168 | + cls.server = subprocess.Popen([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'postgres'), '-D', |
| 169 | + cls.data_path, '-c', |
| 170 | + "config_file=%s" % cls.pg_conf], |
| 171 | + env=env, |
| 172 | + stdout=subprocess.PIPE, |
| 173 | + stderr=subprocess.PIPE) |
| 174 | + # Wait max 10 secs for the server to start |
| 175 | + end = time.time() + 10 |
| 176 | + while True: |
| 177 | + line = cls.server.stderr.readline() |
| 178 | + print(line) |
| 179 | + if line.find(b"database system is ready to accept") != -1: |
| 180 | + break |
| 181 | + if time.time() > end: |
| 182 | + raise Exception("Timeout connecting to PostgreSQL") |
| 183 | + # Create a DB |
| 184 | + subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'createdb'), '-h', 'localhost', '-p', cls.port, 'test_basic'], env=env) |
| 185 | + # Inject test SQL from test path |
| 186 | + test_sql = os.path.join(unitTestDataPath('provider'), 'testdata_pg.sql') |
| 187 | + subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'psql'), '-h', 'localhost', '-p', cls.port, '-f', test_sql, cls.dbname], env=env) |
| 188 | + # Create a role |
| 189 | + subprocess.check_call([os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, 'psql'), '-h', 'localhost', '-p', cls.port, '-c', 'CREATE ROLE "%s" WITH SUPERUSER LOGIN PASSWORD \'%s\'' % (cls.username, cls.password), cls.dbname], env=env) |
| 190 | + |
| 191 | + @classmethod |
| 192 | + def tearDownClass(cls): |
| 193 | + """Run after all tests""" |
| 194 | + cls.server.terminate() |
| 195 | + os.kill(cls.server.pid, signal.SIGABRT) |
| 196 | + del cls.server |
| 197 | + time.sleep(2) |
| 198 | + rmtree(QGIS_AUTH_DB_DIR_PATH) |
| 199 | + rmtree(cls.tempfolder) |
| 200 | + |
| 201 | + def setUp(self): |
| 202 | + """Run before each test.""" |
| 203 | + pass |
| 204 | + |
| 205 | + def tearDown(self): |
| 206 | + """Run after each test.""" |
| 207 | + pass |
| 208 | + |
| 209 | + @classmethod |
| 210 | + def _getPostGISLayer(cls, type_name, layer_name=None, authcfg=''): |
| 211 | + """ |
| 212 | + PG layer factory |
| 213 | + """ |
| 214 | + if layer_name is None: |
| 215 | + layer_name = 'pg_' + type_name |
| 216 | + |
| 217 | + # Warning: OGR needs the schema if it's not the default, so qgis_test.someData |
| 218 | + connstring = "PG:dbname='%(dbname)s' host='%(hostname)s' port='%(port)s' sslmode='verify-full' sslrootcert='%(sslrootcert)s'%(authcfg)s|layername=qgis_test.someData" % ( |
| 219 | + { |
| 220 | + 'dbname': cls.dbname, |
| 221 | + 'hostname': cls.hostname, |
| 222 | + 'port': cls.port, |
| 223 | + 'authcfg': ' authcfg=\'%s\'' % authcfg if authcfg else '', |
| 224 | + 'sslrootcert': cls.sslrootcert_path, |
| 225 | + } |
| 226 | + ) |
| 227 | + layer = QgsVectorLayer(connstring, layer_name, 'ogr') |
| 228 | + return layer |
| 229 | + |
| 230 | + def testValidAuthAccess(self): |
| 231 | + """ |
| 232 | + Access the protected layer with valid credentials |
| 233 | + """ |
| 234 | + pg_layer = self._getPostGISLayer('testlayer_èé', authcfg=self.auth_config.id()) |
| 235 | + self.assertTrue(pg_layer.isValid()) |
| 236 | + |
| 237 | + def testInvalidAuthAccess(self): |
| 238 | + """ |
| 239 | + Access the protected layer with not valid credentials |
| 240 | + """ |
| 241 | + pg_layer = self._getPostGISLayer('testlayer_èé') |
| 242 | + self.assertFalse(pg_layer.isValid()) |
| 243 | + |
| 244 | + |
| 245 | +if __name__ == '__main__': |
| 246 | + unittest.main() |
0 commit comments