Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
Merge pull request #4124 from pblottiere/security
add unit tests for sql injection
- Loading branch information
Showing
4 changed files
with
873 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,382 @@ | ||
# -*- coding: utf-8 -*- | ||
"""QGIS Unit tests for server security. | ||
.. note:: This program is free software; you can redistribute it and/or modify | ||
it under the terms of the GNU General Public License as published by | ||
the Free Software Foundation; either version 2 of the License, or | ||
(at your option) any later version. | ||
""" | ||
__author__ = 'Paul Blottiere' | ||
__date__ = '31/01/2017' | ||
__copyright__ = 'Copyright 2017, The QGIS Project' | ||
# This will get replaced with a git SHA1 when you do a git archive | ||
__revision__ = '$Format:%H$' | ||
|
||
from qgis.utils import spatialite_connect | ||
import os | ||
import time | ||
import urllib.parse | ||
from shutil import copyfile | ||
from qgis.core import QgsApplication | ||
from qgis.server import QgsServer | ||
from qgis.testing import unittest | ||
from utilities import unitTestDataPath | ||
|
||
|
||
class TestQgsServerSecurity(unittest.TestCase): | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
cls.testdatapath = unitTestDataPath('qgis_server_security') + '/' | ||
cls.db = os.path.join(cls.testdatapath, 'db.sqlite') | ||
cls.db_clone = os.path.join(cls.testdatapath, 'db_clone.sqlite') | ||
cls.project = os.path.join(cls.testdatapath, 'project.qgs') | ||
cls.app = QgsApplication([], False) | ||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
cls.app.exitQgis() | ||
try: | ||
os.remove(cls.db_clone) | ||
except OSError: | ||
pass | ||
|
||
def setUp(self): | ||
self.server = QgsServer() | ||
copyfile(self.db, self.db_clone) | ||
|
||
def test_wms_getfeatureinfo_filter_and_based_blind(self): | ||
""" | ||
And-based blind attack to check the kind of database currently used (if | ||
the result is valid for the point nammed 'b', then sqlite_version() | ||
function exist). | ||
But does not work because of the whitelist. | ||
If you remove the safety check, this is a valid injection. | ||
""" | ||
|
||
filter_sql = "point:\"name\" = 'b'" | ||
injection_sql = ") and (select sqlite_version()" | ||
|
||
query = "{0} {1}".format(filter_sql, injection_sql) | ||
d, h = self.handle_request_wms_getfeatureinfo(query) | ||
|
||
self.assertFalse(b"name = 'b'" in d) | ||
|
||
def test_wms_getfeatureinfo_filter_time_based_blind(self): | ||
""" | ||
Time-based blind to check the current version of database. If the | ||
server is too long to respond, then we have the answer! | ||
But it does not work because of the whitelist. | ||
If you remove the safety check, this is a valid injection. | ||
""" | ||
|
||
# first step, retrieve the version of sqlite by a regular way | ||
conn = spatialite_connect(self.db_clone) | ||
cur = conn.cursor() | ||
sql = "select sqlite_version()" | ||
sqlite_version = '' | ||
for row in cur.execute(sql): | ||
sqlite_version = row[0] | ||
conn.close() | ||
|
||
# second step, check the time of response for an invalid version | ||
filter_sql = "point:\"name\" = 'b'" | ||
injection_sql = ") and (select case sqlite_version() when '0.0.0' then substr(upper(hex(randomblob(99999999))),0,1) end)--" | ||
|
||
query = "{0} {1}".format(filter_sql, injection_sql) | ||
start = time.time() | ||
d, h = self.handle_request_wms_getfeatureinfo(query) | ||
duration_invalid_version = time.time() - start | ||
|
||
# third step, check the time of response for a valid version | ||
# maximum: several seconds | ||
injection_sql = ") and (select case sqlite_version() when '{0}' then substr(upper(hex(randomblob(99999999))),0,1) end)--".format(sqlite_version) | ||
|
||
query = "{0} {1}".format(filter_sql, injection_sql) | ||
start = time.time() | ||
d, h = self.handle_request_wms_getfeatureinfo(query) | ||
duration_valid_version = time.time() - start | ||
|
||
# compare duration. On my computer when safety check is deactivated: | ||
# duration_invalid_version: 0.012360334396362305 | ||
# duration_valid_version: 2.8810460567474365 | ||
self.assertAlmostEqual(duration_valid_version, duration_invalid_version, delta=0.5) | ||
|
||
def test_wms_getfeatureinfo_filter_stacked(self): | ||
""" | ||
The aim is to execute some staked queries. Here the 'drop' function is | ||
used but it could be done with create, insert, ... | ||
But the filter string is split thanks to the semicolon so it seems | ||
totally ignored whatever the query is (even without the safety check). | ||
""" | ||
|
||
filter_sql = "point:\"name\" = 'fake'" | ||
injection_sql = "); drop table point" | ||
|
||
query = "{0} {1}".format(filter_sql, injection_sql) | ||
d, h = self.handle_request_wms_getfeatureinfo(query) | ||
|
||
self.assertTrue(self.is_point_table_still_exist()) | ||
|
||
def test_wms_getfeatureinfo_filter_union_0(self): | ||
""" | ||
The aim is to retrieve name of tables within the database (like | ||
'SpatialIndex'). | ||
But the whitelist blocks this request because of invalid tokens. | ||
If you remove the safety check, this is a valid injection. | ||
""" | ||
|
||
filter_sql = "point:\"name\" = 'fake'" | ||
injection_sql = ") union select 1,1,name,1,1 from sqlite_master where type = \"table\" order by name--" | ||
|
||
query = "{0} {1}".format(filter_sql, injection_sql) | ||
d, h = self.handle_request_wms_getfeatureinfo(query) | ||
|
||
self.assertFalse(b'SpatialIndex' in d) | ||
|
||
def test_wms_getfeatureinfo_filter_union_1(self): | ||
""" | ||
The aim is to retrieve data from an excluded layer. | ||
But the whitelist blocks this request because of invalid tokens. | ||
If you remove the safety check, this is a valid injection. | ||
""" | ||
|
||
filter_sql = "point:\"name\" = 'fake'" | ||
injection_sql = ") union select 1,1,* from aoi--" | ||
|
||
query = "{0} {1}".format(filter_sql, injection_sql) | ||
d, h = self.handle_request_wms_getfeatureinfo(query) | ||
|
||
self.assertFalse(b'private_value' in d) | ||
|
||
def test_wms_getfeatureinfo_filter_unicode(self): | ||
""" | ||
The aim is to send some invalid token in unicode to bypass the | ||
whitelist. | ||
But unicode is interpreted and checked by the safety function. | ||
""" | ||
|
||
# %3B -> semicolon | ||
filter_sql = "point:\"name\" = 'fake %3B'" | ||
d, h = self.handle_request_wms_getfeatureinfo(filter_sql) | ||
self.assertTrue(self.check_service_exception_report(d)) | ||
|
||
def test_wms_getfeatureinfo_filter_patternmatching(self): | ||
""" | ||
The aim is to retrieve the table's name thanks to pattern matching. | ||
If you remove the safety check, this is a valid injection. | ||
""" | ||
|
||
filter_sql = "point:\"name\" = 'b'" | ||
injection_sql = "or ( select name from sqlite_master where type='table' and name like '{0}') != ''" | ||
query = "{0} {1}".format(filter_sql, injection_sql) | ||
|
||
# there's no table named as 'az%' | ||
name = "az%" | ||
sql = query.format(name) | ||
d, h = self.handle_request_wms_getfeatureinfo(sql) | ||
# self.assertTrue(b"name = 'b'" in d) #true if sanity check deactivated | ||
self.assertTrue(self.check_service_exception_report(d)) | ||
|
||
# a table named as 'ao%' exist | ||
name = "ao%" | ||
sql = query.format(name) | ||
d, h = self.handle_request_wms_getfeatureinfo(sql) | ||
# self.assertTrue(b"name = 'a'" in d) #true if sanity check deactivated | ||
self.assertTrue(self.check_service_exception_report(d)) | ||
|
||
# a table named as 'aoi' exist | ||
name = "aoi" | ||
sql = query.format(name) | ||
d, h = self.handle_request_wms_getfeatureinfo(sql) | ||
# self.assertTrue(b"name = 'a'" in d) #true if sanity check deactivated | ||
self.assertTrue(self.check_service_exception_report(d)) | ||
|
||
def test_wms_getfeatureinfo_filter_whitelist(self): | ||
""" | ||
The aim is to check that some tokens cannot pass the safety check | ||
whatever their positions in the filter string. | ||
""" | ||
|
||
# create | ||
filter_sql = "point:\"name\" = 'a'create" | ||
d, h = self.handle_request_wms_getfeatureinfo(filter_sql) | ||
self.assertTrue(self.check_service_exception_report(d)) | ||
|
||
filter_sql = "point:\"name\" = 'a' create" | ||
d, h = self.handle_request_wms_getfeatureinfo(filter_sql) | ||
self.assertTrue(self.check_service_exception_report(d)) | ||
|
||
# invalid token and escape single quote | ||
filter_sql = "point:\"name\" = 'a\\'create" | ||
d, h = self.handle_request_wms_getfeatureinfo(filter_sql) | ||
self.assertTrue(self.check_service_exception_report(d)) | ||
|
||
# drop | ||
filter_sql = "point:\"name\" = 'a' drop" | ||
d, h = self.handle_request_wms_getfeatureinfo(filter_sql) | ||
self.assertTrue(self.check_service_exception_report(d)) | ||
|
||
# select | ||
filter_sql = "point:\"name\" = 'a' select" | ||
d, h = self.handle_request_wms_getfeatureinfo(filter_sql) | ||
self.assertTrue(self.check_service_exception_report(d)) | ||
|
||
# comments | ||
filter_sql = "point:\"name\" = 'a' #" | ||
d, h = self.handle_request_wms_getfeatureinfo(filter_sql) | ||
self.assertTrue(self.check_service_exception_report(d)) | ||
|
||
filter_sql = "point:\"name\" = 'a' -" | ||
d, h = self.handle_request_wms_getfeatureinfo(filter_sql) | ||
self.assertTrue(self.check_service_exception_report(d)) | ||
|
||
def test_wfs_getfeature_filter_stacked(self): | ||
""" | ||
The aim is to execute some staked queries within the 'Literal' | ||
and 'PropertyName' field. Here the 'drop' function is used but it | ||
could be done with create, insert, ... | ||
But due to the implementation, these filters are not resolved on | ||
database side but in server side with QgsExpression. So, there's no | ||
'WHERE' clause and the query never reach the database. By the way, | ||
it's exactly the same thing whatever the kind of attacks and for | ||
the EXP_FILTER parameter too (filter described with QgsExpression). | ||
It's typically the kind of SQL injection which has been fixed in | ||
mapserver several years ago: | ||
https://trac.osgeo.org/mapserver/ticket/3874 | ||
""" | ||
|
||
# ogc:Literal / ogc:PropertyIsEqualTo | ||
literal = "4')); drop table point --" | ||
filter_xml = "<ogc:Filter%20xmlns:ogc=\"http://www.opengis.net/ogc\"><ogc:PropertyIsEqualTo><ogc:PropertyName>pkuid</ogc:PropertyName><ogc:Literal>{0}</ogc:Literal></ogc:PropertyIsEqualTo></ogc:Filter>".format(literal) | ||
self.handle_request_wfs_getfeature_filter(filter_xml) | ||
self.assertTrue(self.is_point_table_still_exist()) | ||
|
||
# ogc:Literal / ogc:PropertyIsLike | ||
literal = "4')); drop table point --" | ||
filter_xml = "<ogc:Filter%20xmlns:ogc=\"http://www.opengis.net/ogc\"><ogc:PropertyIsLike><ogc:PropertyName>pkuid</ogc:PropertyName><ogc:Literal>{0}</ogc:Literal></ogc:PropertyIsLike></ogc:Filter>".format(literal) | ||
self.handle_request_wfs_getfeature_filter(filter_xml) | ||
self.assertTrue(self.is_point_table_still_exist()) | ||
|
||
# ogc:PropertyName / ogc:PropertyIsLike | ||
propname = "name = 'a')); drop table point --" | ||
filter_xml = "<ogc:Filter%20xmlns:ogc=\"http://www.opengis.net/ogc\"><ogc:PropertyIsLike><ogc:PropertyName>{0}</ogc:PropertyName><ogc:Literal>4</ogc:Literal></ogc:PropertyIsLike></ogc:Filter>".format(propname) | ||
self.handle_request_wfs_getfeature_filter(filter_xml) | ||
self.assertTrue(self.is_point_table_still_exist()) | ||
|
||
def test_wms_getmap_sld_stacked(self): | ||
""" | ||
The aim is to execute some staked queries within the 'Literal' | ||
and 'PropertyName' field. Here the 'drop' function is used but it | ||
could be done with create, insert, ... | ||
However it's not working because special characters are duplicated. For | ||
example, with 'Literal' as "4')); drop table point --", the underlying | ||
query is: | ||
SELECT .... AND (("pkuid" = '4'')); drop table point --')) | ||
""" | ||
|
||
literal = "4')); drop table point --" | ||
sld = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><StyledLayerDescriptor xmlns=\"http://www.opengis.net/sld\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:ogc=\"http://www.opengis.net/ogc\" xsi:schemaLocation=\"http://www.opengis.net/sld http://schemas.opengis.net/sld/1.1.0/StyledLayerDescriptor.xsd\" version=\"1.1.0\" xmlns:se=\"http://www.opengis.net/se\" xmlns:xlink=\"http://www.w3.org/1999/xlink\"> <NamedLayer> <se:Name>point</se:Name> <UserStyle> <se:Name>point</se:Name> <se:FeatureTypeStyle> <se:Rule> <se:Name>Single symbol</se:Name> <ogc:Filter xmlns:ogc=\"http://www.opengis.net/ogc\"> <ogc:PropertyIsEqualTo> <ogc:PropertyName>pkuid</ogc:PropertyName> <ogc:Literal>{0}</ogc:Literal> </ogc:PropertyIsEqualTo> </ogc:Filter> <se:PointSymbolizer> <se:Graphic> <se:Mark> <se:WellKnownName>circle</se:WellKnownName> <se:Fill><se:SvgParameter name=\"fill\">5e86a1</se:SvgParameter></se:Fill><se:Stroke><se:SvgParameter name=\"stroke\">000000</se:SvgParameter></se:Stroke></se:Mark><se:Size>7</se:Size></se:Graphic></se:PointSymbolizer></se:Rule></se:FeatureTypeStyle></UserStyle></NamedLayer></StyledLayerDescriptor>".format(literal) | ||
self.handle_request_wms_getmap(sld) | ||
self.assertTrue(self.is_point_table_still_exist()) | ||
|
||
def check_service_exception_report(self, d): | ||
""" | ||
Return True if a ServiceExceptionReport is raised, False otherwise | ||
""" | ||
|
||
if b'<ServiceExceptionReport' in d: | ||
return True | ||
else: | ||
return False | ||
|
||
def handle_request_wfs_getfeature_filter(self, filter_xml): | ||
qs = "&".join(["%s=%s" % i for i in list({ | ||
"MAP": urllib.parse.quote(self.project), | ||
"SERVICE": "WFS", | ||
"VERSION": "1.1.1", | ||
"REQUEST": "GetFeature", | ||
"TYPENAME": "point", | ||
"STYLES": "", | ||
"CRS": "EPSG:32613", | ||
"FILTER": filter_xml}.items())]) | ||
|
||
return self.server.handleRequest(qs) | ||
|
||
def handle_request_wms_getfeatureinfo(self, filter_sql): | ||
qs = "&".join(["%s=%s" % i for i in list({ | ||
"MAP": urllib.parse.quote(self.project), | ||
"SERVICE": "WMS", | ||
"VERSION": "1.1.1", | ||
"REQUEST": "GetFeatureInfo", | ||
"QUERY_LAYERS": "point", | ||
"LAYERS": "point", | ||
"STYLES": "", | ||
"FORMAT": "image/png", | ||
"HEIGHT": "500", | ||
"WIDTH": "500", | ||
"BBOX": "606171,4822867,612834,4827375", | ||
"CRS": "EPSG:32613", | ||
"FILTER": filter_sql}.items())]) | ||
|
||
return self._result(self.server.handleRequest(qs)) | ||
|
||
def handle_request_wms_getmap(self, sld): | ||
qs = "&".join(["%s=%s" % i for i in list({ | ||
"MAP": urllib.parse.quote(self.project), | ||
"SERVICE": "WMS", | ||
"VERSION": "1.0.0", | ||
"REQUEST": "GetMap", | ||
"QUERY_LAYERS": "point", | ||
"LAYERS": "point", | ||
"STYLES": "", | ||
"FORMAT": "image/png", | ||
"HEIGHT": "500", | ||
"WIDTH": "500", | ||
"BBOX": "606171,4822867,612834,4827375", | ||
"CRS": "EPSG:32613", | ||
"SLD": sld}.items())]) | ||
|
||
return self._result(self.server.handleRequest(qs)) | ||
|
||
def is_point_table_still_exist(self): | ||
conn = spatialite_connect(self.db_clone) | ||
cur = conn.cursor() | ||
sql = "select * from point" | ||
point_table_exist = True | ||
try: | ||
cur.execute(sql) | ||
except: | ||
point_table_exist = False | ||
conn.close() | ||
|
||
return point_table_exist | ||
|
||
def _result(self, data): | ||
headers = {} | ||
for line in data[0].decode('UTF-8').split("\n"): | ||
if line != "": | ||
header = line.split(":") | ||
self.assertEqual(len(header), 2, line) | ||
headers[str(header[0])] = str(header[1]).strip() | ||
|
||
return data[1], headers | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
Binary file not shown.
Oops, something went wrong.