C#并发数据结构 : SpinWait

老实说,没有哪个开发人员愿意在其编码时还要考虑线程同步。更糟糕的情况是,编写线程同步代码一点也不好玩。稍一不慎,就会导致共享资源状态不一致,从而引发程序未预期行为。此外,当我们添加线程同步代码时还会导致程序运行变慢,损害性能和可伸缩性。从这点上来看,线程同步简直一无是处。可惜,这也是现实生活中必要的一部分。尤其在多核CPU成为主流的今天。
考虑下这种情况:只有一个线程试图访问某个资源。在此种状况下,那些线程同步代码纯粹是额外开销。那么如何保护性能呢?
首先,不要让线程从用户模式转到内核模式。在Windows上,这是个非常昂贵的操作。比如大家熟知的Critical Section(在.NET上我们叫它Monitor)在等待持有锁时,便在用户模式先旋转等待一段时间,实在不行才转入内核模式。
其次,当线程进入等待状态时,它便停止运行。很显然,停止运行的线程还有啥性能可言。所幸的是,假如线程进入内核模式,它也就只能进入等待状态。所以,如果你遵守第一条规则(避免进入内核),那么第二条规则(不要让线程进入等待状态)将会自动遵守。我们应当让线程大多数时间在运行而不是在等待
最后,尽可能用尽量少的线程来完成任务。最理想的情况是,从不让存在的线程数多于你的计算机上的CPU个数。因此,当你的机器只有四个CPU的时候,最好也不要超过四个线程的数量。为什么?因为当超过四个线程的时候,操作系统就会以每隔几毫秒~几十毫秒的频率调度线程到CPU。在Windows上,这个数字大概是20毫秒。线程上下文切换花费不少时间,而且这纯粹是额外开销。在Windows上,线程上下文切换需要几千个时钟周期。如果进行切换的线程属于另一个进程,开销还会更大,因为操作系统还要切换虚拟地址空间。如果可运行的线程数量等于或小于CPU数量的话,操作系统只运行这些线程,而不会发生上下文切换。在这种情况下,当然性能最佳。
但是,这仅在理想的世界中才可行。所以,我们通常可以看到机器上存在的线程数量远多于CPU数量。事实上,在我的单核CPU机器上,目前就有378个线程活动,这跟理想情况差了岂止十万八千里。Windows上每个进程都有自己的线程,这主要是为了进程彼此隔离。假如一个进程进入死循环,其他进程还可以继续执行。现在我的机器上运行着29个进程,也就是说,至少有29个线程在运行了。那剩下的349个线程呢?很多程序通常还会用多线程来进行隔离。比如,CLR就专门有一个GC线程来负责回收资源,而不管其他线程在干什么。
当然,还有其他众多原因。其中一个比较重要的原因就是相当多的(或者我大胆的说绝大多数)开发人员并不知道如何有效的使用线程。多年来,我们从各种知识资源(甚至不少讲解多线程的教材)上学习到的编程方式,实际上对构建高性能,高可伸缩性程序有害。比如在《Windows SystemProgramming, ThirdEdition》中,作者多处提到他偏爱使用线程而不是系统提供的异步方式。很明显的一个例子就是作者采用线程编写的同步I/OAPI要快于系统异步I/OAPI。虽然异步和线程有着很亲密的关系,但我们得到的暗示是鼓励使用线程而不是异步。考虑下当有大量线程争用I/O时的情景:我们的CPU忙着线程上下文切换,磁盘的磁头数量有限,不够这些线程来回捣腾,而每一个I/O操作却又耗时良多。再比如MSDN上Microsoft提供的WindowsAPI示例。它们大多数均使用顺序思维编写。大多数情况下这无可厚非,而且符合人类直觉。然而这却让我们学习模仿,甚至变成了习惯。在并行程序设计上相当有害。
如果你发现思考下列问题,那么很可能你错误的架构了你的程序。而且需要重新架构它。
  • Windows支持的最大线程数是多少?
  • 在一个单CPU上,我能最大创建多少线程?
  • 在线程池中,我能最大拥有多少线程?
  • 我能增加线程池中最大线程数吗?
  • 我应该在线程和用户间进行1对1通信吗?我知道很少有服务端程序每个用户或客户端用掉一个线程。
接下来,我们进入正题,讲解SpinWait。那么SpinWait是什么呢?
我们知道一般的锁(比如Mutex)在当前线程无法持有时,会使得线程在内核等待。Critical Section/Monitor则有所不同。它大概需要以下几个步骤(以下简称其锁为CS):
  • 执行EnterCriticalSection()或者Monitor.Enter()/lock的线程会首先测试CS的锁定位。如果该位为OFF(解锁),那么线程就会在该位上设置一下(加锁),且不需要等待便继续。这通常只需执行1~2个机器指令。
  • 如果CS被锁定,线程就会进入一个旋转等待持有锁。而线程在旋转期间会反复测试锁定位。单处理器系统会立即放弃,而在SMP或多核CPU系统上则旋转一段时间才会放弃。在此之前,线程都在用户模式下运行。
  • 一旦线程放弃测试锁定位(在单处理器上立即如此),线程使用信号量在内核进入等待状态。
  • 执行LeaveCriticalSection()/Monitor.Exit()或代码退出了lock块。如果存在等待线程,则使用ReleaseSemaphore()通知内核。
在第二步,我们看到会有一个旋转等待。这就是本文的主旨:SpinWait。CS正是使用它在用户模式下旋转等待的。值得注意的是SpinWait只有在SMP或多核CPU下才具有使用意义。在单处理器下,旋转非常浪费CPU时间,没有任何意义。
下面让我们实现一个SpinWait。

using System;
using System.Threading;

namespace Lucifer.DataStructure
{
    public struct SpinWait
    {
        internal const int yieldFrequency = 4000;

        private volatile int count;

        public void Spin()
        {
            this.count = ++count % Int32.MaxValue;
            if (IsSingleProcessor)
            {
                SwitchToThread();
            }
            else
            {
                int numeric = this.count % yieldFrequency;
                if (numeric > 0)
                {
                    Thread.SpinWait((int)(1f + (numeric * 0.032f)));
                }
                else
                {
                    SwitchToThread();
                }
            }
        }

        public void Reset()
        {
            this.count = 0;
        }

        public int Count
        {
            get
            {
                return this.count;
            }
        }
        /**//*
        *  判断是否单处理器系统。
        */
        private static readonly bool IsSingleProcessor =
            (Environment.ProcessorCount == 1);

        [DllImport("Kernel32", ExactSpelling = true)]
        private static extern void SwitchToThread();
    }
}


注意,我们封装了非托管API-SwitchToThread()来处理单处理器情况,以便线程调度可以平滑进行。假如使用Sleep(0)则需要注意在某些地方优先级反向,且性能也不如SwitchToThread高。
此外,我们使用Thread.SpinWait()方法来处理超线程问题。超线程是一项在Xeon,P4及以后CPU上的技术。超线程CPU上有两个逻辑CPU。每个逻辑CPU都有自己的架构状态(CPU寄存器),但是各个逻辑CPU共享一个单独的执行资源集合(例如CPU缓存)。当一个逻辑CPU暂停时(由于缓存失效,分支预测失败或者等待上一指令的结果),芯片就会切换到另一个逻辑CPU上,并让它运行。据Intel报告说超线程CPU可以获得10%~30%的性能提升。x86架构提供PAUSE汇编指令来完成上述情况,同时它等效于REPNOP;汇编指令。这个指令可以兼容于那些早期未支持超线程技术的IA-32CPU。而在Windows上,Microsoft定义了YieldProcessor()宏来完成这些工作。.NET则对其进行了封装。Thread.SpinWait(int iterations)在x86平台上实际上相当于一下代码:

void Thread.SpinWait(Int32 iterations)
{
    for (Int32 i = 0; i < iterations; i++)
    {
        YieldProcessor();
    }
}


由此,我们也应该知道SpinWait(0)没有意义,调用它必须使用大于0之数。
我们为什么要使用SpinWait呢?因为它运行在用户模式下。某些情况下,我们想要在退回到真正等待状态前先旋转一段时间。看起来这似乎很难理解。这根本在做无用功嘛。大多数人在一开始就避免旋转等待。我们知道线程上下文切换需要花费几千个周期(在Windows上确实如此)。我们暂且称其为C。假如线程所等待的时间小于2C(1C用于等待自身,1C用于唤醒),则旋转等待可以降低等待所造成的系统开销和滞后时间,从而提升算法的整体吞吐量和可伸缩性。
当我们采用旋转等待时,必须谨慎行事。比如:要确保在旋转循环内调用Thread.SpinWait(),以提高 Intel超线程技术的计算机上硬件对其他硬件线程的可用性;通过轻微的回退 (back-off)来引入随机选择,从而改善访问的局部性(假定调用方持续重读共享状态)并可能避免活锁;当然,在单 CPU的计算机最好不要采用这种方法(因为在这种环境下旋转非常浪费资源)。
不可否认,选择频率和旋转计数是不确定的。与 Win32临界区旋转计数类似,我们应该根据测试和实验的结果来选择合理的数值,而且即使合理的数值在不同系统中也会发生变化。例如,根据 MicrosoftMedia Center 和 Windows kernel 团队的经验,MSDN 文档建议临界区旋转计数为 4,000,但您的选择可以有所不同。理想的计数取决于多种因素,包括在给定时间等待事件的线程数和事件出现的频率等。一个经过实践的数字大概在4000~12000之间。
接下来,我们通过《C#并发数据结构:Stack》和本文学习到的知识构造ConcurrentStack,来学习SpinWait如何使用,同时也以它作为本文结束。
代码如下:

public class ConcurrentStack
    {
        private volatile ListNode topOfStack;

        public bool IsEmpty()
        {
            return topOfStack == null;
        }

        public void Push(T item)
        {
            ListNode newTopOfStack = new ListNode(item);
            SpinWait wait = new SpinWait();
            while (true)
            {
                ListNode  oldTopOfStack = topOfStack;
                newTopOfStack.next = oldTopOfStack;

                if (Interlocked.CompareExchange>(ref topOfStack, newTopOfStack, oldTopOfStack) == oldTopOfStack)
                {
                    return;
                }

                wait.Spin();
            }
        }
        /**////
        /// 考虑到在多线程环境中,这里不抛出异常。我们需要人为判断其是否为空。
        ///
        ///
        public bool TryPop(out T result)
        {
            ListNode oldTopOfStack;
            ListNode newTopOfStack;
            SpinWait wait = new SpinWait();

        Spin:
            oldTopOfStack = topOfStack;
            if (oldTopOfStack == null)
            {
                result = default(T);
                return false;
            }
            newTopOfStack = topOfStack.next;

            if (Interlocked.CompareExchange>(ref topOfStack, newTopOfStack, oldTopOfStack) != oldTopOfStack)
            {
                wait.Spin();
                goto Spin;
            }
            result = oldTopOfStack.item;
            return true;
        }

        public bool TryPeek(out T result)
        {
            ListNode head = topOfStack;
            if (head == null)
            {
                result = default(T);
                return false;
            }
            result = head.item;
            return true;
        }

        /**//* *****************************************
        * 简单的单向链表实现
        * *****************************************/
        class ListNode
        {
            internal T item;
            internal ListNode next;

            public ListNode(T initItem, ListNode initNext)
            {
                this.item = initItem;
                this.next = initNext;
            }

            public ListNode(T initItem)
                : this(initItem, null)
            {
            }
        }
    }


:SpinWait仅适合在线程等待时间较短,且在SMP或多核环境下。单处理器系统使用没有意义。