前言:
当前我们对“qtimerpython”可能比较关怀,我们都需要学习一些“qtimerpython”的相关内容。那么小编在网上汇集了一些有关“qtimerpython””的相关知识,希望小伙伴们能喜欢,你们快快来了解一下吧!在不影响UI的情况下并发运行后台任务;
在构建Python GUI应用程序时,一个常见的问题是在试图执行长时间运行的后台任务时“锁定”接口。在本教程中,我将介绍在PyQt6中实现并发执行的最简单的方法之一。
Background
基于Qt的应用程序(像大多数GUI应用程序一样)是基于事件的。这意味着执行是根据用户交互、信号和计时器来驱动的。在事件驱动的应用程序中,单击按钮会创建一个事件,应用程序随后会处理该事件以产生一些预期的输出。事件被推入和从事件队列中取出,并按顺序处理。
app = QApplication([])window = MainWindow()app.exec_()
事件循环通过在QApplication对象上调用.exec_()开始,并在与Python代码相同的线程中运行。运行此事件循环的线程(通常称为GUI线程)还处理与主机操作系统的所有窗口通信。
默认情况下,由事件循环触发的任何执行也将在此线程中同步运行。在实践中,这意味着任何时候你的PyQt应用程序在你的代码中做一些事情,窗口通信和GUI交互都是冻结的。
如果您正在做的事情很简单,并且快速地将控制返回到GUI循环,那么用户将察觉不到这种冻结。但是,如果您需要执行较长时间运行的任务,例如打开/写入一个大文件,下载一些数据,或呈现一些复杂的图像,就会出现问题。对于您的用户,应用程序将显示为无响应(因为它是)。因为你的应用程序不再与操作系统通信,如果你点击你的应用程序,你会看到旋转的死亡之轮。没有人希望这样。
解决方案很简单:将工作从GUI线程移出(放到另一个线程中)。PyQt(通过Qt)提供了一个简单的接口来实现这一点。
准备
为了演示多线程执行,我们需要一个应用程序。下面是一个简单的PyQt应用程序,它将允许我们演示多线程,并看到实际的结果。只需复制并粘贴到一个新文件中,并使用适当的文件名(如多线程.py)保存它。将其余代码将被添加到这个文件中。
from PyQt6.QtGui import *from PyQt6.QtWidgets import *from PyQt6.QtCore import *import timeclass MainWindow(QMainWindow): def __init__(self, *args, **kwargs): super(MainWindow, self).__init__(*args, **kwargs) self.counter = 0 layout = QVBoxLayout() self.l = QLabel("Start") b = QPushButton("DANGER!") b.pressed.connect(self.oh_no) layout.addWidget(self.l) layout.addWidget(b) w = QWidget() w.setLayout(layout) self.setCentralWidget(w) self.show() self.timer = QTimer() self.timer.setInterval(1000) self.timer.timeout.connect(self.recurring_timer) self.timer.start() def oh_no(self): time.sleep(5) def recurring_timer(self): self.counter +=1 self.l.setText("Counter: %d" % self.counter)app = QApplication([])window = MainWindow()app.exec()
运行:您应该看到一个演示窗口,其中有一个数字在向上计数。这是由一个简单的循环时间产生的,每秒发射一次。这可以看作是我们的事件循环指示器,它是一种简单的方法,可以让我们知道我们的应用程序正在正常运行。还有一个按钮上写着“DANGER!”按下它。
您将注意到,每当您按下按钮时,计数器停止跳动,应用程序完全冻结5S。在Windows上,你可能会看到窗口变白,表明它没有响应,而在Mac上,你会看到旋转的死亡之轮。
显示冻结的原因是Qt事件循环被阻止处理(和响应)窗口事件。你在窗口上的点击仍然被主机操作系统注册并发送给你的应用程序,但因为它位于你的time.sleep中,它不能接受或对它们做出反应。它们必须等到您的代码将控制权传递回Qt。
解决这个问题的最简单、也许也是最合乎逻辑的方法是在代码中接受事件。这允许Qt继续响应主机操作系统,并且您的应用程序将保持响应。你可以通过在QApplication类上使用静态的.processEvents()函数轻松做到这一点。只需在长时间运行的代码块中添加如下一行:
QApplication.processEvents()
例如,长时间运行代码。我们可以把它分解成5x个1秒的睡眠,并在中间插入.processEvents。
def oh_no(self): for n in range(5): QApplication.processEvents() time.sleep(1)
现在,当您按下按钮时,进入休眠。但是,现在QApplication.processEvents()会间歇性地将控制传递回Qt,并允许它像正常一样响应事件。Qt将接受事件并在返回运行其余代码之前处理它们。
这是可行的,但出于几个原因,它仍然是垃圾代码。
首先,当您将控制权传递回Qt时,您的代码将不再运行。这意味着无论你试图做什么长时间运行的事情都会花费更长的时间。这绝对不是你想要的。
其次,在主事件循环(app.exec())之外处理事件,会导致你的应用程序在循环内分支到处理代码(例如,用于触发槽或事件)。如果您的代码依赖于/响应外部状态,这可能会导致未定义的行为。下面的代码演示了这一点:
from PyQt6.QtGui import *from PyQt6.QtWidgets import *from PyQt6.QtCore import *import timeclass MainWindow(QMainWindow): def __init__(self, *args, **kwargs): super(MainWindow, self).__init__(*args, **kwargs) self.counter = 0 layout = QVBoxLayout() self.l = QLabel("Start") b = QPushButton("DANGER!") b.pressed.connect(self.oh_no) c = QPushButton("?") c.pressed.connect(self.change_message) layout.addWidget(self.l) layout.addWidget(b) layout.addWidget(c) w = QWidget() w.setLayout(layout) self.setCentralWidget(w) self.show() def change_message(self): self.message = "OH NO" def oh_no(self): self.message = "Pressed" for n in range(100): time.sleep(0.1) self.l.setText(self.message) QApplication.processEvents()app = QApplication([])window = MainWindow()app.exec_()
如果运行这段代码,您将看到与以前一样的计数器。按下“DANGER!”将显示的文本更改为“Pressed”,如oh_no函数的入口点所定义的那样。但是,如果在oh_no仍在运行时按下“?”按钮,您将看到消息发生了变化。状态从循环外部被改变。
这是一个简单的例子。但是,如果您的应用程序中有多个长时间运行的进程,并且每个进程都调用QApplication.processEvents()来保持运行,那么您的应用程序行为可能是不可预测的。
线程和进程
如果你退一步思考你想在你的应用程序中发生什么,它可能可以总结为“一些事情与其他事情同时发生”。
在PyQt应用程序中运行独立任务有两种主要方法:线程和进程。
线程共享相同的内存空间,因此可以快速启动并消耗最少的资源。共享内存使得在线程之间传递数据变得很简单,但是从不同线程读取/写入内存可能导致竞争条件或段错误。在Python GUI中,还有一个额外的问题,即多个线程被同一个全局解释器锁(GIL)绑定——这意味着非GIL释放的Python代码一次只能在一个线程中执行。然而,这并不是PyQt的主要问题,因为大部分时间都是在Python之外度过的。
进程使用独立的内存空间(以及完全独立的Python解释器)。这避免了GIL的任何潜在问题,但代价是启动时间较慢,内存开销更大,发送/接收数据更复杂。
为了简单起见,通常使用线程是有意义的,除非您有很好的理由使用进程。Qt中的子进程更适合于运行和与外部程序通信。
QRunnable和QThreadPool
Qt为在其他线程中运行作业提供了一个非常简单的接口,这在PyQt中很好地公开了。这是围绕两个类构建的:QRunnable和QThreadPool。前者是您想要执行的工作的容器,而后者是将工作传递给线程的方法。
使用QThreadPool的好处是它可以为您处理工作线程的排队和执行。除了排队作业和检索结果之外,根本没有太多要做的事情。
要定义一个自定义QRunnable,您可以子类化基本QRunnable类,然后将您希望执行的代码放置在run()方法中。下面是我们长时间运行的实现。sleep job作为QRunnable。将以下代码添加到multithread.py中,位于MainWindow类定义的上方。
class Worker(QRunnable): ''' Worker thread ''' @pyqtSlot() def run(self): ''' Your code goes in this function ''' print("Thread start") time.sleep(5) print("Thread complete")
在另一个线程中执行我们的函数只是简单地创建一个Worker实例,然后将它传递给我们的QThreadPool实例,它将自动执行。
接下来在__init__块中添加以下内容,以设置线程池。
self.threadpool = QThreadPool()print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())
最后,将以下代码行添加到oh_no函数中。
def oh_no(self): worker = Worker() self.threadpool.start(worker)
现在,单击按钮将创建一个工作线程来处理(长时间运行的)任务,并通过线程池将其转到另一个线程。如果没有足够的线程来处理传入的worker,它们将被排队并在稍后按顺序执行。
尝试一下,您将看到您的应用程序现在可以毫无问题地处理点击按钮。
检查一下如果你多次按下按钮会发生什么。您应该看到您的线程立即执行,直到.maxThreadCount报告的数量。如果在已经有这个数量的活动worker之后再次按下按钮,那么后续的worker将排队,直到有一个线程可用为止。
改善QRunnables
如果你想将自定义数据传递给执行函数,你可以通过init,然后从run槽函数中通过self访问数据。
class Worker(QRunnable): ''' Worker thread :param args: Arguments to make available to the run code :param kwargs: Keywords arguments to make available to the run code ''' def __init__(self, *args, **kwargs): super(Worker, self).__init__() self.args = args self.kwargs = kwargs @pyqtSlot() def run(self): ''' Initialise the runner function with passed self.args, self.kwargs. ''' print(args, kwargs)
事实上,我们可以利用Python中函数是对象这一事实,并传递函数来执行,而不是每次都子类化。在接下来的构造中,我们只需要一个Worker类来处理所有的执行作业。
class Worker(QRunnable): ''' Worker thread Inherits from QRunnable to handler worker thread setup, signals and wrap-up. :param callback: The function callback to run on this worker thread. Supplied args and kwargs will be passed through to the runner. :type callback: function :param args: Arguments to pass to the callback function :param kwargs: Keywords to pass to the callback function ''' def __init__(self, fn, *args, **kwargs): super(Worker, self).__init__() # Store constructor arguments (re-used for processing) self.fn = fn self.args = args self.kwargs = kwargs @pyqtSlot() def run(self): ''' Initialise the runner function with passed args, kwargs. ''' self.fn(*self.args, **self.kwargs)
您现在可以传入任何Python函数,并让它在单独的线程中执行。
def execute_this_fn(self): print("Hello!")def oh_no(self): args = (2,3) kwargs = {'test':1, "test2":2} worker = Worker(self.execute_this_fn, args, kwargs) worker = Worker(self.execute_this_fn) # Any other args, kwargs are passed to the run function # Execute self.threadpool.start(worker)Thread IO
有时,能够从正在运行的worker传回状态和数据是很有帮助的。这可能包括计算的结果、引发的异常或正在进行的进展(想想进度条)。Qt提供了信号和插槽框架,它允许你这样做,并且是线程安全的,允许从运行线程直接到GUI前端的安全通信。信号允许您使用.emit值,然后在代码的其他地方由与.connect链接的slot函数拾取这些值。
下面是一个简单的WorkerSignals类,定义为包含许多示例信号。
自定义信号只能在从QObject派生的对象上定义。由于QRunnable不是从QObject派生的,我们不能直接定义那里的信号。使用自定义QObject保存信号是最简单的解决方案。
import traceback, sysclass WorkerSignals(QObject): ''' Defines the signals available from a running worker thread. Supported signals are: finished No data error tuple (exctype, value, traceback.format_exc() ) result object data returned from processing, anything ''' finished = pyqtSignal() error = pyqtSignal(tuple) result = pyqtSignal(object)
在这个例子中,我们定义了3个自定义信号:
finished信号,没有数据表明任务何时完成。error信号,接收异常类型、异常值和格式化回溯的元组。result信号,从执行函数接收任意对象类型的结果。
您可能不需要所有这些信号,但它们被包括在内,以指示可能发生的事情。在下面的代码中,我们将实现一个长时间运行的任务,该任务利用这些信号向用户提供有用的信息。
class Worker(QRunnable): ''' Worker thread Inherits from QRunnable to handler worker thread setup, signals and wrap-up. :param callback: The function callback to run on this worker thread. Supplied args and kwargs will be passed through to the runner. :type callback: function :param args: Arguments to pass to the callback function :param kwargs: Keywords to pass to the callback function ''' def __init__(self, fn, *args, **kwargs): super(Worker, self).__init__() # Store constructor arguments (re-used for processing) self.fn = fn self.args = args self.kwargs = kwargs self.signals = WorkerSignals() @pyqtSlot() def run(self): ''' Initialise the runner function with passed args, kwargs. ''' # Retrieve args/kwargs here; and fire processing using them try: result = self.fn( *self.args, **self.kwargs ) except: traceback.print_exc() exctype, value = sys.exc_info()[:2] self.signals.error.emit((exctype, value, traceback.format_exc())) else: self.signals.result.emit(result) # Return the result of the processing finally: self.signals.finished.emit() # Done
您可以将自己的处理函数连接到这些信号,以接收线程完成(或结果)的通知。
def execute_this_fn(self): for n in range(0, 5): time.sleep(1) return "Done."def print_output(self, s): print(s)def thread_complete(self): print("THREAD COMPLETE!")def oh_no(self): # Pass the function to execute worker = Worker(self.execute_this_fn) # Any other args, kwargs are passed to the run function worker.signals.result.connect(self.print_output) worker.signals.finished.connect(self.thread_complete) # Execute self.threadpool.start(worker)
您还经常希望从长时间运行的线程接收状态信息。这可以通过传递回调来实现,您运行的代码可以向回调发送信息。这里有两个选择:定义新的信号(允许使用事件循环执行处理)或使用标准的Python函数。
在这两种情况下,您都需要将这些回调传递给目标函数才能使用它们。在下面的完整代码中使用了基于信号的方法,其中我们传递了一个int作为线程进度百分比的指示器
完整代码
下面给出了一个完整的工作示例,展示了自定义QRunnable工作者以及工作者和进度信号。您应该能够轻松地将此代码适应您所开发的任何多线程应用程序。
from PyQt6.QtGui import *from PyQt6.QtWidgets import *from PyQt6.QtCore import *import timeimport traceback, sysclass WorkerSignals(QObject): ''' Defines the signals available from a running worker thread. Supported signals are: finished No data error tuple (exctype, value, traceback.format_exc() ) result object data returned from processing, anything progress int indicating % progress ''' finished = pyqtSignal() error = pyqtSignal(tuple) result = pyqtSignal(object) progress = pyqtSignal(int)class Worker(QRunnable): ''' Worker thread Inherits from QRunnable to handler worker thread setup, signals and wrap-up. :param callback: The function callback to run on this worker thread. Supplied args and kwargs will be passed through to the runner. :type callback: function :param args: Arguments to pass to the callback function :param kwargs: Keywords to pass to the callback function ''' def __init__(self, fn, *args, **kwargs): super(Worker, self).__init__() # Store constructor arguments (re-used for processing) self.fn = fn self.args = args self.kwargs = kwargs self.signals = WorkerSignals() # Add the callback to our kwargs self.kwargs['progress_callback'] = self.signals.progress @pyqtSlot() def run(self): ''' Initialise the runner function with passed args, kwargs. ''' # Retrieve args/kwargs here; and fire processing using them try: result = self.fn(*self.args, **self.kwargs) except: traceback.print_exc() exctype, value = sys.exc_info()[:2] self.signals.error.emit((exctype, value, traceback.format_exc())) else: self.signals.result.emit(result) # Return the result of the processing finally: self.signals.finished.emit() # Doneclass MainWindow(QMainWindow): def __init__(self, *args, **kwargs): super(MainWindow, self).__init__(*args, **kwargs) self.counter = 0 layout = QVBoxLayout() self.l = QLabel("Start") b = QPushButton("DANGER!") b.pressed.connect(self.oh_no) layout.addWidget(self.l) layout.addWidget(b) w = QWidget() w.setLayout(layout) self.setCentralWidget(w) self.show() self.threadpool = QThreadPool() print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount()) self.timer = QTimer() self.timer.setInterval(1000) self.timer.timeout.connect(self.recurring_timer) self.timer.start() def progress_fn(self, n): print("%d%% done" % n) def execute_this_fn(self, progress_callback): for n in range(0, 5): time.sleep(1) progress_callback.emit(n*100/4) return "Done." def print_output(self, s): print(s) def thread_complete(self): print("THREAD COMPLETE!") def oh_no(self): # Pass the function to execute worker = Worker(self.execute_this_fn) # Any other args, kwargs are passed to the run function worker.signals.result.connect(self.print_output) worker.signals.finished.connect(self.thread_complete) worker.signals.progress.connect(self.progress_fn) # Execute self.threadpool.start(worker) def recurring_timer(self): self.counter +=1 self.l.setText("Counter: %d" % self.counter)app = QApplication([])window = MainWindow()app.exec_()备注
您可能已经发现了这个总体计划中的轻微缺陷——我们仍然在使用事件循环(和GUI线程)来处理我们的worker的输出。
当我们只是跟踪进度、完成或返回元数据时,这不是问题。然而,如果你有返回大量数据的worker——例如加载大文件,执行复杂的分析和需要(大)结果,或者查询数据库——通过GUI线程传递这些数据可能会导致性能问题,最好避免。
类似地,如果应用程序使用大量线程和Python结果处理程序,则可能会遇到GIL的限制。如前所述,在使用线程时,Python的执行一次仅限于单个线程。处理线程信号的Python代码可能会被worker阻塞,反之亦然。因为阻塞插槽函数会阻塞事件循环,这会直接影响GUI的响应能力。
在这些情况下,最好使用纯python线程池(例如并发futures)来将处理和线程事件处理与GUI进一步隔离。但是,请注意,任何Python GUI代码都可以阻塞其他Python代码,除非它位于单独的进程中。
标签: #qtimerpython