Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[tests] Authmanager tests for username/pwd and PKI
  • Loading branch information
elpaso committed Nov 5, 2016
1 parent ecef7d7 commit 6fd3f74
Show file tree
Hide file tree
Showing 11 changed files with 973 additions and 111 deletions.
3 changes: 3 additions & 0 deletions tests/src/python/CMakeLists.txt
Expand Up @@ -114,4 +114,7 @@ ENDIF (WITH_APIDOC)
IF (WITH_SERVER)
ADD_PYTHON_TEST(PyQgsServer test_qgsserver.py)
ADD_PYTHON_TEST(PyQgsServerAccessControl test_qgsserver_accesscontrol.py)
ADD_PYTHON_TEST(PyQgsAuthManagerPasswordOWSTest test_authmanager_password_ows.py)
#ADD_PYTHON_TEST(PyQgsAuthManagerPKIOWSTest test_authmanager_pki_ows.py)
ADD_PYTHON_TEST(PyQgsAuthManagerPKIPostgresTest test_authmanager_pki_postgres.py)
ENDIF (WITH_SERVER)
156 changes: 156 additions & 0 deletions tests/src/python/qgis_wrapped_server.py
@@ -0,0 +1,156 @@
# -*- coding: utf-8 -*-
"""
QGIS Server HTTP wrapper
This script launches a QGIS Server listening on port 8081 or on the port
specified on the environment variable QGIS_SERVER_PORT.
QGIS_SERVER_HOST (defaults to 127.0.0.1)
For testing purposes, HTTP Basic can be enabled by setting the following
environment variables:
* QGIS_SERVER_HTTP_BASIC_AUTH (default not set, set to anything to enable)
* QGIS_SERVER_USERNAME (default ="username")
* QGIS_SERVER_PASSWORD (default ="password")
PKI authentication with HTTPS can be enabled with:
* QGIS_SERVER_PKI_CERTIFICATE (server certificate)
* QGIS_SERVER_PKI_KEY (server private key)
* QGIS_SERVER_PKI_AUTHORITY (root CA)
* QGIS_SERVER_PKI_USERNAME (valid username)
Sample run:
QGIS_SERVER_PKI_USERNAME=Gerardus QGIS_SERVER_PORT=47547 QGIS_SERVER_HOST=localhost \
QGIS_SERVER_PKI_KEY=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_key.pem \
QGIS_SERVER_PKI_CERTIFICATE=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_cert.pem \
QGIS_SERVER_PKI_AUTHORITY=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/chains_subissuer-issuer-root_issuer2-root2.pem \
python /home/dev/QGIS/tests/src/python/qgis_wrapped_server.py
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
"""
from __future__ import print_function
from future import standard_library
standard_library.install_aliases()

__author__ = 'Alessandro Pasotti'
__date__ = '05/15/2016'
__copyright__ = 'Copyright 2016, The QGIS Project'
# This will get replaced with a git SHA1 when you do a git archive
__revision__ = '$Format:%H$'


import os
import sys
import ssl
import urllib.parse
from http.server import BaseHTTPRequestHandler, HTTPServer
from qgis.server import QgsServer, QgsServerFilter

QGIS_SERVER_PORT = int(os.environ.get('QGIS_SERVER_PORT', '8081'))
QGIS_SERVER_HOST = os.environ.get('QGIS_SERVER_HOST', '127.0.0.1')
# PKI authentication
QGIS_SERVER_PKI_CERTIFICATE = os.environ.get('QGIS_SERVER_PKI_CERTIFICATE')
QGIS_SERVER_PKI_KEY = os.environ.get('QGIS_SERVER_PKI_KEY')
QGIS_SERVER_PKI_AUTHORITY = os.environ.get('QGIS_SERVER_PKI_AUTHORITY')
QGIS_SERVER_PKI_USERNAME = os.environ.get('QGIS_SERVER_PKI_USERNAME')

# Check if PKI - https is enabled
https = (QGIS_SERVER_PKI_CERTIFICATE is not None and
os.path.isfile(QGIS_SERVER_PKI_CERTIFICATE) and
QGIS_SERVER_PKI_KEY is not None and
os.path.isfile(QGIS_SERVER_PKI_KEY) and
QGIS_SERVER_PKI_AUTHORITY is not None and
os.path.isfile(QGIS_SERVER_PKI_AUTHORITY) and
QGIS_SERVER_PKI_USERNAME)

qgs_server = QgsServer()

if os.environ.get('QGIS_SERVER_HTTP_BASIC_AUTH') is not None:
import base64

class HTTPBasicFilter(QgsServerFilter):

def responseComplete(self):
request = self.serverInterface().requestHandler()
if self.serverInterface().getEnv('HTTP_AUTHORIZATION'):
username, password = base64.b64decode(self.serverInterface().getEnv('HTTP_AUTHORIZATION')[6:]).split(':')
if (username == os.environ.get('QGIS_SERVER_USERNAME', 'username')
and password == os.environ.get('QGIS_SERVER_PASSWORD', 'password')):
return
# No auth ...
request.clearHeaders()
request.setHeader('Status', '401 Authorization required')
request.setHeader('WWW-Authenticate', 'Basic realm="QGIS Server"')
request.clearBody()
request.appendBody('<h1>Authorization required</h1>')

filter = HTTPBasicFilter(qgs_server.serverInterface())
qgs_server.serverInterface().registerFilter(filter)


class Handler(BaseHTTPRequestHandler):

def do_GET(self):
# For PKI: check the username from client certificate
if https:
try:
ssl.match_hostname(self.connection.getpeercert(), QGIS_SERVER_PKI_USERNAME)
except Exception as ex:
print("SSL Exception %s" % ex)
self.send_response(401)
self.end_headers()
self.wfile.write('UNAUTHORIZED')
return
# CGI vars:
for k, v in self.headers.items():
# Uncomment to print debug info about env vars passed into QGIS Server env
#print('Setting ENV var %s to %s' % ('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v))
qgs_server.putenv('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v)
qgs_server.putenv('SERVER_PORT', str(self.server.server_port))
qgs_server.putenv('SERVER_NAME', self.server.server_name)
qgs_server.putenv('REQUEST_URI', self.path)
parsed_path = urllib.parse.urlparse(self.path)
headers, body = qgs_server.handleRequest(parsed_path.query)
headers_dict = dict(h.split(': ', 1) for h in headers.decode().split('\n') if h)
try:
self.send_response(int(headers_dict['Status'].split(' ')[0]))
except:
self.send_response(200)
for k, v in headers_dict.items():
self.send_header(k, v)
self.end_headers()
self.wfile.write(body)
return

def do_POST(self):
content_len = int(self.headers.get('content-length', 0))
post_body = self.rfile.read(content_len).decode()
request = post_body[1:post_body.find(' ')]
self.path = self.path + '&REQUEST_BODY=' + \
post_body.replace('&amp;', '') + '&REQUEST=' + request
return self.do_GET()


if __name__ == '__main__':
server = HTTPServer((QGIS_SERVER_HOST, QGIS_SERVER_PORT), Handler)
if https:
server.socket = ssl.wrap_socket(server.socket,
certfile=QGIS_SERVER_PKI_CERTIFICATE,
keyfile=QGIS_SERVER_PKI_KEY,
ca_certs=QGIS_SERVER_PKI_AUTHORITY,
cert_reqs=ssl.CERT_REQUIRED,
server_side=True,
ssl_version=ssl.PROTOCOL_TLSv1)
message = 'Starting server on %s://%s:%s, use <Ctrl-C> to stop' % \
('https' if https else 'http', QGIS_SERVER_HOST, server.server_port)
try:
print(message, flush=True)
except:
print(message)
sys.stdout.flush()
server.serve_forever()
186 changes: 186 additions & 0 deletions tests/src/python/test_authmanager_password_ows.py
@@ -0,0 +1,186 @@
# -*- coding: utf-8 -*-
"""
Tests for auth manager WMS/WFS using QGIS Server through HTTP Basic
enabled qgis_wrapped_server.py.
This is an integration test for QGIS Desktop Auth Manager WFS and WMS provider
and QGIS Server WFS/WMS that check if QGIS can use a stored auth manager auth
configuration to access an HTTP Basic protected endpoint.
From build dir, run: ctest -R PyQgsAuthManagerPasswordOWSTest -V
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
"""
import os
import sys
import re
import subprocess
import tempfile
import random
import string
try:
from urllib.parse import quote
except:
from urllib import quote

__author__ = 'Alessandro Pasotti'
__date__ = '18/09/2016'
__copyright__ = 'Copyright 2016, The QGIS Project'
# This will get replaced with a git SHA1 when you do a git archive
__revision__ = '$Format:%H$'

from shutil import rmtree

from utilities import unitTestDataPath, waitServer
from qgis.core import (
QgsAuthManager,
QgsAuthMethodConfig,
QgsVectorLayer,
QgsRasterLayer,
)
from qgis.testing import (
start_app,
unittest,
)

try:
QGIS_SERVER_ENDPOINT_PORT = os.environ['QGIS_SERVER_ENDPOINT_PORT']
except:
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto


QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()

os.environ['QGIS_AUTH_DB_DIR_PATH'] = QGIS_AUTH_DB_DIR_PATH

qgis_app = start_app()


class TestAuthManager(unittest.TestCase):

@classmethod
def setUpClass(cls):
"""Run before all tests:
Creates an auth configuration"""
cls.port = QGIS_SERVER_ENDPOINT_PORT
# Clean env just to be sure
env_vars = ['QUERY_STRING', 'QGIS_PROJECT_FILE']
for ev in env_vars:
try:
del os.environ[ev]
except KeyError:
pass
cls.testdata_path = unitTestDataPath('qgis_server') + '/'
cls.project_path = cls.testdata_path + "test_project.qgs"
# Enable auth
#os.environ['QGIS_AUTH_PASSWORD_FILE'] = QGIS_AUTH_PASSWORD_FILE
authm = QgsAuthManager.instance()
assert (authm.setMasterPassword('masterpassword', True))
cls.auth_config = QgsAuthMethodConfig('Basic')
cls.auth_config.setName('test_auth_config')
cls.username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
cls.password = cls.username[::-1] # reversed
cls.auth_config.setConfig('username', cls.username)
cls.auth_config.setConfig('password', cls.password)
assert (authm.storeAuthenticationConfig(cls.auth_config)[0])
cls.hostname = '127.0.0.1'
cls.protocol = 'http'

os.environ['QGIS_SERVER_HTTP_BASIC_AUTH'] = '1'
os.environ['QGIS_SERVER_USERNAME'] = cls.username
os.environ['QGIS_SERVER_PASSWORD'] = cls.password
os.environ['QGIS_SERVER_PORT'] = str(cls.port)
os.environ['QGIS_SERVER_HOST'] = cls.hostname
server_path = os.path.dirname(os.path.realpath(__file__)) + \
'/qgis_wrapped_server.py'
cls.server = subprocess.Popen([sys.executable, server_path],
env=os.environ, stdout=subprocess.PIPE)

line = cls.server.stdout.readline()
cls.port = int(re.findall(b':(\d+)', line)[0])
assert cls.port != 0
# Wait for the server process to start
assert waitServer('%s://%s:%s' % (cls.protocol, cls.hostname, cls.port)), "Server is not responding! '%s://%s:%s" % (cls.protocol, cls.hostname, cls.port)

@classmethod
def tearDownClass(cls):
"""Run after all tests"""
cls.server.terminate()
rmtree(QGIS_AUTH_DB_DIR_PATH)
del cls.server

def setUp(self):
"""Run before each test."""
pass

def tearDown(self):
"""Run after each test."""
pass

@classmethod
def _getWFSLayer(cls, type_name, layer_name=None, authcfg=None):
"""
WFS layer factory
"""
if layer_name is None:
layer_name = 'wfs_' + type_name
parms = {
'srsname': 'EPSG:4326',
'typename': type_name.decode('utf-8').replace(' ', '_'),
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
'version': '1.0.0',
'table': '',
}
uri = u'%(url)s&SERVICE=WFS&VERSION=%(version)s&REQUEST=GetFeature&TYPENAME=%(typename)s&SRSNAME=%(srsname)s' % parms
if authcfg is not None:
uri = uri + "&authcfg=%s" % authcfg
wfs_layer = QgsVectorLayer(uri, layer_name, 'WFS')
return wfs_layer

@classmethod
def _getWMSLayer(cls, layers, layer_name=None, authcfg=None):
"""
WMS layer factory
"""
if layer_name is None:
layer_name = 'wms_' + layers.replace(',', '')
parms = {
'crs': 'EPSG:4326',
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
'format': 'image/png',
'layers': quote(layers),
'styles': '',
'version': 'auto',
#'sql': '',
}
if authcfg is not None:
parms.update({'authcfg': authcfg})
uri = '&'.join([("%s=%s" % (k, v.replace('=', '%3D'))) for k, v in list(parms.items())])
wms_layer = QgsRasterLayer(uri, layer_name, 'wms')
return wms_layer

def testValidAuthAccess(self):
"""
Access the HTTP Basic protected layer with valid credentials
"""
wfs_layer = self._getWFSLayer('testlayer èé', authcfg=self.auth_config.id())
self.assertTrue(wfs_layer.isValid())
wms_layer = self._getWMSLayer('testlayer èé', authcfg=self.auth_config.id())
self.assertTrue(wms_layer.isValid())

def testInvalidAuthAccess(self):
"""
Access the HTTP Basic protected layer with no credentials
"""
wfs_layer = self._getWFSLayer('testlayer èé')
self.assertFalse(wfs_layer.isValid())
wms_layer = self._getWMSLayer('testlayer èé')
self.assertFalse(wms_layer.isValid())


if __name__ == '__main__':
unittest.main()

0 comments on commit 6fd3f74

Please sign in to comment.