首页 >> 大全

精华 一张图进阶 RocketMQ

2023-12-13 大全 22 作者:考证青年

“一张图”系列旨在通过“一张图”系统性的解析一个板块的知识点:

本文是“一张图”系列的第一个板块:一张图解析 。

本文是《一张图解析 》系列的第 1 篇,今天的内容主要分为三个部分:

整体架构

什么是消息队列?顾名思义,首先得有一个队列,这个队列用来存储消息。那有了消息队列就得有人往里面放,有人往里面取。有没有似曾相识燕归来的感 jio,这莫非就是连小学生都知道的,经典的“生产者-消费者模式”?接下来我们就来看看它里面穿了什么?

别急,先来回顾一下 “生产者-消费者模式” 这个老朋友。简单来说,这个模型是由两类线程和一个队列构成:

有了这个队列,生产者就只需要关注生产,而不用管消费者的消费行为,更不用等待消费者线程执行完;消费者也只管消费,不用管生产者是怎么生产的,更不用等着生产者生产。

这意味着什么呢,生产者和消费者之间实现解藕和异步。这就厉害了,因为我们生活中很多都是异步的。比如最近新冠疫情卷土重来,我点的外卖只能送到小区门口的外卖队列里面,而我只能去外卖队列里面取外卖,然后一顿狼吞虎咽。

具体 “生产者-消费者模式” 怎么实现,想必各位小学都学过了,我们来看看这个模式还有什么问题吧。最大的问题就是我们小学学的 “生产者-消费者模式” 是个单机版的,只能自嗨。这就相当于,我就是外卖骑手,我点了个外卖放到外卖队列,然后我再从外卖队列里面去取,一顿操作猛如虎呀!于是就有了进化版,我们把消费者,队列,生产者放到不同的服务器上,这就是传说中的分布式消息队列了。

3200精华进阶__战龙三国武魂进阶表张

生产者生产的消息通过网络传递给队列存储,消费者通过网络从队列获取消息。但是还有问题,消息可能有很多种,全都放在一起岂不是乱套了?我点的外卖和快递全都放在一起,太难找了吧。于是我们就需要区分不同类型消息,相同类型的消息称为一个 Topic。同时,骑手不可能只有一个,点外卖的也不会只有我一个人,于是就有了生产者组和消费者组。

但还是有问题呀,小区那么大,一个队列放不下。我住在小区南门,点个外卖还要跑去北门拿,那真的是 eggs hurt。于是物业在东南西北门各设了一个外卖快递放置点。也就是我们有多个队列,组成队列集群

可是,问题又双叒叕来了(还有完没完),一个小区那么多个外卖快递队列,骑手怎么知道送到哪里去,我又怎么知道去哪里取?很简单,导航呀。我们把导航的信息称为路由信息,这些信息需要有一个管理的地方,它告诉生产者,某这个 Topic 的消息可以发给哪些队列,同时告诉消费者你需要的消息可以从哪些队列里面取。 为这些路由信息的设置了管理员 ,当然 也可以有很多个,组成 集群。

到这里,你就应该知道 里面都穿了什么啦。包括了生产者(),消费者(), 以及队列本身()。 是代理的意思,负责队列的存取等操作,我们可以把 理解为队列本身。

元数据管理

因为 、 和 都需要和 交互,负责的三此君不得不先和大家唠唠 是何方神圣。上面有说道 是集群的元数据管理中心,那它到底管理了哪些元数据?我们来看看 里面又穿了什么,看完了记得关注、转发、点赞、收藏哦。

简单来说, 是我们的整个 集群的元数据管理中心,负责集群元数据的增删改查。先不管这个增删改查是怎么实现的,我们甚至可以理解就是数据库的增删改查,但是我们一定要知道这些元数据都长什么样子。才能知道 、 及 是如何根据这些数据进行消息收发的。

如图所示,二主二从的 集群相关的元数据信息,包括 、、、、 (暂时不用关注,图中未画出)。

战龙三国武魂进阶表张_3200精华进阶_

其他角色会主动向 上报状态,根据上报消息里的请求码做相应的处理,更新存储的对应信息。

那么多数据,相信大家看的有点晕,三此君简单总结下: 通过 来维护存活的 。 会获取上面的路由信息,发送消息的时候指定发送到哪个 Topic,根据 Topic 可以从 选择一个 ,根据 可以从 获取到 IP 地址。有了地址 就可以将消息通过网络传递给 。

消息收发示例 部署

刚刚我们了解 整体架构,那怎么样通过 收发消息呢?需要先通过 部署一套 :

如果你没有安装 ,可以根据菜鸟教程 MacOS 安装/ 安装 进行安装。然后,通过 - 部署 :

注意:如果你现在不了解 不重要,只需要按照步骤部署好 即可,并不会阻碍我们理解 相关内容。

部署完成后我们就可以在 中看到 相关容器,包括 、 及 ( 控制台),到这里我们就可以使用部署的 收发消息了。

已经部署好了,接下来先来看一个简单的消息收发示例,可以说是 的 "Hello World"。

消息发送

12345678910111213141516171819public class SyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("Topic1","Tag", "Key","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

消息接收

1234567891011121314151617181920212223public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息erbconsumerijun.subscribe("sancijun", "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();}
}

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了