消息中间件基于队列模型实现异步/同步传输数据
作用:可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
在了解中间件之前,我们先了解一下什么是同步?
首先我们想一下,两个公司之间如果有互相调用接口的业务需求,如果没有引入中间件技术,是怎么实现的呢?
用户发起请求给系统A,系统A接到请求直接调用系统B,系统B返回结果后,系统A才能返回结果给用户,这种模式就是同步调用。
所谓同步调用就是各个系统之间互相依赖,一个系统发送请求,其他系统也会跟着依次进行处理,只有所有系统处理完成后对于用户来讲才算完成了一次请求。只要其他系统出现故障,就会报错给用户。
那么引入中间件后,是如何做到异步调用的呢?
用户发起请求给系统A,此时系统A发送消息给MQ,然后就返回结果给用户,不去管系统B了。然后系统B根据自己的情况,去MQ中获取消息,获取到消息的时候可能已经过了1分钟甚至1小时,再根据消息的指示执行相应的操作。
那么想一想,系统A和系统B互相之间是否有通信?这种调用方式是同步调用吗?
系统A发送消息给中间件后,自己的工作已经完成了,不用再去管系统B什么时候完成操作。而系统B拉去消息后,执行自己的操作也不用告诉系统A执行结果,所以整个的通信过程是异步调用的。
说到这里,我们可以做个总结,消息中间件到底是什么呢?
其实消息中间件就是一个独立部署的系统。可以实现各个系统之间的异步调用。当然它的作用可不止这些,通过它可以解决大量的技术痛点,我们接下来会进行介绍。
消息中间件,总结起来作用有三个:异步化提升性能、降低耦合度、流量削峰。
异步化提升性能
先来说说异步化提升性能,上边我们介绍中间件的时候已经解释了引入中间件后,是如何实现异步化的,但没有解释具体性能是怎么提升的,我们来看一下下边的图。
没有引入中间件的时候,用户发起请求到系统A,系统A耗时20ms,接下来系统A调用系统B,系统B耗时200ms,带给用户的体验就是,一个操作全部结束一共耗时220ms。
如果引入中间件之后呢?看下边的图。
用户发起请求到系统A,系统A耗时20ms,发送消息到MQ耗时5ms,返回结果一共用了25ms,用户体验一个操作只用了25ms,而不用管系统B什么时候去获取消息执行对应操作,这样比较下来,性能自然大大提高
降低耦合度
再来聊聊解耦的场景,看下图。
如果没有引入中间件,那么系统A调用系统B的时候,系统B出现故障,导致调用失败,那么系统A就会接到异常信息,接到异常信息后肯定要再处理一下,返回给用户失败请稍后再试,这时候就得等待系统B的工程师解决问题,一切都解决好后再告知用户可以了,再重新操作一次吧。
这样的架构,两个系统耦合再一起,用户体验极差。
那么我们引入中间件后是什么样的场景呢,看下面的流程:
对于系统A,发送消息后直接返回结果,不再管系统B后边怎么操作。而系统B故障恢复后重新到MQ中拉取消息,重新执行未完成的操作,这样一个流程,系统之间没有影响,也就实现了解耦。
流量削峰
下面我们再聊聊最后一个场景,流量削峰
假如我们的系统A是一个集群,不连接数据库,这个集群本身可以抗下1万QPS
系统B操作的是数据库,这个数据库只能抗下6000QPS,这就导致无论系统B如何扩容集群,都只能抗下6000QPS,它的瓶颈在于数据库。
假如突然系统QPS达到1万,就会直接导致数据库崩溃,那么引入MQ后是怎么解决的呢,见下图:
引入MQ后,对于系统A没有什么影响,给MQ发送消息可以直接发送1万QPS。
此时对于系统B,可以自己控制获取消息的速度,保持在6000QPS一下,以一个数据库能够承受的速度执行操作。这样就可以保证数据库不会被压垮。
当然,这种情况MQ中可能会积压大量消息。但对于MQ来说,是允许消息积压的,等到系统A峰值过去,恢复成1000QPS时,系统B还是在以6000QPS的速度去拉取消息,自然MQ中的消息就慢慢被释放掉了。
这就是流量削峰的过程。在电商秒杀、抢票等等具有流量峰值的场景下可以使用这么一套架构。
1.Http请求基于请求与响应的模型,在高并发的情况下,客户端发送大量的请求达到
服务器端有可能会导致我们服务器端处理请求堆积。
2.Tomcat服务器处理每个请求都有自己独立的线程,如果超过最大线程数会将该请求缓存到队列中,如果请求堆积过多的情况下,有可能会导致tomcat服务器崩溃的问题。
所以一般都会在nginx入口实现限流,整合服务保护框架。
可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
客户端发送请求到达服务器端,服务器端实现会员注册业务逻辑,
1.insertMember()--插入会员数据1s
2.sendSms()----发送登陆短信提醒3s
3.sendCoupons()----发送新人优惠券3s
不是很好。
多线程与MQ方式实现异步?
互联网项目:
客户端安卓/IOS
服务器端:php/java
最好使用mq实现异步
用户向数据库中插入一条数据之后,在单独开启一个线程异步发送短信和优惠操作。
优点:适合于小项目实现异步
缺点:有可能会消耗服务器cpu资源资源
先向数据库中插入一条会员数据,让后再向MQ中投递一个消息,MQ服务器端在将消息推送给消费者异步解耦处理发送短信和优惠券。
MQ可以实现异步/解耦/流量削峰问题;
多线程也可以实现异步,但是消耗到cpu资源,没有实现解耦。
Producer生产者:投递消息到MQ服务器端;
Consumer消费者:从MQ服务器端获取消息处理业务逻辑;
BrokerMQ服务器端
Topic主题:分类业务逻辑发送短信主题、发送优惠券主题
Queue存放消息模型队列先进先出后进后出原则数组/链表
Message生产者投递消息报文:json
简单的比较
特性
ActiveMQ
RabbitMQ
RocketMQ
kafka
开发语言
java
erlang
scala
单机吞吐量
万级
10万级
时效性
ms级
us级
ms级以内
可用性
高(主从架构)
非常高(分布式架构)
功能特性
成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好
基于erlang开发,所以并发能力很强,性能极其好,延时很低管理界面较丰富
MQ功能比较完备,扩展性佳
只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。
多线程版本mq;
使用LinkedBlockingDeque模拟实现多线程的方式读写中间件
基于网络通讯版本mqnetty实现
生产者
消费者netty客户端与nettyServer端MQ服务器端保持长连接,MQ服务器端保存
消费者连接。
生产者netty客户端发送请求给nettyServer端MQ服务器端,MQ服务器端在将该
消息内容发送给消费者。
body:{"msg":{"userId":"123456","age":"23"},"type":"producer",”topic”:””}
生产者投递消息给MQ服务器端,MQ服务器端需要缓存该消息
如果mq服务器端宕机之后,消息如何保证不丢失:
持久化机制
如果mq接收到生产者投递消息,如果消费者不在的情况下,该消息是否会丢失?
消费者已经和mq服务器保持长连接:Mq服务器端将该消息推送消费者。
消费者第一次刚启动的时候,消费者会主动拉取消息。
Mq如何实现抗高并发思想
Mq消费者根据自身能力情况,拉取mq服务器端消息消费。
默认的情况下是取出一条消息。
缺点:存在延迟的问题
需要考虑mq消费者提高速率的问题:
如何消费者提高速率:消费者实现集群、消费者批量获取消息即可。
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ服务器是用Erlang语言编写的。
RabitMQ官方网站:
1.点对点(简单)的队列
2.工作(公平性)队列模式
3.发布订阅模式
4.路由模式Routing
5.通配符模式Topics
2.配置erlang环境变量信息
新增环境变量ERLANG_HOME=erlang的安装地址
将%ERLANG_HOME%\bin加入到path中
注意:RabbitMQ它依赖于Erlang,需要先安装Erlang。
netstartRabbitMQ
如果rabbitmq启动成功无法访问管理平台页面
进入到F:\path\rabbitmq\rabbitmq\rabbitmq_server-3.6.9\sbin>
执行
rabbitmq-pluginsenablerabbitmq_management
rabbitmqctlstart_app
默认账号:guest/guest用户可以自己创建新的账号
VirtualHosts:
像mysql有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?
RabbitMQ也有类似的权限管理。在RabbitMQ中可以虚拟消息服务器VirtualHost,每
个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互
隔离。exchange、queue、message不能互通。
默认的端口15672:rabbitmq管理平台端口号
默认的端口5672:rabbitmq消息中间内部通讯的端口默认的端口号25672rabbitmq集群的端口号
/VirtualHosts---分类
/队列存放我们消息
Exchange分派我们消息在那个队列存放起来类似于nginx
25672rabbitmq集群通信端口号
Amqp5672rabbitmq内部通信的一个端口号
首先需要再RabbitMQ平台创建VirtualHosts和队列。
/myVirtualHosts
----订单队列
----支付队列
下列模式用到的所有的依赖
公共类:
测试结果:
1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
2、消费者1和消费者2获取到的消息的数量是相同的,一个是奇数一个是偶数。
其实,这样是不合理的,应该是消费者1要比消费者2获取到的消息多才对。
Work模式的“能者多劳”
四、路由模式
1、图示
注释:DirectExchange–处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键“dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
1.RPC客户端启动后,创建一个匿名、独占的、回调的队列2.RPC客户端设置消息的2个属性:replyTo和correlationId,然后将消息发送到队列rpc_queue3.RPC服务端在队列rpc_queue上等待消息。RPC服务端处理完收到消息后,然后将处理结果封装成消息发送到replyTo指定的队列上,并且此消息带上correlationId(此值为收到消息里的correlationId)4.RPC客户端在队列replyTo上等待消息,当收到消息后,它会判断收到消息的correlationId。如果值和自己之前发送的一样,则这个值就是RPC的处理结果
使用消息确认机制+持久技术
A.消费者确认收到消息机制
channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
注:第二个参数值为false代表关闭RabbitMQ的自动应答机制,改为手动应答。
在处理完消息时,返回应答状态,true表示为自动应答模式。
channel.basicAck(envelope.getDeliveryTag(),false);
B.生产者确认投递消息成功使用Confirm机制或者事务消息
Confirm机制同步或者是异步的形式
2.RabbitMQ默认创建是持久化的
代码中设置durable为true
参数名称详解:
durable是否持久化durable为持久化、Transient不持久化
autoDelete是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQManagement,查看某个队列的消费者数量,当consumers=0时队列就会自动删除
channel.txSelect();channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());//inti=1/0;channel.txCommit();
publicclassProducer{privatestaticfinalStringQUEUE_NAME="mayikt-queue";publicstaticvoidmain(String[]args)throwsIOException,TimeoutException,InterruptedException{//1.创建一个新连接Connectionconnection=RabbitMQConnection.getConnection();//2.设置channelChannelchannel=connection.createChannel();//3.发送消息Stringmsg="每特教育6666";channel.confirmSelect();channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());booleanresult=channel.waitForConfirms();if(result){System.out.println("消息投递成功");}else{System.out.println("消息投递失败");}channel.close();connection.close();}}
publicclassConsumer{privatestaticfinalStringQUEUE_NAME="mayikt-queue";publicstaticvoidmain(String[]args)throwsIOException,TimeoutException,IOException,TimeoutException{//1.创建连接Connectionconnection=RabbitMQConnection.getConnection();//2.设置通道Channelchannel=connection.createChannel();DefaultConsumerdefaultConsumer=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{Stringmsg=newString(body,"UTF-8");System.out.println("消费者获取消息:"+msg);//消费者完成消费该消息channel.basicAck(envelope.getDeliveryTag(),false);}};//3.监听队列channel.basicConsume(QUEUE_NAME,false,defaultConsumer);}}
两个消费者,一个生产者
yml
下面的代码演示的是生产者生产消息给mq中间件(通过服务的方式),消费者订阅消费,并插入数据库,生产者通过接口和id判断数据库有没有相对应的消息来判断有没有被消费。
这里面加入了自定义消费失败重试策略
代码整合参考mayikt-springboot-rabbitmq|#中order-dead-letter-queue项目
死信队列和普通队列区别不是很大
普通与死信队列都有自己独立的交换机和路由key、队列和消费者。
区别:
1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到
普通队列中缓存起来,普通队列对应有自己独立普通消费者。
2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费
的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机
对应有自己独立的死信(备胎)队列对应独立死信(备胎)消费者。
1.30分钟订单超时设计
采用死信队列,创建一个普通队列没有对应的消费者消费消息,在30分钟过后
就会将该消息转移到死信备胎消费者实现消费。
备胎死信消费者会根据该订单号码查询是否已经支付过,如果没有支付的情况下
则会开始回滚库存操作。
微服务形式
在这时候mq会自动触发重试机制,默认的情况下rabbitmq是无限次数的重试。
需要人为指定重试次数限制问题
A.消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?
该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。
B.消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?
该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。可以将日志存放起来,后期通过定时任务或者人工补偿形式。如果是重试多次还是失败消息,需要重新发布消费者版本实现消费。
可以使用死信队列
Mq在重试的过程中,有可能会引发消费者重复消费的问题。
Mq消费者需要解决幂等性问题
幂等性保证数据唯一
方式1:
生产者在投递消息的时候,生成一个全局唯一id,放在我们消息中。
Msgid=123456
消费者获取到我们该消息,可以根据该全局唯一id实现去重复。
全局唯一id根据业务来定的订单号码作为全局的id
实际上还是需要再db层面解决数据防重复。
业务逻辑是在做insert操作使用唯一主键约束
业务逻辑是在做update操作使用乐观锁
总结:如果消费者处理消息时,因为代码原因抛出异常是需要从新发布版本才能解决的,那么就不需要重试,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期人工实现补偿。
详细的策略解决幂等性和保证幂等性的方法
HTTP/1.1中对幂等性的定义是:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
业务开发中,经常会遇到重复提交的情况,无论是由于网络问题无法收到请求结果而重新发起请求,或是前端的操作抖动而造成重复提交情况。在交易系统,支付系统这种重复提交造成的问题有尤其明显,比如:
幂等需要通过唯一的业务单号来保证。也就是说相同的业务单号,认为是同一笔业务。使用这个唯一的业务单号来确保,后面多次的相同的业务单号的处理逻辑和执行效果是一致的。下面以支付为例,在不考虑并发的情况下,实现幂等很简单:①先查询一下订单是否已经支付过,②如果已经支付过,则返回支付成功;如果没有支付,进行支付流程,修改订单状态为‘已支付’。
上述的保证幂等方案是分成两步的,第②步依赖第①步的查询结果,无法保证原子性的。在高并发下就会出现下面的情况:第二次请求在第一次请求第②步订单状态还没有修改为‘已支付状态’的情况下到来。既然得出了这个结论,余下的问题也就变得简单:把查询和变更状态操作加锁,将并行操作改为串行操作。
如果只是更新已有的数据,没有必要对业务进行加锁,设计表结构时使用乐观锁,一般通过version来做乐观锁,这样既能保证执行效率,又能保证幂等。例如:UPDATEtab1SETcol1=1,version=version+1WHEREversion=#version#不过,乐观锁存在失效的情况,就是常说的ABA问题,不过如果version版本一直是自增的就不会出现ABA的情况。(从网上找了一张图片很能说明乐观锁,引用过来,出自Mybatis对乐观锁的支持)
使用订单号orderNo做为去重表的唯一索引,每次请求都根据订单号向去重表中插入一条数据。第一次请求查询订单支付状态,当然订单没有支付,进行支付操作,无论成功与否,执行完后更新订单状态为成功或失败,删除去重表中的数据。后续的订单因为表中唯一索引而插入失败,则返回操作失败,直到第一次的请求完成(成功或失败)。可以看出防重表作用是加锁的功能。
这种方式分成两个阶段:申请token阶段和支付阶段。第一阶段,在进入到提交订单页面之前,需要订单系统根据用户信息向支付系统发起一次申请token的请求,支付系统将token保存到Redis缓存中,为第二阶段支付使用。第二阶段,订单系统拿着申请到的token发起支付请求,支付系统会检查Redis中是否存在该token,如果存在,表示第一次发起支付请求,删除缓存中token后开始支付逻辑处理;如果缓存中不存在,表示非法请求。实际上这里的token是一个信物,支付系统根据token确认,你是你妈的孩子。不足是需要系统间交互两次,流程较上述方法复杂。
把订单的支付请求都快速地接下来,一个快速接单的缓冲管道。后续使用异步任务处理管道中的数据,过滤掉重复的待支付订单。优点是同步转异步,高吞吐。不足是不能及时地返回支付结果,需要后续监听支付结果的异步返回。