RocketMq
本文最后更新于 2023-12-09,文章内容可能已经过时。
MQ:Message Queue:
MQ 的全称是消息队列(Message Queue)。消息队列是一种用于在分布式系统中传递消息的通信模式。它允许应用程序或系统的不同组件通过异步方式进行通信,提高了系统的可伸缩性、可靠性和灵活性。在消息队列中,消息生产者将消息发送到队列,而消息消费者从队列中接收消息,实现了解耦和异步通信。
本质上,mq能做的就是一个接口内访问另一个接口。但是集成度更高,更加安全和快捷。
主要讲解rockermq.
1.安装:
安装特点:
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
集群工作流程:
启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
1.1 docker安装:
我们在安装rocketmq后,要开放的端口一般有4个:9876,10911,10912,10909.
搜索镜像
docker search rocketmq
拉取镜像
docker pull rocketmqinc/rocketmq
启动nameserver
mkdir -p /docker/rocketmq/nameserver/logs /docker/rocketmq/nameserver/store
docker run -d --restart=always --name rmqnamesrv --privileged=true -p 9876:9876 -v /docker/rocketmq/nameserver/logs:/root/logs -v /docker/rocketmq/nameserver/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
启动broker
创建配置文件
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 主机的IP
启动容器
docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/docker/rocketmq/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/docker/rocketmq/broker.conf
name-server参数说明:
参数 | 说明 |
---|---|
-d | 以守护进程的方式启动 |
- -restart=always | docker重启时候容器自动重启 |
- -name rmqnamesrv | 把容器的名字设置为rmqnamesrv |
-p 9876:9876 | 把容器内的端口9876挂载到宿主机9876上面 |
-v /docker/rocketmq/nameserver/logs:/root/logs | 目录挂载 |
-v /docker/rocketmq/nameserver/store | 目录挂载 |
rmqnamesrv | 容器的名字 |
-e “MAX_POSSIBLE_HEAP=100000000” | 设置容器的最大堆内存为100000000 |
rocketmqinc/rocketmq | 使用的镜像名称 |
sh mqnamesrv | 启动namesrv服务 |
broker参数说明:
-d | 以守护进程的方式启动 |
---|---|
- -restart=always | docker重启时候容器自动重启 |
- -name rmqbroker | 把容器的名字设置为rmqbroker |
- --link rmqnamesrv:namesrv | 和rmqnamesrv容器通信 |
-p 9876:9876 | 把容器内的端口9876挂载到宿主机9876上面 |
-p 10909:10909 | 把容器的vip通道端口挂载到宿主机 |
-e “NAMESRV_ADDR=namesrv:9876” | 指定namesrv的地址为本机namesrv的ip地址:9876 |
-e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker | 指定broker服务的最大堆内存 |
rocketmqinc/rocketmq | 使用的镜像名称 |
sh mqbroker -c /opt/docker/rocketmq/broker.conf | 指定配置文件启动broker节点 |
1.2 普通安装:
首先安装java.
//下载
wget https://dist.apache.org/repos/dist/release/rocketmq/5.1.1/rocketmq-all-5.1.1-bin-release.zip
unzip rocketmq-all-5.1.1-bin-release.zip
下载目录介绍
bin:启动脚本,包括shell脚本和CMD脚本
conf:实例配置文件 ,包括broker配置文件、logback配置文件等
lib:依赖jar包,包括Netty、commons-lang、FastJSON等
修改配置:
修改runbroker.sh runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
将启动jvm内存参数调小
修改conf/broker.conf
这里需要改一下Broker配置文件,需要指定NameServer的地址,因为需要Broker需要往NameServer注册
在文件末尾追加namesrv地址
namesrvAddr = localhost:9876
因为NameServer跟Broker在同一台机器,所以是localhost,NameServer端口默认的是9876。
文件末尾继续追加brokerIp,IP值是当前部署broker的服务器外网IP
brokerIP1 = 192.168.200.143
brokerIP2 = 192.168.200.143
因为Broker向NameServer进行注册的时候,带过去的ip如果不指定就会自动获取,但是自动获取的有个坑,就是有可能客户端无法访问到这个自动获取的ip,所以我建议手动指定客户端可以访问到的服务器ip。
启动
nameserver:
sh ./mqnamesrv
cd /root/rocketmq/rocketmq-all/bin
nohup sh `./mqnamesrv` &
在bin目录下执行
运行jps,出现了namesrvstartup即为成功
broker:
进入bin目录执行
nohup sh ./mqbroker -c ../conf/broker.conf -n localhost:9876 autoCreateTopicEnable=true &
jps查看当前已启动的java进程,出现brokerstartup即为成功
2.特性:
2.1 MQ选择:
特性 | ActiveMQ | RabbitMQ | RocketMQ | kafka |
---|---|---|---|---|
吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
时效性 | ms级 | 微秒级 | ms级 | ms级 |
可用性 | 高:主从 | 高:主从 | 非常高:分布式 | 非常高:分布式 |
消息可靠性 | 可能丢失 | 可能丢失 | 0丢失 | 0丢失 |
2.1.1 RabbitMQ:
开发语言: RabbitMQ 使用 Erlang 语言进行开发,这使得它在处理并发连接时非常强大和高效。
协议支持: RabbitMQ 支持多种协议,包括AMQP(Advanced Message Queuing Protocol)等,这使得它具有很好的可扩展性和与其他系统集成的能力。
可靠性: RabbitMQ 提供了强大的消息持久性和可靠性保证,可以确保消息不会在系统故障或重启时丢失。
社区和生态系统: RabbitMQ 有一个活跃的社区,广泛用于企业和开源项目,有大量的文档和教程可用。
2.1.2 RocketMQ:
开发语言: RocketMQ 使用 Java 进行开发,因此更适合Java生态系统。
协议支持: RocketMQ 采用了自己的通信协议,与其他消息队列系统可能不太兼容。
可靠性: RocketMQ 也提供了消息持久性和可靠性,可以保证消息不会丢失。
分布式特性: RocketMQ 是为构建分布式系统而设计的,提供了一些分布式特性,如水平扩展和高可用性。
适用场景: RocketMQ 在阿里巴巴集团的业务中得到广泛应用,特别是在大规模的分布式场景下。
选择 RabbitMQ 还是 RocketMQ 取决于项目的具体需求和技术栈。如果你的项目主要使用Java,并且你需要一个在Java生态系统中更紧密集成的消息队列系统,RocketMQ可能是一个更好的选择。如果你需要更广泛的语言支持和更成熟的社区支持,RabbitMQ可能更适合。
3.使用:
3.1 Java原生使用:
3.1.1 依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.1</version>
</dependency>
3.1.2 生产者:
public class Producer {
public static void main(String[] args) throws Exception {
//创建一个生产者,指定生产者组为StarGeo
DefaultMQProducer producer = new DefaultMQProducer("StarGeo");
// 指定NameServer的地址
producer.setNamesrvAddr("154.8.204.64:9876");
// 第一次发送可能会超时,设置的比较大
producer.setSendMsgTimeout(1000000);
// 启动生产者
producer.start();
// 创建一条消息
// topic为HomuraAkime
// 消息内容为homura daisuki
// tags 为 homura
Message msg = new Message("HomuraAkime", "homura", "homura daisuki ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息并得到消息的发送结果,然后打印
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
3.1.3 消费者:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 通过push模式消费消息,指定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("StarGeoConsumer");
// 指定NameServer的地址
consumer.setNamesrvAddr("154.8.204.64:9876");
// 订阅这个topic下的所有的消息
consumer.subscribe("HomuraAkime", "*");
// 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
3.2 spring集成:
3.2.1 依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
3.2.2 yml配置:
rocketmq:
producer:
group: homura
name-server: 154.8.204.64:9876
3.2.3 生产者:
@Slf4j
@RestController
public class RocketMqDemo {
@Autowired
RocketMQTemplate rocketMQTemplate;
@GetMapping("send/{id}")
public String send(@PathVariable("id") String id){
UserVo userVo = new UserVo(id,"侯征");
log.warn(JSON.toJSONString(userVo));
rocketMQTemplate.send("rocket-topic-01", MessageBuilder.withPayload(userVo).build());
return "SUCESS";
}
}
3.2.4 消费者:
@Slf4j
@Component
@RocketMQMessageListener(topic = "rocket-topic-01", consumerGroup = "my-rocket-topic-01")
public class UserConsumer implements RocketMQListener<UserVo> {
@Override
public void onMessage(UserVo message) {
log.warn("接受到消息: {}",message.toString());
}
}
4.应用场景:
4.1 解耦:
在跨系统调用接口时,会出现代码侵入性,而mq可以解决这种问题。
4.2 异步:
首先要知道什么叫异步:
同步(Synchronous):
在同步操作中,发起请求的任务会被阻塞,直到得到响应或完成特定操作为止。
执行顺序是按照调用的顺序进行的,即一个任务在完成之前会一直等待。
同步操作通常意味着代码会按照顺序执行,每个操作都会等待前一个操作的完成。
异步(Asynchronous):
在异步操作中,发起请求的任务不会被阻塞,它可以继续执行其他任务而不必等待操作完成。
异步操作通常涉及回调机制、事件驱动或者使用异步任务来处理。
异步操作允许并发执行多个任务,提高系统的响应性和性能。
简单来说,同步就是顺序执行,异步就是乱序执行,能缩短所需要的时间。
使用mq可以实现异步,在跨系统接口调用时,使用mq可以把请求发送出去然后返回,从而缩短处理时间。
4.3 削峰:
当接口吞吐量过大时,可以先把请求放入mq,然后再从mq中取出能处理的请求进行处理。
但是使用mq也会出现系统可用性降低,系统复杂程度提高,并有可能会出现一致性问题。