龙空技术网

使用QThreadPool多线程PyQt6应用程序

亚洲程序员盟主 835

前言:

当前我们对“qtimerpython”可能比较关怀,我们都需要学习一些“qtimerpython”的相关内容。那么小编在网上汇集了一些有关“qtimerpython””的相关知识,希望小伙伴们能喜欢,你们快快来了解一下吧!

在不影响UI的情况下并发运行后台任务;

在构建Python GUI应用程序时,一个常见的问题是在试图执行长时间运行的后台任务时“锁定”接口。在本教程中,我将介绍在PyQt6中实现并发执行的最简单的方法之一。

Background

基于Qt的应用程序(像大多数GUI应用程序一样)是基于事件的。这意味着执行是根据用户交互、信号和计时器来驱动的。在事件驱动的应用程序中,单击按钮会创建一个事件,应用程序随后会处理该事件以产生一些预期的输出。事件被推入和从事件队列中取出,并按顺序处理。

app = QApplication([])window = MainWindow()app.exec_()

事件循环通过在QApplication对象上调用.exec_()开始,并在与Python代码相同的线程中运行。运行此事件循环的线程(通常称为GUI线程)还处理与主机操作系统的所有窗口通信。

默认情况下,由事件循环触发的任何执行也将在此线程中同步运行。在实践中,这意味着任何时候你的PyQt应用程序在你的代码中做一些事情,窗口通信和GUI交互都是冻结的。

如果您正在做的事情很简单,并且快速地将控制返回到GUI循环,那么用户将察觉不到这种冻结。但是,如果您需要执行较长时间运行的任务,例如打开/写入一个大文件,下载一些数据,或呈现一些复杂的图像,就会出现问题。对于您的用户,应用程序将显示为无响应(因为它是)。因为你的应用程序不再与操作系统通信,如果你点击你的应用程序,你会看到旋转的死亡之轮。没有人希望这样。

解决方案很简单:将工作从GUI线程移出(放到另一个线程中)。PyQt(通过Qt)提供了一个简单的接口来实现这一点。

准备

为了演示多线程执行,我们需要一个应用程序。下面是一个简单的PyQt应用程序,它将允许我们演示多线程,并看到实际的结果。只需复制并粘贴到一个新文件中,并使用适当的文件名(如多线程.py)保存它。将其余代码将被添加到这个文件中。

from PyQt6.QtGui import *from PyQt6.QtWidgets import *from PyQt6.QtCore import *import timeclass MainWindow(QMainWindow):    def __init__(self, *args, **kwargs):        super(MainWindow, self).__init__(*args, **kwargs)        self.counter = 0        layout = QVBoxLayout()        self.l = QLabel("Start")        b = QPushButton("DANGER!")        b.pressed.connect(self.oh_no)        layout.addWidget(self.l)        layout.addWidget(b)        w = QWidget()        w.setLayout(layout)        self.setCentralWidget(w)        self.show()        self.timer = QTimer()        self.timer.setInterval(1000)        self.timer.timeout.connect(self.recurring_timer)        self.timer.start()    def oh_no(self):        time.sleep(5)    def recurring_timer(self):        self.counter +=1        self.l.setText("Counter: %d" % self.counter)app = QApplication([])window = MainWindow()app.exec()

运行:您应该看到一个演示窗口,其中有一个数字在向上计数。这是由一个简单的循环时间产生的,每秒发射一次。这可以看作是我们的事件循环指示器,它是一种简单的方法,可以让我们知道我们的应用程序正在正常运行。还有一个按钮上写着“DANGER!”按下它。

您将注意到,每当您按下按钮时,计数器停止跳动,应用程序完全冻结5S。在Windows上,你可能会看到窗口变白,表明它没有响应,而在Mac上,你会看到旋转的死亡之轮。

显示冻结的原因是Qt事件循环被阻止处理(和响应)窗口事件。你在窗口上的点击仍然被主机操作系统注册并发送给你的应用程序,但因为它位于你的time.sleep中,它不能接受或对它们做出反应。它们必须等到您的代码将控制权传递回Qt。

解决这个问题的最简单、也许也是最合乎逻辑的方法是在代码中接受事件。这允许Qt继续响应主机操作系统,并且您的应用程序将保持响应。你可以通过在QApplication类上使用静态的.processEvents()函数轻松做到这一点。只需在长时间运行的代码块中添加如下一行:

QApplication.processEvents()

例如,长时间运行代码。我们可以把它分解成5x个1秒的睡眠,并在中间插入.processEvents。

def oh_no(self):    for n in range(5):        QApplication.processEvents()        time.sleep(1)

现在,当您按下按钮时,进入休眠。但是,现在QApplication.processEvents()会间歇性地将控制传递回Qt,并允许它像正常一样响应事件。Qt将接受事件并在返回运行其余代码之前处理它们。

这是可行的,但出于几个原因,它仍然是垃圾代码。

首先,当您将控制权传递回Qt时,您的代码将不再运行。这意味着无论你试图做什么长时间运行的事情都会花费更长的时间。这绝对不是你想要的。

其次,在主事件循环(app.exec())之外处理事件,会导致你的应用程序在循环内分支到处理代码(例如,用于触发槽或事件)。如果您的代码依赖于/响应外部状态,这可能会导致未定义的行为。下面的代码演示了这一点:

from PyQt6.QtGui import *from PyQt6.QtWidgets import *from PyQt6.QtCore import *import timeclass MainWindow(QMainWindow):    def __init__(self, *args, **kwargs):        super(MainWindow, self).__init__(*args, **kwargs)        self.counter = 0        layout = QVBoxLayout()        self.l = QLabel("Start")        b = QPushButton("DANGER!")        b.pressed.connect(self.oh_no)        c = QPushButton("?")        c.pressed.connect(self.change_message)        layout.addWidget(self.l)        layout.addWidget(b)        layout.addWidget(c)        w = QWidget()        w.setLayout(layout)        self.setCentralWidget(w)        self.show()    def change_message(self):        self.message = "OH NO"    def oh_no(self):        self.message = "Pressed"        for n in range(100):            time.sleep(0.1)            self.l.setText(self.message)            QApplication.processEvents()app = QApplication([])window = MainWindow()app.exec_()

如果运行这段代码,您将看到与以前一样的计数器。按下“DANGER!”将显示的文本更改为“Pressed”,如oh_no函数的入口点所定义的那样。但是,如果在oh_no仍在运行时按下“?”按钮,您将看到消息发生了变化。状态从循环外部被改变。

这是一个简单的例子。但是,如果您的应用程序中有多个长时间运行的进程,并且每个进程都调用QApplication.processEvents()来保持运行,那么您的应用程序行为可能是不可预测的。

线程和进程

如果你退一步思考你想在你的应用程序中发生什么,它可能可以总结为“一些事情与其他事情同时发生”。

在PyQt应用程序中运行独立任务有两种主要方法:线程和进程。

线程共享相同的内存空间,因此可以快速启动并消耗最少的资源。共享内存使得在线程之间传递数据变得很简单,但是从不同线程读取/写入内存可能导致竞争条件或段错误。在Python GUI中,还有一个额外的问题,即多个线程被同一个全局解释器锁(GIL)绑定——这意味着非GIL释放的Python代码一次只能在一个线程中执行。然而,这并不是PyQt的主要问题,因为大部分时间都是在Python之外度过的。

进程使用独立的内存空间(以及完全独立的Python解释器)。这避免了GIL的任何潜在问题,但代价是启动时间较慢,内存开销更大,发送/接收数据更复杂。

为了简单起见,通常使用线程是有意义的,除非您有很好的理由使用进程。Qt中的子进程更适合于运行和与外部程序通信。

QRunnable和QThreadPool

Qt为在其他线程中运行作业提供了一个非常简单的接口,这在PyQt中很好地公开了。这是围绕两个类构建的:QRunnable和QThreadPool。前者是您想要执行的工作的容器,而后者是将工作传递给线程的方法。

使用QThreadPool的好处是它可以为您处理工作线程的排队和执行。除了排队作业和检索结果之外,根本没有太多要做的事情。

要定义一个自定义QRunnable,您可以子类化基本QRunnable类,然后将您希望执行的代码放置在run()方法中。下面是我们长时间运行的实现。sleep job作为QRunnable。将以下代码添加到multithread.py中,位于MainWindow类定义的上方。

class Worker(QRunnable):    '''    Worker thread    '''    @pyqtSlot()    def run(self):        '''        Your code goes in this function        '''        print("Thread start")        time.sleep(5)        print("Thread complete")

在另一个线程中执行我们的函数只是简单地创建一个Worker实例,然后将它传递给我们的QThreadPool实例,它将自动执行。

接下来在__init__块中添加以下内容,以设置线程池。

self.threadpool = QThreadPool()print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())

最后,将以下代码行添加到oh_no函数中。

def oh_no(self):    worker = Worker()    self.threadpool.start(worker)

现在,单击按钮将创建一个工作线程来处理(长时间运行的)任务,并通过线程池将其转到另一个线程。如果没有足够的线程来处理传入的worker,它们将被排队并在稍后按顺序执行。

尝试一下,您将看到您的应用程序现在可以毫无问题地处理点击按钮。

检查一下如果你多次按下按钮会发生什么。您应该看到您的线程立即执行,直到.maxThreadCount报告的数量。如果在已经有这个数量的活动worker之后再次按下按钮,那么后续的worker将排队,直到有一个线程可用为止。

改善QRunnables

如果你想将自定义数据传递给执行函数,你可以通过init,然后从run槽函数中通过self访问数据。

class Worker(QRunnable):    '''    Worker thread    :param args: Arguments to make available to the run code    :param kwargs: Keywords arguments to make available to the run code    '''    def __init__(self, *args, **kwargs):        super(Worker, self).__init__()        self.args = args        self.kwargs = kwargs    @pyqtSlot()    def run(self):        '''        Initialise the runner function with passed self.args, self.kwargs.        '''        print(args, kwargs)

事实上,我们可以利用Python中函数是对象这一事实,并传递函数来执行,而不是每次都子类化。在接下来的构造中,我们只需要一个Worker类来处理所有的执行作业。

class Worker(QRunnable):    '''    Worker thread    Inherits from QRunnable to handler worker thread setup, signals and wrap-up.    :param callback: The function callback to run on this worker thread. Supplied args and                     kwargs will be passed through to the runner.    :type callback: function    :param args: Arguments to pass to the callback function    :param kwargs: Keywords to pass to the callback function    '''    def __init__(self, fn, *args, **kwargs):        super(Worker, self).__init__()        # Store constructor arguments (re-used for processing)        self.fn = fn        self.args = args        self.kwargs = kwargs    @pyqtSlot()    def run(self):        '''        Initialise the runner function with passed args, kwargs.        '''        self.fn(*self.args, **self.kwargs)

您现在可以传入任何Python函数,并让它在单独的线程中执行。

def execute_this_fn(self):    print("Hello!")def oh_no(self):    args = (2,3)    kwargs = {'test':1, "test2":2}    worker = Worker(self.execute_this_fn, args, kwargs)    worker = Worker(self.execute_this_fn) # Any other args, kwargs are passed to the run function    # Execute    self.threadpool.start(worker)
Thread IO

有时,能够从正在运行的worker传回状态和数据是很有帮助的。这可能包括计算的结果、引发的异常或正在进行的进展(想想进度条)。Qt提供了信号和插槽框架,它允许你这样做,并且是线程安全的,允许从运行线程直接到GUI前端的安全通信。信号允许您使用.emit值,然后在代码的其他地方由与.connect链接的slot函数拾取这些值。

下面是一个简单的WorkerSignals类,定义为包含许多示例信号。

自定义信号只能在从QObject派生的对象上定义。由于QRunnable不是从QObject派生的,我们不能直接定义那里的信号。使用自定义QObject保存信号是最简单的解决方案。

import traceback, sysclass WorkerSignals(QObject):    '''    Defines the signals available from a running worker thread.    Supported signals are:    finished        No data    error        tuple (exctype, value, traceback.format_exc() )    result        object data returned from processing, anything    '''    finished = pyqtSignal()    error = pyqtSignal(tuple)    result = pyqtSignal(object)

在这个例子中,我们定义了3个自定义信号:

finished信号,没有数据表明任务何时完成。error信号,接收异常类型、异常值和格式化回溯的元组。result信号,从执行函数接收任意对象类型的结果。

您可能不需要所有这些信号,但它们被包括在内,以指示可能发生的事情。在下面的代码中,我们将实现一个长时间运行的任务,该任务利用这些信号向用户提供有用的信息。

class Worker(QRunnable):    '''    Worker thread    Inherits from QRunnable to handler worker thread setup, signals and wrap-up.    :param callback: The function callback to run on this worker thread. Supplied args and                     kwargs will be passed through to the runner.    :type callback: function    :param args: Arguments to pass to the callback function    :param kwargs: Keywords to pass to the callback function    '''    def __init__(self, fn, *args, **kwargs):        super(Worker, self).__init__()        # Store constructor arguments (re-used for processing)        self.fn = fn        self.args = args        self.kwargs = kwargs        self.signals = WorkerSignals()    @pyqtSlot()    def run(self):        '''        Initialise the runner function with passed args, kwargs.        '''        # Retrieve args/kwargs here; and fire processing using them        try:            result = self.fn(                *self.args, **self.kwargs            )        except:            traceback.print_exc()            exctype, value = sys.exc_info()[:2]            self.signals.error.emit((exctype, value, traceback.format_exc()))        else:            self.signals.result.emit(result)  # Return the result of the processing        finally:            self.signals.finished.emit()  # Done

您可以将自己的处理函数连接到这些信号,以接收线程完成(或结果)的通知。

def execute_this_fn(self):    for n in range(0, 5):        time.sleep(1)    return "Done."def print_output(self, s):    print(s)def thread_complete(self):    print("THREAD COMPLETE!")def oh_no(self):    # Pass the function to execute    worker = Worker(self.execute_this_fn) # Any other args, kwargs are passed to the run function    worker.signals.result.connect(self.print_output)    worker.signals.finished.connect(self.thread_complete)    # Execute    self.threadpool.start(worker)

您还经常希望从长时间运行的线程接收状态信息。这可以通过传递回调来实现,您运行的代码可以向回调发送信息。这里有两个选择:定义新的信号(允许使用事件循环执行处理)或使用标准的Python函数。

在这两种情况下,您都需要将这些回调传递给目标函数才能使用它们。在下面的完整代码中使用了基于信号的方法,其中我们传递了一个int作为线程进度百分比的指示器

完整代码

下面给出了一个完整的工作示例,展示了自定义QRunnable工作者以及工作者和进度信号。您应该能够轻松地将此代码适应您所开发的任何多线程应用程序。

from PyQt6.QtGui import *from PyQt6.QtWidgets import *from PyQt6.QtCore import *import timeimport traceback, sysclass WorkerSignals(QObject):    '''    Defines the signals available from a running worker thread.    Supported signals are:    finished        No data    error        tuple (exctype, value, traceback.format_exc() )    result        object data returned from processing, anything    progress        int indicating % progress    '''    finished = pyqtSignal()    error = pyqtSignal(tuple)    result = pyqtSignal(object)    progress = pyqtSignal(int)class Worker(QRunnable):    '''    Worker thread    Inherits from QRunnable to handler worker thread setup, signals and wrap-up.    :param callback: The function callback to run on this worker thread. Supplied args and                     kwargs will be passed through to the runner.    :type callback: function    :param args: Arguments to pass to the callback function    :param kwargs: Keywords to pass to the callback function    '''    def __init__(self, fn, *args, **kwargs):        super(Worker, self).__init__()        # Store constructor arguments (re-used for processing)        self.fn = fn        self.args = args        self.kwargs = kwargs        self.signals = WorkerSignals()        # Add the callback to our kwargs        self.kwargs['progress_callback'] = self.signals.progress    @pyqtSlot()    def run(self):        '''        Initialise the runner function with passed args, kwargs.        '''        # Retrieve args/kwargs here; and fire processing using them        try:            result = self.fn(*self.args, **self.kwargs)        except:            traceback.print_exc()            exctype, value = sys.exc_info()[:2]            self.signals.error.emit((exctype, value, traceback.format_exc()))        else:            self.signals.result.emit(result)  # Return the result of the processing        finally:            self.signals.finished.emit()  # Doneclass MainWindow(QMainWindow):    def __init__(self, *args, **kwargs):        super(MainWindow, self).__init__(*args, **kwargs)        self.counter = 0        layout = QVBoxLayout()        self.l = QLabel("Start")        b = QPushButton("DANGER!")        b.pressed.connect(self.oh_no)        layout.addWidget(self.l)        layout.addWidget(b)        w = QWidget()        w.setLayout(layout)        self.setCentralWidget(w)        self.show()        self.threadpool = QThreadPool()        print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())        self.timer = QTimer()        self.timer.setInterval(1000)        self.timer.timeout.connect(self.recurring_timer)        self.timer.start()    def progress_fn(self, n):        print("%d%% done" % n)    def execute_this_fn(self, progress_callback):        for n in range(0, 5):            time.sleep(1)            progress_callback.emit(n*100/4)        return "Done."    def print_output(self, s):        print(s)    def thread_complete(self):        print("THREAD COMPLETE!")    def oh_no(self):        # Pass the function to execute        worker = Worker(self.execute_this_fn) # Any other args, kwargs are passed to the run function        worker.signals.result.connect(self.print_output)        worker.signals.finished.connect(self.thread_complete)        worker.signals.progress.connect(self.progress_fn)        # Execute        self.threadpool.start(worker)    def recurring_timer(self):        self.counter +=1        self.l.setText("Counter: %d" % self.counter)app = QApplication([])window = MainWindow()app.exec_()
备注

您可能已经发现了这个总体计划中的轻微缺陷——我们仍然在使用事件循环(和GUI线程)来处理我们的worker的输出。

当我们只是跟踪进度、完成或返回元数据时,这不是问题。然而,如果你有返回大量数据的worker——例如加载大文件,执行复杂的分析和需要(大)结果,或者查询数据库——通过GUI线程传递这些数据可能会导致性能问题,最好避免。

类似地,如果应用程序使用大量线程和Python结果处理程序,则可能会遇到GIL的限制。如前所述,在使用线程时,Python的执行一次仅限于单个线程。处理线程信号的Python代码可能会被worker阻塞,反之亦然。因为阻塞插槽函数会阻塞事件循环,这会直接影响GUI的响应能力。

在这些情况下,最好使用纯python线程池(例如并发futures)来将处理和线程事件处理与GUI进一步隔离。但是,请注意,任何Python GUI代码都可以阻塞其他Python代码,除非它位于单独的进程中。

标签: #qtimerpython