RocketMQ单机部署完整学习笔记
文章目录
前言
本文是基于4.X版本RocketMQ,从MQ的搭建,消息推送和消费以及dashboard的使用
一、RocketMQ是什么?
参考文档 https://rocketmq.apache.org/zh/docs/4.x/
 重点角色如下
- Producer:消息的发送者;举例:发信者
- Consumer:消息接收者;举例:收信者
- Broker:暂存和传输消息;举例:邮局
- NameServer:管理Broker;举例:各个邮局的管理机构
- Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息;举例
- tag:消息标签,方便服务器过滤使用
二、使用步骤
1.安装MQ
首先安装jdk 再安装mq
1.安装JDK
- 查看Linux系统是否有自带的jdk
java -version
如果有 输入 rpm -qa | grep java 检测jdk的安装包
 接着进行一个个删除包,输入:rpm -e --nodeps +包名
 最后再次:rpm -qa | grep java检查是否删除完即可
- 下载jdk
https://www.oracle.com/java/technologies/downloads/#java8
 资源连接地址
 还在审核中
 3. 上传jdk到linux服务器
 
- 解压jdk
tar -zvxf jdk-8u241-linux-x64..tar.gz
- 配置环境变量
 用vim /etc/profile进入编辑状态,加入下边这段配置
 JAVA_HOME 根据自己的解压路径来写
JAVA_HOME=/usr/local/jdk/jdk1.8.0_241
JRE_HOME=$JAVA_HOME/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
- 重新加载配置
source /etc/profile
- 进行测试
java -version

2.安装mq
-  下载mq 
 连接 https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-source-release.zip
-  解压上传 
 我下载的是公司的mq是4.8的,官网链接给的是根4.9的,这个问题不大,不影响
目录结构如下
 

 
- 配置环境变量
vim /etc/profile
# 在末尾加入下面配置 路径和自己解压的mq路径一直 上一步有截图
export ROCKETMQ_HOME=/bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
# 使环境变量生效
source /etc/profile
3.MQ配置(核心)
这一步很重要,配置完这里,那mq就算部署好了
- 修改runserver.sh
 默认配置比较大,修改启动大小
## cd /bin
vim runserver.sh
## 修改启动大小
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
- 启动服务nameserver
## 启动
nohup sh bin/mqnamesrv &
## 关闭
sh bin/mqshutdown namesrv
出现下面就算启动成功了
 The Name Server boot success. serializeType=JSON
 注意目录别进错了
 
- 指定NameServer地址
 相当于 broker注册到nameserver上
vim /etc/profile
# 在末尾加入下面配置 有多个时以分号隔开,这个是集群时使用的 mq端口默认是9876 
# 192.168.141.101是服务器地址
export NAMESRV_ADDR=192.168.141.101:9876
# 使环境变量生效
source /etc/profile
- 修改 runbroker.sh
修改启动参数
vim runbroker.sh
## 
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
- 修改broker.conf
 重要,核心配置,以后关于mq服务的配置都在这里
 conf目录下
 2m-2s-async
 2m-2s-sync
 2m-noslave
 dledger
 这四个目录是集群配置时会用到,我们这路是单机的先不管
vim broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
#brokerid,0就表示是Master,>0的都是表示
brokerId = 0
# 这个就是第三三步配置的export NAMESRV_ADDR=192.168.141.101:9876 多个以分号分割
namesrvAddr=192.168.141.101:9876
#如果是多网卡的机器,比如云服务器,那么需要在broker.conf中增加brokerIP1属性,
#指定所在机器的外网网卡地址
brokerIP1=192.168.141.101
#对外服务的监听端口
listenPort=10911
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable=true
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true 

 
- 启动broker
 进入bin目录
 注意 -c 很多人启动不起来就是没加上
#启动
nohup sh  bin/mqbroker -c conf/broker.conf &
# 关闭
sh bin/mqshutdown broker
查看nohup.out
 出现这样的就说明启动成功了
 The broker[broker-a, 192.168.141.101:10911] boot success. serializeType=JSON and name server is 192.168.141.101:9876

- 到此mq服务已启动完成
jps

- 查看日志
## 查看namesrv日志
 tailf /root/logs/rocketmqlogs/namesrv.log 
 ## broker日志
 tailf /root/logs/rocketmqlogs/broker.log 
其实前面启动的时候查看这里的日志也可以看是否启动没
 停掉时先停broker 再停namesrv 启动先启 namesrv 再启动broker 因为broker 需要注册到namesrv 上
- 发送和接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer


sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

2.搭建可视化dashboard
1.下载源码
点击下面直接下载4.X的
 https://github.com/apache/rocketmq-dashboard/tree/release-1.0.0
 说下
 现在MQ已经到5.X了,但是现在还保留着4的,分支下拉到最后可以看到一个relaese-1.0.0
 这个就是4.X用的,下载下来后解压
 切记版本要对上,不然你和我一样折腾个一两天

2.修改配置
主要改下这个
rocketmq.config.namesrvAddr=192.168.141.101:9876
如果自己端口需要修改也可以,我是改成了8078
 
3.启动

 访问 http://localhost:8078
 
3.整合java
1.生产者
直接上代码
package com.bsoft;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.IOException;
/**
 * @author:
 * @time: 2023/12/29 15:40
 */
public class MQPublisher {
    private final static String nameServer = "192.168.141.101:9876";
    private final static String producerGroup = "my_group";
    private final static String consumerGroup = "my_group";
    private final static String topic = "topic_test";
    public static void main(String[] args) throws IOException {
        // 初始化一个producer并设置Producer group name
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        try {
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 启动producer
            producer.start();
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 异步发送消息, 发送结果通过callback返回给客户端
            producer.send(msg, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("OK %s %n",
                            sendResult.getMsgId());
                }
                public void onException(Throwable e) {
                    System.out.printf("Exception %s %n", e);
                    e.printStackTrace();
                }
            },10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.in.read();
    }
}
2.消费者
package com.bsoft;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
/**
 * @author: liucheng
 * @time: 2023/12/29 15:39
 */
public class MQConsumer {
    private final static String nameServer = "192.168.141.101:9876";
    private final static String consumerGroup = "my_group_test";
    private final static String topic = "topic_test";
    public static void main(String[] args) throws MQClientException, IOException, InterruptedException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup,true);
        // 设置NameServer的地址
        consumer.setNamesrvAddr(nameServer);
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe(topic, "tagE");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                msgs.forEach((msg)->{
                    byte[] body = msg.getBody();
                    String s = new String(body, Charset.defaultCharset());
                    System.out.println("msg=================> " +s);
                });
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started......");
//        Thread.sleep(5000);
//        consumer.shutdown();
        System.in.read();
    }
}
3.启动生产者


查看详情
 
4.启动消费者
package com.bsoft;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
/**
 * @author: liucheng
 * @time: 2023/12/29 15:39
 */
public class MQConsumer {
    private final static String nameServer = "192.168.141.101:9876";
    private final static String consumerGroup = "my_group_test";
    private final static String topic = "topic_test";
    public static void main(String[] args) throws MQClientException, IOException, InterruptedException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // 设置NameServer的地址
        consumer.setNamesrvAddr(nameServer);
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe(topic, "tagA");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                msgs.forEach((msg)->{
                    byte[] body = msg.getBody();
                    String s = new String(body, Charset.defaultCharset());
                    System.out.println("msg=================> " +s);
                });
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started......");
//        Thread.sleep(5000);
//        consumer.shutdown();
        System.in.read();
    }
}

 没日志打印,但是又显示消费了
 
5.dashboard添加消费组
没查询到那添加一个
 
 
 重启生产者发现可以消费
对于消费组还是得在dashboard创建好了再去写代码,有的说能够改配置能否直接创建,试过了,没生效,先这样吧,有好的方法或搭建过程中遇到什么问题可以私聊我,看到及时回答
三、总结
整个搭建过程踩了不少坑,比如
 版本的不一致导致部分功能一直报错;
 启动brocker时未指定实例文件没有加-c来启动导致部署失败;消
 费组未在dashboard创建时代码中不显示消费信息
 关于5.x无法打包的问题是因为缺少yarn-1.22.10.tar.gz 这个已经上传到jdk那个资源下了,把这个复制到
 {path}\maven\repository\com\github\eirslett\yarn\1.22.10再打包即可
全部的配置
#  cat /etc/profile
JAVA_HOME=/usr/local/jdk/jdk1.8.0_241
JRE_HOME=$JAVA_HOME/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
export ROCKETMQ_HOME=/bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
export NAMESRV_ADDR=192.168.141.101:9876
#  cat runbroker.sh 
 
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
# cat runserver.sh 
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# cat broker.conf 
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr=192.168.141.101:9876
brokerIP1=192.168.141.101
listenPort=10911
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable=true
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true 
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!