龙空技术网

在 Ubuntu 上进行 Apache Spark 的伪分布式安装

GreatMan 262

前言:

眼前各位老铁们对“apachesparkhadoop”大体比较重视,大家都需要剖析一些“apachesparkhadoop”的相关资讯。那么小编也在网上网罗了一些关于“apachesparkhadoop””的相关文章,希望小伙伴们能喜欢,兄弟们快快来了解一下吧!

在大数据行业里,有一些绕不过的框架,比如Hadoop、spark等,这些框架都是针对超大规模的数据处理工作而被开发出来的,所以一般的,一台计算机是无法应付大数据处理工作的,所以这些框架一般会以分布式的方式进行安装,即这些框架被分别安装在多台能够互相通信的计算机上,通过配置让这些计算机里的大数据框架可以进行协同工作。

但是我们在初学阶段,或者由于当前的条件不允许,无法那么顺利的进行分布式部署。所以这些框架还提供了伪分布式的安装方式,运行起来后是看不出什么变化的。这是一种模拟分布式的运行机制,但是本质上是将框架里的所有组件都安装在了一台计算机上。

下面就开始以伪分布式的方式在单台Ubuntu虚拟机上安装ApacheSpark框架。(这里默认已经事先伪分布式的方式安装好了Hadoop)

Step1:下载

在Apache spark 网站 下载对应版本的spark软件包

因为我的Ubuntu系统里安装的hadoop版本是 3.2.2,所以这里选择支持这个hadoop版本的spark版本。然后点击第三项下载 tgz 文件。

Step2:安装

将下载好的spark压缩包解压缩后放置在某个目录下,然后配置环境变量

tar -zxvf spark-3.1.2-bin-hadoop3.2.tgz -C ~/Apps/

上面代码中的用户根目录下的Apps目录是我自己创建的,主要用于存放一些安装的软件。

在环境变量文件的最后追加以下内容

export SPARK_HOME=/home/frank/Apps/spark-3.1.2-bin-hadoop3.2export PATH=$PATH:$SPARK_HOME/bin

Step 3:配置

安装好spark还不够,还必须合理的配置好spark,才能让spark与现有的hadoop一起协同工作,发挥出它应有的功能。

在spark的conf 目录下,有这样一些文件

其中需要我们配置的是 spark-defaults.conf.template、spar-env.sh.template、workers.template。这三个文件都是模板文件,还不能使用,必须去掉文件名后出现的 template的字样。用下面的命令修改这三个文件的名称。

mv spark-defaults.conf.template spark-defaults.confmv spark-env.sh.template spark-env.shmv workers.template workers

最终的结果如下

下面就开始对这三个配置文件进行配置

workers

因为这次进行大数据框架安装时是以伪分布的方式安装的,所以只有当前主机在工作。这个workers文件就是在定义集群中的机器的地址。打开这个文件后会看到下面只有一个 localhost 的配置信息。

只有一行内容,localhost表示的就是当前主机,和现在的伪分布式的设备架构一致,为了方便其他机器访问,把这个localhost修改为当前Ubuntu虚拟机的真实IP地址 192.168.3.101

spark-env.sh

这个文件是用来定义spark运行环境里的一些包的地址的,比如hadoop在哪个路径,spark的某个模块在哪个路径等等。

在这个文件末尾追加以下内容

export JAVA_HOME=/home/frank/Apps/jdk1.8.0_301export SCALA_HOME=/home/frank/Apps/scala-2.13.6export HADOOP_HOME=/home/frank/Apps/hadoop-3.2.2export HADOOP_CONF_DIR=/home/frank/Apps/hadoop-3.2.2/etc/hadoopexport SPARK_MASTER_HOST=192.168.3.101export SPARK_PID_DIR=/home/frank/Apps/spark-3.1.2-bin-hadoop3.2/data/pidexport SPARK_LOCAL_DIRS=/home/frank/Apps/spark-3.1.2-bin-hadoop3.2/data/spark_shuffleexport SPARK_EXECUTOR_MEMORY=1Gexport SPARK_WORKER_MEMORY=4G
JAVA_HOME:spark采用的是scala语言,而scala语言是Java的迭代,所以这里需要配置jdk的位置才能让spark正常运行;SCALA_HOME:spark是用scala开发的,所以运行时需要调用scala的一些包,所以这里要配置scala的包的位置;HADOOP_HOME:spark本质上是在hadoop体系下运转的,所以运行时要和hadoop进行通信,那么就必须知道hadoop安装在哪里;SPARK_MASTER_HOST:spark的主机地址。因为是伪分布式安装spark,所以主机就是当前的Ubuntu;SPARK_PID_DIR:pid( Process Identification),表示进程识别号,spark是一个多进程的框架,其实大数据中很多包都是多进程的。要控制众多的进程,就必须给每一个进程配置一个唯一的id号码,通过识别这些进程的id号码,就可以方便的控制进程了。这个pid文件夹在这里本身不存在,需要创建;SPARK_LOCAL_DIRS;这里配置的是spark的多进程的排序信息的放置位置。shuffle的意思是随机排序。这个spark_shuffle文件夹在这里本身不存在,需要创建;SPARK_EXECUTOR_MEMORY:顾名思义,就是spark运行时用到的内存大小;SPARK_WORKER_MEMORY:顾名思义,就是spark的工作内存,可以理解为spark的运行时最大内存。这里的配置最好不要超过机器的物理内存的容量;

spark-default.conf

spark.master                     spark://master:7077spark.eventLog.enabled           truespark.eventLog.dir               hdfs://192.168.3.101:9000/eventLogspark.serializer                 org.apache.spark.serializer.KryoSerializerspark.driver.memory              1g

上面的spark.eventLog.dir配置的是spark的状态日志信息保存的位置。从配置信息里可以看到是保存在了hadoop的hdfs的某个位置下,所以在启动spark之前需要先在hdfs里创建这个目录 eventLog。

hdfs dfs -mkdir /eventLog

当然可以不用这么闷骚。这也就是一个存放spark日志的路径而已,所以完全可以在Ubuntu系统里创建一个目录用来存放spark日志,在这里的配置项spark.eventLog.dir就可以指向那个地址。我现在之所以这样做,是考虑到以后可能spark的日志会很多,而利用Hadoop的文件分布式管理可以降低单个日志文件的大小。对于初学者来说,应该不存在这样的情况。

Step 4:启动 Spark

在spark 2.x 的相关教程里提到,要启动spark需要先启动hadoop。但是我当前使用的是spark 3.x。

经过验证,先启动hadoop 3.2.2之后,再启动 spark 3.1.2的时候,会有以下的提示

WARNING: Attempting to start all Apache Hadoop daemons as frank in 10 seconds.WARNING: This is not a recommended production deployment configuration.WARNING: Use CTRL-C to abort.Starting namenodes on [192.168.3.101]192.168.3.101: namenode is running as process 7319.  Stop it first.Starting datanodeslocalhost: datanode is running as process 7449.  Stop it first.Starting secondary namenodes [frank-virtual-machine]frank-virtual-machine: secondarynamenode is running as process 7633.  Stop it first.Starting resourcemanagerresourcemanager is running as process 7891.  Stop it first.Starting nodemanagerslocalhost: nodemanager is running as process 8030.  Stop it first.

从上面的提示可以看出,spark的启动命令 start-all.sh 的功能比较强大,它可以同时将hadoop和spark都启动起来。所以在spark启动时需要先关闭正在运行的hadoop。

那么就将所有hadoop的进程都关闭,包括hbase和hive。关闭全部和大数据有关的进程,直接启动spark。进入到spark的安装目录下,有一个sbin目录,运行里面的 start-all.sh 文件。

frank@frank-virtual-machine:~/Apps/spark-3.1.2-bin-hadoop3.2/sbin$ ./start-all.sh

然后再导航到hadoop的安装目录下的sbin目录,执行 Hadoop的 start-all.sh 文件

frank@frank-virtual-machine:~/Apps/hadoop-3.2.2/sbin$ ./start-all.sh

因为有环境变量存在,而且hadoop和spark的启动文件的名称都是 start-all.sh,为了不会混淆,所以这里逐个导航到sbin目录下,通过 【./】语法来执行当前目录下的start-all.sh文件。

接下来还可以再尝试启动hbase和hive。

frank@frank-virtual-machine:~/Apps/hbase-2.4.5/bin$ start-hbase.shfrank@frank-virtual-machine:~/Apps/apache-hive-3.1.2-bin/bin$ hiveserver2

这里的hiveserver2 采用的是前台启动运行的模式。

最后,通过jps命令查看现在启动的进程

frank@frank-virtual-machine:~$ jps12771 ResourceManager13589 HQuorumPeer11814 Worker12375 DataNode12200 NameNode14488 Jps12585 SecondaryNameNode14265 RunJar11690 Master12922 NodeManager13837 HRegionServer

在运行的进程中:Worker和Master两个进程是spark创建的

spark还提供了一个查看状态的网页,地址是 【localhost:8080】,如果是从其他机器打开这个页面,把localhost修改为Ubuntu虚拟机的IP地址即可。

Step 5:Spark Shell

spark还提供了一个命令行工具 spark-shell。启动这个工具很简单,执行下面的命令

一定要按照下面的命令启动spark-shell,要写清楚spark的master的详细地址和端口

假如当前是用Windows客户机来访问Ubuntu服务器,要用Windows上的命令行工具启动spark-shell,就需要按照下面的命令启动shell工具。

spark-shell --master "spark://192.168.3.101:7077"

红色部分一定要写上去。因为在spark的配置信息里表明了master的端口是7077,而且在spark的状态页面上顶部的大标题也写的很清楚【 Spart Master at spark://192.168.3.101:7077 】

执行上面的命令后,机会进入到spark-shell 状态了

Step6:Spark-Shell简单实例

spark的主要操作对象是分布式的元素集合,成为弹性分布式数据集(Resilient DistributedDataset,RDD),它可以被分发到集群的各个节点上进行并行操作。RDD可以通过hadoop的InputFormat创建,也可以从其他RDD转化而来。

每一个spark应用都是由一个驱动器程序(driver program)来发起集群上的各种并行操作。驱动器程序包含应用的main函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作。下面的例子中,实际的驱动器程序就是spark-shell。

驱动器程序通过一个SparkContext对象来访问Spark。这个对象代表对计算集群的一个连接。shell启动时就自动创建了一个SparkContext对象,是一个叫做 sc 的变量。

实例 1:

scala> val textFile = sc.textFile(";)textFile: org.apache.spark.rdd.RDD[String] =  MapPartitionsRDD[1] at textFile at <console>:24scala> textFile.count()[Stage 0:> (0 +  res0: Long = 108

上面两行代码用于统计某个md文件的字数。变量 textFile本质上就是一个创建出来的RDD。通过前缀 指定去读本地的一个文件。但是spark默认是读取hdfs上的文件的,如果用file作为前缀,就是读取Linux系统本地的文件。

一个RDD有两个操作算子:

Transformation(转换):transformation属于延迟计算,当一个RDD转换为另一个RDD时并没有立即转换,而仅仅是记录下了数据的计算逻辑;

Action(执行):触发spark作业的运行,进行真正的计算;

实例 2:

紧跟着实例 1 中的RDD textFile,对文件中含有“Spark”的行数。spark会按照输入文件的每一行进行拆分。在实例 1 中的 textFile就是从原始文件里按照行拆分得到的一个RDD。

在这个实例中,就是过滤textFile里每一行中含有“Spark”字样的行,统计含有这个字样的行数。

scala> val lineWithSpark = textFile.filter(line => line.contains("Spark"))lineWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:25scala> lineWithSpark.count()res1: Long = 19

实例 3:

找到单词最多的那一行共有几个单词

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a, b) a else b)res2: Int = 16

从上面的代码可以看到,整个过程分为了两步,第一步是 map阶段,这个阶段可以看做是定义统计计算的方法,这个阶段会得到一些结果;第二步是reduce阶段,这个阶段是将map阶段得到的结果进行汇总统计。

那么这个例子里,map阶段是针对每一行(line)的单词个数进行计算,reduce阶段会逐个的对每一行单词个数进行比较,把较大的单词个数返回出来。最终就会得到单词最多的那一行的单词个数。

实例 4:

进行单词统计。一般在hadoop中是通过MapReduce来实现数据流模式进行统计的,那么在spark中同样可以实现这样的操作。

scala> val textFile=sc.textFile(";)scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)scala> wordCounts.collect

这里的 flatMap函数可以想象成“压扁”的感觉,也就是打破line中行的概念,去掉行,直接取出整个RDD中的每一个单词。

输出结果如下:

res3: Array[(String, Int)] = Array((package,1), (this,1), (integration,1), (Python,2), (cluster.,1), (its,1), ([run,1), (There,1), (general,2), (have,1), (pre-built,1), (Because,1), (YARN,,1), (locally,2), (changed,1), (locally.,1), (several,1), (only,1), (Configuration,1), (This,2), (basic,1), (first,1), (learning,,1), (documentation,3), (graph,1), (Hive,2), (info,1), (["Specifying,1), ("yarn",1), ([params]`.,1), ([project,1), (prefer,1), (SparkPi,2), (engine,2), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (are,1), (systems.,1), (params,1), (scala>,1), (DataFrames,,1), (provides,1), (refer,2), (configure,1), (Interactive,2), (R,,1), (can,6), (build,3), (when,1), (easiest,1), (Maven]().,1), (Apache,1), (guide](ht...

谢谢阅读!

标签: #apachesparkhadoop