RocketMQ概述
RocketMQ是一个开源的分布式消息中间件系统,最初由阿里巴巴集团开发并开源。它提供了可靠的消息传递和高效的消息订阅机制,可以支持大规模的消息通信。
RocketMQ具有高可靠性、高吞吐量、低延迟和高扩展性的特点,适用于大规模分布式系统中的消息通信需求。它支持多种消息传递模式,包括点对点和发布/订阅模式,同时还提供了丰富的消息过滤和路由功能。
RocketMQ采用了分布式架构设计,可以实现水平扩展和高可用性的部署。它支持多种消息存储方式,包括内存存储和磁盘存储,并提供了可靠的消息传递保证,确保消息不会丢失或重复。
RocketMQ还提供了丰富的监控和管理功能,可以实时监控消息的生产和消费情况,以及集群的运行状态。同时,它还提供了灵活的消息顺序控制和事务消息支持,满足了不同场景下的消息通信需求。
RocketMQ支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。
- Name Server: 名称服务充当路由消息的提供者。是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。在消息队列 RocketMQ 中提供命名服务,更新和发现 Broker 服务。
- Broker:消息中转角色,负责存储消息,转发消息。可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。
- 生产者Producer:与 Name Server 集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。
- 消费者Consumer:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。
RocketMQ下载:
RocketMQ地址:https://rocketmq.apache.org/zh/download
一、RocketMQ启动
1. 创建一个logs文件夹,用于存放日志(切换到rocketmq目录)
2. 启动NameServer
nohup sh bin/mqnamesrv > ./logs/namesrv.log &
3. 启动broker
nohup sh bin/mqbroker > ./logs/broker.log &
#这里的 -c 是指定使用的配置文件
nohup sh bin/mqbroker -c conf/broker.conf > ./logs/broker.log &
二、RocketMQ关闭
#切换到bin目录下
#关闭broker
./mqshutdown broker
#关闭nameserver
./mqshutdown namesrv
三、java SpringBoot整合
1. 导入依赖配置
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
2. yml
rocketmq:
name-server: ip或域名:9876
producer:
group: aryee-message-group
3. 生产者,注意:微服务项目场景封装代码,只作参考,复制无用
@Api(tags = "消息生产者")
@RestController
@RequestMapping("/mq")
public class ProducerController extends MessageBaseController {
@Autowired
private ProducerService producerService;
@ApiOperation("普通消息")
@InnerAuth
@PostMapping("/{topic}")
public String convertAndSend(@PathVariable("topic") String topic, @RequestBody JSONObject jsonObject) {
//String topic = "aryee-message-topic";
producerService.convertAndSend(topic, jsonObject);
return "success";
}
//其它方法封装类似,这里省略大部分代码
}
4. 消费者,注意:微服务项目场景封装代码,只作参考,复制无用
@Component
public class WxConsumerListener {
@Component
@RocketMQMessageListener(topic = "wx-cp-app-msg-topic", consumerGroup = "wx-cp-app-msg")
public static class CpAppConsumerListener implements RocketMQListener<MessageExt> {
@Autowired
private WxCpAppService wxCpAppService;
@Autowired
private WxQyUserService userService;
@Override
public void onMessage(MessageExt m) {
// System.out.printf("收到消息,topic:%s, tag:%s, msgId:%s", m.getTopic(), m.getTags(), m.getMsgId());
Map<String, String> map = JSONObject.parseObject(m.getBody(), Map.class);
WxCpMessage message = JSONObject.parseObject(m.getBody(), WxCpMessage.class);
if (StringUtils.isNotEmpty(map.get("phone"))) {
LambdaQueryWrapper<WxQyUser> query = new LambdaQueryWrapper<>();
query.eq(WxQyUser::getMobile, map.get("phone"));
WxQyUser user = userService.getOne(query);
if (StringUtils.isNotNull(user)) message.setToUser(user.getUserId());
}
if (StringUtils.isEmpty(message.getToParty()) && StringUtils.isEmpty(message.getToUser())) {
message.setToParty("14");
}
if (StringUtils.isNotEmpty(map.get("chatId"))) {
WxCpAppChatMessage msg = JSONObject.parseObject(m.getBody(), WxCpAppChatMessage.class);
wxCpAppService.sendGroupMsg(msg);
} else {
R<String> result = wxCpAppService.sendAppMsg(message);
System.out.println(result);
}
}
}
}
三、配置rocketmq-dashboard
1. github下载
git:https://github.com/apache/rocketmq-dashboard.git
download:https://codeload.github.com/apache/rocketmq-dashboard/zip/refs/heads/master
2. IDE编译jar包上传到服务器
指定NameServer的地址和启动端口(8090)以及输出日志
nohup java -jar rocketmq-dashboard-1.0.0.jar &
//指定NameServer地址、启动端口、输出日志
nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port=8000 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > ./rocketmq-all-4.9.0-bin-release/logs/dashboard.log &
评论