龙空技术网

Spark入门 详解——1

小马赛跑程序员 231

前言:

今天你们对“spark编程基础实验”可能比较看重,兄弟们都需要知道一些“spark编程基础实验”的相关文章。那么小编同时在网摘上汇集了一些关于“spark编程基础实验””的相关知识,希望各位老铁们能喜欢,你们一起来学习一下吧!

安装:

1 安装 scala:

下载:

tar -zxvf scala-2.10.4

2 安装 spark:

Tar spark-2.4.3-bin-hadoop2.7.tgz

vim spark-2.4.3-bin-hadoop2.7/conf/spark-env.sh(简单说一下配置文件)。

vim spark-2.4.3-bin-hadoop2.7/conf/slaves

# A Spark Worker will be started on each of the machines listed below.

slave1

slave2

启动命令:

spark-2.4.3-bin-hadoop2.7/sbin/start-master.sh

用 jps 检查是否启动成功:

实际操作:

查看 web 页面:

因为 spark 需要比较大的内存,因此做实验的时候会把机器搞死。 实验暂时不演示。

Spark 的基本概念

Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用的并行计算框架, Spark基于 map reduce 算法实现的分布式计算,拥有 Hadoop MapReduce 所具有的优点;但不同于MapReduce 的是 Job 中间输出和结果可以保存在内存中,从而不再需要读写 HDFS,因此Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 map reduce 的算法。其架构如下图所示:

Spark 与 Hadoop 的对比

Spark 的中间数据放到内存中,对于迭代运算效率更高。

Spark 更适合于迭代运算比较多的 ML 和 DM 运算。因为在 Spark 里面,有 RDD 的抽象概

念。

Spark 比 Hadoop 更通用

Spark 提供的数据集操作类型有很多种,不像 Hadoop 只提供了 Map 和 Reduce 两种操作。比如 map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues,sort,partionBy 等多种操作类型, Spark 把这些操作称为 Transformations。同时还提供 Count,collect, reduce, lookup, save 等多种 actions 操作。

这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像 Hadoop 那样就是唯一的 Data Shuffle 一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比 Hadoop 更灵活。

不过由于 RDD 的特性, Spark 不适用那种异步细粒度更新状态的应用,例如 web 服务的存储或者是增量的 web 爬虫和索引。就是对于那种增量修改的应用模型不适合。

容错性

在分布式数据集计算时通过 checkpoint 来实现容错,而 checkpoint 有两种方式,一个是

checkpoint data,一个是 logging the updates。用户可以控制采用哪种方式来实现容错。

可用性

Spark 通过提供丰富的 Scala, Java, Python API 及交互式 Shell 来提高可用性。

Spark 与 Hadoop 的结合

Spark 可以直接对 HDFS 进行数据的读写,同样支持 Spark on YARN。Spark 可以与 MapReduce运行于同集群中,共享存储资源与计算,数据仓库 Shark 实现上借用 Hive,几乎与 Hive 完全兼容。

Spark 的适用场景

Spark 是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复

操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,

受益就相对较小(大数据库架构中这是是否考虑使用 Spark 的重要因素)

由于 RDD 的特性, Spark 不适用那种异步细粒度更新状态的应用,例如 web 服务的存储或

者是增量的 web 爬虫和索引。就是对于那种增量修改的应用模型不适合。

总的来说 Spark 的适用面比较广泛且比较通用。

运行模式

本地模式

Standalone 模式

Mesoes 模式

yarn 模式

Spark 生态系统

Shark ( Hive on Spark): Shark 基本上就是在 Spark 的框架基础上提供和 Hive 一样的 H iveQL 命令接口,为了最大程度的保持和 Hive 的兼容性,Shark 使用了 Hive 的 API 来实现 query Parsing和 Logic Plan generation,最后的 PhysicalPlan execution 阶段用 Spark 代替 Hadoop MapReduce。通过配置 Shark 参数, Shark 可以自动在内存中缓存特定的 RDD,实现数据重用,进而加快特定数据集的检索。同时, Shark 通过 UDF 用户自定义函数实现特定的数据分析学习算法,使得 SQL 数据查询和运算分析能结合在一起,最大化 RDD 的重复使用。

Spark streaming: 构建在 Spark 上处理 Stream 数据的框架,基本的原理是将 Stream 数据分成小的时间片断(几秒),以类似 batch 批量处理的方式来处理这小部分数据。 Spark Streaming构建在 Spark 上,一方面是因为 Spark 的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于 Record 的其它处理框架(如 Storm), RDD 数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。

Bagel: Pregel on Spark,可以用 Spark 进行图计算,这是个非常有用的小项目。 Bagel 自带了一个例子,实现了 Google 的 PageRank 算法。

Spark 核心概念 Resilient Distributed Dataset (RDD)弹性分布数据集

RDD 是 Spark 的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来

操作分布式数据集的抽象实现。 RDD 是 Spark 最核心的东西,它表示已被分区,不可变的并

能够被并行操作的数据集合,不同的数据集格式对应不同的 RDD 实现。 RDD 必须是可序

列化的。 RDD 可以 cache 到内存中,每次对 RDD 数据集的操作之后的结果,都可以存放到

内存中,下一个操作可以直接从内存中输入,省去了 MapReduce 大量的磁盘 IO 操作。这对

于迭代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。

RDD 的特点:

它是在集群节点上的不可变的、已分区的集合对象。

通过并行转换的方式来创建如(map, filter, join, etc)。

失败自动重建。

可以控制存储级别(内存、磁盘等)来进行重用。

必须是可序列化的。

是静态类型的。

RDD 的好处

RDD 只能从持久存储或通过 Transformations 操作产生,相比于分布式共享内存(DSM)可

以更高效实现容错,对于丢失部分数据分区只需根据它的 lineage 就可重新计算出来,而不

需要做特定的 Checkpoint。

RDD 的不变性,可以实现类 Hadoop MapReduce 的推测式执行。

RDD 的数据分区特性,可以通过数据的本地性来提高性能,这与 Hadoop MapReduce 是一样

的。

RDD 都是可序列化的,在内存不足时可自动降级为磁盘存储,把 RDD 存储于磁盘上,这时

性能会有大的下降但不会差于现在的 MapReduce。

RDD 的存储与分区

用户可以选择不同的存储级别存储 RDD 以便重用。

当前 RDD 默认是存储于内存,但当内存不足时, RDD 会 spill 到 disk。

RDD 在需要进行分区把数据分布于集群中时会根据每条记录 Key 进行分区(如 Hash 分区),

以此保证两个数据集在 Join 时能高效。

RDD 的内部表示

在 RDD 的内部实现中每个 RDD 都可以使用 5 个方面的特性来表示:

分区列表(数据块列表)

计算每个分片的函数(根据父 RDD 计算出此 RDD)

对父 RDD 的依赖列表

对 key-value RDD 的 Partitioner【可选】

每个数据分片的预定义地址列表(如 HDFS 上的数据块的地址)【可选】

RDD 的存储级别

RDD 根据 useDisk、 useMemory、 deserialized、 replication 四个参数的组合提供了 11 种存储级别:

val NONE = new StorageLevel(false, false, false)

val DISK_ONLY = new StorageLevel(true, false, false)

val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)

val MEMORY_ONLY = new StorageLevel(false, true, true)

val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)

val MEMORY_ONLY_SER = new StorageLevel(false, true, false)

val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)

val MEMORY_AND_DISK = new StorageLevel(true, true, true)

val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)

val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)

RDD 定义了各种操作,不同类型的数据由不同的 RDD 类抽象表示,不同的操作也由 RDD

进行抽实现。

RDD 的生成

RDD 有两种创建方式:

1、从 Hadoop 文件系统(或与 Hadoop 兼容的其它存储系统)输入(例如 HDFS)创建。

2、从父 RDD 转换得到新 RDD。

下面来看一从 Hadoop 文件系统生成 RDD 的方式,如: val file = spark.textFile("hdfs://..."),file 变量就是 RDD(实际是 HadoopRDD 实例),生成的它的核心代码如下:

// SparkContext 根据文件/目录及可选的分片数创建 RDD, 这里我们可以看到 Spark 与

Hadoop MapReduce 很像

// 需要 InputFormat, Key、 Value 的类型,其实 Spark 使用的 Hadoop 的 InputFormat, Writable

类型。

def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {

hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],

classOf[Text], minSplits) .map(pair => pair._2.toString) }

// 根据 Hadoop 配置,及 InputFormat 等创建 HadoopRDD

new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)

对 RDD 进行计算时, RDD 从 HDFS 读取数据时与 Hadoop MapReduce 几乎一样的:

// 根据 hadoop 配置和分片从 InputFormat 中获取 RecordReader 进行数据的读取。

reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)

val key: K = reader.createKey()

val value: V = reader.createValue()

//使用 Hadoop MapReduce 的 RecordReader 读取数据,每个 Key、 Value 对以元组返回。

override def getNext() = {

try {

finished = !reader.next(key, value)

} catch {

case eof: EOFException =>

finished = true

}

(key, value)

}

明天更新 RDD 的转换与操作 及 Scala python

标签: #spark编程基础实验