龙空技术网

Spark Streaming 项目实战 (4)

Java资料分享师 41

前言:

今天姐妹们对“sparkstreaming项目”大概比较重视,同学们都想要学习一些“sparkstreaming项目”的相关文章。那么小编同时在网络上搜集了一些关于“sparkstreaming项目””的相关文章,希望咱们能喜欢,同学们一起来了解一下吧!

统计各广告最近 1 小时内的点击量趋势:各广告最近 1 小时内各分钟的点击量

此部分最终想要得到的结果如下:

一. 得到最近1小时广告点击量实时统计 1. 新建类LastHourApp

package com.buwenbuhuo.streaming.project.appimport com.buwenbuhuo.streaming.project.bean.AdsInfoimport org.apache.spark.streaming.{Minutes, Seconds}import org.apache.spark.streaming.dstream.DStream/* * @author 不温卜火 * @create 2020-08-17 11:19 * MyCSDN :   */object LastHourApp extends App {  override def doSomething(adsInfoStream: DStream[AdsInfo]): Unit = {    adsInfoStream        // 1. 先把窗口分好      .window(Minutes(60),Seconds(3))        // 2. 按照广告分钟 进行聚合      .map(info => (info.adsId,info.hmString) -> 1)      .reduceByKey(_+_)        // 3. 再按照广告分组,把这个广告下所有的分钟记录放在一起      .map{        case ((ads,hm),count) => (ads,(hm,count))      }      .groupByKey()      .print(10000)  }}/*统计各广告最近1小时内的点击量趋势:各广告最近1小时内各分钟的点击量,每6秒统计一次1. 各广告,每分钟            ->         按照(广告,分钟) 分组2. 最近1小时,每6秒统计一次   ->          窗口: 窗口长度1小时  窗口的滑动步长 5s----1. 先把窗口分好2. 按照广告分钟 进行聚合3. 再按照广告分组,把这个广告下所有的分钟记录放在一起4. 把结果写在redis中 */
2. 运行结果二. 写入到redis中 1. 源码(添加)
        // 4. 把结果写在redis中      .foreachRDD(rdd => {        rdd.foreachPartition(it=>{          if (it.nonEmpty){ // 只是判断是否有下一个元素,指针不会跳过这个元素            // 1. 先建立到redis连接            val client: Jedis = RedisUtil.getClient            // 2. 写元素到redis            // 2.1 一个一个的写(昨天)            // 2.2 批次写入            import org.json4s.JsonDSL._            val key: String = "last:ads:hour:count"            val map: Map[String, String] = it.toMap.map {              case (adsId, it) => (adsId, JsonMethods.compact(JsonMethods.render(it)))            }            // scala集合转换成java集合            import scala.collection.JavaConversions._            println(map)            client.hmset(key,map)            // 3. 关闭redis(用的是连接池,实际是把连接归还给连接池)            client.close()          }        })
2. 运行结果 3. 在redis中查看三. 完整代码
package com.buwenbuhuo.streaming.project.appimport com.buwenbuhuo.streaming.project.bean.AdsInfoimport com.buwenbuhuo.streaming.project.util.RedisUtilimport org.apache.spark.streaming.{Minutes, Seconds}import org.apache.spark.streaming.dstream.DStreamimport org.json4s.jackson.JsonMethodsimport redis.clients.jedis.Jedis/* * @author 不温卜火 * @create 2020-08-17 11:19 * MyCSDN :   */object LastHourApp extends App {  override def doSomething(adsInfoStream: DStream[AdsInfo]): Unit = {    adsInfoStream        // 1. 先把窗口分好      .window(Minutes(60),Seconds(3))        // 2. 按照广告分钟 进行聚合      .map(info => (info.adsId,info.hmString) -> 1)      .reduceByKey(_+_)        // 3. 再按照广告分组,把这个广告下所有的分钟记录放在一起      .map{        case ((ads,hm),count) => (ads,(hm,count))      }      .groupByKey()        // 4. 把结果写在redis中      .foreachRDD(rdd => {        rdd.foreachPartition(it=>{          if (it.nonEmpty){ // 只是判断是否有下一个元素,指针不会跳过这个元素            // 1. 先建立到redis连接            val client: Jedis = RedisUtil.getClient            // 2. 写元素到redis            // 2.1 一个一个的写(昨天)            // 2.2 批次写入            import org.json4s.JsonDSL._            val key: String = "last:ads:hour:count"            val map: Map[String, String] = it.toMap.map {              case (adsId, it) => (adsId, JsonMethods.compact(JsonMethods.render(it)))            }            // scala集合转换成java集合            import scala.collection.JavaConversions._            println(map)            client.hmset(key,map)            // 3. 关闭redis(用的是连接池,实际是把连接归还给连接池)            client.close()          }        })      })  }}/*统计各广告最近1小时内的点击量趋势:各广告最近1小时内各分钟的点击量,每6秒统计一次1. 各广告,每分钟            ->         按照(广告,分钟) 分组2. 最近1小时,每6秒统计一次   ->          窗口: 窗口长度1小时  窗口的滑动步长 5s----1. 先把窗口分好2. 按照广告分钟 进行聚合3. 再按照广告分组,把这个广告下所有的分钟记录放在一起4. 把结果写在redis中----写到redis的时候的数据的类型1.  key                              value  广告id                           json字符串每分钟的点击量2.  key                              value  "last:ads:hour:count"            hash                                   field       value                                   adsId       json字符串                                   "1"         {"09:24":100,"09:25":110,...} */

标签: #sparkstreaming项目