【MQ】4.RabbitMQ 集群高可用
(前提: 已经搭建了 三个磁盘节点的镜像集群)
完整的RabbitMQ镜像集群 + HA Proxy + keepalived 搭建过程:
1. 镜像集群 + HA Proxy:
1.1 结构图:

1.2 在 10.20.61.135 安装 HA Proxy:
yum -y install haproxy #使用 yum 安装,简单省心 vim /etc/haproxy/haproxy.cfg # 修改配置文件,默认路径 /etc/haproxy/
1.3 修改配置:配置文件原有内容不用修改,在文件最后添加:
#######################HAproxy监控页面######################### listen http_front bind *:1080 #监听端口 stats refresh 30s #统计页面自动刷新时间 stats uri / #统计页面url stats realm Haproxy Manager #统计页面密码框上提示文本 stats auth guest:guest #统计页面用户名和密码设置 stats hide-version #隐藏统计页面上HAProxy的版本信息 #####################RabbitMQ服务代理########################################### listen rabbitmq_cluster *:5673 mode tcp stats enable balance roundrobin option tcpka option tcplog timeout client 3h timeout server 3h timeout connect 3h #balance url_param userid #balance url_param session_id check_post 64 #balance hdr(User-Agent) #balance hdr(host) #balance hdr(Host) use_domain_only #balance rdp-cookie #balance leastconn #balance source //ip server rabbit_ca 10.20.83.245:5672 check inter 5s rise 2 fall 3 # check inter 2000 是检测心跳频率, server rabbit_bogon 10.20.61.135:5672 check inter 5s rise 2 fall 3 # rise 2是2次正确认为服务器可用, server tabbit_jy_157 10.20.61.157:5672 check inter 5s rise 2 fall 3 # fall 3是3次失败认为服务器不可用
需要把 listen rabbitmq_cluster 内容中的 server 修改为 你自己的地址。
1.4 启动HAProxy :
service haproxy start # CentOS 6 systemctl start haproxy # CentOS 7
启动成功,可以通过 ip + 1080 访问监控服务:

2. 镜像集群 + 2 * HA Proxy:
2.1 结构图:

2.2 在 10.20.61.157 上安装 HA Proxy ,与 2.1 完全一致,
2.3 修改配置,配置信息与 1.3 完全一致,
2.4 启动服务,与 1.4 完全一致,
启动之后,通过 10.20.61.157:1080 和 10.20.61.135:1080 浏览器访问,能看到同样的页面:

3. 镜像集群 + 2 * HA Proxy + 2 * KeepAlived :
3.1 结构图:

3.1 在 10.20.61.135 和 10.20.61.157 上安装 Keepalived :
yum install keepalived # yum 安装keepalived
3.2 修改配置:配置文件在 /etc/keepalived/
10.20.61.135 的 配置文件:
! Configuration File for keepalived global_defs { notification_email { yancy_01@163.com } } # 集群资源监控,组合track_script进行 vrrp_script check_haproxy { script "killall -0 haproxy" interval 2 } vrrp_instance HAPROXY_HA { state MASTER # Master BackUp interface eth0 virtual_router_id 135 # 两个服务保持一致 unicast_src_ip 10.20.61.135 # 顺序相反 unicast_peer { 10.20.61.157 } priority 100 # 优先级高的为 Master advert_int 2 # 设置主备节点间的通信验证类型及密码,同一个VRRP实例中需一致 authentication { auth_type PASS auth_pass 1234 } track_script { check_haproxy } virtual_ipaddress { 10.20.61.158 #虚拟ip配置完之后就用它访问 } }
10.20.61.157的配置文件:
! Configuration File for keepalived global_defs { notification_email { yancy_01@163.com } } # 集群资源监控,组合track_script进行 vrrp_script check_haproxy { script "killall -0 haproxy" interval 2 } vrrp_instance HAPROXY_HA { state BACKUP interface eth0 virtual_router_id 135 unicast_src_ip 10.20.61.157 unicast_peer { 10.20.61.135 } priority 80 advert_int 2 # 设置主备节点间的通信验证类型及密码,同一个VRRP实例中需一致 authentication { auth_type PASS auth_pass 1234 } track_script { check_haproxy } virtual_ipaddress { 10.20.61.158 #虚拟ip配置完之后就用它访问 } }
配置完成,启动服务,(前提是 三个 ribbitMQ 已启动,两个 HAproxy 已启动)
service keepalived start # CentOS 6 systemctl start keepalived # CentOS 7
启动之后,通过 ip addr 查看 10.20.61.135 的ip信息:(不要用 ifconfig ,看不到的)
在 10.20.61.135 的 Master 机器上:(可以看到 eth0 网卡上的 10.20.61.158 虚地址就是 配置文件的虚地址)

在10.20.61.157 的 BackUp 的机器上看:(没有 158 的IP绑定)

在 10.20.61.135 的Master机器上 停止 keepalived 服务,观察 IP 漂移:(再次启动Master会抢占 VIP ,因为优先级高)

在 BakcUp 的机器查看 IP : (发现 158 的 ip 漂移到 157 的机器上)

因为 Master 的 keepalived 服务停止了,所以生成的 VIP 发生 IP 漂移,转移到 BackUp 的机器上,(原理应该是 157 去访问了 135的服务,发现不通了,所以就主动绑定了 VIP (10.20.61.158))。
到这里: 3 * rabbitMQ + 2 * HAProxy + 2 * keepalived 全部启动成功了。
这个结构不太正确的,正常的结构应该是 3个 rabbitMQ 在三台不同 ip的机器上。所以应该用 5 台机器。
三台机器每台运行一个 rabbitMQ , 剩下的两台机器 每台都是 HAProxy + keepalived 的结构。
通过浏览器访问 rabbitMQ 和 HAProxy 的端口,测试所有的服务是不是都正常运行。
最后通过 client 代码访问 VIP的方式,访问到 RabbitMQ服务:

附上一份 简单的 Java 代码:
生产者:
package test.RabbitMQ; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /*** * @ClassName: Producer1 */ public class Test_rabbitMQ_p { private static int size = 1000; private static final String Queue = "test"; private static String message = "hello maxchen"; private static Channel channel; public static void main(String[] args) throws Exception { long start = System.currentTimeMillis(); ExecutorService es = Executors.newFixedThreadPool(10); final CountDownLatch cdl = new CountDownLatch(size); init_connection(); for (int a = 0; a < size; a++) { es.execute(new Runnable() { public void run() { try { sendMessage(); } catch (Exception e) { e.printStackTrace(); } cdl.countDown(); } }); } cdl.await(); es.shutdown(); long time = System.currentTimeMillis() - start; System.out.println("插入" + size + "条JSON,共消耗:" + (double) time / 1000 + " s"); System.out.println("平均:" + size / ((double) time / 1000) + " 条/秒"); } public static void init_connection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); //connectionFactory.setHost("10.20.83.245"); connectionFactory.setHost("10.20.61.158"); connectionFactory.setPort(5673); //connectionFactory.setUsername("guest"); //connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = null; connection = connectionFactory.newConnection(); channel = connection.createChannel(); Map<String,Object> map = new HashMap<String, Object>(); map.put("x-ha-policy", "all"); channel.queueDeclare(Queue, true, false, false, map); // queue 持久化 channel.confirmSelect(); // 添加一个确认监听 channel.addConfirmListener(new ConfirmListener() { public void handleAck(long deliveryTag, boolean multiple) { } public void handleNack(long deliveryTag, boolean multiple) { System.err.println(deliveryTag); System.err.println("-------no ack!-----------"); } }); } public static void sendMessage() throws Exception { channel.basicPublish("", Queue, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // 消息持久化 //channel.basicPublish("", Queue, null, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); } public static void addConfirmListener(Channel channel) { } }
消费者:
package test.RabbitMQ; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Test_rabbitMQ_c { private static int size = 400000; private static final String Queue = "test"; private static Channel channel; private static Consumer consumer; public static void main(String[] args) throws Exception { long start = System.currentTimeMillis(); ExecutorService es = Executors.newFixedThreadPool(10); final CountDownLatch cdl = new CountDownLatch(size); init_connection(); for (int a = 0; a < size; a++) { es.execute(new Runnable() { public void run() { try { receiver_Message(); } catch (Exception e) { e.printStackTrace(); } cdl.countDown(); } }); } cdl.await(); es.shutdown(); long time = System.currentTimeMillis() - start; System.out.println("插入" + size + "条JSON,共消耗:" + (double) time / 1000 + " s"); System.out.println("平均:" + size / ((double) time / 1000) + " 条/秒"); } public static void init_connection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("10.20.61.157"); //connectionFactory.setHost("10.20.61.135"); connectionFactory.setPort(5672); //connectionFactory.setUsername("guest"); //connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = null; connection = connectionFactory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(Queue, true, false, false, null); // queue 持久化 channel.confirmSelect(); consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //System.out.println("recv message: " + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }; } public static void receiver_Message() throws Exception { channel.basicConsume(Queue, consumer); } public static void addConfirmListener(Channel channel) { } }
访问前的 RabbiMQ队列:

生产者通过 访问 VIP地址 : 10.20.61.158 访问 RabbitMQ 服务:


再看RabbitMQ:

over…..