标签搜索

RocketMQ 消息中间件安装、JAVA集成、运行、维护管理

Aryee
2023-11-12 / 0 评论 / 148 阅读 / 正在检测是否收录...

RocketMQ概述

RocketMQ是一个开源的分布式消息中间件系统,最初由阿里巴巴集团开发并开源。它提供了可靠的消息传递和高效的消息订阅机制,可以支持大规模的消息通信。

RocketMQ具有高可靠性、高吞吐量、低延迟和高扩展性的特点,适用于大规模分布式系统中的消息通信需求。它支持多种消息传递模式,包括点对点和发布/订阅模式,同时还提供了丰富的消息过滤和路由功能。

RocketMQ采用了分布式架构设计,可以实现水平扩展和高可用性的部署。它支持多种消息存储方式,包括内存存储和磁盘存储,并提供了可靠的消息传递保证,确保消息不会丢失或重复。

RocketMQ还提供了丰富的监控和管理功能,可以实时监控消息的生产和消费情况,以及集群的运行状态。同时,它还提供了灵活的消息顺序控制和事务消息支持,满足了不同场景下的消息通信需求。

RocketMQ支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。

  1. Name Server: 名称服务充当路由消息的提供者。是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。在消息队列 RocketMQ 中提供命名服务,更新和发现 Broker 服务。
  2. Broker:消息中转角色,负责存储消息,转发消息。可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。
  3. 生产者Producer:与 Name Server 集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。
  4. 消费者Consumer:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。

RocketMQ下载:

RocketMQ地址:https://rocketmq.apache.org/zh/download

一、RocketMQ启动

1. 创建一个logs文件夹,用于存放日志(切换到rocketmq目录)

2. 启动NameServer

nohup sh bin/mqnamesrv > ./logs/namesrv.log &

3. 启动broker

nohup sh bin/mqbroker > ./logs/broker.log &
#这里的 -c 是指定使用的配置文件
nohup sh bin/mqbroker -c conf/broker.conf > ./logs/broker.log &

二、RocketMQ关闭

#切换到bin目录下
#关闭broker
./mqshutdown broker
#关闭nameserver
./mqshutdown namesrv

三、java SpringBoot整合

1. 导入依赖配置

<!-- rocketmq -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq.version}</version>
</dependency>

2. yml

rocketmq:
  name-server: ip或域名:9876
  producer:
    group: aryee-message-group

3. 生产者,注意:微服务项目场景封装代码,只作参考,复制无用

@Api(tags = "消息生产者")
@RestController
@RequestMapping("/mq")
public class ProducerController extends MessageBaseController {

    @Autowired
    private ProducerService producerService;

    @ApiOperation("普通消息")
    @InnerAuth
    @PostMapping("/{topic}")
    public String convertAndSend(@PathVariable("topic") String topic, @RequestBody JSONObject jsonObject) {
        //String topic = "aryee-message-topic";
        producerService.convertAndSend(topic, jsonObject);
        return "success";
    }
    //其它方法封装类似,这里省略大部分代码
}

4. 消费者,注意:微服务项目场景封装代码,只作参考,复制无用

@Component
public class WxConsumerListener {

    @Component
    @RocketMQMessageListener(topic = "wx-cp-app-msg-topic", consumerGroup = "wx-cp-app-msg")
    public static class CpAppConsumerListener implements RocketMQListener<MessageExt> {
        @Autowired
        private WxCpAppService wxCpAppService;
        @Autowired
        private WxQyUserService userService;

        @Override
        public void onMessage(MessageExt m) {
//            System.out.printf("收到消息,topic:%s, tag:%s, msgId:%s", m.getTopic(), m.getTags(), m.getMsgId());
            Map<String, String> map = JSONObject.parseObject(m.getBody(), Map.class);
            WxCpMessage message = JSONObject.parseObject(m.getBody(), WxCpMessage.class);
            if (StringUtils.isNotEmpty(map.get("phone"))) {
                LambdaQueryWrapper<WxQyUser> query = new LambdaQueryWrapper<>();
                query.eq(WxQyUser::getMobile, map.get("phone"));
                WxQyUser user = userService.getOne(query);
                if (StringUtils.isNotNull(user)) message.setToUser(user.getUserId());
            }
            if (StringUtils.isEmpty(message.getToParty()) && StringUtils.isEmpty(message.getToUser())) {
                message.setToParty("14");
            }
            if (StringUtils.isNotEmpty(map.get("chatId"))) {
                WxCpAppChatMessage msg = JSONObject.parseObject(m.getBody(), WxCpAppChatMessage.class);
                wxCpAppService.sendGroupMsg(msg);
            } else {
                R<String> result = wxCpAppService.sendAppMsg(message);
                System.out.println(result);
            }

        }
    }
}

三、配置rocketmq-dashboard

1. github下载

git:https://github.com/apache/rocketmq-dashboard.git
download:https://codeload.github.com/apache/rocketmq-dashboard/zip/refs/heads/master

2. IDE编译jar包上传到服务器

指定NameServer的地址和启动端口(8090)以及输出日志

nohup java -jar rocketmq-dashboard-1.0.0.jar &
//指定NameServer地址、启动端口、输出日志
nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port=8000 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > ./rocketmq-all-4.9.0-bin-release/logs/dashboard.log &

15

评论

博主关闭了当前页面的评论