【MQ】ActiveMQ 概念

【MQ】ActiveMQ 概念

ActiveMQ的介绍

MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件,是使用Java语言实现的中间件。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 

特点:

支持的语言(支持很多种语言编写客户端):Java、C、C++、C#、Ruby、Perl、python、PHP    应用协议:OpenWire、Stomp、WS、Notifaction、XMPP、AMQP   
完全支持JMS1.1和J2EE1.4规范(持久化、XA消息,事务)
支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA,支持通过JDBC和journal提供高速的消息持久化
从设计上保证了高性能的集群,客户端-服务器,点对点
支持Ajax,支持与Ajax的整合,Webservices
可以很容易的调用内嵌JMS provider,进行测试
对Spring的支持,ActiveMQ可以很容易内嵌到使用spring的系统里去

除了常见的J2EE服务器(如Geronimo,JBoss4,GlassFish,WebLogic)的测试,其中通过JCA1.5 resource adaptors的配置可以让ActiveMQ可以自动化的部署到任何兼容J2EE1.4商业服务器上。

相关概念:

1,Desination

  目的地,JMS Provider(消息中间件)负责维护,用于对Message进行管理的对象。MessageProducer需要指定Destination才能发送消息,MessageConsumer需要指定Destination才能接收消息。

2,Producer

  消息生成者(客户端,生成消息),负责发送Message到目的地,应用接口为MessageProducer。在JMS规范中,所有的标准定义都在javax.jms包中。

3,Consumer(Receiver)

  消息消费者(处理消息),负责从目的地中消费(处理、监听、订阅)Message,应用接口为MessageConsumer。

4,Message

  消息(Message),消息封装一次通信的内容,常见的类型有:StreamMessage、BytesMessage、TextMessage、ObjectMessage、MapMessage。

5,ConnectionFactory

  链接工厂,用于创建链接的工厂类型看,注意,不能和JDBC中的ConnectionFactory混淆。

6,Connection

  链接,用于建立访问ActiveMQ连接的类型,由链接工厂创建,不能喝JDBC中的Connection混淆。

7,Session

  会话,一次持久有效状态的访问,由链接创建,是具体操作消息的基础支撑。

8,Queue & Topic

  Queue是队列目的地,Topic是主题目的地。都是Destination的子接口。

  Queue特点:队列中的消息,默认只能由唯一的一个消费者处理,一旦处理消息删除。

  Topic特点:主题中的消息,会发送给所有消费者同时处理,只有在消息可以重复处理的业务场景中可以使用。

9,PTP

  Point to Point,点对点消息模型,就是基于Queue实现的消息处理方式。

10,PUB & SUB

  publish & Sububscribe,消息的发布订阅模型,是基于Topic实现的消息处理方式

ActiveMQ有两种信息传递方式:queue 和 topic。

queue 是队列,Point to Point(P2P),一对一通信。生产者将消息发送到 broker ,消费者从 broker获取信息,生产者和消费者通过指定的 queueName 名称标识往哪个队列插入消息 / 从哪个队列获取信息。

topic 是主题,(Pub/Sub),发布和订阅。一对多通信。生产者将消息发送到 broker,broker 将消息推送给 监听该topic 的消费者。

注意:queue 是 生产者 插入,消费者获取,都是客户端主动。topic 是生产者发布,broker 推送,并不是消费者主动获取。

1. 如果 topic 的生产者发布了消息到 broker,此时没有 consumer 监听该topic,broker 不知道将消息推送到哪里,此时broker也不会保存消息,而是将消息丢弃。topic类型的消息没有持久化(有异议,明天查证),当没有消费者在监听,消息便丢失了。 

2. queue类型的消息,支持持久化。当生产者 发送消息到broker的队列中,broker首先接收消息,并持久化消息,然后回送确认ACK到生产者,表示接收成功。此时即使没有消费者,也没有关系,等消费者上线之后,依然可以获取到之前的消息。

ActiveMQ的持久化有三种方式,KaHaDB、jdbc数据库、LevelDB。

KaHaDB:ActiveMQ默认的持久化方式,通过在本地的文件系统保存数据内容来保证数据不丢失。

jdbc数据库:通过将数据内容保存到数据库,来保证数据的不丢失。

LevelDB: Google推出的一种文件系统,但已不被ActiveMQ推荐。主要原因为没有精力适配。

相关说明:https://blog.csdn.net/qq447995687/article/details/93924145

所以在做持久化的时候,建议使用 KaHaDB或者使用数据库。个人感觉如果数据格式比较简单,可以使用 KaHaDB,省心。

测试代码:

Queue 生产者:

public void testMQProducerQueue() throws Exception{
        //1、创建工厂连接对象,需要制定ip和端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2、使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        //3、开启连接
        connection.start();
        //4、使用连接对象创建会话(session)对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
        Queue queue = session.createQueue("test-queue");
        //6、使用会话对象创建生产者对象
        MessageProducer producer = session.createProducer(queue);
        //7、使用会话对象创建一个消息对象
        TextMessage textMessage = session.createTextMessage("hello!test-queue");
        //8、发送消息
        producer.send(textMessage);
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
}

Queue 消费者:

public void TestMQConsumerQueue() throws Exception{
        //1、创建工厂连接对象,需要制定ip和端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2、使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        //3、开启连接
        connection.start();
        //4、使用连接对象创建会话(session)对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
        Queue queue = session.createQueue("test-queue");
        //6、使用会话对象创建生产者对象
        MessageConsumer consumer = session.createConsumer(queue);
        //7、向consumer对象中设置一个messageListener对象,用来接收消息
        consumer.setMessageListener(new MessageListener() {
 
            @Override
            public void onMessage(Message message) {
                // TODO Auto-generated method stub
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
        //8、程序等待接收用户消息
        System.in.read();
        //9、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

Topic 生产者:

public void TestTopicProducer() throws Exception{
        //1、创建工厂连接对象,需要制定ip和端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2、使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        //3、开启连接
        connection.start();
        //4、使用连接对象创建会话(session)对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
        Topic topic = session.createTopic("test-topic");
        //6、使用会话对象创建生产者对象
        MessageProducer producer = session.createProducer(topic);
        //7、使用会话对象创建一个消息对象
        TextMessage textMessage = session.createTextMessage("hello!test-topic");
        //8、发送消息
        producer.send(textMessage);
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
    }

Topic消费者:

public void TestTopicConsumer() throws Exception{
        //1、创建工厂连接对象,需要制定ip和端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        //3、开启连接
        connection.start();
        //4、使用连接对象创建会话(session)对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
        Topic topic = session.createTopic("test-topic");
        //6、使用会话对象创建生产者对象
        MessageConsumer consumer = session.createConsumer(topic);
        //7、向consumer对象中设置一个messageListener对象,用来接收消息
        consumer.setMessageListener(new MessageListener() {
 
            @Override
            public void onMessage(Message message) {
                // TODO Auto-generated method stub
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
        //8、程序等待接收用户消息
        System.in.read();
        //9、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

思考:

生产者发送消息时候,可以指定 该消息的一些属性:是否持久化、是否是事务提交、同步发送还是异步发送

是否持久化:持久化的信息在mq宕机之后,仍然可以被消费者消费。没有持久化的信息,MQ宕机,消息就没有了。

是否是事务提交:事务提交支持多条消息一起发送,但是我测试发现多发一条,性能降低很多,网上也没有事务的例子。

同步or异步:同步发送,生产者等待broker回送确认消息,会阻塞。异步发送,程序继续执行,broker接收成功,会回调函数同时生产者接收成。压测数据表示:同步非常慢,异步非常快,所以建议使用异步,但是要注意重新实现回调函数,如果发送失败了,需要重新发送或者暂时持久化到本地。异步方式在 url 的末尾添加 ?useAsyncSend=true  即可

遗留问题:

0. broker收到消息之后,将消息存在哪里?有没有持久化有没有区别?
1. broker 收到消息之后,如何将消息持久化到 KaHaDB 如何持久化到 数据库?
2. ActiveMQ启动的时候,如何读取配置信息,如何保持与 KaHaDB 和 数据库的连接?
3. KaHaDB 每个文件内存多大,或者每次多大的时候存一次?
4. MQ宕机之后,queue持久化 和 topic持久化的信息都能被消费者获取嘛?
0 0 vote
Article Rating
Subscribe
提醒
guest
0 评论
Inline Feedbacks
View all comments