精华 一张图进阶 RocketMQ
“一张图”系列旨在通过“一张图”系统性的解析一个板块的知识点:
本文是“一张图”系列的第一个板块:一张图解析 。
本文是《一张图解析 》系列的第 1 篇,今天的内容主要分为三个部分:
整体架构
什么是消息队列?顾名思义,首先得有一个队列,这个队列用来存储消息。那有了消息队列就得有人往里面放,有人往里面取。有没有似曾相识燕归来的感 jio,这莫非就是连小学生都知道的,经典的“生产者-消费者模式”?接下来我们就来看看它里面穿了什么?
别急,先来回顾一下 “生产者-消费者模式” 这个老朋友。简单来说,这个模型是由两类线程和一个队列构成:
有了这个队列,生产者就只需要关注生产,而不用管消费者的消费行为,更不用等待消费者线程执行完;消费者也只管消费,不用管生产者是怎么生产的,更不用等着生产者生产。
这意味着什么呢,生产者和消费者之间实现解藕和异步。这就厉害了,因为我们生活中很多都是异步的。比如最近新冠疫情卷土重来,我点的外卖只能送到小区门口的外卖队列里面,而我只能去外卖队列里面取外卖,然后一顿狼吞虎咽。
具体 “生产者-消费者模式” 怎么实现,想必各位小学都学过了,我们来看看这个模式还有什么问题吧。最大的问题就是我们小学学的 “生产者-消费者模式” 是个单机版的,只能自嗨。这就相当于,我就是外卖骑手,我点了个外卖放到外卖队列,然后我再从外卖队列里面去取,一顿操作猛如虎呀!于是就有了进化版,我们把消费者,队列,生产者放到不同的服务器上,这就是传说中的分布式消息队列了。
生产者生产的消息通过网络传递给队列存储,消费者通过网络从队列获取消息。但是还有问题,消息可能有很多种,全都放在一起岂不是乱套了?我点的外卖和快递全都放在一起,太难找了吧。于是我们就需要区分不同类型消息,相同类型的消息称为一个 Topic。同时,骑手不可能只有一个,点外卖的也不会只有我一个人,于是就有了生产者组和消费者组。
但还是有问题呀,小区那么大,一个队列放不下。我住在小区南门,点个外卖还要跑去北门拿,那真的是 eggs hurt。于是物业在东南西北门各设了一个外卖快递放置点。也就是我们有多个队列,组成队列集群。
可是,问题又双叒叕来了(还有完没完),一个小区那么多个外卖快递队列,骑手怎么知道送到哪里去,我又怎么知道去哪里取?很简单,导航呀。我们把导航的信息称为路由信息,这些信息需要有一个管理的地方,它告诉生产者,某这个 Topic 的消息可以发给哪些队列,同时告诉消费者你需要的消息可以从哪些队列里面取。 为这些路由信息的设置了管理员 ,当然 也可以有很多个,组成 集群。
到这里,你就应该知道 里面都穿了什么啦。包括了生产者(),消费者(), 以及队列本身()。 是代理的意思,负责队列的存取等操作,我们可以把 理解为队列本身。
元数据管理
因为 、 和 都需要和 交互,负责的三此君不得不先和大家唠唠 是何方神圣。上面有说道 是集群的元数据管理中心,那它到底管理了哪些元数据?我们来看看 里面又穿了什么,看完了记得关注、转发、点赞、收藏哦。
简单来说, 是我们的整个 集群的元数据管理中心,负责集群元数据的增删改查。先不管这个增删改查是怎么实现的,我们甚至可以理解就是数据库的增删改查,但是我们一定要知道这些元数据都长什么样子。才能知道 、 及 是如何根据这些数据进行消息收发的。
如图所示,二主二从的 集群相关的元数据信息,包括 、、、、 (暂时不用关注,图中未画出)。
其他角色会主动向 上报状态,根据上报消息里的请求码做相应的处理,更新存储的对应信息。
那么多数据,相信大家看的有点晕,三此君简单总结下: 通过 来维护存活的 。 会获取上面的路由信息,发送消息的时候指定发送到哪个 Topic,根据 Topic 可以从 选择一个 ,根据 可以从 获取到 IP 地址。有了地址 就可以将消息通过网络传递给 。
消息收发示例 部署
刚刚我们了解 整体架构,那怎么样通过 收发消息呢?需要先通过 部署一套 :
如果你没有安装 ,可以根据菜鸟教程 MacOS 安装/ 安装 进行安装。然后,通过 - 部署 :
注意:如果你现在不了解 不重要,只需要按照步骤部署好 即可,并不会阻碍我们理解 相关内容。
部署完成后我们就可以在 中看到 相关容器,包括 、 及 ( 控制台),到这里我们就可以使用部署的 收发消息了。
已经部署好了,接下来先来看一个简单的消息收发示例,可以说是 的 "Hello World"。
消息发送
12345678910111213141516171819public class SyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("Topic1","Tag", "Key","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}
消息接收
1234567891011121314151617181920212223public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息erbconsumerijun.subscribe("sancijun", "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();}
}