RocketMQ单机和集群搭建
# RocketMQ简介
RocketMQ是一个开源的分布式消息和流数据平台,由阿里研发目前属于apache顶级项目:https://rocketmq.apache.org/ (opens new window) RocketMQ消息队列主要功能功能:引用解耦、流量消峰、消息分发、保证最终一致性、方便动态扩容
# RocketMQ安装
# 下载地址
# 系统要求
64位 Linux、unix或mac;
JDK版本1.8以上;
2
# 准备工作
由于下载的是zip压缩格式文件,因此在linux上安装unzip来进行解压
yum install -y unzip
使用unzip命令解压
unzip rocketmq-all-4.5.1-bin-release.zip
# 单机模式安装
# 启动nameserver
两种启动方式
- 进入解压目录:/rocketmq-all-4.5.1-bin-release/bin,输入./mqnamesrv进行启动
[root@node1 bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
2
3
4
- 命令行提示启动成功
进入解压目录:/rocketmq-all-4.5.1-bin-release/bin,输入指令nohup sh bin/mqnamesrv &
[root@node1 bin]# nohup sh ./mqnamesrv &
[1] 68705
2
命令行显示启动的进程号,查看启动日志
namesrv.log
...
The Name Server boot success. serializeType=JSON
2
3
日志提示启动成功
# 启动broker
两种启动方式:
- 直接运行mqbroker
./mqbroker -n localhost:9876
The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876
2
或者
./mqbroker
The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON
2
提示启动成功
- 通过nohup指令启动
nohup sh ./mqbroker -n localhost:9876 &
[2] 69248
2
查看broker启动日志
tail -f ~/logs/rocketmqlogs/broker.log
...
INFO main - The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876
2
3
提示启动成功
# 测试
运行源文件中以写好的测试demo
export NAMESRV_ADDR=localhost:9876
# 生产者
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=AC110001104B2B193F2D6EA72CC003E7...
# 消费者
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=3, storeSize=179...
2
3
4
5
6
7
单点消息队列启动成功
# 关闭消息队列
通过mqshutdown命令关闭消息队列,依次关闭nameserver和broker
sh mqshutdown broker
The mqbroker(69255) is running...
Send shutdown request to mqbroker(69255) OK
2
3
sh mqshutdown namesrv
The mqnamesrv(68711) is running...
Send shutdown request to mqnamesrv(68711) OK
2
3
# 问题
broker内存不够
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /component/rocketmq-all-4.5.1-bin-release/bin/hs_err_pid68880.log
2
3
4
5
6
# 解决方案
报错原因是虚拟机启动内存不够,修改bin下的服务启动脚本 runserver.sh 、runbroker.sh 中对于内存的限制,
runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
runbroker.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
2
3
4
5
改成如下示例:
runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
runbroker.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"
2
3
4
5
# 集群搭建
通过两台虚拟机(192.168.108.128和192.168.108.129)模拟集群
# 启动NameServer
分别启动两台虚拟机的NameServer,启动方式与单机模式相同
# 启动broker
两台虚拟机的broker互为主备,即各自启动一个Master和一个Slave,在启动broker之前需要修改相关配置文件
# 修改配置文件
在目录conf/2m-2s-sync中(两主两从,同步更新),该目录下有四个配置文件,分别对应两个Master和两个Slave配置
├── broker-a.properties #Master-a配置
├── broker-a-s.properties #Slave-a配置
├── broker-b.properties #Master-b配置
└── broker-b-s.properties #Slave-b配置
2
3
4
在这里我们需要修改192.168.108.128的broker-a.properties和broker-b-s.properties,即128这台机器是名字为broker-a的broker的主节点,是名称为broker-b的broker的从节点;相对应的192.168.108.129是名字为broker-a的broker的从节点,是名称为broker-b的broker的主节点,因此修改129机器的broker-b.properties和broker-a-s.properties配置文件。修改如下:
修改机器1的配置文件(192.168.108.128)
master:broker-a.properties
namesrvAddr=192.168.108.128:9876;192.168.108.129:9876 #配置nameserver地址
listenPort=10911 #broke监听端口号
storePathRootDir=/home/rocketmq/store-a #存储消息和配置信息路径
brokerClusterName=DefaultCluster #集群名称
brokerName=broker-a #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=0 #0表示master,大于0为slave对应ID
deleteWhen=04 #删除消息时间,04表示凌晨4点
fileReservedTime=48 #磁盘上保存消息的时长,单位小时
brokerRole=SYNC_MASTER #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH #刷盘策略
2
3
4
5
6
7
8
9
10
11
slave:broker-b-s.properties
namesrvAddr=192.168.108.128:9876;192.168.108.129:9876 #配置nameserver地址
listenPort=11011 #broke监听端口号
storePathRootDir=/home/rocketmq/store-b #存储消息和配置信息路径
brokerClusterName=DefaultCluster #集群名称
brokerName=broker-b #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=1 #0表示master,大于0为slave对应ID
deleteWhen=04 #删除消息时间,04表示凌晨4点
fileReservedTime=48 #磁盘上保存消息的时长,单位小时
brokerRole=SLAVE #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH #刷盘策略
2
3
4
5
6
7
8
9
10
修改机器2的配置文件(192.168.108.129)
master:broker-b.properties
namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876 #配置nameserver地址
listenPort=10911 #broke监听端口号
storePathRootDir=/home/rocketmq/store-b #存储消息和配置信息路径
brokerClusterName=DefaultCluster #集群名称
brokerName=broker-b #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=0 #0表示master,大于0为slave对应ID
deleteWhen=04 #删除消息时间,04表示凌晨4点
fileReservedTime=48 #磁盘上保存消息的时长,单位小时
brokerRole=SYNC_MASTER #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH #刷盘策略
2
3
4
5
6
7
8
9
10
11
slave:broker-a-s.properties
namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876 #配置nameserver地址
listenPort=11011 #broke监听端口号
storePathRootDir=/home/rocketmq/store-a #存储消息和配置信息路径
brokerClusterName=DefaultCluster #集群名称
brokerName=broker-a #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=1 #0表示master,大于0为slave对应ID
deleteWhen=04 #删除消息时间,04表示凌晨4点
fileReservedTime=48 #磁盘上保存消息的时长,单位小时
brokerRole=SLAVE #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH #刷盘策略
2
3
4
5
6
7
8
9
10
# 启动四个broker
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-a.properties &
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-b-s.properties &
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-b.properties &
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-a-s.properties &
2
3
4
5
2019-07-25 14:00:19 INFO BrokerControllerScheduledThread1 - Update slave consumer offset from master, 192.168.108.128:10911
2019-07-25 14:00:19 INFO BrokerControllerScheduledThread1 - Update slave delay offset from master, 192.168.108.128:10911
2019-07-25 14:00:20 INFO brokerOutApi_thread_2 - register broker[0]to name server 192.168.108.129:9876 OK
2019-07-25 14:00:20 INFO brokerOutApi_thread_1 - register broker[0]to name server 192.168.108.128:9876 OK
2
3
4
5
名两行出现broker注册到nameserver成功,则集群搭建成功
# 问题
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.2:10911> failed
# 解决方案
出现该问题是因为虚拟机中搭建了docker产生了虚拟网卡过多,RocketMQ在访问docker网络时不通产生问题。 首先关闭docker服务
systemctl stop docker
systemctl is-enabled docker #查询是否自启动
systemctl disable docker #禁止自启动
2
3
然后重启虚拟机即可解决
# 测试
通过java代码对集群进行测试
# 生产者代码:
public class producer {
public static void main(String[] args) throws Exception {
//创建生产者,并设置group
DefaultMQProducer producer = new DefaultMQProducer("group");
//设置nameserver
producer.setNamesrvAddr("192.168.108.128:9876");
//启动生产者
producer.start();
//创建发送的消息,发送100条数据
for (int i = 0; i < 100; i++){
Message msg = new Message("TopicTest", "TagA",("MQ Test"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送
SendResult result = producer.send(msg);
System.out.println(result);
}
//关闭生产者
producer.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 消费者代码
public class consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.setNamesrvAddr("192.168.108.128:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
System.out.println(Thread.currentThread().getName() + " Receive Message: " + list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
System.out.println("consumer start...");
consumer.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 测试结果
生产者
消费者
从控制台可以看到,在消费者端显示消息发送成功的数据,sendStatus均为ok,消费者端显示读取出来对应的消息,集群搭建成功
# 可视化管理平台
RocketMQ拥有一个可视化的管理平台,可以通过图形界面的方式查看集群情况、topic、生产者、消费者等具体信息地址: https://github.com/apache/rocketmq-externals (opens new window),项目拉下来后进入目录rocketmq-console 项目使用springboot搭建,在配置文件application.properties中修改配置rocketmq.config.namesrvAddr
rocketmq.config.namesrvAddr=192.168.108.128:9876;192.168.108.129:9876
启动项目,运行App.java。访问localhost:8080即可进入管理平台