龙空技术网

二十七、Spark销售订单数据分析

数据致美 65

前言:

现时兄弟们对“po5net”大体比较看重,同学们都想要学习一些“po5net”的相关知识。那么小编也在网上汇集了一些关于“po5net””的相关资讯,希望大家能喜欢,你们快快来学习一下吧!

1.数据源利用Python代码模拟生成,生成的数据如下:

     * id|brand|product|category|cost|price|quantity|province|timestamp     * ---------------------------------------------------------------------------     * 1|金龙鱼|金龙鱼花生油|生活用品|52|75|64|云南|1593536348     * 2|金龙鱼|金龙鱼菜籽油|生活用品|48|62|16|河北|1593536348     * 3|金龙鱼|金龙鱼玉米油|生活用品|34|53|72|河北|1593536348
生成数据的部分代码如下, 如果你需要完整代码,请关注我并私信"销售订单"
def produce_orders(file_name, num):    provinces = (         "江苏", "浙江", "安徽", "江西", "山东",          "山西", "河南", "河北", "湖北", "湖南",          "广东", "广西", "陕西", "甘肃", "福建",          "四川", "黑龙将", "辽宁",         "吉林", "北京", "上海", "天津", "重庆",          "海南", "新疆", "云南", "贵州", "宁夏", "西藏",          "香港", "澳门", "台湾", "内蒙古")    products = (        ("金龙鱼花生油", "生活用品", 52, 75, "金龙鱼"),         ("金龙鱼植物油", "生活用品", 58, 80, "金龙鱼"),        ("金龙鱼大豆油", "生活用品", 54, 73, "金龙鱼"),.................................        ("西湖龙井", "茶叶", 22, 49, "龙井"), ("信阳毛尖", "茶叶", 13, 25, "信阳毛尖"),        ("溧阳白茶", "茶叶", 23, 59, "溧阳白茶"), ("太平猴魁", "茶叶", 30, 69, "太平猴魁"),         ("安溪铁观音", "茶叶", 26, 43, "铁观音"),        ("黄山毛峰", "茶叶", 15, 24, "毛峰")    )    with open(file_name, mode='a', encoding='utf-8') as f:        counter = 0        for i in range(num):            for j in range(len(products)):                counter += 1                province = provinces[random.randint(0, len(provinces) - 1)]                product_name = products[j][0]                category = products[j][1]                cost = products[j][2]                price = products[j][3]                brand = products[j][4]                quantity = random.randint(10, 100)                timestamp = int(time.time())                record = str(counter) + "|" + brand + "|" + product_name + "|" + str(category) + "|"+ str(cost) + "|"+ str(price) + "|" + str(                    quantity) + "|" + province + "|" + str(timestamp)                f.write(record + "\n")                time.sleep(0.0001)
数据集生成好了利用python代码自动上传HDFS
def write_to_hdfs(file_name):    # 利用pyhdfs连接HDFS集群,用于上传生成的数据    fs = pyhdfs.HdfsClient(hosts='node01:9870,node02:9870', randomize_hosts=False, user_name='hadoop', timeout=1000,                           max_tries=1000)    # HDFS要存放数据的目录    hdfs_file_dir = '/user/hadoop/datasets/orders/'    # 如果该目录不存在,则创建    if not fs.exists(hdfs_file_dir):        fs.mkdirs(hdfs_file_dir)    # 上传文件, src为本地生成的文件路径,dest为hdfs上要存放的路径    src = os.getcwd() + "/" + file_name    dest = hdfs_file_dir + file_name    # 从本地上传文件到hdfs指定目录    fs.copy_from_local(src, dest)
2.Spark销售数据分析

由于目前我的文章只更新到了SparkCore部分,所以,现在的分析都是基于SparkCore提供的算子进行分析,在更新完SparkSql和SparkStreaming后会有更加方便的方法来分析这部分数据,后面会慢慢更新这部分内容。

为了清楚地表达代码的逻辑,分析这部分我使用了scala语言,后续也会整合python版本和java版本的。

计算销售额排名的前10的省份

  /**   * 1.计算销售额排名的前10的省份   *   * @param productOrderCached   */  def computeTop10ProvinceSales(productOrderCached: RDD[ProductOrder]): Unit = {    //把province和grossProfit组成K,V格式的RDD    val provinceAndGrossProfit: RDD[(String, Float)] = productOrderCached.map(po => {      (po.province, po.grossProfit)    })    //按照省份进行聚合    val reducedProvinceAndGrossProfit: RDD[(String, Float)] = provinceAndGrossProfit.reduceByKey(_ + _)    //按照聚合好的grossProfit进行倒排序    val sortedProvinceAndGrossProfit: RDD[(String, Float)] = reducedProvinceAndGrossProfit.sortBy(tup => tup._2, false)    //取出grossProfit前10省份的销量    val results: Array[(String, Float)] = sortedProvinceAndGrossProfit.take(10)    //打印结果,一般是生成json格式文件或者直接进入下游(比如,hdfs, mysql, kafka等)    results.foreach(println)  }
计算销售额排名的前5的种类
  /**   * 2.计算销售额排名的前5的种类   */  def computeTop5CategorySales(productOrderCached: RDD[ProductOrder]) = {    //把category和grossProfit组成K,V格式的RDD    val categoryAndGrossProfit: RDD[(String, Float)] = productOrderCached.map(po => {      (po.category, po.grossProfit)    })    //按照category进行聚合    val reducedCategoryAndGrossProfit: RDD[(String, Float)] = categoryAndGrossProfit.reduceByKey(_ + _)    //按照聚合好的grossProfit进行倒排序    val sortedCategoryAndGrossProfit: RDD[(String, Float)] = reducedCategoryAndGrossProfit.sortBy(tup => tup._2, false)    //取出grossProfit前10的category的销量    val results: Array[(String, Float)] = sortedCategoryAndGrossProfit.take(5)    //打印结果,一般是生成json格式文件或者直接进入下游(比如,hdfs, mysql, kafka等)    results.foreach(println)  }
计算最畅销的5个品牌
/**   * 3.计算最畅销的5个品牌   */  def computeTop5Brand(productOrderCached: RDD[ProductOrder]) = {    //对每条销售订单的brand计数1,并组成K,V格式的RDD    val brandAndOne: RDD[(String, Int)] = productOrderCached.map(po => {      (po.brand, 1)    })    //按照brand进行聚合    val reducedBrandCounts: RDD[(String, Int)] = brandAndOne.reduceByKey(_ + _)    //按照brand的计数进行倒排序    val sortedBrandAndCounts: RDD[(String, Int)] = reducedBrandCounts.sortBy(tup => tup._2, false)    //取出前10    val results: Array[(String, Int)] = sortedBrandAndCounts.take(5)    results.foreach(println)  }
统计最挣钱的10个商品
  /**   * 4.统计最挣钱的10个商品   */  def computeTop10NetProfit(productOrderCached: RDD[ProductOrder]) = {    // 取出产品名称和净利润组成K,V格式的RDD    val productAndNetProfit: RDD[(String, Float)] = productOrderCached.map(po => {      (po.product, po.netProfit)    })    // 按产品聚合    val reducedProductAndNetProfit: RDD[(String, Float)] = productAndNetProfit.reduceByKey(_ + _)    // 按照净利润倒排序    val sortedProductAndNetProfit: RDD[(String, Float)] = reducedProductAndNetProfit.sortBy(tup => tup._2, false)    // 取出前10    val results: Array[(String, Float)] = sortedProductAndNetProfit.take(10)    //打印结果    results.foreach(println)  }
后话

以上只是利用现有的SparkCore做的销售数据的分析,后面会持续更新利用SparkSQL+Hive与SparkStreaming对接Kafka用流处理的方式来分析处理数据。

由于测试数据文件比较大(建议下载源码生成),如果您需要全部源码与生成的测试数据,请关注我并发送私信"销售订单"。

标签: #po5net #spark数据集下载