test_authmanager_password_postgres.py

Alessandro Pasotti, 2017-11-03 10:51 AM

Download (8.47 KB)

 
1
# -*- coding: utf-8 -*-
"""
Tests for auth manager password access to PostgreSQL.

This is an integration test for the QGIS Desktop Auth Manager postgres provider
that checks whether QGIS can use a stored auth manager authentication
configuration to access a password-protected PostgreSQL server.

Configuration from the environment:

    * QGIS_POSTGRES_SERVER_PORT (default: 55432)
    * QGIS_POSTGRES_EXECUTABLE_PATH (default: /usr/lib/postgresql/9.4/bin)


From build dir, run: ctest -R PyQgsAuthManagerPasswordPostgresTest -V

or, if your PostgreSQL path differs from the default:

QGIS_POSTGRES_EXECUTABLE_PATH=/usr/lib/postgresql/<your_version_goes_here>/bin \
    ctest -R PyQgsAuthManagerPasswordPostgresTest -V

.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
"""
27
import os
28
import time
29
import signal
30
import stat
31
import subprocess
32
import tempfile
33

    
34
from shutil import rmtree
35

    
36
from utilities import unitTestDataPath
37
from qgis.core import (
38
    QgsApplication,
39
    QgsAuthManager,
40
    QgsAuthMethodConfig,
41
    QgsVectorLayer,
42
    QgsDataSourceUri,
43
    QgsWkbTypes,
44
)
45

    
46
from qgis.PyQt.QtNetwork import QSslCertificate
47

    
48
from qgis.testing import (
49
    start_app,
50
    unittest,
51
)
52

    
53

    
54
__author__ = 'Alessandro Pasotti'
__date__ = '25/10/2016'
__copyright__ = 'Copyright 2016, The QGIS Project'
# This will get replaced with a git SHA1 when you do a git archive
__revision__ = '$Format:%H$'

# Port of the temporary server and location of the PostgreSQL binaries; both
# can be overridden from the environment (values are kept as strings).
QGIS_POSTGRES_SERVER_PORT = os.environ.get('QGIS_POSTGRES_SERVER_PORT', '55432')
QGIS_POSTGRES_EXECUTABLE_PATH = os.environ.get('QGIS_POSTGRES_EXECUTABLE_PATH', '/usr/lib/postgresql/9.4/bin')

# Fail at import time, with an explicit hint, when the binaries are missing.
assert os.path.exists(QGIS_POSTGRES_EXECUTABLE_PATH), (
    "PostgreSQL executables not found in '%s': set QGIS_POSTGRES_EXECUTABLE_PATH"
    % QGIS_POSTGRES_EXECUTABLE_PATH)

# Temporary folder for the authentication database
QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()

# Postgres test path
QGIS_PG_TEST_PATH = tempfile.mkdtemp()

# Exported before start_app() is called; presumably the auth manager reads it
# when the application initialises, so keep this statement order.
os.environ['QGIS_AUTH_DB_DIR_PATH'] = QGIS_AUTH_DB_DIR_PATH

qgis_app = start_app()
73

    
74
# postgresql.conf template, rendered with %-formatting from a mapping that
# provides: tempfolder, port, server_cert, server_key and sslrootcert_path.
QGIS_POSTGRES_CONF_TEMPLATE = """
hba_file = '%(tempfolder)s/pg_hba.conf'
listen_addresses = '*'
port = %(port)s
max_connections = 100
unix_socket_directories = '%(tempfolder)s'
ssl = true
ssl_ciphers = 'DEFAULT:!LOW:!EXP:!MD5:@STRENGTH'        # allowed SSL ciphers
ssl_cert_file = '%(server_cert)s'
ssl_key_file = '%(server_key)s'
ssl_ca_file = '%(sslrootcert_path)s'
password_encryption = on
"""

# pg_hba.conf content: SSL connections (hostssl) must authenticate with md5
# passwords, while non-SSL connections from the loopback addresses are trusted.
QGIS_POSTGRES_HBA_TEMPLATE = """
hostssl    all           all             0.0.0.0/0              md5
hostssl    all           all             ::1/0                  md5
host       all           all             127.0.0.1/32           trust
host       all           all             ::1/32                 trust
"""
94

    
95

    
96
class TestAuthManager(unittest.TestCase):
    """Integration tests for password ("Basic") authentication on PostgreSQL.

    ``setUpClass`` initialises a PostgreSQL cluster inside a temporary folder,
    starts an SSL-enabled server on ``QGIS_POSTGRES_SERVER_PORT``, loads the
    test data and creates a password-protected role. The test methods then
    open a layer through the ``postgres`` provider with and without the stored
    authentication configuration.
    """

    @staticmethod
    def _pgExecutable(name):
        """Return the full path of the PostgreSQL program ``name``."""
        return os.path.join(QGIS_POSTGRES_EXECUTABLE_PATH, name)

    @classmethod
    def setUpAuth(cls):
        """Run before all tests and set up authentication.

        Stores the CA certificates and the username/password configuration in
        the auth manager, then writes ``postgresql.conf`` and ``pg_hba.conf``
        into ``cls.tempfolder``. Reads ``cls.tempfolder``,
        ``cls.certsdata_path`` and ``cls.port``, which ``setUpClass`` sets
        before calling this method.
        """
        authm = QgsApplication.authManager()
        assert (authm.setMasterPassword('masterpassword', True))
        cls.pg_conf = os.path.join(cls.tempfolder, 'postgresql.conf')
        cls.pg_hba = os.path.join(cls.tempfolder, 'pg_hba.conf')
        # Client side
        cls.sslrootcert_path = os.path.join(cls.certsdata_path, 'chains_subissuer-issuer-root_issuer2-root2.pem')
        assert os.path.isfile(cls.sslrootcert_path)
        os.chmod(cls.sslrootcert_path, stat.S_IRUSR)
        cls.auth_config = QgsAuthMethodConfig("Basic")
        cls.username = 'username'
        cls.password = 'password'
        cls.auth_config.setConfig('username', cls.username)
        cls.auth_config.setConfig('password', cls.password)
        cls.auth_config.setName('test_password_auth_config')
        cls.sslrootcert = QSslCertificate.fromPath(cls.sslrootcert_path)
        # fromPath() is expected to return a list of certificates, so an
        # "is not None" check could never fail: require a non-empty result.
        assert cls.sslrootcert, 'No CA certificate could be read from %s' % cls.sslrootcert_path
        authm.storeCertAuthorities(cls.sslrootcert)
        authm.rebuildCaCertsCache()
        authm.rebuildTrustedCaCertsCache()
        authm.rebuildCertTrustCache()
        assert (authm.storeAuthenticationConfig(cls.auth_config)[0])
        assert cls.auth_config.isValid()

        # Server side
        cls.server_cert = os.path.join(cls.certsdata_path, 'localhost_ssl_cert.pem')
        cls.server_key = os.path.join(cls.certsdata_path, 'localhost_ssl_key.pem')
        cls.server_rootcert = cls.sslrootcert_path
        # Restrict the certificate/key files to owner read-only
        os.chmod(cls.server_cert, stat.S_IRUSR)
        os.chmod(cls.server_key, stat.S_IRUSR)
        os.chmod(cls.server_rootcert, stat.S_IRUSR)

        # Place conf in the data folder
        with open(cls.pg_conf, 'w') as f:
            f.write(QGIS_POSTGRES_CONF_TEMPLATE % {
                'port': cls.port,
                'tempfolder': cls.tempfolder,
                'server_cert': cls.server_cert,
                'server_key': cls.server_key,
                'sslrootcert_path': cls.sslrootcert_path,
            })

        with open(cls.pg_hba, 'w') as f:
            f.write(QGIS_POSTGRES_HBA_TEMPLATE)

    @classmethod
    def _waitForServer(cls, timeout=10):
        """Block until the server logs that it accepts connections.

        :param timeout: maximum number of seconds to wait between log lines.
        :raises Exception: if the server process exits before being ready or
            if the readiness message is not seen within ``timeout`` seconds.

        NOTE: ``readline()`` blocks, so the timeout is only evaluated after a
        line (or EOF) has been read from the server's stderr.
        """
        end = time.time() + timeout
        while True:
            line = cls.server.stderr.readline()
            if line:
                print(line)
            if line.find(b"database system is ready to accept") != -1:
                return
            # An empty read means EOF: if the process is gone there is nothing
            # left to wait for, so fail now instead of spinning until timeout.
            if not line and cls.server.poll() is not None:
                raise Exception("PostgreSQL exited with code %s before accepting connections" % cls.server.returncode)
            if time.time() > end:
                raise Exception("Timeout connecting to PostgreSQL")

    @classmethod
    def _stopServer(cls):
        """Shut the server down, reap the process and close its pipes.

        Escalates from SIGINT (PostgreSQL "fast" shutdown) to SIGQUIT
        ("immediate" shutdown) and finally SIGKILL, waiting up to 10 seconds
        after each signal. Does nothing if no server was started.
        """
        server = getattr(cls, 'server', None)
        if server is None:
            return
        for sig in (signal.SIGINT, signal.SIGQUIT, signal.SIGKILL):
            if server.poll() is None:
                server.send_signal(sig)
            try:
                # communicate() drains both pipes, waits for the exit and
                # closes the pipe file objects.
                server.communicate(timeout=10)
            except subprocess.TimeoutExpired:
                continue
            break
        del cls.server

    @classmethod
    def setUpClass(cls):
        """Run before all tests:
        Creates an auth configuration, then initialises, starts and populates
        a temporary PostgreSQL server.

        :raises subprocess.CalledProcessError: if a PostgreSQL command fails.
        :raises Exception: if the server does not become ready.
        """
        cls.port = QGIS_POSTGRES_SERVER_PORT
        cls.dbname = 'test_password'
        cls.tempfolder = QGIS_PG_TEST_PATH
        cls.certsdata_path = os.path.join(unitTestDataPath('auth_system'), 'certs_keys')
        cls.hostname = 'localhost'
        cls.data_path = os.path.join(cls.tempfolder, 'data')
        os.mkdir(cls.data_path)

        # Disable SSL verification for setup operations: non-SSL loopback
        # connections are trusted by the pg_hba.conf written in setUpAuth()
        env = dict(os.environ)
        env['PGSSLMODE'] = 'disable'

        cls.setUpAuth()
        subprocess.check_call([cls._pgExecutable('initdb'), '-D', cls.data_path])

        # NOTE(review): the pipes are not read again after startup until the
        # server is stopped; a very chatty server could fill them - confirm
        # this is acceptable for the amount of logging these tests generate.
        cls.server = subprocess.Popen([cls._pgExecutable('postgres'), '-D',
                                       cls.data_path, '-c',
                                       "config_file=%s" % cls.pg_conf],
                                      env=env,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)
        # tearDownClass() is not run when setUpClass() fails, so make sure a
        # failed setup does not leave a server running on the test port.
        try:
            # Wait max 10 secs for the server to start
            cls._waitForServer(10)
            connection_args = ['-h', cls.hostname, '-p', cls.port]
            # Create a DB
            subprocess.check_call([cls._pgExecutable('createdb')] + connection_args + [cls.dbname], env=env)
            # Inject test SQL from test path
            test_sql = os.path.join(unitTestDataPath('provider'), 'testdata_pg.sql')
            subprocess.check_call([cls._pgExecutable('psql')] + connection_args + ['-f', test_sql, cls.dbname], env=env)
            # Create a role; it needs a password because md5 authentication
            # always fails for roles that have none.
            create_role = 'CREATE ROLE "%s" WITH SUPERUSER LOGIN PASSWORD \'%s\'' % (cls.username, cls.password)
            subprocess.check_call([cls._pgExecutable('psql')] + connection_args + ['-c', create_role, cls.dbname], env=env)
        except Exception:
            cls._stopServer()
            raise

    @classmethod
    def tearDownClass(cls):
        """Run after all tests: stop the server and remove the temp folders."""
        cls._stopServer()
        rmtree(QGIS_AUTH_DB_DIR_PATH)
        rmtree(cls.tempfolder)

    def setUp(self):
        """Run before each test."""
        pass

    def tearDown(self):
        """Run after each test."""
        pass

    @classmethod
    def _getPostGISLayer(cls, type_name, layer_name=None, authcfg=None):
        """
        PG layer factory

        :param type_name: used to build the default layer name ('pg_' + type_name).
        :param layer_name: optional explicit layer name.
        :param authcfg: optional id of the auth configuration to connect with.
        :return: a QgsVectorLayer on qgis_test.someData of the test database,
            requested with full SSL verification.
        """
        if layer_name is None:
            layer_name = 'pg_' + type_name
        uri = QgsDataSourceUri()
        uri.setWkbType(QgsWkbTypes.Point)
        uri.setConnection(cls.hostname, cls.port, cls.dbname, "", "", QgsDataSourceUri.SslVerifyFull, authcfg)
        uri.setKeyColumn('pk')
        uri.setSrid('EPSG:4326')
        uri.setDataSource('qgis_test', 'someData', "geom", "", "pk")
        # Note: do not expand here!
        layer = QgsVectorLayer(uri.uri(False), layer_name, 'postgres')
        return layer

    def testValidAuthAccess(self):
        """
        Access the protected layer with valid credentials
        """
        pg_layer = self._getPostGISLayer('testlayer_èé', authcfg=self.auth_config.id())
        self.assertTrue(pg_layer.isValid())

    def testInvalidAuthAccess(self):
        """
        Access the protected layer with not valid credentials
        """
        pg_layer = self._getPostGISLayer('testlayer_èé')
        self.assertFalse(pg_layer.isValid())
235

    
236

    
237
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()