I/O限制的异步操作
27章讲的是如何异步执行计算限制的操作, 允许线程池在多个CPU内核上调度任务,使多个线程能并发工作. 本章重点讲如何异步进行I/O限制的操作, 允许将任务交给由硬件设备处理, 期间完全不占用线程和CPU资源. 然而, 线程池仍然扮演了一个重要的角色, 各种I/O操作的结果还是要由线程池线程来处理.
Windows如何执行I/O操作
上图是连接了几个硬件设备的计算机系统.
- 程序通过构造
FileStream对象
来打开磁盘文件 - 调用
Read方法
从文件中读取数据- 调用
Read方法
时,你的线程从托管代码
转变为本机/用户模式代码
Read
内部调用Win32ReadFile
函数①ReadFile
分配一个小的数据结构,称为I/O请求包(I/O Request Packet,IRP)
②IRP结构
初始化后包含的内容有: 文件句柄,文件中的偏移量(从这个位置开始读取字符),一个Byte[]数组的地址(数组用读取的字节来填充),要传输的字节数以及其他常规性内容.
- 调用
ReadFile
将你的线程从本机/用户模式
代码转变成本机/内核模式
代码, 并传递IRP
数据结构,从而调用Window内核③- 根据IRP中的设备句柄,Windows内核知道I/O操作要传送给哪个硬件设备.
- Windows将
IRP
传送给对应的设备驱动程序的IRP队列
④ - 每个设备都维护自己的
IRP队列
, 其中包含了机器上运行的所有进程发出的I/O请求
- Windows将
IRP数据包
到达时, 设备驱动程序将IRP信息
传给物理硬件设备
, 硬件设备执行请求的I/O操作⑤
注意一个重要问题, 在硬件设备执行I/O操作期间,发出了I/O请求的线程将无事可做, 所以Windows将线程变为睡眠状态,防止它浪费CPU时间⑥. 虽然线程不浪费时间,但其仍然浪费空间(内存),因为它的用户模式栈,内核模式栈,线程环境块,其他数据结构都还在内存中,完全没有谁去访问这些东西.
- 最终,硬件设备完成I/O操作,然后Windows会唤醒你的线程,把它调度给一个CPU,使它从内核模式返回用户模式, 再返回至托管代码⑥⑦⑧
FileStream对象
的Read方法
现在返回一个Int32,指明从文件中读取的实际字节数, 使你知道在传给Read的Byte[]
中, 实际能检索到多少个字节.
上面的步骤看起来很不错,但是依旧存在两个问题:
- 请求的数量越来越多,创建的线程就越来越多,那么被阻塞的线程就会越来越多,这样会更浪费内存。
- 用执行结果来响应请求,如果请求的数量非常多,那么解锁的阻塞线程也就很多,而且机器上的线程数都会远远大于CPU数,所以在阻塞线程被集中解锁期间CPU很有可能会频繁地发生上下文切换,损害性能。
上图展示Windows如何异步读取I/O操作,仍然使用FileStream来构建对象,但是需要传递FileOptions.Asynchronous标志
,告诉Windows希望文件的读/写以异步的方式进行, 上图删除了除硬盘外的硬件设备, 引入了CLR的线程池, 稍微修改了代码. 传递了FileOptions.Asynchronous
标志.
- 现在调用
ReadAsync
而不是Read
从文件中读取数据.ReadAsync
. 在ReadAsync
内部分配一个Task<Int32>
对象来代表用于完成的读取操作的代码。然后ReadAsync
调用Win32ReadFile
函数① ReadFile
分配IRP数据包②- 然后将其传递给Windows内核③
- Windows内核把IRP数据包添加到IRP队列中④
- 此时线程不会再阻塞,而是可以直接运行返回至你的代码。所以线程能够立即从
ReadAsync调用
中返回. ⑤⑥⑦
当然, 此时IRP可能尚未处理好, 所以不能够在ReadAsync之后的代码中访问传递的Byte[]中的字节.
那么, 什么时候以及用什么方式处理最终读取的数据呢? 在调用ReadAsync
后返回一个Task<Int32>对象
,可以在该对象上调用ContinueWith
来登记任务完成时执行的回调方法,然后在回调方法中处理数据。当硬件设备处理好IRP后(步骤a)。硬件设备会把IRP放到CLR的线程池中队列中(步骤b)。将来某个时候,一个线程池会提取完成的IRP并执行任务的代码,最终要么设置异常(如果发生异常),要么返回结果(步骤c)。在知道这些之后,就知道使用异步I/O可以尽量的减少同步I/O访问存在的那些问题。
C#的异步函数
异步操作允许利用机器中的所有CPU, Microsoft意识到其中的巨大潜力,设计了一个编程模型来帮助开发者利用这种能力. 用到了Task
和称为异步函数
的C#语言功能.
private static async Task<String> IssueClientRequestAsync(String serverName, String message)
{
using (var pipe = new NamedPipeClientStream(serverName, "PipeName", PipeDirection.InOut,
PipeOptions.Asynchronous | PipeOptions.WriteThrough))
{
pipe.Connect(); // 必须在设置ReadMode之前连接
// 将传入的消息转换成一个Byte[]
pipe.ReadMode = PipeTransmissionMode.Message;
// 将数据异步发送给服务器
Byte[] request = Encoding.UTF8.GetBytes(message);
// WriteAsync内部分配一个Task对象, 返回给此方法
// 此时await操作符实际会在Task对象上调用ContinueWith
// 向它传递用于恢复状态机的方法
// 然后线程从IssueClientRequestAsync返回至调用处
await pipe.WriteAsync(request, 0, request.Length);
// 异步读取服务器的响应
Byte[] response = new Byte[1000];
Int32 bytesRead = await pipe.ReadAsync(response, 0, response.Length);
return Encoding.UTF8.GetString(response, 0, bytesRead);
} // 关闭管道
}
一旦将方法标记为async
, 编译器就会将方法的代码转换成实现了状态机的一个类型. 这就允许线程执行状态机中的一些代码并返回, 方法不需要一直执行到结束.
WriteAsync
内部分配一个Task对象
, 返回给此方法,此时await操作符
实际会在Task
对象上调用ContinueWith
,向它传递用于恢复状态机的方法,然后线程从IssueClientRequestAsync
返回至调用处.
将来某个时候,设备驱动程序会结束向管道的写入, 一个线程池线程会通知Task对象, 后来激活ContinueWith
回调方法, 造成一个线程恢复状态机. 具体地说, 一个线程会重新进入IssueClientRequestAsync
方法, 但这次是从await操作符
的位置开始. 方法现在执行编译器生成的,用于查询Task对象状态的代码. 如果操作成功完成,await操作符会返回结果, 如果操作失败,会设置代表错误的一个异常. 在本例中,WriteAsync
返回一个Task
而不是Task<TResult>
,所以无返回值.
现在方法继续执行, 分配一个Byte[]并调用NamedPipeClientStream
的异步ReadAsync
方法. 方法内部创建一个Task<Int32>
对象并返回它, 同样的, await操作符
实际会在Task<Int32>
对象上调用ContinueWith
向它传递用于恢复状态机的方法,然后线程从IssueClientRequestAsync
返回至调用处.
将来某个时候, 服务器向客户机发送一个响应, 网络设备驱动程序获得这个响应, 一个线程池线程通知Task<Int32>
对象, 恢复状态机.await操作符
造成编译器生成代码来查询Task对象
的Result属性(含有一个Int32)
并将结果赋给局部变量bytesRead
; 如果操作失败则抛出异常. 然后执行IssueClientRequestAsync
剩余的代码, 返回结果字符串并关闭管道.
由于异步函数在状态机执行完毕之前返回, 所有在IssueClientRequestAsync
执行它的第一个await操作符
之后, 调用IssueClientRequestAsync
的方法会继续执行, 但是调用者如何知道IssueClientRequestAsync
已经执行完毕它的状态机呢? 一旦将方法标记了async
,编译器会自动生成代码, 在状态机开始执行时创建一个Task对象
. 该Task对象在状态机执行完毕时自动完成. 在IssueClientRequestAsync
方法靠近尾部的地方, 我返回了一个字符串, 这造成编译器生成的代码完成它创建的Task<String>
对象, 把对象的Result属性
设为返回的字符串.
异步函数存在以下限制:
- 不能将应用程序的
Main方法
转变成异步函数.构造器
,属性访问器方法
和事件访问器方法
也不能转变成异步函数. - 异步函数不能使用任何
out
或ref
参数 - 不能在
catch
,finally
,unsafe
块中使用await操作符
- 不能在await操作符前获得一个支持线程所有权或递归的锁,并在await之后释放它. 这是因为await操作符之前的代码由一个线程执行, 之后的代码则可能由另一个线程执行. 在lock语句中使用await会报错, 如果显式调用Monitor的Enter和Exit方法,虽然能编译,但是运行时会抛出一个
SynchronzizationLockException
- 在查询表达式中, await操作符只能在初始from子句的第一个集合表达式中使用,或者join子句的集合表达式中使用.
以上限制会在你违反时,编译器会提醒你.
不要让线程等待一个线程同步构造从而造成线程的阻塞. 相反可以等待await从SemaphoreSlim的WaitAsync方法或者我自己的OneManyLock的AcquireAsync方法所返回的任务,从而避免线程被阻塞.
编译器如何将异步函数转换成状态机
private sealed class Type1 {}
private sealed class Type2 {}
private static Task<Type1> Method1Async()
{
// 以异步方式执行一些操作, 最后返回一个Type1对象
return Task.Run(() =>
{
/*Task.Yield(); */
return new Type1();
});
}
private static Task<Type2> Method2Async()
{
// 以异步方式执行一些操作, 最后返回一个Type2对象
return Task.Run(() =>
{
/*Task.Yield(); */
return new Type2();
});
}
// 通过异步函数来使用这些简单的类型和方法
private static async Task<String> MyMethodAsync(Int32 argument)
{
Int32 local = argument;
try
{
Type1 result1 = await Method1Async();
for (Int32 x = 0; x < 3; x++)
{
Type2 result2 = await Method2Async();
}
}
catch (Exception)
{
Console.WriteLine("Catch");
}
finally
{
Console.WriteLine("Finally");
}
return "Done";
}
然后对IL代码进行逆向工程转换回C#源代码. 简化了一些代码,并添加了大量注释.
编译为IL代码后,再利用ILSPY把IL代码反编译为C#代码,在返编译IL代码的时候,需要注意,不能勾选“decompile async methods(async/await)”
// AsyncStateMachine 特性指出这是一个异步方法(对反射的工具有用)
// 类型指出实现状态机的是哪个结构
[DebuggerStepThrough, AsyncStateMachine(typeof(StateMachine))]
private static Task<String> MyMethodAsync_ActualImplementation(Int32 argument)
{
// 创建状态机实例并初始化它
StateMachine stateMachine = new StateMachine()
{
// 创建builder,这个存根方法返回Task<String>
// Statemachine(状态机)实例访问builder来设置Task 完成/异常
m_builder = AsyncTaskMethodBuilder<String>.Create(),
m_state = -1, // 初始化状态机
m_argument = argument // 将实参拷贝到状态机字段
};
// 开始执行状态机
stateMachine.m_builder.Start(ref stateMachine);
// 返回状态机的Task
return stateMachine.m_builder.Task;
}
// 这是状态机结构
[CompilerGenerated, StructLayout(LayoutKind.Auto)]
private struct StateMachine : IAsyncStateMachine
{
// 代表状态机builder(Task)及其位置的字段
public AsyncTaskMethodBuilder<String> m_builder;
public Int32 m_state;
// 实参和局部变量现在成了字段
public Int32 m_argument, m_local, m_x;
public Type1 m_resultType1;
public Type2 m_resultType2;
// 每个awaiter类型一个字段
// 任何时候这些字段只有一个是重要的, 那个字段引用最近执行的,以异步方式完成的await
private TaskAwaiter<Type1> m_awaiterType1;
private TaskAwaiter<Type2> m_awaiterType2;
// 状态机方法本身
void IAsyncStateMachine.MoveNext()
{
// Task的结果值
String result = null;
// 编译器插入try块来确保状态机的任务完成
try
{
// 先假定逻辑上离开try块
Boolean executeFinally = true;
if (m_state == -1)
{
// 如果第一次在状态机方法中
// 原始方法就从头开始执行
m_local = m_argument; // 异步方法里的第一句代码
}
// 原始代码中的try块
try
{
TaskAwaiter<Type1> awaiterType1;
TaskAwaiter<Type2> awaiterType2;
switch (m_state)
{
case -1: // 开始执行try块中的代码
// 调用'Method1Async'并获得它的awaiter
awaiterType1 = Method1Async().GetAwaiter();
if (!awaiterType1.IsCompleted)
{
// 'Method1Async'要以异步方式完成
m_state = 0;
//保存 awaiter 以便将来返回
m_awaiterType1 = awaiterType1;
// 告诉awaiter在操作完成时调用MoveNext
// 引用状态机的MoveNext方法
m_builder.AwaitUnsafeOnCompleted(ref awaiterType1, ref this);
// 上述代码调用awaiterType1的OnCompleted, 它会在被等待的任务上
// 调用ContinueWith(t => MoveNext())
// 任务完成后, ContinueWith任务调用MovedNext
executeFinally = false; // 逻辑上不离开try块
return; // 线程返回至调用者
}
// 'Method1Async' 以同步方式完成了
break;
case 0: // 'Method1Async' 以异步方式完成了
awaiterType1 = m_awaiterType1; // 恢复最新的awaiter
break;
case 1: // 'Method2Async' 以异步方式完成了
awaiterType2 = m_awaiterType2; // 恢复最新的awaiter
goto ForLoopEpilog;
}
// 在第一个await后, 我们捕捉结果并启动for循环
m_resultType1 = awaiterType1.GetResult(); // 获取awaiter的结果
// for循环开场
ForLoopPrologue:
m_x = 0; // for循环初始化
goto ForLoopBody; // 跳到for循环主体ForLoopBody
// for循环尾声
ForLoopEpilog:
m_resultType2 = awaiterType2.GetResult();
m_x++; // 每次循环迭代都递增
// 直通到for循环主体
// for循环主体
ForLoopBody:
if (m_x < 3)
{
// for循环测试
// 调用Method2Async并获取它的awaiter
awaiterType2 = Method2Async().GetAwaiter();
if (!awaiterType2.IsCompleted)
{
m_state = 1; // 'Method2Async' 以异步方式完成
m_awaiterType2 = awaiterType2; // 保存awaiter以便将来返回
// 告诉awaiter在操作完成时调用MoveNext
m_builder.AwaitUnsafeOnCompleted(ref awaiterType2, ref this);
executeFinally = false; // 逻辑上不离开try块
return; // 线程返回至调用者
}
// 'Method2Async' 以同步方式完成了
goto ForLoopEpilog; // 以同步方式完成就再次循环
}
}
catch (Exception)
{
Console.WriteLine("Catch");
}
finally
{
// 只要线程物理上离开try就会执行finally
// 我们希望在线程逻辑上离开try时才执行这些代码
if (executeFinally)
{
Console.WriteLine("Finally");
}
}
// 这是最终从异步函数返回的东西
result = "Done";
}
catch (Exception exception)
{
// 从未处理的异常: 通过设置异常来完成状态机的Task
m_builder.SetException(exception);
return;
}
// 无异常,通过返回结果来完成状态机的Task
m_builder.SetResult(result);
}
[DebuggerHidden]
void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine param0)
{
m_builder.SetStateMachine(param0);
}
}
如何将被等待的对象与状态机粘合起来, 任何时候需要使用await操作符,编译器都会获取操作数, 并尝试在它上面调用GetAwaiter方法. 这可能是实例方法或者扩展方法. 调用GetAwaiter方法
所返回的对象称为awaiter(等待者)
, 正是它将被等待的对象
与状态机
粘合起来.
状态机获得awaiter
后, 会查询其IsCompleted
属性.
- 如果操作已经以同步方式完成了, 属性将返回
true
, 而作为一项优化措施, 状态机将继续执行并调用awaiter
的GetResult方法
. 该方法要么抛出异常, 要么返回结果. 状态机继续执行以处理结果. - 如果操作以异步方式完成,
IsCompleted
属性将返回false
. 状态机调用awaiter
的OnCompleted
方法, 并向它传递一个委托(引用状态机的MoveNext方法
). 现在状态机允许它的线程回到原地以执行其他代码. 将来某个时候, 封装了底层任务的awaiter
会在完成时调用委托以执行MoveNext
. 可根据状态机中的字段知道如何到达代码中的正确位置. 使方法能从它当初离开的位置继续. 这时, 代码调用awaiter
的GetResult
方法, 执行将从这里继续, 以便对结果进行处理.
这就是异步函数的工作原理.
但任务未完成时,isCompleted
返回false
,所以会在onCompleted
登记任务完成时会调用的action
动作,action
动作执行完成后,会再一次调用MoveNext
,然后isCompleted
就返回true
,此时就可以通过GetResult
获得结果。
异步函数转换成状态机详细流程
- 状态机调用
__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
来将自己注册为任务的continuation
。 AsyncTaskMethodBuilder
会确保当任务完成时,一个IAsyncStateMachine.MoveNext
方法会被调用:AsyncTaskMethodBuilder
会捕获(capture)当前的执行上下文ExecutionContext
并创建一个MoveNextRunner实例
,并将其与当前的状态机实例相关联。然后它会创建一个MoveNextRunner.Run
的Action实例
,这个Action实例
会让状态机在捕获的上下文
中进入下一状态。AsyncTaskMethodBuilder
调用TaskAwaiter.UnsafeOnCompleted(action)
,这个方法将给定的action
注册为一个被等待的任务的continuation
。 也就是IAsyncStateMachine.MoveNext
continuation : 延续
- 生成的状态机,包含了所有原始的异步方法的逻辑,就像是一个异步方法的堆栈帧(stack frame)。
- 包含着完成的任务的
AsyncTaskMethodBuilder
(十分类似于TaskCompletionSource<T>
),它管理状态机的状态转换。 - 装饰(wrap)着一个任务的
TaskAwaiter
,它在必要时会给任务添加continuation
。 MoveNextRunner
,它会在正确的执行上下文(ExecutionContext)
中调用IAsyncStateMachine.MoveNext
。
执行上下文(Execution Context)
在同步的世界里,每个线程都将上下文信息
保存在线程本地(thread-local)的存储中。可以是安全相关的信息,特定文化的数据,或其他东西。当在一个线程中按顺序调用三个方法时,这些信息会自然地在这些方法中传递。但对于异步方法来说,这已经不再适用了。异步方法的每个“部分”都可以在不同的线程中执行,这使得线程本地的信息无法使用。
执行上下文保存了逻辑上的控制流的信息,即使它跨越多个线程。
像Task.Run
或ThreadPool.QueueUserWorkItem
这样的方法会自动捕获上下文。Task.Run方法
从调用线程中捕获ExecutionContext
,并将其存储在Task实例中。当与此Task实例
相关联的TaskScheduler
执行一个给定的委托时,它会在存储的上下文中执行ExecutionContext.Run
。
我们可以用AsyncLocal
来实际演示一下这个概念:
static Task ExecutionContextInAction()
{
var li = new AsyncLocal<int>();
li.Value = 42;
return Task.Run(() =>
{
// Task.Run会恢复执行上下文
Console.WriteLine("In Task.Run: " + li.Value);
}).ContinueWith(_ =>
{
// 任务的continuation也会恢复执行上下文
Console.WriteLine("In Task.ContinueWith: " + li.Value);
});
}
// 在这些情况下,执行上下文被传递到Task.Run,然后又被传递到Task.ContinueWith. 所以如果你运行此方法你会看到:
// In Task.Run: 42
// In Task.ContinueWith: 42
但并不是所有BCL中的方法都会自动捕获和恢复执行上下文。有两个例外分别是TaskAwaiter<T>.UnsafeOnComplete
和AsyncMethodBuilder<T>.AwaitUnsafeOnComplete
。语言的设计者们决定添加一些“不安全的”方法,使用AsyncMethodBuilder<T>
和MoveNextRunner
而不是依靠如AwaitTaskContinuation
的内置设施,来手动地传递执行上下文。我怀疑在现有的实现中有一些性能上的原因或是其他限制。
static async Task ExecutionContextInAsyncMethod()
{
var li = new AsyncLocal<int>();
li.Value = 42;
await Task.Delay(42);
// 上下文被隐式地捕获。li.Value为42
Console.WriteLine("After first await: " + li.Value);
var tsk2 = Task.Yield();
tsk2.GetAwaiter().UnsafeOnCompleted(() =>
{
// 上下文没有被捕获:li.Value为0
Console.WriteLine("Inside UnsafeOnCompleted: " + li.Value);
});
await tsk2;
// 上下文被捕获。li.Value为42
Console.WriteLine("After second await: " + li.Value);
}
// After first await: 42
// Inside UnsafeOnCompleted: 0
// After second await: 42
结论
- 异步方法与同步方法有很大的不同。
- 编译器为每个异步方法都生成一个状态机,并将原来方法中所有的逻辑移到状态机中。
- 生成的代码对同步场景进行了高度优化:如果所有被等待的任务都完成了,那么异步方法的额外开销是很小的。
- 如果被等待的任务还没有完成,则依赖于许多帮助类来完成工作,以保持原方法的逻辑不变。
如果你想学习更多与执行上下文相关的内容,我强烈推荐以下两篇博文:
ExecutionContext vs SynchronizationContext 作者Stephen Toub
Implicit Async Context (“AsyncLocal”) 作者Stephen Cleary
异步函数的扩展性
在扩展性方面, 能用Task对象
包装一个将来完成的操作, 就可以用await操作符
来等待该操作. 用一个类型Task来表示各种异步操作. 可以实现组合操作(WhenAll
和WhenAny
)和其他有用的操作. 之后会演示用Task
包装一个CancellationToken
,在等待异步操作的同事利用超时和取消功能.
分享一个TaskLogger类
, 用它显示尚未完成的异步操作. 这在调试时特别有用, 尤其是当应用程序因为错误的请求或者未响应的服务器而挂起的时候.
public static class TaskLogger
{
public static async Task Go()
{
#if DEBUG
// 使用TaskLogger会影响内存和性能, 所以只在调试生成中启用它
TaskLogger.LogLevel = TaskLogger.TaskLogLevel.Pending;
#endif
// 初始化3个任务, 为了测试TaskLogger,我们显式控制其持续时间
var tasks = new List<Task>
{
Task.Delay(2000).Log("2s op"),
Task.Delay(5000).Log("5s op"),
Task.Delay(6000).Log("6s op")
};
try
{
// 等待全部任务, 但在3秒后取消; 只有一个任务能按时完成
// 注意: WithCancellation扩展方法将在本章之后进行讲述
await Task.WhenAll(tasks).WithCancellation(new CancellationTokenSource(3000).Token);
}
catch (OperationCanceledException)
{
}
// 查询logger哪些任务尚未完成, 按照从等待时间最长到最短的顺序排序
foreach (var op in TaskLogger.GetLogEntries().OrderBy(tle => tle.LogTime))
Console.WriteLine(op);
}
public enum TaskLogLevel
{
None,
Pending
}
public static TaskLogLevel LogLevel { get; set; }
public sealed class TaskLogEntry
{
public Task Task { get; internal set; }
public String Tag { get; internal set; }
public DateTime LogTime { get; internal set; }
public String CallerMemberName { get; internal set; }
public String CallerFilePath { get; internal set; }
public Int32 CallerLineNumber { get; internal set; }
public override string ToString()
{
return String.Format("LogTime={0}, Tag={1}, Member={2}, File={3}({4})",
LogTime, Tag ?? "(none)", CallerMemberName, CallerFilePath, CallerLineNumber);
}
}
private static readonly ConcurrentDictionary<Task, TaskLogEntry> s_log =
new ConcurrentDictionary<Task, TaskLogEntry>();
public static IEnumerable<TaskLogEntry> GetLogEntries()
{
return s_log.Values;
}
public static Task<TResult> Log<TResult>(this Task<TResult> task, String tag = null,
[CallerMemberName] String callerMemberName = null,
[CallerFilePath] String callerFilePath = null,
[CallerLineNumber] Int32 callerLineNumber = -1)
{
return (Task<TResult>) Log((Task) task, tag, callerMemberName, callerFilePath, callerLineNumber);
}
public static Task Log(this Task task, String tag = null,
[CallerMemberName] String callerMemberName = null,
[CallerFilePath] String callerFilePath = null,
[CallerLineNumber] Int32 callerLineNumber = -1)
{
if (LogLevel == TaskLogLevel.None) return task;
var logEntry = new TaskLogEntry
{
Task = task,
LogTime = DateTime.Now,
Tag = tag,
CallerMemberName = callerMemberName,
CallerFilePath = callerFilePath,
CallerLineNumber = callerLineNumber
};
s_log[task] = logEntry;
task.ContinueWith(t =>
{
TaskLogEntry entry;
s_log.TryRemove(t, out entry);
},
TaskContinuationOptions.ExecuteSynchronously);
return task;
}
}
// 输出以下结果:
// LogTime=2019/9/27 13:30:46, Tag=6s op, Member=Go, File=D:\TD_ET\ConsoleApp1\ConsoleApp1\Program.cs(41)
// LogTime=2019/9/27 13:30:46, Tag=5s op, Member=Go, File=D:\TD_ET\ConsoleApp1\ConsoleApp1\Program.cs(40)
Callation类,用于取消正在执行的异步操作:
static class Cancellation
{
public struct Void { }
public static async Task WithCancellation(this Task originalTask, CancellationToken ct)
{
//创建在Cancellation被取消时完成的一个Task
var cancelTask = new TaskCompletionSource<Void>();
using (ct.Register(t => ((TaskCompletionSource<Void>)t).TrySetResult(new Void()), cancelTask)) {
//创建在原始Task或CancellationToken Task完成时都完成的一个Task
Task any = await Task.WhenAny(originalTask,cancelTask.Task);
//任务Task因为CancellationToken而完成,就抛出OperationCanceledException
if (any == cancelTask.Task)
ct.ThrowIfCancellationRequested();
};
//等待原始任务;若任务失败,它将抛出一个异常
await originalTask;
}
}
除了增强使用Task
时的灵活性, 异步函数另一个对扩展性有力的地方在于编译器可以在await
的任何操作数上调用GetAwaiter
. 所以操作数不一定是Task对象
. 可以是任意类型, 只要提供一个可以调用一个可以调用的GetAwaiter方法
. 下例展示我自己的awaiter
,在异步方法的状态机和被引发的事件之间, 它扮演粘合剂的角色.
internal static class EventAwaiterDemo
{
// 演示这一切是如何工作的
public static void Go()
{
ShowExceptions();
for (Int32 x = 0; x < 3; x++)
{
try
{
switch (x)
{
case 0: throw new InvalidOperationException();
case 1: throw new ObjectDisposedException("");
case 2: throw new ArgumentOutOfRangeException();
}
}
catch
{
}
}
}
// 以下方法使用我的EventAwaiter类在事件发生时候从await操作符返回,
// 在本例中, 一旦AppDomain中的任何线程抛出异常, 状态机就会继续
private static async void ShowExceptions()
{
var eventAwaiter = new EventAwaiter<FirstChanceExceptionEventArgs>();
AppDomain.CurrentDomain.FirstChanceException += eventAwaiter.EventRaised;
while (true)
{
Console.WriteLine("AppDomain exception: {0}",
(await eventAwaiter).Exception.GetType());
}
}
public sealed class EventAwaiter<TEventArgs> : INotifyCompletion
{
private ConcurrentQueue<TEventArgs> m_events = new ConcurrentQueue<TEventArgs>();
private Action m_continuation;
#region 状态机调用的成员
// 状态机先调用这个来获得awaiter:我们自己返回自己
public EventAwaiter<TEventArgs> GetAwaiter()
{
return this;
}
// 告诉状态机是否发生了任何事件
public Boolean IsCompleted
{
get { return m_events.Count > 0; }
}
// 状态机告诉我们最后要调用什么方法, 我们把它保存起来
public void OnCompleted(Action continuation)
{
Volatile.Write(ref m_continuation, continuation);
}
// 状态机查询结果; 这是await操作符的结果
public TEventArgs GetResult()
{
TEventArgs e;
m_events.TryDequeue(out e);
return e;
}
#endregion
//如果引发了事件, 多个线程可能同时调用
public void EventRaised(Object sender, TEventArgs eventArgs)
{
m_events.Enqueue(eventArgs); // 保存EventArgs以便从GetResult/await返回
// 如果有一个等待进行的延续任务, 该线程会运行它
Action continuation = Interlocked.Exchange(ref m_continuation, null);
if (continuation != null) continuation(); // 恢复状态机
}
}
}
笔者自定义的EventAwaiter<TEventArgs>
提供了GetAwaiter()
、isCompleted()
、onCompleted(Action continuation)
、GetResult()
几个重要的方法,其实这几个方法恰好对应了第3.1中“异步函数如何转化为状态机”中状态机需要操作的各个方法,在3.1中笔者给出一张状态机执行的流程图
笔者接下来结合这个案例,说一说本例的流程:
- 当执行到
await eventAwaiter
时,会去调用eventAwaiter
的GetAwaiter()
方法,然后得到Awaiter
对象。 - 查询
Awaiter
对象和IsCompleted()
方法,判断当前Awaiter
是否发生了事件。 - 若
Awaiter
还没有发生事件,就调用OnCompleted(Action)
方法,并且传递一个Action
委托给OnCompleted()
方法,其中的Action
委托里就包含了恢复状态机的逻辑。 - 此时还没有线程执行恢复状态机的代码,
await eventWaiter
的线程将会被阻塞。 - 当结合本例的程序逻辑,当出现异常时
EventRaised
会被调用,然后在EventRaised
中会恢复状态机,唤醒await eventWaiter
阻塞的线程。 - 状态机然后会再次调用
IsCompleted方法
判断是否有事件,这时m_events 已经有一个事件了,所以IsCompleted
会返回true
。 - 状态机接着调用
GetResult
,并且将结果值赋值给await关键字的表达式。
扩展C#中的异步方法(网络摘要)
关于如何控制异步方法机制有3种方法:
- 在
System.Runtime.CompilerServices
命名空间中提供你自己的async method builder
。 - 使用自定义的
task awaiter
。 - 定义你自己的“类任务”(task-like) 类型
异步方法被C#编译器转换从而生成的状态机是依靠于某些预定义的类型的。但是C#编译器却并不一定要求这些众所周知的类型来自于某个特定的程序集。例如,你可以在你的项目中提供自己对AsyncVoidMethodBuilder
的实现,然后C#编译器就会把异步机制“绑定”到你的自定义类型。
“类任务”(Task-like)类型
从支持async/await
的编译器的第一个版本(即C# 5)开始,就可以自定义awaiter了。这个可扩展性十分有用但是却是有限的,因为所有的异步方法都必须返回void
,Task
或Task<T>
。从C# 7.2开始,编译器支持“类任务”类型。
“类任务”类型是一个class或者struct,它与一个通过AsyncMethodBuilderAttribute标识
的builder类型
相关联。要使“类任务”类型有用,它应该像我们前面描述的awaiter那样是可等待的。基本上,“类任务”类型结合了前面描述的两种可扩展性的方法,并且使第一种方法得到了正式支持。
现在你还必须自己定义这个attribute,例子:github。