Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Added connection pool for postgres
  • Loading branch information
wonder-sk committed Jan 6, 2014
1 parent cae352b commit 9cc3576
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 23 deletions.
2 changes: 2 additions & 0 deletions src/providers/postgres/CMakeLists.txt
Expand Up @@ -5,6 +5,7 @@
SET(PG_SRCS
qgspostgresprovider.cpp
qgspostgresconn.cpp
qgspostgresconnpool.cpp
qgspostgresdataitems.cpp
qgspostgresfeatureiterator.cpp
qgspgsourceselect.cpp
Expand All @@ -15,6 +16,7 @@ SET(PG_SRCS
SET(PG_MOC_HDRS
qgspostgresprovider.h
qgspostgresconn.h
qgspostgresconnpool.h
qgspostgresdataitems.h
qgspgsourceselect.h
qgspgnewconnection.h
Expand Down
42 changes: 23 additions & 19 deletions src/providers/postgres/qgspostgresconn.cpp
Expand Up @@ -33,7 +33,6 @@
#include <netinet/in.h>
#endif

//#define POSTGRES_SHARED_CONNECTIONS

QgsPostgresResult::~QgsPostgresResult()
{
Expand Down Expand Up @@ -124,46 +123,50 @@ Oid QgsPostgresResult::PQoidValue()
return ::PQoidValue( mRes );
}


QMap<QString, QgsPostgresConn *> QgsPostgresConn::sConnectionsRO;
QMap<QString, QgsPostgresConn *> QgsPostgresConn::sConnectionsRW;
const int QgsPostgresConn::sGeomTypeSelectLimit = 100;

QgsPostgresConn *QgsPostgresConn::connectDb( QString conninfo, bool readonly )
QgsPostgresConn *QgsPostgresConn::connectDb( QString conninfo, bool readonly, bool shared )
{
#ifdef POSTGRES_SHARED_CONNECTIONS
QMap<QString, QgsPostgresConn *> &connections =
readonly ? QgsPostgresConn::sConnectionsRO : QgsPostgresConn::sConnectionsRW;

if ( connections.contains( conninfo ) )
if ( shared )
{
QgsDebugMsg( QString( "Using cached connection for %1" ).arg( conninfo ) );
connections[conninfo]->mRef++;
return connections[conninfo];
if ( connections.contains( conninfo ) )
{
QgsDebugMsg( QString( "Using cached connection for %1" ).arg( conninfo ) );
connections[conninfo]->mRef++;
return connections[conninfo];
}
}
#endif

QgsPostgresConn *conn = new QgsPostgresConn( conninfo, readonly );
QgsPostgresConn *conn = new QgsPostgresConn( conninfo, readonly, shared );

if ( conn->mRef == 0 )
{
delete conn;
return 0;
}

#ifdef POSTGRES_SHARED_CONNECTIONS
connections.insert( conninfo, conn );
#endif
if ( shared )
{
connections.insert( conninfo, conn );
}

return conn;
}

QgsPostgresConn::QgsPostgresConn( QString conninfo, bool readOnly )
QgsPostgresConn::QgsPostgresConn( QString conninfo, bool readOnly, bool shared )
: mRef( 1 )
, mOpenCursors( 0 )
, mConnInfo( conninfo )
, mGotPostgisVersion( false )
, mReadOnly( readOnly )
, mNextCursorId( 0 )
, mShared( shared )
{
QgsDebugMsg( QString( "New PostgreSQL connection for " ) + conninfo );

Expand Down Expand Up @@ -268,14 +271,15 @@ void QgsPostgresConn::disconnect()
if ( --mRef > 0 )
return;

#ifdef POSTGRES_SHARED_CONNECTIONS
QMap<QString, QgsPostgresConn *>& connections = mReadOnly ? sConnectionsRO : sConnectionsRW;
if ( mShared )
{
QMap<QString, QgsPostgresConn *>& connections = mReadOnly ? sConnectionsRO : sConnectionsRW;

QString key = connections.key( this, QString::null );
QString key = connections.key( this, QString::null );

Q_ASSERT( !key.isNull() );
connections.remove( key );
#endif
Q_ASSERT( !key.isNull() );
connections.remove( key );
}

delete this;
}
Expand Down
8 changes: 6 additions & 2 deletions src/providers/postgres/qgspostgresconn.h
Expand Up @@ -155,11 +155,12 @@ class QgsPostgresResult
PGresult *mRes;
};


class QgsPostgresConn : public QObject
{
Q_OBJECT;
public:
static QgsPostgresConn *connectDb( QString connInfo, bool readOnly );
static QgsPostgresConn *connectDb( QString connInfo, bool readOnly, bool shared = true );
void disconnect();

//! get postgis version string
Expand Down Expand Up @@ -269,7 +270,7 @@ class QgsPostgresConn : public QObject
static void deleteConnection( QString theConnName );

private:
QgsPostgresConn( QString conninfo, bool readOnly );
QgsPostgresConn( QString conninfo, bool readOnly, bool shared );
~QgsPostgresConn();

int mRef;
Expand Down Expand Up @@ -332,6 +333,9 @@ class QgsPostgresConn : public QObject
void deduceEndian();

int mNextCursorId;

bool mShared; //! < whether the connection is shared by more providers (must not be if going to be used in worker threads)
};


#endif
164 changes: 164 additions & 0 deletions src/providers/postgres/qgspostgresconnpool.cpp
@@ -0,0 +1,164 @@
/***************************************************************************
qgspostgresconnpool.cpp
---------------------
begin : January 2014
copyright : (C) 2014 by Martin Dobias
email : wonder dot sk at gmail dot com
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/

#include "qgspostgresconnpool.h"

#include "qgspostgresconn.h"

#include <QCoreApplication>

#define POSTGRES_MAX_CONCURRENT_CONNS 4
#define POSTGRES_CONN_EXPIRATION 10 // in seconds



QgsPostgresConnPoolGroup::QgsPostgresConnPoolGroup( const QString& ci )
: connInfo( ci )
, sem( POSTGRES_MAX_CONCURRENT_CONNS )
, expirationTimer( this )
{
// just to make sure the object belongs to main thread and thus will get events
moveToThread( qApp->thread() );

expirationTimer.setInterval( POSTGRES_CONN_EXPIRATION * 1000 );
connect( &expirationTimer, SIGNAL( timeout() ), this, SLOT( handleConnectionExpired() ) );
}


QgsPostgresConnPoolGroup::~QgsPostgresConnPoolGroup()
{
foreach ( Item item, conns )
item.c->disconnect();
}


QgsPostgresConn* QgsPostgresConnPoolGroup::acquire()
{
// we are going to acquire a resource - if no resource is available, we will block here
sem.acquire();

// quick (preferred) way - use cached connection
{
QMutexLocker locker( &connMutex );

if ( !conns.isEmpty() )
{
Item i = conns.pop();

// no need to run if nothing can expire
if ( conns.isEmpty() )
expirationTimer.stop();

return i.c;
}
}

QgsPostgresConn* c = QgsPostgresConn::connectDb( connInfo, true, false ); // TODO: read-only
if ( !c )
{
// we didn't get connection for some reason, so release the lock
sem.release();
return 0;
}

return c;
}


void QgsPostgresConnPoolGroup::release( QgsPostgresConn* conn )
{
connMutex.lock();
Item i;
i.c = conn;
i.lastUsedTime = QTime::currentTime();
conns.push( i );

if ( !expirationTimer.isActive() )
expirationTimer.start();

connMutex.unlock();

sem.release(); // this can unlock a thread waiting in acquire()
}


void QgsPostgresConnPoolGroup::handleConnectionExpired()
{
connMutex.lock();

QTime now = QTime::currentTime();

// what connections have expired?
QList<int> toDelete;
for ( int i = 0; i < conns.count(); ++i )
{
if ( conns.at( i ).lastUsedTime.secsTo( now ) >= POSTGRES_CONN_EXPIRATION )
toDelete.append( i );
}

// delete expired connections
for ( int j = toDelete.count() - 1; j >= 0; --j )
{
int index = toDelete[j];
conns[index].c->disconnect();
conns.remove( index );
}

if ( conns.isEmpty() )
expirationTimer.stop();

connMutex.unlock();
}


// ----


QgsPostgresConnPool* QgsPostgresConnPool::mInstance = 0;


QgsPostgresConnPool* QgsPostgresConnPool::instance()
{
if ( !mInstance )
mInstance = new QgsPostgresConnPool;
return mInstance;
}


QgsPostgresConn* QgsPostgresConnPool::acquireConnection( const QString& connInfo )
{
mMutex.lock();
QgsPostgresConnPoolGroups::iterator it = mGroups.find( connInfo );
if ( it == mGroups.end() )
{
it = mGroups.insert( connInfo, new QgsPostgresConnPoolGroup( connInfo ) );
}
QgsPostgresConnPoolGroup* group = *it;
mMutex.unlock();

return group->acquire();
}


void QgsPostgresConnPool::releaseConnection( QgsPostgresConn* conn )
{
mMutex.lock();
QgsPostgresConnPoolGroups::iterator it = mGroups.find( conn->connInfo() );
Q_ASSERT( it != mGroups.end() );
QgsPostgresConnPoolGroup* group = *it;
mMutex.unlock();

group->release( conn );
}

0 comments on commit 9cc3576

Please sign in to comment.