1.pom 文件
com.alibaba.rocketmq rocketmq-client 3.2.6
生产者 :
package com.sun.rocketmq.qs;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
/**
- @ClassName: rocket 生产者
消费者 0:
package com.sun.rocketmq.qs;
import java.io.UnsupportedEncodingException;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
/**
- Consumer,订阅消息
*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {
/**
* PushConsumer Consumer 的一种,应用通常吐 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立
* 刻回调 Listener 接口方法。
/ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“hw”); consumer.setNamesrvAddr(“192.168.1.244:9876;192.168.1.245:9876;192.168.1.242:9876;192.168.1.243:9876”); /*
* ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET 一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费
* ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET 一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费
* ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST @Deprecated
* ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET @Deprecated
* ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET @Deprecated
* ConsumeFromWhere.CONSUME_FROM_TIMESTAMP 一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,
* 时间点设置参见 DefaultMQPushConsumer.consumeTimestamp 参数
*
*
* MessageModel.CLUSTERING 集群消费,一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个
Consumer Group 有 3 个实例(可能是 3 个迕程,戒者 3 台机器),那举每个实例只消费其中的 3 条消息
* MessageModel.BROADCASTING 广播
*
/ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setMessageModel(MessageModel.CLUSTERING); /*
* 主题 Tpoic:第一级消息类型,书的标题
* 标签 Tags:第二级消息类型,书的目录,可以基于 Tag 做消息过滤
*/
consumer.subscribe(“TopicTest”, //指定消费主题是 topicTest
“TagA||TagB”); //tag 为 TagA tagB 的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { /** * List<MessageExt> msgs 这里虽然是 list 但实际上基本都只会是一条,除非消息堆积, * 且记住一定是先启动消费者,再启动生产者 * 否则极有可能导致消息的重复消费 * */ for(MessageExt mext : msgs) { try { System.out.println("消费了一条消息:"+new String(mext.getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); //消费失败告诉 mq 重新发送继续消费 如果多次消费仍不成功可以记录在数据库中,可以通过 mext.getReconsumeTimes()获取消费次数 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } /* * 告诉 mq 消费成功 */ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});consumer.start();System.out.println(“Consumer Started.”);}
}
消费者 1:
package com.sun.rocketmq.qs;
import java.io.UnsupportedEncodingException;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
/**
- Consumer,订阅消息
*/
public class Consumer1 {public static void main(String[] args) throws InterruptedException, MQClientException {
/**
* PushConsumer Consumer 的一种,应用通常吐 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立
* 刻回调 Listener 接口方法。
/ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“hw”); consumer.setNamesrvAddr(“192.168.1.244:9876;192.168.1.245:9876;192.168.1.242:9876;192.168.1.243:9876”); /*
* ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET 一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费
* ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET 一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费
* ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST @Deprecated
* ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET @Deprecated
* ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET @Deprecated
* ConsumeFromWhere.CONSUME_FROM_TIMESTAMP 一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,
* 时间点设置参见 DefaultMQPushConsumer.consumeTimestamp 参数
*
*
* MessageModel.CLUSTERING 集群消费,一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个
Consumer Group 有 3 个实例(可能是 3 个迕程,戒者 3 台机器),那举每个实例只消费其中的 3 条消息
* MessageModel.BROADCASTING 广播 被每个消费组消费
*
/ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setMessageModel(MessageModel.CLUSTERING); /*
* 主题 Tpoic:第一级消息类型,书的标题
* 标签 Tags:第二级消息类型,书的目录,可以基于 Tag 做消息过滤
*/
consumer.subscribe(“TopicTest”, //指定消费主题是 topicTest
“TagA||TagB”); //tag 为 TagA tagB 的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { /** * List<MessageExt> msgs 这里虽然是 list 但实际上基本都只会是一条,除非消息堆积, * 且记住一定是先启动消费者,再启动生产者 * 否则极有可能导致消息的重复消费 * */ for(MessageExt mext : msgs) { try { System.out.println("消费了一条消息:"+new String(mext.getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace();
//消费失败告诉 mq 重新发送继续消费 如果多次消费仍不成功可以记录在数据库中,可以通过 mext.getReconsumeTimes()获取消费次数 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } /* * 告诉 mq 消费成功 */ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});consumer.start();System.out.println(“Consumer Started.”);}
}
先启动两个消费者,然后启动生产者,生产者将产生 10 条消息,下图是打印信息:
通过打印信息我们可以看到有的消息被发送到了 broker-a ,有的消息被发送到了 broker-b 实现了自动负载均衡
在集群消费模式下,消息会被消费组里的负载均衡消费
customer0 ,打印消息:
customer1,打印: