Skip to content

Commit 9cc3576

Browse files
committedJan 6, 2014
Added connection pool for postgres
1 parent cae352b commit 9cc3576

File tree

6 files changed

+300
-23
lines changed

6 files changed

+300
-23
lines changed
 

‎src/providers/postgres/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
SET(PG_SRCS
66
qgspostgresprovider.cpp
77
qgspostgresconn.cpp
8+
qgspostgresconnpool.cpp
89
qgspostgresdataitems.cpp
910
qgspostgresfeatureiterator.cpp
1011
qgspgsourceselect.cpp
@@ -15,6 +16,7 @@ SET(PG_SRCS
1516
SET(PG_MOC_HDRS
1617
qgspostgresprovider.h
1718
qgspostgresconn.h
19+
qgspostgresconnpool.h
1820
qgspostgresdataitems.h
1921
qgspgsourceselect.h
2022
qgspgnewconnection.h

‎src/providers/postgres/qgspostgresconn.cpp

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
#include <netinet/in.h>
3434
#endif
3535

36-
//#define POSTGRES_SHARED_CONNECTIONS
3736

3837
QgsPostgresResult::~QgsPostgresResult()
3938
{
@@ -124,46 +123,50 @@ Oid QgsPostgresResult::PQoidValue()
124123
return ::PQoidValue( mRes );
125124
}
126125

126+
127127
QMap<QString, QgsPostgresConn *> QgsPostgresConn::sConnectionsRO;
128128
QMap<QString, QgsPostgresConn *> QgsPostgresConn::sConnectionsRW;
129129
const int QgsPostgresConn::sGeomTypeSelectLimit = 100;
130130

131-
QgsPostgresConn *QgsPostgresConn::connectDb( QString conninfo, bool readonly )
131+
QgsPostgresConn *QgsPostgresConn::connectDb( QString conninfo, bool readonly, bool shared )
132132
{
133-
#ifdef POSTGRES_SHARED_CONNECTIONS
134133
QMap<QString, QgsPostgresConn *> &connections =
135134
readonly ? QgsPostgresConn::sConnectionsRO : QgsPostgresConn::sConnectionsRW;
136135

137-
if ( connections.contains( conninfo ) )
136+
if ( shared )
138137
{
139-
QgsDebugMsg( QString( "Using cached connection for %1" ).arg( conninfo ) );
140-
connections[conninfo]->mRef++;
141-
return connections[conninfo];
138+
if ( connections.contains( conninfo ) )
139+
{
140+
QgsDebugMsg( QString( "Using cached connection for %1" ).arg( conninfo ) );
141+
connections[conninfo]->mRef++;
142+
return connections[conninfo];
143+
}
142144
}
143-
#endif
144145

145-
QgsPostgresConn *conn = new QgsPostgresConn( conninfo, readonly );
146+
QgsPostgresConn *conn = new QgsPostgresConn( conninfo, readonly, shared );
146147

147148
if ( conn->mRef == 0 )
148149
{
149150
delete conn;
150151
return 0;
151152
}
152153

153-
#ifdef POSTGRES_SHARED_CONNECTIONS
154-
connections.insert( conninfo, conn );
155-
#endif
154+
if ( shared )
155+
{
156+
connections.insert( conninfo, conn );
157+
}
156158

157159
return conn;
158160
}
159161

160-
QgsPostgresConn::QgsPostgresConn( QString conninfo, bool readOnly )
162+
QgsPostgresConn::QgsPostgresConn( QString conninfo, bool readOnly, bool shared )
161163
: mRef( 1 )
162164
, mOpenCursors( 0 )
163165
, mConnInfo( conninfo )
164166
, mGotPostgisVersion( false )
165167
, mReadOnly( readOnly )
166168
, mNextCursorId( 0 )
169+
, mShared( shared )
167170
{
168171
QgsDebugMsg( QString( "New PostgreSQL connection for " ) + conninfo );
169172

@@ -268,14 +271,15 @@ void QgsPostgresConn::disconnect()
268271
if ( --mRef > 0 )
269272
return;
270273

271-
#ifdef POSTGRES_SHARED_CONNECTIONS
272-
QMap<QString, QgsPostgresConn *>& connections = mReadOnly ? sConnectionsRO : sConnectionsRW;
274+
if ( mShared )
275+
{
276+
QMap<QString, QgsPostgresConn *>& connections = mReadOnly ? sConnectionsRO : sConnectionsRW;
273277

274-
QString key = connections.key( this, QString::null );
278+
QString key = connections.key( this, QString::null );
275279

276-
Q_ASSERT( !key.isNull() );
277-
connections.remove( key );
278-
#endif
280+
Q_ASSERT( !key.isNull() );
281+
connections.remove( key );
282+
}
279283

280284
delete this;
281285
}

‎src/providers/postgres/qgspostgresconn.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,12 @@ class QgsPostgresResult
155155
PGresult *mRes;
156156
};
157157

158+
158159
class QgsPostgresConn : public QObject
159160
{
160161
Q_OBJECT;
161162
public:
162-
static QgsPostgresConn *connectDb( QString connInfo, bool readOnly );
163+
static QgsPostgresConn *connectDb( QString connInfo, bool readOnly, bool shared = true );
163164
void disconnect();
164165

165166
//! get postgis version string
@@ -269,7 +270,7 @@ class QgsPostgresConn : public QObject
269270
static void deleteConnection( QString theConnName );
270271

271272
private:
272-
QgsPostgresConn( QString conninfo, bool readOnly );
273+
QgsPostgresConn( QString conninfo, bool readOnly, bool shared );
273274
~QgsPostgresConn();
274275

275276
int mRef;
@@ -332,6 +333,9 @@ class QgsPostgresConn : public QObject
332333
void deduceEndian();
333334

334335
int mNextCursorId;
336+
337+
bool mShared; //! < whether the connection is shared by more providers (must not be if going to be used in worker threads)
335338
};
336339

340+
337341
#endif
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/***************************************************************************
2+
qgspostgresconnpool.cpp
3+
---------------------
4+
begin : January 2014
5+
copyright : (C) 2014 by Martin Dobias
6+
email : wonder dot sk at gmail dot com
7+
***************************************************************************
8+
* *
9+
* This program is free software; you can redistribute it and/or modify *
10+
* it under the terms of the GNU General Public License as published by *
11+
* the Free Software Foundation; either version 2 of the License, or *
12+
* (at your option) any later version. *
13+
* *
14+
***************************************************************************/
15+
16+
#include "qgspostgresconnpool.h"
17+
18+
#include "qgspostgresconn.h"
19+
20+
#include <QCoreApplication>
21+
22+
#define POSTGRES_MAX_CONCURRENT_CONNS 4
23+
#define POSTGRES_CONN_EXPIRATION 10 // in seconds
24+
25+
26+
27+
QgsPostgresConnPoolGroup::QgsPostgresConnPoolGroup( const QString& ci )
28+
: connInfo( ci )
29+
, sem( POSTGRES_MAX_CONCURRENT_CONNS )
30+
, expirationTimer( this )
31+
{
32+
// just to make sure the object belongs to main thread and thus will get events
33+
moveToThread( qApp->thread() );
34+
35+
expirationTimer.setInterval( POSTGRES_CONN_EXPIRATION * 1000 );
36+
connect( &expirationTimer, SIGNAL( timeout() ), this, SLOT( handleConnectionExpired() ) );
37+
}
38+
39+
40+
QgsPostgresConnPoolGroup::~QgsPostgresConnPoolGroup()
41+
{
42+
foreach ( Item item, conns )
43+
item.c->disconnect();
44+
}
45+
46+
47+
QgsPostgresConn* QgsPostgresConnPoolGroup::acquire()
48+
{
49+
// we are going to acquire a resource - if no resource is available, we will block here
50+
sem.acquire();
51+
52+
// quick (preferred) way - use cached connection
53+
{
54+
QMutexLocker locker( &connMutex );
55+
56+
if ( !conns.isEmpty() )
57+
{
58+
Item i = conns.pop();
59+
60+
// no need to run if nothing can expire
61+
if ( conns.isEmpty() )
62+
expirationTimer.stop();
63+
64+
return i.c;
65+
}
66+
}
67+
68+
QgsPostgresConn* c = QgsPostgresConn::connectDb( connInfo, true, false ); // TODO: read-only
69+
if ( !c )
70+
{
71+
// we didn't get connection for some reason, so release the lock
72+
sem.release();
73+
return 0;
74+
}
75+
76+
return c;
77+
}
78+
79+
80+
void QgsPostgresConnPoolGroup::release( QgsPostgresConn* conn )
81+
{
82+
connMutex.lock();
83+
Item i;
84+
i.c = conn;
85+
i.lastUsedTime = QTime::currentTime();
86+
conns.push( i );
87+
88+
if ( !expirationTimer.isActive() )
89+
expirationTimer.start();
90+
91+
connMutex.unlock();
92+
93+
sem.release(); // this can unlock a thread waiting in acquire()
94+
}
95+
96+
97+
void QgsPostgresConnPoolGroup::handleConnectionExpired()
98+
{
99+
connMutex.lock();
100+
101+
QTime now = QTime::currentTime();
102+
103+
// what connections have expired?
104+
QList<int> toDelete;
105+
for ( int i = 0; i < conns.count(); ++i )
106+
{
107+
if ( conns.at( i ).lastUsedTime.secsTo( now ) >= POSTGRES_CONN_EXPIRATION )
108+
toDelete.append( i );
109+
}
110+
111+
// delete expired connections
112+
for ( int j = toDelete.count() - 1; j >= 0; --j )
113+
{
114+
int index = toDelete[j];
115+
conns[index].c->disconnect();
116+
conns.remove( index );
117+
}
118+
119+
if ( conns.isEmpty() )
120+
expirationTimer.stop();
121+
122+
connMutex.unlock();
123+
}
124+
125+
126+
// ----
127+
128+
129+
QgsPostgresConnPool* QgsPostgresConnPool::mInstance = 0;
130+
131+
132+
QgsPostgresConnPool* QgsPostgresConnPool::instance()
133+
{
134+
if ( !mInstance )
135+
mInstance = new QgsPostgresConnPool;
136+
return mInstance;
137+
}
138+
139+
140+
QgsPostgresConn* QgsPostgresConnPool::acquireConnection( const QString& connInfo )
141+
{
142+
mMutex.lock();
143+
QgsPostgresConnPoolGroups::iterator it = mGroups.find( connInfo );
144+
if ( it == mGroups.end() )
145+
{
146+
it = mGroups.insert( connInfo, new QgsPostgresConnPoolGroup( connInfo ) );
147+
}
148+
QgsPostgresConnPoolGroup* group = *it;
149+
mMutex.unlock();
150+
151+
return group->acquire();
152+
}
153+
154+
155+
void QgsPostgresConnPool::releaseConnection( QgsPostgresConn* conn )
156+
{
157+
mMutex.lock();
158+
QgsPostgresConnPoolGroups::iterator it = mGroups.find( conn->connInfo() );
159+
Q_ASSERT( it != mGroups.end() );
160+
QgsPostgresConnPoolGroup* group = *it;
161+
mMutex.unlock();
162+
163+
group->release( conn );
164+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/***************************************************************************
2+
qgspostgresconnpool.h
3+
---------------------
4+
begin : January 2014
5+
copyright : (C) 2014 by Martin Dobias
6+
email : wonder dot sk at gmail dot com
7+
***************************************************************************
8+
* *
9+
* This program is free software; you can redistribute it and/or modify *
10+
* it under the terms of the GNU General Public License as published by *
11+
* the Free Software Foundation; either version 2 of the License, or *
12+
* (at your option) any later version. *
13+
* *
14+
***************************************************************************/
15+
16+
#ifndef QGSPOSTGRESCONNPOOL_H
17+
#define QGSPOSTGRESCONNPOOL_H
18+
19+
#include <QMap>
20+
#include <QMutex>
21+
#include <QSemaphore>
22+
#include <QStack>
23+
#include <QTime>
24+
#include <QTimer>
25+
26+
class QgsPostgresConn;
27+
28+
29+
//! stores data related to one server
30+
class QgsPostgresConnPoolGroup : public QObject
31+
{
32+
Q_OBJECT
33+
public:
34+
35+
static const int maxConcurrentConnections;
36+
37+
struct Item
38+
{
39+
QgsPostgresConn* c;
40+
QTime lastUsedTime;
41+
};
42+
43+
QgsPostgresConnPoolGroup( const QString& ci );
44+
~QgsPostgresConnPoolGroup();
45+
46+
QgsPostgresConn* acquire();
47+
48+
void release( QgsPostgresConn* conn );
49+
50+
protected slots:
51+
void handleConnectionExpired();
52+
53+
protected:
54+
Q_DISABLE_COPY(QgsPostgresConnPoolGroup)
55+
56+
QString connInfo;
57+
QStack<Item> conns;
58+
QMutex connMutex;
59+
QSemaphore sem;
60+
QTimer expirationTimer;
61+
};
62+
63+
typedef QMap<QString, QgsPostgresConnPoolGroup*> QgsPostgresConnPoolGroups;
64+
65+
/**
66+
* Class responsible for keeping a pool of open connections to PostgreSQL servers.
67+
* This is desired to avoid the overhead of creation of new connection everytime.
68+
*
69+
* The class is a singleton. The methods are thread safe.
70+
*
71+
* The connection pool has a limit on maximum number of concurrent connections
72+
* (per server), once the limit is reached, the acquireConnection() function
73+
* will block. All connections that have been acquired must be then released
74+
* with releaseConnection() function.
75+
*
76+
* When the connections are not used for some time, they will get closed automatically
77+
* to save resources.
78+
*
79+
* \todo Make the connection pool available also for read-write connections.
80+
*
81+
*/
82+
class QgsPostgresConnPool
83+
{
84+
public:
85+
86+
static QgsPostgresConnPool* instance();
87+
88+
//! Try to acquire a connection: if no connections are available, the thread will get blocked.
89+
//! @return initialized connection or null on error
90+
QgsPostgresConn* acquireConnection( const QString& connInfo );
91+
92+
//! Release an existing connection so it will get back into the pool and can be reused
93+
void releaseConnection( QgsPostgresConn* conn );
94+
95+
protected:
96+
QgsPostgresConnPoolGroups mGroups;
97+
QMutex mMutex;
98+
99+
static QgsPostgresConnPool* mInstance;
100+
};
101+
102+
#endif // QGSPOSTGRESCONNPOOL_H

‎src/providers/postgres/qgspostgresfeatureiterator.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
***************************************************************************/
1515
#include "qgspostgresfeatureiterator.h"
1616
#include "qgspostgresprovider.h"
17+
#include "qgspostgresconnpool.h"
1718

1819
#include "qgslogger.h"
1920
#include "qgsmessagelog.h"
@@ -28,7 +29,7 @@ QgsPostgresFeatureIterator::QgsPostgresFeatureIterator( QgsPostgresFeatureSource
2829
: QgsAbstractFeatureIteratorFromSource( source, ownSource, request )
2930
, mFeatureQueueSize( sFeatureQueueSize )
3031
{
31-
mConn = QgsPostgresConn::connectDb( mSource->mConnInfo, true );
32+
mConn = QgsPostgresConnPool::instance()->acquireConnection( mSource->mConnInfo );
3233

3334
if ( !mConn )
3435
{
@@ -173,7 +174,7 @@ bool QgsPostgresFeatureIterator::close()
173174

174175
mConn->closeCursor( mCursorName );
175176

176-
mConn->disconnect();
177+
QgsPostgresConnPool::instance()->releaseConnection( mConn );
177178
mConn = 0;
178179

179180
while ( !mFeatureQueue.empty() )

0 commit comments

Comments
 (0)
Please sign in to comment.