邮箱中的Qt线程设计


风晓
风晓 2024-01-04 11:00:21 53478
分类专栏: 资讯

邮箱(deepin-mail)主要使用Qt框架开发,是一个有大量并行任务且需要监控进度和结果的项目,任务的优先级调整和支持取消回滚也是必不可少。Qt已经为我们提供了多种线程设计的方法,可以满足绝大部分使用场景,但是每一种方案单拎出来都不能很恰到好处的在项目中使用,本文主要对项目中的线程设计进行介绍,通过将多个Qt线程方案相结合,衍生出一种适合邮箱这类型项目的应用方法。

 

浅谈Qt中的线程方法

QThread

基于QThread的方案,需要子类化QThread实现run()方法,并在合适的时候调用start(),在这之前需要做好数据交换,并在线程执行过程中通过跨线程的connect()方法将数据传出,这里需要注意不能将连接参数设置为Qt::DirectConnection,因为线程的数据需要通过事件循环传递来保证安全,在跨线程的connect()方法中参数即使使用了引用Qt也依然会在emit时帮你额外触发一次参数拷贝,影响性能。

如果不是开销非常大的任务像这样直接继承其实不受推荐,现在更倾向于使用组合的方式,子类化QObject来创建一个Worker类,添加一个QThread成员变量,将Worker对象移动到启动后的QThread对象线程中,这样Worker信号触发的槽函数都会在QThread对象启动的线程中运行。由于QThread也是QObject派生类, 偷懒的人可以让Worker直接继承QThread然后moveToThread(this),当然这样做需要对QThread的理解更 透彻一些,所以官方也不建议这种写法因为QThread是被设计成一个管理线程的类不应参与过多业务逻辑。

基于QThread的方案尤其要注意定时器和套接字的析构问题,确保他们在创建的线程中使用和析构,QThread的使用者会不注意的将他们在线程中创建和使用,却在QThread自己的析构函数中析构(QThread对象的析构是在他所在的线程而不是自己开启的线程)。

QRunnable

基于QRunnable的方案,将任务分解成QRunnable对象直接放入QThreadPool中执行,使用上和QThread一样我们完成他的run()方法,但是能且只能通过QThreadPool来执行,自身不包含其他的功能特性,开销很小。但就是因为这样,他和外界交换数据或者线程同步就变得相当麻烦,由于QRunnable不是QObject派生类,无法使用Qt的信号槽,不做一些处理很难优雅地将进度和结果抛出,如果用同时继承QRunnable和QObject的方式进行来添加信号槽机制不如直接使用QThread了。还有一种方式,在QRunnable对象中保存一些传入的引用或指针来做消息传递,这样数据可以通过原子 变量或互斥锁来实现更新,指针可以使用元对象系统中的方法QMetaObject::invokeMethod()通过事件循环传出消息,但实际运用起来依旧麻烦,每个数据包括指针你都要考虑他的跨线程竞争问题,不得不控制参数的数量,将每个任务尽可能的切割成更小的任务。

QtConcurrent

还有一种基于QtConcurrent框架的方案,是Qt的高级别线程特性,支持多种线程管理的方法, 只要把方法或者lambda表达式传入run()并指定一个线程池(默认是全局线程池QThreadPool::globalInstance())就完成了开线程执行直到返回结果的一系列流程,它会自动将任务分配到可用的处理器核心上执行,最大化利用现在核心越来越多的cpu架构。它的run()方法重载数量非常多,包括 异步和同步等待线程执行完成的方法。选择其中的异步方法,就可以通过监控返回的QFuture对象来得到线程状态和结果,主要方法的官方描述如下:

Concurrent Map and Map-Reduce
QtConcurrent::map() applies a function to every item in a container, modifying the items in-place.
QtConcurrent::mapped() is like map(), except that it returns a new container with the modifications.
QtConcurrent::mappedReduced() is like mapped(), except that the modified results are reduced or folded into a single result.
Concurrent Filter and Filter-Reduce
QtConcurrent::filter() removes all items from a container based on the result of a filter function.
QtConcurrent::filtered() is like filter(), except that it returns a new container with the filtered results.
QtConcurrent::filteredReduced() is like filtered(), except that the filtered results are reduced or folded into a single result.
Concurrent Run
QtConcurrent::run() runs a function in another thread.

横向比较以上方案

 

以上方案并没有一种是完全优于另一种的,每种都有它适应的场景,灵活运用满足项目的需要是最重要的。

邮箱中的线程方案

心路历程
Qt Concurrent方案中 基于future的机制非常适合邮箱项目做前后端分离,后端只要提供 QFuture 对 象出去被前端监控,其他的工作包括任务调度都在后端内部完成,实现解耦。但总体上它的设计初 衷更多是为了使用简单而不是功能强大,对并行处理的线程增加了映射、 过滤 和规约算法却削弱了 对线程内部的控制和线程之间的消息传递、同步能力,而 QFuture 对象提供了 pause ()和 cancel ()等方法只能影响线程池中还未执行的任务。
希望使用QtConcurrent特性的同时还要解决一些实际问题,要能够在线程池中对不同优先级的线程进行排序,要能掌握耗时任务的实时进度并能够对其暂停和取消,操作的影响具体到任务中操作数据库、读写文件或者和服务器交互中的某一步,这样在部分取消后可以做一些回滚来保证任务的原子性。
既然Qt使用了future来命名,其实Qt已经实现了 future-promise机制 ,还在自己的源码中大量的使用 。如果观察 QFuture 和 QThreadPool 的源码,时不时就会看到一个叫 QFutureInterface 的类,Qt的帮助文档中不包含相关资料,但是别看他叫做"interface",其实他是可以直接使用的,并且拥有着满足项目需要的方法。有兴趣的同学可以阅读相关源码来了解,如果在源码中看到以下的描述不要紧张,一直追溯到Qt5的最后一个版本,这些接口也是存在并且稳定的。

方案改造
为了更好的利用这个特性,需要对它进行了改造以接地气一些,我们通过使用 QThreadPool +
QRunnable 方案中的设计思路来控制线程池任务 。
首先继承 QRunnable 创建一个类模板用于后面衍生出各式各样的任务:
template < typename T >
class AbstractTask : public QRunnable
{
public :
QFuture < T > future ();
protected :
inline void reportStarted ();
inline void setProgressRange ( int minimum, int maximum);
inline void setProgressValueAndText ( int progressValue, const QString &progressText =
"" );
inline void reportResult ( const T &result, const int &index = - 1 );
inline void reportFinished ( const T *result = 0 );
virtual void run () = 0 ;
private :
QFutureInterface < T > d ;
};
模板参数是每个任务想要对外提供的返回结果,可以只返回错误码和错误描述用于表示执行结果,也可以添加更多的参数比如同时将下载的文件通过future返回。不用担心额外的拷贝开销,因为另外一个让人省心的地方是, QFutureInterface 已经通过引用计数为自己实现了隐式共享。
template < typename T >
class QFutureInterface : public QFutureInterfaceBase
{
public :
QFutureInterface ( State initialState = NoState )
: QFutureInterfaceBase ( initialState )
{
refT ();
}
QFutureInterface ( const QFutureInterface & other )
: QFutureInterfaceBase ( other )
{
refT ();
}
~ QFutureInterface ()
{
if (! derefT ())
resultStoreBase (). template clear< T >();
}
...
}
QFutureInterface 通过原子变量来实现引用计数,提供一个平台无关的原子操作,但并不是所有的处理器都支持 QAtomicInt ,如今国产芯片百家争鸣,如果你是特殊的架构,使用前检测一下当前处理器是否支持某个API是很重要的。使用原子变量会比使用互斥锁的方式更加简单和高效:
class RefCount
{
public :
inline RefCount ( int r = 0 , int rt = 0 ): m_refCount ( r ), m_refCountT ( rt ) {}
inline bool ref () { return m_refCount.ref(); }
inline bool deref () { return m_refCount.deref(); }
inline int load () const { return m_refCount.load(); } inline bool refT () { return m_refCountT.ref(); }
inline bool derefT () { return m_refCountT.deref(); }
inline int loadT () const { return m_refCountT.load(); }
private :
QAtomicInt m_refCount ;
QAtomicInt m_refCountT ;
};
QFutureInterface 通过 future ()方法创建出的 QFuture 都会存有一份自己的引用实例,参与了引用计数 的计算。只有当所有的 QFutureInterface 对象都被析构(包括 QFuture 中的),他们所指向的 result ()结果空间才也会释放。出于灵活同一个任务是可以返回多个 QFuture 对象分发到不同的模块以被监控的, 但在任务完成后记得重置 QFuture 以释放内存,赋值为一个 QFuture ()即可。
template < typename T >
class QFuture
{
public :
explicit QFuture ( QFutureInterface < T > * p )
: d (* p )
{ }
mutable QFutureInterface < T > d ;
}
template < typename T >
class QFutureInterface : public QFutureInterfaceBase
{
inline QFuture < T > future ()
{
return QFuture < T >( this );
}
}
使用案例

上图是任务流转的一个简要流程,结合这个流程下面将给出项目中的一个实现,一个为自己的账号创建邮件目录的任务:

class CreateMailFolderTask : public AbstractTask < ResultResponse >
{
public :
CreateMailFolderTask ( QSharedPointer < ProtocolEngine > engine , QSharedPointer < ProtocolCache > cache );
void run ();
void initParam ( const QString & folderName );
private :
QSharedPointer < ProtocolEngine > m_engine ;
QSharedPointer < ProtocolCache > m_cache ;
QString m_folderName ;
};
...
void CreateMailFolderTask :: run ()
{
setProgressRange ( 0 , 100 );
reportProgressValueAndText ( 0," Started " );
//do something
const ResultResponse & result = m_engine ->createMailFolder ( m_folderName );
reportProgressValueAndText ( 50," Ongoing");
//different processing according to d . isPaused () , d . isCanceled ()
if ( RESULT_SUCCESS == result . type )
m_cache ->createFolderId ( m_folderName ); //do something
reportProgressValueAndText ( 100," Finished");
reportResult ( result );
reportFinished ();
}
首先按我们的模板实现一个创建目录的任务,在任务的run()方法中实现相关功能,这时候就可以根据需要自由的通过多种 report ()方法将进度、状态描述和结果抛出,以便外部可以在每个节点获取当前任务的状态,根据是否被暂停或者被取消等通过 QFuture 设置的状态来做出不同的处理,如果有必要比如在邮箱项目中,我们传递了一个 QAtomic 原子变量到任务甚至子任务中,进行更加精的控制。类中有两个成员变量 m_engine 和 m_cache ,这个是项目中用于执行邮件协议和本地缓存代码的控制类,线程安全,不做过多扩展说明。接下来是使用:
QFuture < ResultResponse > createFolder ( const QString & folderId ){
CreateMailFolderTask * task = new CreateMailFolderTask ( m_protocolEngine , m_protocolCache );
task -> initParam ( folderId );
QFuture < PrepareResponse > future = task -> future ();
emit sigNewTask ( task );
return future ;
}
我们创建了一个任务,但是任务并不需要立刻开始,而是通过信号将task抛出等待处理,可以在合适的时候通过线程池 pool -> tryStart ( task )来执行,可以丢在数据结构中保存下来进行优先级排序后等待唤起,还可以格式化存储到文件中保存退出等待下次应用启动后继续执行。拿到 QFuture 对象的模块立刻就能够进行监控,是否开始、是否结束和进度都可以通过 QFuture 的方法获取或使用 QFutursSynchronizer 组合监控,也可以通过 QFutureWatcher 监控 QFuture 实现被动处理,这个具体看看官方说明即可:
QFuture represents the result of an asynchronous computation.
QFutureIterator allows iterating through results available via QFuture .
QFutureWatcher allows monitoring a QFuture using signals-and-slots.
QFutureSynchronizer is a convenience class that automatically synchronizes several
QFutures.
QFutureWatcher < ResultResponse > watcher ;
watcher . setFuture ( future );
QObject :: connect (& watcher , & QFutureWatcher < ResultResponse >:: progressValueChanged ,
[ = ]( int progressValue ) {
progressValue ; //do something });
任务的返回可以通过 QFuture 的 result ()方法获取,如果是逐步抛出结果的批处理任务,可以通过 results ()或者 resultAt (int index)方法获取已取得的结果列表。 result ()的提前调用不会产生错误,它会阻塞当前的线程,等待任务完成后得到结果才会继续向下执行,也可以主动调用 waitForFinished () 方法阻塞等待任务完成达到一样的效果。阻塞等待可以不用为了耗时极短的任务去写监控代码,也为写单元测试代码带来了非常大的便利性:
# include < gtest / gtest . h >
TEST_F ( ut_session , createFolder ){
QFuture < ResultResponse > future = session -> createFolder ("MyBox");
future . waitForFinished ();
EXPECT_TRUE ( ResultCode :: Success == future . result (). code );
}
小结
总结一下,Qt future-promise结合 QRunnable 的方案十分灵活,其实还有很多特性没有在此演
示,我们已经将它落地在邮箱项目中,接口稳定运行,取得了不错的效果。

网站声明:如果转载,请联系本站管理员。否则一切后果自行承担。

本文链接:https://www.xckfsq.com/news/show.html?id=34720
赞同 0
评论 0 条
风晓L1
粉丝 1 发表 522 + 关注 私信
上周热门
WPS City Talk · 校招西安站来了!  3746
服贸会|范渊荣获年度创新领军人物!王欣分享安恒信息“AI+安全”探索  3671
有在找工作的IT人吗?  3638
字节跳动“安全范儿”高校挑战赛来袭!三大赛道,赢 80 万专项基金!  3594
阿B秋招线下宣讲行程来啦,速速报名!  3588
字节跳动校招 | 电商业务 2025 校园招聘进行中!五大职类热招,等你来投!  3578
麒麟天御安全域管平台升级!为企业管理保驾护航  3564
烽火通信2025届校园招聘宣讲行程发布!!  3399
2024海洋能源产业融合发展论坛暨博览会同期活动-海洋能源与数字化智能化论坛成功举办  3362
华为全联接大会2024丨软通动力分论坛精彩议程抢先看!  3330
本周热议
我的信创开放社区兼职赚钱历程 40
今天你签到了吗? 27
如何玩转信创开放社区—从小白进阶到专家 15
信创开放社区邀请他人注册的具体步骤如下 15
方德桌面操作系统 14
我有15积分有什么用? 13
用抖音玩法闯信创开放社区——用平台宣传企业产品服务 13
如何让你先人一步获得悬赏问题信息?(创作者必看) 12
2024中国信创产业发展大会暨中国信息科技创新与应用博览会 9
中央国家机关政府采购中心:应当将CPU、操作系统符合安全可靠测评要求纳入采购需求 8

加入交流群

请使用微信扫一扫!