当前位置: 云服务器知识 » 云服务器 » 华为云 RocketMQ 配置指南

华为云 RocketMQ 配置指南

华为云 RocketMQ 的配置主要包括环境准备、实例创建、Topic 创建等步骤,详细的配置指南:

  1. 准备工作
    • 注册华为云账号:如果还没有华为云账号,需要先注册一个华为账号,并完成实名认证。
    • 创建 VPC 和子网:在华为云控制台创建一个虚拟私有云(VPC)和子网,VPC 需与 RocketMQ 实例在同一区域。
    • 创建安全组:为 RocketMQ 实例创建一个安全组,并添加必要的安全组规则。如果安全组默认规则允许同一安全组内的 ECS 通信以及所有出站流量,则可直接使用,无需额外添加规则。
    • 准备 ECS 服务器:准备一台弹性云服务器(ECS),用于连接和使用 RocketMQ 实例,确保 ECS 与 RocketMQ 实例在同一 VPC 内。
    • 安装 JDK:使用 Oracle JDK 1.8.111 或更高版本,避免使用 ECS 默认的 JDK(如 OpenJDK)。
  2. 创建 RocketMQ 实例:登录华为云控制台,找到分布式消息服务 RocketMQ,点击 “创建实例”。根据需求选择实例的规格、配置等参数,如是否启用 SSL、是否启用 ACL 等,然后配置 VPC、子网和安全组等信息,完成实例创建。
  3. 创建 Topic:实例创建完成后,在控制台找到对应的 RocketMQ 实例,进入实例详情页面,点击 “Topic 管理”,然后点击 “创建 Topic”。设置 Topic 的名称、分区数、副本数等参数,完成 Topic 创建。
  4. 配置客户端连接:在用于生产和消费消息的客户端服务器上,配置环境变量NAMESRV_ADDR,指向 RocketMQ 实例的地址和端口,格式为ip:port。如果是在华为云 ECS 上连接同一 VPC 内的 RocketMQ 实例,可使用内网地址。
  5. 发送和消费消息:以 Java 客户端为例,需要引入 RocketMQ 的客户端依赖。如果使用 Maven 项目,在pom.xml中添加如下依赖:

xml

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq - client - java</artifactId>
    <version>4.9.5</version>
</dependency>

然后编写生产者和消费者代码,生产者代码示例如下:java

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);
        producer.shutdown();
    }
}

消费者代码示例如下:java

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

java复制991234567891011121314151617181920212223242526›⌄⌄⌄⌄⌄import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }}
华为云 RocketMQ 配置指南

腾讯云2核2G服务器一年38元,限时秒杀,点击查看
华为云2核2G服务器一年36元,点击查看

相关文章