Skip to content

Commit

Permalink
Expose and use a simpler scopedTableBackup() method
Browse files Browse the repository at this point in the history
  • Loading branch information
strk committed Oct 13, 2021
1 parent 2b8a3fd commit 368cb40
Showing 1 changed file with 30 additions and 48 deletions.
78 changes: 30 additions & 48 deletions tests/src/python/test_provider_postgres.py
Expand Up @@ -99,27 +99,28 @@ def execSQLCommand(self, sql):
cur.close()
self.con.commit()

def backupTable(self, schema, table):
self.execSQLCommand('DROP TABLE IF EXISTS {s}.{t}_edit CASCADE'.format(s=schema, t=table))
self.execSQLCommand('CREATE TABLE {s}.{t}_edit AS SELECT * FROM {s}.{t}'.format(s=schema, t=table))

def restoreTable(self, schema, table):
self.execSQLCommand('TRUNCATE TABLE {s}.{t}'.format(s=schema, t=table))
self.execSQLCommand('INSERT INTO {s}.{t} SELECT * FROM {s}.{t}_edit'.format(s=schema, t=table))
self.execSQLCommand('DROP TABLE {s}.{t}_edit'.format(s=schema, t=table))

class TableBackup():
def __init__(self, tester, schema, table):
self.schema = schema
self.table = table
self.tester = tester
tester.execSQLCommand('DROP TABLE IF EXISTS {s}.{t}_edit CASCADE'.format(s=schema, t=table))
tester.execSQLCommand('CREATE TABLE {s}.{t}_edit AS SELECT * FROM {s}.{t}'.format(s=schema, t=table))

def __del__(self):
self.tester.execSQLCommand('TRUNCATE TABLE {s}.{t}'.format(s=self.schema, t=self.table))
self.tester.execSQLCommand('INSERT INTO {s}.{t} SELECT * FROM {s}.{t}_edit'.format(s=self.schema, t=self.table))
self.tester.execSQLCommand('DROP TABLE {s}.{t}_edit'.format(s=self.schema, t=self.table))
# Create instances of this class for scoped backups,
# example:
#
# backup1 = self.scopedTableBackup('qgis_test', 'datatable1')
# backup2 = self.scopedTableBackup('qgis_test', 'datatable2')
#
def scopedTableBackup(self, schema, table):

class ScopedBackup():
def __init__(self, tester, schema, table):
self.schema = schema
self.table = table
self.tester = tester
tester.execSQLCommand('DROP TABLE IF EXISTS {s}.{t}_edit CASCADE'.format(s=schema, t=table))
tester.execSQLCommand('CREATE TABLE {s}.{t}_edit AS SELECT * FROM {s}.{t}'.format(s=schema, t=table))

def __del__(self):
self.tester.execSQLCommand('TRUNCATE TABLE {s}.{t}'.format(s=self.schema, t=self.table))
self.tester.execSQLCommand('INSERT INTO {s}.{t} SELECT * FROM {s}.{t}_edit'.format(s=self.schema, t=self.table))
self.tester.execSQLCommand('DROP TABLE {s}.{t}_edit'.format(s=self.schema, t=self.table))

return ScopedBackup(self, schema, table)

def getSource(self):
# create temporary table for edit tests
Expand Down Expand Up @@ -669,7 +670,7 @@ def testNonPkBigintField(self):
flds = vl.fields()

# Backup test table (will be edited)
self.backupTable('qgis_test', 'bigint_pk')
scopedBackup = self.scopedTableBackup('qgis_test', 'bigint_pk')

# check if default values are correctly read back
f = next(vl.getFeatures(QgsFeatureRequest()))
Expand Down Expand Up @@ -722,9 +723,6 @@ def testNonPkBigintField(self):
self.assertEqual(f['bigint_attribute'], 84)
self.assertEqual(f['bigint_attribute_def'], 42)

# Restore test table
self.restoreTable('qgis_test', 'bigint_pk')

def testPktUpdateBigintPk(self):
"""Test if we can update objects with positive, zero and negative bigint PKs."""
vl = QgsVectorLayer(
Expand All @@ -734,7 +732,7 @@ def testPktUpdateBigintPk(self):
flds = vl.fields()

# Backup test table (will be edited)
self.backupTable('qgis_test', 'bigint_pk')
scopedBackup = self.scopedTableBackup('qgis_test', 'bigint_pk')

self.assertTrue(vl.isValid())

Expand Down Expand Up @@ -779,9 +777,6 @@ def testPktUpdateBigintPk(self):
statuses[3] = 1
self.assertTrue(all(x == 1 for x in statuses))

# Restore test table
self.restoreTable('qgis_test', 'bigint_pk')

def testPktUpdateBigintPkNonFirst(self):
"""Test if we can update objects with positive, zero and negative bigint PKs in tables whose PK is not the first field"""
vl = QgsVectorLayer('{} sslmode=disable srid=4326 key="pk" table="qgis_test".{} (geom)'.format(self.dbconn,
Expand All @@ -794,7 +789,7 @@ def testPktUpdateBigintPkNonFirst(self):
vl.startEditing()

# Backup test table (will be edited)
self.backupTable('qgis_test', 'bigint_non_first_pk')
scopedBackup = self.scopedTableBackup('qgis_test', 'bigint_non_first_pk')

statuses = [-1, -1, -1, -1]
values = ['first value', 'second value', 'zero value', 'negative value']
Expand Down Expand Up @@ -839,9 +834,6 @@ def testPktUpdateBigintPkNonFirst(self):
for i in range(len(statuses)):
self.assertEqual(statuses[i], 1, 'changed value "{}" not found'.format(newvalues[i]))

# Restore test table
self.restoreTable('qgis_test', 'bigint_non_first_pk')

def testPktComposite(self):
"""
Check that tables with PKs composed of many fields of different types are correctly read and written to
Expand All @@ -859,7 +851,7 @@ def testPktComposite(self):
self.assertEqual(f['value'], 'test 2')

# Backup test table (will be edited)
self.backupTable('qgis_test', 'tb_test_compound_pk')
scopedBackup = self.scopedTableBackup('qgis_test', 'tb_test_compound_pk')

# can we edit a field?
vl.startEditing()
Expand Down Expand Up @@ -910,9 +902,6 @@ def testPktComposite(self):

self.assertFalse(got_feature)

# Restore test table
self.restoreTable('qgis_test', 'tb_test_compound_pk')

def testPktCompositeFloat(self):
"""
Check that tables with PKs composed of many fields of different types are correctly read and written to
Expand All @@ -932,7 +921,7 @@ def testPktCompositeFloat(self):
self.assertEqual(f['value'], 'test 2')

# Backup test table (will be edited)
self.backupTable('qgis_test', 'tb_test_composite_float_pk')
scopedBackup = self.scopedTableBackup('qgis_test', 'tb_test_composite_float_pk')

# can we edit a field?
vl.startEditing()
Expand Down Expand Up @@ -991,17 +980,14 @@ def testPktCompositeFloat(self):

self.assertFalse(got_feature)

# Restore test table
self.restoreTable('qgis_test', 'tb_test_composite_float_pk')

def testPktFloatingPoint(self):
"""
Check if we can handle floating point/numeric primary keys correctly
"""

# 0. Backup test table (will be edited)
self.backupTable('qgis_test', 'tb_test_float_pk')
self.backupTable('qgis_test', 'tb_test_double_pk')
scopedBackup1 = self.scopedTableBackup('qgis_test', 'tb_test_float_pk')
scopedBackup2 = self.scopedTableBackup('qgis_test', 'tb_test_double_pk')

# 1. 32 bit float (PostgreSQL "REAL" type)
vl = QgsVectorLayer(self.dbconn + ' sslmode=disable srid=4326 key="pk" table="qgis_test"."tb_test_float_pk" (geom)', "test_float_pk", "postgres")
Expand Down Expand Up @@ -1118,10 +1104,6 @@ def testPktFloatingPoint(self):
# no NUMERIC/DECIMAL checks here. NUMERIC primary keys are unsupported.
# TODO: implement NUMERIC primary keys/arbitrary precision arithmethics/fixed point math in QGIS.

# Restore test tables
self.restoreTable('qgis_test', 'tb_test_float_pk')
self.restoreTable('qgis_test', 'tb_test_double_pk')

def testPktMapInsert(self):
vl = QgsVectorLayer('{} table="qgis_test"."{}" key="obj_id" sql='.format(self.dbconn, 'oid_serial_table'),
"oid_serial", "postgres")
Expand Down Expand Up @@ -1462,7 +1444,7 @@ def testJson(self):
self.assertTrue(vl.isValid())

# Backup test table (will be edited)
tableBackup = self.TableBackup(self, 'qgis_test', 'json')
tableBackup = self.scopedTableBackup('qgis_test', 'json')

attrs = (
123,
Expand Down

0 comments on commit 368cb40

Please sign in to comment.