分布式系统消息通信技术主要包括以下几种:
一般是C/S方式,同步的,跨语言跨平台,面向过程
CORBA从概念上扩展了RPC。面向对象的,企业级的(面向对象中间件还有DCOM)
面向对象方式的JavaRPC
基于Web,C/S或B/S,跨系统跨平台跨网络。多为同步调用,实时性要求较高
在AMQP模型中,消息的producer将Message发送给Exchange,Exchange负责交换/路由,将消息正确地转发给相应的Queue。消息的Consumer从Queue中读取消息。
AMQP系统构架
目前业界上关于消息中间件的实现多达好几十种,可谓百花齐放,所用的实现语言同样也五花八门。下面挑选了一部分,在网上开源社区相对容易搜索出来的十多种MQ来作简单介绍。
开源MQ
概述
1.Qpid
Apach的一个开源AMQP实现,broker架构,有C++和Java两个版本
2.RabbitMQ
LShift用Erlang实现,支持多协议,broker架构,重量级
3.ZeroMQ
AMQP最初设计者iMatix公司实现,轻量消息内核,无broker设计。C++实现
4.Jafka/Kafka
5.ActiveMQ
Apach的一种JMS具体实现,支持代理和p2p部署。支持多协议。Java实现
6.Apollo
ActiveMQ的下一代产品,支持多协议,Scala实现
7.Redis
Key-valueNoSQL数据库,有MQ的功能
8.MemcacheQ
国人利用memcache缓冲队列协议开发的消息队列,C/C++实现
9.Open-MQ
C++和QT实现,支持JMS
10.ActiveMQ-CPP
ActiveMQ的C++纯客户端库,用于跟ActiveMQ通信
11.MQ4CPP
一个C++实现的MQ,信息甚少
12.MetaQ
Alibaba对Kafka的改造,增加事务支持等新特性,用纯Java实现
13.Beanstalkd
一个类memcached协议设计的消息队列,C/C++实现
14.OpenAMQ
iMatix公司AMQP1.0的实现,类似rabbitMQ。C++实现。2010年项目放弃
15.SpreadToolkit
高性能的分布式分组消息系统,C++实现
16.SAFMQ
C++实现的储存转发消息队列中间件
17.Mosquitto
一个轻量级的IBM物联网连接协议的消息中间件实现,C/C++实现
18.MUSCLE
提供一个多路消息服务器和消息对象传递功能,支持C/C++
19.JORAM
一个类似OpenJMS(SunOpenMQ)的JMS消息中间件,JAVA实现
Qpid是Apache开发的一款面向对象的消息中间件,它是一个AMQP的实现,可以和其他符合AMQP协议的系统进行通信。Qpid提供了C++/Python/Java/C#等主流编程语言的客户端库,Qpid提供了很多额外的HA特性,非常适于集群环境下的消息通信。
它提供了C++和Java两个版本的broker服务端,并支持多种语言的客户端。C++版本的服务器端具备高性能/低消耗以及RDMA支持;而Java版本的服务器则支持JMS。Qpid还提供了一些额外的特性:
提供了安全认证特性,任何producer/consumer需要和broker通信时,都需要提供身份认证。QPID的安全认证使用SSL协议。
开发语言:JavaC/C++
操作系统:跨平台
跟Qpid有关联的其他项目主要有:
LShift用Erlang编写的一个开源的消息队列,支持很多的协议:AMQP,XMPP,SMTP,STOMP,重量级,更适合于企业级的开发。代理(Broker)架构,对路由(Routing),负载均衡(Loadbalance)或者数据持久化都有很好的支持。
缺点:可扩展性差,速度较慢,因为中央节点增加了延迟,消息封装后也比较大。
AMQP里主要要说两个组件:Exchange和Queue(在AMQP1.0里还会有变动),如下图所示,绿色的X就是Exchange,红色的是Queue,这两者都在Server端,又称作Broker,这部分是RabbitMQ实现的,而蓝色的则是客户端,通常有Producer和Consumer两种类型:
开发语言:ErLang
早期需要设计可靠消息系统比如AMQP,但是这种方式引入了single-pointbroker。对于需要这种可靠消息系统的应用来说,需要在broker上面做相当多的事情确保可靠性以及性能。但是这样对于中小应用陷入了尴尬,为了使用这种方便的消息系统他们需要引入broker这么东西是不能够忍受的。我们需要的一种简单方便的消息传输系统,没有任何附加代价(比如所有数据都流经broker),这就是ZeroMQ设计初衷。
2010年3月30日,AMQP的最初设计者iMatix公司的首席执行官PieterHintjens宣布iMatix将退出AMQP工作组,而且为了简单得多,快的多的ZeroMQ,将不支持可能发布的AMQP/1.0。一个非常轻量级的消息内核,专门为高吞吐量/低延迟的场景开发。ZeroMQ支持许多高级消息场景,但是你必须实现ZeroMQ框架中的各个块(比如Socket或Device等)。没有中间件架构,应用程序端点扮演了这个服务角色。部署简单,仅提供非持久性的队列。与RabbitMQ相比,MQ并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,它更像是一个底层的网络通讯库,在socketAPI之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的API接口。
支持C、C++、Python、.NET/Mono、Fortran和Java语言
开发语言:C/C++
开发语言:Scala
居于两者(RabbitMQ&ZeroMQ)之间,类似于ZeroMQ,它可以部署于代理模式和P2P模式。完全支持JMS1.1和J2EE1.4规范。跨平台的,多种语言和协议编写客户端,Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议:OpenWire,StompREST,WSNotification,XMPP,AMQP。如需配置ActiveMQ则需要在目标机器上安装Java环境。支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,多个消息也可以组成原子事务
缺点:默认的配置性能偏低,需要优化配置,但是配置文件复杂,本身不提供管理工具;示例代码非常少;主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档只有片段,用户很难由浅入深进行了解,二来文档整体的专业性太强。
开发语言:Java
ActiveMQ的下一代产品为Apollo,Apollo以ActiveMQ原型为基础,是一个更快、更可靠、更易于维护的消息代理工具。Apache称Apollo为最快、最强健的STOMP(StreamingTextOrientatedMessageProtocol,流文本定向消息协议)服务器。
lApollo的特性如下:
l支持Stomp1.0和Stomp1.1协议
l主题和队列
l队列浏览器
l主题持久订阅
l镜像队列
l可靠的消息传递
l消息过期和交换
l消息选择器
lJAAS验证
l支持SSL/TLS,证书验证
lRESTManagementAPI
国人开发的持久化消息队列memcacheq(简称mcq)是一个轻量级的消息队列,MemcacheQ的特性:
l简单易用
l处理速度快
l多条队列
l并发性能好
l与memcache的协议兼容。这就意味着只要装了memcache的extension就可以了,不需要额外的插件。
l在zendframework中使用也很方便。
一个开源的消息中间件,类似IBM的WebSphereMQ(MQSeries),采用C++和Qt库编写的,支持Windows、Unix以及MacOS平台,支持JMS。
CMS(全称是C++MessagingService)是一个C++实现的类似JMS的API,用于实现例如ActiveMQ的消息代理服务。CMS可以帮助你的C++客户端代码更见简单。ActiveMQ-CPP是一个纯客户端库,用它来跟例如ActiveMQ等消息服务通讯。我们的CMS实现名为ActiveMQ-CPP,使用可插入式的传输和协议,当前支持OpenWire和Stomp协议,基于TCP和SSL。同时支持故障转移传输。
MQ4CPPisaMessageOrientedMiddleware(MOM)andimplementsthefollowingmessagingparadigms:
–Direct/Indirectmessaging(local)
–Unsolicitedmessaging(remote)
–Request/Reply(remote)
–Conversation(remote)
–Broadcast(local/remote)
–Publish/Subscribe
–Store&Forward
–MemoryChannel
–FileTransfer
–DistributedLockManager
Supportof:
–Multithreading(pthread,WinThread)
–Sockets(berkley,WinSock2)
–Cluster(failover,sessionreplication)
–Encription(Rijndael128/256)
–Compression
–Servicelookup(local/remote)
–Messagerouting
Testedplatforms:
–Linux(x86,IA64)POSIX
–Windows(x86,IA64)SDK
开发语言:C++
一个高性能、高可用、可扩展的分布式消息中间件,Linkedin开源MQ——Kafka的Java版本,阿里巴巴对此做了定制和优化,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景
一个简单、快速的消息队列。Beanstalk之于RabbitMQ,就好比Nginx之于Apache,Varnish之于Squid。简单、轻量级、高性能、易使用等特点,以及优先级、多队列、持久化、分布式容错、超时控制等特性。Beanstalkd包含多种编程语言的客户端开发包。Beanstalkd是典型的类Memcached设计。
不足就是尚无提供删除一个tube的操作,只能将tube的job依次删除,并让Beanstalkd来自动删除空tube。还有就是Beanstalkd不支持客户端认证机制(开发者将应用场景定位在局域网)。没有提供主从同步+故障切换机制,在应用中有可能成为单点的风险。在实际应用中,可以使用数据库为job提供持久化存储。和Memcached类似,Beanstalkd依赖libevent单线程事件分发机制,不能有效的利用多核cpu的性能。这一点可以通过单机部署多个实例克服。
高性能的分布式分组消息系统,支持局域网以及广域网通讯.Spread可以作为一个分布式应用的消息总线,并且具有高度的灵活性,可以做到多播,分组,以及点对点饿消息传递。
TheSpreadtoolkit包括一个消息服务器server,以及多种语言的apiC/C++libraries(withandwithoutthreadsupport),aJavaPerl,Python,andRuby.还有很多其他语言的第三方扩展。
在一个典型的环境中,通常每台服务器上运行一个Spreadserver,客户端的程序本地连接server,发送信息,而这台服务器上的spreadserver会传递信息给其他订阅了这条消息的应用。当然也可以只有一个spreadserver,而其他的客户端分布在整个网络中。
SomeoftheservicesandbenefitsprovidedbySpread:
lReliableandscalablemessagingandgroupcommunication.
lAverypowerfulbutsimpleAPIsimplifiestheconstructionofdistributedarchitectures.
lEasytouse,deployandmaintain.
lHighlyscalablefromonelocalareanetworktocomplexwideareanetworks.
lSupportsthousandsofgroupswithdifferentsetsofmembers.
lEnablesmessagereliabilityinthepresenceofmachinefailures,processcrashesandrecoveries,andnetworkpartitionsandmerges.
lProvidesarangeofreliability,orderingandstabilityguaranteesformessages.
lEmphasisonrobustnessandhighperformance.
lCompletelydistributedalgorithmswithnocentralpointoffailure.
消息队列服务器提供了异步的、round-trip、可靠的消息传输。
开发语言:JavaC/C++PHP
有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
提供一个多路的消息服务器以及相应的网络API,其客户端涉及多种语言包括C,C++,C#,Delphi,Java,和Python。MUSCLE用来在网络上传输消息对象,所有消息存储在服务端并为客户端进行传递。
开发语言:JavaC/C++PythonC#
JORAM一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。文档非常完备,并且带有很多示例。
缺点:
判断JMS客户端是否在线非常缓慢,有时甚至不会通知应用
一些国外网站提供的,ActiveMQ、RabbitMQ、RocketMQ(MeteMQ)、HornetQ、Qpid、ZeroMQ的对比数据。
协议支持比较:
ActiveMQ
Apollo
HornetQ
Qpid
RabbitMQ
ZeroMQ
AMQP
1.0
announced
0-8,0-9,0-9-1
-
MQTT
OpenWire
REST
STOMP
STOMPoverWebsockets
XMPP
OverGateway
客户端接口支持比较:
ZeroQ
C
C++
Erlang
Haskell
JavaJMS
Javaproprietary
.NET
Objective-C
Perl
PHP
Python
Ruby
性能测试场景对比:
l情景A:先入队20,000条1024字节大小的消息,然后再出队
l情景B:20,000条1024字节大小的消息同时入队和出队
l情景C:200,000条32字节大小的消息同时入队和出队
l情景D:200条32K字节大小的消息同时入队和出队
两种不同配置的broker,一种开启持久化消息(Persistent),一种是没有开启持久化,即瞬时化消息(Transient)
下面是测试的所有brokers和对应的配置:
情景A:
情景B:
情景C:
情景D:
结论:
1)Brokers普遍擅长于处理大消息。因此如果客户端支持对消息分组,那性能会得到更大的提升。但分组消息却不能在consumer之间传播。
2)处理大消息,持久化的弊端(磁盘或数据库保存)就开始凸显(QPID处理瞬时化消息不论消息大小,都显得非常高效)。可以得出,处理中小消息的耗时主要在集中CPU,而不是I/O网络上。
3)ZeroMQbroker比其他所有MQbroders表现得更优越。倘若需求上要用到一些特殊的broker特性,不然ZeroMQ绝对是分发消息系统的一个最好选择。
4)QPID似乎是在处理瞬时消息上综合表现得最好。
5)从RabbitMQ的测试结果看,AMQP协议似乎比STOMP协议更优越.当然这结果也可能跟不良设计的客户端受到影响。
6)HornetQ在处理中、小消息时,表现得最差的。
7)除了处理大消息之外,RabbitMQ似乎是最好的选择,因它的性能是其他的3倍左右。
下图是显示的是发送和接受的每秒钟的消息数。整个过程共产生1百万条1K的消息。测试的执行是在一个WindowsVista上进行的。
选择消息系统根据业务需要需要考虑以下几个方面:
l是否持久化
l吞吐能力
l高可用,避免单点故障
l分布式扩展能力
l兼容现有协议
l易于维护
l其他,如消息丢失和重复的处理
l负载均衡
常见消息系统协议:
lSTOMP
lAMQP
l类似MEMCACHE的协议
lHTTP
lMQTT
Beanstalkd
Spread
Open-MQ
memcachedQ
SAFMQ
Mosquitto
MUSCLE
JMS
━
√
Stomp
Memcache协议
HA集群(防单点故障)
认证
Broker架构
持久化
支持广域网
高吞吐
事务
下面对非持久化消息进行了测试(ZeroMQ自实现一个简单broker,直接内存操作和转发)
测试硬件环境:
Broker:
OperationSystem:Windows7旗舰版sp1x64
CPU:IntelCorei5-3470CPU@3.20GHz
MEM:4.00GB
Disk:500GB
NetworkAdapter:GibabitNetworkconnection
Client:
CPU:IntelCore2DuoCPUT6400@2.00GHz
Disk:250GB
测试结果:
2)两者都更擅长处理大消息体数据
3)处理的消息体越小时,Qpid的性能下降得比较明显
4)Qpid在处理持久化消息时,消息体越大,性能越高。这说明消息体比较大的情形,瓶颈在于网络IO,消息体越小,瓶颈在于CPU和磁盘读写。
ZeroMQ用于node与node间的通信,node可以是主机或者是进程。ZeroMQ把通讯的需求看成四类。其中一类(Exclusive-Pair)是一对一对应通讯,用来支持传统的TCPsocket模型,但并不推荐使用。常用的通讯模式只有三类。
由请求端发起请求,并等待回应端回应请求(阻塞的)。请求端和回应端都可以是1:N的模型。通常把1认为是server,N认为是Client。ZeroMQ可以很好的支持路由功能(实现路由功能的组件叫作Device),把1:N扩展为N:M。
发布端是单向只发送数据的,且不关心是否把全部的信息都发送给订阅端。如果发布端开始发布信息的时候,订阅端尚未连接上来,这些信息直接丢弃。不过一旦订阅端连接上来,中间会保证没有信息丢失(之前的消息会丢掉,Slowjoiner问题)。同样,订阅端则只负责接收,而不能反馈。Publisher中途离开,所有的Subscriber会hold住,等待Publisher再上线的时候,会继续接受信息。如果发布端和订阅端需要交互(比如要确认订阅者是否已经连接上),则使用额外的socket采用请求回应模型满足这个需求。
这个模型里,管道是单向的,从PUSH端单向的向PULL端单向的推送数据流。
任何分布式并行的需求,都可以用这三种模型组合起来解决问题。ZeroMQ只专注和解决了消息通讯这一基本问题。
ZeroMQ中的Transient(短暂)和Durable(持久)socket也并非区别于实现层是否保持了tcp连接。而是概念上的不同。对于Durablesocket,其生命期可以长于一个进程的生命期,即使进程退出,再次启动后依旧可以维持继续之前的socket。
lzmq_init创建一个context,可以认为是一个MQ实例或句柄。1表示IO线程数。
lzmq_socket根据context来创建一个socket,后面类型指定了MQ通信类型。
lzmq_bind/zmq_connect可以进行绑定进行监听或者是进行连接。
lzmq_msg_init/zmq_msg_init_size可以用来初始化一个message
lzmq_send/zmq_recv可以进行message的发送和接收。
lzmq_msg_close销毁一个message
lzmq_close关闭一个socket
lzmq_term销毁一个context
通信协议:
ltcp//跨主机间通信
lipc//进程间通信
linproc//线程间通信
lpgm//━━━
lepgm//━━━
消息:
Identity:可以用来表示一个socket的身份
Device:一旦通信节点超过一定数量的话,那么最好需要一个转发节点或者是中间节点。
拥塞:
ZMQ可以通过控制HWM(high-watermark)来控制拥塞。内部实现上每一个socket有关联了buffer,HWM可以控制buffer大小
lPUB/PUSH有transmitbuffers.
lSUB/PULL/REQ/REP有receivebuffers.
lDEALER/ROUTER/PAIR有transmitbuffers也有receivebuffers.
一旦socket达到了high-watermark的话,那么会根据socket类型来决定是丢弃还是block.现在实现而言的话PUB会尝试丢弃数据,而其他类型的socket就会block住。如果socket是线程之间进行通信的话,那么HWM是两者socket的HWM之和。因为默认HWM是ulimited的,所以只要一端没有设置的话那么容量就无限。
关于ZeroMQ的权威资料,除了官方文档,还有O'Reilly出版社2013年出版的一本书,《ZeroMQ:MessagingforManyApplications》,作者:PieterHintjens
目前最新稳定版本4.0.5源代码中,C/C++代码一共大概1.7万行,包括主体代码占1.3万左右,API和Demo占4千行左右。详细如下图。
基本Qpid通信系统的几个组件
Address地址
QpidAddress表示一个节点,有两种节点:一种是queue,另外一种是topic。queue映射到AMQP概念就是Queue;而topic则映射到Exchange。Queue节点能够缓存消息,直到被读取走为止;而topic节点则即时进行转发,比如假如有4个consumer对某消息感兴趣,当消息到达节点时,有3个consumer正在运行,那么topic节点会将消息转发给这3个consumer,然后就将该消息丢弃。剩下的那个consumer再运行时,则收不到这个消息。
Address是一个带格式的字符串,其语法如下:
address_string::=
[/options::={
其中address,subject和key都是字符串。
Subject类似email的主题。每个消息都可以有一个主题,接收者可以通过主题对消息进行过滤。
Option的具体含义有点儿复杂,可以参考Qpid的编程手册获取完整的描述。
BrokerFederation
单一Broker的构架
BrokerFederation的配置
设置federation的命令叫Qpid-route。Qpid支持两类路由:Queueroutes和ExchangeRoutes。
Queueroute:目的地必须是Exchange,源是一个Queue。此类路由将源地址Queue收到的所有消息都转发到目的Exchange去。
ExchangeRoute:目的地依然必须是一个Exchange,源也是一个Exchange。此类路由允许将源Exchange上收到的,拥有指定RouteKey的消息转发到目的Exchange上去。
RDMA
RDMA全称为“RemoteDirectMemoryAccess”,它是一种协议,将数据从一台计算机的内存直接传输到另外一台计算机的内存中,而无需CPU的参与。类似CPU与外设之间的DMA传输,RDMA将DMA的概念扩展到了网络。
C++版本的Qpidbroker除了使用传统的TCP/IP作为网络通信机制之外,在拥有infiniband设备的集群上Qpid还可以使用RDMA进行网络通信。
持久化消息
Broker将收到的消息暂存在磁盘等可以永久保存信息的地方备用,这样,即使Broker因为某种意外而中断,当其再次重新启动之后,还是可以将保持在永久存储介质中的消息读出来并继续进行转发。除了Queue需要durable之外,我们还必须保证发送的消息有durable属性。
高可用性
在一个Qpidbroker集群中,所有的broker都互相备份,进行failover的必要准备工作。每个broker的内部对象都同步到其他集群中的Broker,保持一致,这样在单一Broker无法工作的情况下,client可以切换到其他Broker,而避免信息的丢失和服务中断。
Qpid和Corosync的工作模式
QpiddA和QpiddB通过Corsync同步。Qpid还通过CMAN来防止集群中的”splitbrain”问题,CMAN提供了quorum算法,Qpidd利用CMAN的接口,知道自己是否能够达到法定人数,是否能够加入集群工作
Qpid集群是一个Active/Active模式的集群。客户端可以使用任意一个broker。如下左图所示:
当client连接到一个集群中的broker时,该broker返回给Client相应的BrokerURL列表。在上图中,Client将得到[QpiddA,QpiddB]这样一个列表。当QpiddA的连接断开时,客户端可以自动重新连接到QpiddB继续服务。
最新稳定版本0.30发布于2014-09-26。2015-01-27发布修复几个漏洞的0.31版本补丁。0.30版本源代码中,C/C++代码一共大概10万行,其中主体代码占大概8万,客户端API及Demo占大概2万。官方0.30版本整个源码包统计详细结果如下图。