消息队列的作用就是接收消息生产者的消息,然后将消息发送到消费者
1、信道channel
我的理解是生产者/消费者和rabbitmq交互的一个通道,负责交换机、队列管理;消息发布和消费管理;事务管理等
2、交换机
四种交换机:
direct:可以用一个或者多个key绑定到一个或者多个队列上
topic:支持路由的适配符#*
Fanout广播:将消息发送给所有的队列
Header头交换机:自定义通过头消息属性来定义路由的匹配
3、队列:保存消息的队列
4、消费者:消息的接收者
5、生产者:消息的发送者
0、环境和依赖
//获取连接ConnectionFactoryfactory=newConnectionFactory();factory.setHost("localhost");//mq主机地址factory.setPort(5672);//端口,默认时5672factory.setUsername("leyou");factory.setPassword("leyou");factory.setVirtualHost("/leyou");Connectionconnection=factory.newConnection();//获取信道Channelchannel=connection..createChannel();2、申明交换机/队列/绑定交换机和队列
//交换机名,交换机类型channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.DIRECT);/***第一个参数是queue:要创建的队列名*第二个参数是durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息*第三个参数是exclusive:true表示一个队列只能被一个消费者占有并消费*第四个参数是autoDelete:true表示服务器不在使用这个队列是会自动删除它*第五个参数是arguments:包括死信队列,队列的ttl*/channel.queueDeclare(QUEUE_ONE,true,false,false,null);//绑定交换机和队列队列名,交换机名,routekeychannel.queueBind(QUEUE_ONE,EXCHANGE,GIRL);3、发布消息
//1、交换机名2、routekey3、mandatory强制(需要return回调时必须设置为true)4、发布消息参数5、消息channel.basicPublish(EXCHANGE,GIRL,true,null,"xxx降价了".getBytes());4、接收消息
//接收消息前也需要获取连接和channel,申明队列//接收消息Consumerconsumer=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{ //拿到消息 System.out.println(newString(body,"utf-8")); }};/***参数说明*1:队列名字*2:是否自动应答autoACk,为false时需要手动ack*3:消费者,当接收到消费者时会调用给对象中的handleDelivery方法*/channel.basicConsume(QUEUE_ONE,true,consumer);2.2、基本应用1、功能:有两个人小明和小华,小明对美女感兴趣,小华对股票和没事感兴趣,使用消息队列将他们感兴趣的消息发送给他们两个
(1)写一个类来提供创建连接和信道;(2)生产者(发送消息方)类发送消息(3)消费者(接收消息)类接收消息
对于固定消息体大小和线程数,如果消息持久化,生产者confirm(或者采用事务机制),消费者ack那么对性能有很大的影响.
消息持久化的优化没有太好方法,用更好的物理存储(SAS,SSD,RAID卡)总会带来改善。生产者confirm这一环节的优化则主要在于客户端程序的优化之上。归纳起来,客户端实现生产者confirm有三种编程方式:
channel.confirmSelect();for(inti=0;i 异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。关键代码: //第二个参数就是指定是否手动ackfalse时为手动channel.basicConsume(QUEUE_ONE,false,consumer);手动ack有三种 单个确认ack //接收消息Consumerconsumer=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{System.out.println(newString(body,"utf-8"));//手动应答ack可以在该方法中进行;参数:1.消息tag,2.是否批量ack channel.basicAck(envelope.getDeliveryTag(),false);//拒绝消息;参数:1.消息tag;2.消息是否重新入队,当只有一个消费者时,会引起重复消费channel.basicReject(envelope.getDeliveryTag(),false);//批量ack消息;参数:1.消息tag;2.是否批量ack消息,3.是否重回队列 channel.basicNack(envelope.getDeliveryTag(),true,false);}};//这里只需要条应答的语句,我这里知识都列出来channel.basicConsume(QUEUE_ONE,false,consumer);//注意上面第二个参数要为false才能手动ack2.5、消息TTL和队列TTL、死信队列、延迟队列这一块暂时不使用原始RabbitMqClientAPI实现,后面再研究,但是会使用下面的org.springframework.amqp来实现 Spring对RabbitMp进行了抽象,将交换机,队列,消息,绑定,连接等抽象出实体类,方便操作,还提供了RabbitAdmit和RabbitTemplate来方便交换机队列的管理以及消息的发送接收等 //1、创建连接时conn.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);//2、创建rabbitTemplate时rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{//ack为确认消息是否成功发送到mqif(ack){//成功发送log.info("消息发送成功");}});2、消息回调机制关键代码: //1、创建连接时conn.setPublisherReturns(true);//2、创建rabbitTemplate时//改标志位设置位true时,当交换机根据自身类型和routeKey无法找到对应的队列时,//则mq会将消息返还给生产者//当为false时则mq会将消息直接删除rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback((Messagemessage,intreplyCode,StringreplyText,Stringexchange,StringroutingKey)->{//消息log.info("message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",message,replyCode,replyText,exchange,routingKey);});生产者: @ComponentpublicclassRabbitReceive{@RabbitListener(bindings=@QueueBinding(exchange=@Exchange(value=EXCHANGE_NAME,type=ExchangeTypes.DIRECT),key=VERIFICATION_CODE_ROUTE_KEY,value=@Queue(value=VERIFICATION_CODE_QUEUE,autoDelete="false"),ignoreDeclarationExceptions="true"),concurrency="1",//指定监听该队列的消费者个数ackMode="MANUAL"//手动ack)publicvoidreceiveCode(Channelchannel,Messagemsg,@HeadersMap