龙空技术网

flink电商实时业务构建

随心而文 149

前言:

现在大家对“apachelog开启”大概比较关心,朋友们都需要了解一些“apachelog开启”的相关知识。因此小编在网上收集了一些关于“apachelog开启”的相关知识,希望大家能喜欢,兄弟们一起来学习一下吧!

/**
 * One user-behavior record parsed from a comma-separated line.
 *
 * @param userId     user identifier
 * @param itemId     item identifier
 * @param categoryId item category identifier
 * @param behavior   behavior type; the hot-items job keeps only "pv"
 * @param timestamp  event timestamp; the job multiplies it by 1000 to get milliseconds,
 *                   so it is presumably in seconds
 */
case class UserBehavior(
  userId: Long,
  itemId: Long,
  categoryId: Int,
  behavior: String,
  timestamp: Long
)

/**
 * View count of one item within one sliding window.
 *
 * @param itemId    item identifier
 * @param windowEnd end timestamp of the window the count belongs to
 * @param count     number of counted events for the item in that window
 */
case class ItemViewCount(
  itemId: Long,
  windowEnd: Long,
  count: Long
)

/**
 * Hot-items job: reads user-behavior records from a CSV file, keeps only page views ("pv"),
 * counts views per item in a 60-minute window that slides every 5 minutes, and prints the
 * 3 most-viewed items of every window.
 */
object HotItems {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    env
      // Windows-style placeholder path; replace YOUR_PATH with the real location.
      .readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
      .map(line => {
        val linearray = line.split(",")
        UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt,
          linearray(3), linearray(4).toLong)
      })
      // The CSV timestamp is scaled by 1000 to milliseconds (so it is presumably in seconds);
      // ascending timestamps assume the file is ordered by time.
      .assignAscendingTimestamps(_.timestamp * 1000)
      .filter(_.behavior == "pv")
      .keyBy("itemId")
      .timeWindow(Time.minutes(60), Time.minutes(5))
      .aggregate(new CountAgg(), new WindowResultFunction())
      // Position 1 of ItemViewCount is windowEnd: regroup the per-item counts by window.
      .keyBy(1)
      .process(new TopNHotItems(3))
      .print()

    env.execute("Hot Items Job")
  }

  /** COUNT pre-aggregation: adds one for every record in the window. */
  class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {

    override def createAccumulator(): Long = 0L

    override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1

    override def getResult(acc: Long): Long = acc

    override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  }

  /** Wraps the pre-aggregated count of one item and one window into an [[ItemViewCount]]. */
  class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {

    override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
                       collector: Collector[ItemViewCount]): Unit = {
      // keyBy("itemId") yields a single-field tuple key holding the item id.
      val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
      // With an incremental aggregate, the window function receives exactly one value.
      val count = aggregateResult.iterator.next
      collector.collect(ItemViewCount(itemId, window.getEnd, count))
    }
  }

  /**
   * Computes the top N most-clicked items of one window.
   *
   * The input is keyed by window end, so each key's state holds every ItemViewCount of a
   * single window. The output is the TopN ranking formatted as a string.
   *
   * @param topSize number of items to include in each ranking
   */
  class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {

    private var itemState: ListState[ItemViewCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      // Name and element type of the keyed list state.
      val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state",
        classOf[ItemViewCount])
      // Obtain the state handle from the runtime context.
      itemState = getRuntimeContext.getListState(itemsStateDesc)
    }

    override def processElement(input: ItemViewCount,
                                context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context,
                                collector: Collector[String]): Unit = {
      // Buffer every incoming count in state.
      itemState.add(input)
      // Register an event-time timer at windowEnd + 1. When the watermark reaches it, all item
      // counts belonging to the windowEnd window have been received and onTimer ranks them.
      context.timerService.registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      import scala.collection.JavaConverters._

      // Snapshot all buffered counts. Some Flink versions document AppendingState.get()
      // as returning null for empty state, so guard against it.
      val allItems: List[ItemViewCount] =
        Option(itemState.get).map(_.asScala.toList).getOrElse(Nil)
      // Clear the state early to release space.
      itemState.clear()

      // Sort by view count, descending, and keep the first topSize entries.
      val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)

      // Format the ranking as a printable string.
      val result: StringBuilder = new StringBuilder
      result.append("====================================\n")
      // timestamp - 1 is the window end, because the timer was registered at windowEnd + 1.
      result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n")
      sortedItems.zipWithIndex.foreach { case (currentItem, i) =>
        // Example output line: No1: 商品 ID=12224 浏览量=2413  (item ID / view count)
        result.append("No").append(i + 1).append(":")
          .append(" 商品 ID=").append(currentItem.itemId)
          .append(" 浏览量=").append(currentItem.count).append("\n")
      }
      result.append("====================================\n\n")

      // Deliberately throttles output to mimic a rolling real-time display. It blocks the task
      // thread for one second per window, so remove it outside of demos.
      Thread.sleep(1000)
      out.collect(result.toString)
    }
  }
}

从 Kafka 读取数据

// Kafka consumer configuration, applied as key/value pairs.
val properties = new Properties()
Seq(
  "bootstrap.servers" -> "localhost:9092",
  "group.id" -> "consumer-group",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  // Kafka's fallback position when the consumer group has no committed offset.
  "auto.offset.reset" -> "latest"
).foreach { case (key, value) => properties.setProperty(key, value) }

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

// Consume the "hotitems" topic as plain strings; this source takes the place of readTextFile.
val kafkaConsumer = new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties)
val stream = env.addSource(kafkaConsumer)

/**
 * One parsed Apache access-log entry.
 *
 * @param ip        client IP address
 * @param userId    user field of the log line
 * @param eventTime event time in epoch milliseconds
 * @param method    HTTP method
 * @param url       requested URL
 */
case class ApacheLogEvent(
  ip: String,
  userId: String,
  eventTime: Long,
  method: String,
  url: String
)

/**
 * Request count of one URL within one sliding window.
 *
 * @param url       requested URL
 * @param windowEnd end timestamp of the window the count belongs to
 * @param count     number of requests for the URL in that window
 */
case class UrlViewCount(
  url: String,
  windowEnd: Long,
  count: Long
)

/**
 * Network-flow job: parses an Apache access log, drops requests for .css/.js files, counts
 * requests per URL in a 10-minute window that slides every 5 seconds, and prints the 5 most
 * requested URLs of every window.
 */
object NetworkFlow {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    env
      // Windows-style placeholder path; replace YOUR_PATH with your own location.
      .readTextFile("YOUR_PATH\\resources\\apache.log")
      .map(line => {
        // Space-separated fields; field 3 is parsed as dd/MM/yyyy:HH:mm:ss.
        // NOTE(review): assumes the log is pre-formatted so the date sits in field 3 — confirm.
        val linearray = line.split(" ")
        // A new formatter is built per record; SimpleDateFormat is not thread-safe to share.
        val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timestamp = simpleDateFormat.parse(linearray(3)).getTime
        ApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
      })
      // Watermarks trail the largest seen timestamp by 1 second to tolerate out-of-order events.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.milliseconds(1000)) {
          override def extractTimestamp(t: ApacheLogEvent): Long = t.eventTime
        })
      // Keep only URLs that do not end in ".css" or ".js".
      .filter(data => {
        val pattern = "^((?!\\.(css|js)$).)*$".r
        (pattern findFirstIn data.url).nonEmpty
      })
      .keyBy("url")
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .aggregate(new CountAgg(), new WindowResultFunction())
      // Position 1 of UrlViewCount is windowEnd: regroup the per-URL counts by window.
      .keyBy(1)
      .process(new TopNHotUrls(5))
      .print()

    env.execute("Network Flow Job")
  }

  /** COUNT pre-aggregation: adds one for every log event in the window. */
  class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {

    override def createAccumulator(): Long = 0L

    override def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long = acc + 1

    override def getResult(acc: Long): Long = acc

    override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  }

  /** Wraps the pre-aggregated count of one URL and one window into a [[UrlViewCount]]. */
  class WindowResultFunction extends WindowFunction[Long, UrlViewCount, Tuple, TimeWindow] {

    override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
                       collector: Collector[UrlViewCount]): Unit = {
      // keyBy("url") yields a single-field tuple key holding the URL.
      val url: String = key.asInstanceOf[Tuple1[String]].f0
      // With an incremental aggregate, the window function receives exactly one value.
      val count = aggregateResult.iterator.next
      collector.collect(UrlViewCount(url, window.getEnd, count))
    }
  }

  /**
   * Computes the top N most-requested URLs of one window.
   *
   * The input is keyed by window end, so each key's state holds every UrlViewCount of a
   * single window. The output is the TopN ranking formatted as a string.
   *
   * @param topsize number of URLs to include in each ranking
   */
  class TopNHotUrls(topsize: Int) extends KeyedProcessFunction[Tuple, UrlViewCount, String] {

    private var urlState: ListState[UrlViewCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      val urlStateDesc = new ListStateDescriptor[UrlViewCount]("urlState-state",
        classOf[UrlViewCount])
      urlState = getRuntimeContext.getListState(urlStateDesc)
    }

    override def processElement(input: UrlViewCount,
                                context: KeyedProcessFunction[Tuple, UrlViewCount, String]#Context,
                                collector: Collector[String]): Unit = {
      // Buffer every incoming count in state.
      urlState.add(input)
      // Event-time timer at windowEnd + 1: when the watermark reaches it, every URL count of
      // that window has arrived and onTimer ranks them.
      context.timerService.registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Tuple, UrlViewCount, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      import scala.collection.JavaConverters._

      // Snapshot all buffered URL counts. Some Flink versions document AppendingState.get()
      // as returning null for empty state, so guard against it.
      val allUrlViews: List[UrlViewCount] =
        Option(urlState.get).map(_.asScala.toList).getOrElse(Nil)
      // Clear the state early to release space.
      urlState.clear()

      // Sort by request count, descending, and keep the first `topsize` entries.
      // (Previously referenced an undefined `topSize`, which did not compile.)
      val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse).take(topsize)

      // Format the ranking as a printable string.
      val result: StringBuilder = new StringBuilder
      result.append("====================================\n")
      // timestamp - 1 is the window end, because the timer was registered at windowEnd + 1.
      result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n")
      sortedUrlViews.zipWithIndex.foreach { case (currentUrlView, i) =>
        // Example output line: No1: URL=/blog/tags/firefox?flav=rss20 流量=55  (URL / traffic)
        result.append("No").append(i + 1).append(":")
          .append(" URL=").append(currentUrlView.url)
          .append(" 流量=").append(currentUrlView.count).append("\n")
      }
      result.append("====================================\n\n")

      // Deliberately throttles output to mimic a rolling real-time display. It blocks the task
      // thread for one second per window, so remove it outside of demos.
      Thread.sleep(1000)
      out.collect(result.toString)
    }
  }
}

网站总浏览量(PV)的统计

衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View,PV)。

用户每次打开一个页面便记录 1 次 PV,多次打开同一页面则浏览量累计。一般来说,PV 与来访者的数量成正比,但是 PV 并不直接决定页面的真实来访者数量,例如,同一个来访者通过不断刷新页面,也可以制造出非常高的 PV。

我们知道,用户浏览页面时,会从浏览器向网络服务器发出一个请求(Request),网络服务器接到这个请求后,会将该请求对应的网页(Page)发送给浏览器,从而产生一个 PV。所以我们的统计方法,可以是从 web 服务器的日志中提取对应的页面访问再进行统计;也可以直接从埋点日志中提取用户发来的页面请求,从而统计出总浏览量。

/**
 * One user-behavior record parsed from a comma-separated line.
 *
 * @param userId     user identifier
 * @param itemId     item identifier
 * @param categoryId item category identifier
 * @param behavior   behavior type; the PV and UV jobs keep only "pv"
 * @param timestamp  event timestamp; the jobs multiply it by 1000 to get milliseconds,
 *                   so it is presumably in seconds
 */
case class UserBehavior(
  userId: Long,
  itemId: Long,
  categoryId: Int,
  behavior: String,
  timestamp: Long
)

/**
 * Page-view (PV) job: counts "pv" events in one-hour tumbling event-time windows and
 * prints one ("pv", count) tuple per window.
 */
object PageView {

  def main(args: Array[String]): Unit = {
    // Test data is loaded from the classpath root.
    val resourcesPath = getClass.getResource("/UserBehaviorTest.csv")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val behaviors = env
      .readTextFile(resourcesPath.getPath)
      .map(data => {
        val fields = data.split(",")
        UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt,
          fields(3), fields(4).toLong)
      })
      // Seconds-to-milliseconds conversion; assumes the input is ordered by time.
      .assignAscendingTimestamps(_.timestamp * 1000)

    // Map every page view to a constant key so all of them are summed together.
    behaviors
      .filter(_.behavior == "pv")
      .map(x => ("pv", 1))
      .keyBy(_._1)
      .timeWindow(Time.seconds(60 * 60))
      .sum(1)
      .print()

    env.execute("Page View Job")
  }
}

网站独立访客数(UV)的统计

/**
 * Distinct-visitor count of one window.
 *
 * @param windowEnd end timestamp of the window
 * @param count     number of distinct users in that window
 */
case class UvCount(
  windowEnd: Long,
  count: Long
)

/**
 * Unique-visitor (UV) job: counts distinct users among "pv" events in one-hour tumbling
 * event-time windows, using UvCountByWindow, and prints one UvCount per window.
 */
object UniqueVisitor {

  def main(args: Array[String]): Unit = {
    // Test data is loaded from the classpath root.
    val resourcesPath = getClass.getResource("/UserBehaviorTest.csv")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val pageViews = env
      .readTextFile(resourcesPath.getPath)
      .map(line => {
        val cols = line.split(",")
        UserBehavior(cols(0).toLong, cols(1).toLong, cols(2).toInt,
          cols(3), cols(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)
      .filter(_.behavior == "pv")

    // Non-keyed window: every page view of the hour lands in the same window.
    pageViews
      .timeWindowAll(Time.seconds(60 * 60))
      .apply(new UvCountByWindow())
      .print()

    env.execute("Unique Visitor Job")
  }
}

/**
 * Counts the distinct user ids among all records of one non-keyed window and emits a single
 * UvCount tagged with the window's end timestamp.
 *
 * The distinct ids are collected in an in-memory set, so memory grows with the number of
 * distinct users per window.
 */
class UvCountByWindow extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {

  override def apply(window: TimeWindow,
                     input: Iterable[UserBehavior],
                     out: Collector[UvCount]): Unit = {
    // Build the distinct-id set directly; the unused mutable set and the var are gone.
    val distinctUserIds: Set[Long] = input.iterator.map(_.userId).toSet
    out.collect(UvCount(window.getEnd, distinctUserIds.size))
  }
}

标签: #apachelog开启