【MQ】ActiveMQ 概念
ActiveMQ的介绍
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件,是使用Java语言实现的中间件。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
特点:
支持的语言(支持很多种语言编写客户端):Java、C、C++、C#、Ruby、Perl、python、PHP 应用协议:OpenWire、Stomp、WS、Notifaction、XMPP、AMQP
完全支持JMS1.1和J2EE1.4规范(持久化、XA消息,事务)
支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA,支持通过JDBC和journal提供高速的消息持久化
从设计上保证了高性能的集群,客户端-服务器,点对点
支持Ajax,支持与Ajax的整合,Webservices
可以很容易的调用内嵌JMS provider,进行测试
对Spring的支持,ActiveMQ可以很容易内嵌到使用spring的系统里去
除了常见的J2EE服务器(如Geronimo,JBoss4,GlassFish,WebLogic)的测试,其中通过JCA1.5 resource adaptors的配置可以让ActiveMQ可以自动化的部署到任何兼容J2EE1.4商业服务器上。
相关概念:

1,Desination
目的地,JMS Provider(消息中间件)负责维护,用于对Message进行管理的对象。MessageProducer需要指定Destination才能发送消息,MessageConsumer需要指定Destination才能接收消息。
2,Producer
消息生成者(客户端,生成消息),负责发送Message到目的地,应用接口为MessageProducer。在JMS规范中,所有的标准定义都在javax.jms包中。
3,Consumer(Receiver)
消息消费者(处理消息),负责从目的地中消费(处理、监听、订阅)Message,应用接口为MessageConsumer。
4,Message
消息(Message),消息封装一次通信的内容,常见的类型有:StreamMessage、BytesMessage、TextMessage、ObjectMessage、MapMessage。
5,ConnectionFactory
链接工厂,用于创建链接的工厂类型看,注意,不能和JDBC中的ConnectionFactory混淆。
6,Connection
链接,用于建立访问ActiveMQ连接的类型,由链接工厂创建,不能喝JDBC中的Connection混淆。
7,Session
会话,一次持久有效状态的访问,由链接创建,是具体操作消息的基础支撑。
8,Queue & Topic
Queue是队列目的地,Topic是主题目的地。都是Destination的子接口。
Queue特点:队列中的消息,默认只能由唯一的一个消费者处理,一旦处理消息删除。
Topic特点:主题中的消息,会发送给所有消费者同时处理,只有在消息可以重复处理的业务场景中可以使用。
9,PTP
Point to Point,点对点消息模型,就是基于Queue实现的消息处理方式。
10,PUB & SUB
publish & Sububscribe,消息的发布订阅模型,是基于Topic实现的消息处理方式。
ActiveMQ有两种信息传递方式:queue 和 topic。
queue 是队列,Point to Point(P2P),一对一通信。生产者将消息发送到 broker ,消费者从 broker获取信息,生产者和消费者通过指定的 queueName 名称标识往哪个队列插入消息 / 从哪个队列获取信息。
topic 是主题,(Pub/Sub),发布和订阅。一对多通信。生产者将消息发送到 broker,broker 将消息推送给 监听该topic 的消费者。
注意:queue 是 生产者 插入,消费者获取,都是客户端主动。topic 是生产者发布,broker 推送,并不是消费者主动获取。
1. 如果 topic 的生产者发布了消息到 broker,此时没有 consumer 监听该topic,broker 不知道将消息推送到哪里,此时broker也不会保存消息,而是将消息丢弃。topic类型的消息没有持久化(有异议,明天查证),当没有消费者在监听,消息便丢失了。
2. queue类型的消息,支持持久化。当生产者 发送消息到broker的队列中,broker首先接收消息,并持久化消息,然后回送确认ACK到生产者,表示接收成功。此时即使没有消费者,也没有关系,等消费者上线之后,依然可以获取到之前的消息。
ActiveMQ的持久化有三种方式,KaHaDB、jdbc数据库、LevelDB。
KaHaDB:ActiveMQ默认的持久化方式,通过在本地的文件系统保存数据内容来保证数据不丢失。
jdbc数据库:通过将数据内容保存到数据库,来保证数据的不丢失。
LevelDB: Google推出的一种文件系统,但已不被ActiveMQ推荐。主要原因为没有精力适配。
相关说明:https://blog.csdn.net/qq447995687/article/details/93924145
所以在做持久化的时候,建议使用 KaHaDB或者使用数据库。个人感觉如果数据格式比较简单,可以使用 KaHaDB,省心。
测试代码:
Queue 生产者:
public void testMQProducerQueue() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageProducer producer = session.createProducer(queue); //7、使用会话对象创建一个消息对象 TextMessage textMessage = session.createTextMessage("hello!test-queue"); //8、发送消息 producer.send(textMessage); //9、关闭资源 producer.close(); session.close(); connection.close(); }
Queue 消费者:
public void TestMQConsumerQueue() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(queue); //7、向consumer对象中设置一个messageListener对象,用来接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close(); }
Topic 生产者:
public void TestTopicProducer() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Topic topic = session.createTopic("test-topic"); //6、使用会话对象创建生产者对象 MessageProducer producer = session.createProducer(topic); //7、使用会话对象创建一个消息对象 TextMessage textMessage = session.createTextMessage("hello!test-topic"); //8、发送消息 producer.send(textMessage); //9、关闭资源 producer.close(); session.close(); connection.close(); }
Topic消费者:
public void TestTopicConsumer() throws Exception{ //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Topic topic = session.createTopic("test-topic"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(topic); //7、向consumer对象中设置一个messageListener对象,用来接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close(); }
思考:
生产者发送消息时候,可以指定 该消息的一些属性:是否持久化、是否是事务提交、同步发送还是异步发送
是否持久化:持久化的信息在mq宕机之后,仍然可以被消费者消费。没有持久化的信息,MQ宕机,消息就没有了。
是否是事务提交:事务提交支持多条消息一起发送,但是我测试发现多发一条,性能降低很多,网上也没有事务的例子。
同步or异步:同步发送,生产者等待broker回送确认消息,会阻塞。异步发送,程序继续执行,broker接收成功,会回调函数同时生产者接收成。压测数据表示:同步非常慢,异步非常快,所以建议使用异步,但是要注意重新实现回调函数,如果发送失败了,需要重新发送或者暂时持久化到本地。异步方式在 url 的末尾添加 ?useAsyncSend=true 即可
遗留问题:
0. broker收到消息之后,将消息存在哪里?有没有持久化有没有区别?
1. broker 收到消息之后,如何将消息持久化到 KaHaDB 如何持久化到 数据库?
2. ActiveMQ启动的时候,如何读取配置信息,如何保持与 KaHaDB 和 数据库的连接?
3. KaHaDB 每个文件内存多大,或者每次多大的时候存一次?
4. MQ宕机之后,queue持久化 和 topic持久化的信息都能被消费者获取嘛?