
A look at RocketMQ's enableMsgTrace

码匠乱炖

This article takes a look at how enableMsgTrace works in RocketMQ (rocketmq-client 4.5.2).

enableMsgTrace

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
        boolean enableMsgTrace, final String customizedTraceTopic) {
        this.namespace = namespace;
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
        //if client open the message trace feature
        if (enableMsgTrace) {
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
                dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
                traceDispatcher = dispatcher;
                this.getDefaultMQProducerImpl().registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
    }

    //......
}

When enableMsgTrace is true, the DefaultMQProducer constructor creates an AsyncTraceDispatcher, wraps it in a SendMessageTraceHookImpl, and registers that hook through getDefaultMQProducerImpl().registerSendMessageHook. If any of this throws, the error is only logged, so the producer is still constructed but without tracing.

SendMessageHook

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/hook/SendMessageHook.java

public interface SendMessageHook {
    String hookName();

    void sendMessageBefore(final SendMessageContext context);

    void sendMessageAfter(final SendMessageContext context);
}

SendMessageHook defines three methods: hookName, sendMessageBefore, and sendMessageAfter.
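To make the before/after contract concrete, here is a minimal sketch of how a client could invoke registered hooks around a send. Everything in it (SendHookSketch, Hook, Client, the List<String> standing in for SendMessageContext) is a hypothetical stand-in, and the calling order is an assumption, since the article does not show how DefaultMQProducerImpl actually drives its hooks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a send hook; not the RocketMQ classes.
class SendHookSketch {

    // Stand-in for SendMessageHook; a List<String> plays the role of the context.
    interface Hook {
        String hookName();

        void sendMessageBefore(List<String> context);

        void sendMessageAfter(List<String> context);
    }

    // Stand-in for the producer implementation that owns the hooks.
    static final class Client {
        private final List<Hook> hooks = new ArrayList<>();

        void registerSendMessageHook(Hook hook) {
            hooks.add(hook);
        }

        // Assumed order: all "before" callbacks, the send itself, all "after" callbacks.
        List<String> send(String body) {
            List<String> context = new ArrayList<>();
            for (Hook hook : hooks) {
                hook.sendMessageBefore(context);
            }
            context.add("send:" + body);
            for (Hook hook : hooks) {
                hook.sendMessageAfter(context);
            }
            return context;
        }
    }

    // Builds a client with one hook that records when each callback ran.
    static Client tracedClient() {
        Client client = new Client();
        client.registerSendMessageHook(new Hook() {
            @Override
            public String hookName() {
                return "SketchTraceHook";
            }

            @Override
            public void sendMessageBefore(List<String> context) {
                context.add("before");
            }

            @Override
            public void sendMessageAfter(List<String> context) {
                context.add("after");
            }
        });
        return client;
    }

    public static void main(String[] args) {
        System.out.println(tracedClient().send("hello"));
    }
}
```

The point of the shape is that the hook sees the same context object on both sides of the send, which is what lets a trace hook stash state in sendMessageBefore and finish it in sendMessageAfter.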

SendMessageTraceHookImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java

public class SendMessageTraceHookImpl implements SendMessageHook {

    private TraceDispatcher localDispatcher;

    public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {
        this.localDispatcher = localDispatcher;
    }

    @Override
    public String hookName() {
        return "SendMessageTraceHook";
    }

    @Override
    public void sendMessageBefore(SendMessageContext context) {
        //if it is message trace data,then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
            return;
        }
        //build the context content of TuxeTraceContext
        TraceContext tuxeContext = new TraceContext();
        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
        context.setMqTraceContext(tuxeContext);
        tuxeContext.setTraceType(TraceType.Pub);
        tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
        //build the data bean object of message trace
        TraceBean traceBean = new TraceBean();
        traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
        traceBean.setTags(context.getMessage().getTags());
        traceBean.setKeys(context.getMessage().getKeys());
        traceBean.setStoreHost(context.getBrokerAddr());
        traceBean.setBodyLength(context.getMessage().getBody().length);
        traceBean.setMsgType(context.getMsgType());
        tuxeContext.getTraceBeans().add(traceBean);
    }

    @Override
    public void sendMessageAfter(SendMessageContext context) {
        //if it is message trace data,then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
            || context.getMqTraceContext() == null) {
            return;
        }
        if (context.getSendResult() == null) {
            return;
        }

        if (context.getSendResult().getRegionId() == null
            || !context.getSendResult().isTraceOn()) {
            // if switch is false,skip it
            return;
        }

        TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
        TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
        tuxeContext.setCostTime(costTime);
        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
            tuxeContext.setSuccess(true);
        } else {
            tuxeContext.setSuccess(false);
        }
        tuxeContext.setRegionId(context.getSendResult().getRegionId());
        traceBean.setMsgId(context.getSendResult().getMsgId());
        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
        localDispatcher.append(tuxeContext);
    }
}

SendMessageTraceHookImpl implements the SendMessageHook interface. Its constructor takes a TraceDispatcher, and its hookName is SendMessageTraceHook.

sendMessageBefore returns immediately if the context is null or if the message's topic starts with the dispatcher's trace topic name, that is, if the message being sent is itself trace data. Otherwise it builds a TraceContext of type Pub, attaches it to the SendMessageContext, and adds a TraceBean holding the topic, tags, keys, store host, body length, and message type.

sendMessageAfter applies the same check and additionally returns if no TraceContext was attached, if context.getSendResult() is null, if getRegionId() is null, or if isTraceOn() is false. Otherwise it fetches the TraceContext, computes costTime, records the success flag and regionId, fills in msgId, offsetMsgId, and storeTime on the TraceBean, and appends the TraceContext to the TraceDispatcher.
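The arithmetic in sendMessageAfter is easy to misread, so here is a small sketch of it with plain numbers. The class and method names are hypothetical, the topic strings are placeholders, and it assumes getTimeStamp() reflects the moment the TraceContext was created in sendMessageBefore, which the listing implies but does not show.

```java
// Hypothetical helper that mirrors the arithmetic in SendMessageTraceHookImpl.
class TraceTimingSketch {

    // Mirrors: (int) ((now - timeStamp) / traceBeans.size())
    static int costTime(long startMillis, long nowMillis, int beanCount) {
        return (int) ((nowMillis - startMillis) / beanCount);
    }

    // Mirrors: timeStamp + costTime / 2 (integer division), i.e. the store time
    // is estimated as the midpoint of the send round trip.
    static long estimatedStoreTime(long startMillis, int costTime) {
        return startMillis + costTime / 2;
    }

    // Mirrors the guard that skips messages which are themselves trace data.
    static boolean isTraceTraffic(String topic, String traceTopicName) {
        return topic.startsWith(traceTopicName);
    }

    public static void main(String[] args) {
        int cost = costTime(1_000L, 1_031L, 1);
        System.out.println("costTime=" + cost);
        System.out.println("storeTime=" + estimatedStoreTime(1_000L, cost));
        System.out.println("skip=" + isTraceTraffic("TRACE_TOPIC", "TRACE_TOPIC"));
    }
}
```

Note that the store time is a client-side estimate, not a value reported by the broker, and that the startsWith guard is what keeps the trace producer's own sends from generating further trace records.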

TraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java

public interface TraceDispatcher {

    /**
     * Initialize asynchronous transfer data module
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

    /**
     * Append the transfering data
     * @param ctx data infomation
     * @return
     */
    boolean append(Object ctx);

    /**
     * Write flush action
     *
     * @throws IOException
     */
    void flush() throws IOException;

    /**
     * Close the trace Hook
     */
    void shutdown();
}

The TraceDispatcher interface defines the start, append, flush, and shutdown methods.
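append returns a boolean rather than blocking or throwing. In the AsyncTraceDispatcher listing in the next section it is implemented as a non-blocking offer to a bounded queue, with a counter for dropped contexts. The sketch below models just that behaviour; BoundedAppendSketch and its String payload are hypothetical stand-ins, not RocketMQ classes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a drop-when-full append, using only java.util.concurrent.
class BoundedAppendSketch {

    private final ArrayBlockingQueue<String> queue;
    private final AtomicLong discardCount = new AtomicLong(0L);

    BoundedAppendSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // offer() never blocks: it returns false at once when the queue is full,
    // so a slow trace pipeline cannot stall the caller that is sending messages.
    boolean append(String ctx) {
        boolean accepted = queue.offer(ctx);
        if (!accepted) {
            discardCount.incrementAndGet();
        }
        return accepted;
    }

    long discarded() {
        return discardCount.get();
    }

    public static void main(String[] args) {
        BoundedAppendSketch dispatcher = new BoundedAppendSketch(2);
        System.out.println(dispatcher.append("a"));
        System.out.println(dispatcher.append("b"));
        System.out.println(dispatcher.append("c"));
        System.out.println("discarded=" + dispatcher.discarded());
    }
}
```

The trade-off is that trace data is best-effort: under sustained load some contexts are dropped rather than delaying business messages.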

AsyncTraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

public class AsyncTraceDispatcher implements TraceDispatcher {

    private final static InternalLogger log = ClientLogger.getLog();
    private final int queueSize;
    private final int batchSize;
    private final int maxMsgSize;
    private final DefaultMQProducer traceProducer;
    private final ThreadPoolExecutor traceExecutor;
    // The last discard number of log
    private AtomicLong discardCount;
    private Thread worker;
    private ArrayBlockingQueue<TraceContext> traceContextQueue;
    private ArrayBlockingQueue<Runnable> appenderQueue;
    private volatile Thread shutDownHook;
    private volatile boolean stopped = false;
    private DefaultMQProducerImpl hostProducer;
    private DefaultMQPushConsumerImpl hostConsumer;
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private String dispatcherId = UUID.randomUUID().toString();
    private String traceTopicName;
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AccessChannel accessChannel = AccessChannel.LOCAL;

    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
        // queueSize is greater than or equal to the n power of 2 of value
        this.queueSize = 2048;
        this.batchSize = 100;
        this.maxMsgSize = 128000;
        this.discardCount = new AtomicLong(0L);
        this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
        this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
        if (!UtilAll.isBlank(traceTopicName)) {
            this.traceTopicName = traceTopicName;
        } else {
            this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
        }
        this.traceExecutor = new ThreadPoolExecutor(//
            10, //
            20, //
            1000 * 60, //
            TimeUnit.MILLISECONDS, //
            this.appenderQueue, //
            new ThreadFactoryImpl("MQTraceSendThread_"));
        traceProducer = getAndCreateTraceProducer(rpcHook);
    }

    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
        if (isStarted.compareAndSet(false, true)) {
            traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();
        }
        this.accessChannel = accessChannel;
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
        this.worker.setDaemon(true);
        this.worker.start();
        this.registerShutDownHook();
    }

    private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
        DefaultMQProducer traceProducerInstance = this.traceProducer;
        if (traceProducerInstance == null) {
            traceProducerInstance = new DefaultMQProducer(rpcHook);
            traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
            traceProducerInstance.setSendMsgTimeout(5000);
            traceProducerInstance.setVipChannelEnabled(false);
            // The max size of message is 128K
            traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
        }
        return traceProducerInstance;
    }

    @Override
    public boolean append(final Object ctx) {
        boolean result = traceContextQueue.offer((TraceContext) ctx);
        if (!result) {
            log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
        }
        return result;
    }

    @Override
    public void flush() throws IOException {
        // The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
        long end = System.currentTimeMillis() + 500;
        while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
            //......
        }
        //......
    }

    //......

    class AsyncRunnable implements Runnable {
        //......
                if (contexts.size() > 0) {
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                    traceExecutor.submit(request);
                } else if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }

        }
    }

    //......
}

AsyncRunnable implements Runnable. Its run method loops up to batchSize times, pulling elements from traceContextQueue into contexts, then wraps the batch in an AsyncAppenderRequest and submits it to traceExecutor. When a pass collects nothing and the dispatcher has been stopped, the worker marks itself stopped as well.
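The batching step can be sketched on its own with a plain BlockingQueue. This is a hedged illustration rather than the library code: BatchDrainSketch and drainBatch are hypothetical names, and the poll timeout is passed in as a parameter because the listing above does not show how long the real worker waits on each poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "collect up to batchSize items, stop early when the queue runs dry".
class BatchDrainSketch {

    static <T> List<T> drainBatch(BlockingQueue<T> queue, int batchSize, long pollTimeoutMillis) {
        List<T> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            T item;
            try {
                // Wait briefly for the next element instead of spinning.
                item = queue.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and hand back what was collected so far.
                Thread.currentThread().interrupt();
                break;
            }
            if (item == null) {
                // Queue stayed empty for the whole timeout: ship a partial batch.
                break;
            }
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(8);
        for (int i = 0; i < 5; i++) {
            queue.offer(i);
        }
        System.out.println(drainBatch(queue, 3, 1));
        System.out.println(drainBatch(queue, 3, 1));
        System.out.println(drainBatch(queue, 3, 1));
    }
}
```

Breaking out on the first empty poll keeps latency low when traffic is light, while the batchSize cap bounds how much one AsyncAppenderRequest has to encode and send.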

AsyncAppenderRequest

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

class AsyncAppenderRequest implements Runnable {
    List<TraceContext> contextList;

    public AsyncAppenderRequest(final List<TraceContext> contextList) {
        if (contextList != null) {
            this.contextList = contextList;
        } else {
            this.contextList = new ArrayList<TraceContext>(1);
        }
    }

    @Override
    public void run() {
        sendTraceData(contextList);
    }

    public void sendTraceData(List<TraceContext> contextList) {
        Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
        for (TraceContext context : contextList) {
            if (context.getTraceBeans().isEmpty()) {
                continue;
            }
            // Topic value corresponding to original message entity content
            String topic = context.getTraceBeans().get(0).getTopic();
            String regionId = context.getRegionId();
            // Use original message entity's topic as key
            String key = topic;
            if (!StringUtils.isBlank(regionId)) {
                key = key + TraceConstants.CONTENT_SPLITOR + regionId;
            }
            List<TraceTransferBean> transBeanList = transBeanMap.get(key);
            if (transBeanList == null) {
                transBeanList = new ArrayList<TraceTransferBean>();
                transBeanMap.put(key, transBeanList);
            }
            TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
            transBeanList.add(traceData);
        }
        for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
            String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
            String dataTopic = entry.getKey();
            String regionId = null;
            if (key.length > 1) {
                dataTopic = key[0];
                regionId = key[1];
            }
            flushData(entry.getValue(), dataTopic, regionId);
        }
    }

    /**
     * Batch sending data actually
     */
    private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
        if (transBeanList.size() == 0) {
            return;
        }
        // Temporary buffer
        StringBuilder buffer = new StringBuilder(1024);
        int count = 0;
        Set<String> keySet = new HashSet<String>();

        for (TraceTransferBean bean : transBeanList) {
            // Keyset of message trace includes msgId of or original message
            keySet.addAll(bean.getTransKey());
            buffer.append(bean.getTransData());
            count++;
            // Ensure that the size of the package should not exceed the upper limit.
            if (buffer.length() >= traceProducer.getMaxMessageSize()) {
                sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
                // Clear temporary buffer after finishing
                buffer.delete(0, buffer.length());
                keySet.clear();
                count = 0;
            }
        }
        if (count > 0) {
            sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
        }
        transBeanList.clear();
    }

    /**
     * Send message trace data
     *
     * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
     * @param data the message trace data in this batch
     */
    private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
        String traceTopic = traceTopicName;
        if (AccessChannel.CLOUD == accessChannel) {
            traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
        }
        final Message message = new Message(traceTopic, data.getBytes());
        // Keyset of message trace includes msgId of or original message
        message.setKeys(keySet);
        try {
            Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);
            SendCallback callback = new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {

                }

                @Override
                public void onException(Throwable e) {
                    log.info("send trace data ,the traceData is " + data);
                }
            };
            if (traceBrokerSet.isEmpty()) {
                // No cross set
                traceProducer.send(message, callback, 5000);
            } else {
                traceProducer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Set<String> brokerSet = (Set<String>) arg;
                        List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
                        for (MessageQueue queue : mqs) {
                            if (brokerSet.contains(queue.getBrokerName())) {
                                filterMqs.add(queue);
                            }
                        }
                        int index = sendWhichQueue.getAndIncrement();
                        int pos = Math.abs(index) % filterMqs.size();
                        if (pos < 0) {
                            pos = 0;
                        }
                        return filterMqs.get(pos);
                    }
                }, traceBrokerSet, callback);
            }

        } catch (Exception e) {
            log.info("send trace data,the traceData is" + data);
        }
    }

    private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
        Set<String> brokerSet = new HashSet<String>();
        TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
            producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {
                brokerSet.add(queue.getBrokerName());
            }
        }
        return brokerSet;
    }
}

AsyncAppenderRequest implements Runnable; its run method calls sendTraceData. That method walks contextList, encodes each TraceContext into a TraceTransferBean, and groups the beans in transBeanMap under a key made of the original message's topic plus the regionId when one is present. It then iterates over transBeanMap and calls flushData for each group.

flushData walks transBeanList and appends each bean's transData to a StringBuilder. Whenever the buffer length reaches traceProducer.getMaxMessageSize(), it calls sendTraceDataByMQ and resets the buffer, the key set, and count. After the loop, if count is still greater than 0, it calls sendTraceDataByMQ once more for the remainder.

sendTraceDataByMQ first obtains traceBrokerSet through tryGetMessageQueueBrokerSet. If the set is empty it calls traceProducer.send(message, callback, 5000); otherwise it sends with a MessageQueueSelector that keeps only queues on brokers in the set and rotates among them using sendWhichQueue.
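The size-based splitting in flushData can be sketched in isolation. In this hypothetical helper (ChunkedFlushSketch and chunk are not RocketMQ names), strings stand in for the encoded trace records and the returned list stands in for the sequence of sendTraceDataByMQ calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the flushData loop: accumulate records and cut a
// message whenever the buffer has reached the size limit.
class ChunkedFlushSketch {

    static List<String> chunk(List<String> records, int maxMessageSize) {
        List<String> messages = new ArrayList<>();
        StringBuilder buffer = new StringBuilder(1024);
        int count = 0;
        for (String record : records) {
            buffer.append(record);
            count++;
            // The check runs after the append, so a message can end up larger than the limit.
            if (buffer.length() >= maxMessageSize) {
                messages.add(buffer.toString());
                buffer.delete(0, buffer.length());
                count = 0;
            }
        }
        // Whatever is left over goes out as a final, smaller message.
        if (count > 0) {
            messages.add(buffer.toString());
        }
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(chunk(Arrays.asList("aaaa", "bbbb", "cc"), 6));
    }
}
```

Because the limit is tested only after appending, a batch can overshoot it by up to one record. That is presumably why the trace producer's limit is set to maxMsgSize - 10 * 1000 (118000) in getAndCreateTraceProducer rather than the full 128000, leaving some headroom.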

Summary

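When enableMsgTrace is true, the DefaultMQProducer constructor creates an AsyncTraceDispatcher, wraps it in a SendMessageTraceHookImpl, and registers the hook through getDefaultMQProducerImpl().registerSendMessageHook. The hook builds a TraceContext around each send and appends it to the dispatcher, which batches the contexts and sends the trace data asynchronously with its own producer.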

doc

DefaultMQProducer
