Skip to content

Commit

Permalink
Query result model threaded fetcher
Browse files Browse the repository at this point in the history
plus: independent iterators and rewind
  • Loading branch information
elpaso committed Jan 13, 2021
1 parent f5ffc56 commit 3b7cc24
Show file tree
Hide file tree
Showing 16 changed files with 229 additions and 109 deletions.
Expand Up @@ -84,6 +84,11 @@ Returns ``True`` if there are more rows to fetch
QList<QVariant> nextRow() const;
%Docstring
Returns the next result row or an empty row if there are no rows left
%End

void rewind();
%Docstring
Resets the iterator counter used by :py:func:`~QgsAbstractDatabaseProviderConnection.hasNextRow` and :py:func:`~QgsAbstractDatabaseProviderConnection.nextRow`
%End

qlonglong fetchedRowCount( ) const;
Expand Down
14 changes: 11 additions & 3 deletions python/core/auto_generated/qgsqueryresultmodel.sip.in
Expand Up @@ -9,6 +9,12 @@









class QgsQueryResultModel : QAbstractListModel
{
%Docstring
Expand All @@ -22,11 +28,13 @@ The QgsQueryResultModel class is a model for QgsAbstractDatabaseProviderConnecti
%End
public:

QgsQueryResultModel( const QgsAbstractDatabaseProviderConnection::QueryResult &queryResult, QObject *parent = 0 );
QgsQueryResultModel(const QgsAbstractDatabaseProviderConnection::QueryResult& queryResult, QObject *parent = 0 );
%Docstring
Constructs a QgsQueryResultModel from a ``queryResult`` with optional ``parent``
%End

~QgsQueryResultModel();

public:

virtual int rowCount( const QModelIndex &parent ) const;
Expand All @@ -35,10 +43,10 @@ Constructs a QgsQueryResultModel from a ``queryResult`` with optional ``parent``

virtual QVariant data( const QModelIndex &index, int role ) const;

virtual void fetchMore( const QModelIndex &parent );

virtual bool canFetchMore( const QModelIndex &parent ) const;
public slots:

void newRowsReady(int newRowsCount );

};

Expand Down
19 changes: 14 additions & 5 deletions src/core/providers/ogr/qgsgeopackageproviderconnection.cpp
Expand Up @@ -425,14 +425,14 @@ QgsAbstractDatabaseProviderConnection::QueryResult QgsGeoPackageProviderConnecti
return QgsAbstractDatabaseProviderConnection::QueryResult();
}

QVariantList QgsGeoPackageProviderResultIterator::nextRow()
QVariantList QgsGeoPackageProviderResultIterator::nextRowPrivate()
{
const QVariantList currentRow { mNextRow };
mNextRow = nextRowPrivate();
mNextRow = nextRowInternal();
return currentRow;
}

QVariantList QgsGeoPackageProviderResultIterator::nextRowPrivate()
QVariantList QgsGeoPackageProviderResultIterator::nextRowInternal()
{
QVariantList row;
if ( mHDS && mOgrLayer )
Expand All @@ -457,11 +457,17 @@ QVariantList QgsGeoPackageProviderResultIterator::nextRowPrivate()
}
}
}
else
{
// Release the resources
GDALDatasetReleaseResultSet( mHDS.get(), mOgrLayer );
mHDS.release();
}
}
return row;
}

bool QgsGeoPackageProviderResultIterator::hasNextRow() const
bool QgsGeoPackageProviderResultIterator::hasNextRowPrivate() const
{
return ! mNextRow.isEmpty();
}
Expand All @@ -488,5 +494,8 @@ QList<QgsVectorDataProvider::NativeType> QgsGeoPackageProviderConnection::native

QgsGeoPackageProviderResultIterator::~QgsGeoPackageProviderResultIterator()
{
GDALDatasetReleaseResultSet( mHDS.get(), mOgrLayer );
if ( mHDS )
{
GDALDatasetReleaseResultSet( mHDS.get(), mOgrLayer );
}
}
7 changes: 3 additions & 4 deletions src/core/providers/ogr/qgsgeopackageproviderconnection.h
Expand Up @@ -34,9 +34,6 @@ struct QgsGeoPackageProviderResultIterator: public QgsAbstractDatabaseProviderCo

~QgsGeoPackageProviderResultIterator();

QVariantList nextRow() override;
bool hasNextRow() const override;

void setFields( const QgsFields &fields );

private:
Expand All @@ -46,7 +43,9 @@ struct QgsGeoPackageProviderResultIterator: public QgsAbstractDatabaseProviderCo
QgsFields mFields;
QVariantList mNextRow;

QVariantList nextRowPrivate();
QVariantList nextRowPrivate() override;
QVariantList nextRowInternal();
bool hasNextRowPrivate() const override;

};

Expand Down
91 changes: 57 additions & 34 deletions src/core/qgsabstractdatabaseproviderconnection.cpp
Expand Up @@ -446,70 +446,69 @@ QStringList QgsAbstractDatabaseProviderConnection::QueryResult::columns() const

QList<QList<QVariant> > QgsAbstractDatabaseProviderConnection::QueryResult::rows( QgsFeedback* feedback) const
{
if ( ! mResultIterator )
{
return QList<QList<QVariant> >();
}

// mRowCount might be -1 (unknown)
while ( mResultIterator &&
( mRowCount < 0 || mRows.count() < mRowCount ) &&
hasNextRow() && ( ! feedback || ! feedback->isCanceled() ) )
mResultIterator->hasNextRow() &&
( ! feedback || ! feedback->isCanceled() ) )
{
const QVariantList row( mResultIterator->nextRow() );
if ( row.isEmpty() )
{
break;
}
mRows.push_back( row );
}
return mRows;
return mResultIterator->rows();
}

QList<QVariant> QgsAbstractDatabaseProviderConnection::QueryResult::nextRow() const
{
if ( ! mResultIterator || ! hasNextRow() )
if ( ! mResultIterator )
{
return QList<QVariant>();
}
return at( mIteratorCounter++ );
}

const QList<QVariant> row( mResultIterator->nextRow() );

if ( ! row.isEmpty() )
{
mRows.push_back( row );
}

return row;
void QgsAbstractDatabaseProviderConnection::QueryResult::rewind()
{
mIteratorCounter = 0;
}

qlonglong QgsAbstractDatabaseProviderConnection::QueryResult::fetchedRowCount() const
{
return mRows.count();
if ( ! mResultIterator )
{
return 0;
}
return mResultIterator->fetchedRowCount();
}

QList<QVariant> QgsAbstractDatabaseProviderConnection::QueryResult::at( qlonglong index ) const
{
if ( index < 0 )
if ( index < 0 || ! mResultIterator )
{
return QList<QVariant>();
}

// Fetch rows until the index
if ( index >= mRows.count( ) )
while ( index >= mResultIterator->fetchedRowCount() &&
( mRowCount < 0 || mResultIterator->fetchedRowCount() < mRowCount ) &&
mResultIterator->hasNextRow() )
{
while ( mResultIterator && ( mRowCount < 0 || mRows.count() < mRowCount ) && index >= mRows.count() && hasNextRow() )
{
const QVariantList row { mResultIterator->nextRow() };
if ( row.isEmpty() )
{
break;
}
mRows.push_back( row );
}
mResultIterator->nextRow();
}

if ( index >= mRows.count( ) )
if ( index >= mResultIterator->fetchedRowCount() )
{
return QList<QVariant>();
}

return mRows.at( index );
return mResultIterator->rows().at( index );
}


Expand All @@ -524,15 +523,9 @@ bool QgsAbstractDatabaseProviderConnection::QueryResult::hasNextRow() const
{
return false;
}
const bool results { mResultIterator->hasNextRow() };
if ( ! results )
{
mResultIterator.reset();
}
return results;
return ! at( mIteratorCounter ).isEmpty();
}


void QgsAbstractDatabaseProviderConnection::QueryResult::appendColumn( const QString &columnName )
{
mColumns.push_back( columnName );
Expand All @@ -548,4 +541,34 @@ QgsAbstractDatabaseProviderConnection::QueryResult::QueryResult( std::shared_ptr
{}


QVariantList QgsAbstractDatabaseProviderConnection::QueryResult::QueryResultIterator::nextRow()
{
QMutexLocker lock(&mMutex);
const QVariantList row { nextRowPrivate() };
if ( ! row.isEmpty() )
{
mRows.push_back( row );
}
return row;
}

bool QgsAbstractDatabaseProviderConnection::QueryResult::QueryResultIterator::hasNextRow() const
{
QMutexLocker lock(&mMutex);
return hasNextRowPrivate();
}

qlonglong QgsAbstractDatabaseProviderConnection::QueryResult::QueryResultIterator::fetchedRowCount()
{
QMutexLocker lock(&mMutex);
return mRows.count();
}

QList<QList<QVariant> > QgsAbstractDatabaseProviderConnection::QueryResult::QueryResultIterator::rows() const
{
QMutexLocker lock(&mMutex);
return mRows;
}


///@endcond private
25 changes: 21 additions & 4 deletions src/core/qgsabstractdatabaseproviderconnection.h
Expand Up @@ -122,6 +122,11 @@ class CORE_EXPORT QgsAbstractDatabaseProviderConnection : public QgsAbstractProv
*/
QList<QVariant> nextRow() const;

/**
* Resets the iterator counter used by hasNextRow() and nextRow()
*/
void rewind();

/**
* Returns the number of fetched rows
* \see rowCount()
Expand Down Expand Up @@ -162,14 +167,26 @@ class CORE_EXPORT QgsAbstractDatabaseProviderConnection : public QgsAbstractProv
///@cond private

/**
* The QueryResultIterator struct is an abstract interface for provider query results iterators.
* The QueryResultIterator struct is an abstract interface for provider query result iterators.
* Providers must implement their own concrete iterator over query results.
*
* \note This struct is thread safe.
*/
struct QueryResultIterator SIP_SKIP
{
virtual QVariantList nextRow() = 0;
virtual bool hasNextRow() const = 0;
QVariantList nextRow();
bool hasNextRow() const;
qlonglong fetchedRowCount();
virtual ~QueryResultIterator() = default;
QList<QList<QVariant> > rows() const;

private:

virtual QVariantList nextRowPrivate() = 0;
virtual bool hasNextRowPrivate() const = 0;
mutable QList<QList<QVariant>> mRows;
mutable QMutex mMutex;

};

/**
Expand Down Expand Up @@ -202,8 +219,8 @@ class CORE_EXPORT QgsAbstractDatabaseProviderConnection : public QgsAbstractProv

mutable std::shared_ptr<QueryResultIterator> mResultIterator;
QStringList mColumns;
mutable QList<QList<QVariant>> mRows;
qlonglong mRowCount = 0;
mutable qlonglong mIteratorCounter = 0;

};

Expand Down
45 changes: 25 additions & 20 deletions src/core/qgsqueryresultmodel.cpp
Expand Up @@ -25,12 +25,12 @@ QgsQueryResultModel::QgsQueryResultModel(const QgsAbstractDatabaseProviderConnec
{
if ( mQueryResult.hasNextRow() )
{
ResultWorker *worker = new ResultWorker( &mQueryResult );
worker->moveToThread(&mWorkerThread);
//connect(&mWorkerThread, &QThread::finished, worker, &ResultWorker::stopFetching );
connect(&mWorkerThread, &QThread::finished, worker, &QObject::deleteLater);
connect(&mWorkerThread, &QThread::started, worker, &ResultWorker::fetchRows);
connect(worker, &ResultWorker::rowsReady, this, &QgsQueryResultModel::newRowsReady );
mWorker = new ResultWorker( &mQueryResult );
mWorker->moveToThread(&mWorkerThread);
connect(&mWorkerThread, &QThread::finished, mWorker, &ResultWorker::stopFetching );
connect(&mWorkerThread, &QThread::finished, mWorker, &QObject::deleteLater);
connect(&mWorkerThread, &QThread::started, mWorker, &ResultWorker::fetchRows);
connect(mWorker, &ResultWorker::rowsReady, this, &QgsQueryResultModel::newRowsReady );
mWorkerThread.start();
}
}
Expand All @@ -44,8 +44,12 @@ void QgsQueryResultModel::newRowsReady( int newRowCount )

QgsQueryResultModel::~QgsQueryResultModel()
{
mWorkerThread.quit();
mWorkerThread.wait();
if ( mWorker )
{
mWorker->stopFetching();
mWorkerThread.quit();
mWorkerThread.wait();
}
}

int QgsQueryResultModel::rowCount( const QModelIndex &parent ) const
Expand Down Expand Up @@ -81,26 +85,27 @@ QVariant QgsQueryResultModel::data( const QModelIndex &index, int role ) const

void ResultWorker::fetchRows()
{
int rowCount = 0;
while ( mQueryResult->hasNextRow() && mStopFetching == 0 )
qlonglong rowCount { 0 };
while ( mStopFetching == 0 )
{
mQueryResult->nextRow();
++rowCount;
if ( rowCount == ROWS_TO_FETCH && mStopFetching == 0 )
{
emit rowsReady( rowCount );
rowCount = 0;
}
if ( mQueryResult->at( rowCount ).isEmpty() )
{
break;
}
++rowCount;
if ( rowCount % ROWS_TO_FETCH == 0 && mStopFetching == 0 )
{
emit rowsReady( ROWS_TO_FETCH );
}
}

if ( rowCount > 0 && mStopFetching == 0 )
if ( rowCount % ROWS_TO_FETCH && mStopFetching == 0 )
{
emit rowsReady( rowCount );
emit rowsReady( rowCount % ROWS_TO_FETCH );
}
}

void ResultWorker::stopFetching()
{
qDebug() << "Stop fetching" << mQueryResult->fetchedRowCount();
mStopFetching = 1;
}

0 comments on commit 3b7cc24

Please sign in to comment.