中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

RocketMQ 源碼學(xué)習(xí) 4 : 消息發(fā)送

2018-07-02    來(lái)源:importnew

容器云強(qiáng)勢(shì)上線!快速搭建集群,上萬(wàn)Linux鏡像隨意使用

1. Client端,三種發(fā)送方式

RocketMQ 支持常見(jiàn)的三種發(fā)送方式,

  • SYNC
producer.send(msg)

同步的發(fā)送方式,會(huì)等待發(fā)送結(jié)果后才返回?梢杂 send(msg, timeout) 的方式指定等待時(shí)間,如果不指定,就是默認(rèn)的 3000ms. 這個(gè)timeout 最終會(huì)被設(shè)置到 ResponseFuture 里,再發(fā)送完消息后,用 countDownLatch 去 await timeout的時(shí)間,如果過(guò)期,就會(huì)拋出異常。

  • ASYNC
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }
    @Override
    public void onException(Throwable e) {
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
    }
});

異步的發(fā)送方式,發(fā)送完后,立刻返回。Client 在拿到 Broker 的響應(yīng)結(jié)果后,會(huì)回調(diào)指定的 callback. 這個(gè) API 也可以指定 Timeout,不指定也是默認(rèn)的 3000ms.

  • ONEWAY
producer.sendOneway(msg);

比較簡(jiǎn)單,發(fā)出去后,什么都不管直接返回。

對(duì)于每種方式,Producer 還提供了可以指定 MessageQueue, MessageQueueSelector的API,這屬于稍微高端一點(diǎn)的玩法,一般用它提供的默認(rèn)的策略選擇 MessageQueue 就可以了。

2. Client端發(fā)送過(guò)程

下面以 SYNC 方式為例,看下整個(gè)消息的發(fā)送過(guò)程,其他方式略有差異,總體流程類似。

1. 根據(jù) Topic 找到指定的 TopicPublishInfo

先去本地 map 找,如果沒(méi)有,就去 Namesrv fetch, 如果 Namesrv 里也沒(méi)有,則用默認(rèn)的 Topic 再去 fetch TopicRouteData. 對(duì)用用默認(rèn) Topic 的這種情況,Client 拿到數(shù)據(jù)后,會(huì)去構(gòu)建 TopicPublishInfo, 然后用當(dāng)前的 Topic 作為 key 放到本地 map 里。Broker 在接收到消息的時(shí)候,會(huì)去更新它本地的配置,然后在 registerBroker 的時(shí)候會(huì)去更新 namesrv 中的 TopicRouteData 信息,這樣 Namesrv 中就會(huì)有這樣一份配置了。當(dāng)然,也可以事先在 Namesrv 增加該配置,很多公司內(nèi)部都有這樣定制的平臺(tái)來(lái)管理MQ的接入配置。

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

public class TopicRouteData {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

QueueData 定義了這個(gè) read 和 write 的 queue的數(shù)量,Client 在拿到 TopicRouteData 后,會(huì)根據(jù)這里配的數(shù)量去構(gòu)建響應(yīng)數(shù)目的messageQueue,即 messageQueueList. brokerDatas 保存了各個(gè) broker 的相關(guān)信息。

2. 從 messageQueueList 中選擇一個(gè) MessageQueue

如果沒(méi)有 enable latencyFaultTolerance,就用遞增取模的方式選擇。如果 enable 了,在遞增取模的基礎(chǔ)上,再過(guò)濾掉 not available 的。這里所謂的 latencyFaultTolerance, 是指對(duì)之前失敗的,按一定的時(shí)間做退避:

long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

舉個(gè)例子,如果上次請(qǐng)求的 latency 超過(guò) 550L ms, 就退避 3000L ms;超過(guò) 1000L,就退避 60000L.

以上就是 Producer 到 Broker 的簡(jiǎn)單的負(fù)載均衡。

3. 發(fā)送消息

到這一步,我們已經(jīng)拿到了這些關(guān)鍵數(shù)據(jù):

  • Message, 要發(fā)送的消息
  • MessageQueue,這里面包括 topic/brokerName/queueId
  • CommunicationMode, 發(fā)送方式, SYNC/ASYNC/ONEWAY
  • TopicPublishInfo

有了這些數(shù)據(jù),就可以構(gòu)建 RequestHeader 了,大部分字段意思都很明顯(當(dāng)然,前提是對(duì)RocketMQ的源碼有所熟悉),個(gè)別字段見(jiàn)注釋。

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
//系統(tǒng)Flag, 用于判斷走什么邏輯。標(biāo)識(shí)是否壓縮,事務(wù)的不同TYPE(prepare/rollback/commit/not transaction) 等
requestHeader.setSysFlag(sysFlag); 
requestHeader.setBornTimestamp(System.currentTimeMillis());
//消息Flag, 最終會(huì)落地
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
//TODO,暫不知道這個(gè)字段是干嘛用的
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);

最后用這些 header 字段,以及 message body 構(gòu)建 RemotingCommand,通過(guò) remoting 模塊發(fā)給 broker.

4. 處理結(jié)果

  • 發(fā)送成功:直接返回發(fā)送結(jié)果
  • 發(fā)送失敗:如果 enable retryAnotherBrokerWhenNotStoreOK,就會(huì)重試,默認(rèn)重試兩次(retryTimesWhenSendFailed)。否則直接返回結(jié)果
  • 發(fā)送異常:Producer 對(duì)異常做了很好的區(qū)分,如果是 Remoting 和 Client 模塊的異常,就重試,如果是 Broker 模塊的異常,根據(jù)不同的 response code 做不同的處理,有的重試,有的拋出異常,有的返回結(jié)果。

3. Broker端,消息的處理和落地


如圖,Broker 有很多 Processor 用來(lái)處理不同類型的請(qǐng)求,有些 Processor 會(huì)共用一個(gè) Processor 線程池。對(duì)于消息發(fā)送,Broker 的 remoting 模塊在接收到請(qǐng)求后,根據(jù)request code,最終會(huì)交給 SendMessageProcessor 來(lái)處理。SendMessageProcessor 會(huì)依次做以下處理:

  • 做一些校驗(yàn),包括但不限于
    1. broker 是否可寫(xiě)
    2. topic 配置是否存在,如果不存在就新建一個(gè)(createTopicInSendMessageMethod)
    3. 校驗(yàn) queueId 是否超過(guò)指定大小
  • 構(gòu)建 MessageExtBrokerInner
  • 將 MessageExtBrokerInner 交給 Store 處理
  • 處理 Store 返回的結(jié)果,BrokerStatsManager 做一些統(tǒng)計(jì)更新,設(shè)置 Response 中的一些字段并返回。

Store 收到消息后,會(huì)先做一些校驗(yàn),然后交給 commitLog 去 put,然后做些統(tǒng)計(jì)并返回。Store 存儲(chǔ)消息的過(guò)程比較復(fù)雜,后面會(huì)單獨(dú)分析。

4. 其他

1. 順序消息
很多應(yīng)用并不關(guān)注消息順序,而且消息沒(méi)有順序并不代表消息內(nèi)容沒(méi)有順序,合理的系統(tǒng)設(shè)計(jì)可以避免順序問(wèn)題。MQ 要保證消息順序必然會(huì)損失性能、增加系統(tǒng)實(shí)現(xiàn)復(fù)雜度。具體的分析可以看?分布式開(kāi)放消息系統(tǒng)(RocketMQ)的原理與實(shí)踐。

在 RocketMQ 里, 在發(fā)送消息的時(shí)候可以自己定義 MessageQueueSelector,對(duì)于同一個(gè)訂單ID(或其他ID)的不同消息,可以讓它走同一個(gè) MessageQueue,這樣就可以按順序發(fā)給同一個(gè) Broker 了。

2. Batch Message
Producer 的 API 還支持一次發(fā)多個(gè)消息。?

List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));

producer.send(messages);

Client 模塊會(huì)將 Message List 封裝成 MessageBatch,且會(huì)標(biāo)記 requestHeader 的 batch 標(biāo)志位為 true. Broker 在接收到消息后就可以根據(jù)這個(gè)標(biāo)志位去做不同的處理。

5. Reference

  • RocketMQ 原理簡(jiǎn)介
  • 分布式開(kāi)放消息系統(tǒng)(RocketMQ)的原理與實(shí)踐

標(biāo)簽:

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請(qǐng)聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請(qǐng)與原作者聯(lián)系。

上一篇:深入Spring Boot :怎樣排查 java.lang.ArrayStoreException

下一篇:linux 重要日志說(shuō)明