著名的双检锁技术
双检锁(Double-Check Locking) 是一个非常著名的技术,开发人员用它将 单实例(Singleton) 对象的构造推迟到应用程序首次请求该对象时进行。有时也称为 延迟初始化(Lazy initialization)。
如果应用程序永远不请求对象,对象就永远不会构造,从而节约了事件和内存。但当多个线程同时请求单实例对象时就可能出现问题。这个时候必须使用一些线程同步机制确保单实例对象只被构造一次。
然而CLR很好的支持了双检锁技术,以下代码演示了如何使用C#实现双检锁技术:
// 大多数时候这个双检锁技术实际会损害效率
public sealed class Singleton
{
// s_lock对象是实现线程安全所需要的。定义这个对象时,
// 我们假设创建单实例对象的代价要高于创建一个System.Object对象,
// 并假设可能根本不需要创建单实例对象
// 否则更经济更简单的办法是在类构造器中创建单实例对象
private static Object m_lock = new Object();
// 这个字段引用单实例对象
private static Singleton s_value = null;
// 私有构造器,阻止在这个类的外部创建类的实例
private Singleton()
{
}
// 以下公共静态方法返回单实例对象
public static Singleton GetSingleton()
{
// 如果单实例对象已经创建, 直接返回它
if (s_value != null) return s_value;
// 没创建,让一个线程创建它
Monitor.Enter(m_lock);
if (s_value == null)
{
//仍未创建,创建它
Singleton temp = new Singleton();
//将引用保存到s_value中
Volatile.Write(ref s_value, temp);
}
Monitor.Exit(m_lock);
// 返回单实例对象的引用
return s_value;
}
}
双检锁背后的思路在于,对GetSingleton
方法的调用可以快速检查s_value字段
, 判断对象是否创建.
- 如果是, 方法就返回对它的引用
- 这里的妙处就在于, 如果对象已经构造好, 就不需要线程同步, 程序会运行的很快.
- 如果没创建, 就会获取一个同步锁来确保只有一个线程构造单实例对象.
- 这就意味着只有线程第一次查询单实例对象时, 才会出现性能上的损失.
此技术在java虚拟机会出现问题.
在CLR中, 对任何锁方法的调用都构成了一个完整的内存栅栏.
- 在栅栏之前的任何变量的写入必须在栅栏之前完成.
- 在栅栏之后的任何变量读取都必须在栅栏之后开始.
对于GetSingleton
方法, 这意味着s_value
字段的值必须在调用了Monitor.Enter
之后重新读取. 调用前缓存到寄存器的东西不算.
Volatile.Write作用
Volatile.Write
这个的作用, 假如在第二个if语句中包含的是下面这句代码:
s_value = new Singleton(); // 你极有可能这样写
你的想法是让编译器为一个Singleton
分配内存, 调用构造器来初始化字段, 再将引用赋给s_value
,但是编译器可能这样做:
- 为Singleton分配内存,
- 将引用赋给s_value,
- 再调用构造器. (从单线程的角度出发, 这样改变顺序是无关紧要的)
但是在引用赋给s_value
之后,在调用构造器之前, 如果另一个线程调用了GetSingleton
方法,就会发生s_value
这个值不为null
, 就会开始使用Singleton
对象, 但是此对象的构造器还没有执行.
Volatile.Write
就修正了这个问题, 它保证temp
中的引用只有在构造器结束执行之后,才发布到s_value
中.
解决这个问题的另一个办法就是使用C#的volatile
关键字标记s_value
字段. 这同样使构造器必须在写入发生前结束运行. 但不好的地方, 同时会使所有读取操作具有易变性
. 这是完全没有必要的.会使性能无谓地收到损害.
不使用双检锁的更好版本
internal sealed class Singleton
{
private static Singleton s_value = new Singleton();
//私有化构造器防止这个类外部的任何代码创建一个实例
private Singleton(){}
// 以下公共静态方法返回单实例对象(第一次调用之后才会创建)
public static Singleton GetSingleton()
{
return s_value;
}
}
首次调用GetSingleton
方法,CLR就会自动调用类构造器,从而创建一个对象实例. CLR保证对类构造器的调用时线程安全的.(第8章3节中解释了) 首次访问类的任何成员都会调用类型构造器. 如果定义了其他静态成员, 就会在访问其他任何静态成员时创建Singleton
对象.有人通过定义嵌套类来解决这个问题.
另一种生成Singleton方式
internal sealed class Singleton
{
private static Singleton s_value = null;
private Singleton(){}
public static Singleton GetSingleton()
{
if(s_value != null) return s_value;
//创建一个新的单实例对象,并把它固定下来(如果另一个线程还为固定的话)
Singleton temp = new Singleton();
Interlocked.CompareExchange(ref s_value,temp,null);
//如果这个线程竞争失败,新建的第二个实例对象就会被回收
return s_value;
}
}
上面的代码保证了只有在第一个调用GetSingleton()
方法方法时,才会构建单实例对象。但是缺点也是明显的,就是可能会创建多个Singleton对象,但是最终Interlocked.CompareExchange
只会固定一个Singleton
实例对象。 没有通过固定的对象会被垃圾回收, 但是一般很少同时调用GetSingleton()
的情况.
上述代码面的优势:
- 速度非常快
- 永不阻塞线程
此方法只有在构造器没有副作用的时候才能使用这个技术.
FCL的Lazy类封装了上述描述的模式
System.Lazy
和System.Threading.LazyInitializer
是FCL封装提供的延迟构造的类。
public class Lazy<T>
{
public Lazy(Func<T> valueFactory, LazyThreadSafetyMode mode);
public Boolean IsValueCreated { get; }
public T Value { get; }
}
public enum LazyThreadSafetyMode
{
None, // 完全没有线程安全支持(适合GUI应用程序)
PublicationOnly, // 使用双检锁技术
ExecutionAndPublication // 使用Interlocked.CompareExchange技术
}
Lazy类用法:
// 创建一个"延迟初始化"包装器, 它将DateTime的获取包装起来
// // isThreadSafe ? LazyThreadSafetyMode.ExecutionAndPublication : LazyThreadSafetyMode.None
Lazy<String> s = new Lazy<String>(() => DateTime.Now.ToLongTimeString(), true);
Console.WriteLine(s.IsValueCreated); // 还没查询Value 所以返回false
Console.WriteLine(s.Value); // 现在调用
Console.WriteLine(s.IsValueCreated); // 能查询到Value 返回true
Thread.Sleep(10000); // 等待10秒, 再次显示时间
Console.WriteLine(s.Value); // Lambda方法委托没有被调用, 显示之前的结果
// False
// 20:18:54
// True
// 20:18:54
内存有限时可能不想创建Lazy类的实例 , 可以调用LazyInitializer
类的静态方法:
public static class LazyInitializer
{
// 这两个方法在内部使用Interlocked.CompareExchange
public static T EnsureInitialized<T>(ref T target) where T : class;
public static T EnsureInitialized<T>(ref T target, Func<T> valueFactory) where T : class;
// 这两个方法在内部将同步锁(syncLock)传给Monitor的Enter和Exit方法
public static T EnsureInitialized<T>(ref T target, ref bool initialized, ref object syncLock);
public static T EnsureInitialized<T>(ref T target, ref bool initialized, ref object syncLock, Func<T> valueFactory);
}
// 缓存激活选择器函数(activation selector function)以避免委托分配。
static class LazyHelpers<T>
{
internal static Func<T> s_activatorFactorySelector = new Func<T>(ActivatorFactorySelector);
private static T ActivatorFactorySelector()
{
try
{
return (T)Activator.CreateInstance(typeof(T));
}
catch (MissingMethodException)
{
throw new MissingMemberException(Environment.GetResourceString("Lazy_CreateValue_NoParameterlessCtorForT"));
}
}
}
EnsureInitialized
方法的syncLock参数
显式指定同步对象, 可以用同一个锁保护多个初始化函数和字段.
String name = null;
// 由于name是null, 所以委托执行并初始化name
LazyInitializer.EnsureInitialized(ref name, () => "Jeff");
Console.WriteLine(name); // Jeff
// 由于name不为null, 所以委托不运行
LazyInitializer.EnsureInitialized(ref name, () => "Richter");
Console.WriteLine(name); // Jeff
条件变量模式
假定一个线程希望在一个符合条件为true时执行一些代码. 一个选项是让线程连续”自旋”, 反复测试条件, 但这会浪费CPU时间. 也不可能对构造成符合条件的多个变量进行原子性的测试.
幸好, 有一个模式允许线程根据一个符合条件来同步他们的操作,而且不会浪费资源, 这个模式成为条件变量模式, 我们通过Monitor
类中定义的一下方法来使用该模式:
public static Boolean Wait(Object obj);
public static Boolean Wait(Object obj, Int32 millisecondsTimeout);
// Pulse 脉冲脉搏,使跳动
public static void Pulse(Object obj);
public static void PulseAll(Object obj);
下面演示了这个模式的写法:
public sealed class ConditionVariablePattern
{
private readonly Object m_lock = new Object();
private Boolean m_condition = false;
public void Thread1()
{
// 获取一个互斥锁
Monitor.Enter(m_lock);
//在锁中, 原子性地测试复合条件
while (!m_condition)
{
// 如果条件不满足, 就等待另一个线程更改条件
// 临时释放锁, 使其他线程能获取它
// 如果你使用了Monitor.Wait,调用的线程将阻止在这里,直到另外一个线程调用Monitor.Pulse.代码再继续往下执行
Monitor.Wait(m_lock);
}
// 条件满足,处理数据
Monitor.Exit(m_lock); // 永久释放锁
}
public void Thread2()
{
// 获取一个互斥锁
Monitor.Enter(m_lock);
// 处理数据并修改条件
m_condition = true;
// Monitor.Pulse(m_lock); // 释放锁之后唤醒一个正在等待的线程
Monitor.PulseAll(m_lock); // 释放锁之后唤醒所有正在等待的线程
Monitor.Exit(m_lock); // 释放锁
}
}
执行Thread1
方法的线程进入一个互斥锁, 然后对一个条件进行测试. 在这里, 我只是检查一个Boolean
字段, 但它可以是任意复合条件. 线程不是自旋,而是调用Wait
释放锁, 使另一个线程能获得它并阻塞调用线程. (如果你使用了Monitor.Wait,调用的线程将阻止在这里,直到另外一个线程调用Monitor.Pulse
.代码再继续往下执行)
Thread2
方法是第二个线程执行的代码, 它调用Enter
来获取锁的所有权,处理一些数据造成一些状态的改变. 再调用Pulse
或PulseAll
, 从而解除一个线程因为调用Wait而进入的阻塞状态. 注意, Pulse
只能解除等待最久的线程,而PulseAll
解除所有正在等待的线程的阻塞.
执行Thread2
线程Pulse
之后必须调用Monitor.Exit
, 允许锁由另一个线程拥有, 另外如果调用的是PulseAll
, 其他线程不会同时解除阻塞. 调用Wait
的线程解除阻塞后, 它成为锁的所有者, 由于这是一个互斥锁, 所以一次只能有一个线程拥有它. 其他线程只有在锁的所有者调用了Wait
或者Exit
之后才能得到它.
执行Thread1
的线程醒来时, 它进行下一次循环迭代, 再次对条件进行测试. 如果条件仍为false, 它就再次调用Wait. 如果条件为true,它就处理数据, 并最终调用Exit
. 这样就会将锁释放, 使其他线程能得到它.
这个模式的妙处在于, 可以使用简单的同步逻辑(只是一个锁)来测试构成一个符合条件的几个变量,而且多个正在等待的线程可以全部解除阻塞,而不会造成任何逻辑错误, 唯一的缺点就是解除线程的阻塞可能会浪费一些CPU时间.
线程安全的队列
允许多个线程在其中对数据项进行入队和出队操作. 注意, 除非有了一个可供处理的数据项, 否则试图出队的线程会一直阻塞.
public sealed class SynchronizedQueue<T>
{
private readonly Object m_lock = new Object();
private readonly Queue<T> m_queue = new Queue<T>();
public void Enqueue(T item)
{
Monitor.Enter(m_lock);
// 一个数据项入队后, 就唤醒 任何/所有 正在等待的线程
m_queue.Enqueue(item);
Monitor.PulseAll(m_lock);
Monitor.Exit(m_lock);
}
public T Dequeue()
{
Monitor.Enter(m_lock);
// 队列为空(这是条件)就一直循环
while (m_queue.Count == 0)
Monitor.Wait(m_lock);
// 使一个数据项出队, 返回它供处理
T item = m_queue.Dequeue();
Monitor.Exit(m_lock);
return item;
}
}
异步的同步构造
任何使用了内核模式的基元的线程同步构造, 作者都不是很喜欢. 因为所有这些基元都会阻塞一个线程的运行. 创建线程的代价很大, 创建了不用,这于情于理都说不通. 下面这个例子能很好地说明这个问题.
观察本章介绍的所有构造, 你会发现这些构造想要解决的许多问题其实最好都是用Task类完成.
Task具有下述许多优势:
- 任务使用的内存比线程少得多, 创建和销毁所需的时间也少得多.
- 线程池根据可用CPU数量自动伸缩任务规模.
- 每一个任务完成一个阶段后, 运行任务的线程回到线程池, 在那里能接受新任务.
- 线程池是站在整个进程的高度观察任务. 所以,它能更好地调度这些任务, 减少进程中的线程数, 并减少上下文切换.
锁很流行,但长时间拥有会带来巨大的伸缩性问题。如果代码能够通过异步的同步构造指出它想要一个锁,那么会非常有用。在这种情况下,如果线程得不到锁,可以直接返回并执行其他工作,而不必在哪里傻傻地阻塞。以后当锁可用时,代码可恢复执行并访问锁所保护的资源。
SemaphoreSlim类通过WaitAsync方法实现了这个思路,下面是这个方法最复杂的版本:
public Tast<Boolean> WaitAsync(Int32 millisecondsTimeout,CancellationToken cancellationToken);
可用它异步地同步对一个资源的访问(不阻塞任何线程):
private static async Task AccessResourceViaAsyncSynchronization(SemaphoreSlim asyncLock)
{
//do something
//请求获取锁对资源进行独占访问
await asyncLock.WaitAsync();
//执行到这里, 表明没有其他线程正在访问资源
//独占式访问资源
//资源访问完毕,释放锁
asyncLock.Release();
//do Something
}
SemaphoreSlim
的WaitAsync
方法很好用,但它提供的是信号量语义。一般创建最大计数为1的SemaphoreSlim
, 从而对SemaphoreSlim
保护的资源进行互斥访问.所以, 这和使用Monitor
时的行为相似, 只是SemaphoreSlim
不支持线程所有权和递归语义(这是好事).
.Net Framework并没有提供reader-writer语义的异步锁。 但作者构建了这样一个类, 称为AsyncOneManyLock
. 它的用法和SemaphoreSlim
一样. 如下使用:
private static async Task AccessResourceViaAsyncSynchronization(AsyncOneManyLock asyncLock)
{
// Execute whatever code you want here...
// 为想要访问的并发传递OneManyMode.Exclusive或OneManyMode.Shared
await asyncLock.WaitAsync(OneManyMode.Shared); // 要求共享访问
// 如果执行到这里, 表明没有其他线程在想资源写入; 可能有其他线程在读取
// TODO:从资源读取
//资源访问完毕就放弃锁, 使其他代码能访问资源
asyncLock.Release();
// Execute whatever code you want here...
}
/// <summary>
/// 这个类实现了一个从不阻塞任何线程的reader/writer(读/写锁)
/// 若要使用,请等待AccessAsync的结果,并在操作共享状态后执行,
/// 调用Release.
/// </summary>
public sealed class AsyncOneManyLock
{
#region 锁的代码
private SpinLock m_lock = new SpinLock(true); // 自旋锁不要用readonly
private void Lock()
{
Boolean taken = false;
m_lock.Enter(ref taken);
}
private void Unlock()
{
m_lock.Exit();
}
#endregion
#region 锁的状态和辅助方法
private Int32 m_state = 0;
private Boolean IsFree
{
get { return m_state == 0; }
}
private Boolean IsOwnedByWriter
{
get { return m_state == -1; }
}
private Boolean IsOwnedByReaders
{
get { return m_state > 0; }
}
private Int32 AddReaders(Int32 count)
{
return m_state += count;
}
private Int32 SubtractReader()
{
return --m_state;
}
private void MakeWriter()
{
m_state = -1;
}
private void MakeFree()
{
m_state = 0;
}
#endregion
// 目的是在非竞态条件时增强性能和减少内存消耗
private readonly Task m_noContentionAccessGranter;
// 每个等待的writer都通过它们在这里排队的TaskCompletionSource来唤醒
private readonly Queue<TaskCompletionSource<Object>> m_qWaitingWriters =
new Queue<TaskCompletionSource<Object>>();
// 一个TaskCompletionSource收到信号, 所有等待的reader都唤醒
private TaskCompletionSource<Object> m_waitingReadersSignal =
new TaskCompletionSource<Object>();
private Int32 m_numWaitingReaders = 0;
/// <summary>Constructs an AsyncOneManyLock object.</summary>
public AsyncOneManyLock()
{
m_noContentionAccessGranter = Task.FromResult<Object>(null);
}
/// <summary>
/// Asynchronously requests access to the state protected by this AsyncOneManyLock.
/// </summary>
/// <param name="mode">Specifies whether you want exclusive (write) access or shared (read) access.</param>
/// <returns>A Task to await.</returns>
public Task WaitAsync(OneManyMode mode)
{
Task accressGranter = m_noContentionAccessGranter; // 假定无竞争
Lock();
switch (mode)
{
case OneManyMode.Exclusive:
if (IsFree)
{
MakeWriter(); // 无竞争
}
else
{
// 竞争: 递增等待的reader数量, 并返回reader任务使reader等待
var tcs = new TaskCompletionSource<Object>();
m_qWaitingWriters.Enqueue(tcs);
accressGranter = tcs.Task;
}
break;
case OneManyMode.Shared:
if (IsFree || (IsOwnedByReaders && m_qWaitingWriters.Count == 0))
{
AddReaders(1); // 无竞争
}
else
{
// 竞争
// 竞争: 递增等待的reader数量, 并返回reader任务使reader等待
m_numWaitingReaders++;
accressGranter = m_waitingReadersSignal.Task.ContinueWith(t => t.Result);
}
break;
}
Unlock();
return accressGranter;
}
/// <summary>
/// Releases the AsyncOneManyLock allowing other code to acquire it
/// </summary>
public void Release()
{
TaskCompletionSource<Object> accessGranter = null; // 假定没有代码被释放
Lock();
if (IsOwnedByWriter) MakeFree(); // 一个writer离开
else SubtractReader(); // 一个reader离开
if (IsFree)
{
// 如果自由, 唤醒1个等待的writer或所有等待的readers
if (m_qWaitingWriters.Count > 0)
{
MakeWriter();
accessGranter = m_qWaitingWriters.Dequeue();
}
else if (m_numWaitingReaders > 0)
{
AddReaders(m_numWaitingReaders);
m_numWaitingReaders = 0;
accessGranter = m_waitingReadersSignal;
// 为将来需要等待的reader创建一个新的TCS
m_waitingReadersSignal = new TaskCompletionSource<Object>();
}
}
Unlock();
// 唤醒锁外面的reader/writer,减少竞争几率以提高性能
if (accessGranter != null) accessGranter.SetResult(null);
}
}
上述代码永远不会阻塞线程, 原因是在内部没有使用任何内核构造. 这里确实使用了一个SpinLock, 它在内部使用了用户模式的构造. 但29章讨论自旋锁的时候说过, 只有执行时间很短的代码才可以用自旋锁来保护. 查看我的WaitAsync
方法, 会发现我用锁保护的只是一些整数计算和比较, 以及构造一个TaskCompletionSource
, 同样花不了多少时间. 使用一个SpinLock
来保护对Queue
的访问.
并发集合类
FCL提供了4个线程线程安全的集合类,全部在System.Collections.Concurrent
命名空间中定义。它们是ConcurrentQueue
、ConcurrentStack
、ConcurrentDictionary
和ConcurrentBag
。
- ConcurrentQueue 先入先出FIFO的顺序处理数据项
- ConcurrentStack 后入先出
- ConcurrentDictionary 无序key/value对集合
- ConcurrentBag 无序数据项集合,允许重复
这些集合类都是非阻塞的, 换言之, 如果一个先出试图提取一个不存在的元素, 线程会立即返回; 线程不会阻塞在哪里等着一个元素的出现.
剩下的以后再看…