• -------------------------------------------------------------
  • ====================================

rocketmq 消息队列的顺序性问题

kafka dewbay 6年前 (2019-04-12) 3122次浏览 已收录 0个评论 扫描二维码

为了实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。我们很多时候都会考虑将消息系统纳入我们的选择中;比如我一个登录事件,有可能我登录之后需要做很多东西,比如日志,比如发布消息,比如推送,再比如发送代金券等等;这些事件与登录息息相关,但是本质上它们与登录这个事件没有直接的关系,只是在登录事件后,系统按照需求需要去初始化一些东西,或者去记录一些东西等等;如果把所有的东西都纳入到登录这个事件中(同一个事物中),那登录的事件内处理的逻辑更多,会造成什么后果?登录时间很长,让用户无法忍受,另外,假如登录过程中出现了未发现异常,那是不是导致用户直接无法登录?为了解决这样的问题,我们引入了消息系统,比如我这台机登录过后,我将登录的一些信息,通过远程方式发送到另外一台机器上(或者同一台机),让它们去处理相应的后续逻辑实现;

    目的是:1、用户登录更快,体验上更好,

                   2、只要保证登录部分完整,即便后续出错,并不影响用户正常使用,即容错性更强!

谈到消息系统,首先想到的第一个问题肯定会是:

    消息的顺序性

本来很想说一下关于消息顺序性的一些问题,不过由于我也是借鉴了一些其他的帖子,以及官方的文档,所以这里就不会去赘述这些了,稍后我会分享一些很不错的链接,留给自己以后看,也希望可以给一些刚好要入门 rocketmq 的网友提供一些资料;

rocketmq 是阿里云的一套开源产品,功能什么的就不赘述了,请自行去网站了解:https://help.aliyun.com/document_detail/29532.html?spm=5176.doc34411.6.104.EvZr21

rocketmq 是一类二级消息分类的产品,一级为 topic,二级为 tag;

broker 按照收到生产者发送的消息体,分析其中的 topic,然后去找到相应的 topic 转发出去,在消费端,消费者根据收到的消息分析出 tag 的不同去做不同的逻辑处理;

那么在这个时候,我们就会好奇,为了保证消息的顺序执行的情况,RokectMQ 是如何选择 topic?为此,我们先看看 rokcetmq 的源代码:


// 官方例子如下:
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

        producer.start();

        for (int i = 0; i < 10000000; i++)
            try {
                {
                    Message msg = new Message("TopicTest",// topic
                            "TagA",// tag
                            "OrderID188",// key
                            ("Hello MetaQ").getBytes(RemotingHelper.DEFAULT_CHARSET));// body
                    SendResult sendResult = producer.send(msg); //发送消息
                    System.out.println(sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}
// defalutMQProducer 类下封装的方法
    @Override
    public SendResult send(Message msg, MessageQueue mq) throws MQClientException, RemotingException,
            MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, mq);
    }

// send 的实现方法
@Override
    public SendResult send(Message message) {
        this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
        com.alibaba.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message);

        try {
            com.alibaba.rocketmq.client.producer.SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ);

            message.setMsgID(sendResultRMQ.getMsgId());
            SendResult sendResult = new SendResult();
            sendResult.setTopic(sendResultRMQ.getMessageQueue().getTopic());//如何选择 topic
            sendResult.setMessageId(sendResultRMQ.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error(String.format("Send message Exception, %s", message), e);
            throw checkProducerException(message.getTopic(), message.getMsgID(), e);
        }
    }

在官方例子中,我们可以看到,在发送消息的时候,我们并没有去了解细致的发送消息时,那么 MQ 到底是如何选择 topic 的?

但是可以从代码中看到,它确实有个 MessageQueueSelector 接口,这个接口负责是选择 topic,那么我们就来看看它到底为我们提供了那些实现方法吧(一般的消息都是轮询去寻找 topic 来实现负载均衡):

 /**
     * 如果 lastBrokerName 不为 null,则寻找与其不同的 MessageQueue(轮询负载均衡)
     */
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName != null) {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();//轮询
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }

            return null;
        }
        else {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            return this.messageQueueList.get(pos);
        }
    }

如果对于这种轮询方式的负载均衡不满意,并不能打达到我们的需求,那么我们又改如何去选择?

      阿里云提供了三种方式来解决我们的需求,如果再不能满足,那么就知道修改源码算法部分来达到自己的要求了。
/**
 * 使用哈希算法来选择队列,顺序消息通常都这样做<br>
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-6-27
 */
public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

/**
 * 根据机房来选择发往哪个队列,支付宝逻辑机房使用
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-7-25
 */
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;


    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // TODO Auto-generated method stub
        return null;
    }


    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }


    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}

/**
 * 发送消息,随机选择队列
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-7-25
 */
public class SelectMessageQueueByRandoom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());


    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

可以看到,rocketmq 为我们提供了三个选择(除去轮询方法),那么如果我们在非常关注消息顺序的时候,我们可以选择通过哈希算法求值的方式来实现

    SelectMessageQueueByHash

    我们每个传递进入的对象都会被哈希算法计算出 一个哈希值,比如我们传递的是订单号,那么无疑我们可以保证相同的订单号可以传递给相同的 topic 去处理,那么只要再保证是一致的 tag 就可以保证顺序的一致性啦;

    目的是:生产者 — MQ 服务端 — 消费者 可以达到一一对应的关系

第二种是机房选择,算法是木有啦,应该是根据 ip 地址去区分,反正概念我不是很清晰,也没有去注意和了解;有了解的亲留个资料给我吧,链接就好,谢谢撒……

第三种是随机选择,也就是谁也不知道它到底会选择谁,这种效率其实很差,没有负载均衡,谁也不知道会不会堵塞起来,谁也不知道某个队列是否已经塞满。

有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决它们身上,实际上是浪费的,效率低下的。从这个角度来看消息的顺序问题,我们可以得出两个结论:

    1、不关注乱序的应用实际大量存在
    2、队列无序并不意味着消息无序

参考链接如下:


露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:rocketmq 消息队列的顺序性问题
喜欢 (1)
[]
分享 (0)
关于作者:
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址