
An Introduction to Python threading

洪较瘦不着调退役it人


Introduction to Python Threads


Every topic touched on here can be analyzed in much more depth.

This article covers:

What a thread is
How to create threads and wait for them to finish
How to use a ThreadPoolExecutor
How to avoid race conditions
How to use the tools that the Python threading module provides

What is a thread?

A thread is a separate flow of execution. This means your program can have two things happening at once. For most Python 3 implementations, however, the different threads do not actually execute at the same time: they merely appear to.

It is easy to picture threads as two (or more) different processors running your program, each carrying out an independent task at the same time. The threads may sit on different processor cores, but only one of them runs at any given moment.

Creating a Thread object and starting it

To start a separate thread, create a Thread instance and then tell it to run by calling Thread.start():

import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")

x = threading.Thread(target=thread_function, args=(1,))
x.start()

When you create the Thread object, you tell it which task to run, which is simply a function, thread_function(), and you can also pass arguments to that function via args (this part is optional).

$ ./single_thread.py
Main    : before creating thread
Main    : before running thread
Thread 1: starting
Main    : wait for the thread to finish
Main    : all done
Thread 1: finishing

Daemon threads

In computer science, a daemon is a process that runs in the background.

In Python threading, the term has a more specific meaning.

A daemon thread is shut down immediately when the program exits. For non-daemon threads, the program waits for those threads to complete before it terminates.

Daemon threads, however, are simply killed wherever they happen to be when the program exits.

Passing daemon=True marks the thread as a daemon thread:
x = threading.Thread(target=thread_function, args=(1,), daemon=True)

When you run the program now, you should see the following output:

$ ./daemon_thread.py
Main    : before creating thread
Main    : before running thread
Thread 1: starting
Main    : wait for the thread to finish
Main    : all done

Waiting for a thread with join()

Daemon threads are handy, but what about when you want to wait for a thread to stop?

# x.join()

To have one thread wait for another thread x to finish, call x.join().
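A minimal sketch, reusing thread_function and the logging setup from the first program above: with the join() call left in, the main thread pauses until the worker thread has finished.

x = threading.Thread(target=thread_function, args=(1,))
x.start()
x.join()  # the main thread blocks here until thread_function has returned
logging.info("Main    : all done")  # logged only after the thread finishes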

Working with multiple threads

Here is how to start several threads:

import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    threads = list()
    for index in range(3):
        logging.info("Main    : create and start thread %d.", index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Main    : before joining thread %d.", index)
        thread.join()
        logging.info("Main    : thread %d done", index)
$ ./multiple_threads.py
Main    : create and start thread 0.
Thread 0: starting
Main    : create and start thread 1.
Thread 1: starting
Main    : create and start thread 2.
Thread 2: starting
Main    : before joining thread 0.
Thread 2: finishing
Thread 1: finishing
Thread 0: finishing
Main    : thread 0 done
Main    : before joining thread 1.
Main    : thread 1 done
Main    : before joining thread 2.
Main    : thread 2 done
The loop creates the threads one by one and starts each of them: x = threading.Thread(target=thread_function, args=(index,))

Using ThreadPoolExecutor

ThreadPoolExecutor has been available since Python 3.2. The easiest way to create a pool is as a context manager, using the with statement to manage the pool's creation and destruction. If you have written Java programs, this kind of thread pool will feel very familiar.

import concurrent.futures

# [rest of code]

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))
max_workers sets the maximum number of threads in the pool; here it is 3.

Output:

$ ./executor.py
Thread 0: starting
Thread 1: starting
Thread 2: starting
Thread 1: finishing
Thread 0: finishing
Thread 2: finishing

Shared resources and race conditions

A race condition can occur when two or more threads access shared data or a shared resource.

The example below simulates reading a value from a database, adding 1 to it, and writing it back to the database.

class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)
local_copy = self.value
local_copy += 1
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)

The program creates a ThreadPoolExecutor with two threads and then calls .submit() for each of them:

.submit(function, *args, **kwargs)
$ ./racecond.py
Testing unlocked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing unlocked update. Ending value is 1.

The problem with multiple threads accessing a shared resource

When the threads run, each one gets its own local copy of the value. Inside .update(), a thread increments that copy, local_copy, rather than the latest value held by the shared object, which makes this modification of the data thread-unsafe.

Two threads

The two threads run concurrently, but not truly at the same time: they take turns executing. Each thread has its own local copy of the value, yet both operate on the same shared database object, and it is this shared object that causes the thread-safety problem.

The two threads interleave their access to the single shared object, overwriting each other's results. A similar race condition can arise when one thread frees memory before another thread has finished accessing it.

The core problem is the line local_copy = self.value: the copy may not be the latest data, because after you read the value another thread may already have modified it.

Solving the thread-safety problem: synchronizing with a Lock

class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock:
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)
$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing locked update. Ending value is 2.
To also see the DEBUG-level messages, raise the logger's level:

logging.getLogger().setLevel(logging.DEBUG)

Running the program with DEBUG logging turned on produces the following output:

$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 0 about to lock
Thread 0 has lock
Thread 1: starting update
Thread 1 about to lock
Thread 0 about to release lock
Thread 0 after release
Thread 0: finishing update
Thread 1 has lock
Thread 1 about to release lock
Thread 1 after release
Thread 1: finishing update
Testing locked update. Ending value is 2.

The essence is that a thread must acquire the lock before operating on the shared data; only after the operation finishes and the lock is released can other threads modify that data.

Deadlock

A deadlock occurs when threads end up waiting on each other forever. Even a single thread can deadlock itself by acquiring the same Lock twice:

import threading

l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")
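The snippet above never prints "acquired lock twice": the second acquire() blocks forever, because an ordinary Lock cannot be acquired again by the thread that already holds it. As a side note not shown in the original example, threading.RLock is a reentrant lock that its owning thread may acquire repeatedly, as long as it releases it the same number of times:

import threading

rl = threading.RLock()
print("before first acquire")
rl.acquire()
print("before second acquire")
rl.acquire()   # does not block: an RLock can be re-acquired by its owner
print("acquired lock twice")
rl.release()   # must be released once for every acquire
rl.release()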
Producer-consumer threading

The producer-consumer problem is a standard computer-science problem.

Producer-consumer using Lock

A factory produces goods and puts them into a warehouse, so each delivery increases the stock; consumers buy goods, so each purchase decreases the stock.

That is a simple everyday producer-consumer scenario.

First, define a producer:

import random

SENTINEL = object()

def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")

Next, define a consumer that reads messages from the pipeline:

def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)
Uncomment this line to see all of the DEBUG logging messages:

# logging.getLogger().setLevel(logging.DEBUG)

The warehouse is the Pipeline. Here it is with DEBUG logging statements included:

class Pipeline:
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)

Stripped of the DEBUG logging, the same Pipeline is easier to read:

class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        self.consumer_lock.acquire()
        message = self.message
        self.producer_lock.release()
        return message

    def set_message(self, message, name):
        self.producer_lock.acquire()
        self.message = message
        self.consumer_lock.release()

The Pipeline has three members:

.message stores the message to pass.
.producer_lock is a threading.Lock that restricts the producer thread's access to the message.
.consumer_lock is a threading.Lock that restricts the consumer thread's access to the message.

$ ./prodcom_lock.py
Producer got data 43
Producer got data 45
Consumer storing data: 43
Producer got data 86
Consumer storing data: 45
Producer got data 40
Consumer storing data: 86
Producer got data 62
Consumer storing data: 40
Producer got data 15
Consumer storing data: 62
Producer got data 16
Consumer storing data: 15
Producer got data 61
Consumer storing data: 16
Producer got data 73
Consumer storing data: 61
Producer got data 22
Consumer storing data: 73
Consumer storing data: 22

But this design has a problem: if you look closely, the pipeline only ever holds and processes one message at a time.

Producer-consumer using a Queue

What if you want the pipeline to be able to hold more than one message at a time?

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

The only changes here are creating the threading.Event object, passing event as an extra argument to producer and consumer, and the final section, which sleeps for a short time, logs a message, and then calls event.set().

The producer does not have to change much either:

def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    logging.info("Producer received EXIT event. Exiting")
def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s  (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)
class Pipeline(queue.Queue): the Pipeline is now a subclass of queue.Queue, which is first-in, first-out: new data is added at the tail and data is taken from the head.
$ ./prodcom_queue.py
Producer got message: 32
Producer got message: 51
Producer got message: 25
Producer got message: 94
Producer got message: 29
Consumer storing message: 32 (queue size=3)
Producer got message: 96
Consumer storing message: 51 (queue size=3)
Producer got message: 6
Consumer storing message: 25 (queue size=3)
Producer got message: 31

[many lines deleted]

Producer got message: 80
Consumer storing message: 94 (queue size=6)
Producer got message: 33
Consumer storing message: 20 (queue size=6)
Producer got message: 48
Consumer storing message: 31 (queue size=6)
Producer got message: 52
Consumer storing message: 98 (queue size=6)
Main: about to set event
Producer got message: 13
Consumer storing message: 59 (queue size=6)
Producer received EXIT event. Exiting
Consumer storing message: 75 (queue size=6)
Consumer storing message: 97 (queue size=5)
Consumer storing message: 80 (queue size=4)
Consumer storing message: 33 (queue size=3)
Consumer storing message: 48 (queue size=2)
Consumer storing message: 52 (queue size=1)
Consumer storing message: 13 (queue size=0)
Consumer received EXIT event. Exiting

Lines like the following show that several messages are waiting in the queue while the consumer stores one:

Consumer storing message: 32 (queue size=3)
Here is the complete final version, which drops the Pipeline class and uses queue.Queue directly:

import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(queue, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        queue.put(message)

    logging.info("Producer received event. Exiting")

def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )

    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
Threading objects: Semaphore

A Semaphore has an internal counter.

Before executing the protected logic, call acquire(), which decrements the internal counter by 1; if the counter is already zero, acquire() blocks until another thread calls release(). After the logic completes, call release(), which increments the counter by 1.
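A minimal sketch (my own illustration, not code from the original article): a Semaphore initialized to 2 lets at most two threads into the protected block at once; any further caller blocks in acquire() until one of them releases.

import threading
import time

sem = threading.Semaphore(2)   # internal counter starts at 2

def worker(name):
    with sem:                  # acquire(): counter - 1 (blocks if it is 0)
        print(f"{name} entered the limited section")
        time.sleep(0.5)
    # leaving the with block calls release(): counter + 1

threads = [threading.Thread(target=worker, args=(f"T{i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()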

Timer

A threading.Timer is a way to schedule a function to be called after a certain amount of time has passed:

t = threading.Timer(30.0, my_function)
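The timer only begins counting once start() is called, and it can be cancelled before it fires. A small sketch under that assumption (my_function is just a placeholder name carried over from the line above):

import threading

def my_function():
    print("timer fired")

t = threading.Timer(30.0, my_function)
t.start()     # my_function will be called roughly 30 seconds from now
t.cancel()    # cancelling before it fires prevents the call entirely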

