Skip to content

Commit

Permalink
Fix more racy conditions
Browse files Browse the repository at this point in the history
Switch from QtConcurrent::run to QThreadPool::start

QtConcurrent doesn't give us anyway to cancel queued but not
started tasks. QThreadPool does (and also allows us to specify
task priority)
  • Loading branch information
nyalldawson committed Dec 5, 2016
1 parent 01d6afa commit 5216220
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 92 deletions.
2 changes: 1 addition & 1 deletion python/core/__init__.py
Expand Up @@ -208,7 +208,7 @@ def __init__(self, description, function, on_finished, *args, **kwargs):
self.returned_values = None
self.exception = None

def run(self):
def _run(self):
try:
self.returned_values = self.function(self, *self.args, **self.kwargs)
except Exception as ex:
Expand Down
8 changes: 4 additions & 4 deletions python/core/qgstaskmanager.sip
Expand Up @@ -14,7 +14,7 @@
*
* \note Added in version 3.0
*/
class QgsTask : QObject
class QgsTask : QObject, QRunnable
{

%TypeHeaderCode
Expand Down Expand Up @@ -93,7 +93,7 @@ class QgsTask : QObject
* then this method should not be called directly, instead it is left to the
* task manager to start the task when appropriate.
*/
void start();
void run();

/**
* Notifies the task that it should terminate. Calling this is not guaranteed
Expand Down Expand Up @@ -212,7 +212,7 @@ class QgsTask : QObject
* @see completed()
* @see terminated()
*/
virtual TaskResult run() = 0;
virtual TaskResult _run() = 0;

/**
* If the task is managed by a QgsTaskManager, this will be called after the
Expand Down Expand Up @@ -295,7 +295,7 @@ class QgsTaskManager : QObject
/**
* Constructor for TaskDefinition. Ownership of the task is transferred to the definition.
*/
TaskDefinition( QgsTask* task, QgsTaskList dependencies = QgsTaskList() );
explicit TaskDefinition( QgsTask* task, QgsTaskList dependencies = QgsTaskList() );

//! Task
QgsTask* task;
Expand Down
63 changes: 38 additions & 25 deletions src/core/qgstaskmanager.cpp
Expand Up @@ -34,25 +34,35 @@ QgsTask::QgsTask( const QString &name, const Flags& flags )
, mTotalProgress( 0.0 )
, mShouldTerminate( false )
, mStartCount( 0 )
{}
{
setAutoDelete( false );
}

QgsTask::~QgsTask()
{
Q_ASSERT_X( mStatus == Queued || mStatus == Terminated || mStatus == Complete, "delete", QString( "status was %1" ).arg( mStatus ).toLatin1() );

QThreadPool::globalInstance()->cancel( this );

Q_FOREACH ( const SubTask& subTask, mSubTasks )
{
delete subTask.task;
}
}

void QgsTask::start()
void QgsTask::run()
{
mStartCount++;
Q_ASSERT( mStartCount == 1 );

if ( mStatus != Queued )
return;

mStatus = Running;
mOverallStatus = Running;
emit statusChanged( Running );
emit begun();
TaskResult result = run();
TaskResult result = _run();
switch ( result )
{
case ResultSuccess:
Expand All @@ -74,6 +84,8 @@ void QgsTask::start()
void QgsTask::cancel()
{
mShouldTerminate = true;

QThreadPool::globalInstance()->cancel( this );
if ( mStatus == Queued || mStatus == OnHold )
{
// immediately terminate unstarted jobs
Expand Down Expand Up @@ -297,13 +309,14 @@ QgsTaskManager::~QgsTaskManager()
cancelAll();

//then clean them up, including waiting for them to terminate
mParentTaskMutex->lockForRead();
QSet< QgsTask* > parents = mParentTasks;
parents.detach();
mParentTaskMutex->unlock();
Q_FOREACH ( QgsTask* task, parents )
mTaskMutex->lockForRead();
QMap< long, TaskInfo > tasks = mTasks;
mTasks.detach();
mTaskMutex->unlock();
QMap< long, TaskInfo >::const_iterator it = tasks.constBegin();
for ( ; it != tasks.constEnd(); ++it )
{
cleanupAndDeleteTask( task );
cleanupAndDeleteTask( it.value().task );
}

delete mTaskMutex;
Expand Down Expand Up @@ -437,10 +450,7 @@ void QgsTaskManager::cancelAll()

Q_FOREACH ( QgsTask* task, parents )
{
if ( task->isActive() )
{
task->cancel();
}
task->cancel();
}
}

Expand Down Expand Up @@ -640,6 +650,10 @@ bool QgsTaskManager::cleanupAndDeleteTask( QgsTask *task )
return false;

long id = taskId( task );
if ( id < 0 )
return false;

task->disconnect( this );

mDependenciesMutex->lockForWrite();
if ( mTaskDependencies.contains( id ) )
Expand Down Expand Up @@ -668,20 +682,24 @@ bool QgsTaskManager::cleanupAndDeleteTask( QgsTask *task )
mLayerDependencies.remove( id );
mLayerDependenciesMutex->unlock();

if ( task->isActive() )
if ( task->status() != QgsTask::Complete || task->status() != QgsTask::Terminated )
{
task->cancel();
if ( isParent )
{
// delete task when it's terminated
connect( task, &QgsTask::taskCompleted, task, &QgsTask::deleteLater );
connect( task, &QgsTask::taskTerminated, task, &QgsTask::deleteLater );
}
task->cancel();
}
else if ( isParent )
else
{
//task already finished, kill it
task->deleteLater();
QThreadPool::globalInstance()->cancel( task );
if ( isParent )
{
//task already finished, kill it
task->deleteLater();
}
}

// at this stage (hopefully) dependent tasks have been cancelled or queued
Expand All @@ -700,20 +718,16 @@ bool QgsTaskManager::cleanupAndDeleteTask( QgsTask *task )

void QgsTaskManager::processQueue()
{
static QMutex processMutex;

processMutex.lock();
int prevActiveCount = countActiveTasks();
mActiveTaskMutex->lockForWrite();
mActiveTasks.clear();
mTaskMutex->lockForWrite();
for ( QMap< long, TaskInfo >::iterator it = mTasks.begin(); it != mTasks.end(); ++it )
{
QgsTask* task = it.value().task;
if ( !it.value().added && task && task->mStatus == QgsTask::Queued && dependenciesSatisified( it.key() ) )
if ( task && task->mStatus == QgsTask::Queued && dependenciesSatisified( it.key() ) && it.value().added.testAndSetRelaxed( 0, 1 ) )
{
it.value().added = true;
it.value().future = QtConcurrent::run( task, &QgsTask::start );
QThreadPool::globalInstance()->start( task );
}

if ( task && ( task->mStatus != QgsTask::Complete && task->mStatus != QgsTask::Terminated ) )
Expand All @@ -723,7 +737,6 @@ void QgsTaskManager::processQueue()
}
mTaskMutex->unlock();
mActiveTaskMutex->unlock();
processMutex.unlock();

mParentTaskMutex->lockForRead();
bool allFinished = mActiveTasks.isEmpty();
Expand Down
17 changes: 8 additions & 9 deletions src/core/qgstaskmanager.h
Expand Up @@ -35,16 +35,16 @@ typedef QList< QgsTask* > QgsTaskList;
* or added to a QgsTaskManager for automatic management.
*
* Derived classes should implement the process they want to execute in the background
* within the run() method. This method will be called when the
* task commences (ie via calling start() ).
* within the _run() method. This method will be called when the
* task commences (ie via calling run() ).
*
* Long running tasks should periodically check the isCancelled() flag to detect if the task
* has been cancelled via some external event. If this flag is true then the task should
* clean up and terminate at the earliest possible convenience.
*
* \note Added in version 3.0
*/
class CORE_EXPORT QgsTask : public QObject
class CORE_EXPORT QgsTask : public QObject, public QRunnable
{
Q_OBJECT

Expand Down Expand Up @@ -123,7 +123,7 @@ class CORE_EXPORT QgsTask : public QObject
* then this method should not be called directly, instead it is left to the
* task manager to start the task when appropriate.
*/
void start();
void run() override;

/**
* Notifies the task that it should terminate. Calling this is not guaranteed
Expand Down Expand Up @@ -224,6 +224,7 @@ class CORE_EXPORT QgsTask : public QObject

void subTaskComplete();


protected:

/**
Expand All @@ -245,7 +246,7 @@ class CORE_EXPORT QgsTask : public QObject
* @see completed()
* @see terminated()
*/
virtual TaskResult run() = 0;
virtual TaskResult _run() = 0;

/**
* If the task is managed by a QgsTaskManager, this will be called after the
Expand Down Expand Up @@ -305,7 +306,6 @@ class CORE_EXPORT QgsTask : public QObject
//! Status of this task and all subtasks
TaskStatus mOverallStatus;


//! Progress of this (parent) task alone
double mProgress;
//! Overall progress of this task and all subtasks
Expand Down Expand Up @@ -367,7 +367,7 @@ class CORE_EXPORT QgsTaskManager : public QObject
/**
* Constructor for TaskDefinition.
*/
TaskDefinition( QgsTask* task, QgsTaskList dependencies = QgsTaskList() )
explicit TaskDefinition( QgsTask* task, QgsTaskList dependencies = QgsTaskList() )
: task( task )
, dependencies( dependencies )
{}
Expand Down Expand Up @@ -503,8 +503,7 @@ class CORE_EXPORT QgsTaskManager : public QObject
, added( false )
{}
QgsTask* task;
bool added;
QFuture< void > future;
QAtomicInt added;
};

mutable QReadWriteLock* mTaskMutex;
Expand Down
1 change: 0 additions & 1 deletion src/gui/qgstaskmanagerwidget.cpp
Expand Up @@ -180,7 +180,6 @@ Qt::ItemFlags QgsTaskManagerModel::flags( const QModelIndex &index ) const
bool QgsTaskManagerModel::setData( const QModelIndex &index, const QVariant &value, int role )
{
Q_UNUSED( role );
return false;

if ( !index.isValid() )
return false;
Expand Down

0 comments on commit 5216220

Please sign in to comment.