Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d259279

Browse files
elpasodakcarto
authored andcommittedOct 27, 2017
[oauth2] Added test for resource owner password grant flow
1 parent 244e886 commit d259279

File tree

5 files changed

+544
-23
lines changed

5 files changed

+544
-23
lines changed
 

‎.ci/travis/linux/before_install.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@
1515

1616

1717
#pip3 install termcolor
18+
pip install psycopg2 numpy nose2 pyyaml mock future termcolor oauthlib

‎.ci/travis/macos/before_install.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ pip3 install \
2727
pyyaml \
2828
mock \
2929
future \
30-
termcolor
30+
termcolor \
31+
oauthlib
3132

3233
brew install \
3334
qscintilla2 \

‎tests/src/python/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,9 @@ IF (WITH_SERVER)
229229
ADD_PYTHON_TEST(PyQgsAuthManagerPasswordOWSTest test_authmanager_password_ows.py)
230230
ADD_PYTHON_TEST(PyQgsAuthManagerPKIOWSTest test_authmanager_pki_ows.py)
231231
ADD_PYTHON_TEST(PyQgsAuthManagerPKIPostgresTest test_authmanager_pki_postgres.py)
232+
IF(WITH_OAUTH2_PLUGIN)
233+
ADD_PYTHON_TEST(PyQgsAuthManagerOAuth2OWSTest test_authmanager_oauth2_ows.py)
234+
ENDIF()
232235
ADD_PYTHON_TEST(PyQgsServerServices test_qgsserver_services.py)
233236
ADD_PYTHON_TEST(PyQgsServerModules test_qgsserver_modules.py)
234237
ADD_PYTHON_TEST(PyQgsServerRequest test_qgsserver_request.py)

‎tests/src/python/qgis_wrapped_server.py

Lines changed: 265 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,39 @@
1+
#!/usr/bin/env python3
12
# -*- coding: utf-8 -*-
23
"""
3-
QGIS Server HTTP wrapper
4+
QGIS Server HTTP wrapper for testing purposes
5+
================================================================================
46
57
This script launches a QGIS Server listening on port 8081 or on the port
68
specified on the environment variable QGIS_SERVER_PORT.
7-
QGIS_SERVER_HOST (defaults to 127.0.0.1)
9+
Hostname is set by environment variable QGIS_SERVER_HOST (defaults to 127.0.0.1)
10+
11+
The server can be configured to support any of the following auth systems
12+
(mutually exclusive):
13+
14+
* PKI
15+
* HTTP Basic
16+
* OAuth2 (requires python package oauthlib, installable with:
17+
with "pip install oauthlib")
18+
19+
20+
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
21+
SECURITY WARNING:
22+
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
23+
24+
This script was developed for testing purposes and was not meant to be secure,
25+
please do not use in a production server any of the authentication systems
26+
implemented here.
27+
28+
29+
HTTPS
30+
--------------------------------------------------------------------------------
31+
32+
HTTPS is automatically enabled for PKI and OAuth2
33+
34+
35+
HTTP Basic
36+
--------------------------------------------------------------------------------
837
938
For testing purposes, HTTP Basic can be enabled by setting the following
1039
environment variables:
@@ -13,21 +42,61 @@
1342
* QGIS_SERVER_USERNAME (default ="username")
1443
* QGIS_SERVER_PASSWORD (default ="password")
1544
45+
46+
PKI
47+
--------------------------------------------------------------------------------
48+
1649
PKI authentication with HTTPS can be enabled with:
1750
1851
* QGIS_SERVER_PKI_CERTIFICATE (server certificate)
1952
* QGIS_SERVER_PKI_KEY (server private key)
2053
* QGIS_SERVER_PKI_AUTHORITY (root CA)
2154
* QGIS_SERVER_PKI_USERNAME (valid username)
2255
23-
Sample run:
56+
57+
OAuth2 Resource Owner Grant Flow
58+
--------------------------------------------------------------------------------
59+
60+
OAuth2 Resource Owner Grant Flow with HTTPS can be enabled with:
61+
62+
* QGIS_SERVER_OAUTH2_AUTHORITY (no default)
63+
* QGIS_SERVER_OAUTH2_KEY (server private key)
64+
* QGIS_SERVER_OAUTH2_CERTIFICATE (server certificate)
65+
* QGIS_SERVER_OAUTH2_USERNAME (default ="username")
66+
* QGIS_SERVER_OAUTH2_PASSWORD (default ="password")
67+
* QGIS_SERVER_OAUTH2_TOKEN_EXPIRES_IN (default = 3600)
68+
69+
Available endpoints:
70+
71+
- /token (returns a new access_token),
72+
optionally specify an expiration time in seconds with ?ttl=<int>
73+
- /refresh (returns a new access_token from a refresh token),
74+
optionally specify an expiration time in seconds with ?ttl=<int>
75+
- /result (check the Bearer token and returns a short sentence if it validates)
76+
77+
78+
Sample runs
79+
--------------------------------------------------------------------------------
80+
81+
PKI:
2482
2583
QGIS_SERVER_PKI_USERNAME=Gerardus QGIS_SERVER_PORT=47547 QGIS_SERVER_HOST=localhost \
2684
QGIS_SERVER_PKI_KEY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_key.pem \
2785
QGIS_SERVER_PKI_CERTIFICATE=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_cert.pem \
2886
QGIS_SERVER_PKI_AUTHORITY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/chains_subissuer-issuer-root_issuer2-root2.pem \
2987
python3 /home/$USER/dev/QGIS/tests/src/python/qgis_wrapped_server.py
3088
89+
90+
OAuth2:
91+
92+
QGIS_SERVER_PORT=8443 \
93+
QGIS_SERVER_HOST=localhost \
94+
QGIS_SERVER_OAUTH2_AUTHORITY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/chain_subissuer-issuer-root.pem \
95+
QGIS_SERVER_OAUTH2_CERTIFICATE=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_cert.pem \
96+
QGIS_SERVER_OAUTH2_KEY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_key.pem \
97+
python3 /home/$USER/dev/QGIS/tests/src/python/qgis_wrapped_server.py
98+
99+
31100
.. note:: This program is free software; you can redistribute it and/or modify
32101
it under the terms of the GNU General Public License as published by
33102
the Free Software Foundation; either version 2 of the License, or
@@ -49,42 +118,70 @@
49118
import sys
50119
import signal
51120
import ssl
121+
import copy
52122
import urllib.parse
53123
from http.server import BaseHTTPRequestHandler, HTTPServer
54124
from qgis.core import QgsApplication
55125
from qgis.server import QgsServer, QgsServerRequest, QgsBufferServerRequest, QgsBufferServerResponse
56126

57127
QGIS_SERVER_PORT = int(os.environ.get('QGIS_SERVER_PORT', '8081'))
58128
QGIS_SERVER_HOST = os.environ.get('QGIS_SERVER_HOST', '127.0.0.1')
129+
130+
# HTTP Basic
131+
QGIS_SERVER_HTTP_BASIC_AUTH = os.environ.get('QGIS_SERVER_HTTP_BASIC_AUTH', False)
132+
QGIS_SERVER_USERNAME = os.environ.get('QGIS_SERVER_USERNAME', 'username')
133+
QGIS_SERVER_PASSWORD = os.environ.get('QGIS_SERVER_PASSWORD', 'password')
134+
59135
# PKI authentication
60136
QGIS_SERVER_PKI_CERTIFICATE = os.environ.get('QGIS_SERVER_PKI_CERTIFICATE')
61137
QGIS_SERVER_PKI_KEY = os.environ.get('QGIS_SERVER_PKI_KEY')
62138
QGIS_SERVER_PKI_AUTHORITY = os.environ.get('QGIS_SERVER_PKI_AUTHORITY')
63139
QGIS_SERVER_PKI_USERNAME = os.environ.get('QGIS_SERVER_PKI_USERNAME')
64140

65-
# Check if PKI - https is enabled
66-
https = (QGIS_SERVER_PKI_CERTIFICATE is not None and
67-
os.path.isfile(QGIS_SERVER_PKI_CERTIFICATE) and
68-
QGIS_SERVER_PKI_KEY is not None and
69-
os.path.isfile(QGIS_SERVER_PKI_KEY) and
70-
QGIS_SERVER_PKI_AUTHORITY is not None and
71-
os.path.isfile(QGIS_SERVER_PKI_AUTHORITY) and
72-
QGIS_SERVER_PKI_USERNAME)
141+
# OAuth2 authentication
142+
QGIS_SERVER_OAUTH2_CERTIFICATE = os.environ.get('QGIS_SERVER_OAUTH2_CERTIFICATE')
143+
QGIS_SERVER_OAUTH2_KEY = os.environ.get('QGIS_SERVER_OAUTH2_KEY')
144+
QGIS_SERVER_OAUTH2_AUTHORITY = os.environ.get('QGIS_SERVER_OAUTH2_AUTHORITY')
145+
QGIS_SERVER_OAUTH2_USERNAME = os.environ.get('QGIS_SERVER_OAUTH2_USERNAME', 'username')
146+
QGIS_SERVER_OAUTH2_PASSWORD = os.environ.get('QGIS_SERVER_OAUTH2_PASSWORD', 'password')
147+
QGIS_SERVER_OAUTH2_TOKEN_EXPIRES_IN = os.environ.get('QGIS_SERVER_OAUTH2_TOKEN_EXPIRES_IN', 3600)
148+
149+
# Check if PKI is enabled
150+
QGIS_SERVER_PKI_AUTH = (
151+
QGIS_SERVER_PKI_CERTIFICATE is not None and
152+
os.path.isfile(QGIS_SERVER_PKI_CERTIFICATE) and
153+
QGIS_SERVER_PKI_KEY is not None and
154+
os.path.isfile(QGIS_SERVER_PKI_KEY) and
155+
QGIS_SERVER_PKI_AUTHORITY is not None and
156+
os.path.isfile(QGIS_SERVER_PKI_AUTHORITY) and
157+
QGIS_SERVER_PKI_USERNAME)
158+
159+
# Check if OAuth2 is enabled
160+
QGIS_SERVER_OAUTH2_AUTH = (
161+
QGIS_SERVER_OAUTH2_CERTIFICATE is not None and
162+
os.path.isfile(QGIS_SERVER_OAUTH2_CERTIFICATE) and
163+
QGIS_SERVER_OAUTH2_KEY is not None and
164+
os.path.isfile(QGIS_SERVER_OAUTH2_KEY) and
165+
QGIS_SERVER_OAUTH2_AUTHORITY is not None and
166+
os.path.isfile(QGIS_SERVER_OAUTH2_AUTHORITY) and
167+
QGIS_SERVER_OAUTH2_USERNAME and QGIS_SERVER_OAUTH2_PASSWORD)
168+
169+
HTTPS_ENABLED = QGIS_SERVER_PKI_AUTH or QGIS_SERVER_OAUTH2_AUTH
73170

74171

75172
qgs_app = QgsApplication([], False)
76173
qgs_server = QgsServer()
77174

78175

79-
if os.environ.get('QGIS_SERVER_HTTP_BASIC_AUTH') is not None:
176+
if QGIS_SERVER_HTTP_BASIC_AUTH:
80177
from qgis.server import QgsServerFilter
81178
import base64
82179

83180
class HTTPBasicFilter(QgsServerFilter):
84181

85182
def responseComplete(self):
86183
handler = self.serverInterface().requestHandler()
87-
auth = self.serverInterface().requestHandler().requestHeader('HTTP_AUTHORIZATION')
184+
auth = handler.requestHeader('HTTP_AUTHORIZATION')
88185
if auth:
89186
username, password = base64.b64decode(auth[6:]).split(b':')
90187
if (username.decode('utf-8') == os.environ.get('QGIS_SERVER_USERNAME', 'username') and
@@ -100,6 +197,139 @@ def responseComplete(self):
100197
qgs_server.serverInterface().registerFilter(filter)
101198

102199

200+
if QGIS_SERVER_OAUTH2_AUTH:
201+
from qgis.server import QgsServerFilter
202+
from oauthlib.oauth2 import RequestValidator, LegacyApplicationServer
203+
import base64
204+
from datetime import datetime
205+
206+
# Naive token storage implementation
207+
_tokens = {}
208+
209+
class SimpleValidator(RequestValidator):
210+
"""Validate username and password
211+
Note: does not support scopes or client_id"""
212+
213+
def validate_client_id(self, client_id, request):
214+
return True
215+
216+
def authenticate_client(self, request, *args, **kwargs):
217+
"""Wide open"""
218+
request.client = type("Client", (), {'client_id': 'my_id'})
219+
return True
220+
221+
def validate_user(self, username, password, client, request, *args, **kwargs):
222+
if username == QGIS_SERVER_OAUTH2_USERNAME and password == QGIS_SERVER_OAUTH2_PASSWORD:
223+
return True
224+
return False
225+
226+
def validate_grant_type(self, client_id, grant_type, client, request, *args, **kwargs):
227+
# Clients should only be allowed to use one type of grant.
228+
return grant_type in ('password', 'refresh_token')
229+
230+
def get_default_scopes(self, client_id, request, *args, **kwargs):
231+
# Scopes a client will authorize for if none are supplied in the
232+
# authorization request.
233+
return ('my_scope', )
234+
235+
def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
236+
"""Wide open"""
237+
return True
238+
239+
def save_bearer_token(self, token, request, *args, **kwargs):
240+
# Remember to associate it with request.scopes, request.user and
241+
# request.client. The two former will be set when you validate
242+
# the authorization code. Don't forget to save both the
243+
# access_token and the refresh_token and set expiration for the
244+
# access_token to now + expires_in seconds.
245+
_tokens[token['access_token']] = copy.copy(token)
246+
_tokens[token['access_token']]['expiration'] = datetime.now().timestamp() + int(token['expires_in'])
247+
248+
def validate_bearer_token(self, token, scopes, request):
249+
"""Check the token"""
250+
return token in _tokens and _tokens[token]['expiration'] > datetime.now().timestamp()
251+
252+
def validate_refresh_token(self, refresh_token, client, request, *args, **kwargs):
253+
"""Ensure the Bearer token is valid and authorized access to scopes."""
254+
for t in _tokens.values():
255+
if t['refresh_token'] == refresh_token:
256+
return True
257+
return False
258+
259+
def get_original_scopes(self, refresh_token, request, *args, **kwargs):
260+
"""Get the list of scopes associated with the refresh token."""
261+
return []
262+
263+
validator = SimpleValidator()
264+
oauth_server = LegacyApplicationServer(validator, token_expires_in=QGIS_SERVER_OAUTH2_TOKEN_EXPIRES_IN)
265+
266+
class OAuth2Filter(QgsServerFilter):
267+
"""This filter provides testing endpoint for OAuth2 Resource Owner Grant Flow
268+
269+
Available endpoints:
270+
- /token (returns a new access_token),
271+
optionally specify an expiration time in seconds with ?ttl=<int>
272+
- /refresh (returns a new access_token from a refresh token),
273+
optionally specify an expiration time in seconds with ?ttl=<int>
274+
- /result (check the Bearer token and returns a short sentence if it validates)
275+
"""
276+
277+
def responseComplete(self):
278+
279+
handler = self.serverInterface().requestHandler()
280+
281+
def _token(ttl):
282+
"""Common code for new and refresh token"""
283+
handler.clear()
284+
body = bytes(handler.data()).decode('utf8')
285+
old_expires_in = oauth_server.default_token_type.expires_in
286+
# Hacky way to dynamically set token expiration time
287+
oauth_server.default_token_type.expires_in = ttl
288+
headers, payload, code = oauth_server.create_token_response('/token', 'post', body, {})
289+
oauth_server.default_token_type.expires_in = old_expires_in
290+
for k, v in headers.items():
291+
handler.setResponseHeader(k, v)
292+
handler.setStatusCode(code)
293+
handler.appendBody(payload.encode('utf-8'))
294+
295+
# Token expiration
296+
ttl = handler.parameterMap().get('TTL', QGIS_SERVER_OAUTH2_TOKEN_EXPIRES_IN)
297+
# Issue a new token
298+
if handler.url().find('/token') != -1:
299+
_token(ttl)
300+
return
301+
302+
# Refresh token
303+
if handler.url().find('/refresh') != -1:
304+
_token(ttl)
305+
return
306+
307+
# Check for valid token
308+
auth = handler.requestHeader('HTTP_AUTHORIZATION')
309+
if auth:
310+
result, response = oauth_server.verify_request(handler.url(), 'post', '', {'Authorization': auth})
311+
if result:
312+
# This is a test endpoint for OAuth2, it requires a valid token
313+
if handler.url().find('/result') != -1:
314+
handler.clear()
315+
handler.appendBody(b'Valid Token: enjoy OAuth2')
316+
# Standard flow
317+
return
318+
else:
319+
# Wrong token, default response 401
320+
pass
321+
322+
# No auth ...
323+
handler.clear()
324+
handler.setStatusCode(401)
325+
handler.setResponseHeader('Status', '401 Unauthorized')
326+
handler.setResponseHeader('WWW-Authenticate', 'Bearer realm="QGIS Server"')
327+
handler.appendBody(b'Invalid Token: Authorization required.')
328+
329+
filter = OAuth2Filter(qgs_server.serverInterface())
330+
qgs_server.serverInterface().registerFilter(filter)
331+
332+
103333
class Handler(BaseHTTPRequestHandler):
104334

105335
def do_GET(self, post_body=None):
@@ -130,16 +360,29 @@ def do_POST(self):
130360

131361
if __name__ == '__main__':
132362
server = HTTPServer((QGIS_SERVER_HOST, QGIS_SERVER_PORT), Handler)
133-
if https:
134-
server.socket = ssl.wrap_socket(server.socket,
135-
certfile=QGIS_SERVER_PKI_CERTIFICATE,
136-
keyfile=QGIS_SERVER_PKI_KEY,
137-
ca_certs=QGIS_SERVER_PKI_AUTHORITY,
138-
cert_reqs=ssl.CERT_REQUIRED,
139-
server_side=True,
140-
ssl_version=ssl.PROTOCOL_TLSv1)
363+
# HTTPS is enabled if any of PKI or OAuth2 are enabled too
364+
if HTTPS_ENABLED:
365+
if QGIS_SERVER_OAUTH2_AUTH:
366+
server.socket = ssl.wrap_socket(
367+
server.socket,
368+
certfile=QGIS_SERVER_OAUTH2_CERTIFICATE,
369+
ca_certs=QGIS_SERVER_OAUTH2_AUTHORITY,
370+
keyfile=QGIS_SERVER_OAUTH2_KEY,
371+
server_side=True,
372+
#cert_reqs=ssl.CERT_REQUIRED, # No certs for OAuth2
373+
ssl_version=ssl.PROTOCOL_TLSv1)
374+
else:
375+
server.socket = ssl.wrap_socket(
376+
server.socket,
377+
certfile=QGIS_SERVER_PKI_CERTIFICATE,
378+
keyfile=QGIS_SERVER_PKI_KEY,
379+
ca_certs=QGIS_SERVER_PKI_AUTHORITY,
380+
cert_reqs=ssl.CERT_REQUIRED,
381+
server_side=True,
382+
ssl_version=ssl.PROTOCOL_TLSv1)
383+
141384
print('Starting server on %s://%s:%s, use <Ctrl-C> to stop' %
142-
('https' if https else 'http', QGIS_SERVER_HOST, server.server_port), flush=True)
385+
('https' if HTTPS_ENABLED else 'http', QGIS_SERVER_HOST, server.server_port), flush=True)
143386

144387
def signal_handler(signal, frame):
145388
global qgs_app
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Tests for auth manager WMS/WFS using QGIS Server through OAuth2
4+
enabled qgis_wrapped_server.py.
5+
6+
This is an integration test for QGIS Desktop Auth Manager WFS and WMS provider
7+
and QGIS Server WFS/WMS that check if QGIS can use a stored auth manager auth
8+
configuration to access an OAuth2 Resource Owner Grant Flow protected endpoint.
9+
10+
11+
From build dir, run: ctest -R PyQgsAuthManagerOAuth2OWSTest -V
12+
13+
.. note:: This program is free software; you can redistribute it and/or modify
14+
it under the terms of the GNU General Public License as published by
15+
the Free Software Foundation; either version 2 of the License, or
16+
(at your option) any later version.
17+
"""
18+
import os
19+
import sys
20+
import re
21+
import subprocess
22+
import tempfile
23+
import urllib
24+
import stat
25+
import json
26+
import time
27+
import random
28+
29+
__author__ = 'Alessandro Pasotti'
30+
__date__ = '20/04/2017'
31+
__copyright__ = 'Copyright 2017, The QGIS Project'
32+
# This will get replaced with a git SHA1 when you do a git archive
33+
__revision__ = '$Format:%H$'
34+
35+
from shutil import rmtree
36+
37+
from utilities import unitTestDataPath, waitServer
38+
from qgis.core import (
39+
QgsAuthManager,
40+
QgsAuthMethodConfig,
41+
QgsVectorLayer,
42+
QgsRasterLayer,
43+
)
44+
45+
from qgis.PyQt.QtNetwork import QSslCertificate
46+
47+
from qgis.testing import (
48+
start_app,
49+
unittest,
50+
)
51+
52+
try:
53+
QGIS_SERVER_ENDPOINT_PORT = os.environ['QGIS_SERVER_ENDPOINT_PORT']
54+
except:
55+
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto
56+
57+
58+
QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()
59+
60+
os.environ['QGIS_AUTH_DB_DIR_PATH'] = QGIS_AUTH_DB_DIR_PATH
61+
62+
qgis_app = start_app()
63+
64+
65+
def setup_oauth(username, password, token_uri, refresh_token_uri='', authcfg_id='oauth-2', authcfg_name='OAuth2 test configuration'):
66+
"""Setup oauth configuration to access OAuth API,
67+
return authcfg_id on success, None on failure
68+
"""
69+
cfgjson = {
70+
"accessMethod": 0,
71+
"apiKey": "",
72+
"clientId": "",
73+
"clientSecret": "",
74+
"configType": 1,
75+
"grantFlow": 2,
76+
"password": password,
77+
"persistToken": False,
78+
"redirectPort": '7070',
79+
"redirectUrl": "",
80+
"refreshTokenUrl": refresh_token_uri,
81+
"requestTimeout": '30',
82+
"requestUrl": "",
83+
"scope": "",
84+
"state": "",
85+
"tokenUrl": token_uri,
86+
"username": username,
87+
"version": 1
88+
}
89+
90+
if authcfg_id not in QgsAuthManager.instance().availableAuthMethodConfigs():
91+
authConfig = QgsAuthMethodConfig('OAuth2')
92+
authConfig.setId(authcfg_id)
93+
authConfig.setName(authcfg_name)
94+
authConfig.setConfig('oauth2config', json.dumps(cfgjson))
95+
if QgsAuthManager.instance().storeAuthenticationConfig(authConfig):
96+
return authcfg_id
97+
else:
98+
authConfig = QgsAuthMethodConfig()
99+
QgsAuthManager.instance().loadAuthenticationConfig(authcfg_id, authConfig, True)
100+
authConfig.setName(authcfg_name)
101+
authConfig.setConfig('oauth2config', json.dumps(cfgjson))
102+
if QgsAuthManager.instance().updateAuthenticationConfig(authConfig):
103+
return authcfg_id
104+
return None
105+
106+
107+
class TestAuthManager(unittest.TestCase):
108+
109+
@classmethod
110+
def setUpAuth(cls):
111+
"""Run before all tests and set up authentication"""
112+
authm = QgsAuthManager.instance()
113+
assert (authm.setMasterPassword('masterpassword', True))
114+
cls.sslrootcert_path = os.path.join(cls.certsdata_path, 'chains_subissuer-issuer-root_issuer2-root2.pem')
115+
assert os.path.isfile(cls.sslrootcert_path)
116+
os.chmod(cls.sslrootcert_path, stat.S_IRUSR)
117+
118+
cls.sslrootcert = QSslCertificate.fromPath(cls.sslrootcert_path)
119+
assert cls.sslrootcert is not None
120+
authm.storeCertAuthorities(cls.sslrootcert)
121+
authm.rebuildCaCertsCache()
122+
authm.rebuildTrustedCaCertsCache()
123+
124+
cls.server_cert = os.path.join(cls.certsdata_path, '127_0_0_1_ssl_cert.pem')
125+
cls.server_key = os.path.join(cls.certsdata_path, '127_0_0_1_ssl_key.pem')
126+
cls.server_rootcert = cls.sslrootcert_path
127+
os.chmod(cls.server_cert, stat.S_IRUSR)
128+
os.chmod(cls.server_key, stat.S_IRUSR)
129+
os.chmod(cls.server_rootcert, stat.S_IRUSR)
130+
131+
os.environ['QGIS_SERVER_HOST'] = cls.hostname
132+
os.environ['QGIS_SERVER_PORT'] = str(cls.port)
133+
os.environ['QGIS_SERVER_OAUTH2_KEY'] = cls.server_key
134+
os.environ['QGIS_SERVER_OAUTH2_CERTIFICATE'] = cls.server_cert
135+
os.environ['QGIS_SERVER_OAUTH2_USERNAME'] = cls.username
136+
os.environ['QGIS_SERVER_OAUTH2_PASSWORD'] = cls.password
137+
os.environ['QGIS_SERVER_OAUTH2_AUTHORITY'] = cls.server_rootcert
138+
# Set default token expiration to 2 seconds, note that this can be
139+
# also controlled when issuing token requests by adding ttl=<int>
140+
# to the query string
141+
os.environ['QGIS_SERVER_OAUTH2_TOKEN_EXPIRES_IN'] = '2'
142+
143+
@classmethod
144+
def setUpClass(cls):
145+
"""Run before all tests:
146+
Creates an auth configuration"""
147+
cls.port = QGIS_SERVER_ENDPOINT_PORT
148+
# Clean env just to be sure
149+
env_vars = ['QUERY_STRING', 'QGIS_PROJECT_FILE']
150+
for ev in env_vars:
151+
try:
152+
del os.environ[ev]
153+
except KeyError:
154+
pass
155+
cls.testdata_path = unitTestDataPath('qgis_server')
156+
cls.certsdata_path = os.path.join(unitTestDataPath('auth_system'), 'certs_keys')
157+
cls.project_path = os.path.join(cls.testdata_path, "test_project.qgs")
158+
# cls.hostname = 'localhost'
159+
cls.protocol = 'https'
160+
cls.hostname = '127.0.0.1'
161+
cls.username = 'username'
162+
cls.password = 'password'
163+
cls.setUpAuth()
164+
165+
server_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
166+
'qgis_wrapped_server.py')
167+
cls.server = subprocess.Popen([sys.executable, server_path],
168+
env=os.environ, stdout=subprocess.PIPE)
169+
line = cls.server.stdout.readline()
170+
cls.port = int(re.findall(b':(\d+)', line)[0])
171+
assert cls.port != 0
172+
173+
# We need a valid port before we setup the oauth configuration
174+
cls.token_uri = '%s://%s:%s/token' % (cls.protocol, cls.hostname, cls.port)
175+
cls.refresh_token_uri = '%s://%s:%s/refresh' % (cls.protocol, cls.hostname, cls.port)
176+
# Need a random authcfg or the cache will bites us back!
177+
cls.authcfg_id = setup_oauth(cls.username, cls.password, cls.token_uri, cls.refresh_token_uri, str(random.randint(0, 10000000)))
178+
# This is to test wrong credentials
179+
cls.wrong_authcfg_id = setup_oauth('wrong', 'wrong', cls.token_uri, cls.refresh_token_uri, str(random.randint(0, 10000000)))
180+
# Get the authentication configuration instance:
181+
cls.auth_config = QgsAuthManager.instance().availableAuthMethodConfigs()[cls.authcfg_id]
182+
assert cls.auth_config.isValid()
183+
184+
# Wait for the server process to start
185+
assert waitServer('%s://%s:%s' % (cls.protocol, cls.hostname, cls.port)), "Server is not responding! %s://%s:%s" % (cls.protocol, cls.hostname, cls.port)
186+
187+
@classmethod
188+
def tearDownClass(cls):
189+
"""Run after all tests"""
190+
cls.server.kill()
191+
rmtree(QGIS_AUTH_DB_DIR_PATH)
192+
del cls.server
193+
194+
def setUp(self):
195+
"""Run before each test."""
196+
pass
197+
198+
def tearDown(self):
199+
"""Run after each test."""
200+
pass
201+
202+
@classmethod
203+
def _getWFSLayer(cls, type_name, layer_name=None, authcfg=None):
204+
"""
205+
WFS layer factory
206+
"""
207+
if layer_name is None:
208+
layer_name = 'wfs_' + type_name
209+
parms = {
210+
'srsname': 'EPSG:4326',
211+
'typename': type_name,
212+
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
213+
'version': 'auto',
214+
'table': '',
215+
}
216+
if authcfg is not None:
217+
parms.update({'authcfg': authcfg})
218+
uri = ' '.join([("%s='%s'" % (k, v)) for k, v in list(parms.items())])
219+
wfs_layer = QgsVectorLayer(uri, layer_name, 'WFS')
220+
return wfs_layer
221+
222+
@classmethod
223+
def _getWMSLayer(cls, layers, layer_name=None, authcfg=None):
224+
"""
225+
WMS layer factory
226+
"""
227+
if layer_name is None:
228+
layer_name = 'wms_' + layers.replace(',', '')
229+
parms = {
230+
'crs': 'EPSG:4326',
231+
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
232+
'format': 'image/png',
233+
# This is needed because of a really weird implementation in QGIS Server, that
234+
# replaces _ in the the real layer name with spaces
235+
'layers': urllib.parse.quote(layers.replace('_', ' ')),
236+
'styles': '',
237+
'version': 'auto',
238+
# 'sql': '',
239+
}
240+
if authcfg is not None:
241+
parms.update({'authcfg': authcfg})
242+
uri = '&'.join([("%s=%s" % (k, v.replace('=', '%3D'))) for k, v in list(parms.items())])
243+
wms_layer = QgsRasterLayer(uri, layer_name, 'wms')
244+
return wms_layer
245+
246+
def testNoAuthAccess(self):
247+
"""
248+
Access the protected layer with no credentials
249+
"""
250+
wms_layer = self._getWMSLayer('testlayer_èé')
251+
self.assertFalse(wms_layer.isValid())
252+
253+
def testInvalidAuthAccess(self):
254+
"""
255+
Access the protected layer with wrong credentials
256+
"""
257+
wms_layer = self._getWMSLayer('testlayer_èé', authcfg=self.wrong_authcfg_id)
258+
self.assertFalse(wms_layer.isValid())
259+
260+
def testValidAuthAccess(self):
261+
"""
262+
Access the protected layer with valid credentials
263+
Note: cannot test invalid access WFS in a separate test because
264+
it would fail the subsequent (valid) calls due to cached connections
265+
"""
266+
wfs_layer = self._getWFSLayer('testlayer_èé', authcfg=self.auth_config.id())
267+
self.assertTrue(wfs_layer.isValid())
268+
wms_layer = self._getWMSLayer('testlayer_èé', authcfg=self.auth_config.id())
269+
self.assertTrue(wms_layer.isValid())
270+
271+
272+
if __name__ == '__main__':
273+
unittest.main()

0 commit comments

Comments
 (0)
Please sign in to comment.