An Intro to Python Threading
Each of the topics touched on here could be explored in much more depth. This article covers:

- What a thread is
- How to create threads and wait for them to finish
- How to use a ThreadPoolExecutor
- How to avoid race conditions
- How to use the tools the Python threading module provides

What Is a Thread?
A thread is a separate flow of execution. This means your program will have two things happening at once. For most Python 3 implementations, however, the different threads do not actually execute at the same time: they merely appear to.

It is easy to think of threading as two (or more) different processors running your program, each doing an independent task at the same time. The threads may run on different processor cores, but only one of them runs at any given moment (in CPython, because of the Global Interpreter Lock).
Creating and Starting a Thread
To start a separate thread, create a Thread instance and then call .start() on it:
import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main : before running thread")
    x.start()
    logging.info("Main : wait for the thread to finish")
    # x.join()
    logging.info("Main : all done")
x = threading.Thread(target=thread_function, args=(1,))
x.start()
When you create a Thread, you tell it which task to execute, in this case the function thread_function(), and you can optionally pass arguments to that function.
$ ./single_thread.py
Main : before creating thread
Main : before running thread
Thread 1: starting
Main : wait for the thread to finish
Main : all done
Thread 1: finishing
Daemon Threads
In computer science, a daemon is a process that runs in the background.

Python threading gives daemon a more specific meaning.

A daemon thread will shut down immediately when the program exits; for non-daemon threads, the program waits for them to finish before terminating.

Put differently: when the program exits, any daemon threads that are still running are killed abruptly, wherever they happen to be in their work.
Setting daemon=True marks the thread as a daemon:
x = threading.Thread(target=thread_function, args=(1,), daemon=True)
Now when you run the program, you should see the following output:
$ ./daemon_thread.py
Main : before creating thread
Main : before running thread
Thread 1: starting
Main : wait for the thread to finish
Main : all done
join() a Thread
Daemon threads are handy, but what about when you want to wait for a thread to finish?

Look again at the commented-out line in the program above:

# x.join()

To tell one thread to wait for another thread x to finish, call x.join(): the calling thread blocks until x has completed.
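To see what .join() guarantees, here is a minimal sketch (the worker function and results list are our own illustration, not part of the original program):

```python
import threading
import time

results = []

def worker():
    time.sleep(0.2)
    results.append("worker done")

t = threading.Thread(target=worker)
t.start()
t.join()  # the main thread blocks here until worker has finished
results.append("main done")

print(results)  # prints ['worker done', 'main done']
```

Because of the .join(), "worker done" is always appended before the main thread continues.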
Using Multiple Threads
Here is one way to start multiple threads:
import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    threads = list()
    for index in range(3):
        logging.info("Main : create and start thread %d.", index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Main : before joining thread %d.", index)
        thread.join()
        logging.info("Main : thread %d done", index)
$ ./multiple_threads.py
Main : create and start thread 0.
Thread 0: starting
Main : create and start thread 1.
Thread 1: starting
Main : create and start thread 2.
Thread 2: starting
Main : before joining thread 0.
Thread 2: finishing
Thread 1: finishing
Thread 0: finishing
Main : thread 0 done
Main : before joining thread 1.
Main : thread 1 done
Main : before joining thread 2.
Main : thread 2 done
The loop creates each thread and then starts it:

x = threading.Thread(target=thread_function, args=(index,))
Using a ThreadPoolExecutor

Python has provided ThreadPoolExecutor since version 3.2. The easiest way to create a pool is as a context manager, using the with statement to manage the pool's creation and destruction. If you have written Java programs, this pattern will feel quite familiar.
import concurrent.futures

# [rest of code]

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))
Here max_workers=3 means the pool will use at most three worker threads.

Output:
$ ./executor.py
Thread 0: starting
Thread 1: starting
Thread 2: starting
Thread 1: finishing
Thread 0: finishing
Thread 2: finishing
Race Conditions on Shared Resources
A race condition can occur when two or more threads access shared data or a shared resource.

The example below simulates fetching a value from a database, adding one to it, and writing it back:
class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)
local_copy = self.value
local_copy += 1
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)
The program creates a thread pool with two worker threads and then submits a call to database.update() for each of them using:

.submit(function, *args, **kwargs)
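.submit() returns a concurrent.futures.Future, whose .result() blocks until the submitted call completes. A minimal sketch (the square function is our own illustration):

```python
import concurrent.futures

def square(n):
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(square, 4)  # returns a Future immediately
    print(future.result())               # blocks until square(4) finishes; prints 16
```

This is handy when you need the return value of a task, which executor.map() and fire-and-forget submit() calls do not surface directly.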
$ ./racecond.py
Testing unlocked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing unlocked update. Ending value is 1.
The problem with multiple threads accessing a shared resource

When the threads start running .update(), each of them gets its own local copy of the shared value in local_copy. Each thread then updates that copy, not the latest value in the shared object, which makes the modification of this data thread-unsafe.
The two threads run concurrently, but not simultaneously: they take turns on the processor, and each holds its own copy of the value. It is the shared object they both write back to that causes the thread-safety problem.

The two threads interleave their accesses to the single shared object, overwriting each other's results. A similar race condition can occur when one thread frees memory while another thread is still accessing it.

local_copy = self.value

The core problem is that local_copy may not hold the latest data: right after you read the value, another thread may have modified it.
Solving the Thread-Safety Problem: Synchronization with Lock
class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock:
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)
$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing locked update. Ending value is 2.
To watch the lock operations, turn the log level up to DEBUG:

logging.getLogger().setLevel(logging.DEBUG)

Running the program then produces the following output:
$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 0 about to lock
Thread 0 has lock
Thread 1: starting update
Thread 1 about to lock
Thread 0 about to release lock
Thread 0 after release
Thread 0: finishing update
Thread 1 has lock
Thread 1 about to release lock
Thread 1 after release
Thread 1: finishing update
Testing locked update. Ending value is 2.
The essence: a thread must acquire the lock before operating on the shared data, and only after it finishes and releases the lock can other threads modify that data.
Deadlock

A deadlock happens when threads end up waiting on each other. The simplest way to produce one is to try to acquire the same Lock twice from the same thread:
import threading

l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()  # blocks forever: the Lock is already held
print("acquired lock twice")

Producer-Consumer Threading
The producer-consumer problem is a standard computer science problem.
Producer-Consumer Using Lock

A factory produces goods and puts them into a warehouse; inventory goes up when goods arrive. Consumers buy the goods; inventory goes down when goods leave.

This is a simple everyday producer-consumer scenario.
First, define a producer:
import random

SENTINEL = object()

def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")
Next, define a consumer, which pulls messages from the pipeline:
def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)
To see all of the DEBUG log messages below, uncomment this line:

# logging.getLogger().setLevel(logging.DEBUG)

The warehouse is a Pipeline:
class Pipeline:
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)
With the debug logging stripped out, the same Pipeline reduces to:

class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        self.consumer_lock.acquire()
        message = self.message
        self.producer_lock.release()
        return message

    def set_message(self, message, name):
        self.producer_lock.acquire()
        self.message = message
        self.consumer_lock.release()
The Pipeline has three members:

- .message stores the message to pass.
- .producer_lock restricts the producer thread's access to the message.
- .consumer_lock restricts the consumer thread's access to the message.
$ ./prodcom_lock.py
Producer got data 43
Producer got data 45
Consumer storing data: 43
Producer got data 86
Consumer storing data: 45
Producer got data 40
Consumer storing data: 86
Producer got data 62
Consumer storing data: 40
Producer got data 15
Consumer storing data: 62
Producer got data 16
Consumer storing data: 15
Producer got data 61
Consumer storing data: 16
Producer got data 73
Consumer storing data: 61
Producer got data 22
Consumer storing data: 73
Consumer storing data: 22
There is a catch, though: as you may have noticed, this pipeline handles only a single message at a time.
Producer-Consumer Using Queue

What if you want to be able to handle more than one message in the pipeline at a time?
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
The only changes here are the creation of the threading.Event object, passing event as a parameter to producer and consumer, and the final block, which sleeps briefly, logs a message, and then calls event.set().
The producer does not have to change much either:
def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    logging.info("Producer received EXIT event. Exiting")
def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")
class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)
Pipeline is now a subclass of queue.Queue, a first-in, first-out queue: new items are added at the tail, and items are taken from the head.
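The FIFO behavior can be seen on queue.Queue directly:

```python
import queue

q = queue.Queue(maxsize=10)
for value in (1, 2, 3):
    q.put(value)      # new items are added at the tail

first = q.get()       # items are removed from the head, oldest first
print(first)          # prints 1
print(q.qsize())      # prints 2
```

queue.Queue also handles all the locking internally, which is why the subclass above no longer needs its own Lock objects.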
$ ./prodcom_queue.py
Producer got message: 32
Producer got message: 51
Producer got message: 25
Producer got message: 94
Producer got message: 29
Consumer storing message: 32 (queue size=3)
Producer got message: 96
Consumer storing message: 51 (queue size=3)
Producer got message: 6
Consumer storing message: 25 (queue size=3)
Producer got message: 31

[many lines deleted]

Producer got message: 80
Consumer storing message: 94 (queue size=6)
Producer got message: 33
Consumer storing message: 20 (queue size=6)
Producer got message: 48
Consumer storing message: 31 (queue size=6)
Producer got message: 52
Consumer storing message: 98 (queue size=6)
Main: about to set event
Producer got message: 13
Consumer storing message: 59 (queue size=6)
Producer received EXIT event. Exiting
Consumer storing message: 75 (queue size=6)
Consumer storing message: 97 (queue size=5)
Consumer storing message: 80 (queue size=4)
Consumer storing message: 33 (queue size=3)
Consumer storing message: 48 (queue size=2)
Consumer storing message: 52 (queue size=1)
Consumer storing message: 13 (queue size=0)
Consumer received EXIT event. Exiting
A line such as

Consumer storing message: 32 (queue size=3)

shows how many messages were still waiting in the queue when the consumer stored one.
Putting it all together, the final version uses queue.Queue and threading.Event directly, without a custom Pipeline class:

import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(queue, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        queue.put(message)

    logging.info("Producer received event. Exiting")

def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )

    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

Threading Objects: Semaphore
A Semaphore maintains an internal counter.
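A minimal sketch of bounding concurrency with a Semaphore created with an initial count of 2 (the worker function and counters are our own illustration):

```python
import threading
import time

sem = threading.Semaphore(2)   # internal counter starts at 2
count_lock = threading.Lock()
active = 0
max_seen = 0

def worker():
    global active, max_seen
    with sem:                  # acquire(): counter goes down, blocking at zero
        with count_lock:
            active += 1
            max_seen = max(max_seen, active)
        time.sleep(0.05)       # pretend to use the limited resource
        with count_lock:
            active -= 1
    # leaving the with-block calls release(): counter goes back up

threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(max_seen)  # never exceeds 2
```

Even with five threads running, at most two are ever inside the protected section at once.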
Before running the protected logic, call .acquire(): the internal counter is decremented by one, and the call blocks if the counter is already zero. After the logic finishes, call .release(): the counter is incremented by one again.

Timer
A threading.Timer is a way to schedule a function to be called after a certain amount of time has passed:
t = threading.Timer(30.0, my_function)
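The timer starts counting down only once .start() is called, and .cancel() stops it before it fires. A minimal sketch (my_function here is a stand-in for your own callback):

```python
import threading

fired = []

def my_function():
    fired.append(True)

t = threading.Timer(0.1, my_function)  # schedule my_function to run in 0.1 s
t.start()                              # the countdown begins only at .start()
t.join()                               # Timer is a Thread subclass, so we can join it
print(fired)                           # prints [True]

t2 = threading.Timer(30.0, my_function)
t2.start()
t2.cancel()                            # stop the timer before it fires
```

Cancelling is useful for timeout patterns, where the timer only fires if some other work fails to finish in time.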