0.背景简介
微软在 .NET 框架中提供了多种实用的线程同步手段,其中包括 monitor 类及 reader-writer锁。但跨进程的同步方法还是非常欠缺。另外,目前也没有方便的线程间及进程间传递消息的方法。例如C/S和SOA,又或者生产者/消费者模式中就常常需要传递消息。为此我编写了一个独立完整的框架,实现了跨线程和跨进程的同步和通讯。这框架内包含了信号量,信箱,内存映射文件,阻塞通道,及简单消息流控制器等组件。这篇文章里提到的类同属于一个开源的库项目(BSD许可),你可以从这里下载到 www.cdrnet.net/projects/threadmsg/.
这个框架的目的是:
- 封装性:通过MSMQ消息队列发送消息的线程无需关心消息是发送到另一个线程还是另一台机器。
- 简单性:向其他进程发送消息只需调用一个方法。
注意:我删除了本文中全部代码的XML注释以节省空间。如果你想知道这些方法和参数的详细信息,请参考附件中的代码。
1.先看一个简单例子
使用了这个库后,跨进程的消息传递将变得非常简单。我将用一个小例子来作示范:一个控制台程序,根据参数可以作为发送方也可以作为接收方运行。在发送程序里,你可以输入一定的文本并发送到信箱内(返回key),接收程序将显示所有从信箱内收到的消息。你可以运行无数个发送程序和接收程序,但是每个消息只会被具体的某一个接收程序所收到。
[Serializable]
struct Message
{
public string Text;
}
class Test
{
IMailBox mail;
public Test()
{
mail = new ProcessMailBox("TMProcessTest",1024);
}
public void RunWriter()
{
Console.WriteLine("Writer started");
Message msg;
while(true)
{
msg.Text = Console.ReadLine();
if(msg.Text.Equals("exit"))
break;
mail.Content = msg;
}
}
public void RunReader()
{
Console.WriteLine("Reader started");
while(true)
{
Message msg = (Message)mail.Content;
Console.WriteLine(msg.Text);
}
}
[STAThread]
static void Main(string[] args)
{
Test test = new Test();
if(args.Length > 0)
test.RunWriter();
else
test.RunReader();
}
}
</div>
信箱一旦创建之后(这上面代码里是 ProcessMailBox ),接收消息只需要读取 Content 属性,发送消息只需要给这个属性赋值。当没有数据时,获取消息将会阻塞当前线程;发送消息时如果信箱里已经有数据,则会阻塞当前线程。正是有了这个阻塞,整个程序是完全基于中断的,并且不会过度占用CPU(不需要进行轮询)。发送和接收的消息可以是任意支持序列化(Serializable)的类型。
然而,实际上暗地里发生的事情有点复杂:消息通过内存映射文件来传递,这是目前唯一的跨进程共享内存的方法,这个例子里我们只会在 pagefile 里面产生虚拟文件。对这个虚拟文件的访问是通过 win32 信号量来确保同步的。消息首先序列化成二进制,然后再写进该文件,这就是为什么需要声明Serializable属性。内存映射文件和 win32 信号量都需要调用 NT内核的方法。多得了 .NET 框架中的 Marshal 类,我们可以避免编写不安全的代码。我们将在下面讨论更多的细节。
2. .NET里面的跨线程/进程同步
线程/进程间的通讯需要共享内存或者其他内建机制来发送/接收数据。即使是采用共享内存的方式,也还需要一组同步方法来允许并发访问。
同一个进程内的所有线程都共享公共的逻辑地址空间(堆)。对于不同进程,从 win2000 开始就已经无法共享内存。然而,不同的进程可以读写同一个文件。WinAPI提供了多种系统调用方法来映射文件到进程的逻辑空间,及访问系统内核对象(会话)指向的 pagefile 里面的虚拟文件。无论是共享堆,还是共享文件,并发访问都有可能导致数据不一致。我们就这个问题简单讨论一下,该怎样确保线程/进程调用的有序性及数据的一致性。
2.1 线程同步
.NET 框架和 C# 提供了方便直观的线程同步方法,即 monitor 类和 lock 语句(本文将不会讨论 .NET 框架的互斥量)。对于线程同步,虽然本文提供了其他方法,我们还是推荐使用 lock 语句。
void Work1()
{
NonCriticalSection1();
Monitor.Enter(this);
try
{
CriticalSection();
}
finally
{
Monitor.Exit(this);
}
NonCriticalSection2();
}
void Work2()
{
NonCriticalSection1();
lock(this)
{
CriticalSection();
}
NonCriticalSection2();
}
</div>
Work1 和 Work2 是等价的。在C#里面,很多人喜欢第二个方法,因为它更短,且不容易出错。
2.2 跨线程信号量
信号量是经典的同步基本概念之一(由 Edsger Dijkstra 引入)。信号量是指一个有计数器及两个操作的对象。它的两个操作是:获取(也叫P或者等待),释放(也叫V或者收到信号)。信号量在获取操作时如果计数器为0则阻塞,否则将计数器减一;在释放时将计数器加一,且不会阻塞。虽然信号量的原理很简单,但是实现起来有点麻烦。好在,内建的 monitor 类有阻塞特性,可以用来实现信号量。
public sealed class ThreadSemaphore : ISemaphore
{
private int counter;
private readonly int max;
public ThreadSemaphore() : this(0, int.Max) {}
public ThreadSemaphore(int initial) : this(initial, int.Max) {}
public ThreadSemaphore(int initial, int max)
{
this.counter = Math.Min(initial,max);
this.max = max;
}
public void Acquire()
{
lock(this)
{
counter--;
if(counter < 0 && !Monitor.Wait(this))
throw new SemaphoreFailedException();
}
}
public void Acquire(TimeSpan timeout)
{
lock(this)
{
counter--;
if(counter < 0 && !Monitor.Wait(this,timeout))
throw new SemaphoreFailedException();
}
}
public void Release()
{
lock(this)
{
if(counter >= max)
throw new SemaphoreFailedException();
if(counter < 0)
Monitor.Pulse(this);
counter++;
}
}
}
</div>
信号量在复杂的阻塞情景下更加有用,例如我们后面将要讨论的通道(channel)。你也可以使用信号量来实现临界区的排他性(如下面的 Work3),但是我还是推荐使用内建的 lock 语句,像上面的 Work2 那样。
请注意:如果使用不当,信号量也是有潜在危险的。正确的做法是:当获取信号量失败时,千万不要再调用释放操作;当获取成功时,无论发生了什么错误,都要记得释放信号量。遵循这样的原则,你的同步才是正确的。Work3 中的 finally 语句就是为了保证正确释放信号量。注意:获取信号量( s.Acquire() )的操作必须放到 try 语句的外面,只有这样,当获取失败时才不会调用释放操作。
ThreadSemaphore s = new ThreadSemaphore(1);
void Work3()
{
NonCriticalSection1();
s.Acquire();
try
{
CriticalSection();
}
finally
{
s.Release();
}
NonCriticalSection2();
}
</div>
2.3 跨进程信号量
为了协调不同进程访问同一资源,我们需要用到上面讨论过的概念。很不幸,.NET 中的 monitor 类不可以跨进程使用。但是,win32 API提供的内核信号量对象可以用来实现跨进程同步。 Robin Galloway-Lunn 介绍了怎样将 win32 的信号量映射到 .NET 中(见 Using Win32 Semaphores in C# )。我们的实现也类似:
[DllImport("kernel32",EntryPoint="CreateSemaphore",
SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern uint CreateSemaphore(
SecurityAttributes auth, int initialCount,
int maximumCount, string name);
[DllImport("kernel32",EntryPoint="WaitForSingleObject",
SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern uint WaitForSingleObject(
uint hHandle, uint dwMilliseconds);
[DllImport("kernel32",EntryPoint="ReleaseSemaphore",
SetLastError=true,CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool ReleaseSemaphore(
uint hHandle, int lReleaseCount, out int lpPreviousCount);
[DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true,
CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool CloseHandle(uint hHandle);
public class ProcessSemaphore : ISemaphore, IDisposable
{
private uint handle;
private readonly uint interruptReactionTime;
public ProcessSemaphore(string name) : this(
name,0,int.MaxValue,500) {}
public ProcessSemaphore(string name, int in

