龙空技术网

深度学习——异步计算、数据并行(动手学深度学习)

AI小李 191

前言:

目前朋友们对“并行算法设计技术”大约比较珍视,姐妹们都需要分析一些“并行算法设计技术”的相关知识。那么小编同时在网摘上汇集了一些对于“并行算法设计技术””的相关资讯,希望各位老铁们能喜欢,小伙伴们快快来学习一下吧!

编译器和解释器

Python 是一种解释性语言,因此如果没有编译器给代码优化,代码会跑得很慢。

符号式编程

考虑另一种选择符号式编程(symbolic programming),即代码通常只在完全定义了过程之后才执行计算。这个策略被多个深度学习框架使用,包括 Theano 和 TensorFlow(后者已经获得了命令式编程的扩展)。一般包括以下步骤:

定义计算流程;将流程编译成可执行的程序;给定输入,调用编译好的程序执行。

这将允许进行大量的优化。首先,在大多数情况下,我们可以跳过 Python 解释器。从而消除因为多个更快的 GPU 与单个 CPU 上的单个 Python 线程搭配使用时产生的性能瓶颈。其次,编译器可以将代码优化和重写。因为编译器在将其转换为机器指令之前可以看到完整的代码,所以这种优化是可以实现的。例如,只要某个变量不再需要,编译器就可以释放内存(或者从不分配内存),或者将代码转换为一个完全等价的片段。下面,我们将通过模拟命令式编程来进一步了解符号式编程的概念。

def add_():    return '''def add(a, b):    return a + b'''def fancy_func_():    return '''def fancy_func(a, b, c, d):    e = add(a, b)    f = add(c, d)    g = add(e, f)    return g'''def evoke_():    return add_() + fancy_func_() + 'print(fancy_func(1, 2, 3, 4))'prog = evoke_()print(prog)y = compile(prog, '', 'exec')exec(y)
def add(a, b):    return a + bdef fancy_func(a, b, c, d):    e = add(a, b)    f = add(c, d)    g = add(e, f)    return gprint(fancy_func(1, 2, 3, 4))10

里面出现了神奇的两个函数 compile() 和 exec():

compile(source, filename, mode[, flags[, dont_inherit]])source:字符串或者 AST(Abstract Syntax Trees)对象filename:代码文件名称,如果不是从文件读取代码则传递一些可辨认的值mode:指定编译代码的种类。可以指定为 exec, eval, singleflags:变量作用域,局部命名空间,如果被提供,可以是任何映射对象flags 和 dont_inherit 是用来控制编译源码时的标志exec(obj)obj:要执行的表达式。

命令式(解释型)编程和符号式编程的区别如下:

命令式编程更容易使用。在 Python 中,命令式编程的大部分代码都是简单易懂的。命令式编程也更容易调试,这是因为无论是获取和打印所有的中间变量值,或者使用 Python 的内置调试工具都更加简单;符号式编程运行效率更高,更易于移植。符号式编程更容易在编译期间优化代码,同时还能够将程序移植到与 Python 无关的格式中,从而允许程序在非 Python 环境中运行,避免了任何潜在的与 Python 解释器相关的性能问题。混合式编程

PyTorch 是基于命令式编程并且使用动态计算图。为了能够利用符号式编程的可移植性和效率,开发人员思考能否将这两种编程模型的优点结合起来,于是就产生了 TorchScript。TorchScript 允许用户使用纯命令式编程进行开发和调试,同时能够将大多数程序转换为符号式程序,以便在需要产品级计算性能和部署时使用。

接下来假设已经定义好了一个网络比如 net=MLP(),那么可以使用 net = torch.jit.script(net) 代码使用 TorchScript:

torch.jit.script(obj, optimize=None, _frames_up=0, _rcb=None, example_inputs=None):编写一个函数或 nn.Module 脚本将检查源代码,使用 TorchScript 编译器将其编译为 TorchScript 代码,并返回 ScriptModule 或 ScriptFunction。 TorchScript 本身是 Python 语言的一个子集,因此并非 Python 中的所有功能都有效,但我们提供了足够的功能来计算张量并执行依赖于控制的操作。有关完整指南,请参阅 TorchScript 语言参考。编写字典或列表的脚本会将其中的数据复制到 TorchScript 实例中,随后可以通过引用在 Python 和 TorchScript 之间以零复制开销传递。torch.jit.script() 可以为模块、函数、字典和列表用作函数,而且还可以被用作装饰器。返回:如果 obj 是 nn.Module,script 会返回一个 ScriptModule。返回的 ScriptModule 将与原来的 nn.Module 有相同的子模块和参数集合。如果 obj 是独立的函数,一个 ScriptFunction 将会返回。如果 obj 是字典,将会返回 torch._C.ScriptDict。如果 obj 是列表,将会返回 torch._C.ScriptList。

在使用上面转化成 TorchScript 的代码后,一个三层的多层感知机大约增快了 20%。而且,还可以方便地使用 net.save('filepath.pt') 来保存网络结构。众所周知,普通的 torch.save()/torch.load() 是不能在没有原本的模块类定义下读取模型的。但是在 TorchScript 中,接下来即使我们删除了原本的多层感知机的类以及衍生的实例,也可以通过 torch.jit.load('filepath.pt') 重新载入模型。当然也不排除是我没删干净

异步计算

PyTorch 使用了 Python 自己的调度器来实现不同的性能权衡。对 PyTorch 来说 GPU操 作在默认情况下是异步的。当调用一个使用 GPU 的函数时,操作会排队到特定的设备上,但不一定要等到以后才执行。这允许并行执行更多的计算,包括在 CPU 或其他 GPU 上的操作。

因此,了解异步编程是如何工作的,通过主动地减少计算需求和相互依赖,有助于我们开发更高效的程序。这能够减少内存开销并提高处理器利用率。下面测试一下 numpy(CPU) 和 PyTorch(GPU) 的速度。

# GPU计算热身device = d2l.try_gpu()a = torch.randn(size=(1000, 1000), device=device)b = torch.mm(a, a)with d2l.Benchmark('numpy'):    for _ in range(10):        a = numpy.random.normal(size=(1000, 1000))        b = numpy.dot(a, a)with d2l.Benchmark('torch'):    for _ in range(10):        a = torch.randn(size=(1000, 1000), device=device)        b = torch.mm(a, a)
numpy: 1.0981 sectorch: 0.0011 sec

默认情况下,GPU 操作在 PyTorch 中是异步的。强制 PyTorch 在返回之前完成所有计算,这种强制说明了之前发生的情况:计算是由后端执行,而前端将控制权返回给了 Python。

例如下面调用 torch.cuda.synchronize(device),这个函数等待在一个 CUDA 设备上所有核的所有流都完成。

with d2l.Benchmark():    for _ in range(10):        a = torch.randn(size=(1000, 1000), device=device)        b = torch.mm(a, a)    torch.cuda.synchronize(device)
Done: 0.0089 sec

广义上说,PyTorch 有一个用于与用户直接交互的前端(例如通过 Python),还有一个由系统用来执行计算的后端。用户可以用各种前端语言编写 PyTorch 程序,如 Python 和 C++。不管使用的前端编程语言是什么,PyTorch 程序的执行主要发生在 C++ 实现的后端。由前端语言发出的操作被传递到后端执行。后端管理自己的线程,这些线程不断收集和执行排队的任务。请注意,要使其工作,后端必须能够跟踪计算图中各个步骤之间的依赖关系。因此,不可能并行化相互依赖的操作。

当语句的结果需要被打印出来时,Python 前端线程将等待 C++ 后端线程完成结果计算。这种设计的一个好处是 Python 前端线程不需要执行实际的计算。因此,不管 Python 的性能如何,对程序的整体性能几乎没有影响。

练习题

(1)在CPU上,对本节中相同的矩阵乘法操作进行基准测试,仍然可以通过后端观察异步吗?

torch 观察不到异步现象,反倒是 numpy 可以观察到异步的现象。虽然 torch.cuda.synchronize(torch.device('cpu')) 会弹出报错,但是仍然可以使用以下两个代码来测试速度:

with d2l.Benchmark('numpy'):    for _ in range(10):        a = numpy.random.normal(size=(1000, 1000))        b = numpy.dot(a, a)        # time.sleep(5)        with d2l.Benchmark('torch'):    for _ in range(10):        a = torch.randn(size=(1000, 1000), device=device)        b = torch.mm(a, a)
numpy: 0.9737 sectorch: 0.2859 sec
with d2l.Benchmark('numpy'):    for _ in range(10):        a = numpy.random.normal(size=(1000, 1000))        b = numpy.dot(a, a)        time.sleep(5)        with d2l.Benchmark('torch'):    for _ in range(10):        a = torch.randn(size=(1000, 1000), device=device)        b = torch.mm(a, a)
numpy: 0.9414 sectorch: 0.2103 sec

经过多次尝试,可以发现 torch 的执行时间有明显差异,这说明有 numpy 有部分仍然占用设备的时候,已经开始对 torch 的矩阵乘法计时了。

而如果把这两个矩阵乘法的顺序反过来,numpy 的时间变化不大,因此 torch 几乎没有异步而 numpy 异步了。

最后,我发现 torch.cuda.synchronize() 直接调用不加参数就不会报错了。如果它的 device 参数为 None,那么它将使用 current_device 函数找出当前设备。

自动并行

深度学习框架会在后端自动构建计算图。利用计算图,系统可以了解所有依赖关系,并且可以选择性地并行执行多个不相互依赖的任务以提高速度。

通常情况下单个操作符将使用所有CPU或单个GPU上的所有计算资源。并行化对单设备计算机来说并不是很有用,而并行化对于多个设备就很重要了。

请注意,接下来的实验至少需要两个GPU来运行。

基于 GPU 的并行计算

测试一下两个 GPU 串行各执行 10 次矩阵乘法和并行各执行 10 次矩阵乘法的速度。

devices = d2l.try_all_gpus()def run(x):    return [x.mm(x) for _ in range(50)]x_gpu1 = torch.rand(size=(4000, 4000), device=devices[0])x_gpu2 = torch.rand(size=(4000, 4000), device=devices[1])
run(x_gpu1)run(x_gpu2)  # 预热设备torch.cuda.synchronize(devices[0])torch.cuda.synchronize(devices[1])with d2l.Benchmark('GPU1 time'):    run(x_gpu1)    torch.cuda.synchronize(devices[0])with d2l.Benchmark('GPU2 time'):    run(x_gpu2)    torch.cuda.synchronize(devices[1])
GPU1 time: 1.5491 secGPU2 time: 1.4804 sec

删除两个任务之间的 torch.cuda.synchronize() 语句,系统就可以在两个设备上自动实现并行计算。

with d2l.Benchmark('GPU1 & GPU2'):    run(x_gpu1)    run(x_gpu2)    torch.cuda.synchronize()
GPU1 & GPU2: 1.5745 sec
并行计算与通信

在许多情况下,我们需要在不同的设备之间移动数据,比如在CPU和GPU之间,或者在不同的GPU之间。

通过在 GPU 上计算,然后将结果复制回 CPU 来模拟这个过程。

def copy_to_cpu(x, non_blocking=False):    return [y.to('cpu', non_blocking=non_blocking) for y in x]with d2l.Benchmark('在GPU1上运行'):    y = run(x_gpu1)    torch.cuda.synchronize()with d2l.Benchmark('复制到CPU'):    y_cpu = copy_to_cpu(y)    torch.cuda.synchronize()
在GPU1上运行: 1.6285 sec复制到CPU: 2.5801 sec

在 GPU 仍在运行时就开始使用 PCI-Express 总线带宽来移动数据是有利的。在 PyTorch 中,to() 和 copy_() 等函数都允许显式的 non_blocking 参数,这允许在不需要同步时调用方可以绕过同步。设置 non_blocking=True 以模拟这个场景。

with d2l.Benchmark('在GPU1上运行并复制到CPU'):    y = run(x_gpu1)    y_cpu = copy_to_cpu(y, True)    torch.cuda.synchronize()
在GPU1上运行并复制到CPU: 1.9456 sec
多 GPU 训练

在多个 GPU 上并行总共分为三种:

网络并行按层并行数据并行

实际上,数据并行是最常用的方法。原书中也重点讨论了数据并行。

数据并行性

假设一台机器有 \(k\) 个 GPU。 给定需要训练的模型,虽然每个 GPU 上的参数值都是相同且同步的,但是每个 GPU 都将独立地维护一组完整的模型参数。

一般来说,\(k\) 个 GPU 并行训练过程如下:

在任何一次训练迭代中,给定的随机的小批量样本都将被分成 \(k\) 个部分,并均匀地分配到 GPU 上;每个 GPU 根据分配给它的小批量子集,计算模型参数的损失和梯度;将 \(k\) 个 GPU 中的局部梯度聚合,以获得当前小批量的随机梯度;聚合梯度被重新分发到每个 GPU 中;每个 GPU 使用这个小批量随机梯度,来更新它所维护的完整的模型参数集。

在实践中请注意,当在 \(k\) 个 GPU 上训练时,需要扩大小批量的大小为 \(k\) 的倍数,这样每个 GPU 都有相同的工作量,就像只在单个 GPU 上训练一样。 因此,在 16-GPU 服务器上可以显著地增加小批量数据量的大小,同时可能还需要相应地提高学习率。

数据同步

对于高效的多 GPU 训练,我们需要两个基本操作。首先,我们需要向多个设备分发参数并附加梯度(get_params)。如果没有参数,就不可能在 GPU 上评估网络。第二,需要跨多个设备对参数求和,也就是说,需要一个 allreduce 函数。

get_params() 函数定义如下:

def get_params(params, device):    new_params = [p.to(device) for p in params]    for p in new_params:        p.requires_grad_()    return new_params

假设现在有一个向量分布在多个 GPU 上,下面的 allreduce 函数将所有向量相加,并将结果广播给所有GPU。请注意,需要将数据复制到累积结果的设备,才能使函数正常工作。

def allreduce(data):    for i in range(1, len(data)):        data[0][:] += data[i].to(data[0].device)    for i in range(1, len(data)):        data[i][:] = data[0].to(data[i].device)
数据分发

nn.parallel.scatter() 是一个简单的工具函数,将一个小批量数据均匀地分布在多个 GPU 上。用法如下所示:

data = torch.arange(20).reshape(4, 5)devices = [torch.device('cuda:0'), torch.device('cuda:1')]split = nn.parallel.scatter(data, devices)print('input :', data)print('load into', devices)print('output:', split)
input : tensor([[ 0,  1,  2,  3,  4],        [ 5,  6,  7,  8,  9],        [10, 11, 12, 13, 14],        [15, 16, 17, 18, 19]])load into [device(type='cuda', index=0), device(type='cuda', index=1)]output: (tensor([[0, 1, 2, 3, 4],        [5, 6, 7, 8, 9]], device='cuda:0'), tensor([[10, 11, 12, 13, 14],        [15, 16, 17, 18, 19]], device='cuda:1'))
训练
def train_batch(X, y, device_params, devices, lr):    X_shards, y_shards = split_batch(X, y, devices)    # 在每个GPU上分别计算损失    ls = [loss(lenet(X_shard, device_W), y_shard).sum()          for X_shard, y_shard, device_W in zip(              X_shards, y_shards, device_params)]    for l in ls:  # 反向传播在每个GPU上分别执行        l.backward()    # 将每个GPU的所有梯度相加,并将其广播到所有GPU    with torch.no_grad():        for i in range(len(device_params[0])):            allreduce(                [device_params[c][i].grad for c in range(len(devices))])    # 在每个GPU上分别更新模型参数    for param in device_params:        d2l.sgd(param, lr, X.shape[0]) # 在这里,我们使用全尺寸的小批量     

与前几章中略有不同:训练函数需要分配 GPU 并将所有模型参数复制到所有设备。显然,每个小批量都是使用 train_batch 函数来处理多个 GPU。我们只在一个 GPU 上计算模型的精确度,而让其他 GPU 保持空闲,尽管这是相对低效的,但是使用方便且代码简洁。

def train(num_gpus, batch_size, lr):    train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size)    devices = [d2l.try_gpu(i) for i in range(num_gpus)]    # 将模型参数复制到num_gpus个GPU    device_params = [get_params(params, d) for d in devices]    num_epochs = 10    animator = d2l.Animator('epoch', 'test acc', xlim=[1, num_epochs])    timer = d2l.Timer()    for epoch in range(num_epochs):        timer.start()        for X, y in train_iter:            # 为单个小批量执行多GPU训练            train_batch(X, y, device_params, devices, lr)            torch.cuda.synchronize()        timer.stop()        # 在GPU0上评估模型        animator.add(epoch + 1, (d2l.evaluate_accuracy_gpu(            lambda x: lenet(x, device_params[0]), test_iter, devices[0]),))    print(f'测试精度:{animator.Y[0][-1]:.2f},{timer.avg():.1f}秒/轮,'          f'在{str(devices)}')
多 GPU 的简洁实现DataParallel()

原书出现了一个有趣的函数 net = nn.DataParallel(net, device_ids=devices),这个函数可以说是本节的重点。

torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0) 这个函数:

在模块的层级上实现了数据并行。

这个容器通过在批次维度中分块将输入拆分到指定设备,从而并行化给定模块的应用程序(其他对象将在每个设备上复制一次)。在前向传递中,模块在每个设备上被复制,每个副本处理一部分输入。在向后传递期间,来自每个副本的梯度被汇总到原始模块中。

批量大小应大于使用的 GPU 数量。

另外,PyTorch 推荐使用 nn.parallel.DistributedDataParallel() 来代替 nn.Parallel(),原因如下:

大多数涉及批量输入和多个 GPU 的用例应默认使用 DistributedDataParallel 来利用多个 GPU。

使用具有多处理功能的 CUDA 模型有一些重要的注意事项;除非注意准确地满足数据处理要求,否则您的程序很可能会出现不正确或未定义的行为。

建议使用 DistributedDataParallel,而不是 DataParallel 进行多 GPU 训练,即使只有一个设备。

DistributedDataParallel 和 DataParallel 之间的区别是:DistributedDataParallel 使用多进程,其中为每个 GPU 创建一个进程,而 DataParallel 使用多线程。通过使用 multiprocessing,每个 GPU 都有自己的专用进程,这避免了 Python 解释器的 GIL 带来的性能开销。

如果您使用 DistributedDataParallel,您可以使用 torch.distributed.launch 实用程序来启动您的程序,请参阅第三方后端。

允许将任意位置和关键字输入传递到 DataParallel 中,但某些类型需要特殊处理。张量将依托指定的维度被分开(默认为 0)。元组、列表和字典类型将被浅拷贝。其他类型将在不同的线程之间共享,如果写入模型的正向传播,则可能会被破坏。

在运行此 DataParallel 模块之前,并行化模块必须在 device_ids[0] 上具有其参数和缓冲区。原因在于:在每个 forward 中,模块在每个设备上被复制,因此对 forward 中正在运行的模块的任何更新都将丢失。例如,如果模块有一个计数器属性,在每次转发时递增,它将始终保持初始值,因为更新是在转发后销毁的副本上完成的。但是,DataParallel 保证 device[0] 上的副本的参数和缓冲区将与基本并行化模块共享存储。因此,将记录对设备 [0] 上的参数或缓冲区的就地更新。例如,BatchNorm2d 和 spectral_norm() 依赖于此行为来更新缓冲区。

当模块在 forward() 中返回一个标量时,此 wrapper 将返回一个长度等于数据并行中使用的设备数量的向量,其中包含每个设备的结果。

参数:

module (Module) – 要并行的模块device_ids (list of python:int or torch.device) – CUDA 设备(默认:全部设备)output_device (int or torch.device) – 输出的设备位置(默认:device_ids[0])

于是,原书中训练这段代码写成了这样:

def train(net, num_gpus, batch_size, lr):    train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size)    devices = [d2l.try_gpu(i) for i in range(num_gpus)]    def init_weights(m):        if type(m) in [nn.Linear, nn.Conv2d]:            nn.init.normal_(m.weight, std=0.01)    net.apply(init_weights)    # 在多个GPU上设置模型    net = nn.DataParallel(net, device_ids=devices)    trainer = torch.optim.SGD(net.parameters(), lr)    loss = nn.CrossEntropyLoss()    timer, num_epochs = d2l.Timer(), 10    animator = d2l.Animator('epoch', 'test acc', xlim=[1, num_epochs])    for epoch in range(num_epochs):        net.train()        timer.start()        for X, y in train_iter:            trainer.zero_grad()            X, y = X.to(devices[0]), y.to(devices[0])            l = loss(net(X), y)            l.backward()            trainer.step()        timer.stop()        animator.add(epoch + 1, (d2l.evaluate_accuracy_gpu(net, test_iter),))    print(f'测试精度:{animator.Y[0][-1]:.2f},{timer.avg():.1f}秒/轮,'          f'在{str(devices)}')

注意第 \(19\) 行中是把数据传到了 \(0\) 号 GPU 上,然后它就会自动切成 GPU 个数据块然后传过去了。

运行代码测试一下!首先是只使用 \(1\) 块 GPU 的代码:

train(net, num_gpus=1, batch_size=256, lr=0.1)
测试精度:0.90,222.1秒/轮,在[device(type='cuda', index=0)]

然后是使用 \(2\) 块 GPU 的代码:

train(net, num_gpus=2, batch_size=512, lr=0.2)
测试精度:0.87,111.8秒/轮,在[device(type='cuda', index=0), device(type='cuda', index=1)]

接近一倍的速度提升。原书中跑一轮居然只需要 \(10\) 秒左右,不禁令人感慨。

结语

喜欢人工智能学习的小伙伴,记得点赞关注~

标签: #并行算法设计技术