

大数据从业者FelixZh 149







exec $(dirname$0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@
def main(args: Array[String]) {    val logger = Logger.getLogger(getClass)    val config = new ProducerPerfConfig(args)    if (!config.isFixedSize)      logger.info("WARN: Throughput will be slower due to changing message size per request")    val totalBytesSent = new AtomicLong(0)    val totalMessagesSent = new AtomicLong(0)    val executor = Executors.newFixedThreadPool(config.numThreads)    val allDone = new CountDownLatch(config.numThreads)    val startMs = System.currentTimeMillis    val rand = new java.util.Random    if (!config.hideHeader)      println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +        "total.data.sent.in.nMsg, nMsg.sec")    for (i <- 0 until config.numThreads) {      executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))    }    allDone.await()    val endMs = System.currentTimeMillis    val elapsedSecs = (endMs - startMs) / 1000.0    val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)    println("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f".format(      config.dateFormat.format(startMs), config.dateFormat.format(endMs),      config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,      totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))    System.exit(0)  }



exec $(dirname$0)/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance"$@"
Properties props = readProps(producerProps, producerConfig, transactionalId, transactionsEnabled);            KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props);            if (transactionsEnabled)                producer.initTransactions();            /* setup perf test */            byte[] payload = null;            if (recordSize != null) {                payload = new byte[recordSize];            }            Random random = new Random(0);            ProducerRecord<byte[], byte[]> record;            Stats stats = new Stats(numRecords, 5000);            long startMs = System.currentTimeMillis();            ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);            int currentTransactionSize = 0;            long transactionStartTime = 0;            for (long i = 0; i < numRecords; i++) {                payload = generateRandomPayload(recordSize, payloadByteList, payload, random);                if (transactionsEnabled && currentTransactionSize == 0) {                    producer.beginTransaction();                    transactionStartTime = System.currentTimeMillis();                }                record = new ProducerRecord<>(topicName, payload);                long sendStartMs = System.currentTimeMillis();                Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);                producer.send(record, cb);                currentTransactionSize++;                if (transactionsEnabled && transactionDurationMs <= (sendStartMs - transactionStartTime)) {                    producer.commitTransaction();                    currentTransactionSize = 0;                }                if (throttler.shouldThrottle(i, sendStartMs)) {                    throttler.throttle();                }            }            if (transactionsEnabled && currentTransactionSize != 0)                producer.commitTransaction();            if (!shouldPrintMetrics) {                producer.close();                /* print final results */                stats.printTotal();            } else {                // Make sure all messages are sent before printing out the stats and the metrics                // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py                // expects this class to work with older versions of the client jar that don't support flush().                producer.flush();                /* print final results */                stats.printTotal();                /* print out metrics */                ToolsUtils.printMetrics(producer.metrics());                producer.close();            }



 * A Kafka client that publishes records to the Kafka cluster. * <P> * The producer is <i>thread safe</i> and sharing a single producer instance across threads will generally be faster than * having multiple instances. * <p>

标签: #kafka性能测试内置工具