在ActiveMQ中的点对点模式中存在多个消费者
1.点对点模型(基于队列 Point to Point,PTP) 每个消息只能有一个消费者。消息的生产者和消费者之间没有时间上的 相关性.可以有多个发送者,但只能被一个消费者消费。 一个消息只能被一个接受者接受一次 生产者把消息发送到队列中(Queue),接受者无需订阅,当接受者未接受到消息时就会处于阻塞状态
2. 发布者/订阅者模型(基于主题的/,pub/sub) 每个消息可以有多个消费者。 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消 费自它订阅之后发布的消息. 允许多个接受者,类似于广播的方式 生产者将消息发送到主题上(Topic) 接受者必须先订阅注:持久化订阅者:特殊的消费者,告诉主题,我一直订阅着,即使网络断开,消息服务器也记住所有持久化订阅者,如果有新消息,也会知道必定有人回来消费。
在中写的实例
连接工具类:
/** Copyright @ 2019 com.iflysse.trains* 01SpringBoot 下午2:18:30* All right reserved.**/package com.dcx.comm.utils;import java.util.List;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ScheduledMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.connection.ConnectionFactoryUtils;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsUtils;
import org.springframework.stereotype.Component;/*** @ClassName: ActiveMqUtils* @author: cxding* @createTime: 2019年4月26日 下午2:18:30* @version: v1.0*/
@Component
public class ActiveMqUtils {/*** 注入JMS*/@Autowiredprivate JmsTemplate jmsTemplate;/*** 设置普通* @Title: sendNorMolMessage * @author: cxding* @createTime: 2019年4月28日 下午1:03:56* @param destination* @param text void*/public void sendNorMolMessage(Destination destination, String text) {// 连接工厂ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();Connection connection = null;Session session = null;MessageProducer producer = null;try {// 创建链接connection = connectionFactory.createConnection();connection.start();// 创建session,开启事物session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// 创建生产者producer = session.createProducer(destination);// 设置持久化producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 设置过期时间//producer.setTimeToLive(time);TextMessage message = session.createTextMessage(text);producer.send(message);// 提交session.commit();} catch (JMSException e) {throw new RuntimeException(e);} finally {// 关闭连接close(producer, session, connection, connectionFactory);}}}
连接信息配置.
#-------------------------------activeMQ--------------------------------
# active mq
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=root
spring.activemq.password=pass
控制层
@PostMapping("/producer")@ApiOperation(value="产生消息队列(点对点模式,监听器目前只监听:“queueSend”)",response=Result.class)public Result createMq(@ApiParam(value = "队列目的地", required = false,defaultValue="queueSend") @RequestParam String quenName,@ApiParam(value = "队列消息内容", required = false) @RequestParam String text){Result result = stuService.createMq(quenName,text);return result;}
生产者:实现层
/*** 产生点对点的消息队列*/@Overridepublic Result createMq(String quenName,String text) {Destination destination = new ActiveMQQueue(quenName);for (int i = 0; i < 10; i++) {utils.sendNorMolMessage(destination, text+i);}return new Result("消息已经产生",true);}
监听消费,设置两个监听者
@Component
public class ActiveMqListenConfig {@Autowiredprivate ActiveMqUtils util;private Logger logger = LoggerFactory.getLogger(this.getClass());@JmsListener(destination="queueSend")public void recieveTaskMq(String message) {//Destination destination = new ActiveMQQueue("easySend"); logger.info("消费者1监听到的消息是:"+message+",并且这条消息已经被消费了");//再次推送消息//util.sendMessage(destination, "监听到消息,给你返回");}@JmsListener(destination="queueSend")public void recieveTaskMq2(String message) {//Destination destination = new ActiveMQQueue("easySend"); logger.info("消费者2监听到的消息是:"+message+",并且这条消息已经被消费了");//再次推送消息//util.sendMessage(destination, "监听到消息,给你返回");}}
运行项目后
查看结果
由结果可以看出,有两个消费者,并且产生的10条消息已经被消费了,从控制台结果可以看出:点对点模式并不是只能有一个消费者,而是一条消息只能有一个消费者消费。