首页 >> 大全

八、代码实现-自动/手动应答

2023-06-17 大全 45 作者:考证青年

这里写自定义目录标题

一、的概念

是一个消息中间件,他接收并且供第三方使用。就像快递一样,商家为生产者,快递站为MQ,而用户为消费者。

二、为什么使用 解耦

比如公司的系统A,需要将数据推送到其他不同的系统中去。这样子系统间的耦合度变高,系统A需要考虑一些列乱七八糟的因素,如其他系统挂了,需不需要重新推送、、等待问题。这时候如果使用,则系统A只需要考虑将数据推送到,其他系统需要消费的,自己去系统A获取即可,可以有效的做到系统A与其他系统的解耦。削峰

比如公司的业务存在高峰期,如上午数据写入存在w/s的数据量,而数据库的承受量加入只有k/s,系统就有可能导致崩溃卡死。但是如果将消息写入,由控制消息读取速度小于系统承受量,这样即使在高峰时期也不会挂掉。异步

比如系统A存在需求,如果接受某一份数据,则需要写入A系统和其他系统,时间是累加的。但是如果将数据发送到其他,由各个系统自己实现写入,则写入的时间将会大幅度缩减。 三、使用的缺点 系统可用性

由于加入了,导致系统的组件增加,部署与运维难度增加。而且如果挂掉, 也会影响系统的使用。

2.数据一致性

因为加入了,需要考虑的因为也会随之增加。比如消息丢失、消息重复消费、消息发送成功却是否被消费者成功消费等等、、、 四、MQ的选择 名称优点缺点适用场景

单机吞吐量万级,时效性ms级,可用性高,较低概率出现丢失数据。

官方社区现在对于 5.x的版本维护越来越少,高吞吐量场景较少使用。

早期使用的 ,随着其他MQ的出现使用量渐渐减少,而且没经过大规模吞吐量场景的验证,社区也不是很活跃,所以不推荐。

Kafka

单机吞吐量 百万级,时效性 ms级,可用性非常高,消息可靠性可配置 0 丢失。而且是分布式的,一个数据有多个副本,少数机器宕机也不会丢失数据。

单机超过64个队列/分区,CPU会明显变高,队列越多越高,发送消息响应时间变长。消费失败不支持重试。

主要用于日志的采集与传输,一般大公司使用

单机吞吐量十万级,可用性高,支持分布式,消息可靠性可以做到0丢失,扩展性好,支持十亿级别消息堆积。

支持的客户端不多,目前只支持java和c++。

主要用于金融领域,被阿里广泛的用于订单、交易、重置、消息推送等场景。

用语言编写,性能较好,单机吞吐量万级,时效性μs级,可用性高,消息可靠性基本不丢失,而且支持大量的其他语言,社区活跃度高,更新频率高。

商业版需要收费,学习成本较高。

界面管理方便,功能完备。如果数据量较小可以推荐使用,适用于中小型公司。

五、安装

ps:由于需要的包下载较慢,可以直接从这里下载(网络较好的可以忽略):

由于需要的支持,所以下载包之前需要确定和的对应关系:安装

1)下载:wget

2)安装:rpm -ivh -21.3.1-1.el7..rpm

ps1: 如果安装错误:--3.8.8-1.el7..rpm: 不是 rpm 软件包 (或者没有)。这是因为网络访问较慢,有可能下载不全,所以如果wget实在有问题,可以选择直接登录网站包下载下来,再传到linux即可!

ps2:下载的版本el7根据linux来,可通过uname -a查看:

安装

1)下载依赖:yum -y socat

2)下载包:wget

3)安装:rpm -ivh --3.8.8-1.el7..rpm

ps:如果安装报错,下载不全,解决办法如上!启动: start -设置开机自启动: -

ps1:相关命令-关闭: stop

ps2:相关命令-重启: -

ps3:相关命令-插件列表:- list

ps4:相关命令-启动插件:- XXX (XXX为插件名)

ps5:相关命令-停用插件:- XXX查看启动后的情况 : 、 -

新建一个用户

1)新建用户: admin admin

2)授予管理员权限: admin

3)设置admin可以使用的虚机权限: admin–> -p admin admin ".*" ".*" ".*"查看用户:

重启: -查看页面::15672

输入账号密码:admin/admin

进入该界面说明安装成功(这里因为做了一些代码操作,所以界面不是原始的,新安装的跟我的有些不同) 六、工作原理图

(生产者),即发消息的系统。(连接通道):与服务器连接的TCP通道(交换机)和queue(队列):消息先发给交换机,再由交换机根据规则转发给队列。一个交换机可以对应多个队列。如果工作时,不省明交换机,则会使用默认交换机。(消费者):即最后消费队列消息的系统。 七、代码实现-简单工作

工作流程

由于我们后续测试都会连接服务器,所以这里创建一个公共连接类。

创建公共连接类

package com.rabbitmqUtils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* 创建信道* @create 2022-02-08 16:26* @desc**/
public class RabbitmqUtils {//队列名称public static final String RABBITMQ_QUEUE = "RABBITMQ_QUEUE";//交换机名称public static final String RABBITMQ_EXCHANGE = "RABBITMQ_EXCHANGE";public static Channel getChannel() throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();//工厂ipfactory.setHost("192.168.248.10");//用户名factory.setUsername("admin");//密码factory.setPassword("admin");//创建链接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();return channel;}
}

创建生产者

package com.rabbitmq1;import com.rabbitmq.client.Channel;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//生成队列,不创建交换机,走默认的交换机//1.名称//2.队列消息是否持久化(否:存内存,是:存磁盘。默认否)//3.队列是否只供一个消费者消费,默认否//4.最后一个消费者断开连接后,是否自动删除。//5.其他参数channel.queueDeclare(RabbitmqUtils.RABBITMQ_QUEUE, true, false, false, null);//发消息String message = "this is QUEUE_P1";//持续发送消息for (int i = 0; i < 10; i++) {Thread.sleep(1000);//1.交换机,简单版本不考虑,直接空字符串,即默认交换机//2.路由key,直接写队列名即可//3.参数,忽略//4.消息体channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, null, (message+i).getBytes());}System.out.println("消息发送成功");}
}

创建消费者

package com.rabbitmq1;import com.rabbitmq.client.*;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 15:34* @desc**/
public class Consume {//发消息public static void main(String[] args) throws IOException, TimeoutException {//获取信道Channel channel = RabbitmqUtils.getChannel();//消费者未成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答//3.消费者成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, true, deliverCallback, cancelCallback);}
}

启动生产者类,查看可视化界面,可以发现已经写入了10条

启动消费者,效果如下,可以看到队列消息已经被消费,则成功!

ps:如果报错ERROR com...impl.ndler - An error ,则需要admin授权

八、代码实现-多个消费者

工作原理图

生产者

package com.rabbitmq2;import com.rabbitmq.client.Channel;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//0是轮询、1是不公平分发,大于1则是预取值,默认为0。预取后,再进行不公平分发。channel.basicQos(0);//生成队列,//1.名称//2.队列消息是否持久化(否:存内存,是:存磁盘。默认否)//3.队列是否只供一个消费者消费,默认否//4.最后一个消费者断开连接后,是否自动删除。//5.其他参数channel.queueDeclare(RabbitmqUtils.RABBITMQ_QUEUE, true, false, false, null);//持续发送消息for (int i = 0; i < 10; i++) {String message="this is Product"+i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,忽略//4.消息体channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, null, message.getBytes());}System.out.println("消息发送成功");}
}

消费者1

package com.rabbitmq2;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程1....");Channel channel = RabbitmqUtils.getChannel();//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, true, deliverCallback, cancelCallback);}
}

消费者2

package com.rabbitmq2;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/
public class Consume02 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程2....");Channel channel = RabbitmqUtils.getChannel();//消费者未成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答//3.消费者成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, true, deliverCallback, cancelCallback);}
}

启动消费者1,再启动消费者2.然后启动生产者,可以看到,消息是轮询发送给两个消费者。

八、代码实现-自动/手动应答 概念

1)自动应答:消息从队列发送给消费者时,就已经默认消费成功。

优点:效率高。

缺点:一方面如果消费者在消费消息时候如果断开了,则消费者没有成功处理消息,而队列默认消费成功,就会造成数据丢失。另一方面,如果消费者系统性能交叉,没法及时处理消息,就会造成消息积压,内存耗尽而崩溃。

2)手动应答:消息从队列发送给消费者时,消费者需要手动确认消息,队列才会认为消费成功。

优点:数据传输较为安全,而且可操作性较高。

缺点:效率低手动应答环境下的可操作性

1)根据不同消费者应答消息的效率,队列可以动态分配消息给消费者

2)对于一些特殊的队列信息,可以选择拒收,重新放回队列代码实现

生产者

package com.rabbitmq3;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//生成队列,//1.名称//2.队列消息是否持久化(否:存内存,是:存磁盘。默认否)//3.队列是否只供一个消费者消费,默认否//4.最后一个消费者断开连接后,是否自动删除。//5.其他参数channel.queueDeclare(RabbitmqUtils.RABBITMQ_QUEUE, true, false, false, null);//持续发送消息for (int i = 0; i < 10; i++) {//发消息String message = "this is Product"+i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,(消息持久化,需要队列开启持久化才有效)//4.消息体channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");}
}

消费者1(接受,每秒接受1条,初始取5条)

package com.rabbitmq3;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程1....");Channel channel = RabbitmqUtils.getChannel();//0是轮询,默认值//1是不公平分发,即哪个消费者效率高,哪边分配的多//大于1则是预取值,即消费者一定会消费的消息数量。预取后,再进行不公平分发。channel.basicQos(5);//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {//睡眠try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));//手动应答//1.消息确认标记,2.是否批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, false, deliverCallback, cancelCallback);}
}

消费者2(拒收,每10s拒收一条,初始取2条)

package com.rabbitmq3;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/
public class Consume02 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程2....");Channel channel = RabbitmqUtils.getChannel();//0是轮询、1是不公平分发,大于1则是预取值,默认为0。预取后,再进行不公平分发。channel.basicQos(2);//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {//睡眠try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));//1.消息否定确认标记,2.消息是否重新被放回队列channel.basicReject(message.getEnvelope().getDeliveryTag(), true);};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, false, deliverCallback, cancelCallback);}
}

可以去界面查看

结果:

九、代码实现-消息发布确认的三种方式

消息发布确认就是指生产者将消息发送到 后,如果 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 。对于消息发布确认一共有三种方式。

单个确认为每发送一次消息就进行一次确认,优点是准确无误,缺点是资源占用较大,速度较慢。1000条数据测试时间为

    /*** 单个确认*/public static void publishDg() throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//发送消息for (int i = 0; i < 1000; i++) {channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, ("message" + i).getBytes());//发布确认boolean flag = channel.waitForConfirms();if (flag) {System.out.println("发送成功");}}//结束时间long end = System.currentTimeMillis();System.out.println("单独确认花费时间=" + (end - begin));}

批量确认为每发送一批消息再进行一次确认,优点是比单个确认更快,但是无法精确定位到发送失败消息。1000条数据测试时间为170ms

  public static void publishPl() throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//批量确认大小int batchSize = 100;//批量发送消息、确认for (int i = 0; i < 1000; i++) {channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, ("message" + i).getBytes());if (i % batchSize == 0) {channel.waitForConfirms();}}//结束时间long end = System.currentTimeMillis();System.out.println("单独确认花费时间=" + (end - begin));}

异步确认为在发送前创建一个支持高并发的Map,key存储消息tag,存储,并调用监听器进行监听消息发送。一般在每次发送后用Map记录下发送消息,监听器根据结果回调相关函数,若发送成功,回调成功函数,在Map中删去该消息。发送失败,回调失败函数,在失败函数中通过Map显示该消息。异步确认在所有确认中综合性能最佳。1000条数据测试时间为43ms

 public static void publishYb() throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//线程安全有序的一个哈希表,适合高并发的情况//1.将序号和消息进行关联//2.轻松批量删除条目 只要给到序号//3.支持高并发ConcurrentSkipListMap concurrentSkipListMap = new ConcurrentSkipListMap<>();//--------------------监听器--------------------------//消息确认成功回调函数//1.消息的标记,2.是否为批量操作ConfirmCallback ackCallback = (deliveryTag, multiple) -> {//消息接受处理if (multiple) {//批量ConcurrentNavigableMap confirmd = concurrentSkipListMap.headMap(deliveryTag);confirmd.clear();} else {//非批量concurrentSkipListMap.remove(deliveryTag);}System.out.println("确认的消息:" + deliveryTag);};//消息确认失败回调函数//1.消息的标记,2.是否为批量操作ConfirmCallback nackCallback = (deliveryTag, multiple) -> {//消息未接受处理String message = concurrentSkipListMap.get(deliveryTag);System.out.println("未确认的消息:" + deliveryTag + ":" + message);};//消息监听器,监听失败和成功的消息channel.addConfirmListener(ackCallback, nackCallback);//批量发送消息、确认for (int i = 0; i < 1000; i++) {String message = ("message" + i);channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//记录发送的消息concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);}//结束时间long end = System.currentTimeMillis();System.out.println("单独确认花费时间=" + (end - begin));System.out.println(concurrentSkipListMap.size());}

十、代码实现-交换机

前面有说到,实际上生产者发送消息,消息是直接发给交换机,然后再由交换机根据相关规则分配给队列。而根据分配规则,常见的基类交换机分别有:直接交换机(),主题交换机(topic),标题交换机(topic),首部交换机()等。

ps:因为我这里用的是同一个交换机,如果同一个交换机,并且配置修改了,则需要删除原先的交换机,否则会报错。

交换机模式一

这种模式跟广播一样,即发送到交换机的所有消息,都会发到交换机的所有队列中,代码如下:

生产者

package com.rabbitmq5;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "fanout");for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,(消息持久化)//4.消息体channel.basicPublish(RabbitmqUtils.RABBITMQ_EXCHANGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}}
}

消费者1

package com.rabbitmq5;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "fanout");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);}
}

消费者2:

package com.rabbitmq5;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume02 {//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "fanout");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);}
}

结果

2. 交换机模式之

这种方式相对于来说增加了一定的限制,即消息只能够发送到交换机的固定的队列中去。代码如下:

生产者:

package com.rabbitmq6;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机
//        channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "direct");for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,(消息持久化)//4.消息体channel.basicPublish(RabbitmqUtils.RABBITMQ_EXCHANGE, "error", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}}
}

消费者1:

package com.rabbitmq6;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "direct");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "info");channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "warning");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);System.out.println("队列info/warning等待...");}
}

消费者2

package com.rabbitmq6;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "direct");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "info");channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "warning");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);System.out.println("队列error等待...");}
}

3. 交换机模式之topic

前面说的模式实际上就是绝对匹配。而topic是的模糊匹配。

*(星号)代表一个单词

#(井号)可以替代零个或多个单词

代码如下:

生产者

package com.rabbitmq7;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "topic");for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,(消息持久化)//4.消息体channel.basicPublish(RabbitmqUtils.RABBITMQ_EXCHANGE, "queue.queue.queue11", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}}
}

消费者1

package com.rabbitmq7;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "topic");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "*.queue.*");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);System.out.println("消费者|*.queue.*|等待中....");}
}

消费者2

package com.rabbitmq7;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume02 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "topic");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "queue.*.*");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);System.out.println("消费者|queue.*.*|等待中....");}
}

效果

十一、代码实现-优先级队列

队列的消费顺序一般是先进先出。但是在某些订单中业务中,我们需要给vip用户后下单,先出货的特殊权限,这时候就需要用到优先级队列。

原理,在原来先进先出的逻辑上,给队列备注优先级,最后的顺序如下:

优先级高–>优先级低–>没有备注优先级

ps:优先级的范围为0-255

生产者

package com.rabbitmq12;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {//队列名称public static final String ORDER_QUEUE = "ORDER_QUEUE";//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//参数Map argument = new HashMap<>();argument.put("x-max-priority", 10);//设置优先级范围0-10,官方允许值是0-255。设置过大会浪费内存//生成队列,//不创建交换机,走默认的交换机//1.名称//2.队列消息是否持久化(否:存内存,是:存磁盘。默认否)//3.队列是否只供一个消费者消费,默认否//4.最后一个消费者断开连接后,是否自动删除。//5.其他参数channel.queueDeclare(ORDER_QUEUE, true, false, false, argument);//发消息String message = "this is QUEUE_P";//持续发送消息for (int i = 0; i < 10; i++) {//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,忽略//4.消息体if (i == 5) {AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();channel.basicPublish("", ORDER_QUEUE, properties, (message + i).getBytes());} else {channel.basicPublish("", ORDER_QUEUE, null, (message + i).getBytes());}}System.out.println("消息发送成功");}
}

消费者

package com.rabbitmq12;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 15:34* @desc**/
public class Consume {//队列名称public static final String ORDER_QUEUE = "ORDER_QUEUE";//发消息public static void main(String[] args) throws IOException, TimeoutException {//获取信道Channel channel = RabbitmqUtils.getChannel();//消费者未成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答//3.消费者成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(ORDER_QUEUE, true, deliverCallback, cancelCallback);}
}

测试,启动生产者,再启动消费者

十二、代码实现-死信队列

死信指的是无法被消费的消息。这些消息因为一些如网络超时等原因,导致无法被消费,就成了死信消息。所以为了保证这些数据不丢失,就有了死信队列,专门对死信消息进行处理。

工作图:

代码如下:

生成者:

package com.rabbitmq8;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {//正常交换机public static String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机(不需要重复声明)//channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");//设置ttl时间为10s,过期则进入死信队列AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();//发送死信消息for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,(消息持久化)//4.消息体channel.basicPublish(NORMAL_EXCHANGE, "normalQueue", properties, message.getBytes());}}
}

消费者(正常队列)

package com.rabbitmq8;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume01 {//正常交换机public static String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";//死信交换机public static String DEAD_EXCHANGE = "DEAD_EXCHANGE";//正常队列public static String NORMAL_QUEUE = "NORMAL_QUEUE";//死信队列public static String DEAD_QUEUE = "DEAD_QUEUE";//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");channel.exchangeDeclare(DEAD_EXCHANGE, "direct");//声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);//设置参数Map arguments = new HashMap<>();//设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key", "deadQueue");//设置正常队列长度arguments.put("x-max-length", 6);//设置过期时间,10s(一般不在这里设置,而是在生产者端配置,这样子过期时间可以由生产者随意改动)//arguments.put("x-message-ttl", "10000");//声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);//绑定普通交换机和队列的channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normalQueue");//绑定死信交换机和队列的channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "deadQueue");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("拒绝");//拒绝,并且不放回队列channel.basicReject(message.getEnvelope().getDeliveryTag(), false);};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);System.out.println("正常队列准备消费消息......");}
}

消费者2(死信队列)

package com.rabbitmq8;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume02 {//死信队列public static String DEAD_QUEUE = "DEAD_QUEUE";//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);System.out.println("死信队列准备消费消息......");}
}

测试1:首先先运行死信队列,然后运行生产者。由于生产的消息没有被消费掉,消息超时自动进入死信队列

测试二:运行死信队列和正常队列,然后运行生产者。由于生产的消息被拒绝,消息超时自动进入死信队列

十三、代码实现-延迟队列

延迟队列:延迟队列里面的元素,是到达指定时候后,就对这些元素进行处理。如果订单功能,如果指定时间内不进行支付,则会取消订单。

原理图:

因为我们后续肯定是要在框架运行,所以这里需要整合。

在yml配置文件配置

spring:rabbitmq:host: 192.168.248.10port: 5672username: adminpassword: admin#交换机确认接口publisher-confirms: true

添加配置类

package com.rabbitmq9;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author 天真热* @create 2022-02-10 10:09* @desc**/
@Configuration
public class Config {//普通交换机public static final String X_EXCHANGE = "X";//死信交换机public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String QUEUE_C = "QC";//死信队列名称public static final String DEAD_LETTER_QUEUE = "QD";//声明xExchange 别名@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}//声明yExchange 别名@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明普通队列A ttl 为10s@Bean("queueA")public Queue queuA() {Map argument = new HashMap<>();//设置死信交换机argument.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyargument.put("x-dead-letter-routing-key", "YD");//设置ttl,10sargument.put("x-message-ttl", 10000);//创建队列return QueueBuilder.durable(QUEUE_A).withArguments(argument).build();}//声明普通队列B ttl 为10s@Bean("queueB")public Queue queuB() {Map argument = new HashMap<>();//设置死信交换机argument.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyargument.put("x-dead-letter-routing-key", "YD");//设置ttl,40sargument.put("x-message-ttl", 40000);//创建队列return QueueBuilder.durable(QUEUE_B).withArguments(argument).build();}//声明普通队列C ttl 为生产者确定@Bean("queueC")public Queue queuC() {Map argument = new HashMap<>();//设置死信交换机argument.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyargument.put("x-dead-letter-routing-key", "YD");//创建队列return QueueBuilder.durable(QUEUE_C).withArguments(argument).build();}//死信队列@Bean("queueD")public Queue queuD() {//创建队列return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//绑定队列A@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定队列B@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定队列C@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}//绑定队列D@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}

添加生产消息类

package com.rabbitmq9;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 天真热* @create 2022-02-10 15:03* @desc**/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public void sendMessage(@PathVariable String message) {log.info("发送消息");rabbitTemplate.convertAndSend("X", "XA", "消息来自10s的ttl:" + message);rabbitTemplate.convertAndSend("X", "XB", "消息来自40s的ttl:" + message);}@GetMapping("/sendExpireMsg/{message}/{ttlTime}")public void sendExpireMsg(@PathVariable String message, @PathVariable String ttlTime) {log.info("发送定时消息");rabbitTemplate.convertAndSend("X", "XC", "消息来自定时消息:" + message, msg -> {//发消息的时候,延迟延长msg.getMessageProperties().setExpiration(ttlTime);return msg;});}
}

添加消费者类

package com.rabbitmq9;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 队列ttl消费者** @author 天真热* @create 2022-02-10 16:00* @desc**/
@Slf4j
@Component
public class Consume {//接收消息@RabbitListener(queues = Config.DEAD_LETTER_QUEUE)public void receiveD(Message msg, Channel channel) {String message = new String(msg.getBody());log.info("接收到了延迟队列消息:" + message);}
}

测试1::8090/ttl//

测试2::8090/ttl//5555/100

测试3:连续发送两个地址。可以看到延迟消息需要排队,没法优先发送延时时间短的,这是个弊端,可以使用插件克服这个问题。

:8090/ttl//5555/10000

:8090/ttl///1000

十四、代码实现-插件实现延迟队列

安装教程

下载:将插件放入:/usr/lib//lib/-3.8.8/下进入目录:cd /usr/lib//lib/-3.8.8/安装:- 重启: -

工作原理:

代码如下

配置类

package com.rabbitmq10;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.CustomAutowireConfigurer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author 天真热* @create 2022-02-10 10:09* @desc**/
@Configuration
public class DelayConfig {//队列public static final String DELAY_QUEUE = "DELAY_QUEUE";//交换机public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";//routingKeypublic static final String DELAY_ROUNTING_KEY = "DELAY_ROUNTING_KEY";//声明交换机@Beanpublic CustomExchange delayEchange() {Map arguments = new HashMap<>();arguments.put("x-delayed-type", "direct");//1.交换机名称//2.交换机类型//3.是否需要持久化//4.是否需要自动删除//5.其他参数return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, arguments);}//声明队列@Beanpublic Queue delayQueue() {//创建队列return new Queue(DELAY_QUEUE);}//绑定队列@Beanpublic Binding delayBindingQueue(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayEchange") CustomExchange delayEchange) {return BindingBuilder.bind(delayQueue).to(delayEchange).with("DELAY_ROUNTING_KEY").noargs();}}

消费者

package com.rabbitmq10;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 队列ttl消费者** @author 天真热* @create 2022-02-10 16:00* @desc**/
@Slf4j
@Component
public class DelayConsume {//接收消息@RabbitListener(queues = DelayConfig.DELAY_QUEUE)public void receiveDelay(Message msg) {String message = new String(msg.getBody());log.info("接收到了插件延迟队列消息:" + message);}
}

发送消息类

package com.rabbitmq10;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 天真热* @create 2022-02-10 15:03* @desc**/
@Slf4j
@RestController
@RequestMapping("/delayed")
public class SendDelayMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendExpireMsg/{message}/{ttlTime}")public void sendExpireMsg(@PathVariable String message, @PathVariable Integer ttlTime) {log.info("发送定时消息");rabbitTemplate.convertAndSend(DelayConfig.DELAY_EXCHANGE, DelayConfig.DELAY_ROUNTING_KEY, "消息来自定时消息:" + message, msg -> {//发消息的时候,延迟时长msg.getMessageProperties().setDelay(ttlTime);return msg;});}
}

测试:依次执行以下两个地址,可以发现延迟时间短的会先执行,不需要排队

:8090///10000/10000

:8090///500/500

十五、代码实现-发布确认高级

原理图

配置文件

spring:rabbitmq:host: 192.168.248.10port: 5672username: adminpassword: admin#交换机确认接口publisher-confirms: true#新版本:spring.rabbitmq.publisher-confirm-type=correlated#路由回退消息给生产者publisher-returns: true

发送消息类

package com.rabbitmq11;import com.rabbitmq10.DelayConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.CorrelationDataPostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 天真热* @create 2022-02-10 15:03* @desc**/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class SendConfirmMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsgToBadExchange/{message}")public void sendMsgToBadExchange(@PathVariable String message) {//类,可以在队列接收消息的时候接受CorrelationData correlationData = new CorrelationData("1");//发送消息rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE + "bad", ConfirmConfig.CONFIRM_ROUNTING_KEY, "消息来自定时消息:" + message, correlationData);log.info("发送消息");}@GetMapping("/sendMsgToBadRounting/{message}")public void sendMsgToBadRounting(@PathVariable String message) {//类,可以在队列接收消息的时候接受CorrelationData correlationData = new CorrelationData("1");//发送消息rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, ConfirmConfig.CONFIRM_ROUNTING_KEY + "bad", "消息来自定时消息:" + message, correlationData);log.info("发送消息");}@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message) {//类,可以在队列接收消息的时候接受CorrelationData correlationData = new CorrelationData("1");//发送消息rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, ConfirmConfig.CONFIRM_ROUNTING_KEY , "消息来自定时消息:" + message, correlationData);log.info("发送消息");}}

配置类

package com.rabbitmq11;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 发布确认** @author 天真热* @create 2022-02-10 10:09* @desc**/
@Configuration
public class ConfirmConfig {//队列public static final String CONFIRM_QUEUE = "CONFIRM_QUEUE";//交换机public static final String CONFIRM_EXCHANGE = "CONFIRM_EXCHANGE";//routingKeypublic static final String CONFIRM_ROUNTING_KEY = "CONFIRM_ROUNTING_KEY";//备份交换机public static final String BACKUP_EXCHANGE = "BACKUP_EXCHANGE";//备份队列public static final String BACKUP_QUEUE = "BACKUP_QUEUE";//告警队列public static final String WARNING_QUEUE = "WARNING_QUEUE";//声明交换机//这里因为用的是之前的交换机,所以需要删除原先的交换机才能生效@Beanpublic DirectExchange confirmEchange() {return (DirectExchange) ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE).build();}//声明备份交换机@Beanpublic FanoutExchange backupEchange() {return new FanoutExchange(BACKUP_EXCHANGE);}//声明队列@Beanpublic Queue confirmQueue() {//创建队列return new Queue(CONFIRM_QUEUE);}//声明备份队列@Beanpublic Queue backupQueue() {//创建队列return new Queue(BACKUP_QUEUE);}//声明报警队列@Beanpublic Queue warningQueue() {//创建队列return new Queue(WARNING_QUEUE);}//绑定队列@Beanpublic Binding confirmBindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmEchange") DirectExchange confirmEchange) {return BindingBuilder.bind(confirmQueue).to(confirmEchange).with(CONFIRM_ROUNTING_KEY);}//绑定备份队列@Beanpublic Binding backupBindingQueue(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupEchange") FanoutExchange backupEchange) {return BindingBuilder.bind(backupQueue).to(backupEchange);}//绑定报警队列@Beanpublic Binding warningBindingQueue(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupEchange") FanoutExchange backupEchange) {return BindingBuilder.bind(warningQueue).to(backupEchange);}}

告警配置类

package com.rabbitmq11;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** 回调接口** @author 天真热* @create 2022-02-11 15:25* @desc**/
@Component
@Slf4j
public class CallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//注入@PostConstructprivate void init() {//注入rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机回调方法(针对于交换机是否成功接收消息)* 1.发消息 交换机接收到了 回调* 1.1 correlationData 保存回调消息的id及相关信息* 1.2 交换机收到消息 ack=true* 1.3 call null* 2. 发消息 交换机失败了 回调* 2.1 correlationData 保存回调消息的id及相关信息* 2.2 交换机收到消息 ack=false*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {System.out.println("交换机接受成功了");} else {System.out.println("交换机接受失败了");}}/*** 消息不可达到目的地时,返回给生产者** @param message    消息* @param replayCode 失败码* @param replayText 失败原因* @param exchanges  交换机* @param routingKey 路由*/@Overridepublic void returnedMessage(Message message, int replayCode, String replayText, String exchanges, String routingKey) {System.out.println("队列接受失败了");System.out.println("消息:" + message + ";消息码:" + replayCode + ";原因:" + replayText + ";交换机:" + exchanges + ";路由:" + routingKey);}
}

正常消费者

package com.rabbitmq11;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 发布确认** @author 天真热* @create 2022-02-10 10:09* @desc**/
@Configuration
public class ConfirmConfig {//队列public static final String CONFIRM_QUEUE = "CONFIRM_QUEUE";//交换机public static final String CONFIRM_EXCHANGE = "CONFIRM_EXCHANGE";//routingKeypublic static final String CONFIRM_ROUNTING_KEY = "CONFIRM_ROUNTING_KEY";//备份交换机public static final String BACKUP_EXCHANGE = "BACKUP_EXCHANGE";//备份队列public static final String BACKUP_QUEUE = "BACKUP_QUEUE";//告警队列public static final String WARNING_QUEUE = "WARNING_QUEUE";//声明交换机//这里因为用的是之前的交换机,所以需要删除原先的交换机才能生效@Beanpublic DirectExchange confirmEchange() {return (DirectExchange) ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE).build();}//声明备份交换机@Beanpublic FanoutExchange backupEchange() {return new FanoutExchange(BACKUP_EXCHANGE);}//声明队列@Beanpublic Queue confirmQueue() {//创建队列return new Queue(CONFIRM_QUEUE);}//声明备份队列@Beanpublic Queue backupQueue() {//创建队列return new Queue(BACKUP_QUEUE);}//声明报警队列@Beanpublic Queue warningQueue() {//创建队列return new Queue(WARNING_QUEUE);}//绑定队列@Beanpublic Binding confirmBindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmEchange") DirectExchange confirmEchange) {return BindingBuilder.bind(confirmQueue).to(confirmEchange).with(CONFIRM_ROUNTING_KEY);}//绑定备份队列@Beanpublic Binding backupBindingQueue(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupEchange") FanoutExchange backupEchange) {return BindingBuilder.bind(backupQueue).to(backupEchange);}//绑定报警队列@Beanpublic Binding warningBindingQueue(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupEchange") FanoutExchange backupEchange) {return BindingBuilder.bind(warningQueue).to(backupEchange);}}

告警消费者

package com.rabbitmq11;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 队列ttl消费者** @author 天真热* @create 2022-02-10 16:00* @desc**/
@Slf4j
@Component
public class WarningConsume {//接收消息@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE)public void receiveDelay(Message msg) {String message = new String(msg.getBody());log.info("报警发现不可路由的消息:" + message);}
}

备份消费者

package com.rabbitmq11;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 队列ttl消费者** @author 天真热* @create 2022-02-10 16:00* @desc**/
@Slf4j
@Component
public class BackupConsume {//接收消息@RabbitListener(queues = ConfirmConfig.BACKUP_QUEUE)public void receiveDelay(Message msg) {String message = new String(msg.getBody());log.info("走的是备份交换机消息:" + message);}
}

测试1-正常访问::8090///100

测试2-访问错误的路由::8090///100

测试3-访问错误的::8090///100

十六、的界面属性介绍

D:d 是 的缩写,代表这个队列中的消息支持持久化。AD:ad 是 的缩写。代表当前队列的最后一个消费者退订时被自动删除。注意:此时不管队列中是否还存在消息,队列都会删除。excl:是 的缩写。代表这是一个排他队列。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。Args:是 的缩写。代表该队列配置了 参数。TTL:是 x--ttl 的缩写。设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒。Exp:Auto ,是 x- 配置的缩写。

当队列在指定的时间没有被访问(, , …)就会被删除,=Exp。注意这里是删除队列,不是队列中的消息。Lim:说明该队列配置了 x-max-。限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉。Lim B:说明队列配置了 x-max--bytes。限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小。DLX:说明该队列配置了 x-dead--。当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉。DLK:x-dead---key 的缩写,将删除的消息推送到指定交换机的指定路由键的队列中去。Pri:x-max- 的缩写,优先级队列。表明该队列支持优先级,先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费。Ovfl:x- 的缩写。队列中的消息溢出时,如何处理这些消息。要么丢弃队列头部的消息,要么拒绝接收后面生产者发送过来的所有消息。有两个配置项:drop-head,代表丢弃队列头部的消息,默认行为;- 设置队列中的消息溢出后,该队列的行为:”拒绝接收”(所有消息)。ha-all:镜像队列。all 表示镜像到集群上的所有节点,ha- 参数忽略。

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了