龙空技术网

c# 10 教程:22 并行编程

启辰8 30

前言:

当前大家对“c语言结构化程序设计方法步骤”可能比较讲究,你们都需要了解一些“c语言结构化程序设计方法步骤”的相关资讯。那么小编在网摘上收集了一些关于“c语言结构化程序设计方法步骤””的相关文章,希望兄弟们能喜欢,兄弟们一起来学习一下吧!

在本章中,我们将介绍旨在利用多核处理器的多线程 API 和构造:

并行 LINQ 或并行类构造

这些构造统称为(松散地)并行 (PFX)。并行类与任务并行性构造一起称为 (TPL)。

在阅读本章之前,您需要熟悉第 中的基础知识,尤其是锁定、线程安全和 Task 类。

注意

.NET 提供了许多其他专用 API 来帮助进行并行和异步编程:

System.Threading.Channels.Channel 是一个高性能异步生产者/使用者队列,在 .NET Core 3 中引入。(在System.Threading.Tasks.Dataflow命名空间中)是一个复杂的API,用于创建缓冲块网络,这些并行执行操作或数据转换,类似于actor/agent编程。实现了 LINQ over IObservable(IAsyncEnumerable 的替代抽象),并且擅长组合异步流。反应式扩展包含在 NuGet 包中。为什么选择PFX?

在过去的 15 年里,CPU 制造商已经从单核处理器转向多核处理器。这对我们程序员来说是有问题的,因为单线程代码不会因为这些额外的内核而自动运行得更快。

对于大多数服务器应用程序来说,使用多个内核很容易,其中每个线程都可以独立处理单独的客户端请求,但在桌面上则更加困难,因为它通常需要您获取计算密集型代码并执行以下操作:

将其小块。通过多线程并行执行这些块。在结果可用时,以线程安全和高性能的方式结果。

尽管您可以使用经典的多线程构造完成所有这些操作,但它很尴尬,尤其是分区和整理的步骤。另一个问题是,当许多线程同时处理相同的数据时,通常的线程安全锁定策略会导致大量争用。

PFX 库专为帮助这些方案而设计。

注意

利用多核或多处理器的编程称为。这是更广泛的多线程概念的子集。

PFX 概念

在线程之间划分工作有两种策略:。

当必须对许多数据值执行一组任务时,我们可以通过让每个线程对值子集执行(相同的)任务集来并行化。这称为数据,因为我们在线程之间对进行分区。相反,使用任务,我们对进行分区;换句话说,我们让每个线程执行不同的任务。

通常,数据并行性更容易,并且可以更好地扩展到高度并行的硬件,因为它可以减少或消除共享数据(从而减少争用和线程安全问题)。此外,数据并行性利用了数据值通常比离散任务更多的事实,从而增加了并行性的潜力。

数据并行性也有利于结构化并行,这意味着并行工作单元在程序中的同一位置开始和结束。相比之下,任务并行性往往是非结构化的,这意味着并行工作单元可能在分散在程序中的位置开始和结束。结构化并行性更简单,更不容易出错,允许您将分区和线程协调(甚至结果排序)的困难工作转移到库中。

聚苯乙烯组件

PFX 包含两层功能,如图 所示。较高层由两个 API 组成:PLINQ 和 Parallel 类。下层包含任务并行类,以及一组有助于并行编程活动的附加构造。

PLINQ 提供了最丰富的功能:它自动执行并行化的所有步骤,包括将工作划分为任务、在线程上执行这些任务以及将结果整理到单个输出序列中。它称为声明式,因为您只需要并行化您的工作(您将其构造为 LINQ 查询),并让运行时处理实现细节。相比之下,其他方法是的,因为您需要显式编写代码来分区或整理。如以下概要所示,对于并行类,您必须自己整理结果;使用任务并行构造,您还必须自己对工作进行分区:

分区工作

整理结果

普林克

是的

是的

并行类

是的

PFX 的任务并行性

并发集合和旋转基元可帮助您进行较低级别的并行编程活动。这些很重要,因为 PFX 不仅适用于当今的硬件,还适用于具有更多内核的未来几代处理器。如果你想移动一堆切碎的木头,并且你有32名工人来完成这项工作,那么最大的挑战是在工人互不妨碍的情况下移动木材。这与在 32 个内核之间划分算法相同:如果使用普通锁来保护公共资源,则由此产生的阻塞可能意味着这些内核中只有一小部分实际上同时处于繁忙状态。并发集合专门针对高并发访问进行了优化,重点是最小化或消除阻塞。PLINQ 和 Parallel 类本身依赖于并发集合和旋转基元来有效地管理工作。

何时使用 PFX

PFX 的主要用例是:利用多核处理器来加速计算密集型代码。

并行编程中的一个挑战是阿姆达尔定律,该定律指出并行化的最大性能改进由必须按顺序执行的代码部分决定。例如,如果算法的执行时间只有三分之二是可并行化的,则即使内核数量无限,性能提升也永远不会超过三倍。

PFX的其他用途

并行编程结构不仅可用于利用多核,还可用于其他方案:

当您需要线程安全的队列、堆栈或字典时,并发集合有时是合适的。BlockingCollection 提供了一种实现生产者/使用者结构的简单方法,并且是并发性的好方法。任务是异步编程的基础,正如我们在第中看到的那样。

因此,在继续之前,有必要验证瓶颈是否在可并行化代码中。同样值得考虑的是,计算密集型 - 优化通常是最简单、最有效的方法。但是,需要权衡的是,某些优化技术可能会使并行化代码变得更加困难。

最简单的收益来自所谓的问题 - 这是当一个作业可以很容易地划分为可以有效地自行执行的任务时(结构化并行性非常适合此类问题)。示例包括数学或密码学中的许多图像处理任务、光线追踪和暴力破解方法。非尴尬并行问题的一个例子是实现快速排序算法的优化版本 - 一个好的结果需要一些思考,并且可能需要非结构化并行性。

普林克

PLINQ 会自动并行处理本地 LINQ 查询。PLINQ 的优点是易于使用,因为它将工作分区和结果排序规则的负担卸载到 .NET。

若要使用 PLINQ,只需在输入序列上调用 AsParallel(),然后像往常一样继续 LINQ 查询。以下查询计算 3 到 100,000 之间的质数,充分利用目标计算机上的所有内核:

// Calculate prime numbers using a simple (unoptimized) algorithm.IEnumerable<int> numbers = Enumerable.Range (3, 100000-3);var parallelQuery =   from n in numbers.AsParallel()  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)  select n;int[] primes = parallelQuery.ToArray();

AsParallel 是 System.Linq.ParallelEnumerable 中的扩展方法。它将输入包装在基于 ParallelQuery<TSource> 的序列中,这会导致随后调用的 LINQ 查询运算符绑定到在 ParallelEnumerable 中定义的一组备用扩展方法。它们提供了每个标准查询运算符的并行实现。本质上,它们的工作原理是将输入序列划分为在不同线程上执行的块,将结果整理回单个输出序列以供使用,如图 所示。

PLINQ 执行模型

调用 AsSequential() 将解开 ParallelQuery 序列,以便后续查询运算符绑定到标准查询运算符并按顺序执行。在调用具有副作用或不是线程安全的方法之前,这是必需的。

对于接受两个输入序列(联接、组联、康卡特、并集、相交、except 和 Zip)的查询运算符,必须将 AsParallel() 应用于两个输入序列(否则将引发异常)。但是,您不需要在查询进行时继续将 AsParallel 应用于查询,因为 PLINQ 的查询运算符会输出另一个 ParallelQuery 序列。事实上,再次调用 AsParallel 会导致效率低下,因为它会强制合并和重新分区查询:

mySequence.AsParallel()           // Wraps sequence in ParallelQuery<int>          .Where (n => n > 100)   // Outputs another ParallelQuery<int>          .AsParallel()           // Unnecessary - and inefficient!          .Select (n => n * n)

并非所有查询运算符都可以有效地并行化。对于那些不能实现运算符(请参阅 将按顺序实现运算符。如果 PLINQ 怀疑并行化的开销实际上会减慢特定查询的速度,则 PLINQ 也可能按顺序运行。

PLINQ 仅适用于本地集合:例如,它不适用于实体框架,因为在这些情况下,LINQ 会转换为 SQL,然后在数据库服务器上执行。但是, PLINQ 对从数据库查询获取的结果集执行其他本地查询。

注意

如果 PLINQ 查询引发异常,则会将其重新引发为 AggregateException,其 InnerExceptions 属性包含真正的异常(或多个异常)。有关更多详细信息,请参阅

并行执行弹道学

与普通的 LINQ 查询一样,PLINQ 查询是延迟计算的。这意味着只有在您开始使用结果时才会触发执行 — 通常通过 foreach 循环(尽管它也可以通过转换运算符(如 ToArray 或返回单个元素或值的运算符)触发)。

但是,在枚举结果时,执行过程与普通顺序查询的执行方式略有不同。顺序查询完全由使用者以“拉取”方式提供支持:输入序列中的每个元素在使用者需要时准确获取。并行查询通常使用独立的线程从输入序列中获取元素,略于使用者需要它们的时间(更像新闻阅读器的提词器)。然后,它通过查询链并行处理元素,将结果保存在一个小缓冲区中,以便它们为按需使用者做好准备。如果使用者提前暂停或中断枚举,查询处理器也会暂停或停止,以免浪费 CPU 时间或内存。

注意

您可以通过在 AsParallel 之后调用 WithMergeOptions 来调整 PLINQ 的缓冲行为。自动缓冲的默认值通常可提供最佳的总体结果。NotBuffered 禁用缓冲区,如果您希望尽快看到结果,它很有用;完全缓冲在将整个结果集呈现给使用者之前对其进行缓存(OrderBy 和反向运算符自然以这种方式工作,元素、聚合和转换运算符也是如此)。

为什么 ASPARALLEL 不是默认值?

鉴于 AsParallel 透明地并行化 LINQ 查询,问题就来了:为什么Microsoft不简单地并行化标准查询运算符并将 PLINQ 设为默认值?

方法的原因有很多。首先,要使 PLINQ 有用,必须有合理数量的计算密集型工作才能将其分配给工作线程。大多数 LINQ 到对象查询的执行速度都非常快;因此,不仅不需要并行化,而且分区、整理和协调额外线程的开销实际上可能会减慢速度。

此外:

在元素排序方面,PLINQ 查询的输出(默认情况下)可能与 LINQ 查询不同(请参阅)。PLINQ 将异常包装在 AggregateException 中(以处理引发多个异常的可能性)。如果查询调用线程不安全的方法,PLINQ 将给出不可靠的结果。

最后,PLINQ 提供了相当多的钩子用于调整和调整。给标准的 LINQ to-Objects API 增加这些细微差别的负担会增加分心。

普林克和订购

并行化查询运算符的副作用是,在整理结果时,结果的提交顺序不一定相同(参见)。换句话说,LINQ 对序列的正常顺序保留保证不再有效。

如果需要顺序保留,可以通过在 之后调用 AsOrdered() 来强制保留它:

myCollection.AsParallel().AsOrdered()...

调用 AsOrdered 会导致大量元素的性能下降,因为 PLINQ 必须跟踪每个元素的原始位置。

您可以通过调用 AsUnordered 来抵消查询中稍后的 AsOrdered 的影响:这将引入一个“随机随机洗牌点”,它允许点开始更有效地执行。因此,如果只想保留前两个查询运算符的输入序列顺序,则可以执行以下操作:

inputSequence.AsParallel().AsOrdered()  .QueryOperator1()  .QueryOperator2()  .AsUnordered()       // From here on, ordering doesn’t matter  .QueryOperator3()  ...

AsOrdered 不是默认值,因为对于大多数查询,原始输入顺序无关紧要。换句话说,如果 AsOrdered 是默认值,则需要将 AsUnordered 应用于大多数并行查询以获得最佳性能,这将很麻烦。

普林克限制

PLINQ 可以并行化的内容存在实际限制。默认情况下,以下查询运算符会阻止并行化,除非源元素位于其原始索引位置:

Select 、SelectMany 和 ElementAt 的索引版本

大多数查询运算符会更改元素的索引位置(包括删除元素的运算符,例如 Where )。这意味着,如果要使用前面的运算符,它们通常需要位于查询的开头。

以下查询运算符是可并行化的,但使用昂贵的分区策略,有时可能比顺序处理慢:

连接 、分组依据 、组连接 、非重复 、 并集 、 相交 和 除外

聚合运算符在其标准化身中的重载不可并行化 — PLINQ 提供了特殊的重载来处理此问题(请参阅)。

所有其他运算符都是可并行化的,尽管使用这些运算符并不能保证查询将被并行化。如果 PLINQ 怀疑并行化的开销会减慢该特定查询的速度,则 PLINQ 可能会按顺序运行查询。您可以通过在 AsParallel() 之后调用以下内容来覆盖此行为并强制并行:

.WithExecutionMode (ParallelExecutionMode.ForceParallelism)
示例:并行拼写检查器

假设我们要编写一个拼写检查器,该拼写检查器利用所有可用内核快速运行非常大的文档。通过将我们的算法表述为 LINQ 查询,我们可以很容易地将其并行化。

第一步是将英语单词词典下载到HashSet中,以便高效查找:

if (!File.Exists ("WordLookup.txt")    // Contains about 150,000 words  File.WriteAllText ("WordLookup.txt",    await new HttpClient().GetStringAsync (      ";));var wordLookup = new HashSet<string> (  File.ReadAllLines ("WordLookup.txt"),  StringComparer.InvariantCultureIgnoreCase);

然后,我们使用单词查找创建一个测试“文档”,其中包含一百万个随机单词的数组。构建数组后,让我们引入几个拼写错误:

var random = new Random();string[] wordList = wordLookup.ToArray();string[] wordsToTest = Enumerable.Range (0, 1000000)  .Select (i => wordList [random.Next (0, wordList.Length)])  .ToArray();wordsToTest [12345] = "woozsh";     // Introduce a couplewordsToTest [23456] = "wubsie";     // of spelling mistakes.

现在我们可以通过测试单词到测试来执行并行拼写检查 wordLookup .PLINQ 使这变得非常简单:

var query = wordsToTest  .AsParallel()  .Select  ((word, index) => new IndexedWord { Word=word, Index=index })  .Where   (iword => !wordLookup.Contains (iword.Word))  .OrderBy (iword => iword.Index);foreach (var mistake in query)  Console.WriteLine (mistake.Word + " - index = " + mistake.Index);// OUTPUT:// woozsh - index = 12345// wubsie - index = 23456

索引字是一个自定义结构,我们定义如下:

struct IndexedWord { public string Word; public int Index; }

谓词中的 wordLookup.Contains 方法为查询提供了一些“肉”,使其值得并行化。

注意

我们可以通过使用匿名类型而不是 IndexedWord 结构来稍微简化查询。但是,这会降低性能,因为匿名类型(类,因此是引用类型)会产生基于堆的分配和后续垃圾回收的成本。

对于顺序查询,这种差异可能不足以影响,但对于并行查询,支持基于堆栈的分配可能非常有利。这是因为基于堆栈的分配是高度可并行化的(因为每个线程都有自己的堆栈),而所有线程必须竞争同一个堆 - 由单个内存管理器和垃圾回收器管理。

使用 ThreadLocal<T>

让我们通过并行创建随机测试词列表本身来扩展我们的示例。我们将其构造为 LINQ 查询,因此应该很容易。这是顺序版本:

string[] wordsToTest = Enumerable.Range (0, 1000000)  .Select (i => wordList [random.Next (0, wordList.Length)])  .ToArray();

不幸的是,调用随机。Next 不是线程安全的,因此它不像将 AsParallel() 插入查询那么简单。一个可能的解决方案是编写一个锁定随机的函数。下一个;但是,这将限制并发性。更好的选择是使用 ThreadLocal<Random>(参见中的创建一个单独的 Random 对象。然后,我们可以并行化查询,如下所示:

var localRandom = new ThreadLocal<Random> ( () => new Random (Guid.NewGuid().GetHashCode()) );string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()  .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])  .ToArray();

在用于实例化随机对象的工厂函数中,我们传入 Guid 的哈希码,以确保如果在短时间内创建两个随机对象,它们将产生不同的随机数序列。

功能纯度

由于 PLINQ 在并行线程上运行查询,因此必须注意不要执行线程不安全的操作。特别是,写入,因此线程不安全:

// The following query multiplies each element by its position.// Given an input of Enumerable.Range(0,999), it should output squares.int i = 0;var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;

我们可以通过使用锁使增量 i 线程安全,但问题仍然存在,i 不一定对应于输入元素的位置。将 AsOrdered 添加到查询中并不能解决后一个问题,因为 AsOrdered 只能确保元素的输出顺序与按顺序处理的顺序一致,而实际上它并不按顺序它们。

正确的解决方案是重写我们的查询以使用 Select 的索引版本:

何时使用 PLINQ

在现有应用程序中搜索 LINQ 查询并尝试并行化它们很诱人。这通常是无效的,因为 LINQ 显然是最佳解决方案的大多数问题往往执行得非常快,因此无法从并行化中受益。更好的方法是查找 CPU 密集型瓶颈,然后考虑是否可以将其表示为 LINQ 查询。(这种重组的一个受欢迎的副作用是 LINQ 通常使代码更小且更具可读性。

PLINQ 非常适合令人尴尬的并行问题。然而,对于成像来说,这可能是一个糟糕的选择,因为将数百万像素整理成一个输出序列会产生瓶颈。相反,最好将像素直接写入数组或非托管内存块,并使用并行类或任务并行性来管理多线程。(但是,使用 ForAll 可以击败结果排序规则 — 我们在中对此进行了讨论。如果图像处理算法自然适合 LINQ,则这样做是有意义的。

var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i);

为了获得最佳性能,从查询运算符调用的任何方法都应该是线程安全的,因为它不写入字段或属性(非副作用或)。如果它们通过锁定是线程安全的,则查询的并行性潜力将受到持续时间除以在该函数中花费的总时间的限制。

设置并行度

默认情况下,PLINQ 为正在使用的处理器选择最佳并行度。您可以通过在 AsParallel 之后调用 WithDegreeOfParallelism 来覆盖它:

...AsParallel().WithDegreeOfPallelism(4)...

当并行度增加到超出核心计数时,一个例子是 I/O 密集型工作(例如,一次下载多个网页)。然而,任务组合器和异步函数提供了一种同样简单和的解决方案(参见中的)。与任务 s 不同,PLINQ 无法在不阻塞线程(和线程,使情况更糟)的情况下执行 I/O 绑定工作。

更改并行度

在 PLINQ 查询中只能调用一次 WithDegreeOfParallelism。如果需要再次调用它,则必须通过在查询中再次调用 AsParallel() 来强制合并和重新分区查询:

"The Quick Brown Fox"  .AsParallel().WithDegreeOfParallelism (2)  .Where (c => !char.IsWhiteSpace (c))  .AsParallel().WithDegreeOfParallelism (3)   // Forces Merge + Partition  .Select (c => char.ToUpper (c))
取消

取消 PLINQ 查询(在 foreach 循环中使用其结果)很容易:只需脱离 foreach ,查询将自动取消,因为枚举器是隐式释放的。

对于以转换、元素或聚合运算符终止的查询,可以通过取消令牌从另一个线程它(请参阅中的)。若要插入令牌,请在调用 AsParallel 后调用 WithCancel,传入 CancelTokenSource 对象的 Token 属性。然后,另一个线程可以在令牌源上调用 Cancel,这会在查询的使用者上引发 OperationCanceledException:

IEnumerable<int> million = Enumerable.Range (3, 1000000);var cancelSource = new CancellationTokenSource();var primeNumberQuery =   from n in million.AsParallel().WithCancellation (cancelSource.Token)  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)  select n;new Thread (() => {                    Thread.Sleep (100);      // Cancel query after                    cancelSource.Cancel();   // 100 milliseconds.                  }           ).Start();try {  // Start query running:  int[] primes = primeNumberQuery.ToArray();  // We'll never get here because the other thread will cancel us.}catch (OperationCanceledException){  Console.WriteLine ("Query canceled");}

取消后,PLINQ 会等待每个工作线程完成其当前元素,然后再结束查询。这意味着查询调用的任何外部方法都将运行完成。

优化普林克输出侧优化

PLINQ 的优势之一是它可以方便地将并行化工作的结果整理到单个输出序列中。但是,有时,您最终对该序列所做的只是在每个元素上运行一次某个函数:

foreach (int n in parallelQuery)  DoSomething (n);

如果是这种情况,并且您不关心元素的处理顺序,则可以使用 PLINQ 的 ForAll 方法提高效率。

ForAll 方法对 ParallelQuery 的每个输出元素运行一个委托。它直接挂接到 PLINQ 的内部,绕过整理和枚举结果的步骤。这里有一个简单的例子:

"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);

显示了该过程。

注意

整理和枚举结果并不是一项成本高昂的操作,因此当有大量快速执行的输入时,ForAll 优化会产生最大的收益。

输入端优化

PLINQ 有三种分区策略,用于将输入元素分配给线程:

策略

元素分配

相对性能

区块分区

动态

平均

范围分区

静态的

从差到优

哈希分区

静态的

对于需要比较元素( GroupBy 、 Join 、 GroupJoin 、 相交 、 except 、 并集 和 非重复 )的查询运算符,您别无选择:PLINQ 始终使用。哈希分区的效率相对较低,因为它必须预先计算每个元素的哈希代码(以便可以在同一线程上处理具有相同哈希代码的元素)。如果发现这太慢,唯一的选择是调用 AsSequential 以禁用并行化。

对于所有其他查询运算符,您可以选择是使用范围分区还是块分区。默认情况下:

如果输入序列是(如果它是一个数组或实现 IList<T> ),PLINQ 选择。否则,PLINQ 将选择。

简而言之,对于长序列,范围分区更快,每个元素都需要相似的 CPU 时间来处理。否则,块分区通常更快。

要强制范围分区:

如果查询以 Enumerable.Range 开头,则将该方法替换为 ParallelEnumerable.Range 。否则,只需在输入序列上调用 ToList 或 ToArray(显然,这本身会产生性能成本,您应该考虑到这一点)。注意

ParallelEnumerable.Range 不仅仅是调用 Enumerable.Range( ... ) 的快捷方式。作为并行() .它通过激活范围分区来更改查询的性能。

要强制块分区,请将输入序列包装在对 Partitioner.Create(在 System.Collection.Concurrent 中)的调用中,如下所示:

int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };var parallelQuery =  Partitioner.Create (numbers, true).AsParallel()  .Where (...)

Partitioner.Create 的第二个参数表示您希望对查询,这是您希望块分区的另一种说法。

块分区的工作原理是让每个工作线程定期从输入序列中抓取小的元素“块”进行处理(参见)。PLINQ 首先分配非常小的块(一次一个或两个元素)。然后,它会随着查询的进行增加块大小:这可确保小序列有效地并行化,并且大序列不会导致过多的往返。如果一个工人碰巧得到了“简单”的元素(这个过程很快),它最终会得到更多的块。该系统使每个线程保持同样繁忙(并且内核“平衡”);唯一的缺点是从共享输入序列中获取元素需要同步(通常是独占锁),这可能会导致一些开销和争用。

块分区与范围分区

范围分区绕过正常的输入端枚举,并为每个工作线程预分配相同数量的元素,从而避免对输入序列进行争用。但是,如果某些线程碰巧获得简单的元素并提前完成,则它们将闲置,而其余线程将继续工作。我们早期的素数计算器在范围分区方面可能表现不佳。当范围分区可以很好地工作时,计算前 10 万个整数的平方根之和的一个例子:

ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i))

ParallelEnumerable.Range 返回一个 ParallelQuery<T> ,因此您随后不需要调用 AsParallel 。

注意

范围分区不一定在块中分配元素范围,而是选择“条带化”策略。例如,如果有两个工作器,则一个工作器可能处理奇数元素,而另一个工作器处理偶数元素。TakeWhile 运算符几乎肯定会触发条带化策略,以避免在序列的后期不必要地处理元素。

优化自定义聚合

PLINQ 可以有效地并行化 Sum、平均值、最小值和最大值运算符,无需额外干预。但是,聚合运算符给 PLINQ 带来了特殊的挑战。如所述,聚合执行自定义聚合。例如,下面对数字序列求和,模仿 Sum 运算符:

int[] numbers = { 1, 2, 3 };int sum = numbers.Aggregate (0, (total, n) => total + n);   // 6

我们还在第 中看到,对于聚合,提供的委托必须是关联和交换的。如果违反此规则,PLINQ 将给出不正确的结果,因为它从输入序列中提取多个种子,以便同时聚合序列的分区。

显式种子聚合似乎是 PLINQ 的安全选项,但不幸的是,由于依赖于单个种子,这些聚合通常按顺序执行。为了缓解此问题,PLINQ 提供了另一个聚合重载,允许您指定多个种子,或者更确切地说,指定。对于每个线程,它执行此函数以生成一个单独的种子,该种子成为累加器,它在本地聚合元素。

您还必须提供一个函数来指示如何组合本地和主累加器。最后,此聚合重载(有点无偿)期望委托对结果执行任何最终转换(您可以通过之后自己对结果运行某些函数来轻松实现此目的)。因此,以下是四个代表,按通过顺序排列:

种子工厂

返回新的本地累加器

更新累加器功能

将元素聚合到本地累加器

组合蓄能器功能

将本地蓄能器与主蓄能器相结合

结果选择器

对最终结果应用任何最终转换

注意

在简单方案中,可以指定种子工厂。当种子是要更改的引用类型时,此策略将失败,因为每个线程将共享相同的实例。

举一个非常简单的例子,下面对数字数组中的值求和:

numbers.AsParallel().Aggregate ( () => 0,                                      // seedFactory  (localTotal, n) => localTotal + n,           // updateAccumulatorFunc  (mainTot, localTot) => mainTot + localTot,   // combineAccumulatorFunc  finalResult => finalResult)                  // resultSelector

这个例子是人为的,因为我们可以使用更简单的方法(例如未播种聚合,或者更好的 Sum 运算符)同样有效地获得相同的答案。举一个更现实的例子,假设我们要计算给定字符串中英文字母表中每个字母的频率。一个简单的顺序解决方案可能如下所示:

string text = "Let’s suppose this is a really long string";var letterFrequencies = new int[26];foreach (char c in text){  int index = char.ToUpper (c) - 'A';  if (index >= 0 && index < 26) letterFrequencies [index]++;};
注意

输入文本可能很长的一个例子是基因测序。然后,“字母表”将由字母,,和组成。

为了并行化这一点,我们可以将 foreach 语句替换为对 Parallel.ForEach 的调用(我们将在下一节中介绍),但这将让我们处理共享阵列上的并发问题。锁定访问该阵列几乎会扼杀并行化的潜力。

聚合提供了一个整洁的解决方案。在这种情况下,累加器是一个数组,就像我们前面示例中的 letterFrequency 数组一样。下面是使用聚合的顺序版本:

int[] result =  text.Aggregate (    new int[26],                // Create the "accumulator"    (letterFrequencies, c) =>   // Aggregate a letter into the accumulator    {      int index = char.ToUpper (c) - 'A';      if (index >= 0 && index < 26) letterFrequencies [index]++;      return letterFrequencies;    });

现在并行版本,使用 PLINQ 的特殊重载:

int[] result =  text.AsParallel().Aggregate (   () => new int[26],             // Create a new local accumulator    (localFrequencies, c) =>       // Aggregate into the local accumulator    {      int index = char.ToUpper (c) - 'A';      if (index >= 0 && index < 26) localFrequencies [index]++;      return localFrequencies;    },                                   // Aggregate local->main accumulator    (mainFreq, localFreq) =>      mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),    finalResult => finalResult     // Perform any final transformation  );                               // on the end result.

请注意,局部累积函数 localFrequency 数组。这种执行此优化的能力非常重要,并且是合法的,因为 localFrequency 是每个线程的本地。

平行类

PFX 通过并行类中的三种静态方法提供结构化并行的基本形式:

Parallel.Invoke

并行执行委托数组

Parallel.For

执行 C# for 循环的并行等效项

Parallel.ForEach

执行 C# foreach 循环的并行等效项

所有三种方法都会阻止,直到所有工作完成。与 PLINQ 一样,在发生未经处理的异常后,剩余的工作线程将在当前迭代后停止,并将异常(或多个异常)抛回调用方 — 包装在 AggregateException 中(请参阅)。

Parallel.Invoke

Parallel.Invoke 并行执行一组 Action 委托,然后等待它们完成。该方法的最简单版本定义如下:

public static void Invoke (params Action[] actions);

与 PLINQ 一样,并行 .* 方法针对计算密集型工作而非 I/O 密集型工作进行了优化。但是,一次下载两个网页提供了一种演示 Parallel.Invoke 的简单方法:

Parallel.Invoke ( () => new WebClient().DownloadFile (";, "lp.html"), () => new WebClient().DownloadFile (";, "ms.html"));

从表面上看,这似乎是创建和等待两个线程绑定 Task 对象的便捷快捷方式。但有一个重要的区别:如果你传入一个包含一百万个委托的数组,Parallel.Invoke 仍然有效地工作。这是因为它将大量元素为分配给少数基础任务的批次,而不是为每个委托创建单独的任务。

与 Parallel 的所有方法一样,在整理结果时,您只能靠自己。这意味着您需要牢记线程安全性。例如,以下内容是线程不安全的:

var data = new List<string>();Parallel.Invoke ( () => data.Add (new WebClient().DownloadString (";)), () => data.Add (new WebClient().DownloadString (";)));

锁定添加到列表可以解决此问题,但如果具有更大的快速执行委托数组,则锁定会产生瓶颈。更好的解决方案是使用线程安全集合,我们将在后面的部分中介绍 - 将是理想的选择。

Parallel.Invoke 也被重载以接受 ParallelOptions 对象:

public static void Invoke (ParallelOptions options,                           params Action[] actions);

使用 ParallelOptions ,您可以插入取消令牌、限制最大并发性以及指定自定义任务计划程序。当您执行的任务(大约)多于核心时,取消令牌是相关的:取消后,任何未启动的委托都将被放弃。但是,任何已经执行任务的代表将继续完成。有关如何使用取消令牌的示例,请参阅

Parallel.For 和 Parallel.ForEach

Parallel.For 和 Parallel.ForEach 执行等效的 C# for 和 foreach 循环,但每次迭代并行执行,而不是按顺序执行。以下是他们的(最简单的)签名:

public static ParallelLoopResult For (  int fromInclusive, int toExclusive, Action<int> body)public static ParallelLoopResult ForEach<TSource> (  IEnumerable<TSource> source, Action<TSource> body)

此顺序 for 循环

for (int i = 0; i < 100; i++)  Foo (i);

像这样并行化

Parallel.For (0, 100, i => Foo (i));

或者更简单地说:

Parallel.For (0, 100, Foo);

而这个顺序的 foreach

foreach (char c in "Hello, world")  Foo (c);

像这样并行化:

Parallel.ForEach ("Hello, world", Foo);

举一个实际的例子,如果我们导入 System.Security.Cryptography 命名空间,我们可以并行生成六个公钥/私钥对字符串,如下所示:

var keyPairs = new string[6];Parallel.For (0, keyPairs.Length,              i => keyPairs[i] = RSA.Create().ToXmlString (true));

与 Parallel.Invoke 一样,我们可以为 Parallel.For 和 Parallel.ForEach 提供大量的工作项,它们将被有效地划分为几个任务。

注意

后一个查询也可以使用 PLINQ 完成:

string[] keyPairs =  ParallelEnumerable.Range (0, 6)  .Select (i => RSA.Create().ToXmlString (true))  .ToArray();
外环与内环

Parallel.For 和 Parallel.ForEach 通常在外部循环而不是内部循环上效果最好。这是因为对于前者,您将提供更大的工作块来并行化,从而稀释了管理开销。通常不需要并行化内部和外部循环。在以下示例中,我们通常需要 100 多个内核才能从内部并行化中受益:

Parallel.For (0, 100, i =>{  Parallel.For (0, 50, j => Foo (i, j));   // Sequential would be better});                                        // for the inner loop.
Indexed Parallel.ForEach

有时,了解循环迭代索引很有用。使用顺序 foreach ,很容易:

int i = 0;foreach (char c in "Hello, world")  Console.WriteLine (c.ToString() + i++);

但是,在并行上下文中,递增共享变量不是线程安全的。您必须改用以下版本的 ForEach :

public static ParallelLoopResult ForEach<TSource> (  IEnumerable<TSource> source, Action<TSource,ParallelLoopState,long> body)

我们将忽略 ParallelLoopState(我们将在下一节中介绍)。现在,我们对 Action 的第三个类型参数 long 感兴趣,它表示循环索引:

Parallel.ForEach ("Hello, world", (c, state, i) =>{   Console.WriteLine (c.ToString() + i);});

为了将其置于实际上下文中,让我们重新审视一下我们使用 PLINQ 编写的拼写检查器。以下代码加载一个字典以及一个包含一百万个单词的数组进行测试:

if (!File.Exists ("WordLookup.txt"))    // Contains about 150,000 words  new WebClient().DownloadFile (    ";, "WordLookup.txt");var wordLookup = new HashSet<string> (  File.ReadAllLines ("WordLookup.txt"),  StringComparer.InvariantCultureIgnoreCase);var random = new Random();string[] wordList = wordLookup.ToArray();string[] wordsToTest = Enumerable.Range (0, 1000000)  .Select (i => wordList [random.Next (0, wordList.Length)])  .ToArray();wordsToTest [12345] = "woozsh";     // Introduce a couplewordsToTest [23456] = "wubsie";     // of spelling mistakes.

我们可以使用 Parallel.ForEach 的索引版本对我们的 wordsToTest 数组执行拼写检查,如下所示:

var misspellings = new ConcurrentBag<Tuple<int,string>>();Parallel.ForEach (wordsToTest, (word, state, i) =>{  if (!wordLookup.Contains (word))    misspellings.Add (Tuple.Create ((int) i, word));});

请注意,我们必须将结果整理到线程安全集合中:与使用 PLINQ 相比,必须这样做是缺点。与 PLINQ 相比,它的优势在于我们避免了应用索引选择查询运算符的成本,而索引 Select 查询运算符的效率低于索引的 ForEach。

ParallelLoopState:尽早脱离循环

由于并行 For 或 ForEach 中的循环主体是委托,因此不能使用 break 语句提前退出循环。相反,您必须在 ParallelLoopState 对象上调用 Break 或 Stop:

public class ParallelLoopState{  public void Break();  public void Stop();  public bool IsExceptional { get; }  public bool IsStopped { get; }  public long? LowestBreakIteration { get; }  public bool ShouldExitCurrentIteration { get; }}

获取 ParallelLoopState 很容易:所有版本的 For 和 ForEach 都被重载以接受 Action<TSource,ParallelLoopState> 类型的循环体。所以,为了并行化这个

foreach (char c in "Hello, world")  if (c == ',')    break;  else    Console.Write (c);

这样做:

Parallel.ForEach ("Hello, world", (c, loopState) =>{  if (c == ',')    loopState.Break();  else    Console.Write (c);});// OUTPUT: Hlloe

从输出中可以看出,循环主体可以按随机顺序完成。除了这种差异之外,调用 Break 至少会产生与按顺序执行循环相同的元素:此示例将始终至少按某种 、、l、 和 。 相反,调用 Stop 而不是 Break 会强制所有线程在其当前迭代后立即完成。在我们的示例中,如果另一个线程滞后,调用 Stop 可以为我们提供字母 、、l、 和 的子集。 当您找到要查找的内容时,或者当出现问题并且您不会查看结果时,呼叫 Stop 非常有用。

注意

Parallel.For 和 Parallel.ForEach 方法返回一个 ParallelLoopResult 对象,该对象公开名为 IsDone 和 LowestBreakIteration 的属性。这些告诉您循环是否运行完成;如果没有,则指示循环在哪个周期中断。

如果 LowestBreakIteration 返回 null,则表示您在循环中调用了 Stop(而不是 Break)。

如果循环体很长,则可能希望其他线程在方法主体中途中断,以防早期中断或停止。可以通过在代码中的不同位置轮询 ShouldExitCurrentIteration 属性来执行此操作;此属性在停止后立即变为 true,或者在中断后不久变为 true。

注意

ShouldExitCurrentIteration 在取消请求后也会变为 true,或者在循环中抛出异常。

IsExceptional 可让您知道另一个线程上是否发生了异常。任何未处理的异常都将导致循环在每个线程的当前迭代后停止:若要避免这种情况,必须在代码中显式处理异常。

使用局部值进行优化

Parallel.For 和 Parallel.ForEach 都提供了一组重载,这些重载具有一个名为 TLocal 的泛型类型参数。这些重载旨在帮助您使用迭代密集型循环优化数据排序规则。最简单的是这样的:

public static ParallelLoopResult For <TLocal> (  int fromInclusive,  int toExclusive,  Func <TLocal> localInit,  Func <int, ParallelLoopState, TLocal, TLocal> body,  Action <TLocal> localFinally);

在实践中很少需要这些方法,因为它们的目标场景主要由 PLINQ 覆盖(这是幸运的,因为这些重载有点吓人!

从本质上讲,问题是这样的:假设我们要对数字 1 到 10,000,000 的平方根求和。计算 10 万个平方根很容易并行化,但对它们的值求和很麻烦,因为我们必须锁定更新总数:

object locker = new object();double total = 0;Parallel.For (1, 10000000,              i => { lock (locker) total += Math.Sqrt (i); });

并行化的收益被获得 10 万个锁的成本以及由此产生的阻塞所抵消。

然而,现实情况是,我们实际上不需要10万把锁。想象一下,一队志愿者捡起大量垃圾。如果所有工人共用一个垃圾桶,那么旅行和争用将使该过程效率极低。显而易见的解决方案是让每个工人都有一个私人或“本地”垃圾桶,偶尔会将其倒入主垃圾箱。

For 和 ForEach 的 TLocal 版本正是以这种方式工作的。志愿者是内部工作线程,本地垃圾桶。要使执行此作业,必须向其提供两个指示的附加委托:

如何初始化新的本地值如何将本地聚合与主值组合

此外,而不是返回 void 的主体委托,它应该返回本地值的新聚合。下面是重构的示例:

object locker = new object();double grandTotal = 0;Parallel.For (1, 10000000,  () => 0.0,                        // Initialize the local value.  (i, state, localTotal) =>         // Body delegate. Notice that it     localTotal + Math.Sqrt (i),    // returns the new local total.  localTotal =>                                    // Add the local value    { lock (locker) grandTotal += localTotal; }    // to the master value.);

我们仍然必须锁定,但只能将局部值聚合到总计。这使得该过程大大提高了效率。

注意

如前所述,PLINQ 通常非常适合这些场景。我们的示例可以像这样与 PLINQ 并行化:

ParallelEnumerable.Range (1, 10000000)                  .Sum (i => Math.Sqrt (i))

(请注意,我们使用 ParallelEnumerable 来强制:在这种情况下,这可以提高性能,因为所有数字的处理时间都相同。

在更复杂的方案中,可以使用 LINQ 的聚合运算符而不是 Sum 。如果您提供了本地种子工厂,则情况有点类似于使用 Parallel.For 提供本地值函数。

任务并行性

是使用 PFX 并行化的最低级别方法。用于在此级别工作的类在 System.Threading.Tasks 命名空间中定义,包括以下内容:

目的

任务

用于管理工作单元

任务<TResult>

用于管理具有返回值的工作单位

任务工厂

用于创建任务

TaskFactory<TResult>

用于创建具有相同返回类型的任务和延续

任务计划程序

用于管理任务计划

任务完成源

用于手动控制任务的工作流

我们在中介绍了任务的基础知识;在本节中,我们将介绍针对并行编程的任务的高级功能:

调整任务的计划当一个任务从另一个任务启动时建立父/子关系延续的高级使用任务工厂注意

任务并行库允许您以最小的开销创建数百(甚至数千)个任务。但是,如果要创建数百万个任务,则需要将这些任务划分为更大的工作单元以保持效率。并行类和 PLINQ 会自动执行此操作。

注意

Visual Studio 提供了一个用于监视任务(调试→窗口→并行任务)的窗口。这等效于“线程”窗口,但用于任务。“并行堆栈”窗口还具有用于任务的特殊模式。

创建和启动任务

如所述,Task.Run创建并启动一个任务或任务<> 。此方法实际上是调用 Task.Factory.StartNew 的快捷方式,它通过额外的重载提供了更大的灵活性。

指定状态对象

Task.Factory.StartNew 允许您指定传递给目标然后,目标方法的签名必须包含单个对象类型参数:

var task = Task.Factory.StartNew (Greet, "Hello");task.Wait();  // Wait for task to complete.void Greet (object state) { Console.Write (state); }   // Hello

这避免了执行调用 Greet 的 lambda 表达式所需的闭包成本。这是一个微优化,在实践中很少需要,因此我们可以更好地利用对象,即为任务分配一个有意义的名称。然后,我们可以使用 AsyncState 属性来查询其名称:

var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting");Console.WriteLine (task.AsyncState);   // Greetingtask.Wait();void Greet (string message) { Console.Write (message); }
注意

Visual Studio 在“并行任务”窗口中显示每个任务的 AsyncState,因此在此处使用有意义的名称可以大大简化调试。

任务创建选项

您可以通过在调用 StartNew(或实例化任务)时指定 TaskCreationOptions 枚举来调整任务的执行。TaskCreationOptions 是一个具有以下(可组合)值的标志枚举:

LongRunning, PreferFairness, AttachedToParent

LongRun 建议调度程序将一个线程专用于任务,正如我们中所述,这对于 I/O 绑定任务和长时间运行的任务是有益的,否则这些任务可能会迫使短期运行的任务在调度之前等待不合理的时间。

PreferFairness 指示调度程序尝试确保按启动顺序调度任务。它通常可能会这样做,因为它使用本地工作窃取队列在内部优化任务调度 — 这种优化允许创建任务,而不会产生单个工作队列产生的争用开销。通过指定“附加到父项”来创建子任务。

子任务

当一个任务启动另一个任务时,您可以选择建立父子:

Task parent = Task.Factory.StartNew (() =>{  Console.WriteLine ("I am a parent");  Task.Factory.StartNew (() =>        // Detached task  {    Console.WriteLine ("I am detached");  });  Task.Factory.StartNew (() =>        // Child task  {    Console.WriteLine ("I am a child");  }, TaskCreationOptions.AttachedToParent);});

子任务的特殊之处在于,当您等待任务完成时,它也会等待任何子任务。此时,任何子异常都会冒泡:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;var parent = Task.Factory.StartNew (() => {  Task.Factory.StartNew (() =>   // Child  {    Task.Factory.StartNew (() => { throw null; }, atp);   // Grandchild  }, atp);});// The following call throws a NullReferenceException (wrapped// in nested AggregateExceptions):parent.Wait();

当子任务是延续时,这可能特别有用,您很快就会看到。

等待多个任务

我们在中看到,您可以通过调用其 Wait 方法或访问其 Result 属性(如果它是 Task<TResult> )来等待单个任务。您还可以通过静态方法 Task.WaitAll(等待所有指定任务完成)和 Task.WaitAny(仅等待一个任务完成)一次等待多个任务。

WaitAll 类似于依次等待每个任务,但效率更高,因为它(最多)只需要一个上下文切换。此外,如果一个或多个任务引发未经处理的异常,WaitAll 仍会等待每个任务。然后,它会重新抛出一个 AggregateException,该异常累积每个错误任务的异常(这是 AggregateException 真正有用的地方)。这相当于这样做:

// Assume t1, t2 and t3 are tasks:var exceptions = new List<Exception>();try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }if (exceptions.Count > 0) throw new AggregateException (exceptions);

调用 WaitAny 等效于等待 ManualResetEventSlim,该任务在完成时由每个任务发出信号。

除了超时之外,您还可以将传递给 Wait 方法:这允许您取消等待,。

取消任务

您可以选择在启动任务时传入取消令牌。然后,如果通过该令牌取消,任务本身将进入“已取消”状态:

var cts = new CancellationTokenSource();CancellationToken token = cts.Token;cts.CancelAfter (500);Task task = Task.Factory.StartNew (() => {  Thread.Sleep (1000);  token.ThrowIfCancellationRequested();  // Check for cancellation request}, token);try { task.Wait(); }catch (AggregateException ex){  Console.WriteLine (ex.InnerException is TaskCanceledException);  // True  Console.WriteLine (task.IsCanceled);                             // True  Console.WriteLine (task.Status);                             // Canceled}

TaskCanceledException 是 OperationCanceledException 的一个子类。如果要显式抛出 OperationCanceledException(而不是调用令牌。ThrowIfCancelRequest),您必须将取消令牌传递到 OperationCanceledException 的构造函数中。如果不这样做,任务将不会以 TaskStatus.Canceled 状态结束,也不会触发 OnlyOnCanceled 。

如果任务在启动之前被取消,则不会计划它 - 操作已取消异常将立即抛给任务。

由于取消令牌由其他 API 识别,因此您可以将它们传递到其他构造中,取消将无缝传播:

var cancelSource = new CancellationTokenSource();CancellationToken token = cancelSource.Token;Task task = Task.Factory.StartNew (() =>{  // Pass our cancellation token into a PLINQ query:  var query = someSequence.AsParallel().WithCancellation (token)...  ... enumerate query ...});

在此示例中调用 Cancel on cancelSource 将取消 PLINQ 查询,该查询将在任务正文上引发 OperationCanceledException,然后取消任务。

注意

可以传递到“等待”和“取消和等待”等方法中的取消令牌允许您取消操作,而不是取消任务本身。

延续

方法在任务结束后立即执行委托:

Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));

一旦任务 1(任务)完成、失败或取消,任务 2()就会启动。(如果任务 1 在第二行代码运行之前已完成,则任务 2 将计划立即执行。传递给延续的 lambda 表达式的 ant 参数是对先验任务的引用。ContinueWith 本身会返回一个任务,以便轻松添加进一步的延续。

默认情况下,前置任务和延续任务可以在不同的线程上执行。您可以通过指定 TaskContinuationOptions.ExecuteSync 在调用 ContinueWith 时强制它们在同一线程上执行:这可以通过减少间接性来提高非常细粒度的延续的性能。

延续和任务<任务>

就像普通任务一样,延续可以是 Task<TResult> 类型并返回数据。在下面的示例中,我们使用一系列链式任务计算 Math.Sqrt(8*2),然后写出结果:

Task.Factory.StartNew<int> (() => 8)  .ContinueWith (ant => ant.Result * 2)  .ContinueWith (ant => Math.Sqrt (ant.Result))  .ContinueWith (ant => Console.WriteLine (ant.Result));   // 4

为了简单起见,我们的例子有些做作;在现实生活中,这些 lambda 表达式会调用计算密集型函数。

延续和例外

延续可以通过查询前置任务的 Exception 属性来了解前置任务是否出错,或者只是通过调用 Result / Wait 并捕获生成的 AggregateException。如果先前的错误,而延续也不存在,则异常被视为,并且静态 TaskScheduler.UnobservedTaskException 事件在稍后对任务进行垃圾回收时触发。

安全的模式是重新引发先前的异常。只要延续是 Wait edon,异常就会传播并重新抛出到 Waiter:

Task continuation = Task.Factory.StartNew     (()  => { throw null; })                                .ContinueWith (ant =>  {    ant.Wait();    // Continue processing...  });continuation.Wait();    // Exception is now thrown back to caller.

处理异常的另一种方法是为特殊与非异常结果指定不同的延续。这是通过任务延续选项完成的:

Task task1 = Task.Factory.StartNew (() => { throw null; });Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),                                 TaskContinuationOptions.OnlyOnFaulted);Task ok = task1.ContinueWith (ant => Console.Write ("Success!"),                              TaskContinuationOptions.NotOnFaulted);

此模式在与子任务结合使用时特别有用,您很快就会看到。

以下扩展方法“吞噬”任务的未处理异常:

public static void IgnoreExceptions (this Task task){  task.ContinueWith (t => { var ignore = t.Exception; },    TaskContinuationOptions.OnlyOnFaulted);}

(这可以通过添加代码来记录异常来改进。以下是它的使用方式:

Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();
延续和子任务

延续的一个强大功能是,它们仅在所有子任务完成时启动(参见)。此时,子项引发的任何异常都将封送到延续中。

在下面的示例中,我们启动三个子任务,每个子任务抛出一个 NullReferenceException 。然后,我们通过父级的延续一举捕获所有这些:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;Task.Factory.StartNew (() =>{  Task.Factory.StartNew (() => { throw null; }, atp);  Task.Factory.StartNew (() => { throw null; }, atp);  Task.Factory.StartNew (() => { throw null; }, atp);}).ContinueWith (p => Console.WriteLine (p.Exception),                    TaskContinuationOptions.OnlyOnFaulted);

有条件的延续

默认情况下,无论前置任务完成、引发异常还是取消,都会无地安排延续。您可以通过 TaskContinuationOptions 枚举中包含的一组(可组合的)标志来更改此行为。以下是控制条件延续的三个核心标志:

NotOnRanToCompletion = 0x10000,NotOnFaulted = 0x20000,NotOnCanceled = 0x40000,

这些标志是减法的,因为您应用的越多,执行延续的可能性就越小。为方便起见,还有以下预组合值:

OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled,OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled,OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted

(组合所有 Not* 标志 [ NotOnRanToCompletion, NotOnFaulted , NotOnCanceled ] 是荒谬的,因为它会导致延续总是。

“RanToComplete”表示前置成功,没有取消或未处理的异常。

“错误”表示在前置项上抛出未处理的异常。

“已取消”是指以下两种情况之一:

前因已通过其取消令牌取消。换句话说,在前置项上抛出了一个 OperationCanceledException,其 CancelToken 属性与启动时传递给前置项的属性匹配。前置项被隐式取消,不满足条件延续谓词。

必须掌握的是,当延续没有通过这些标志执行时,延续不会被遗忘或放弃 - 它会被取消。这意味着延续本身的任何延续,除非您使用 NotOnCanceled 谓词它们。例如,考虑一下:

Task t1 = Task.Factory.StartNew (...);Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),                              TaskContinuationOptions.OnlyOnFaulted);Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));

就目前而言,t3 将始终被调度 — 即使 t1 没有引发异常(参见)。这是因为如果 t1 成功,故障任务将被取消,并且对 t3 没有继续限制,t3 将无条件执行。

如果我们希望 t3 仅在错误实际运行时执行,我们必须这样做:

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),                              TaskContinuationOptions.NotOnCanceled);

(或者,我们可以指定 OnlyOnRanToCompletion;不同之处在于,如果在错误中抛出异常,t3 将不会执行。

具有多个先行的延续

可以使用 TaskFactory 类中的 ContinueWhenAll 和 ContinueWhenAny 方法,根据多个前因的完成情况来安排继续执行。然而,随着(WhenAll和WhenAny)中讨论的任务组合器的引入,这些方法变得多余。具体来说,给定以下任务

var task1 = Task.Run (() => Console.Write ("X"));var task2 = Task.Run (() => Console.Write ("Y"));

我们可以安排在两者都完成后执行的延续,如下所示:

var continuation = Task.Factory.ContinueWhenAll (  new[] { task1, task2 }, tasks => Console.WriteLine ("Done"));

下面是与 WhenAll 任务组合器相同的结果:

var continuation = Task.WhenAll (task1, task2)                       .ContinueWith (ant => Console.WriteLine ("Done"));
单个先验的多个延续

对同一任务多次调用 ContinueWith 会在单个前因上创建多个延续。当前置完成时,所有延续将一起开始(除非您指定 TaskContinuationOptions.ExecuteSyncly ,在这种情况下,延续将按顺序执行)。

以下内容等待一秒钟,然后写入 XY 或 YX:

var t = Task.Factory.StartNew (() => Thread.Sleep (1000));t.ContinueWith (ant => Console.Write ("X"));t.ContinueWith (ant => Console.Write ("Y"));
任务计划程序

任务计划程序将任务分配给线程,并由抽象的任务类表示。 .NET 提供了两个具体的实现:与 CLR 线程池协同工作的默认计划程序和。 后者(主要)旨在帮助您使用 WPF 和 Windows 窗体的线程模型,该模型要求用户界面元素和控件只能从创建它们的线程访问(请参见中的)。通过捕获它,我们可以指示任务或延续在此上下文中执行:

// Suppose we are on a UI thread in a Windows Forms / WPF application:_uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();

假设 Foo 是一个返回字符串的计算绑定方法,并且 lblResult 是一个 WPF 或 Windows 窗体标签,那么我们可以在操作完成后安全地更新标签,如下所示:

Task.Run (() => Foo())  .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);

当然,C# 的异步函数更常用于这种事情。

也可以编写我们自己的任务调度程序(通过子类化 任务调度程序 ),尽管这只有在非常专业的情况下才会这样做。对于自定义计划,您更常使用 TaskCompletionSource。

任务工厂

当您调用 Task.Factory 时,您将在 Task 上调用返回默认 TaskFactory 对象的静态属性。任务工厂的目的是创建任务;具体来说,有三种任务:

“普通”任务(通过StartNew)具有多个先行的延续(通过 ContinueWhenAll 和 ContinueWhenAny )包装已失效 APM 后的方法的任务(通过 FromAsync;请参阅中的)。

创建任务的另一种方法是实例化任务并调用 Start 。但是,这允许您仅创建“普通”任务,而不创建延续。

创建自己的任务工厂

TaskFactory 不是一个工厂:您实际上可以实例化类,当您想要使用相同(非标准)值为 TaskCreationOptions、TaskContinuationOptions 或 TaskScheduler 重复创建任务时,这很有用。例如,如果我们想重复创建长时间运行的任务,我们可以创建一个自定义工厂,如下所示:

var factory = new TaskFactory (  TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent,  TaskContinuationOptions.None);

然后,只需在工厂调用 StartNew,即可创建任务:

Task task1 = factory.StartNew (Method1);Task task2 = factory.StartNew (Method2);...

自定义继续选项在调用 ContinueWhenAll 和 ContinueWhenAny 时应用。

使用 AggregateException

正如我们所看到的,PLINQ、并行类和任务会自动将异常封送到使用者。若要了解为什么这是必不可少的,请考虑以下 LINQ 查询,该查询在第一次迭代时引发 DivideByZeroException:

try{  var query = from i in Enumerable.Range (0, 1000000)              select 100 / i;  ...}catch (DivideByZeroException){  ...}

如果我们要求 PLINQ 并行化此查询,并且它忽略了异常的处理,则可能会在上抛出 DivideByZeroException,从而绕过我们的 catch 块并导致应用程序死亡。

因此,会自动捕获异常并将其重新抛出给调用方。但不幸的是,这并不像捕获DivideByZeroException那么简单。由于这些库使用许多线程,因此实际上可以同时引发两个或多个异常。因此,为了确保报告所有异常,异常将包装在 AggregateException 容器中,该容器公开一个包含每个捕获异常的 InnerExceptions 属性:

try{  var query = from i in ParallelEnumerable.Range (0, 1000000)              select 100 / i;  // Enumerate query  ...}catch (AggregateException aex){  foreach (Exception ex in aex.InnerExceptions)    Console.WriteLine (ex.Message);}
注意

PLINQ 和 Parallel 类在遇到第一个异常时都会结束查询或循环执行,方法是不处理任何其他元素或循环体。但是,在当前周期完成之前,可能会引发更多异常。AggregateException 中的第一个异常在 InnerException 属性中可见。

展平和处理

AggregateException 类提供了几种方法来简化异常处理:Flatten 和 Handle 。

扁平 化

AggregateExceptions 通常会包含其他 AggregateExceptions。发生这种情况的一个示例是子任务引发异常。您可以通过调用 Flatten 来消除任何级别的嵌套以简化处理。此方法返回一个新的 AggregateException,其中包含内部异常的简单平面列表:

catch (AggregateException aex){  foreach (Exception ex in aex.Flatten().InnerExceptions)    myLogWriter.LogException (ex);}
处理

有时,仅捕获特定的异常类型并重新引发其他类型的异常类型很有用。AggregateException 上的 Handle 方法提供了执行此操作的快捷方式。它接受一个异常谓词,它在每个内部上运行:

public void Handle (Func<Exception, bool> predicate)

如果谓词返回 true,则认为该异常“已处理”。委托运行每个异常后,将发生以下情况:

如果所有异常都“处理”(委托返回 true),则不会重新引发异常。如果存在委托返回 false 的任何异常(“未处理”),则会构建包含这些异常的新 AggregateException,并重新引发。

例如,以下内容最终会重新抛出另一个包含单个 NullReferenceException 的 AggregateException:

var parent = Task.Factory.StartNew (() => {  // We’ll throw 3 exceptions at once using 3 child tasks:  int[] numbers = { 0 };  var childFactory = new TaskFactory   (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);  childFactory.StartNew (() => 5 / numbers[0]);   // Division by zero  childFactory.StartNew (() => numbers [1]);      // Index out of range  childFactory.StartNew (() => { throw null; });  // Null reference});try { parent.Wait(); }catch (AggregateException aex){  aex.Flatten().Handle (ex =>   // Note that we still need to call Flatten  {    if (ex is DivideByZeroException)    {      Console.WriteLine ("Divide by zero");      return true;                           // This exception is "handled"    }    if (ex is IndexOutOfRangeException)    {      Console.WriteLine ("Index out of range");      return true;                           // This exception is "handled"       }    return false;    // All other exceptions will get rethrown  });}
并发集合

.NET 在 System.Collections.Concurrent 中提供线程安全集合:

并发收集

非并发等效项

ConcurrentStack<T>

堆栈<T>

ConcurrentQueue<T>

队列<T>

ConcurrentBag<T>

(无)

并发词典<啦,电视>

词典<啦,电视>

并发集合针对高并发场景进行了优化;但是,每当需要线程安全集合(作为锁定普通集合的替代方法)时,它们也很有用。不过,有一些注意事项:

传统集合在除高并发方案之外的所有方案中都优于并发集合。线程安全集合不能保证使用它的代码是线程安全的(参见中的)。如果在另一个线程修改并发集合时枚举该集合,则不会引发异常,而是会混合使用新。没有 List<T 的并发版本> 。并发堆栈、队列和包类在内部通过链表实现。这使得它们的内存效率低于非并发堆栈和队列类,但更适合并发访问,因为链表有利于无锁或低锁实现。(这是因为将节点插入链表只需要更新几个引用,而将元素插入类似 List<T> 的结构可能需要移动数千个现有元素。

换句话说,这些集合不仅仅是使用带锁的普通集合的快捷方式。为了演示,如果我们在线程上执行以下代码

var d = new ConcurrentDictionary<int,int>();for (int i = 0; i < 1000000; i++) d[i] = 123;

它的运行速度比这慢三倍:

var d = new Dictionary<int,int>();for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123;

(但是 ConcurrentDictionary 读取速度很快,因为读取是。

并发集合与传统集合的不同之处在于,它们公开了执行原子测试和操作操作的特殊方法,例如 TryPop 。这些方法中的大多数都是通过IProducerConsumerCollection<T>统一的。

IProducerConsumerCollection<T>

生产者/使用者集合是两个主要用例的集合:

添加元素(“生产”)在删除元素时检索元素(“消耗”)

典型的示例是堆栈和队列。生产者/使用者集合在并行编程中非常重要,因为它们有利于高效的无锁实现。

IProducerConsumerCollection<T> 接口表示线程安全的集合。以下类实现此接口:

ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>

IProducerConsumerCollection<T>扩展了ICollection,增加了以下方法:

void CopyTo (T[] array, int index);T[] ToArray();bool TryAdd (T item);bool TryTake (out T item);

TryAdd 和 TryTake 方法测试是否可以执行添加/删除操作;如果是这样,他们将执行添加/删除。测试和操作以原子方式执行,无需像在传统集合周围那样锁定:

int result;lock (myStack) if (myStack.Count > 0) result = myStack.Pop();

如果集合为空,则 TryTake 返回 false。TryAdd 始终成功,并在提供的三个实现中返回 true。但是,如果您编写了自己的并发集合,该集合禁止重复,则如果元素已存在,则会使 TryAdd 返回 false(例如,如果您编写了并发)。

TryTake 删除的特定元素由子类定义:

对于堆栈,TryTake 会删除最近添加的元素。对于队列,TryTake 会删除最近添加最少的元素。使用袋子,TryTake 可以最有效地去除任何元素。

这三个具体的类大多显式地实现 TryTake 和 TryAdd 方法,通过更具体命名的公共方法(如 TryDequeue 和 TryPop)公开相同的功能。

ConcurrentBag<T>

ConcurrentBag<T> 存储对象的集合(允许重复)。ConcurrentBag<T> 适用于在调用 Take 或 TryTake 时您哪个元素的情况。

ConcurrentBag<T> 相对于并发队列或堆栈的好处是,当同时被多个线程调用时,包的 Add 用。相反,在队列或堆栈上并行调用 Add 会导致争用(尽管比锁定集合要少得多)。在并发包上调用 Take 也非常有效——只要每个线程占用的元素不超过它添加的元素。

在并发包中,每个线程都有自己的私有链表。元素被添加到属于调用 Add 的线程的私有列表中,消除了。枚举包时,枚举器遍历每个线程的私有列表,依次生成其每个元素。

当您调用 Take 时,包首先查看当前线程的私有列表。如果至少有一个元素,1它可以轻松完成任务,而不会争用。但是,如果列表为空,则必须从另一个线程的私有列表中“窃取”元素,并引起争用的可能性。

因此,准确地说,调用 Take 会为您提供最近在该线程上添加的元素;如果该线程上没有元素,它将为您提供最近在另一个线程上添加的元素,这是随机选择的。

当集合上的并行操作主要包括添加元素时,或者当添加 s 和 Take s 在线程上平衡时,并发袋是理想的选择。我们之前在使用 Parallel.ForEach 实现并行拼写检查器时看到了前者的示例:

var misspellings = new ConcurrentBag<Tuple<int,string>>();Parallel.ForEach (wordsToTest, (word, state, i) =>{  if (!wordLookup.Contains (word))    misspellings.Add (Tuple.Create ((int) i, word));});

对于生产者/消费者队列来说,并发包将是一个糟糕的选择,因为元素是由线程添加和删除的。

阻止集合<T>

如果在上一节中讨论的任何生产者/使用者集合上调用 TryTake,ConcurrentStack<T>、ConcurrentQueue<T> 和 ConcurrentBag<T> ,并且集合为空,则该方法返回 false 。有时,在这种情况下,到元素可用会更有用。

PFX 的设计人员没有使用此功能重载 TryTake 方法(在允许取消令牌和超时后会导致成员井喷),而是将此功能封装到名为 BlockingCollection<T> 的包装类中。阻塞集合包装实现 IProducerConsumerCollection<T> 的任何集合,并允许您从包装的集合中获取元素 — 如果没有可用的元素,则阻止。

阻止集合还允许您限制集合的总大小,如果超过该大小,则阻止。以这种方式限制的集合称为。

要使用 BlockingCollection<T> :

实例化类,可以选择指定要包装的 IProducerConsumerCollection<T> 以及集合的最大大小(绑定)。调用 Add 或 TryAdd 以将元素添加到基础集合。调用 Take 或 TryTake 以从基础中删除(使用)元素。

如果在不传入集合的情况下调用构造函数,则该类将自动实例化 ConcurrentQueue<T> 。生成和使用方法允许您指定取消令牌和超时。如果集合大小有限,则添加和 TryAdd 可能会阻止;当集合为空时,Take 和 TryTake 块。

使用元素的另一种方法是调用 GetConsumingEnumerable 。这将返回一个(可能)无限序列,该序列在元素可用时生成元素。您可以通过调用 CompleteAdd 来强制序列结束:此方法还可以防止其他元素排队。

BlockingCollection 还提供了称为 AddToAny 和 TakeFromAny 的静态方法,它们允许您在指定多个阻塞集合的同时添加或获取元素。然后,能够为请求提供服务的第一个集合将执行该操作。

编写生产者/使用者队列

生产者/使用者队列是一种有用的结构,无论是在并行编程还是常规并发方案中。以下是它的工作原理:

设置一个队列来描述工作项或工作的数据。当任务需要执行时,它会排队,调用方继续处理其他事情。一个或多个工作线程在后台插入,选取并执行排队的项目。

生产者/使用者队列可让您精确控制一次执行多少工作线程,这不仅可用于限制 CPU 消耗,还可用于限制其他资源。例如,如果任务执行密集型磁盘 I/O,则可以限制并发性以避免使操作系统和其他应用程序匮乏。您还可以在队列的整个生命周期内动态添加和删除工作线程。CLR 的线程池本身是一种生产者/使用者队列,针对短期运行的计算绑定作业进行了优化。

生产者/使用者队列通常包含对其执行(相同)任务的数据项。例如,数据项可能是文件名,任务可能是加密这些文件。但是,通过将项目设置为委托,您可以编写一个更通用的生产者/使用者队列,其中每个项目都可以执行任何操作。

,我们展示了如何使用AutoResetEvent从头开始编写生产者/消费者队列(以及后来使用监视器的等待和脉冲)。但是,从头开始编写生产者/消费者是不必要的,因为大多数功能都是由 BlockingCollection<T> 提供的。以下是使用它的方法:

public class PCQueue : IDisposable{  BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();  public PCQueue (int workerCount)  {    // Create and start a separate Task for each consumer:    for (int i = 0; i < workerCount; i++)      Task.Factory.StartNew (Consume);  }  public void Enqueue (Action action) { _taskQ.Add (action); }  void Consume()  {    // This sequence that we’re enumerating will block when no elements    // are available and will end when CompleteAdding is called.    foreach (Action action in _taskQ.GetConsumingEnumerable())      action();     // Perform task.  }  public void Dispose() { _taskQ.CompleteAdding(); }}

因为我们没有将任何东西传递到 BlockingCollection 的构造函数中,所以它会自动实例化一个并发队列。如果我们在ConcurrentStack中传递,我们最终会得到一个生产者/消费者堆栈。

使用任务

我们刚刚编写的生产者/使用者是不灵活的,因为我们无法在工作项排队后跟踪它们。如果我们能做到以下几点,那就太好了:

了解工作项何时完成(并等待它)取消工作项优雅地处理工作项引发的任何异常

一个理想的解决方案是让 Enqueue 方法返回一些对象,为我们提供刚才描述的功能。好消息是,已经存在一个类来做到这一点 - Task 类,我们可以使用 TaskCompletionSource 或通过直接实例化(创建未启动或任务)来生成它:

public class PCQueue : IDisposable{  BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();  public PCQueue (int workerCount)  {    // Create and start a separate Task for each consumer:    for (int i = 0; i < workerCount; i++)      Task.Factory.StartNew (Consume);  }  public Task Enqueue (Action action, CancellationToken cancelToken                                            = default (CancellationToken))  {    var task = new Task (action, cancelToken);    _taskQ.Add (task);    return task;  }  public Task<TResult> Enqueue<TResult> (Func<TResult> func,               CancellationToken cancelToken = default (CancellationToken))  {    var task = new Task<TResult> (func, cancelToken);    _taskQ.Add (task);    return task;  }    void Consume()  {    foreach (var task in _taskQ.GetConsumingEnumerable())      try       {          if (!task.IsCanceled) task.RunSynchronously();      }       catch (InvalidOperationException) { }  // Race condition  }  public void Dispose() { _taskQ.CompleteAdding(); }}

在 排队 ,我们将创建但不启动的任务排队并返回给调用方。

在 消费 中,我们在使用者的线程上同步运行任务。我们捕获一个 InvalidOperationException 来处理在检查任务是否已取消和运行它之间取消任务的不太可能的事件。

以下是我们如何使用此类:

var pcQ = new PCQueue (2);    // Maximum concurrency of 2string result = await pcQ.Enqueue (() => "That was easy!");...

因此,我们拥有任务的所有好处(异常传播、返回值和取消),同时完全控制调度。

标签: #c语言结构化程序设计方法步骤