著名的双检锁技术

双检锁(Double-Check Locking) 是一个非常著名的技术,开发人员用它将 单实例(Singleton) 对象的构造推迟到应用程序首次请求该对象时进行。有时也称为 延迟初始化(Lazy initialization)

如果应用程序永远不请求对象,对象就永远不会构造,从而节约了事件和内存。但当多个线程同时请求单实例对象时就可能出现问题。这个时候必须使用一些线程同步机制确保单实例对象只被构造一次。

然而CLR很好的支持了双检锁技术,以下代码演示了如何使用C#实现双检锁技术:

// 大多数时候这个双检锁技术实际会损害效率
public sealed class Singleton
{
    // s_lock对象是实现线程安全所需要的。定义这个对象时,
    // 我们假设创建单实例对象的代价要高于创建一个System.Object对象,
    // 并假设可能根本不需要创建单实例对象
    // 否则更经济更简单的办法是在类构造器中创建单实例对象
    private static Object m_lock = new Object();

    // 这个字段引用单实例对象
    private static Singleton s_value = null;

    // 私有构造器,阻止在这个类的外部创建类的实例
    private Singleton()
    {
    }

    // 以下公共静态方法返回单实例对象
    public static Singleton GetSingleton()
    {
        // 如果单实例对象已经创建, 直接返回它
        if (s_value != null) return s_value;

        // 没创建,让一个线程创建它
        Monitor.Enter(m_lock);
        if (s_value == null)
        {
            //仍未创建,创建它
            Singleton temp = new Singleton();

            //将引用保存到s_value中
            Volatile.Write(ref s_value, temp);
        }

        Monitor.Exit(m_lock);
        // 返回单实例对象的引用
        return s_value;
    }
}

双检锁背后的思路在于,对GetSingleton方法的调用可以快速检查s_value字段, 判断对象是否创建.

  • 如果是, 方法就返回对它的引用
    • 这里的妙处就在于, 如果对象已经构造好, 就不需要线程同步, 程序会运行的很快.
  • 如果没创建, 就会获取一个同步锁来确保只有一个线程构造单实例对象.
    • 这就意味着只有线程第一次查询单实例对象时, 才会出现性能上的损失.

此技术在java虚拟机会出现问题.

在CLR中, 对任何锁方法的调用都构成了一个完整的内存栅栏.

  • 在栅栏之前的任何变量的写入必须在栅栏之前完成.
  • 在栅栏之后的任何变量读取都必须在栅栏之后开始.

对于GetSingleton方法, 这意味着s_value字段的值必须在调用了Monitor.Enter之后重新读取. 调用前缓存到寄存器的东西不算.

Volatile.Write作用

Volatile.Write这个的作用, 假如在第二个if语句中包含的是下面这句代码:

s_value = new Singleton(); // 你极有可能这样写

你的想法是让编译器为一个Singleton分配内存, 调用构造器来初始化字段, 再将引用赋给s_value,但是编译器可能这样做:

  • 为Singleton分配内存,
  • 将引用赋给s_value,
  • 再调用构造器. (从单线程的角度出发, 这样改变顺序是无关紧要的)

但是在引用赋给s_value之后,在调用构造器之前, 如果另一个线程调用了GetSingleton方法,就会发生s_value这个值不为null, 就会开始使用Singleton对象, 但是此对象的构造器还没有执行.

Volatile.Write就修正了这个问题, 它保证temp中的引用只有在构造器结束执行之后,才发布到s_value中.

解决这个问题的另一个办法就是使用C#的volatile关键字标记s_value字段. 这同样使构造器必须在写入发生前结束运行. 但不好的地方, 同时会使所有读取操作具有易变性. 这是完全没有必要的.会使性能无谓地收到损害.

不使用双检锁的更好版本

internal sealed class Singleton
{
    private static Singleton s_value = new Singleton();
    //私有化构造器防止这个类外部的任何代码创建一个实例
    private Singleton(){}
    // 以下公共静态方法返回单实例对象(第一次调用之后才会创建)
    public static Singleton GetSingleton()
    {
        return s_value;
    }
}

首次调用GetSingleton方法,CLR就会自动调用类构造器,从而创建一个对象实例. CLR保证对类构造器的调用时线程安全的.(第8章3节中解释了) 首次访问类的任何成员都会调用类型构造器. 如果定义了其他静态成员, 就会在访问其他任何静态成员时创建Singleton对象.有人通过定义嵌套类来解决这个问题.

另一种生成Singleton方式

internal sealed class Singleton
{
    private static Singleton s_value = null;

    private Singleton(){}

    public static Singleton GetSingleton()
    {
        if(s_value != null) return s_value;
        //创建一个新的单实例对象,并把它固定下来(如果另一个线程还为固定的话)
        Singleton temp = new Singleton();
        Interlocked.CompareExchange(ref s_value,temp,null);

        //如果这个线程竞争失败,新建的第二个实例对象就会被回收

        return s_value;
    }
}

上面的代码保证了只有在第一个调用GetSingleton()方法方法时,才会构建单实例对象。但是缺点也是明显的,就是可能会创建多个Singleton对象,但是最终Interlocked.CompareExchange只会固定一个Singleton实例对象。 没有通过固定的对象会被垃圾回收, 但是一般很少同时调用GetSingleton()的情况.

上述代码面的优势:

  • 速度非常快
  • 永不阻塞线程

此方法只有在构造器没有副作用的时候才能使用这个技术.

FCL的Lazy类封装了上述描述的模式

System.LazySystem.Threading.LazyInitializer是FCL封装提供的延迟构造的类。

public class Lazy<T>
{
   public Lazy(Func<T> valueFactory, LazyThreadSafetyMode mode);
   public Boolean IsValueCreated { get; }
   public T Value { get; }
}

public enum LazyThreadSafetyMode
{
  None,                      // 完全没有线程安全支持(适合GUI应用程序)
  PublicationOnly,           // 使用双检锁技术
  ExecutionAndPublication    // 使用Interlocked.CompareExchange技术
}

Lazy类用法:

// 创建一个"延迟初始化"包装器, 它将DateTime的获取包装起来
// // isThreadSafe ? LazyThreadSafetyMode.ExecutionAndPublication : LazyThreadSafetyMode.None
Lazy<String> s = new Lazy<String>(() => DateTime.Now.ToLongTimeString(), true);
Console.WriteLine(s.IsValueCreated); // 还没查询Value 所以返回false
Console.WriteLine(s.Value);          // 现在调用
Console.WriteLine(s.IsValueCreated); // 能查询到Value 返回true
Thread.Sleep(10000); // 等待10秒, 再次显示时间
Console.WriteLine(s.Value); // Lambda方法委托没有被调用, 显示之前的结果

// False
// 20:18:54
// True
// 20:18:54

内存有限时可能不想创建Lazy类的实例 , 可以调用LazyInitializer类的静态方法:

public static class LazyInitializer
{
   // 这两个方法在内部使用Interlocked.CompareExchange
   public static T EnsureInitialized<T>(ref T target) where T : class;
   public static T EnsureInitialized<T>(ref T target, Func<T> valueFactory) where T : class;

   // 这两个方法在内部将同步锁(syncLock)传给Monitor的Enter和Exit方法
   public static T EnsureInitialized<T>(ref T target, ref bool initialized, ref object syncLock);
   public static T EnsureInitialized<T>(ref T target, ref bool initialized, ref object syncLock, Func<T> valueFactory);   
}

// 缓存激活选择器函数(activation selector function)以避免委托分配。
static class LazyHelpers<T>
{
    internal static Func<T> s_activatorFactorySelector = new Func<T>(ActivatorFactorySelector);

    private static T ActivatorFactorySelector()
    {
        try
        {
            return (T)Activator.CreateInstance(typeof(T));
        }
        catch (MissingMethodException)
        {
            throw new MissingMemberException(Environment.GetResourceString("Lazy_CreateValue_NoParameterlessCtorForT"));
        }
    }
}

EnsureInitialized方法的syncLock参数显式指定同步对象, 可以用同一个锁保护多个初始化函数和字段.

String name = null;
// 由于name是null, 所以委托执行并初始化name
LazyInitializer.EnsureInitialized(ref name, () => "Jeff");
Console.WriteLine(name); // Jeff

// 由于name不为null, 所以委托不运行
LazyInitializer.EnsureInitialized(ref name, () => "Richter");
Console.WriteLine(name); // Jeff

条件变量模式

假定一个线程希望在一个符合条件为true时执行一些代码. 一个选项是让线程连续”自旋”, 反复测试条件, 但这会浪费CPU时间. 也不可能对构造成符合条件的多个变量进行原子性的测试.

幸好, 有一个模式允许线程根据一个符合条件来同步他们的操作,而且不会浪费资源, 这个模式成为条件变量模式, 我们通过Monitor类中定义的一下方法来使用该模式:

public static Boolean Wait(Object obj);
public static Boolean Wait(Object obj, Int32 millisecondsTimeout);
// Pulse 脉冲脉搏,使跳动
public static void Pulse(Object obj);
public static void PulseAll(Object obj);

下面演示了这个模式的写法:

public sealed class ConditionVariablePattern
{
    private readonly Object  m_lock      = new Object();
    private          Boolean m_condition = false;

    public void Thread1()
    {
        // 获取一个互斥锁
        Monitor.Enter(m_lock);

        //在锁中, 原子性地测试复合条件
        while (!m_condition)
        {
            // 如果条件不满足, 就等待另一个线程更改条件
            // 临时释放锁, 使其他线程能获取它
            // 如果你使用了Monitor.Wait,调用的线程将阻止在这里,直到另外一个线程调用Monitor.Pulse.代码再继续往下执行
            Monitor.Wait(m_lock);
        }

        // 条件满足,处理数据

        Monitor.Exit(m_lock); // 永久释放锁
    }

    public void Thread2()
    {
        // 获取一个互斥锁
        Monitor.Enter(m_lock);

        // 处理数据并修改条件
        m_condition = true;

        // Monitor.Pulse(m_lock);         // 释放锁之后唤醒一个正在等待的线程
        Monitor.PulseAll(m_lock);      // 释放锁之后唤醒所有正在等待的线程

        Monitor.Exit(m_lock); // 释放锁
    }
}

执行Thread1方法的线程进入一个互斥锁, 然后对一个条件进行测试. 在这里, 我只是检查一个Boolean字段, 但它可以是任意复合条件. 线程不是自旋,而是调用Wait释放锁, 使另一个线程能获得它并阻塞调用线程. (如果你使用了Monitor.Wait,调用的线程将阻止在这里,直到另外一个线程调用Monitor.Pulse.代码再继续往下执行)

Thread2方法是第二个线程执行的代码, 它调用Enter来获取锁的所有权,处理一些数据造成一些状态的改变. 再调用PulsePulseAll, 从而解除一个线程因为调用Wait而进入的阻塞状态. 注意, Pulse只能解除等待最久的线程,而PulseAll解除所有正在等待的线程的阻塞.

执行Thread2线程Pulse之后必须调用Monitor.Exit, 允许锁由另一个线程拥有, 另外如果调用的是PulseAll, 其他线程不会同时解除阻塞. 调用Wait的线程解除阻塞后, 它成为锁的所有者, 由于这是一个互斥锁, 所以一次只能有一个线程拥有它. 其他线程只有在锁的所有者调用了Wait或者Exit之后才能得到它.

执行Thread1的线程醒来时, 它进行下一次循环迭代, 再次对条件进行测试. 如果条件仍为false, 它就再次调用Wait. 如果条件为true,它就处理数据, 并最终调用Exit. 这样就会将锁释放, 使其他线程能得到它.

这个模式的妙处在于, 可以使用简单的同步逻辑(只是一个锁)来测试构成一个符合条件的几个变量,而且多个正在等待的线程可以全部解除阻塞,而不会造成任何逻辑错误, 唯一的缺点就是解除线程的阻塞可能会浪费一些CPU时间.

线程安全的队列

允许多个线程在其中对数据项进行入队和出队操作. 注意, 除非有了一个可供处理的数据项, 否则试图出队的线程会一直阻塞.

   public sealed class SynchronizedQueue<T>
   {
       private readonly Object   m_lock  = new Object();
       private readonly Queue<T> m_queue = new Queue<T>();

       public void Enqueue(T item)
       {
           Monitor.Enter(m_lock);

           // 一个数据项入队后, 就唤醒 任何/所有 正在等待的线程
           m_queue.Enqueue(item);
           Monitor.PulseAll(m_lock);

           Monitor.Exit(m_lock);
       }

       public T Dequeue()
       {
           Monitor.Enter(m_lock);

           // 队列为空(这是条件)就一直循环
           while (m_queue.Count == 0)
               Monitor.Wait(m_lock);

           // 使一个数据项出队, 返回它供处理
           T item = m_queue.Dequeue();
           Monitor.Exit(m_lock);
           return item;
       }
   }

异步的同步构造

任何使用了内核模式的基元的线程同步构造, 作者都不是很喜欢. 因为所有这些基元都会阻塞一个线程的运行. 创建线程的代价很大, 创建了不用,这于情于理都说不通. 下面这个例子能很好地说明这个问题.

观察本章介绍的所有构造, 你会发现这些构造想要解决的许多问题其实最好都是用Task类完成.

Task具有下述许多优势:

  • 任务使用的内存比线程少得多, 创建和销毁所需的时间也少得多.
  • 线程池根据可用CPU数量自动伸缩任务规模.
  • 每一个任务完成一个阶段后, 运行任务的线程回到线程池, 在那里能接受新任务.
  • 线程池是站在整个进程的高度观察任务. 所以,它能更好地调度这些任务, 减少进程中的线程数, 并减少上下文切换.

锁很流行,但长时间拥有会带来巨大的伸缩性问题。如果代码能够通过异步的同步构造指出它想要一个锁,那么会非常有用。在这种情况下,如果线程得不到锁,可以直接返回并执行其他工作,而不必在哪里傻傻地阻塞。以后当锁可用时,代码可恢复执行并访问锁所保护的资源。

SemaphoreSlim类通过WaitAsync方法实现了这个思路,下面是这个方法最复杂的版本:

public Tast<Boolean> WaitAsync(Int32 millisecondsTimeout,CancellationToken cancellationToken);

可用它异步地同步对一个资源的访问(不阻塞任何线程):

private static async Task AccessResourceViaAsyncSynchronization(SemaphoreSlim asyncLock)
{
    //do something

    //请求获取锁对资源进行独占访问
    await asyncLock.WaitAsync();
    //执行到这里, 表明没有其他线程正在访问资源

    //独占式访问资源

    //资源访问完毕,释放锁
    asyncLock.Release();

    //do Something
}

SemaphoreSlimWaitAsync方法很好用,但它提供的是信号量语义。一般创建最大计数为1的SemaphoreSlim, 从而对SemaphoreSlim保护的资源进行互斥访问.所以, 这和使用Monitor时的行为相似, 只是SemaphoreSlim不支持线程所有权和递归语义(这是好事).

.Net Framework并没有提供reader-writer语义的异步锁。 但作者构建了这样一个类, 称为AsyncOneManyLock. 它的用法和SemaphoreSlim一样. 如下使用:

private static async Task AccessResourceViaAsyncSynchronization(AsyncOneManyLock asyncLock)
{
    // Execute whatever code you want here...

    // 为想要访问的并发传递OneManyMode.Exclusive或OneManyMode.Shared
    await asyncLock.WaitAsync(OneManyMode.Shared); // 要求共享访问
    // 如果执行到这里, 表明没有其他线程在想资源写入; 可能有其他线程在读取

    // TODO:从资源读取

    //资源访问完毕就放弃锁, 使其他代码能访问资源
    asyncLock.Release();

    // Execute whatever code you want here...
}

/// <summary>
/// 这个类实现了一个从不阻塞任何线程的reader/writer(读/写锁)
/// 若要使用,请等待AccessAsync的结果,并在操作共享状态后执行,
/// 调用Release.
/// </summary>
public sealed class AsyncOneManyLock
{
    #region 锁的代码

    private SpinLock m_lock = new SpinLock(true); // 自旋锁不要用readonly

    private void Lock()
    {
        Boolean taken = false;
        m_lock.Enter(ref taken);
    }

    private void Unlock()
    {
        m_lock.Exit();
    }

    #endregion

    #region 锁的状态和辅助方法

    private Int32 m_state = 0;

    private Boolean IsFree
    {
        get { return m_state == 0; }
    }

    private Boolean IsOwnedByWriter
    {
        get { return m_state == -1; }
    }

    private Boolean IsOwnedByReaders
    {
        get { return m_state > 0; }
    }

    private Int32 AddReaders(Int32 count)
    {
        return m_state += count;
    }

    private Int32 SubtractReader()
    {
        return --m_state;
    }

    private void MakeWriter()
    {
        m_state = -1;
    }

    private void MakeFree()
    {
        m_state = 0;
    }

    #endregion

    // 目的是在非竞态条件时增强性能和减少内存消耗
    private readonly Task m_noContentionAccessGranter;

    // 每个等待的writer都通过它们在这里排队的TaskCompletionSource来唤醒
    private readonly Queue<TaskCompletionSource<Object>> m_qWaitingWriters =
        new Queue<TaskCompletionSource<Object>>();

    // 一个TaskCompletionSource收到信号, 所有等待的reader都唤醒
    private TaskCompletionSource<Object> m_waitingReadersSignal =
        new TaskCompletionSource<Object>();

    private Int32 m_numWaitingReaders = 0;

    /// <summary>Constructs an AsyncOneManyLock object.</summary>
    public AsyncOneManyLock()
    {
        m_noContentionAccessGranter = Task.FromResult<Object>(null);
    }

    /// <summary>
    /// Asynchronously requests access to the state protected by this AsyncOneManyLock.
    /// </summary>
    /// <param name="mode">Specifies whether you want exclusive (write) access or shared (read) access.</param>
    /// <returns>A Task to await.</returns>
    public Task WaitAsync(OneManyMode mode)
    {
        Task accressGranter = m_noContentionAccessGranter; // 假定无竞争

        Lock();
        switch (mode)
        {
            case OneManyMode.Exclusive:
                if (IsFree)
                {
                    MakeWriter(); // 无竞争
                }
                else
                {
                    // 竞争: 递增等待的reader数量, 并返回reader任务使reader等待
                    var tcs = new TaskCompletionSource<Object>();
                    m_qWaitingWriters.Enqueue(tcs);
                    accressGranter = tcs.Task;
                }

                break;

            case OneManyMode.Shared:
                if (IsFree || (IsOwnedByReaders && m_qWaitingWriters.Count == 0))
                {
                    AddReaders(1); // 无竞争
                }
                else
                {
                    // 竞争
                    // 竞争: 递增等待的reader数量, 并返回reader任务使reader等待
                    m_numWaitingReaders++;
                    accressGranter = m_waitingReadersSignal.Task.ContinueWith(t => t.Result);
                }

                break;
        }

        Unlock();

        return accressGranter;
    }

    /// <summary>
    /// Releases the AsyncOneManyLock allowing other code to acquire it
    /// </summary>
    public void Release()
    {
        TaskCompletionSource<Object> accessGranter = null; // 假定没有代码被释放

        Lock();
        if (IsOwnedByWriter) MakeFree(); // 一个writer离开
        else SubtractReader();           // 一个reader离开

        if (IsFree)
        {
            // 如果自由, 唤醒1个等待的writer或所有等待的readers
            if (m_qWaitingWriters.Count > 0)
            {
                MakeWriter();
                accessGranter = m_qWaitingWriters.Dequeue();
            }
            else if (m_numWaitingReaders > 0)
            {
                AddReaders(m_numWaitingReaders);
                m_numWaitingReaders = 0;
                accessGranter       = m_waitingReadersSignal;

                // 为将来需要等待的reader创建一个新的TCS
                m_waitingReadersSignal = new TaskCompletionSource<Object>();
            }
        }

        Unlock();

        // 唤醒锁外面的reader/writer,减少竞争几率以提高性能
        if (accessGranter != null) accessGranter.SetResult(null);
    }
}

上述代码永远不会阻塞线程, 原因是在内部没有使用任何内核构造. 这里确实使用了一个SpinLock, 它在内部使用了用户模式的构造. 但29章讨论自旋锁的时候说过, 只有执行时间很短的代码才可以用自旋锁来保护. 查看我的WaitAsync方法, 会发现我用锁保护的只是一些整数计算和比较, 以及构造一个TaskCompletionSource, 同样花不了多少时间. 使用一个SpinLock来保护对Queue的访问.

并发集合类

FCL提供了4个线程线程安全的集合类,全部在System.Collections.Concurrent命名空间中定义。它们是ConcurrentQueueConcurrentStackConcurrentDictionaryConcurrentBag

  • ConcurrentQueue 先入先出FIFO的顺序处理数据项
  • ConcurrentStack 后入先出
  • ConcurrentDictionary 无序key/value对集合
  • ConcurrentBag 无序数据项集合,允许重复

这些集合类都是非阻塞的, 换言之, 如果一个先出试图提取一个不存在的元素, 线程会立即返回; 线程不会阻塞在哪里等着一个元素的出现.

剩下的以后再看…