龙空技术网

一次POC测试引出:探究Kafka官方性能压测工具的演变

大数据从业者FelixZh 146

前言:

现在同学们对“kafka性能测试内置工具”大约比较关注,朋友们都想要学习一些“kafka性能测试内置工具”的相关内容。那么小编同时在网摘上搜集了一些对于“kafka性能测试内置工具””的相关资讯,希望看官们能喜欢,各位老铁们快快来了解一下吧!

前言

测试同学到客户局点现场对大数据平台众多组件及上层业务平台进行POC测试,测试Kafka时候,使用的是Kafka官方性能压测工具kafka-producer-perf-test.sh。客户提出分别测试不同线程数对应的写性能,测试同学发现瞬间懵逼了,因为发现当前版本Kafka自带kafka-producer-perf-test.sh不支持设置线程数了(早期版本是支持的)!!

剖析kafka-producer-perf-test.sh

0.9.0之前版本:scala实现,支持多线程

exec $(dirname$0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@
def main(args: Array[String]) {    val logger = Logger.getLogger(getClass)    val config = new ProducerPerfConfig(args)    if (!config.isFixedSize)      logger.info("WARN: Throughput will be slower due to changing message size per request")    val totalBytesSent = new AtomicLong(0)    val totalMessagesSent = new AtomicLong(0)    val executor = Executors.newFixedThreadPool(config.numThreads)    val allDone = new CountDownLatch(config.numThreads)    val startMs = System.currentTimeMillis    val rand = new java.util.Random    if (!config.hideHeader)      println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +        "total.data.sent.in.nMsg, nMsg.sec")    for (i <- 0 until config.numThreads) {      executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))    }    allDone.await()    val endMs = System.currentTimeMillis    val elapsedSecs = (endMs - startMs) / 1000.0    val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)    println("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f".format(      config.dateFormat.format(startMs), config.dateFormat.format(endMs),      config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,      totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))    System.exit(0)  }

说明:毫无疑问,多线程少不了线程池。这里的实现方式也是一种很经典的多线程案例。将测试数据集均分给每个线程。线程池使用Executors.newFixedThreadPool结合AtomicLong类型的条数和字节数确保线程安全,结合CountDownLatch确保所有线程处理完成。这种实现方案在日常开发工作中很是普遍。

0.9.0之后版本:Java实现,不支持多线程

exec $(dirname$0)/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance"$@"
Properties props = readProps(producerProps, producerConfig, transactionalId, transactionsEnabled);            KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props);            if (transactionsEnabled)                producer.initTransactions();            /* setup perf test */            byte[] payload = null;            if (recordSize != null) {                payload = new byte[recordSize];            }            Random random = new Random(0);            ProducerRecord<byte[], byte[]> record;            Stats stats = new Stats(numRecords, 5000);            long startMs = System.currentTimeMillis();            ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);            int currentTransactionSize = 0;            long transactionStartTime = 0;            for (long i = 0; i < numRecords; i++) {                payload = generateRandomPayload(recordSize, payloadByteList, payload, random);                if (transactionsEnabled && currentTransactionSize == 0) {                    producer.beginTransaction();                    transactionStartTime = System.currentTimeMillis();                }                record = new ProducerRecord<>(topicName, payload);                long sendStartMs = System.currentTimeMillis();                Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);                producer.send(record, cb);                currentTransactionSize++;                if (transactionsEnabled && transactionDurationMs <= (sendStartMs - transactionStartTime)) {                    producer.commitTransaction();                    currentTransactionSize = 0;                }                if (throttler.shouldThrottle(i, sendStartMs)) {                    throttler.throttle();                }            }            if (transactionsEnabled && currentTransactionSize != 0)                producer.commitTransaction();            if (!shouldPrintMetrics) {                producer.close();                /* print final results */                stats.printTotal();            } else {                // Make sure all messages are sent before printing out the stats and the metrics                // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py                // expects this class to work with older versions of the client jar that don't support flush().                producer.flush();                /* print final results */                stats.printTotal();                /* print out metrics */                ToolsUtils.printMetrics(producer.metrics());                producer.close();            }

说明:新版本的实现改用Java语言的同时,剔除了多线程的实现逻辑,增加了事务transaction的支持。

此外,KafkaProducer源码中有段描述指出producer是线程安全的,多线程共享同一个producer实例通常为每个线程独占一个producer实例send更快。

 * A Kafka client that publishes records to the Kafka cluster. * <P> * The producer is <i>thread safe</i> and sharing a single producer instance across threads will generally be faster than * having multiple instances. * <p>

标签: #kafka性能测试内置工具