1:支持集群模型,强调集群无单点,负载均衡以及水平扩展能力2:亿级别的消息堆积能力3:采用零拷贝原理Consumer消费消息过程,使用了零拷贝顺序写盘随机读4:丰富的api同步消息,异步消息顺序消息,延迟消息,事务消息5:底层通信框架采用nettynio6:nameserver代替zookpeer7:消息消费失败重试机制,消息可查询
灵活可扩展性RocketMQ天然支持集群,其核心四组件(NameServer、Broker、Producer、Consumer)每一个都可以在没有单点故障的情况下进行水平扩展。
海量消息堆积能力RocketMQ采用零拷贝原理实现超大的消息的堆积能力,据说单机已可以支持亿级消息堆积,而且在堆积了这么多消息后依然保持写入低延迟。
支持顺序消息可以保证消息消费者按照消息发送的顺序对消息进行消费。顺序消息分为全局有序和局部有序,一般推荐使用局部有序,即生产者通过将某一类消息按顺序发送至同一个队列来实现。
多种消息过滤方式消息过滤分为在服务器端过滤和在消费端过滤。服务器端过滤时可以按照消息消费者的要求做过滤,优点是减少不必要消息传输,缺点是增加了消息服务器的负担,实现相对复杂。消费端过滤则完全由具体应用自定义实现,这种方式更加灵活,缺点是很多无用的消息会传输给消息消费者。
支持事务消息RocketMQ除了支持普通消息,顺序消息之外还支持事务消息,这个特性对于分布式事务来说提供了又一种解决思路。
下面是一张RocketMQ的部署结构图,里面涉及了RocketMQ核心的四大组件:NameServer、Broker、Producer、Consumer,每个组件都可以部署成集群模式进行水平扩展。
同步发送同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
单向发送单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。
示例代码如下:
//1、同步发送SendResultsendResult=producer.send(msg);//2、异步发送producer.send(msg,newSendCallback(){@OverridepublicvoidonSuccess(SendResultsendResult){}@OverridepublicvoidonException(Throwablee){}});//3、Oneway发送producer.sendOneway(msg);生产者组生产者组(ProducerGroup)是一类Producer的集合,这类Producer通常发送一类消息并且发送逻辑一致,所以将这些Producer分组在一起。从部署结构上看生产者通过ProducerGroup的名字来标记自己是一个集群。
消费者消费者(Consumer)负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。站在用户应用的角度消费者有两种类型:拉取型消费者、推送型消费者。
拉取型消费者拉取型消费者(PullConsumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以Pull称为主动消费型。
推送型消费者推送型消费者(PushConsumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以Push称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于Pull的是Push首先要注册消费监听器,当监听器处触发后才开始消费消息。
消费者组消费者组(ConsumerGroup)一类Consumer的集合名称,这类Consumer通常消费同一类消息并且消费逻辑一致,所以将这些Consumer分组在一起。消费者组与生产者组类似,都是将相同角色的分组在一起并命名,分组是个很精妙的概念设计,RocketMQ正是通过这种分组机制,实现了天然的消息负载均衡。消费消息时通过ConsumerGroup实现了将消息分发到多个消费者服务器实例,比如某个Topic有9条消息,其中一个ConsumerGroup有3个实例(3个进程或3台机器),那么每个实例将均摊3条消息,这也意味着我们可以很方便的通过加机器来实现水平扩展。
单Master这种方式一旦Broker重启或宕机会导致整个服务不可用,这种方式风险较大,所以显然不建议线上环境使用。
多Master所有消息服务器都是Master,没有Slave。这种方式优点是配置简单,单个Master宕机或重启维护对应用无影响。缺点是单台机器宕机期间,该机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受影响。
多Master多Slave(异步复制)每个Master配置一个Slave,所以有多对Master-Slave,消息采用异步复制方式,主备之间有毫秒级消息延迟。这种方式优点是消息丢失的非常少,且消息实时性不会受影响,Master宕机后消费者可以继续从Slave消费,中间的过程对用户应用程序透明,不需要人工干预,性能同多Master方式几乎一样。缺点是Master宕机时在磁盘损坏情况下会丢失极少量消息。
多Master多Slave(同步双写)每个Master配置一个Slave,所以有多对Master-Slave,消息采用同步双写方式,主备都写成功才返回成功。这种方式优点是数据与服务都没有单点问题,Master宕机时消息无延迟,服务与数据的可用性非常高。缺点是性能相对异步复制方式略低,发送消息的延迟会略高。
消息消息(Message)就是要传输的信息。一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址。一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务key并在Broker上查找此消息以便在开发期间查找问题。
主题主题(Topic)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个Topic。Topic与生产者和消费者的关系非常松散,一个Topic可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的Topic发送消息。一个Topic也可以被0个、1个、多个消费者订阅。
标签标签(Tag)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同Topic而不同的Tag来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有Tag。标签有助于保持您的代码干净和连贯,并且还可以为RocketMQ提供的查询系统提供帮助。
消息队列消息队列(MessageQueue),主题被划分为一个或多个子主题,即消息队列。一个Topic下可以设置多个消息队列,发送消息时执行该消息的Topic,RocketMQ会轮询该Topic下的所有队列将消息发出去。下图Broker内部消息情况:
Broker内部消息
消息消费模式消息消费模式有两种:集群消费(Clustering)和广播消费(Broadcasting)。默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。而广播消费消息会发给消费者组中的每一个消费者进行消费。
消息顺序消息顺序(MessageOrder)有两种:顺序消费(Orderly)和并行消费(Concurrently)。顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。
工程实例
3:
修改日志配置文
修改启动脚本参
启动NameServer
5:RocketMQ-Console
进入项目文件夹并修改配置文件
#NameServer地址,修改成你自己的服务地址rocketmq.config.namesrvAddr=10.0.74.198:9876;10.0.74.199:9876
将项目打成jar包,并运行jar文件。
6:引入依赖包
关闭broker服务:shbin/mqshutdownbroker
13:发送给指定的队列否则生产端轮流消息发送给队列
SendResultsr=producer.send(message,newMessageQueueSelector(){@OverridepublicMessageQueueselect(List
DefaultPullConsumer消息拉取需要自己维护offset很少使用
当发出请求服务时,客户端通过注册中心服务知道所有的服务实例。客户端接着使用负载均衡算法选择可用的服务实例中的一个并进行发送。
发出请求服务时,客户端通过请求负载平衡器,负载均衡器通过注册中心服务知道所有的服务实例。负载均衡器接着使用负载均衡算法选择可用的服务实例中的一个并进行发送。
备注:NginxHTTP服务器和反向代理服务器就是这种。
客户端发现机制:客户端有所有可用的服务实例,可以灵活方便的特定应用进行特定的负载均衡决策。
服务端发现机制:客户端只需要给负载均衡器发请求即可,客户端屏蔽掉了一些细节。
22:双十一抗压前端dns解析,软硬负载均衡设施进行分流,限流lvsnginxhaproxy负载均衡openresty防刷限流限制ip调用接口总数量
限制ip漏桶算法
限制ip令牌桶算法
DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("CID_EXAMPLE");consumer.subscribe("TOPIC","TAGA||TAGB||TAGC");但tag有个限制,一个消息只能设置一个tag,在某些场景下这就很不方便了,这时就可以使用filter,rocketmq支持使用sql语句的方式来进行消息过滤。
Messagemsg=newMessage("FilterTest","tagA",("HelloRocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.putUserProperty("a",String.valueOf(i));SendResultsendResult=producer.send(msg);消费端加入如下代码即可consumer.subscribe("FilterTest",MessageSelector.bySql("abetween0and3"));