龙空技术网

网络工程师的Python之路——多线程(Multithreading)

弈心 1142

前言:

如今看官们对“python 线程异步”都比较看重,看官们都想要了解一些“python 线程异步”的相关资讯。那么小编在网摘上搜集了一些对于“python 线程异步””的相关资讯,希望朋友们能喜欢,咱们快快来了解一下吧!

弈心:从事计算机网络工作11年(新加坡7年,沙特4年),2013年考取CCIE,在新加坡先后任职于AT&T,新加坡交通部,苹果,Equinix,苏格兰皇家银行等大型企业、银行和政府部门。 目前供职于“世界第一土豪大学“沙特阿卜杜拉国王科技大学(KAUST),担任Senior Network Engineer,为KAUST校史上第一位也是唯一一位华人IT部门高级职员。2019年6月在知乎发布了华语圈第一本专门为编程零基础的网络工程师量身打造的Python教程《网络工程师的Python之路》。

在《网络工程师的Python之路 -- netdev(异步并行)》笔者介绍了如何在Python中使用单线程异步来提高Python脚本的工作效率,通过给5台交换机做配置的实验,证明了异步(Asynchronous)是如何将同步(Synchronous)下一个需要45秒才能执行完成的脚本的运行时间缩短至仅仅5秒的。今天我们来讲下另外一种提高脚本执行效率的技术:多线程(Multithreading)。

1. 单线程 VS 多线程

在《网络工程师的Python之路 -- netdev(异步并行)》中我们引用了数学家华罗庚的《统筹方法》中所举的例子来说明了同步和异步在单线程中的区别。其实我们也可以引用同样的例子来说明单线程和多线程的区别。在华罗庚《统筹方法》讲到的沏茶的这个例子中,如果只有一个人来完成烧水、洗茶杯、倒茶叶三项任务的话,因为此时只有一个劳动力,我们就可以把它看成是单线程。假设我们能找来三个人分别负责烧水、洗茶杯、倒茶业,并且保证三个人同时开工干活的话,那我们就可以把它看成是多线程,每一个劳动力代表一个线程。

在计算机的世界中也是一样的,一个程序可以启用多个线程同时完成多个任务,如果一个线程阻塞,其他线程并不受影响。现在的CPU都是多核的,单线程只能用到其中的一核,这其实是对硬件资源的一种浪费(当然不可否认的是随着时代的进步,现在的CPU已经足够强大,即使只用单核也能同时应付多个任务,这也是后来Python开始支持异步的原因之一)。如果我们使用多线程来运行Python脚本的话不仅能极大的提高脚本的运行速度,增加工作效率,并且还能充分利用我们主机的硬件资源。接下来我们就看下如何在Python中使用多线程。

2. 多线程在Python中的应用

在Python 3中已经内置了_thread和threading两个模块来实现多线程。相较于_thread,threading提供的方法更多而且更常用,因此接下来我们将举例讲解threading模块的用法,首先来看下面这段代码:

 import threading import time   def say_after(what, delay):  print (what)  time.sleep(delay)  print (what)   t = threading.Thread(target = say_after, args = ('hello',3)) t.start()
这里我们导入了threading这个Python内置模块来实现多线程。之后定义了一个say_after(what, delay)函数,该函数包含what和delay两个参数,分别用来表示打印的内容,以及time.sleep()休眠的时间。随后我们使用threading的Thread()函数为say_after(what, delay)函数创建了一个线程并将它赋值给变量t,注意Thread()里的target参数对应的是函数名称(即这里的say_after),args对应的是该say_after函数里的参数,这里等同于“what = ‘hello’”,“delay = 3”。最后我们调用threading中的start()来启动我们刚刚创建的线程。

运行代码看效果:

在打印出第一个hello后,程序因为time.sleep(3)休眠了三秒,三秒之后随即打印出了第二个hello。因为这时我们只运行了say_after(what, delay)这一个函数,并且只运行了一次,因此即使我们现在启用了多线程,我们也感受不了它和单线程有什么区别。接下来我们将该代码修改如下:

 #coding=utf-8 import threading import time   def say_after(what, delay):  print (what)  time.sleep(delay)  print (what)   t = threading.Thread(target = say_after, args = ('hello',3))   print (f"程序于 {time.strftime('%X')} 开始执行") t.start() print (f"程序于 {time.strftime('%X')} 执行结束")
这一次我们调用time.strftime()来尝试记录程序执行前和执行后的时间,看看有什么“意想不到”的结果。

运行代码看效果:

这里你肯定会问为什么程序在02:23:44开始执行,又在同一时间结束?难道不是该休眠三秒吗?为什么明明“print (f"程序于 {time.strftime('%X')} 开始执行")”和“print (f"程序于 {time.strftime('%X')} 执行结束")”分别写在“t.start()”的上面和下面,但是不等第二个hello被打印出来,“print (f"程序于 {time.strftime('%X')} 执行结束")”就被执行了?

这是因为除了threading.Thread()为say_after()函数创建的用户线程外,“print (f"程序于 {time.strftime('%X')} 开始执行")”和“print (f"程序于 {time.strftime('%X')} 执行结束")”两个print()函数也共同占用了公用的内核线程。也就是说该脚本现在实际上是调用了两个线程:一个用户线程,一个内核线程,也就构成了一个多线程的环境。因为分属不同的线程,say_after()函数和函数之外的两个print语句是同时运行的,互不干涉,因此“print (f"程序于 {time.strftime('%X')} 执行结束")”是不会像在单线程中那样等到t.start()执行完了才被执行,而是在“print (f"程序于 {time.strftime('%X')} 开始执行")”被执行后就马上跟着被执行。这也就解释了为什么你会看到原本需要休眠三秒时间的脚本会在“程序在02:23:44开始执行”和“程序在02:23:44执行结束”同时开始和结束。

如果想要正确捕捉say_after(what, delay)函数开始和结束时的时间,我们需要额外使用threading模块的join()方法,来看下面的代码:

 #coding=utf-8 import threading import time   def say_after(what, delay):  print (what)  time.sleep(delay)  print (what)   t = threading.Thread(target = say_after, args = ('hello',3))   print (f"程序于 {time.strftime('%X')} 开始执行") t.start() t.join() print (f"程序于 {time.strftime('%X')} 执行结束")
这里我们只修改了代码的一处地方,即在t.start()下面添加了一个t.join(),join()方法的作用是强制阻塞调用它的线程,直到该线程运行完毕或者终止为止(类似于单线程同步)。因为这里调用join()方法的变量t正是我们用threading.Thread()为say_after(what, delay)创建的用户线程,因此使用内核线程的“print (f"程序于 {time.strftime('%X')} 执行结束")”必须等待该用户线程执行完毕过后才能继续执行,因此脚本在执行时的效果会让你觉得整体还是以“单线程同步”的方式运行的。

运行代码看效果:

这里可以看到因为调用了.join()方法,在内核线程上运行的“print (f"程序于 {time.strftime('%X')} 执行结束")”必须等待在用户线程上运行的say_after(what, delay)执行完毕后,才能继续被执行,因此程序前后执行总共花费了3秒,类似于“单线程同步”的效果。

最后我们再举一例,来看看如何创建多个用户线程并运行,代码如下:

 #coding=utf-8 import threading import time   def say_after(what, delay):  print (what)  time.sleep(delay)  print (what)   print (f"程序于 {time.strftime('%X')} 开始执行\n") threads = []   for i in range(1,6):  t = threading.Thread(target=say_after, name="线程" + str(i), args=('hello',3))  print(t.name + '开始执行。')  t.start()   threads.append(t)   for i in threads:  i.join() print (f"\n程序于 {time.strftime('%X')} 执行结束")
这里我们使用for循环配合range(1,6)创建了5个线程,并且将它们以多线程的形式执行,也就是把say_after(what, delay)函数以多线程的形式执行了5次。每个线程作为元素加入进了threads这个空列表,然后我们再次使用for语句来遍历现在已经有了5个线程的threads列表,对其中的每个线程都调用的join()方法,确保直到它们都执行结束后,我们才执行内核线程下的“print (f"程序于 {time.strftime('%X')} 执行结束")”。

运行代码看效果:

可以看到这里我们成功的使用了多线程将程序执行,如果以单线程来执行5次say_after(what,delay)函数的话,那么需要花费3x5=15秒才能跑完整个脚本,而在多线程的形式下,整个程序只花费了3秒就运行完毕。

3. 多线程在Netmiko中的应用

在掌握了threading模块的基本用法后,接下来我们看看如何将它和netmiko结合,实现通过netmiko对网络做多线程登录和操作。

在《网络工程师的Python之路 -- netdev(异步并行)》中,我们使用传统的“单线程同步”方式,通过netmiko对5台交换机(192.168.2.11--192.168.2.15)下的“line vty 5 15”配置了“login local”,整个脚本从开始执行到结束,前后总共耗时了45.02秒。这里我们用netmiko以多线程的方式,对这5台交换机做同样的配置并计时,来看看使用多线程能为我们节省多少时间。

脚本代码如下:

 #coding=utf-8 import threading from queue import Queue import time from netmiko import ConnectHandler   f = open('ip_list.txt') threads = []   def ssh_session(ip, output_q):  commands = ["line vty 5 15", "login local","exit"]  SW = {'device_type': 'cisco_ios', 'ip': ip, 'username': 'python', 'password': '123'}  ssh_session = ConnectHandler(**SW)  output = ssh_session.send_config_set(commands)  print (output)   print (f"程序于 {time.strftime('%X')} 开始执行\n") for ips in f.readlines():  t = threading.Thread(target=ssh_session, args=(ips.strip(), Queue()))  t.start()  threads.append(t)   for i in threads:  i.join() print (f"\n程序于 {time.strftime('%X')} 执行结束")
在使用netmiko实现多线程时,我们需要导入queue这个内置队列模块。在Python中,队列(queue)是线程间最常用的交换数据的形式,这里我们引用了queue模块中的Queue类,也就是先进先出(FIFO)队列。并将它作为出队列参数配置给了ssh_session(ip, output_q)这个函数,有关queue模块的具体介绍不在本书讨论范围内,这里只需要知道这是使用netmiko实现多线程时必备的步骤即可。其余代码讲解从略。

运行脚本看效果:

可以看到使用netmiko多线程总共仅耗时10秒钟(08:11:41 – 08:11:51)便完成了对5台交换机的配置。比默认使用单线程同步的传统方式快了35秒,不过这个速度依然慢于《网络工程师的Python之路 -- netdev(异步并行)》中提到的单线程异步方式,在该节实验中,我们使用netdev单线程总共仅耗时5秒钟便对同样的5台交换机完成了同样的配置。

标签: #python 线程异步