前言:
现时兄弟们对“po5net”大体比较看重,同学们都想要学习一些“po5net”的相关知识。那么小编也在网上汇集了一些关于“po5net””的相关资讯,希望大家能喜欢,你们快快来学习一下吧!1.数据源利用Python代码模拟生成,生成的数据如下:
* id|brand|product|category|cost|price|quantity|province|timestamp * --------------------------------------------------------------------------- * 1|金龙鱼|金龙鱼花生油|生活用品|52|75|64|云南|1593536348 * 2|金龙鱼|金龙鱼菜籽油|生活用品|48|62|16|河北|1593536348 * 3|金龙鱼|金龙鱼玉米油|生活用品|34|53|72|河北|1593536348生成数据的部分代码如下, 如果你需要完整代码,请关注我并私信"销售订单"
def produce_orders(file_name, num): provinces = ( "江苏", "浙江", "安徽", "江西", "山东", "山西", "河南", "河北", "湖北", "湖南", "广东", "广西", "陕西", "甘肃", "福建", "四川", "黑龙将", "辽宁", "吉林", "北京", "上海", "天津", "重庆", "海南", "新疆", "云南", "贵州", "宁夏", "西藏", "香港", "澳门", "台湾", "内蒙古") products = ( ("金龙鱼花生油", "生活用品", 52, 75, "金龙鱼"), ("金龙鱼植物油", "生活用品", 58, 80, "金龙鱼"), ("金龙鱼大豆油", "生活用品", 54, 73, "金龙鱼"),................................. ("西湖龙井", "茶叶", 22, 49, "龙井"), ("信阳毛尖", "茶叶", 13, 25, "信阳毛尖"), ("溧阳白茶", "茶叶", 23, 59, "溧阳白茶"), ("太平猴魁", "茶叶", 30, 69, "太平猴魁"), ("安溪铁观音", "茶叶", 26, 43, "铁观音"), ("黄山毛峰", "茶叶", 15, 24, "毛峰") ) with open(file_name, mode='a', encoding='utf-8') as f: counter = 0 for i in range(num): for j in range(len(products)): counter += 1 province = provinces[random.randint(0, len(provinces) - 1)] product_name = products[j][0] category = products[j][1] cost = products[j][2] price = products[j][3] brand = products[j][4] quantity = random.randint(10, 100) timestamp = int(time.time()) record = str(counter) + "|" + brand + "|" + product_name + "|" + str(category) + "|"+ str(cost) + "|"+ str(price) + "|" + str( quantity) + "|" + province + "|" + str(timestamp) f.write(record + "\n") time.sleep(0.0001)数据集生成好了利用python代码自动上传HDFS
def write_to_hdfs(file_name): # 利用pyhdfs连接HDFS集群,用于上传生成的数据 fs = pyhdfs.HdfsClient(hosts='node01:9870,node02:9870', randomize_hosts=False, user_name='hadoop', timeout=1000, max_tries=1000) # HDFS要存放数据的目录 hdfs_file_dir = '/user/hadoop/datasets/orders/' # 如果该目录不存在,则创建 if not fs.exists(hdfs_file_dir): fs.mkdirs(hdfs_file_dir) # 上传文件, src为本地生成的文件路径,dest为hdfs上要存放的路径 src = os.getcwd() + "/" + file_name dest = hdfs_file_dir + file_name # 从本地上传文件到hdfs指定目录 fs.copy_from_local(src, dest)2.Spark销售数据分析
由于目前我的文章只更新到了SparkCore部分,所以,现在的分析都是基于SparkCore提供的算子进行分析,在更新完SparkSql和SparkStreaming后会有更加方便的方法来分析这部分数据,后面会慢慢更新这部分内容。
为了清楚地表达代码的逻辑,分析这部分我使用了scala语言,后续也会整合python版本和java版本的。
计算销售额排名的前10的省份
/** * 1.计算销售额排名的前10的省份 * * @param productOrderCached */ def computeTop10ProvinceSales(productOrderCached: RDD[ProductOrder]): Unit = { //把province和grossProfit组成K,V格式的RDD val provinceAndGrossProfit: RDD[(String, Float)] = productOrderCached.map(po => { (po.province, po.grossProfit) }) //按照省份进行聚合 val reducedProvinceAndGrossProfit: RDD[(String, Float)] = provinceAndGrossProfit.reduceByKey(_ + _) //按照聚合好的grossProfit进行倒排序 val sortedProvinceAndGrossProfit: RDD[(String, Float)] = reducedProvinceAndGrossProfit.sortBy(tup => tup._2, false) //取出grossProfit前10省份的销量 val results: Array[(String, Float)] = sortedProvinceAndGrossProfit.take(10) //打印结果,一般是生成json格式文件或者直接进入下游(比如,hdfs, mysql, kafka等) results.foreach(println) }计算销售额排名的前5的种类
/** * 2.计算销售额排名的前5的种类 */ def computeTop5CategorySales(productOrderCached: RDD[ProductOrder]) = { //把category和grossProfit组成K,V格式的RDD val categoryAndGrossProfit: RDD[(String, Float)] = productOrderCached.map(po => { (po.category, po.grossProfit) }) //按照category进行聚合 val reducedCategoryAndGrossProfit: RDD[(String, Float)] = categoryAndGrossProfit.reduceByKey(_ + _) //按照聚合好的grossProfit进行倒排序 val sortedCategoryAndGrossProfit: RDD[(String, Float)] = reducedCategoryAndGrossProfit.sortBy(tup => tup._2, false) //取出grossProfit前10的category的销量 val results: Array[(String, Float)] = sortedCategoryAndGrossProfit.take(5) //打印结果,一般是生成json格式文件或者直接进入下游(比如,hdfs, mysql, kafka等) results.foreach(println) }计算最畅销的5个品牌
/** * 3.计算最畅销的5个品牌 */ def computeTop5Brand(productOrderCached: RDD[ProductOrder]) = { //对每条销售订单的brand计数1,并组成K,V格式的RDD val brandAndOne: RDD[(String, Int)] = productOrderCached.map(po => { (po.brand, 1) }) //按照brand进行聚合 val reducedBrandCounts: RDD[(String, Int)] = brandAndOne.reduceByKey(_ + _) //按照brand的计数进行倒排序 val sortedBrandAndCounts: RDD[(String, Int)] = reducedBrandCounts.sortBy(tup => tup._2, false) //取出前10 val results: Array[(String, Int)] = sortedBrandAndCounts.take(5) results.foreach(println) }统计最挣钱的10个商品
/** * 4.统计最挣钱的10个商品 */ def computeTop10NetProfit(productOrderCached: RDD[ProductOrder]) = { // 取出产品名称和净利润组成K,V格式的RDD val productAndNetProfit: RDD[(String, Float)] = productOrderCached.map(po => { (po.product, po.netProfit) }) // 按产品聚合 val reducedProductAndNetProfit: RDD[(String, Float)] = productAndNetProfit.reduceByKey(_ + _) // 按照净利润倒排序 val sortedProductAndNetProfit: RDD[(String, Float)] = reducedProductAndNetProfit.sortBy(tup => tup._2, false) // 取出前10 val results: Array[(String, Float)] = sortedProductAndNetProfit.take(10) //打印结果 results.foreach(println) }后话
以上只是利用现有的SparkCore做的销售数据的分析,后面会持续更新利用SparkSQL+Hive与SparkStreaming对接Kafka用流处理的方式来分析处理数据。
由于测试数据文件比较大(建议下载源码生成),如果您需要全部源码与生成的测试数据,请关注我并发送私信"销售订单"。
版权声明:
本站文章均来自互联网搜集,如有侵犯您的权益,请联系我们删除,谢谢。
标签: #po5net #spark数据集下载