异步方法的实现原理
异步方法的实现原理:
- 异步方法不需要多线程,因为一个异步方法并不是运行在一个独立的线程中的。
- 异步方法运行在当前同步上下文中,只有激活的时候才占用当前线程的时间。
- 异步模型采用时间片轮转来实现。
异步函数和事件处理程序
异步函数的返回类型一般是Task
或Task<TResult>
, 它们代表函数的状态机完成. 但异步函数是可以返回void的. 实现异步事件处理程序
时, C#编译器允许你利用这个特殊情况简化编码. 几乎所有事件处理程序
都遵循以下方法签名:
void EventHandlerCallback(Object sender, EventArgs e);
但经常需要在事件处理方法中执行I/O操作. 比如在用户点击UI元素来打开并读取文件时. 为了保持UI的可响应性, 这个I/O应该以异步方式执行. 而要在返回void
的事件处理方法中写这样的代码, C#编译器就要允许异步函数返回void
,这样才能利用await
操作符执行不阻塞的I/O操作. 编译器仍然为返回void
的异步函数创建状态机, 但不再创建Task对象, 因为创建了也没法使用. 所以没办法知道返回void的异步函数的状态机在什么时候运行完毕.
正是这个原因, 不能将入口方法Main标记为async修饰符. 进程的主线会在遇到第一个await之后立即从Main返回, 由于调用Main的代码无法获得一个可以进行监视并等待完成的Task, 所以进程终止, 因此编译器认为这是一个错误.
FCL的异步函数
规范要求为方法名附加Async后缀.
其他基于异步的编程模型已经过时, 使用Task的新模型才是你的首要选择.
异步函数和异常处理
Windows设备驱动程序处理异步I/O请求时可能出错, Windows需要向应用程序通知这个情况.
例如: 通过网络收发字节时可能超时, 如果数据没有及时到达, 设备驱动程序希望告诉应用程序异步虽然完成, 但存在一个错误. 为此,设备驱动程序会向CLR的线程池post已完成的IRP. 一个线程池线程会完成Task对象并设置异常. 你的状态机方法恢复时, await操作符发现操作失败并引发该异常.
Task对象
通常抛出一个AggregateExcetpiton
可查询该异常的InnerExceptions属性
来查看正在发生了什么异常. 但是, 将await用于Task时, 抛出的是第一个内部异常而不是AggregateExcetpiton. 是TaskAwaiter
的GetResult
方法抛出的第一个内部异常.
如果状态机出现未处理的异常, 那么代表异步函数的Task对象会因为未处理的异常而完成. 然后正在等待该Task的代码会看到异常, 但异步函数也可能使用了void返回类型, 这时调用者就没有办法发现未处理异常. 所以当返回void的异步函数抛出未处理的异常时,编译器生成的代码将捕捉它, 并使用调用者的同步上下文重新抛出它. 重新抛出这种异常通常造成整个进程终止.
异步函数的其他功能(调试功能)
如果调试器在await操作符上停止,
- F10(逐过程) 会在异步操作完成后,在抵达下一个语句时重新由调试器接管.
- 在这个时候,执行代码可能已经不是当初发起异步操作的线程
- F11(逐语句) 进入异步函数后, 可以按跳出(Shift+F11) 函数并返回至调用者
- 但是必须在位于异步函数的起始大括号的时候执行这个操作, 一旦越过大括号, 除非异步函数执行完成,否则跳出操作无法中断异步函数.
- 要在状态机运行完毕前对调用方法进行调试,在调用方法中插入断点并运行至断点(F5) 即可.
有的异步操作执行速度很快, 几乎瞬间就能完成, 在这种情况下,挂起状态机并让另一个线程立即恢复状态机就显得不太划算, 更有效的做法是让状态机继续执行, 幸好, 编译器为await操作符生成的代码能检测到这个问题, 如果异步操作再线程返回前完成,就阻止线程返回,直接由它执行下一行代码.
可以利用Task
的静态Run方法
从非调用线程的其他线程中执行异步函数.
// Task.Run在GUI线程上调用
Task.Run(async ()=>
{
// 这里的代码在一个线程池线程上运行
// 发起异步操作
await XxxAsync();
// 在这里执行更多处理...
});
上述代码演示了C#的异步lambda表达式. 可以看出, 不能只在普通的lambda表达式主体中添加await操作符就完事, 是因为编译器不知道如何将方法转换成状态机, 但同时在lambda表达式前面添加async,编译器就能将lambda表达式转换成状态机方法来返回一个Task
或Task<TResult>
的任何Func委托
变量.
如果在异步函数(async)中没有使用await操作符
, C#编译器会显示警告, 为了去掉警告, 可以为异步方法返回的Task赋给一个变量. 然后忽略这个变量. 还可以定义一个扩展方法.
// 会发出警告
InnerAsyncFunc();// 此处没有添加await操作符, 会发出警告
// 不会发出警告, 需要定义一个忽略的变量
var noWarning = InnerAsyncFunc();//为异步方法返回的Task赋给一个变量. 然后忽略这个变量.
// 后续方法也会继续执行
// 更好的方法
[MethodImpl(MethodImplOptions.AggressiveInlining)] // 造成编译器优化调用
public static void NoWarning(this Task task) { }
// 这样使用
private static async Task OuterAsyncFunction()
{
// 这里故意不添加await操作符,
InnerAsyncFunction().NoWarning();
// 在InnerAsyncFunction执行期间, 这里代码也继续执行
await Task.Delay(0);
}
异步I/O操作最好的一个地方是可以同时发起许多这样的操作, 让它们并发执行, 从而显著提升应用程序的性能.
// 这会等待所有的request都完成后,才能处理responses
// 这样处理并不好
String[] responses = await Task.WhenAll(requests);
// 使用以下方法
// 使用while去判断是否有任务完成,
while(request.Count > 0)
{
// 顺序处理每个完成的响应, 完成一个处理一个
Task<String> response = await Task.WhenAny(requests);
requests.Remove(response);// 从集合中删除已完成的任务
// 处理响应
}
根据需求用WhenAny
方法还是WhenAll
.
应用程序及其线程处理模型
.Net Framework支持几种不同的应用程序模型, 而每种模型都可能引入它自己的线程处理模型.
控制台应用程序
和Window服务
(实际也是控制台应用程序,只是看不见控制台而已) 没有引入任何线程处理模型.- 也就是任何线程可在任何时候做它想做的任何事情.
- 但是
GUI应用程序
(windows窗体,WPF,Silverlight,WindowStore应用程序) 引入了一个线程处理模型.- 在这个模型中, UI元素只能由创建它的线程更新. 在GUI线程中,经常都需要生成一个异步操作,使GUI线程不至于阻塞并停止用户输入. 当异步操作完成时, 是由一个线程池线程完成Task对象并恢复状态机.
对于某些应用程序模型,这样做没什么问题, 因为非常高效, 但是对于另一些应用程序模型(比如GUI应用程序), 这个做法会造成问题, 因为一旦通过线程池线程
更新UI元素就会抛出异常. 线程池线程必须以某种方式告诉GUI线程
更新UI元素.
ASP.NET应用程序允许任何线程做它想做的任何事情, 线程池线程开始处理一个客户端的请求时, 可以对客户端的语言文化做出假定, 从而允许Web服务器对返回的数字,日期,时间进行该语言的特有格式化处理. 此外, Web服务器还可对客户端的身份标识做出假定, 确保只能访问客户端有权访问的资源.
线程池生成一个异步操作后, 它可能由另一个线程池线程完成, 该线程将处理异步操作的结果. 代表原始客户端工作时, 语言文化和身份标识信息需要”流向”新的线程池线程. 这样一来,代表客户端执行的任何额外的工作才能使用客户端的语言文化和身份标识信息.
幸好,FCL定义了一个名为System.Threading.SynchronizationContext
的基类. 它解决了所有这些问题. 简单的说,SynchronizationContext
派生对象将应用程序模型连接到它的线程处理模型. FCL定义了几个SynchronizationContext派生类
, 但你一般不直接和这些类打交道: 事实上, 它们中的许多都没有公开或记录到文档.
等待一个Task
时会获取调用线程的SynchronizationContext
对象, 线程池线程完成Task后, 会使用该SynchronizationContext对象
, 确保为应用程序模型使用正确的线程处理模型. 所以,当一个GUI线程
等待一个Task
时, await操作符
后面的代码保证在GUI线程上执行, 使代码能更新UI元素.
让状态机使用应用程序模型的线程处理模型来恢复, 这在多数时候都很有用, 也很方便. 但偶尔也会带来问题. 下面是WPF应用程序死锁的一个例子:
protected override void OnActivated(EventArgs e)
{
// 发出HTTP请求, 让线程从GetHttp3线程返回
// 查询Result属性会阻止GUI线程返回;
// 线程阻塞等待结果
String http = GetHttp1().Result; // 以同步方式获取字符串!
base.OnActivated(e);
}
private async Task<String> GetHttp1()
{
// 发出HTTP请求, 让线程从GetHttp返回
HttpResponseMessage msg = await new HttpClient().GetAsync("http://Wintellect.com/");
// 这里永远执行不到, GUI线程在等待这个方法结束.
// 等待执行, 但是永远执行不到,因为 需要同一个主线程执行,但是主线程在等待该方法执行结束, 死锁!!!
return await msg.Content.ReadAsStringAsync();
}
尤其要注意SynchronizationContext
类, 由于许多类库代码都要求不依赖特定的应用程序模型, 所以要避免因为使用SynchronizationContext
对象而产生的额外开销. 防止死锁, 为了解决这两方面的问题, Task
和Task<TResult>
类提供了一个ConfigureAwait
方法, 签名如下:
// 定义这个方法的Task
public ConfiguredAwaitable ConfigureAwait(Boolean continueOnCapturedContext);
//定义这个方法的Task<TResult>
public ConfiguredAwaitable ConfigureAwait<TResult>(Boolean continueOnCapturedContext);
向方法传递true
相当于根本没有调用方法, 但是如果传递false
, await
操作符就不查询调用线程的SynchronizationContext
对象, 当线程池线程结束Task
时会直接完成它, await操作符
后面的代码通过线程池线程执行.
通过ConfigureAwait
方法解决死锁问题:
private async Task<String> GetHttp2()
{
// 发出HTTP请求, 让线程从GetHttp返回
// 必须将ConfigureAwait(false)应用于等待的每个Task对象
// ConfigureAwait(false) 不依赖 线程处理模型,可用线程池线程唤醒状态机
HttpResponseMessage msg = await new HttpClient().GetAsync("http://Wintellect.com/").ConfigureAwait(false);
// 这里能执行到了,可用线程池线程唤醒状态机,线程池线程可以执行这里的代码,
// 而非被迫由GUI线程执行 ,
return await msg.Content.ReadAsStringAsync().ConfigureAwait(false);
}
必须将ConfigureAwait(false)
应用于等待的每个Task
对象. 这是由于异步操作可能同步完成,而且在发生这个情况时, 调用线程直接继续执行, 不会返回至它的调用者; 你根本不知道哪个操作要求忽略SynchronizationContext
对象, 所以只能要求所有操作都忽略它. 这还以为着类库代码不能依赖于任何特定的应用程序模型. 也可以用一个线程池线程执行所有操作:
private Task<String> GetHttp3()
{
return Task.Run(async () =>
{
// 运行一个无SynchronizationContext的线程池线程
HttpResponseMessage msg = await new HttpClient().GetAsync("http://Wintellect.com/");
// 这里的代码真的能执行, 因为某个线程池线程能执行这里的代码
return await msg.Content.ReadAsStringAsync();
});
}
这个版本中GetHttp方法不再是异步函数, 从方法签名中删除了async关键字
,因为方法中没有了await操作符
.
SynchronizationContext
只决定await之后
的代码在哪个线程运行,至于异步操作是否开了新线程跟synccontext没关系. 也就是说有context
的话就能确保一段代码看起来是一个函数的时候就真的在一个线程里执行.
以异步方法实现服务器
FCL内建了对伸缩性很好的一些异步服务器的支持。下面列举中MSDN文档中值的参考的地方。
- 要构建异步ASP.NET Web窗体,在
.aspx
文件中添加Async="true"
的网页指令,并参考System.Web.UI.Page
的RegisterAsyncTask
方法。 - 要构建异步ASP.NET MVC控制器,使你的控制器类从
System.Web.Mvc.AsyncController
派生,让操作方法返回一个Task<ActionResult>
即可。 - 要构建异步ASP.NET 处理程序,使你的类从
System.Web.HttpTaskAsyncHandler
派生,重写其ProcessRequestAsync
方法。 - 要构建异步WCF服务,将服务作为异步函数来实现,让它返回
Task
或Task<TResult>
。
取消I/O操作
Windows一般没有提供取消未完成I/O操作的途径,这是许多开发人员都想要的功能,实现起来却很困难。毕竟,如果向服务器请求了1000个字节,然后决定不再需要这些字节,那么其实没有办法告诉服务器忘掉你的请求。在这种情况下,只能让字节照常返回,再将他们丢弃。此外,这里还发生竞态条件-取消请求的请求可能正在服务器发送响应的时候到来,要在代码中处理这种潜在的竞态条件,决定是丢弃还是使用数据。
官方的定义是如果程序运行顺序的改变会影响最终结果,这就是一个竞态条件(race condition).如果一段程序运行多次的结果不一致(排除生成随机数的情况),那这就可能是竞态条件的体现。导致竞态条件发生的代码区称作临界区。在临界区中使用适当的同步就可以避免竞态条件。临界区实现方法有两种,一种是用synchronized,一种是用Lock显式锁实现。
建议实现一个WithCancellation
扩展方法Task<TResult>
:
internal static class Cancellation
{
public static async Task Go()
{
// 创建一个CancellationTokenSource,它在3秒之后将自己取消
var cts = new CancellationTokenSource(3000);
// 提前取消需要调用 cts.Cancel()
var ct = cts.Token;
try
{
// 可能需要6秒后完成任务, 但是ct最多允许进行3秒,然后取消
await Task.Delay(6000).WithCancellation(ct);
Console.WriteLine("Task completed");
}
catch (OperationCanceledException)
{
Console.WriteLine("Task cancelled");
}
// 因此完成不了任务, 会抛出OperationCanceledException异常
// 输出 Task cancelled
}
// 因为没有非泛型的TaskCompletionSource类
private struct Void { }
/// <summary>
/// 扩展Task<TResult> 方法
/// </summary>
/// <param name="orignalTask"> 原始Task对象 </param>
/// <param name="ct"> CancellationToken中的Task对象 </param>
/// <typeparam name="TResult"> 返回类型 </typeparam>
public static async Task<TResult> WithCancellation<TResult>(this Task<TResult> orignalTask, CancellationToken ct)
{
// 创建在CancellationToken被取消时完成的一个Task
var cancelTask = new TaskCompletionSource<Void>();
// 当CancellationToken取消时, 使cancelTask的Task完成
// cancelTask对象会传入回调方法
using (ct.Register(t => ((TaskCompletionSource<Void>) t).TrySetResult(new Void()), cancelTask))
{
// 创造在原始Task或CancellationToken里的Task完全时都完成一个Task
// 哪个先完成any就引用哪个
Task any = await Task.WhenAny(orignalTask, cancelTask.Task);
// 如果any是cancelTask, 则说明是取消了任务, 抛出OperationCanceledException
if (any == cancelTask.Task) ct.ThrowIfCancellationRequested();
}
// 等待原始任务(以同步的方式); 若任务失败,等待它抛出第一个内部异常
//而不是抛出AggregateException
return await orignalTask;
}
public static async Task WithCancellation(this Task task, CancellationToken ct)
{
var tcs = new TaskCompletionSource<Void>();
using (ct.Register(t => ((TaskCompletionSource<Void>) t).TrySetResult(default(Void)), tcs))
{
if (await Task.WhenAny(task, tcs.Task) == tcs.Task) ct.ThrowIfCancellationRequested();
}
// 等待原始任务(以同步的方式); 若任务失败,等待它抛出第一个内部异常
//而不是抛出AggregateException
await task;
}
}
有的I/O操作必须同步进行
有的方法不允许异步方式执行I/O, 例如Win32 CreateFile
方法总是以同步方式执行. 因此FCL不能以异步方式高效地打开文件. Windos也没有提供函数以异步方式访问注册表,访问事件日志,获取目录的文件/子目录或者更改文件/目录的属性等.
FileSteam特有的问题
创建FileStream
对象时,可通过FileOptions.AsyncChronous
标志指定以同步方式还是异步方式进行通信。如果不指定该标志,Windows将以同步方式执行所有文件操作。当然,仍然可以调用FileStream
的ReadAsync
方法,对于你的应用程序,表面上是异步执行,但FileStream类在内部用另一个线程模拟异步行为。这个额外的线程纯属是浪费, 而且会影响到性能.
如果创建FileStream
对象时指定FileOptions.AsyncChronous
标志。然后,可以调用FileStream
的Read
方法执行一个同步操作。在内部,FileStream
类会开始一个异步操作,然后立即调用线程进入睡眠状态,直到操作完成才唤醒,从而模拟同步行为,这样依然效率低下。
总之,使用FileStream
时应该想好是以同步方式还是以异步方式执行I/O操作,并指定FileOptions.Asynchronous
标志来指明自己的选择。如果指定了该标志,就总是调用ReadAsync
。如果没有使用这个标志,就总是调用Read
。这样能够获得最佳性能。
如果想先对FileStream
执行一些同步操作,再执行一些异步操作,那么更高效的做法是使用FileOptions.Asynchronous
标志来构造它。另外也可针对同一个文件,创建两个FileStream对象,一个FileStream进行同步操作,另一个FileStream执行异步操作。
FileStream的辅助方法(Create
,Open
和OpenWrite
)创建并返回FileStream
对象,这些方法都没有指定FileOptions.Asynchronous标志,所以为了实现响应灵敏的、可伸缩性的应用程序,应避免使用这些方法.
I/O请求优先级
线程还要执行I/O请求以便从各种硬件设备中读写数据.
如果一个低优先级线程获得了CPU事件, 它可以在非常短的时间里轻易地将成百上千的I/O请求放入队列, 由于I/O请求需要时间来执行, 所以一个低优先级线程可能挂起高优先级线程, 使后者不能快速完成工作. 因此, 当系统执行一些耗时的低优先级服务(磁盘碎片整理程序, 病毒扫描程序,内容索引程序)时, 机器的响应能力可能会变得非常差.
Windows允许线程在发出I/O请求时指定优先级. 但是FCL还没有包含这个功能. 如果想用,可以采取P/Invoke
本机Win32函数 的方式:
internal static class ThreadIO
{
public static void Go()
{
using (ThreadIO.BeginBackgroundProcessing())
{
// 在这里执行低优先级I/O请求, 例如WirteAsync/ReadAsync
}
}
public static BackgroundProcessingDisposer BeginBackgroundProcessing(Boolean process = false)
{
ChangeBackgroundProcessing(process, true);
return new BackgroundProcessingDisposer(process);
}
public static void EndBackgroundProcessing(Boolean process = false)
{
ChangeBackgroundProcessing(process, false);
}
private static void ChangeBackgroundProcessing(Boolean process, Boolean start)
{
Boolean ok = process
? SetPriorityClass(GetCurrentWin32ProcessHandle(),
start ? ProcessBackgroundMode.Start : ProcessBackgroundMode.End)
: SetThreadPriority(GetCurrentWin32ThreadHandle(),
start ? ThreadBackgroundgMode.Start : ThreadBackgroundgMode.End);
if (!ok) throw new Win32Exception();
}
// 这个结构使C#的using语句能终止后台处理模式
public struct BackgroundProcessingDisposer : IDisposable
{
private readonly Boolean m_process;
public BackgroundProcessingDisposer(Boolean process)
{
m_process = process;
}
public void Dispose()
{
EndBackgroundProcessing(m_process);
}
}
// See Win32’s THREAD_MODE_BACKGROUND_BEGIN and THREAD_MODE_BACKGROUND_END
private enum ThreadBackgroundgMode
{
Start = 0x10000,
End = 0x20000
}
// See Win32’s PROCESS_MODE_BACKGROUND_BEGIN and PROCESS_MODE_BACKGROUND_END
private enum ProcessBackgroundMode
{
Start = 0x100000,
End = 0x200000
}
[DllImport("Kernel32", EntryPoint = "GetCurrentProcess", ExactSpelling = true)]
private static extern SafeWaitHandle GetCurrentWin32ProcessHandle();
[DllImport("Kernel32", ExactSpelling = true, SetLastError = true)]
[return: MarshalAs(UnmanagedType.Bool)]
private static extern Boolean SetPriorityClass(SafeWaitHandle hprocess, ProcessBackgroundMode mode);
[DllImport("Kernel32", EntryPoint = "GetCurrentThread", ExactSpelling = true)]
private static extern SafeWaitHandle GetCurrentWin32ThreadHandle();
[DllImport("Kernel32", ExactSpelling = true, SetLastError = true)]
[return: MarshalAs(UnmanagedType.Bool)]
private static extern Boolean SetThreadPriority(SafeWaitHandle hthread, ThreadBackgroundgMode mode);
// http://msdn.microsoft.com/en-us/library/aa480216.aspx
[DllImport("Kernel32", SetLastError = true, EntryPoint = "CancelSynchronousIo")]
[return: MarshalAs(UnmanagedType.Bool)]
private static extern Boolean CancelSynchronousIO(SafeWaitHandle hThread);
}
要调用ThreadIO
的BeginBackgroundProcessing
方法, 告诉windows你的线程要发出低优先级I/O请求, 注意, 这同时会降低线程的CPU调度优先级. 可以调用BeginBackgroundProcessing
或者EndBackgroundProcessing
返回的值上调用Dispose
(上述代码用了using
), 使线程恢复为发出普通优先级的I/O请求(以及普通CPU的调度优先级).
如果希望一个进程中的所有线程都发出低优先级I/O请求和进行低优先级的CPU调度, 可以调用BeginBackgroundProcessing
,为它的process参数
传递true
值.
线程只能影响它自己的后台处理模式. 不允许更改另一个线程的后台处理模式.
一个例子
类比通常有帮助。你在一家餐馆做饭。鸡蛋和烤面包的订单。
同步:你煮鸡蛋,然后你做烤面包。
异步,单线程:你开始煮鸡蛋并设置一个计时器。你开始吐司做饭,并设置一个计时器。虽然他们都在做饭,但你要清理厨房。当计时器响起时,你可以将鸡蛋从烤箱中取出,然后将烤面包机从烤面包机中取出并送达。
异步,多线程:你雇两个厨师,一个煮鸡蛋,一个煮烤面包。现在你遇到了协调厨师的问题,这样他们在共享资源时就不会在厨房里相互冲突。你必须付钱给他们。
现在,多线程只是一种异步才有意义吗? 线程是关于工人的; 异步是关于任务。在多线程工作流中,您可以将任务分配给工作人员。在异步单线程工作流程中,您有一个任务图表,其中某些任务取决于其他任务的结果; 当每个任务完成时,它会调用代码来调度可以运行的下一个任务,给定刚刚完成的任务的结果。但是你(希望)只需要一个工人来执行所有任务,而不是每个任务一个工人。
它将有助于意识到许多任务不受处理器约束。对于处理器绑定的任务,雇用与处理器一样多的工作者(线程),为每个工作者分配一个任务,为每个工作者分配一个处理器,让每个处理器完成其他工作,除了将结果计算为尽快。但是对于没有在处理器上等待的任务,您根本不需要分配工作人员。您只是等待消息到达,结果可用并在您等待时执行其他操作。当该消息到达时,您可以安排继续完成的任务作为待办事项列表中的下一个要检查的事项。
有人问你要一份文件。您通过邮件发送文件,继续做其他工作。当它到达邮件时,您会收到信号,当您有这样的信息时,您会完成工作流程的其余部分 - 打开信封,支付运费,无论如何。您不需要聘请其他工作人员为您完成所有这些工作。