龙空技术网

阅读代码深入原理22——RocketMQ之Producer

iMinusMinus 347

前言:

现时我们对“淘宝tbnet”大概比较关心,兄弟们都需要学习一些“淘宝tbnet”的相关资讯。那么小编同时在网上收集了一些有关“淘宝tbnet””的相关知识,希望朋友们能喜欢,朋友们快快来学习一下吧!

RocketMQ由NameServer、Broker、Producer、Consumer构成,NameServer类似于RPC框架的注册中心,Broker负责存储消息,Producer和Consumer之间没有直接关系,通过Broker交互。整体架构如下图:

MQProducer(和MQConsumer)继承接口MQAdmin(定义创建topic、查看指定MessageQueue的offset、通过offsetId查看消息),支持同步(或异步、单向)发送单条(或批量)的(事务型或非事务性)消息。

除此之外,MQProducer还支持类似RPC的同步调用,即通过request方法发送消息,并在Consumer消费后返回消息。

先看Producer的构造方法和启动:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.producer.DefaultMQProducer	// 0. 启动时(或构造此对象前)设置系统属性"rocketmq.namesrv.addr"(或环境变量"NAMESRV_ADDR")    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,        boolean enableMsgTrace, final String customizedTraceTopic) {        this.namespace = namespace; // 1. 若使用不含namespace的构造函数,则尝试从namesrv地址获取(符合"{schema}://MQ_INST_{1}_{2}.{domain}"则取”MQ_INST_{1}_{2}“)。topic在内部处理时,除系统topic(以"rmq_sys_"开头或个别特定topic如"TBW102",见TopicValidator)外都会添加namespace前缀(如果是重试或死信,则添加到中间位置)        this.producerGroup = producerGroup; // 2. 若使用不含producerGroup的构造函数,则使用默认的”DEFAULT_PRODUCER“        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); // 3. RPCHook用于消息发送请求前后的处理,可能为null。构造DefaultMQProducerImpl时会创建异步发送消息的线程池,队列长度为50000        //if client open the message trace feature        if (enableMsgTrace) {            try {                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); // 4.customizedTraceTopic为null时,使用默认的topic:RMQ_SYS_TRACE_TOPIC                dispatcher.setHostProducer(this.getDefaultMQProducerImpl());                traceDispatcher = dispatcher;                this.getDefaultMQProducerImpl().registerSendMessageHook( // 5. 注册发送消息钩子                    new SendMessageTraceHookImpl(traceDispatcher));                this.defaultMQProducerImpl.registerEndTransactionHook( // 6. 注册事务结束钩子                    new EndTransactionTraceHookImpl(traceDispatcher));            } catch (Throwable e) {                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");            }        }    }	    @Override    public void start() throws MQClientException {        this.setProducerGroup(withNamespace(this.producerGroup)); // 7. 重置producerGroup:如果有namespace,或namesrv能推测出namespace,则拼接成"{namespace}%{producerGroup}"格式        this.defaultMQProducerImpl.start(); // 8. 启动消息生产者        if (null != traceDispatcher) {            try {                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); // 9. 启动消息跟踪            } catch (MQClientException e) {                log.warn("trace dispatcher start failed ", e);            }        }    }	

接着我们看下DefaultMQProducerImpl的启动过程:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl    public void start() throws MQClientException {        this.start(true);    }    public void start(final boolean startFactory) throws MQClientException {        switch (this.serviceState) {            case CREATE_JUST: // 1. 初始状态                this.serviceState = ServiceState.START_FAILED;                this.checkConfig(); // 2. 检查producerGroup(不能为空,长度应小于255,字符应属于”%|a-zA-Z0-9_-“,且不能是”DEFAULT_PRODUCER“)                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {                    this.defaultMQProducer.changeInstanceNameToPID(); // 3. 实例名取自系统属性”rocketmq.client.name“,若为空则取默认值”DEFAULT“,当producerGroup不是”CLIENT_INNER_PRODUCER“,且默认实例名时重置实例名为进程号与当前时间({pid}#{nanoTime})                }                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); // 4. 以IP、实例名作为clientId(如果设置了unitName则添加unitName),创建MQClientInstance,并保存映射关系到MQClientManager.factoryTable                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); // 5. 将producerGroup与DefaultMQProducerImpl绑定,放到MQClientInstance.producerTable                if (!registerOK) {                    this.serviceState = ServiceState.CREATE_JUST;                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),                        null);                }                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); // 6. 将”TBW102“这个topic放入topicPublishInfoTable                if (startFactory) {                    mQClientFactory.start(); // 7. 启动MQClientInstance                }                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),                    this.defaultMQProducer.isSendMessageWithVIPChannel());                this.serviceState = ServiceState.RUNNING;                break;            case RUNNING:            case START_FAILED:            case SHUTDOWN_ALREADY:                throw new MQClientException("The producer service state not OK, maybe started once, "                    + this.serviceState                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),                    null);            default:                break;        }        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 8. 给所有broker发送心跳,心跳包内容为{clientId:,producerGroup:[]},当brokerAddrTable不为空时,给broker发送心跳,并记录broker对应的版本        this.startScheduledTask(); // 9. 使用单线程的定时任务线程池,周期性(间隔1秒)扫描requestFutureTable,将超时的RequestResponseFuture移出,设置异常原因,并执行回调    }

再接着看下MQClientInstance的启动:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.factory.MQClientInstance    public void start() throws MQClientException {        synchronized (this) {            switch (this.serviceState) {                case CREATE_JUST:                    this.serviceState = ServiceState.START_FAILED;                    // If not specified,looking address from name server                    if (null == this.clientConfig.getNamesrvAddr()) {                        this.mQClientAPIImpl.fetchNameServerAddr(); // 1. 如果未设置namesrv,则使用HTTP GET方式从指定URL获取。此处应该是淘宝内部使用方式,默认地址为”“。可以通过系统属性修改(参考TopAddressing, MixAll)                    }                    // Start request-response channel                    this.mQClientAPIImpl.start(); // 2. 启动netty,使用rocketmq的NettyEncoder、NettyDecoder来编解码,使用rocketmq的NettyRemotingClient.NettyClientHandler来处理                    // Start various schedule tasks					// 3. 如果未设置namesrv,周期性(每2分钟)获取地址(见步骤1:mQClientAPIImpl.fetchNameServerAddr)。					// 周期性从producerTable获取topic,然后根据topic从namesrv获取TopicRouteData,如果信息变化,则更新brokerAddrTable,以及更新topic相关的TopicPublishInfo					// 周期性(每个心跳间隔)清除brokerAddrTable中下线的broker信息,然后给所有broker发送心跳					// 周期性从consumerTable遍历MQConsumerInner,持久化其offset。(广播模式使用本地文件,集群模式使用远程的broker)					// 周期性(每隔1分钟)调整线程池:遍历consumerTable,根据处理队列里消息数量,增加或减少核心线程数                    this.startScheduledTask();                     // Start pull service                    this.pullMessageService.start(); // 4. 创建一个线程,Runnable对象为PullMessageService,启动线程后会阻塞式获取PullRequest,根据Consumer的consumerGroup,从consumerTable获取MQConsumerInner,拉取消息                    // Start rebalance service                    this.rebalanceService.start(); // 5. 创建线程,Runnable对象为RebalanceService,它会隔一定时间,遍历consumerTable获取MQConsumerInner,根据topic和当前消费者情况以一定策略,调整消费队列,以心跳形式通知到broker                    // Start push service                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false); // 6. 再次执行DefaultMQProducerImpl.start,但不会循环调用MQClientInstance.start                    log.info("the client factory [{}] start OK", this.clientId);                    this.serviceState = ServiceState.RUNNING;                    break;                case START_FAILED:                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);                default:                    break;            }        }    }

最后,我们看下消息发送。

先介绍同步发送:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.producer.DefaultMQProducer    @Override    public SendResult send(        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        Validators.checkMessage(msg, this);        msg.setTopic(withNamespace(msg.getTopic())); // 1. 如果有namespace,重置topic        return this.defaultMQProducerImpl.send(msg);    }
# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl    public SendResult send(        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        return send(msg, this.defaultMQProducer.getSendMsgTimeout());    }	    public SendResult send(Message msg,        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);    }	private SendResult sendDefaultImpl( // 1. 异步send最终也是调用此方法,只是在线程池中执行任务,不关心返回值,而参数值CommunicationMode为CommunicationMode.ASYNC。单向发送也是调用此方法,参数值CommunicationMode为CommunicationMode.ONEWAY。        Message msg,        final CommunicationMode communicationMode,        final SendCallback sendCallback,        final long timeout    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        this.makeSureStateOK(); // 2. 检查serviceState是否为RUNNING        Validators.checkMessage(msg, this.defaultMQProducer);        final long invokeID = random.nextLong();        long beginTimestampFirst = System.currentTimeMillis();        long beginTimestampPrev = beginTimestampFirst;        long endTimestamp = beginTimestampFirst;        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 3. 从topicPublishInfoTable根据topic得到TopicPublishInfo(为空则先从namesrv获取TopicRouteData,转换成TopicPublishInfo)        if (topicPublishInfo != null && topicPublishInfo.ok()) {            boolean callTimeout = false;            MessageQueue mq = null;            Exception exception = null;            SendResult sendResult = null;            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;            int times = 0;            String[] brokersSent = new String[timesTotal];            for (; times < timesTotal; times++) {                String lastBrokerName = null == mq ? null : mq.getBrokerName();                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 4. 使用mqFaultStrategy根据topic、最近一次使用的broker信息挑选MessageQueue。(一些send方法支持入参传MessageQueue或MessageQueueSelector)                if (mqSelected != null) {                    mq = mqSelected;                    brokersSent[times] = mq.getBrokerName();                    try {                        beginTimestampPrev = System.currentTimeMillis();                        if (times > 0) {                            //Reset topic with namespace during resend.                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));                        }                        long costTime = beginTimestampPrev - beginTimestampFirst;                        if (timeout < costTime) {                            callTimeout = true;                            break;                        }                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); // 5. 核心的消息发送代码,见后面分析                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                        switch (communicationMode) {                            case ASYNC:                                return null;                            case ONEWAY:                                return null;                            case SYNC:                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 6. 发送状态不是成功时,根据属性决定是否在另一个broker重试(默认值不重试)                                        continue;                                    }                                }                                return sendResult; // 7. 仅同步返回结果                            default:                                break;                        }                    } catch (RemotingException e) { // 8. 指定条件(异常类型,及某些类型的返回码)重试                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        continue;                    } catch (MQClientException e) {                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        continue;                    } catch (MQBrokerException e) {                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        switch (e.getResponseCode()) {                            case ResponseCode.TOPIC_NOT_EXIST:                            case ResponseCode.SERVICE_NOT_AVAILABLE:                            case ResponseCode.SYSTEM_ERROR:                            case ResponseCode.NO_PERMISSION:                            case ResponseCode.NO_BUYER_ID:                            case ResponseCode.NOT_IN_CURRENT_UNIT:                                continue;                            default:                                if (sendResult != null) {                                    return sendResult;                                }                                throw e;                        }                    } catch (InterruptedException e) {                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        log.warn("sendKernelImpl exception", e);                        log.warn(msg.toString());                        throw e;                    }                } else {                    break;                }            }            if (sendResult != null) {                return sendResult;            }            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",                times,                System.currentTimeMillis() - beginTimestampFirst,                msg.getTopic(),                Arrays.toString(brokersSent));            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);            MQClientException mqClientException = new MQClientException(info, exception);            if (callTimeout) {                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");            }            if (exception instanceof MQBrokerException) {                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());            } else if (exception instanceof RemotingConnectException) {                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);            } else if (exception instanceof RemotingTimeoutException) {                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);            } else if (exception instanceof MQClientException) {                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);            }            throw mqClientException;        }        validateNameServerSetting();        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);    }		    private SendResult sendKernelImpl(final Message msg,        final MessageQueue mq,        final CommunicationMode communicationMode,        final SendCallback sendCallback,        final TopicPublishInfo topicPublishInfo,        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        long beginStartTime = System.currentTimeMillis();        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); // 5.1 从MQClientInstance.brokerAddrTable根据brokerName获取主broker地址        if (null == brokerAddr) { // 5.2 重试            tryToFindTopicPublishInfo(mq.getTopic());            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());        }        SendMessageContext context = null;        if (brokerAddr != null) {            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); // 5.3 可能使用VIP的处理(设置系统属性”com.rocketmq.sendMessageWithVIPChannel“才能启用,VIP地址为原端口-2)            byte[] prevBody = msg.getBody();            try {                //for MessageBatch,ID has been set in the generating process                if (!(msg instanceof MessageBatch)) {                    MessageClientIDSetter.setUniqID(msg); // 5.4 非MessageBatch类型的Message,设置属性”UNIQ_KEY“值(包含ip、进程号、ClassLoader哈希值、类加载至今时间、自增计数)                }                boolean topicWithNamespace = false;                if (null != this.mQClientFactory.getClientConfig().getNamespace()) { // 5.5 namespace存在时,设置Message属性”INSTANCE_ID“为namespace                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());                    topicWithNamespace = true;                }                int sysFlag = 0;                boolean msgBodyCompressed = false;                if (this.tryToCompressMessage(msg)) { // 5.6 当消息类型不是MessageBatch,且消息体长度过大时(默认4k),按一定压缩比(默认5级)压缩消息体,再重设消息体                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG; // 5.7 压缩消息后标记sysFlag                    msgBodyCompressed = true;                }                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; // 5.8 (使用TransactionMQProducer发送的事务型消息)消息属性”TRAN_MSG“被解析成true时,标记sysFlag                }                if (hasCheckForbiddenHook()) { // 5.9 如果存在CheckForbiddenHook(需用户注册,默认不存在),执行CheckForbiddenHook.checkForbidden                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());                    checkForbiddenContext.setCommunicationMode(communicationMode);                    checkForbiddenContext.setBrokerAddr(brokerAddr);                    checkForbiddenContext.setMessage(msg);                    checkForbiddenContext.setMq(mq);                    checkForbiddenContext.setUnitMode(this.isUnitMode());                    this.executeCheckForbiddenHook(checkForbiddenContext);                }                if (this.hasSendMessageHook()) { // 5.10 SendMessageHook存在时(如果允许trace,则会有SendMessageTraceHookImpl),执行SendMessageHook.sendMessageBefore                    context = new SendMessageContext();                    context.setProducer(this);                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());                    context.setCommunicationMode(communicationMode);                    context.setBornHost(this.defaultMQProducer.getClientIP());                    context.setBrokerAddr(brokerAddr);                    context.setMessage(msg);                    context.setMq(mq);                    context.setNamespace(this.defaultMQProducer.getNamespace());                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);                    if (isTrans != null && isTrans.equals("true")) {                        context.setMsgType(MessageType.Trans_Msg_Half);                    }                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {                        context.setMsgType(MessageType.Delay_Msg);                    }                    this.executeSendMessageHookBefore(context);                }                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());                requestHeader.setTopic(msg.getTopic());                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());                requestHeader.setQueueId(mq.getQueueId());                requestHeader.setSysFlag(sysFlag);                requestHeader.setBornTimestamp(System.currentTimeMillis());                requestHeader.setFlag(msg.getFlag());                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));                requestHeader.setReconsumeTimes(0);                requestHeader.setUnitMode(this.isUnitMode());                requestHeader.setBatch(msg instanceof MessageBatch);                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 5.11 topic以”%RETRY%“开头,重置SendMessageRequestHeader的消费次数,清除消息的”RECONSUME_TIME“属性                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);                    if (reconsumeTimes != null) {                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);                    }                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);                    if (maxReconsumeTimes != null) {                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);                    }                }                SendResult sendResult = null;                switch (communicationMode) {                    case ASYNC: // 5.12 异步消息topic去除namespace,发送的消息体使用可能压缩的内容,但原Message.body需还原成压缩前                        Message tmpMessage = msg;                        boolean messageCloned = false;                        if (msgBodyCompressed) {                            //If msg body was compressed, msgbody should be reset using prevBody.                            //Clone new message using commpressed message body and recover origin massage.                            //Fix bug:                            tmpMessage = MessageAccessor.cloneMessage(msg);                            messageCloned = true;                            msg.setBody(prevBody);                        }                        if (topicWithNamespace) {                            if (!messageCloned) {                                tmpMessage = MessageAccessor.cloneMessage(msg);                                messageCloned = true;                            }                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));                        }                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;                        if (timeout < costTimeAsync) {                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                        }                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                            brokerAddr,                            mq.getBrokerName(),                            tmpMessage,                            requestHeader,                            timeout - costTimeAsync,                            communicationMode,                            sendCallback,                            topicPublishInfo,                            this.mQClientFactory,                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),                            context,                            this);                        break;                    case ONEWAY:                    case SYNC:                        long costTimeSync = System.currentTimeMillis() - beginStartTime;                        if (timeout < costTimeSync) {                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                        }                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( // 5.13 同步发送消息                            brokerAddr,                            mq.getBrokerName(),                            msg,                            requestHeader,                            timeout - costTimeSync,                            communicationMode,                            context,                            this);                        break;                    default:                        assert false;                        break;                }                if (this.hasSendMessageHook()) {                    context.setSendResult(sendResult);                    this.executeSendMessageHookAfter(context); // 5.14 SendMessageHook存在时(如果允许trace,则会有SendMessageTraceHookImpl),执行SendMessageHook.sendMessageAfter                }                return sendResult;            } catch (RemotingException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } catch (MQBrokerException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } catch (InterruptedException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } finally { // 5.15 还原body为未压缩状态,重置topic为不含namespace                msg.setBody(prevBody);                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));            }        }        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);    }	

再跟进去的发送代码:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.MQClientAPIImpl    public SendResult sendMessage( // 1. 同步发送        final String addr,        final String brokerName,        final Message msg,        final SendMessageRequestHeader requestHeader,        final long timeoutMillis,        final CommunicationMode communicationMode,        final SendMessageContext context,        final DefaultMQProducerImpl producer    ) throws RemotingException, MQBrokerException, InterruptedException {        return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);    }    public SendResult sendMessage( // 2. 同步/异步发送        final String addr,        final String brokerName,        final Message msg,        final SendMessageRequestHeader requestHeader,        final long timeoutMillis,        final CommunicationMode communicationMode,        final SendCallback sendCallback,        final TopicPublishInfo topicPublishInfo,        final MQClientInstance instance,        final int retryTimesWhenSendFailed,        final SendMessageContext context,        final DefaultMQProducerImpl producer    ) throws RemotingException, MQBrokerException, InterruptedException {        long beginStartTime = System.currentTimeMillis();        RemotingCommand request = null;        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE); // 3. 默认未设置,模拟RPC时可通过MessageUtil.createReplyMessage来设置        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);        if (isReply) {            if (sendSmartMsg) {                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);            } else {                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);            }        } else {            if (sendSmartMsg || msg instanceof MessageBatch) { // 4. 创建RemotingCommand(sendSmartMsg根据系统属性""org.apache.rocketmq.client.sendSmartMsg"来定,默认为true)                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);            } else {                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);            }        }        request.setBody(msg.getBody());        switch (communicationMode) {            case ONEWAY:                this.remotingClient.invokeOneway(addr, request, timeoutMillis);                return null;            case ASYNC:                final AtomicInteger times = new AtomicInteger();                long costTimeAsync = System.currentTimeMillis() - beginStartTime;                if (timeoutMillis < costTimeAsync) {                    throw new RemotingTooMuchRequestException("sendMessage call timeout");                }                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,                    retryTimesWhenSendFailed, times, context, producer);                return null;            case SYNC:                long costTimeSync = System.currentTimeMillis() - beginStartTime;                if (timeoutMillis < costTimeSync) {                    throw new RemotingTooMuchRequestException("sendMessage call timeout");                }                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); // 5. 调用NettyRemotingClient的invokeSync方法(执行前调RPCHook.doBeforeRequest,执行后调用RPCHook.doAfterResponse),然后处理结果            default:                assert false;                break;        }        return null;    }

单向只会在执行前调RPCHook.doBeforeRequest,然后获取单向的信号量才能写消息。

异步调用只会在执行前调RPCHook.doBeforeRequest,也需要获取信号量,还有InvokeCallback,在此时触发SendMessageHook和SendCallback。

然后看下事务型消息的处理:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl    public TransactionSendResult sendMessageInTransaction(final Message msg,        final LocalTransactionExecuter localTransactionExecuter, final Object arg)        throws MQClientException {        TransactionListener transactionListener = getCheckListener();        if (null == localTransactionExecuter && null == transactionListener) {            throw new MQClientException("tranExecutor is null", null);        }        // ignore DelayTimeLevel parameter        if (msg.getDelayTimeLevel() != 0) {            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);        }        Validators.checkMessage(msg, this.defaultMQProducer);        SendResult sendResult = null;        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); // 1. 设置消息属性”TRAN_MSG“、”PGROUP“        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());        try {            sendResult = this.send(msg); // 2. 发送消息(见之前普通的发送代码)        } catch (Exception e) {            throw new MQClientException("send message Exception", e);        }        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;        Throwable localException = null;        switch (sendResult.getSendStatus()) {            case SEND_OK: {                try {                    if (sendResult.getTransactionId() != null) {                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());                    }                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);                    if (null != transactionId && !"".equals(transactionId)) {                        msg.setTransactionId(transactionId);                    }                    if (null != localTransactionExecuter) { // 2. 使用LocalTransactionExecuter或TransactionListener来执行本地事务                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);                    } else if (transactionListener != null) {                        log.debug("Used new transaction API");                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);                    }                    if (null == localTransactionState) {                        localTransactionState = LocalTransactionState.UNKNOW;                    }                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {                        log.info("executeLocalTransactionBranch return {}", localTransactionState);                        log.info(msg.toString());                    }                } catch (Throwable e) {                    log.info("executeLocalTransactionBranch exception", e);                    log.info(msg.toString());                    localException = e;                }            }            break;            case FLUSH_DISK_TIMEOUT:            case FLUSH_SLAVE_TIMEOUT:            case SLAVE_NOT_AVAILABLE:                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;                break;            default:                break;        }        try {            this.endTransaction(msg, sendResult, localTransactionState, localException); // 3. 结束事务:(EndTransactionHook如果存在)则先执行EndTransactionHook.endTransaction,然后发送单向消息进行提交(半消息发送成功且本地事务成功)或回滚        } catch (Exception e) {            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);        }        TransactionSendResult transactionSendResult = new TransactionSendResult();        transactionSendResult.setSendStatus(sendResult.getSendStatus());        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());        transactionSendResult.setMsgId(sendResult.getMsgId());        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());        transactionSendResult.setTransactionId(sendResult.getTransactionId());        transactionSendResult.setLocalTransactionState(localTransactionState);        return transactionSendResult;    }

再看下同步RPC如何模拟:

# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl    public Message request(Message msg,        long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {        long beginTimestamp = System.currentTimeMillis();        prepareSendRequest(msg, timeout); // 1. 以UUID作为关联id给消息属性”CORRELATION_ID“,以clientId给消息属性”REPLY_TO_CLIENT“,以超时时间作为消息属性”TTL“        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);        try {            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);            RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);            long cost = System.currentTimeMillis() - beginTimestamp;            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { // 2. 发送消息                @Override                public void onSuccess(SendResult sendResult) {                    requestResponseFuture.setSendRequestOk(true);                }                @Override                public void onException(Throwable e) {                    requestResponseFuture.setSendRequestOk(false);                    requestResponseFuture.putResponseMessage(null);                    requestResponseFuture.setCause(e);                }            }, timeout - cost);            return waitResponse(msg, timeout, requestResponseFuture, cost); // 3. 等待Future结果        } finally {            RequestFutureTable.getRequestFutureTable().remove(correlationId);        }    }

从客户端代码无法了解发送消息的客户端如何接收响应的消息,即一个ProducerGroup下有多个实例,在request模式下,只能是发起request请求的Producer能收到Message,具体如何处理,需要后续分析Broker相关代码了。

标签: #淘宝tbnet