星辰.Net技术社区论坛

首页 » .NET » WCF » 基于WCF和MSMQ构建发布/订阅消息总线(Pub/Sub Message Bus)
admin - 2008-6-14 20:12:00
这几年经常谈到采用事件驱动架构(event-driven architecture)技术来构建可扩展的、可维护的系统。我发现在一些方案中,这是非常有趣的模型。但是,在Microsoft平台上这种架构一直没有很好的支持。因此,许多人发现比较难以实现。几年以前,在一个系统中,我采用.Net Remoting / MSMQ / HTTP 构建了发布/订阅消息总线(Pub/Sub Message Bus),但是觉得不是很完美。很多地方实现比较困难,并且需要定制的代码,如承载队列监听(host the queue listeners)、编码/解码消息、处理可靠性和管理订阅者等等。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
在我目前的项目中,我再一次尝试了这一模型。然而,这几年技术发展有很大的变化,我高兴的说:我的体验比以前要好多了。在说明我的方案之前,我想提醒的一点是:我仅仅是描述实现这一模型的一种方法,在不同的应用需求情况下,应该有其他更合适的方法。因为我工作的项目是一个大的.Net应用系统,因此跨平台的互操作不需要考虑(真幸运!)。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
我们完成的项目构建在.NET Framework 3.0 平台,并使用了WCF / MSMQ 4.0 / IIS 7.0,部署在Windows Server 2008平台上。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
定义Service Contract s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
第一步定义Contact,发布者用来通知订阅者“发生事件”。在我们的项目中,有许多不同的事件类型,但是为了尽可能重用代码,我们使用了泛型Service Contract: s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

[ServiceContract]

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

public interface IEventNotification<TLog>

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

{

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

    [OperationContract(IsOneWay = true)]

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

    void OnEventOccurred(TLog value);

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
} s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
对于任一事件类型,我们可以简单定义一个Data Contract 来负载数据(carry the payload),并提供一个继承的Service Contract 类型,如下所示: s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
[ServiceContract]public interface IAccountEventNotification : IEventNotification<AccountEventLog>{}实现发布者(Publisher s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
发布/订阅模型中最重要的一点是发布和订阅方应该是低耦合的。尤其是发布者不应该知道订阅者的任何事情,包括多少个订阅者和订阅者在哪里。起初,我们试图采用MSMQ的PGM多播特性来实现 – 实质上让你定义单个队列地址,悄悄地路由相同的消息到多个目标队列。虽然这可以满足要求,但是这一方案也存在一些缺点。第一,在WCF中使用多播队列地址的唯一方式是采用MsmqIntegrationBinding,MsmqIntegrationBinding没有NetMsmqBinding灵活。其次,多播地址仅仅适用非事务队列(non-transactional queues),这样对我们的系统产生不能接受的可靠性影响。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
因此,我们放弃了这一方案,决定在发布者实现我们自己的轻量级多播。技术上而言,这样打破了“发布者完全不了解订阅者”这一黄金规则,所有订阅者的信息完全存放在配置文件中。这意味着,我们增加、修改或删除订阅者完全不影响代码。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
我们开发一个组件-ServiceFactory(与p&p Web Service Software Factory 没有关系),ServiceFactory只是一个简单的抽象,通过读取配置文件来创建本地或WCF实例。ServiceFactory组件没有对外公布,但是你可以简单替换为你喜欢的依赖注入框架(Dependency Injection Framework),达到相同的效果。在我们的系统中,Web Services的配置文件web.config 可能有如下定义的依赖服务: s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
<serviceFactory> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
    <
services> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
        <
addname="EmailUtility" contract="MyProject.IEmailUtility, MyProject" type="MyProject.EmailUtility, MyProject" mode="SameAppDomain" instanceMode="Singleton" enablePolicyInjection="false" /> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
        <
addname="SubsctiberXAccountEventNotification"contract="MyProject.Contracts.IAccountEventNotification, MyProject.Contracts"mode="Wcf"endpoint="SubsctiberXAccountEventNotification" /> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
        <
addname="SubsctiberYAccountEventNotification" contract="MyProject.Contracts.IAccountEventNotification, MyProject.Contracts" mode="Wcf" endpoint="SubsctiberYAccountEventNotification" /> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
    </
services> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
</
serviceFactory>
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
我们使用ServiceFactory创建单个实例,代码如下: s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
IEmailUtility email = ServiceFactory.GetService<IEmailUtility>(); s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
根据上面的配置文件,上述代码将获得一个本地EmailUtility单件实例,但是不同的配置可以返回一个WCF proxy 代理类实例。可以非常方便重用ServiceFactory组件,返回所有配置的、匹配特定Contract的服务。我们将根据这些,构建NotificationPublisher类,代码如下: s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

public class NotificationPublisher<TInterface, TLog>

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

    where TInterface : class, IEventNotification<TLog>                   

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

{

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

    public static void OnEventOccurred(TLog value)

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

    {

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

        List<TInterface> subscribers = ServiceFactory.GetAllServices<TInterface>();

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

        foreach (TInterface subscriber in subscribers)

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

        {

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

            subscriber.OnEventOccurred(value);

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

        }

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

    }

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
} s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
根据上述代码,发布者发布事件所需要做的是:传入合适的泛型参数,实例化NotificationPublisher对象,并调用OnEventOccured 方法。假定我们使用IAccountEventNotification 接口和上述配置,这样事件将通过WCF到达SubscriberXAccountEventNotification 和 SubscriberYAccountNotification 端点,并触发相关事件。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
配置发布者 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
发布端最后一部分是WCF配置。如上述所提及的,我们选择使用MSMQ提供可靠的、异步的消息传递。过去编写MSMQ代码比较困难,但是对WCF编程模型而言,MSMQ与其他传输协议没什么区别。在我们的案例中,我们选择了NetMsmqBinding,NetMsmqBinding 为核心MSMQ特性提供了全面访问WCF功能(与MsmqIntegrationBinding不同,MsmqIntegrationBinding提供了更丰富的MSMQ支持,但是限制了WCF功能)。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
如下是客户端的WCF的配置示例: s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
<system.serviceModel> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
    <
bindings> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
        <
netMsmqBinding> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
          <
bindingname="TransactionalMsmqBinding"exactlyOnce="true"deadLetterQueue="System" /> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
        </
netMsmqBinding> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
    </
bindings> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
    <
client> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
        <
endpointname="SubscriberXAccountEventNotification" s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
            address="net.msmq://localhost/private/SubscriberX/accounteventnotification.svc" s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
            binding="netMsmqBinding"bindingConfiguration="TransactionalMsmqBinding" s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
            contract="MyProject.Contracts.IAccountEventNotification" /> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
    s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
        <
endpointname="SubscriberYAccountEventNotification" s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
            address="net.msmq://localhost/private/SubscriberY/accounteventnotification.svc" s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
            binding="netMsmqBinding"bindingConfiguration="TransactionalMsmqBinding" s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
            contract="MyProject.Contracts.IAccountEventNotification" /> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
    </
client> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
</
system.serviceModel>
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
上述配置没什么特别的地方 – 需要关注的是 exactlyOnce=”true” 设置,这是事务队列必须的设置。另外就是 net.msmq:// 地址语法,这是NetMsmqBinding 协议所需要的。私有队列分别为 SubscriberX/accounteventnotification.svc 和 SubscriberY/accountnotification.svc。为什么我给队列这样愚蠢的命名呢?继续读下面内容...... s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
承载和配置订阅者 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
在过去,如果说创建MSMQ客户端是烦人的,那么创建MSMQ服务更是噩梦。你不得不创建你自己的host(一般而言为Windows Service)或者使用一些灵活的MSMQ触发器功能。然后,你需要做很多工作,确保你的服务没有丢失消息,或者没有被“poison messages”所阻塞,因为poison message错误的消息体(malformed payload)会不断导致你的服务失败。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
就像在客户端一样,WCF需要很多工作在服务端 – 但是这些不是直接帮助承载服务和监听队列。幸运的是,这一问题由Windows Vista和Windows Server 2008中提供的IIS 7.0和Windows Activation Services (WAS) 轻松解决。IIS 7.0 负责监听MSMQ / TCP / Named Pipes,并且激活WCF 服务,就像 IIS 6.0监听HTTP一样。听起来不错,但是需要提醒的是 – 这些需要灵巧的配置。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
首先,你需要在IIS中设置application 指向service,包括.svc文件和web.config配置文件,这与在IIS 通过HTTP部署service一样。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
接着,你需要创建消息队列,你可以通过Vista的Computer Management console 或Windows Server 2008 的Server Manager配置。队列的名称必须匹配application name 加上.svc 文件名,例如 SubscriberX/accounteventnotification.svc。在创建队列时,确保标记队列支持事务,因为随后不能改变。你也需要设置队列的权限,这样运行Net.Msmq Listener 服务的帐号(缺省为NETWORK SERVICE)能够接收消息,任何运行Client/Publisher 都能够发送消息(缺省为NETWORK SERVICE)。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
最后,你需要配置IIS和WAS 的站点和特定application支持Net.Msmq 监听器(在开始操作之前,确保你已经安装了WAS和non-HTTP激活windows组件)。最简单的办法是使用appcmd.exe 命令行(\system32\inetsrv目录下): s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
配置好IIS后,接下来是确保service的WCF配置是正确的。如同你期望的那样,这将与客户端的配置非常相似。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
<system.serviceModel> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
    <
bindings> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
        <
netMsmqBinding> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
            <
bindingname="TransactionalMsmqBinding"exactlyOnce="true"deadLetterQueue="System"receiveErrorHandling="Move"/> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
        </
netMsmqBinding> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
    </
bindings> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
    <
services> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
        <
servicename="SubscriberX.NotificationService"> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
            <
endpointcontract="MyProject.Contracts.IAccountEventNotification" s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
                bindingConfiguration="TransactionalMsmqBinding" s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
                binding="netMsmqBinding" s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
                address="net.msmq://localhost/private/SubscriberX/accounteventnotification.svc"/> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
        </
service> s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
    </
services>    s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
</
system.serviceModel>
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
值得说明的一点是:receiveErrorHandling=”Move”,这一属性可以帮助节省我们一个月的工作。这一属性让WCF转移多次重复处理失败的消息到MSMQ的子队列poison,然后继续处理下一个消息,而不是阻塞服务。这一子队列和远程队列事务性读取等待功能是Vista 和 Windows Server 2008中MSMQ 4.0的一些新特性。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
实现订阅者(Subscribers) s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
最后一件事是实现订阅者。当然,最多的代码是特定业务逻辑的实现,因为我仅仅描述service interface的实现。在我们的系统中,确保没有消息丢失是非常重要的。既然MSMQ能够确保消息的到达,因此消息不会无缘故的消失。事实上,大多数消息的丢失是在MSMQ成功传递消息到达service 之后。有可能service 接收到消息之后,在service 成功处理消息之前,发生异常导致失败(可能由于bug或配置问题)。避免这一问题最好的办法是使用事务,跨越从队列接收消息和业务逻辑的处理。如果发生失败,将回滚事-包括从队列中接收的消息。如果只是一个临时的小故障,消息将再次被成功处理。如果问题持续或是错误的消息,在经过多次尝试后,该消息将被认为是poison 消息。如前面提及的,该消息将被转移到poison 子队列,由管理员手动处理。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
上述所有的工作非常简单,因为MSMQ和WCF支持所有这些特性(假定你使用事务性队列)。你需要做的工作是由一些attributes标记你的服务实现,声明当消息从队列取出后,业务逻辑应该登记事务。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

public class NotificationService : IAccountEventNotification

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

{

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

    public void OnEventOccurred(AccountEventLog value)

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

    {

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

        // Business-specific logic

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·

    }

s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
} s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
结论 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
这是我最近最长的blog文章之一,这一解决方案非常强大且特别简单去实现,这是由于WCF / MSMQ / IIS 技术的先进性。在过去,许多人(包括我)花了几个月的时间尽力去实现pub/sub模式,往往不能达到预期的效果。现在,使用这些的技术消除了大量的定制代码,事实上,这篇文章中少量的代码和配置脚本实现了pub/sub模式。 s<¦Þã}æwww.netcsharp.cnmåÑþ{G ·
1
查看完整版本: 基于WCF和MSMQ构建发布/订阅消息总线(Pub/Sub Message Bus)