Parallel的静态For,ForEach和Invoke方法
在一些常见的编程情形中,使用任务也许会提升性能。为了简化编程,静态类System.Threading.Tasks.Paraller
封装了这些常见的情形,它内部使用Task对象。
例如,不要像下面一样处理一个集合中的所有项:
// 不建议写法
// 一个线程顺序执行这个工作(每次迭代调用一次DoWork)
for (Int32 i = 0; i< 1000; i++ ) DoWork(i);
相反,可以使用Parallel
类型的For方法
,让多个线程池线程辅助完成这个工作:
// 建议写法
// 线程池的线程并行处理工作
Parallel.For(0, 1000, i => DoWork(i));
类似的,如果有一个集合,那么不要像下面这样写:
// 不建议写法
// 一个线程顺序执行这个工作(每次迭代调用一次DoWork)
foreach ( var item in conllection) DoWork(item);
// 建议写法
// 线程池的线程并行处理工作
Parallel.ForEach(conllection,item=>DoWork(item));
如果代码中既可以用For
,也可以用ForEach
,那么建议使用For
,因为它执行的快一点。最后,如果要执行几个方法,那么可以顺序执行它们,如下所示:
// 一个线程顺序执行所有方法
Method1();
Method2();
Method3();
// 线程池的线程并行执行
parallel.Invoke(
() => Method1(),
() => Method2(),
() => Method3());
Parallel
的所有方法都让调用线程参与处理。从资源利用的角度说,这是一件好事,因为我们不希望调用线程停下来(阻塞),等待线程池做完所有工作后才继续。然而,如果调用线程在线程池完成自己的那一部分工作之前完成工作,调用程序就会将自己挂起,直到所有工作完成。这也是一件好事,因为这个提供了和普通for和foreach循环时相同的语义:线程要在所有工作后才继续运行。还要注意,如果任何操作抛出一个未处理的异常,你调用的paraller方法
最后会抛出一个AggregateException
。
当然,这并不是说需要检查自己的所有源代码,将for循环替换成Parallel.For的调用。调用Parallel的方法时,有一个前提条件务必记住:工作项要能并行执行。因此,如果工作项必须顺序执行,就不要调用Parallel的方法。另外,要避免会修改任何共享数据的工作项,因为多个线程同时处理的数据可能损坏。为了解决这个问题,一般的方法就是围绕数据访问添加线程同步锁。但是这样一来,一次就只能有一个线程访问数据,无法享受并行处理多个想带来的好处。
除此之外,Parallel的方法
本身也有开销:委托对象必须分配,而针对每一个工作项,都要调用一次这些委托。如果有大量可由多个线程处理的工作项,那么也许会获得性能的提升。但是,如果只为区区几个工作项使用Parallel
的方法,或者为处理得非常快的工作项使用Parallel
就会得不偿失, 反而降低性能。
Parallel
的For
,ForEach
和Invoke方法
都能接受一个ParallelOptions对象
的重载版本。这个对象的定义如下:
// 存储用于配置 Parallel 类的方法的操作的选项。
public class ParallelOptions
{
// 初始化 ParallelOptions 类的新实例
public ParallelOptions();
// 允许取消操作, 默认None
public CancellationToken CancellationToken { get; set; }
// 允许指定可以并发操作的最大工作项数目,默认为-1(可用CPU数)
public int MaxDegreeOfParallelism { get; set; }
// 指定调度器, 默认为TaskScheduler.Default
public TaskScheduler TaskScheduler { get; set; }
}
除此之外,For
和ForEach
方法有一些重载版本允许传递3个委托:
- 任务局部初始化委托(localInit),为参与工作的每一个任务都调用一次委托。这个委托是在任务被要求处理一个工作项之前调用。
- 主体委托(body),为参与工作的各个线程所处理的每一项都调用一次委托。
- 任务局部终结委托(localFinally),为参与工作的每一个任务都调用一次委托。这个委托是在任务处理好派遣给它的所有工作之后调用。即使主体委托引发一个未处理的异常,也会调用它。
以下示例代码演示了如何利用3个委托,计算一个目录中的所有文件的字节长度总计值:
private static Int64 DirectoryBytes(String path, String searchPattern, SearchOption searchOption)
{
// 当前目录中文件的可枚举集合。
var files = Directory.EnumerateFiles(path, searchPattern, searchOption);
Int64 masterTotal = 0;
ParallelLoopResult result = Parallel.ForEach<String, Int64>(files,
() =>
{
// localInit: 每个任务开始之前调用一次
// 每个任务开始之前,总计值都初始化为0
return 0;
},
(file, parallelLoopState, index, taskLocalTotal) =>
{
// body: 每个任务调用一次
// 获得这个文件的大小,把它添加到这个任务的累加值上
Int64 fileLength = 0;
FileStream fs = null;
try
{
fs = File.OpenRead(file);
fileLength = fs.Length;
}
catch (IOException) { /* 忽略拒绝访问的文件 */ }
finally { if (fs != null) fs.Dispose(); }
return taskLocalTotal + fileLength;
},
taskLocalTotal =>
{
// localFinally: 每个任务完成后调用一次
// 将这个任务的总计值(taskLocalTotal)加到中的总计值(masterTotal)上去
Interlocked.Add(ref masterTotal, taskLocalTotal);
});
return masterTotal;
}
每个任务都通过taskLocalTotal
变量为分配给它的文件维护自己的总计值。每个任务完成工作之后,都调用Interlocked.Add方法
[对两个 32 位整数进行求和并用和替换第一个整数],以一种线程安全的方式更新总的总计值。由于每个任务都有自己的总计值,可以在一个工作项处理期间,无需进行线程同步。由于线程同步会造成性能的损失,所以不需要线程同步是一件好事。只有在每个任务返回之后,masterTotal才需要以一种线程安全的方式更新materTotal变量。所以,因为调用Interlocked.Add
方法而造成的性能损失每个任务只发生一次,而不会每个工作项都发生。
注意,我们向主体委托传递一个ParallelLoopState
对象,它的定义如下:
// 可用来使 Parallel 循环的迭代与其他迭代交互
public class ParallelLoopState
{
// 获取循环的任何迭代是否已引发相应迭代未处理的异常
public bool IsExceptional { get; }
// 获取循环的任何迭代是否已调用 Stop
public bool IsStopped { get; }
// 获取从中调用 Break 的最低循环迭代。
public long? LowestBreakIteration { get; }
// 获取循环的当前迭代是否应基于此迭代或其他迭代发出的请求退出。
public bool ShouldExitCurrentIteration { get; }
// 告知 Parallel 循环应在系统方便的时候尽早停止执行当前迭代之外的迭代。
public void Break();
// 告知 Parallel 循环应在系统方便的时候尽早停止执行。
public void Stop();
}
参与工作的每一个任务都会获得它自己的ParallelState
对象,并可通过这个对象和参与工作的其他任务进行交互。Stop方法
告诉循环停止处理任何更多的工作,未来对IsStopped属性的查询会返回true。Break方法
告诉循环不再继续处理当前项之后的项。例如,假如ForEach
被告知要处理100项,并在第5项时调用了Break,那么循环会确保前5项处理好之后,ForEach才返回。但注意,这并不是说在这100项中,只有前5项被处理,也许第5项之后可能在以前已经处理过了。LowestBreakIteration
属性返回在处理过程中调用过Break方法的最低的项。从来没有调用过Break,LowestBreakIteration会返回null.
处理任何一项时,如果造成一个未处理的异常,IsExceptional
属性会返回true。如果处理一项时会花费大量的时间,代码可查询ShouldExitCurrentIteration属性
看它是否应该提前退出。如果调用过Stop,调用过Break,取消过CancellationTokenSource,或者处理一项时造成了未处理的异常,这个属性就会返回true。
Parallel的For和ForEach方法都返回一个ParallelLoopResult
实例,他看起来像下面这样:
// 可以根据以下两属性判断某个线程调用Break方法还是Stop方法
public struct ParallelLoopResult
{
// 如果操作提前终止, 以下方法返回false
public bool IsCompleted { get; }
// 获取从中调用 System.Threading.Tasks.ParallelLoopState.Break() 的最低迭代的索引。
public long? LowestBreakIteration { get; }
}
可通过检查属性来了解循环的结果,如果IsCompleted
返回true。表明循环运行完成,所有项都得到了处理。如果IsCompleted
为false,而且LowestBreakIteration
为null,表明参与工作的某个线程调用了Stop方法。如果LowestBreakIteration
返回false,而且LowestBreakIteration
不为null,表名参与工作的某个线程调用的Break方法,LowestBreakIteration
返回的Int64值指明了保证已得到处理的最低一项的索引。 如果抛出异常, 应捕捉AggregateException来得体地恢复.
并行语言集成查询 PLINQ
Microsoft的语言集成查询(LINQ)功能
提供了一个简捷的语法来查询数据集合。使用LINQ,可轻松对数据线进行筛选、排序、投射等。使用LINQ to Object
时,只有一个线程顺序处理数据集合中的所有项;我们称为顺序查询。为了提高处理性能,可以使用并行LINQ(Parallel LINQ)
,它将顺序查询转换成为一个并行查询,在内部使用任务(这些任务的排列由默认TaskScheduler来调度),将集合中的数据项的处理工作分散到多个CPU上,一边并发处理多个数据。和Prarllel的方法相似,如果同时要处理大量项,或者每一项的处理过程都是一个耗时的计算限制的操作,那么就可以使用PLINQ
获得最大的收益。
静态System.Linq.ParallelEnumerable
类(在System.Core.dll
中定义)实现了PLINQ
的所有功能,所以必须通过C#的using指令将System.Linq命名空间导入到你的源码中。尤其是,这个类公开了所有标准LINQ操作符的并行版本,比如Where
,Select
,SelectMany
,GroupBy
,Join
,Skip
,Task
等。所有这些方法都是扩展了System.Linq.ParallelQuery<T>类型
的扩展方法。
为了让自己的LINQ to Object
查询调用这些方法的并行版本,必须将自己的顺序查询(基于IEnumberable
或者IEnumerable<T>
)转换成并行查询(基于ParallelQuery
或者ParallelQuery<T>
),这是用ParallelEnumerable
的AsParallel
扩展方法来实现的,如下所示:
public static ParallelQuery<TSource> AsParallel<TSource>(this IEnumerable<TSource> source)
下面是将一个顺序查询转换成并行查询的例子。查询返回的是一个程序集中定义的所有过时(obsolete)方法。
public class MyClass
{
[Obsolete("过时函数不应使用", false)]
public void testobsolete(bool a)
{
}
}
class Program
{
static void Main(string[] args)
{
ObsoleteMethods(typeof(MyClass).Assembly);
}
private static void ObsoleteMethods(Assembly assembly)
{
var query =
from type in assembly.GetExportedTypes().AsParallel() // 将顺序查询转换成并行查询
from method in type.GetMethods(BindingFlags.Public | BindingFlags.Instance | BindingFlags.Static)
let obsoleteAttrType = typeof(ObsoleteAttribute)
where Attribute.IsDefined(method, obsoleteAttrType)
orderby type.FullName
let obsoleteAttrObj = (ObsoleteAttribute) Attribute.GetCustomAttribute(method, obsoleteAttrType)
select String.Format("Type={0}\nMethod={1}\nMessage={2}\n", type.FullName, method.ToString(), obsoleteAttrObj.Message);
// 显示结果
foreach (var result in query)
Console.WriteLine(result);
// 控制台输出:
// Type = ConsoleApplication.MyClass
// Method = Void testobsolete(Boolean)
// Message = 过时函数不应使用
}
}
在一个查询中,可以从执行并行操作
切换回执行顺序操作
,这是通过调用ParallelEnumerable
的AsSequential方法
做到的.
public static IEnumerable <TSource> AsSequential<TSource>(this ParallelQuery<TSource> source)
这个方法将一个ParallQuery<T>
转换回一个IEnumerable<T>
。这样一来,在调用了AsDequential
之后执行的操作将只由一个线程执行。
通常,一个LINQ查询
的结果数是让某个线程执行一个foreach语句
计算获得的。这意味着只有一个线程遍历查询的所有结果。如果希望以并行的方式处理查询的结果,就应该使用ParallelEnumerable
的ForAll
方法处理查询:
static void ForAll<TSource>(this ParallelQuery<TSource> source,Action<TSource> action)
这个方法允许多个线程同时处理结果,可以修改前面的代码来使用该方法:
// (单线程对结果进行操作)显示结果
foreach (var result in query)
Console.WriteLine(result);
//(多线程同时处理结果)显示结果
query.ForAll(Console.WriteLine);
然而,让多个线程同时调用Console.WriteLine
反而会损害性能,因为Console类
内部会对线程进行同步,确保每次只有一个线程 能访问控制台程序窗口,避免来自多个线程的文本最后显示成一团乱麻。希望为每个结果都执行计算时,才使用ForAll方法。
由于PLINQ
可用多个线程处理数据项,所以数据项被并发处理,结果被无序返回。如果需要让PLINQ
保存数据项的顺序,可调用ParallelEnumerable
的AsOrderd
方法。调用这个方法时,线程会成组处理数据项。然后,这些数据项被合并回去,以保持顺序。这样会损害到性能。
以下操作符生成不排序的操作:Distinct
,Except
,Intersect
,Union
,Join
,GroupBy
,GroupJoin
和ToLookup
。在这些操作符之后,如果想再次强制排序,只需调用AsOrdered
方法。
以下操作符生成排序的操作:OrderBy
,OrderByDescending
,Thenby
和ThenByDescending
。在这些出操作符之后,如果想再次恢复不排序的处理,只需调用AsUnordered
方法。
PLINQ提供了一些额外的ParallelEnumerable
方法,可调用它们来控制查询的处理方式:
// 设置要与查询关联的 CancellationToken
public static ParallelQuery<TSource> WithCancellation<TSource>(this ParallelQuery<TSource> source, CancellationToken cancellationToken);
// 设置要在查询中使用的并行度。 并行度是将用于处理查询的同时执行的任务的最大数目。
public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource>(this ParallelQuery<TSource> source, int degreeOfParallelism);
// 设置查询的执行模式。
public static ParallelQuery<TSource> WithExecutionMode<TSource>(this ParallelQuery<TSource> source, ParallelExecutionMode executionMode);
//设置此查询的合并选项,它指定查询对输出进行缓冲处理的方式。
public static ParallelQuery<TSource> WithMergeOptions<TSource>(this ParallelQuery<TSource> source, ParallelMergeOptions mergeOptions);
显然,WithCancellation
方法允许传递一个CancellationToken
,使查询处理能提前停止。WithDegreeOfParallelism
方法指定最多允许多少个线程处理查询;他不会强迫创建满全部线程,如果并不是全部都需要的话。你一般不会调用这个方法。另外,默认情况下,会为每个内核用一个线程来执行查询。但如果想空出一些内核做其他工作, 可调用WithDegreeOfParallelism
并传递小于可用内核数的一个数字. 另外, 如果查询要执行同步I/O操作,还可传递比内核数大的数字, 因为线程会在这些操作期间阻塞, 这虽然会浪费更多线程, 但我强烈建议不要在服务器应用程序中执行同步I/O操作.
PLINQ分析一个查询,然后决定如何最好地处理它。有的时候,顺序处理一个查询可以获得更好的性能,尤其在使用以下任何操作时:Concat
,ElementAt(OrDefault)
,First(OrDefault)
,Last(OrDefault)
,Skip(While)
,Task(While)
或Zip
。使用Select(Many)
或Where
的重载版本,并向你的selector
或predicate
委托传递一个位置索引时也是如此。然而,可以调用WithExecutionMode
,向它传递某个ParallelExecuteMode
标志,从而强迫查询以并行方式处理:
public enum ParallelExecutionMode
{
Default = 0, // 让并行LINQ决定处理查询的最佳方式
ForceParallelism = 1 // 强迫查询以其并行方式处理
}
如前所述,并行LINQ让多个线程处理数据项,结果必须再合并回去。可调用WithMergeOptions
向它传递以下某个ParallelMargeOptions
标志,从而控制这些结果的缓冲和合并方式:
// 指定查询中要使用的输出合并的首选类型。 换言之,它指示 PLINQ 如何将多个分区的结果合并回单个结果序列。 这仅是一个提示,系统在并行处理所有查询时可能不会考虑这一点。
public enum ParallelMergeOptions
{
// 使用默认合并类型,即 AutoBuffered。
Default = 0,
// 不利用输出缓冲区进行合并。 一旦计算出结果元素,就向查询使用者提供这些元素。
NotBuffered = 1,
// 利用系统选定大小的输出缓冲区进行合并。 在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。
AutoBuffered = 2,
// 利用整个输出缓冲区进行合并。 在向查询使用者提供任何结果之前,系统会先累计所有结果。
FullyBuffered = 3,
}
这些选项使你能在某种程度上控制速度和内存消耗的对应关系。NotBuffered
最省内存,但处理速度慢一些。FullyBuffered
消耗较多内存,但运行得最快。AutoBuffered
介于NotBuffered
和FullyBuffered
之间,最好亲自试验所有选项,并对比其性能,来选择那种方式。
执行定时计算限制操作
System.Threading
命名空间定义了一个Timer类
,可用它让一个线程池线程定时调用一个方法。构造Timer
类的一个实例相当于告诉线程池:在将来的某个时间会回调你的一个方法。Timer类提供了几个构造函数,相互都非常相似:
public sealed class Timer : MarshalByRefObject, IDisposable
{
public Timer(TimerCallback callback, object state, int dueTime, int period);
public Timer(TimerCallback callback, object state, long dueTime, long period);
public Timer(TimerCallback callback, object state, TimeSpan dueTime, TimeSpan period);
public Timer(TimerCallback callback, object state, uint dueTime, uint period);
}
4个构造器以完全一样的方式构造Timer
对象。
callback参数
标识希望由一个线程池线程回调的方法。当然,你写的对调方法必须和System.Threading.TimerCallback委托
类型匹配,如下所示:
delegate void TimerCallback(Object state);
state参数
允许在每次调用回调方法时都像它传递状态数据;如果没有需要传递的状态数据,可以传递null。
dueTime参数
告诉CLR在首次调用回调方法之前要等待多少毫秒。可以使用一个有符号或无符号的32位值,一个有符号的64位值或者一个TimeSpan值
指定毫秒数。如果希望回调方法立即调用,为dueTime参数
指定0即可。
最后一个period周期
参数指定了以后每次调用回调方法需要等待的时间(毫秒)。如果为这个参数传递Timeout.Infinite(-1)
,线程池线程值调用回调方法一次。
在内部,线程池为所有Timer对象
只使用了一个线程。这个线程知道下一个Timer对象
在什么时候到期。下一个Timer对象
到期时,线程就会唤醒,在内部调用TreadPool
的QueueUserWorkItem
,将一个工作项添加到线程池队列中,使你的回调方法得到调用。如果回调方法的执行时间很长,计时器可能(在上个回调还没有完成时)再次触发。这个能造成多个线程同时执行你的回调方法。为解决这个问题,我的建议是:构造Timer
时,为period
参数指定Timeout.Infinite
。这样,计时器就只触发一次。然后,在你的回调方法中,调用Change
方法指定一个新的dueTime
,并再次为period参数
指定Timeout.Infinite
。
以下是Change
方法的各个重载版本.
public sealed class Timer : MarshalByRefObject, IDisposable
{
public bool Change(int dueTime, int period);
public bool Change(long dueTime, long period);
public bool Change(TimeSpan dueTime, TimeSpan period);
public bool Change(uint dueTime, uint period);
}
Timer
类还提供了Dispose
方法,允许完全取消计时器,并可在当时处于pending状态的所有回调完成之后,向notifyObject参数
标识的内核对象发送信号。以下是Dispose方法的各个重载版本:
public sealed class Timer : MarshalByRefObject, IDisposable
{
public void Dispose();
public bool Dispose(WaitHandle notifyObject);
}
提示:一个Timer
对象被垃圾回收时,它的终结代码告诉线程池取消计时器,使它不再触发。所以,使用一个Timer
对象时,要确定有一个变量在保持Timer对象的存活,否则对你的回调方法调用就会停止。
以下代码演示了如何让一个线程池线程立即开始调用一个回调方法,以后每2秒钟调用一次:
class Program
{
private static Timer s_timer;
static void Main(string[] args)
{
Console.WriteLine("每2秒检查一次状态");
// 创建但不启动计时器
// 确保s_timer在线程池线程调用 Status 之前引用该计时器
s_timer = new Timer(Status, null, Timeout.Infinite, Timeout.Infinite);
// 现在s_timer已被赋值, 可以启动计时器
// 现在在Status中调用Change, 保证不会抛出NullReferenceException
// (防止被垃圾回收, 确定有一个变量在保持Timer对象的存活)
s_timer.Change(0, Timeout.Infinite);
Console.ReadLine(); // 防止进程终止
}
// 这个方法签名必须匹配 TimerCallback 委托匹配
private static void Status(Object state)
{
// 这个方法由一个线程池线程执行
Console.WriteLine("In Status at {0}", DateTime.Now);
Thread.Sleep(1000); // 模拟其他工作1秒钟
// 返回前放Timer在2秒后再次触发
s_timer.Change(2000, Timeout.Infinite);
// 这个方法返回后, 线程回归池中, 等待下一个工作项
}
}
// In Status at 2019/9/26 12:58:50
// In Status at 2019/9/26 12:58:53
// In Status at 2019/9/26 12:58:56
// In Status at 2019/9/26 12:58:59
如果需要定时执行的操作, 可以利用Task
的静态Delay
方法和C#的async
和await
关键字.
class Program
{
private static Timer s_timer;
static void Main(string[] args)
{
Console.WriteLine("每2秒检查一次状态");
Status();
Console.ReadLine(); // 防止进程终止
}
// 这个方法可获取你想要的任何参数
private static async void Status()
{
while (true)
{
Console.WriteLine("检查Status在 {0}", DateTime.Now);
// 要检查的代码放到这里.
// 在循环末尾, 在不阻塞线程的前提下延迟2秒
await Task.Delay(2000); // await 允许线程返回
// 2秒之后, 某个线程会在await之后介入并继续循环
}
}
}
// 检查Status在 2019/9/26 13:09:14
// 检查Status在 2019/9/26 13:09:16
// 检查Status在 2019/9/26 13:09:18
// 检查Status在 2019/9/26 13:09:20
在不阻塞线程的前提下延迟2秒await Task.Delay(2000);
await 允许线程返回,2秒之后, 某个线程会在await之后介入并继续循环.
一些计时器类的说明
FCL事实上提供了几个计时器,大多是开发人员都不清楚每个计时器到底有什么独到之处,在这里试着解释一下:
System.Threading
的Timer类
- 这是刚刚讨论过的计时器。要在一个线程池线程上执行定时的(周期性发生的)后台任务,它是最好的计时器。
System.Windows.Forms
的Timer类
- 构造这个类的一个实例,相当于告诉Windows将一个计时器和调用线程关联。当这个计时器触发时,WIndows将一条计时器消息(WM_TIMER)注入线程的消息队列。线程必须执行一个消息泵来提取这些消息,并把它们派遣给想要的回调方法。注意,所有这些工作都只有一个线程完成——设置计时器的线程保证就是执行回调方法的线程。这还意味着你的计时器方法不会由多个线程并发执行。
System.Windows.Threading
的DispatcherTimer类
- 这个类是
System.Windows.Forms
的Timer类
在Siverlight和WPF应用程序中的等价物。
- 这个类是
System.Timers
的Timer类
- 这个计时器基本是
System.Threading
的Timer类
的一个包装类。当计时器到触发时,会导致CLR将事件放到线程池的队列中。尽量不要使用这个类而是使用System.Threading的Timer类。
- 这个计时器基本是
线程池如何管理线程
讨论线程池如何管理工作者线程和I/O线程. 最好是将线程池看做一个黑盒.
设置线程池限制
CLR允许开发人员设置线程池要创建最大线程数。但实践证明,线程池永远都不该为池中的线程数设置上限,因为可能发生饥饿或死锁。假如队列中有1000个工作项,但这些工作项全都因为一个事件而阻塞,等1001个工作项发出信号才能解除阻塞。如果设置最大1000个线程,第1001个工作项就不会执行,所有1000个线程都会一直阻塞,最终用户被迫终止应用程序,并丢失他们都做的为保存的工作。
由于存在饥饿和死锁问题,CLR团队一直都在稳步地增加线程默认能够拥有的最大线程数。目前默认值是最大1000个线程。这基本可以看成是不限数量的,因为一个32位进程最大有2G的可用选址空间。加载一组Win32和CLR DLLs,并分配了本地堆和托管堆之后,剩余约1.5G的地址控制。由于每个线程都要为其用户模式栈和线程环境块(TEB)准备超过1MB的内存,所以一个32位进程中,最多能够有1360个线程。试图创建更多的线程,会抛出一个OutOfMemotyExcption。当然,64位进程提供了8TB的地址空间,所以理论上可以创建千百万个线程.
System.Threading.ThreadPool类
提供了几个静态方法,可调用它们设置和查询线程池的线程数:GetMaxThreads
,SetMaxThreads
,GetMinThreads
,SetMinThreads
和GetAvailableThreads
[获得可用的线程数量]。强烈建议你不要调用上述任何方法。限制线程池的线程数,一般只会造成应用程序性能变得更差。
如何管理工作者线程
下图展示了构成线程池的一部分的工作者线程的各种数据结构。ThreadPool.QueueUserWorkItem
方法和Timer类
总是将工作项放到全局队列中。工作者线程采用一个先入先出算法将工作项从这个队列中取出来,并处理它们。由于多个工作者线程可能同时从全局队列中拿走工作项,所以所有工作者线程都竞争一个线程同步锁,以保证两个或多个线程不会获取同一个工作项。这个线程同步锁在某些应用程序中可能成为瓶颈,对伸缩性和性能造成某种程序的限制
现在,让我们谈谈使用默认TaskScheduler
(通过查询TaskScheduler
的静态Default
属性来获得)来调度的Task
对象。当一个非工作者线程调度一个Task
时,Task
会添加到全局队列中。
但是,每个工作者线程都有它自己的本地队列。当一个工作者线程调度一个Task
时,Task
会添加到调用线程的本地队列中。
一个工作者线程准备好处理一个工作项时,它总是先检查它的本地队列来查找一个Task
。存在一个Task,工作者线程就从它的本地队列中移出Task
,并对工作项进行处理。要注意的是,工作者线程采用后入先出的算法将任务从它的本地队列中取出。由于工作者线程是唯一允许访问它自己的本地队列列头的线程,所以无需同步锁,而且在队列中添加和删除Task的速度非常快。这个行为的副作用在于,Task是按照和进入队列时相反的顺序执行的。
重要提示:线程池从来不保证排队中的工作项的处理顺序,这是合理的,尤其考虑到多线程可能同时处理工作项。然而,上述副作用使得这个问题变得更加恶化。你必须保证自己的应用程序对工作项或Task的执行顺序不做任何预设。
如果一个工作者线程发现它的本地队列变空了,工作者线程就会尝试从另一个工作者线程的本例队列中”偷”一个Task。这个Task是从一个本地队列的尾部“偷”走的,并要求获得一个线程同步锁,这对性能可能有少许影响。当然,这种”偷窃”行为很少发生,所以很少需要获取这个锁。
如果所有本地队列都变空,那么工作者线程会使用FIFO(先进先出)算法,从全局队列中提取一个工作项。
如果全局队列也为空,那么线程就会进入睡眠状态,等待事情的发生。如果睡眠的时间太长,它会自己醒来,并销毁自己,允许系统回收线程使用的资源(包括内核对象、栈、TEB等)。
线程池会快速创建工作者线程,使工作者线程的数量等于传给ThreadPool
的SetMinThreads
方法的值。如果从不调用这个方法(也不建议你调用),那么默认值等于你的进程允许使用的CPU数,这是由线程的affinity mask(关联掩码)决定的。通常,你的进程允许使用机器上的所有CPU数,所以线程池创建的工作者线程数量很快就会达到机器上的CPU数。创建了这么多的的线程后,线程池会监视工作项的完成速度。如果工作项完成的时间太长,线程池会创建更多的工作者线程。如果工作项的完成速度开始变快,工作者线程会被销毁。
缓存线和伪共享(第四版没这节)
为了提升反复访问内存的性能,如今的CPU在芯片上都集成了高速缓存
。线程首次从RAM去取一些值时,CPU从RAM获取所需的值,并把它存储到CPU的高速缓存
中。事实上,为了进一步提升性能,CPU会在逻辑上将所有内存都划分为所谓的缓冲行(cache line)
。一个缓冲行有64个字节构成,所以CPU从RAM中获取并存储64字节的块。如果应用程序需要读取一个Int32值,那么会获取包含了那个Intt32的64个字节,这样会获取到比需要的更多字节,这样通常会造成性能增强,因为大多数应用程序在访问了一些数据之后,通常会继续访问存储在那些数据周围的数据。由于相邻的数据已经提取到CPU的缓存中,就避免了慢速的RAM访问。
然而,如果两个或多个内核访问同一个缓冲行
中的字节,内核必须相互通信,并在内核之间传递缓冲行,造成多个内核不能同时处理相邻的字节,这对性能会造成严重影响。
internal static class FalseSharing
{
#if true
private class Data
{
// 这两个字段是相邻的,并(极有可能)在相同的缓冲行中
public Int32 field1;
public Int32 field2;
}
#else
// 现在,让我们修改Data类,使它看起来项下面这样
// 这两个字段分开了,不再相同的缓冲行中
// 比上述写法运行速度上快一些
[StructLayout(LayoutKind.Explicit)]
private class Data {
[FieldOffset(0)]
public Int32 field1;
[FieldOffset(64)]
public Int32 field2;
}
#endif
private const Int32 iterations = 100000000;
private static Int32 s_operations = 2;
private static Stopwatch s_stopwatch;
public static void Go()
{
// 分配一个对象,并记录开始时间
Data data = new Data();
s_stopwatch = Stopwatch.StartNew();
// // 让零个线程访问在对象中它们自己的字段
ThreadPool.QueueUserWorkItem(o => AccessData(data, 0));
ThreadPool.QueueUserWorkItem(o => AccessData(data, 1));
Console.ReadLine();
}
private static void AccessData(Data data, Int32 field)
{
// 这里的线程各自访问它们在Data对象中自己的字段
for (Int32 x = 0; x < iterations; x++)
if (field == 0) data.field1++;
else data.field2++;
// 不管哪个线程最后结束,都显示它花的时间
if (Interlocked.Decrement(ref s_operations) == 0)
Console.WriteLine("Access time: {0}", s_stopwatch.Elapsed);// Elapsed经过的时间
}
}
上述代码中,Data对象在构造时包含了两个字段。这两个字段极有可能在同一个缓冲行中。然后,两个线程池线程启动并执行AccessData
方法。一个将1加到Data的filed1上的100 000 000 次,另一个线程对filed2字段做同样的事情。每个线程完成后,都递减s_operations字段中的值;最后一个将字段递减为0的线程就是最后一个结束的线程,它显示两个线程完成它们的工作总共发了多少时间。
在上述代码中,现在用一个缓存线(64字节)分隔两个字段。再次运行,比第一个版本快了一些。从程序角度看,两个线程处理的是不同的数据。但从CPU缓存线来看,CPU处理的是相同的数据。这称为伪共享(false sharing) 。在第二个版本中,字段在不同的缓存线上,所以CPU可以真正做到独立,不必共享什么。
通过上述讨论,应该知道在多个线程同时访问相邻的数据时,缓存线
和伪共享
可能对应用程序产生严重影响。在性能非常紧要的情形下,这是你应该注意的一点。如果检查到这个问题,通常都可以设计出一种方式来避免它(这里用的就是FiledOffset attribute
)。
要注意的是,数组在数组内存起始处维护着它的长度,具体位置是在前几个数据元素之后,访问一个数组元素时,CLR验证你使用的索引在数组的长度之内。这意味着访问一个数组总是牵涉到访问数组的长度。因此,为了避免产生额外的伪共享,应该避免让一个线程向数组的前几个元素写入,同时让其他线程访问数组中的其他元素。