• 欢迎访问露水湾网站,WordPress信息,WordPress教程,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站
  • Git主题现已支持滚动公告栏功能,兼容其他浏览器,看到的就是咯,在后台最新消息那里用li标签添加即可。
  • 最新版Git主题已支持说说碎语功能,可像添加文章一样直接添加说说,新建说说页面即可,最后重新保存固定连接,

rocketmq实战入门

kafka dewbay 3年前 (2019-04-12) 1123次浏览 已收录 0个评论 扫描二维码

1.pom 文件


com.alibaba.rocketmq rocketmq-client 3.2.6

生产者 :

package com.sun.rocketmq.qs;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**

  • @ClassName: rocket 生产者

消费者 0:
package com.sun.rocketmq.qs;

import java.io.UnsupportedEncodingException;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

/**

  • Consumer,订阅消息
    */
    public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {
    /**
    * PushConsumer Consumer 的一种,应用通常吐 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立
    * 刻回调 Listener 接口方法。
    / DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“hw”); consumer.setNamesrvAddr(“192.168.1.244:9876;192.168.1.245:9876;192.168.1.242:9876;192.168.1.243:9876”); /*
    * ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET 一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费
    * ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET 一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费
    * ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST @Deprecated
    * ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET @Deprecated
    * ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET @Deprecated
    * ConsumeFromWhere.CONSUME_FROM_TIMESTAMP 一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,
    * 时间点设置参见 DefaultMQPushConsumer.consumeTimestamp 参数
    *
    *
    * MessageModel.CLUSTERING 集群消费,一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个
    Consumer Group 有 3 个实例(可能是 3 个迕程,戒者 3 台机器),那举每个实例只消费其中的 3 条消息
    * MessageModel.BROADCASTING 广播
    *
    / consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setMessageModel(MessageModel.CLUSTERING); /*
    * 主题 Tpoic:第一级消息类型,书的标题
    * 标签 Tags:第二级消息类型,书的目录,可以基于 Tag 做消息过滤
    */
    consumer.subscribe(“TopicTest”, //指定消费主题是 topicTest
    “TagA||TagB”); //tag 为 TagA tagB 的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { /** * List<MessageExt> msgs 这里虽然是 list 但实际上基本都只会是一条,除非消息堆积, * 且记住一定是先启动消费者,再启动生产者 * 否则极有可能导致消息的重复消费 * */ for(MessageExt mext : msgs) { try { System.out.println("消费了一条消息:"+new String(mext.getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); //消费失败告诉 mq 重新发送继续消费 如果多次消费仍不成功可以记录在数据库中,可以通过 mext.getReconsumeTimes()获取消费次数 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } /* * 告诉 mq 消费成功 */ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println(“Consumer Started.”);}
    }

消费者 1:
package com.sun.rocketmq.qs;

import java.io.UnsupportedEncodingException;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

/**

  • Consumer,订阅消息
    */
    public class Consumer1 {public static void main(String[] args) throws InterruptedException, MQClientException {
    /**
    * PushConsumer Consumer 的一种,应用通常吐 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立
    * 刻回调 Listener 接口方法。
    / DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“hw”); consumer.setNamesrvAddr(“192.168.1.244:9876;192.168.1.245:9876;192.168.1.242:9876;192.168.1.243:9876”); /*
    * ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET 一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费
    * ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET 一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费
    * ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST @Deprecated
    * ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET @Deprecated
    * ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET @Deprecated
    * ConsumeFromWhere.CONSUME_FROM_TIMESTAMP 一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,
    * 时间点设置参见 DefaultMQPushConsumer.consumeTimestamp 参数
    *
    *
    * MessageModel.CLUSTERING 集群消费,一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个
    Consumer Group 有 3 个实例(可能是 3 个迕程,戒者 3 台机器),那举每个实例只消费其中的 3 条消息
    * MessageModel.BROADCASTING 广播 被每个消费组消费
    *
    / consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setMessageModel(MessageModel.CLUSTERING); /*
    * 主题 Tpoic:第一级消息类型,书的标题
    * 标签 Tags:第二级消息类型,书的目录,可以基于 Tag 做消息过滤
    */
    consumer.subscribe(“TopicTest”, //指定消费主题是 topicTest
    “TagA||TagB”); //tag 为 TagA tagB 的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { /** * List<MessageExt> msgs 这里虽然是 list 但实际上基本都只会是一条,除非消息堆积, * 且记住一定是先启动消费者,再启动生产者 * 否则极有可能导致消息的重复消费 * */ for(MessageExt mext : msgs) { try { System.out.println("消费了一条消息:"+new String(mext.getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace();//消费失败告诉 mq 重新发送继续消费 如果多次消费仍不成功可以记录在数据库中,可以通过 mext.getReconsumeTimes()获取消费次数 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } /* * 告诉 mq 消费成功 */ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});consumer.start();System.out.println(“Consumer Started.”);}
    }

先启动两个消费者,然后启动生产者,生产者将产生 10 条消息,下图是打印信息:

通过打印信息我们可以看到有的消息被发送到了 broker-a ,有的消息被发送到了 broker-b 实现了自动负载均衡

在集群消费模式下,消息会被消费组里的负载均衡消费

customer0 ,打印消息:

customer1,打印:


露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:rocketmq实战入门
喜欢 (1)
[]
分享 (0)
关于作者:
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址