计算限制的异步操作
本章将讨论以异步方式执行操作的各种方式. 异步的计算限制操作要用其他线程执行. 例如: 编译代码,检查拼写,语法检查, 音频视频数据转码以及生产图像的略缩图.
CLR线程池基础
前面说过,创建和销毁线程是一个比较昂贵的操作,太多的线程也会浪费内存资源。由于操作系统必须调度可运行的线程并执行上下文切换,所以太多的线程还有损于性能。为了改善这个情况,CLR使用了代码来管理它自己的线程池(thread pool)
。线程池是你的应用程序能使用的线程集合。每个CLR都有一个线程池,这个线程池由CLR控制的所有AppDomain共享.
CLR初始化时,线程池是没有线程的。在内部,线程池维护了一个操作请求队列。应用程序想执行一个异步操作时,就调用某个方法,将一个记录项(entry)追加到线程池的队列中。线程池的代码从这个队列中提取记录项,将这个记录项派遣(dispatch)给一个线程池线程。如果线程池中没有线程,就创建新的线程。创建线程要产生一定的性能损失。然而,当线程池完成任务后,线程不会被销毁。相反,线程会返回线程池,在那里进入空闲状态,等待响应另一个请求。由于线程不销毁自身,所以不再产生额外的性能损失。
如果你的应用程序向线程池发出许多请求,线程池会尝试只用一个线程来服务所有的请求。然而,如果你的应用程序发出请求的速度超过了线程池处理它们的速度,就会创建额外的线程。最终,你的应用程序所有请求都可能有少量的线程处理,所有线程池不必创建大量的线程。
如果你的应用程序停止向线程池发出请求,池中含有大量空闲的线程。这是对内存资源的一种浪费。所以,当一个线程池线程空闲一段时间以后,线程会自己醒来终止自己以释放资源。
当线程闲着没事一段时间后, 线程会自己醒来终止自己以释放资源, 会产生一定的性能损失。 这个性能损失关系不大.
在内部,线程池将自己的线程划分为工作者(Worker)线程
和I/O线程
。应用程序要求线程池执行一个异步的计算限制操作时(这个操作可能发起一个I/O限制的操作),使用的就是工作者线程。I/O线程用于通知你的代码一个异步I/O限制操作已经完成,具体的说,这意味着使用”异步编程模型”发出I/O请求,比如访问文件、网络服务器、数据库等等。
执行简单的计算限制操作
将一个异步的、计算限制的操作放到一个线程池的队列中,通常可以调用ThreadPool类
定义的以下方法之一:
//将方法排入队列以便执行。此方法在有线程池线程变得可用时执行。
static Boolean QueueUserWorkItem(WaitCallback callBack);
//将方法排入队列以便执行,并指定包含该方法所用数据的对象。此方法在有线程池线程变得可用时执行。
static Boolean QueueUserWorkItem(WaitCallback callBack, Object state);
这些方法向线程池的队列中添加一个"工作项"(work item)
以及可选的状态数据
, 如果此方法成功排队,则为 true;如果无法将该工作项排队,则引发 OutOfMemoryException
。
工作项
其实就是由callBack参数标识的一个方法
,该方法将由线程池线程调用。可通过state实参(状态数据)
向方法传递一个参数。无state参数的那个版本的QueueUserWorkItem
则向回调方法传递null
。最终,池中的某个线程会处理工作项
,造成你指定的方法被调用。你写的回调方法必须匹配System.Threading.WaitCallBack委托类型
,它的定义如下:
delegate void WaitCallback(Object state);
WaitCallback委托
,TimerCallback委托
,ParameterizedThreadStart委托
签名完全一致, 使用ThreadPool.QueueUserWorkItem
,System.Threading.Timer
和System.Threading.Thread
对象都可以调用该方法.
以下演示了如何让一个线程池线程以异步方式调用一个方法:
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Main thread: queuing an asynchronous operation");
ThreadPool.QueueUserWorkItem(ComputeBoundOp, 5);
Console.WriteLine("Main thread: Doing other work here...");
Thread.Sleep(10000); // 模拟其它工作 (10 秒钟)
//Console.ReadLine();
}
// 这是一个回调方法,必须和WaitCallBack委托签名一致
private static void ComputeBoundOp(Object state)
{
// 这个方法通过线程池中线程执行
Console.WriteLine("In ComputeBoundOp: state={0}", state);
Thread.Sleep(1000); // 模拟其它工作 (1 秒钟)
// 这个方法返回后,线程回到线程池,等待其他任务
}
}
// 可以输出以下结果
// Main thread: queuing an asynchronous operation
// Main thread: Doing other work here...
// In ComputeBoundOp: state=5
// 但有时也会得到一下输出:
// Main thread: queuing an asynchronous operation
// In ComputeBoundOp: state=5
// Main thread: Doing other work here...
之所以有两种输出结果,是因为这两个方法相互之间是异步运行的。由Windows调度器决定先调度哪一个线程。
执行上下文
每个线程都关联了一个执行上下文数据结构。
执行上下文(execution context)
包括的东西有:
- 安全设置(压缩栈、
Thread的Principal属性[指示线程的调度优先级]
和Windows身份)、 - 宿主设置(参见
System.Threading.HostExecutionContextManager[提供使公共语言运行时宿主可以参与执行上下文的流动(或移植)的功能]
) - 逻辑调用上下文数据 (参见
System.Runtime.Remoting.Messaging.CallContext[提供与执行代码路径一起传送的属性集]
的LogicalSetData[将一个给定对象存储在逻辑调用上下文中并将该对象与指定名称相关联]
和LogicalGetData[从逻辑调用上下文中检索具有指定名称的对象]
方法).
线程执行代码时,有的操作会受到线程的执行上下文设置
(尤其是安全设置)的影响。理想情况下,每当一个线程(初始线程)使用另一个线程(辅助线程)执行任务时,前者的执行上下文应该”流向”(复制到)辅助线程。这就确保辅助线程执行的任何操作使用的都是相同的安全设置和宿主设置。还确保了初始线程的逻辑调用上下文可以在辅助线程中使用。
默认情况下,CLR自动造成初始线程的执行上下文
会”流向”(复制到)任何辅助线程。这就是将上下文信息传输到辅助线程,但这对损失性能,因为执行上下文
中包含大量信息,而收集这些信息,再将这些信息复制到辅助线程,要耗费不少时间。如果辅助线程又采用更多的辅助线程,还必须创建和初始化更多的执行上下文数据结构。
System.Threading
命名空间中有一个ExecutionContext类[管理当前线程的执行上下文]
,它允许你控制线程的执行上下文如何从一个线程”流向”(复制到)另一个线程。下面展示了这个类的样子:
public sealed class ExecutionContext : IDisposable, ISerializable
{
[SecurityCritical]
//取消执行上下文在异步线程之间的流动
public static AsyncFlowControl SuppressFlow();
//恢复执行上下文在异步线程之间的流动
public static void RestoreFlow();
//指示当前是否取消了执行上下文的流动。
public static bool IsFlowSuppressed();
//不常用方法没有列出
}
可用这个类阻止一个执行上下文的流动,从而提升应用程序的性能。对于服务器应用程序,性能的提升可能非常显著。但是,客户端应用程序的性能提升不了多少。另外,由于SuppressFlow方法
用[SecurityCritical]attribute]
进行了标识,所以在某些客户端应用程序(比如Silverlight)中是无法调用的。当然,只有在辅助线程不需要或者不防问上下文信息时,才应该阻止执行上下文的流动。如果初始线程的执行上下文不流向辅助线程,辅助线程会使用和它关联起来的任何执行上下文。在这种情况下,辅助线程不应该执行要依赖于执行上下文状态(比如用户的Windows身份)的代码。
注意:添加到
逻辑调用上下文
的项必须是可序列化的。对于包含了逻辑调用上下文数据线的一个执行上下文,如果让它流动,可能严重损害性能,因为为了捕捉执行上下文,需对所有数据项进行序列化和反序列化。
下例展示了向CLR的线程池队列
添加一个工作项
的时候,如何通过阻止执行上下文的流动来影响线程逻辑调用上下文中的数据:
class Program
{
static void Main(string[] args)
{
// 将一些数据放到Main线程的逻辑调用上下文中
CallContext.LogicalSetData("Name", "Jeffrey");
// 线程池能访问到逻辑调用上下文数据,加入到程序池队列中
ThreadPool.QueueUserWorkItem(
state => Console.WriteLine("Name={0}", CallContext.LogicalGetData("Name")));
// 现在阻止Main线程的执行上下文流动
ExecutionContext.SuppressFlow();
//再次访问逻辑调用上下文的数据
ThreadPool.QueueUserWorkItem(
state => Console.WriteLine("Name={0}", CallContext.LogicalGetData("Name")));
//恢复Main线程的执行上下文流动
ExecutionContext.RestoreFlow();
//再次访问逻辑调用上下文的数据
ThreadPool.QueueUserWorkItem(
state => Console.WriteLine("Name={0}", CallContext.LogicalGetData("Name")));
Console.Read();
}
}
// Name=Jeffrey
// Name=
// Name=Jeffrey
虽然现在我们讨论的是调用ThreadPool.QueueUserWorkItem
时阻止执行上下文的流动
,但在使用Task对象
(以后会提到),以及在发起异步I/O操作(以后会提到)时也会用到
协作式取消和超时
Microsoft .NET Framework提供了一个标准的取消操作模式。这个模式是协作式的,意味着你想取消的操作必须显式的支持取消。换言之,无论执行操作的代码,还是试图取消操作的代码,都必须使用本节提到的类型。对于长时间运行的计算限制操作来说,支持取消是一件非常”棒”的事。所以,你应该考虑为自己的计算限制操作添加取消能力。首先,先解释一下FCL提供的两个主要类型,它们是标准协作式取消模式的一部分。
为了取消一个操作,首先必须创建一个System.Thread.CancellationTokenSource[通知 CancellationToken,告知其应被取消]
对象。这个类如下所示:
public class CancellationTokenSource : IDisposable
{
//构造函数
public CancellationTokenSource();
//获取是否已请求取消此 System.Threading.CancellationTokenSource
public bool IsCancellationRequested { get; }
//获取与此 System.Threading.CancellationTokenSource 关联的 System.Threading.CancellationToken
public CancellationToken Token;
//传达取消请求。
public void Cancel();
//传达对取消的请求,并指定是否应处理其余回调和可取消操作。
public void Cancel(bool throwOnFirstException);
...
}
这个对象包含了管理取消有关的所有状态。构造好一个CancellationTokenSource
(引用类型)之后,可以从它的Token属性
获得一个或多个CancellationToken(值类型)实例
,并传给你的操作,使那些操作可以取消。以下是CancellationToken值类型
最有用的一些成员:
public struct CancellationToken //一个值类型
{
// 获取此标记是否能处于已取消状态,
// 由非通过Task来调用(invoke)的一个操作调用(call)
public bool IsCancellationRequested { get; }
// 如果已请求取消此标记,则引发 System.OperationCanceledException,
// 由通过Task来调用的操作调用
public void ThrowIfCancellationRequested();
// 获取在取消标记时处于有信号状态的
// CancellationTokenSource 取消时,WaitHandle会收到信号
public WaitHandle WaitHandle { get; }
//返回空 CancellationToken 值。
public static CancellationToken None;
// 判断此CancellationToken是否是特殊的None实例, 特殊的是不允许取消操作
public bool CanBeCanceled{ get; }
//注册一个将在取消此 System.Threading.CancellationToken 时调用的委托。省略了简单重载版本
public CancellationTokenRegistration Register(Action<object> callback, object state, bool useSynchronizationContext);
//省略了GetHashCode、Equals成员
}
CancellationToken实例
是一个轻量级的值类型
,它包含单个私有字段:对它的CancellationTokenSource对象
的一个引用。在一个计算限制操作的循环中,可以定时调用CancellationToken
的IsCancellationRequested属性
,了解循环是否应该提前终止,进而终止计算限制的操作。当然,提前终止的好处在于,CPU不再需要把时间浪费在你对其结果已经不感兴趣的一个操作上。现在,用一些示例代码演示一下:
class Program
{
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
// 将CancellationToken和"要循环到的目标数"传入操作中
ThreadPool.QueueUserWorkItem(o => Count(cts.Token, 1000));
Console.WriteLine("Press <Enter> to cancel the operation.");
Console.ReadLine();
cts.Cancel(); // 如果Count方法已返回,Cancel没有任何效果
// Cancel立即返回,方法从这里继续运行
Console.ReadLine();
}
private static void Count(CancellationToken token, Int32 countTo)
{
for (Int32 count = 0; count < countTo; count++)
{
//判断是否接收到了取消任务的信号
if (token.IsCancellationRequested)
{
Console.WriteLine("Count is cancelled");
break; // 退出循环以停止操作
}
Console.WriteLine(count);
Thread.Sleep(200); // 出于演示浪费一点时间
}
Console.WriteLine("Count is done");
}
}
注意:如果要执行一个不允许被取消的操作,可以向该操作传递通过调用CancellationToken
的静态None属性
返回的CancellationToken(特殊的实例, 它不与任何CancellationTokenSource关联,实例的私有字段为null)
。 由于没有CancellationTokenSource
, 所以没有代码能调用Cancel
.如果查询token.IsCancellationRequested
则总是返回false
.
此时, 查询CancellationToken
的CanBeCanceled
属性,会返回false, 相反, 查询正常实例对象的此属性,都会返回true.
如果愿意,可以调用Register方法
登记一个或多个在取消一个CancellationTokenSource
时调用的方法。每个回调方法都用CancellactionToken
的Register
方法来登记的。要向这个方法传递一个Action<Object>委托
;一个要通过委托传给回调的状态;以及一个Boolean值(名为useSynchronizationContext
),该值指定了是否要使用调用线程的SynchronizationContext(同步上下文)
来调用委托。
如果为
useSynchronizationContext
参数传递的是false
,那么调用Cancel
的线程会顺序调用已登记的所有方法(post)。如果为
useSynchronizationContext
参数传递的是true
,那么回调(方法)会被send(而不是post) 给已捕捉的SynchronizationContext
对象,由同步上下文对象
决定由哪个线程调用回调方法。
说明:如果执行send操作,要等到目标线程那里处理完毕之后才会返回。再次期间,调用线程会被阻塞。这相当于同步调用。而如果执行post操作,是指将东西post到一个队列中便完事,调用线程可以立即返回。相当于异步调用。
注意: 向被取消的CancellationTokenSource
登记一个回调方法, 将由调用Register
的线程调用回调方法. 如果为useSynchronizationContext
参数传递了true
值, 就可能要通过调用线程的SynchronizationContext(同步上下文进行)
.
如果多次调用Regiser
,那么多个回调方法都会调用。这些回调方法可能抛出未处理的异常。如果调用CancellationTokenSource
的Cancel
方法,
向它传递
true
,那么抛出了未处理异常的第一个回调方法会阻止其他回调方法的执行,抛出的异常也会从Cancel中抛出。如果调用
Cancel
并向它传递false
,那么登记的所有回调方法都会调用。所有未处理的异常都会添加到一个集合中。所有回调方法都执行后,如果其中任何一个抛出一个未处理的异常,Cancel就会抛出一个AggregateException
,该异常实例的InnerException
属性会被设为已抛出的所有异常对象的一个集合。如果以登记的所有回调方法都没有抛出异常,那么Cancel
直接返回,不抛出任何异常。
没有办法将
AggregateException
的InnerException
集合中的一个异常对象和特定操作对应起来; 你只知道某个操作出错, 并通过异常类型知道出了什么错. 要跟踪错误的具体位置, 需要检查异常对象的StackTrace
属性, 并手动扫描你的源代码.
CancellactionToken
的Register方法
返回一个CancellationTokenRegistration
,如下所示:
public struct CancellationTokenRegistration : IEquatable<CancellationTokenRegistration>, IDisposable
{
public void Dispose();
.......
}
可调用Dispose
从关联的CancellationTokenSource
中删除一个已登记的回调方法;这样一来,在调用Cancel
时,便不会再调用这个回调。以下代码演示了如何向一个CancellationTokenSource
登记两个回调:
private static void Register()
{
var cts = new CancellationTokenSource();
// 向CancellationTokenSource注册 取消时的回调方法
cts.Token.Register(() => Console.WriteLine("Canceled 1"));
cts.Token.Register(() => Console.WriteLine("Canceled 2"));
// 出于测试目的,让我们取消它,以便执行两个回调
cts.Cancel();
}
//输出:
// Canceled 2
// Canceled 1
最后,可通过链接(CreateLinkedTokenSource
)另一组的CancellationTokenSource
来新建一个CancellationTokenSource
对象。任何一个链接的CancellationTokenSource
被取消,这个CancellationTokenSource对象
就会被取消。以下代码对此进行的演示:
class Program
{
static void Main(string[] args)
{
// 创建一个 CancellationTokenSource
var cts1 = new CancellationTokenSource();
cts1.Token.Register(() => Console.WriteLine("cts1 canceled"));
// 创建另一个 CancellationTokenSource
var cts2 = new CancellationTokenSource();
cts2.Token.Register(() => Console.WriteLine("cts2 canceled"));
// 创建新的CancellationTokenSource,它在 cts1 o或 ct2 is 取消时取消
var ctsLinked = CancellationTokenSource.CreateLinkedTokenSource(cts1.Token, cts2.Token);
ctsLinked.Token.Register(() => Console.WriteLine("linkedCts canceled"));
// 取消其中一个 CancellationTokenSource objects (这里选择了 cts2)
cts2.Cancel();
// 显示哪个 CancellationTokenSource objects 被取消 了
Console.WriteLine("cts1 canceled={0}, cts2 canceled={1}, ctsLinked canceled ={2}",
cts1.IsCancellationRequested, cts2.IsCancellationRequested, ctsLinked.IsCancellationRequested);
Console.ReadLine();
}
}
// 输出结果:
// linkedCts canceled
// cts2 canceled
// cts1 canceled=False, cts2 canceled=True, ctsLinked canceled =True
在很多情况下,我们需要在过一段时间之后才取消操作. 例如, 服务器应用程序会根据客户端的请求而开始请求计算. 但必须在2秒钟之内有响应, 无论此时工作是否已经完成. 有的时候, 与其等待漫长时间获得一个完整的结果, 还不如在短时间内报错, 或者用部分计算好的结果进行响应.
幸好, CancellationTokenSource
提供了在指定时间后自动取消的机制, 为了利用这个机制,
- 要么用接收延时参数的构造器构造一个
CancellationTokenSource
对象 - 要么调用
CancellationTokenSource
的CancelAfter
方法.
public sealed class CancellationTokenSource : IDisposable // 一个引用类型
{
public CancellationToken(Int32 millisecondsDelay);
public CancellationToken(TimeSpan delay);
public void CancelAfter(Int32 millisecondsDelay);
public void CancelAfter(TimeSpan delay);
...
}
任务
调用ThreadPool
的QueueUserWorkItem
方法来发起一次异步的受计算限制的操作是非常简单的。然而。这个技术存在许多限制。最大的问题是没有一个内建的机制让你知道操作在什么时候完成,也没有一个机制在操作完成时获得一个返回值。为了克服这些限制并解决一些其它问题,Microsoft引入了任务(Task) 的概念。我们通过System.Treading.Tasks
命名空间中的类型来使用它们。
所以, 不是调用ThreadPool.QueueUserWorkItem
方法, 而是用任务来做相同的事情.
// 旧的方式
ThreadPool.QueueUserWorkItem(ComputeBoundOp, 5); // 调用QueueUserWorkItem
// 任务概念, 新的方式
new Task(ComputeBoundOp, 5).Start(); // 用Task来做相同的事情
// 等价写法
Task.Run(()=> ComputeBoundOp(5));
在上述代码中,创建了Task对象
,并立即调用Start方法
来调度该任务方法。当然,也可以先创建好Task对象
,以后在调用Start方法
。
创建一个Task
的方式总是调用构造器,传递一个Action
或者Action<Object>委托
。这个委托就是你想执行的操作。如果传递期待一个Object的方法,还必须向Task的构造器传递最终想传给操作的实参。还可以选择向Task的构造器传递一个CancellationToken
,这便允许Task在调度之前取消。
还可以选择向构造器传递一些TaskCreationOptions标志
来控制Task
的执行方式。TaskCreationOptions
是一个枚举类型,定义了一组可按位OR
到一起的标志。它的定义如下:
[FlagsAttribute, SerializableAttribute]
public enum TaskCreationOptions
{
// 指定应使用默认行为
None = 0x0,
// [提议] TaskScheduler(任务调度器) 以一种尽可能公平的方式安排任务,这意味着较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行。
// 造成默认的TaskScheduler(任务调度器) 将线程池中的任务放到全局队列中,而不是放到一个工作者线程的本地队列中
PreferFairness = 0x1,
// [提议] TaskScheduler(任务调度器)应尽可能地创建线程池线程
// 指定某个任务将是长时间运行、粗粒度的操作(最好是创建线程池线程)。
LongRunning = 0x2,
// [提议总是被采纳]: 将一个任务和它的父Task关联。
AttachedToParent = 0x4,
// [提议总是被采纳]: 如果一个任务试图和这个父任务连接, 它就是一个普通任务,而不是子任务
DenyChildAttach = 0x8,
// [提议总是被采纳]: 强迫子任务使用默认调度器而不是父任务的调度器
HideScheduler = 0x10
}
大多是标志只是一些提议而已,TaskScheduler
在调度一个Task
时,可能会也可能不会采纳这些提议。不过,AttacedToParent标志
总是得到采纳,因为它和TaskScheduler
本身无关。
等待任务完成并获取结果
对于任务,可以等待它完成,然后获取它们的结果。假定有一个Sum方法,在n值很大的时候,它要执行较长的时间:
private static Int32 Sum(Int32 n)
{
Int32 sum = 0;
for (; n > 0; n--)
checked { sum += n; } //如果n太大,这一行代码会抛出异常
return sum;
}
现在可以构造一个Task<TResult>对象
(派生自Task
),并为泛型TResult参数
传递计算限制操作的返回类型。在开始任务后,可以等待它完成并获取结果,如以下代码所示:
class Program
{
static void Main(string[] args)
{
// 创建 Task, 现在还没开始运行
Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 10000);
// 启动任务
t.Start();
// 可以选择显式的等待任务完成
t.Wait();
// 可获得结果(Result属性内部会调用Wait)
Console.WriteLine("The sum is: " + t.Result); //一个Int32的值
Console.ReadLine();
}
private static Int32 Sum(Int32 n)
{
Int32 sum = 0;
for (; n > 0; n--) checked { sum += n; } //如果n太大,这一行代码会抛出异常
return sum;
}
}
Result属性内部会调用Wait. 如果计算限制的任务抛出一个未处理的异常,异常会被”吞噬”并存储到一个集合中,而线程池线程允许返回到线程池中。调用Wait方法
或者Result属性
时,这些成员会抛出一个System.AggregateException
对象。
一个线程调用Wait方法
时,系统会检查系统要等待的Task是否已开始执行。
- 如果是,调用Wait的线程会阻塞,直到Task运行结束为止。
- 如果
Task
还没有开始执行,系统可能(取决于TaskSecheduler
)使用调用Wait
的线程来执行Task
。如果发生这种情况,那么调用Wait的线程不会阻塞;它会执行Task并立即返回。这样做的好处在于,- 没有线程会被阻塞,所以减少了资源的使用(因为不需要创建一个线程来替代被阻塞的线程),
- 并提升了性能(因为不需要花时间创建一个线程,也没有上下文切换)。
- 但是不好的地方在于,假如线程在调用
Wait
前已经获得了一个线程同步锁,而Task
试图获取同一个锁,就会造成一个死锁的线程。
AggregateException类型
封装了异常对象的一个集合(如果父任务生成了多个子任务,而多个子任务都抛出异常,这个集合便有可能包含多个异常)。该类型有一个InnerExceptions属性
,它返回一个ReadOnlyCollection<Excepyion>
对象。不要混淆InnerExceptions
(有s)和InnerException
(没有s)属性,后者是AggregateException类
从System.Exception
基类继承来的。对于上例来说,AggregateException
的InnerExceptions
属性的元素0
将引用由计算限制方法(Sum)抛出的实际System.OverflowException
对象。
为方便编码,AggregateException
重写了Exception
的GetBaseException
方法。AggregateException
的这个实现会返回作为问题根源的最内层的AggregateException
。AggregateException
还提供了一个Flatten
方法,它创建一个新的AggregateException
,其InnerExceptions
属性包含一个异常列表,其中的异常是通过遍历原始AggregateException
的内层异常层次结构而生成的。最后,AggregateException
还提供了一个Handle
方法,它为AggregateException
中包含的每个异常都调用一个回调方法,然后,回调方法可以为每个在调用Handle
之后,如果至少有一个异常没有处理,就创建一个新的AggregateException
对象,其中只包含未处理的异常,并抛出这个新的AggregateException
对象。
除了等待单个任务, Task类
还提供了两个静态方法, 允许线程等待一个Task对象数组.
其中Task的静态WaitAny方法
会阻塞调用线程, 直到数组中的任何一个Task对象完成. 方法返回Int32数组索引值, 指明完成的是哪个Task
对象. 方法返回后, 线程被唤醒并继续运行, 如果发生超时, 方法返回-1
. 如果WaitAny通过CancellationToken
取消, 会抛出一个OpreationCanceledException
异常.
类似的,Task类还提供了静态WaitAll方法
,它阻塞调用线程,直到数组中所有的Task对象都完成。如果Task对象都完成,WaitAll方法
返回true
。如果发生超时,就返回false
。如果WaitAll
通过一个CancellationToken
而取消,会抛出一个OpreationCanceledException
。
取消任务
可以用一个CancellationTokenSource
取消一个Task
。首先,我们必须修订前面的Sum方法,让它接受一个CancellationToken
参数:
private static Int32 Sum(CancellationToken ct, Int32 n)
{
Int32 sum = 0;
for (; n > 0; n--)
{
// 定时检查操作是否已取消
// 在取消标志引用的CancellationTokenSource上如果调用Cancel,
// 下面这一行就抛出OpreationCanceledException
ct.ThrowIfCancellationRequested();
checked { sum += n; } //如果n太大,这一行代码会抛出异常
}
return sum;
}
在上述代码中,在计算限制的循环中,我们通过调用CancellationToken
的ThrowIfCancellationRequested
方法来定时检查操作是否已取消。这个方法和CancellationToken
的IsCancellationRequested属性
相似。如果CancellationTokenSource
已经取消,ThrowIfCancellationRequested
会抛出一个OpreationCanceledException
。之所以选择抛出一个异常,是因为和ThreadPool
的QueueUserWorkItem
方法初始化的工作项(work item)不同,任务有办法表示完成,甚至还有一个返回值。所以,需要采取一种方式将已完成的任务和出错的任务区分开。而让任务抛出一个异常,就可以知道任务没有一直运行到结束。
现在,我们向下面这样创建CancellationTokenSource
和Task对象
:
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
Task<Int32> t = new Task<Int32>(() => Sum(cts.Token, 10000), cts.Token);
t.Start();
// 在之后的某个时间,取消CancellationTokenSource以取消Task
cts.Cancel();
try
{
// 如果任务已经取消,Result会抛出一个AggregateException
Console.WriteLine("The sum is: " + t.Result); // An Int32 value
}
catch (AggregateException ae)
{
// 将任何OperationCanceledException对象都视为已处理
// 其他任何异常都造成抛出一个新的AggregateException,其中
// 只包含未处理的异常
ae.Handle(e => e is OperationCanceledException);
// 所有的异常都处理好之后,执行下面这一行
Console.WriteLine("Sum was canceled");
}
Console.ReadLine();
}
private static Int32 Sum(CancellationToken ct, Int32 n)
{
Int32 sum = 0;
for (; n > 0; n--)
{
// 定时检查操作是否已取消
// 在取消标志引用的CancellationTokenSource上如果调用Cancel,
// 下面这一行就抛出OpreationCanceledException
ct.ThrowIfCancellationRequested();
checked
{
sum += n;
} //如果n太大,这一行代码会抛出异常
}
return sum;
}
创建一个Task
时,可以将一个CancellationToken
传给Task
的构造器,从而将这个CancellationToken
和该Task
关联起来。
- 如果
CancellationToken
在Task
调度前取消,Task会被取消,永远不会执行。- 一个任务还没开始就试图取消它,会抛出
InvalidOperationException
- 一个任务还没开始就试图取消它,会抛出
- 如果
Task
已经调度(通过调用Start
方法),那么Task
为了允许它的操作在执行期间取消,Task
的代码必须显式支持取消。- 调用静态的
Run方法
会自动创建Task对象
并立即调用Start
- 调用静态的
遗憾的是,虽然Task对象
关联了一个CancellationToken
,但没有办法访问它。因此,必须通过某种方式,在Task
的代码本身中获得用于创建Task
对象的同一个CancellationToken
。为了写这样的代码,最简单的方法就是使用一个lambda表达式,并将CancellationToken
作为一个闭包变量”传递”(就像上例所示)。
闭包
定义: 在C#中,闭包是由匿名函数来表示的。C#中的闭包也叫做捕获的变量。当一个匿名函数引用了他所在作用域(一般情况下是一个方法)的局部变量时,为了能够顺利的执行匿名函数而不至于包含它的函数执行完之后线程栈弹出导致局部变量消失,会将这个变量的生命周期延长。这时就形成了闭包。闭包利用了匿名函数的一个特性:因为编译器会为匿名函数生成一个类(或结构),所以,提升匿名函数捕获的这个变量的生命周期的方法就是在把这个变量放到这个类中。此外,这个类中定义的方法既是这个匿名函数。
// for循环中的闭包陷阱
Action[] actions = new Action[5];
// for中的局部变量i被提升了生命周期
for (int i = 0; i < 5; i++)
{
// 编译器有提示关于变量i : Access to modified closure
// 修改闭包的访问
actions[i] = () => Console.WriteLine(i);
}
// 此时i=5;
foreach (var action in actions)
{
// 输出5个5
action();
}
任务完成时自动启动新任务
要写可伸缩的软件,一定不能使你的线程阻塞。这意味着如果调用Wait
,或者在任何尚未完成时查询任务的Result属性
,极有可能 造成线程池创建一个新线程,这增大了资源的消耗,并损害了伸缩性。幸好,有更好的方式知道一个任务在什么时候结束运行。一个任务完成时,它可以启动另一个任务。下面重写了前面的代码,它不会阻塞线程:
// 创建 Task, 推迟启动它, 继续另一个任务
Task<Int32> t = new Task<Int32>(n => Sum((Int32) n), 10000);
// ContinueWith 返回一个 Task 但一般不再关心这个对象
Task cwt = t.ContinueWith(task => Console.WriteLine("The sum is: " + task.Result));
现在,当执行Sum的任务完成后,这个任务会启动另一个任务(也在某个线程池线程上)以显示结果。执行上述代码的线程不会进入阻塞状态并等待这个两个任务中的任何一个完成。相反,线程可以执行其它代码。如果线程本身就是线程池线程,它可以返回到池中,以执行其他操作。注意,执行Sum的任务可能在调用ContinueWith
之前完成。但这不是一个问题,因为ContinueWith
方法会看到Sum任务已经完成,会立即启动显示结果的任务。
另外,注意ContinueWith
会返回新的Task对象
的一个引用(在上述代码中, 放入了cwt变量中)。当然,可以用这个Task
对象调用各种成员(比如Wait
,Result
,甚至ContinueWith
),但你一般都是忽略这个Task对象,不把它的引用保存到一个变量中。
Task
对象内部包含了ContinueWith任务
的一个集合。所以,实际上可以用一个Task
对象来多次调用ContinueWith
。任务完成时,所有ContinueWith任务
都会进入线程池的队列中。此外,调用ContinueWith
时,可以传递对一组TaskContinuationOptions枚举值
进行按位OR运行的结果。前4个标志(None
,PreferFairness
,LongRunning
和AttachToParent
)与早先描述的TaskCreationOptions
枚举类型提供的标志完全一致,下面是TaskContinuationOptions
类型的定义:
[System.FlagsAttribute, System.SerializableAttribute]
public enum TaskContinuationOptions
{
// [默认]
None = 0x00000,
// [提议] 希望任务尽快运行
PreferFairness = 0x00001,
// [提议] 应尽可能地创建线程池线程
LongRunning = 0x00002,
// [提议总是被采纳] 将一个任务和它的父Task关联
AttachedToParent = 0x00004,
// 任务和这个父任务链接将抛出异常
DenyChildAttach = 0x00008,
// 强迫子任务使用默认调度器而不是父任务的调度器
HideScheduler = 0x00010,
// 除非前置任务完成,否则禁止延续任务完成(取消)
LazyCancellation = 0x00020,
// 这些标志指出在什么情况下运行ContinueWith任务
// 指定不应在延续任务前面的任务已完成运行的情况下安排延续任务。 此选项对多任务延续无效。
NotOnRanToCompletion = 0x10000,
//指定不应在延续任务前面的任务引发了未处理异常的情况下安排延续任务。 此选项对多任务延续无效。
NotOnFaulted = 0x20000,
//指定不应在延续任务前面的任务已取消的情况下安排延续任务。 此选项对多任务延续无效。
NotOnCanceled = 0x40000,
// 这些标志是以上三种标志的便利组合
//指定只应在延续任务前面的任务已完成运行的情况下才安排延续任务。 此选项对多任务延续无效。
OnlyOnRanToCompletion = 0x60000,
//指定只有在延续任务前面的任务引发了未处理异常的情况下才应安排延续任务。 此选项对多任务延续无效。
OnlyOnFaulted = 0x50000,
//指定只应在延续任务前面的任务已取消的情况下安排延续任务。此选项对多任务延续无效。
OnlyOnCanceled = 0x30000,
//指定应同步执行延续任务。
// 指定此选项后,延续任务将在导致前面的任务转换为其最终状态的相同线程上运行。 如果在创建延续任务时已经完成前面的任务,则延续任务将在创建此延续任务的线程上运行。 只应同步执行运行时间非常短的延续任务。
//-------------------------------------------
// 这个标志指出你希望由执行第一个任务的线程执行ContinueWith任务,
// 第一个任务完成后调用
// ContinueWith的线程接着执行ContinueWith任务
// 这里是指同步执行, 两个任务在使用同一个线程一前一后的执行
ExecuteSynchronously = 0x80000,
}
调用ContinueWith
时,可以指定你希望新任务只有在第一个任务被取消时才运行,这是使用TaskContinuationOptions.OnlyOnCanceled
标志来实现。
类似的TaskContinuationOptions.OnlyOnFaulted
标志指定新任务只有在第一个任务抛出未处理的异常时才执行.
当然还可使用TaskContinuationOptions.OnlyOnRanToCompletion
标志指定新任务只有在第一个任务顺利完成(中途没有取消,也没有抛出未处理异常)时才执行.
默认情况下,如果没有指定上述任何标志,新任务无论如何都会执行下去,不管第一个任务是如何完成的。一个Task完成时,它的所有未运行(不满足前面说的各种条件)的延续任务都会自动取消。下面用一个例子演示所有这些概念。
CancellationTokenSource cts = new CancellationTokenSource();
Task<Int32> t = new Task<Int32>(() => Sum(cts.Token, 10000), cts.Token);
t.Start();
// 每个 ContinueWith 都返回一个 Task,但你不必关心这些Task对象
t.ContinueWith(task => Console.WriteLine("The sum is: " + task.Result),
TaskContinuationOptions.OnlyOnRanToCompletion);
t.ContinueWith(task => Console.WriteLine("Sum threw: " + task.Exception),
TaskContinuationOptions.OnlyOnFaulted);
t.ContinueWith(task => Console.WriteLine("Sum was canceled"),
TaskContinuationOptions.OnlyOnCanceled);
// 如果注释这行代码, 则只输出 The sum is: 50005000
// 否则只输出: Sum was canceled
cts.Cancel();
任务可以启动子任务
最后,任务支持父/子关系,如下代码所示:
static void Main(string[] args)
{
Task<Int32[]> parent = new Task<Int32[]>(() =>
{
var results = new Int32[3]; // 创建数组来存储结果
// 这个任务创建并启用了3个子任务
new Task(() => results[0] = Sum(10000), TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[1] = Sum(20000), TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[2] = Sum(30000), TaskCreationOptions.AttachedToParent).Start();
// 返回对数组的一个引用(即使数组元素可能还没有初始化)
return results;
});
// 父任务及其子任务运行完成后, 用一个延续任务显示结果
var cwt = parent.ContinueWith(parentTask => Array.ForEach(parentTask.Result, Console.WriteLine));
// 启动父任务, 便于启动它的子任务
parent.Start();
Console.ReadLine();
}
在本例子中,父任务
创建并启用3个Task对象
。默认情况下,一个任务创建的Task对象是顶级任务,这些任务与创建它们的那个任务无关。然而,TaskContinuationOptions.AttachedToParent
标志将一个Task和创建它的那个Task关联起来,结果是除非所有子任务结束运行,否则父任务不会认为已经结束。调用ContinueWith
方法创建一个Task
时,可以指定TaskContinuationOptions. AttachedToParent
标志将延续任务指定的一个子任务
, 用于控制台循环输出结果。
任务内部揭秘
每个Task对象
都有一组构成任务状态的字段。
- 有一个
Int32 ID(Task的只读Id属性)
、 - 代表
Task
执行状态的一个Int32
、 - 对父任务的一个引用、
- 对
Task
创建时指定的TaskScheduler
的一个引用、 - 对
回调方法
的一个引用、 - 对要传给回调方法的
对象
的一个引用(可通过Task
的只读AsynState
属性查询)、 - 对一个
ExecutionContext(执行上下文)
的引用 - 以及对一个
ManualResetEventSlim
对象的引用。 - 除此之外,每个Task对象都有对根据需要创建的补充状态的一个引用。
- 包含一个
CancellationToken
- 一个
ContinueWithTask
对象集合 - 为抛出未处理异常的子任务而准备的一个Task对象集合等.
- 包含一个
如果不需要任务提供的附加功能,那么使用ThreadPool.QueueUserWorkItem
,资源的使用效率上会更高一些。
Task
和Task<TResult>
类实现了IDisposable
接口,允许你在用完Task
对象后调用Dispose
。如今,所有Dispose方法
所做的都是关闭ManuaResetEventSlim对象
。
然而,可以定义从Task
和Task<Result>
派生的类,在这些类中分配它们自己的资源,并在它们重写的Dispose
方法中释放这些资源。当然,大多数开发人员都不会在自己的代码中显式的为一个Task
对象调用Dispose
;他们只让垃圾回收器回收任何不再需要的资源。
在每个Task
对象中,都包含代表Task
唯一ID的一个Int32字段。创建一个Task对象时,字段会被初始化为零。第一次查询Task的只读ID属性,属性将一个唯一Int32值分配给该字段,并从属性中返回它。TaskID从1开始,每分配一个ID都会递增1. 在Visual Studio调试器中查看一个Task对象,会造成调试器显示Task的ID,从而造成为Task分配一个ID。
这个ID的意义在于,每个Task都可以用一个唯一的值来标识。事实上,Visual Studio会在它的”并行任务”和”并行堆栈”窗口中会显示这个任务ID。但是,由于不在自己的代码中分配ID,所以几乎不可能将这个ID和代码正在做的事联系起来。运行一个任务的代码时,可以查询Task
的静态CurrenId
属性,它返回一个可空的Int32(Int32?)
。还可以在调式期间,在Vasul Studio的”监视”或”即时”窗口中调用它,以便获得当前正在调试的代码的ID。然后,可以在”并行任务”和”并行堆栈”窗口中找到自己的任务。如果当前没有任务正在执行,查询CurrenId
属性会返回null。
一个Task
对象存在期间,可查询Task
的只读Status
属性了解它在其生存期的什么位置。这个属性返回一个TaskStatus
值,定义如下:
public enum TaskStatus
{
//这些标志指出了一个Task在其生命周期内的状态
// 任务已显式创建,可以手动Start()这个任务
Created,
// 任务已隐式创建,会自动开始
WaitingForActivation,
// 任务已调度,但尚未运行
WaitingToRun,
// 任务正在运行
Running,
// 任务正在等待它的子任务完成,子任务完成后它才完成
WaitingForChildrenToComplete,
// 一个任务的最终状态是以下三种之一
// 已成功完成执行的任务
RanToCompletion,
// 该任务已通过对其自身的 CancellationToken引发 OperationCanceledException
// 对取消进行了确认,此时该标记处于已发送信号状态;
// 或者在该任务开始执行之前,已向该任务的 CancellationToken 发出了信号
Canceled,
// 由于未处理异常的原因而完成的任务
Faulted
}
- 首先构造一个Task对象时,它的状态是
Created
。 - 以后,任务启动时,它的状态变为
WaitngToRun
。 - Task在一个线程上运行时,它的状态就变成了
Running
。 - 任务停止运行,并等待它的任何子任务时,状态变成
WaitingForChildrenToComplete
。 任务完全结束时,它会进入以下三种状态的一种:
RanToCompletion
(运行完成)、Canceled
(取消)或Faulted
(出错)。一个Task
运行完成时,可通过 Task<TResult>
的Result
属性来查询任务的结果。- 一个Task或者Task
出错时,可以查询 Task
的Exception
属性来获得任务抛出的未处理的异常:该属性总是返回一个AggregateException
对象,对象的InnerExceptions
集合包含了所有未处理的异常。
为简化编码,Task
提供了几个只读的Boolean属性:IsCanceled
,IsFaulted
和IsCompleted
。注意,当Task处于RanToCompleted
,Canceled
或者Faulted
状态时,IsCompleted
返回true
。为了判断一个Task是否成功完成,最简单的办法就是使用如下所示的代码:
if (task.Status == TaskStatus.RanToCompleted )
如果Task是通过调用以下某个函数来创建的,这个Task对象就处于WaitingForActivation
状态:
ContinueWith
ContinueWithAll
ContinueWithAny
或者FromAsync
等方法来创建。
如果通过构造一个TaskCompletionSource<TResult>
(任务基础结构)对象[表示未绑定到委托的 Task<TResult> 的制造者方,并通过 Task 属性提供对使用者方的访问]
创建一个Task,该Task在创建时也处于WaitingForActivation
状态。这个状态意味着该Task
的调度由任务基础结构
控制。
例如,不能显式启动一个通过ContinueWith
创建的对象。这个Task
会在它的前置任务(antecedent task) 执行完毕后自动开始。
任务工厂
有的时候需要创建一组共享相同配置的Task
对象。为了避免机械地将相同的参数传给每一个Task的构造器,可以创建一个任务工厂来封装通用的配置,System.Threding.Tasks
命名空间定义了一个TaskFactory
类型和一个TaskFactory<TResult>
类型。两个类型都派生自System.Object
;也就是说,它们是平级的。
如果要创建的是一组没有返回值的任务,那么要构造一个TaskFactory
;如果要创建的是一组有一个特定返回值的任务,那么要构造一个TaskFactory<TResult>
,并通过泛型TResult实参
来传递任务的返回类型。创建任何任务工厂类时,要向它的构造器传递一些默认值。工厂创建的任务都将具有这些默认值。具体的说,要向任务工厂
传递你希望希望任务具有的CancellationToken
,TaskScheduler
,TaskCreationOptions
和TaskContinuationOptions
设置。
以下实例代码演示了如何使用一个TaskFatory:
class Program
{
static void Main(string[] args)
{
// 创建一个父Task
Task parent = new Task(() =>
{
var cts = new CancellationTokenSource();
var tf = new TaskFactory<Int32> // 创建一个任务工厂, 通用设置如下
(
cts.Token, // 传入CancellationTokenSource任务基础结构控制的CancellationToken对象
TaskCreationOptions.AttachedToParent, // 由TaskFactory创建的任务都视为其 子任务
TaskContinuationOptions.ExecuteSynchronously, // 指定由TaskFactory创建的任务都以同步方式执行
TaskScheduler.Default // 都使用默认调度器
);
// 创建并启动3个子任务
// 每个Task对象都共享相同的CancellationTokenSource标记
// 任务都被视为parent的子任务
var childTasks = new[]
{
// 通过任务工厂的StartNew方法创建任务并启动
tf.StartNew(() => Sum(cts.Token, 10000)),
tf.StartNew(() => Sum(cts.Token, 20000)),
tf.StartNew(() => Sum(cts.Token, Int32.MaxValue)) // 将最大值传入,计算之后会超出,抛异常
};
// 通过循环告诉每个子任务, 如果抛出未处理异常,
// 就取消其他仍在运行的所有子任务
for (Int32 task = 0; task < childTasks.Length; task++)
childTasks[task].ContinueWith // 完成指定任务并达到OnlyOnFaulted条件之后, 进行后续的特定任务
(
t => cts.Cancel(), // 特定任务: 取消其他仍在运行的所有子任务
TaskContinuationOptions.OnlyOnFaulted // 条件: 指定任务完成失败
);
// ContinueWhenAll当所有子任务完成后,启动一个特定任务,
// 由于是通过任务工厂创建的, 需要覆盖CancellationToken.None来覆盖它的父子任务设定
// 使该任务在取消父任务时, 不作为子任务一起取消.
// 当处理所有结果的任务完成后, 创建另一个后续任务来显示从所有子任务中返回的最大值
tf.ContinueWhenAll(
childTasks, // 传入子任务数组
completedTasks =>
completedTasks.Where(t => t.Status == TaskStatus.RanToCompletion).Max(t => t.Result),
CancellationToken.None) // 子任务都完成(没有异常和取消)后, 获取返回最大值
.ContinueWith(
t => Console.WriteLine("最大的是: " + t.Result), // 将最大值输出的后续任务
TaskContinuationOptions.ExecuteSynchronously // 设定为同步执行延续任务
).Wait(); // 等待只是为了测试
});
// 子任务完成后, 也显示任何未处理的异常
parent.ContinueWith(p =>
{
// 我将所有文本放到一个StringBuilder中, 并只调用Console.WriteLine一次
// 因为这个任务可能和上面的任务并行执行, 而我不希望任务的输出变的不连续
StringBuilder sb = new StringBuilder("发生了以下异常:" + Environment.NewLine);
foreach (var e in p.Exception.Flatten().InnerExceptions)
sb.AppendLine(" " + e.GetType().ToString());
Console.WriteLine(sb.ToString());
}, TaskContinuationOptions.OnlyOnFaulted);
// 启动父任务, 使它能启动子任务
parent.Start();
try
{
parent.Wait(); // 出于测试目的
}
catch (AggregateException)
{
}
// 最大的是: 200010000
// 发生了以下异常:
// System.OverflowException
}
}
然后创建一个数组,其中包含了3个子Task
对象,所有都是通过TaskFactory
的StartNew
方法来创建的。使用这个方法,可以方便的创建并启动每个子任务。在一个循环中,告诉每个子任务,如果抛出一个未处理的异常,就会取消其它仍在运行的所有子任务。最后,在TaskFacroty
上调用ContinueWithAll
,它创建一个在所有子任务都结束后运行的一个Task
。由于这个任务是用TaskFactory
创建的,所以它仍然被视为父任务的一个子任务,会使用默认的TaskScheduler同步执行
。然而,希望即使其他子任务被取消,也要运行这个任务。因此,我传递CancellationToken.None
来覆盖TaskFactory
的CancellationToken
。这会造成该任务完全不能取消。 最后,当处理所有结果的任务完成后,创建另一个任务来显示从所有子任务中返回的最大值。
注意: 调用TaskFactory
或TaskFactory<TResult>
的静态ContinueWhenAll
和ContinueWhenAny
方法时, 以下TaskContinuationOption
标志是不允许的:
NotOnRanToCompletion
NotOnFaulted
NotOnCanceled
OnlyOnFaulted
OnlyOnCanceled
OnlyOnRanToCompletion
也就是说, 无论前置任务是如何完成的, ContinueWhenAll
和ContinueWhenAny
方法都会执行延续任务.
任务调度器
任务基础结构
是很灵活的,其中TaskScheduler
对象功不可没。TaskScheduler
对象负责执行调度的任务,同时向Visual Studio 调试器公开任务信息。FCL提供了两个派生自TaskScheduler
的类型:
- 线程池任务调度器(thread pool task scheduler)
- 同步上下文任务调度器(synchronization context task scheduler)
默认情况下,所有应用程序使用的都是线程池任务调度器
。这个任务调度器将任务调度给线程池的工作者线程,将在后面进行更详细的讨论。可以查询TaskScheduler
的静态Default属性
来获得对默认任务调度器的一个引用。
同步上下文任务调度器
通常用于Windows窗体、WPF和Silverlight应用程序。这个任务调度器将所有任务都调度给应用程序的GUI线程,使所有任务代码都能成功更新UI,比如按钮。菜单项等。同步上下文任务调度器不使用线程池。可以查询TaskScheduler
的FromCurrentSynchronizationContext
方法来获取对一个同步上下文任务调度器的引用
。
private sealed class MyForm : System.Windows.Forms.Form
{
private readonly TaskScheduler m_syncContextTaskScheduler;
public MyForm()
{
// 获得一个堆上下文任务调度器的引用
m_syncContextTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
Text = "Synchronization Context Task Scheduler Demo";
Visible = true;
Width = 400;
Height = 100;
}
private CancellationTokenSource m_cts;
protected override void OnMouseClick(System.Windows.Forms.MouseEventArgs e)
{
if (m_cts != null) // 如果有操作正在进行, 取消它
{
m_cts.Cancel();
m_cts = null;
}
else // 操作还没开始,启动它
{
Text = "Operation running";
m_cts = new CancellationTokenSource();
// 这个任务使用默认任务调度器, 在一个线程池上执行
Task<Int32> t = Task.Run(() => Sum(m_cts.Token, 20000), m_cts.Token);
// 这些任务使用同步上下文任务调度器, 在GUI线程上执行
t.ContinueWith(task => Text = "Result: " + task.Result,
CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion,
m_syncContextTaskScheduler);
t.ContinueWith(task => Text = "Operation canceled",
CancellationToken.None, TaskContinuationOptions.OnlyOnCanceled,
m_syncContextTaskScheduler);
t.ContinueWith(task => Text = "Operation faulted",
CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted,
m_syncContextTaskScheduler);
}
base.OnMouseClick(e);
}
}
获得一个对同步上下文任务调度器
的引用, 然后在调用ContinueWith
延续任务时, 使用此调度器.
m_syncContextTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
使用线程池很好, 因为GUI线程在此期间不会被阻塞, 能响应其他UI操作, 但线程池线程执行的代码不应尝试更新UI组件, 否则会抛出InvalidOperationException
.
计算限制的任务完成后,执行3个延续任务, 它们由GUI线程对应的同步上下文任务器来调度, 任务调度器将任务放到GUI线程的队列中, 使它们的代码能成功更新UI组件, 所有任务都通过继承的Text属性来更新窗体的标题.
由于计算限制的工作(Sum)在线程池上运行, 所以用户可以和UI交互来取消操作. 在这个简单的例子中, 我允许用户在操作进行期间单击窗体的客户端区域来取消操作.
当然, 如果有特殊的任务调度需求, 完全可以定义自己的TaskScheduler
派生类. Microsoft在Parallel Extensions Extras包中提供了大量和任务有关的示例代码.
例如: ThreadPerTaskScheduler
private sealed class ThreadPerTaskScheduler : TaskScheduler
{
protected override IEnumerable<Task> GetScheduledTasks()
{
return Enumerable.Empty<Task>();
}
protected override void QueueTask(Task task)
{
new Thread(() => TryExecuteTask(task)) {IsBackground = true}.Start();
}
protected override Boolean TryExecuteTaskInline(Task task, Boolean taskWasPreviouslyQueued)
{
return TryExecuteTask(task);
}
}