华为云 RocketMQ 的配置主要包括环境准备、实例创建、Topic 创建等步骤,详细的配置指南:
- 准备工作:
- 注册华为云账号:如果还没有华为云账号,需要先注册一个华为账号,并完成实名认证。
- 创建 VPC 和子网:在华为云控制台创建一个虚拟私有云(VPC)和子网,VPC 需与 RocketMQ 实例在同一区域。
- 创建安全组:为 RocketMQ 实例创建一个安全组,并添加必要的安全组规则。如果安全组默认规则允许同一安全组内的 ECS 通信以及所有出站流量,则可直接使用,无需额外添加规则。
- 准备 ECS 服务器:准备一台弹性云服务器(ECS),用于连接和使用 RocketMQ 实例,确保 ECS 与 RocketMQ 实例在同一 VPC 内。
- 安装 JDK:使用 Oracle JDK 1.8.111 或更高版本,避免使用 ECS 默认的 JDK(如 OpenJDK)。
- 创建 RocketMQ 实例:登录华为云控制台,找到分布式消息服务 RocketMQ,点击 “创建实例”。根据需求选择实例的规格、配置等参数,如是否启用 SSL、是否启用 ACL 等,然后配置 VPC、子网和安全组等信息,完成实例创建。
- 创建 Topic:实例创建完成后,在控制台找到对应的 RocketMQ 实例,进入实例详情页面,点击 “Topic 管理”,然后点击 “创建 Topic”。设置 Topic 的名称、分区数、副本数等参数,完成 Topic 创建。
- 配置客户端连接:在用于生产和消费消息的客户端服务器上,配置环境变量
NAMESRV_ADDR
,指向 RocketMQ 实例的地址和端口,格式为ip:port
。如果是在华为云 ECS 上连接同一 VPC 内的 RocketMQ 实例,可使用内网地址。 - 发送和消费消息:以 Java 客户端为例,需要引入 RocketMQ 的客户端依赖。如果使用 Maven 项目,在
pom.xml
中添加如下依赖:
xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq - client - java</artifactId>
<version>4.9.5</version>
</dependency>
然后编写生产者和消费者代码,生产者代码示例如下:java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
producer.shutdown();
}
}
消费者代码示例如下:java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
java复制991234567891011121314151617181920212223242526›⌄⌄⌄⌄⌄import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }}