龙空技术网

Livy REST 提交Spark作业

进击吧大数据 264

前言:

此刻看官们对“apache livy spark任务执行进度展示”大致比较关注,咱们都需要分析一些“apache livy spark任务执行进度展示”的相关文章。那么小编也在网上网罗了一些关于“apache livy spark任务执行进度展示””的相关内容,希望看官们能喜欢,我们快快来学习一下吧!

01 Livy简介

我们平时提交的spark任务,通常是使用Apache Spark本身提供的spark-submit、spark-shell和Thrift Server外,Apache Livy提供了另外一种与Spark集群交互的方式,通过REST接口

此外,Apache Livy支持同时维护多个会话,可以通过REST接口、Java/Scala库和Apache Zeppelin访问Apache Livy

02 Livy基本架构

Livy流程

Livy是一个典型的REST服务架构,一方面接收并解析用户的REST请求,转换成相应的操作;另一方面管理着用户所启动的所有Spark集群

用户可以以REST请求方式通过Livy启动一个会话(session),一个会话是由一个spark集群所构成的,并且通过RPC协议在Spark集群和Livy服务端之间进行通信。根据处理交互方式不同,Livy将会话分为两种类型:

1.交互式会话(interactive session):跟spark的交互处理相同,在启动会话后可以接收用户所提交的代码片段,提交至远程的spark集群编译并执行

2.批处理会话(batch session):用户可以通过livy以批处理的方式启动spark应用,这样的一个方式在Livy中称之为批处理会话,与spark中的批处理是相同的

03 Livy部署

进入下载Apache Livy(注意Github有Cloudera Livy),这是两个不同的Livy,注意区分,下载成功后执行unzip解压缩即可

1.下载压缩包并解压缩

wget  apache-livy-0.6.0-incubating-bin.zipmv apache-livy-0.6.0-incubating-bin /usr/local/livy

2.配置

cd /usr/local/livy/confcp livy-env.sh.template livy-env.shvi livy-env.sh      export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop/conf      export SPARK_HOME=$SPARK_HOMEcp livy.conf.template livy.confvi livy.conf            livy.spark.master=yarn-client #使用yarn模式           livy.spark.deploy-mode = client           livy.repl.enable-hive-context = true           livy.server.session.kind=yarn           livy.server.recovery.mode = recovery  #恢复机制           livy.server.recovery.state-store = zookeeper / filesystem #有两种保存机制,一种是文件,一种是zookeeper           livy.server.recovery.state-store.url = zookeeper_host:port           livy.file.upload.max.size=107059609600 # livy上传文件限制,默认上限是100M,当Jar包大于100M时候,会抛出异常Too large request or file

其他可选的配置有:

livy.server.host #主机地址,默认为 0.0.0.0;livy.server.port #端口号,默认为 8998;livy.server.session.timeout-check # 是否检测会话超时,默认为 true;livy.server.session.timeout # 会话超时时间,默认为 1h;livy.server.session.state-retain.sec # 已完成会话保留时间,默认为 600s;livy.rsc.jars # RSC JAR 包位置,缓存在 HDFS 上,可以加速会话的启动速度;livy.repl.jars # REPL JAR 包位置,缓存在 HDFS 上,可以加速会话的启动速度livy.server.yarn.poll-interval # YARN 状态刷新频率,默认为 5s;livy.ui.enabled # 是否启动 UI 界面,默认为 true;

3.服务启动与停止

cd /usr/local/livy/bin./livy-server start # 启动服务./livy-server stop  # 停止服务

4.访问界面

访问页面 # 注意:默认端口是8998,使用Ambari部署集群时的livy服务端口是8999
04 使用Postman测试工具了解Livy接口

Livy官文档:

1.新建session

post 

2.查看session状态

GET 

3.提交代码片段

POST 注意:如果在同一个session,提交的代码片段是共享的,相当于在同一个spark shell里面进行操作的sesssions/130  :130这个数字是开始新建session(即第一步的结果)时得到的id值

4.查看代码运行结果

GET 注意:statements/0 :后面的0是第3步提交代码片段返回的id值

5.杀掉session

delete 

6.提交spark批处理任务

POST 

注意:示例中使用的SparkPi例子,该jar包路径在$SPARK_HOME/examples/jars/下

7.查看Spark任务结果

GET 
05 Livy Java API

使用Java API只需要以下几个步骤即可:

1.添加maven依赖

2.实现 org.apache.livy.Job 接口

3.基于URL地址构建LivyClient

4.上传依赖jar包 ---->注意如果jar包文件过大会抛出异常

5.提交Livy任务并获取结果

6.关闭客户端

1.添加mavan依赖

<dependency>    <groupId>org.apache.livy</groupId>  <artifactId>livy-client-http</artifactId>  <version>0.5.0-incubating</version></dependency> 

2.实现Job接口

package com.ud.livy;import org.apache.livy.Job;import org.apache.livy.JobContext;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.function.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import scala.Tuple2;import java.util.ArrayList;import java.util.Arrays;import java.util.List;/** * Author:lcp * CreateTime:2019/5/31 10:50 * Descriptor: */public class TestJob implements Job {    private static Logger logger = LoggerFactory.getLogger(TestJob.class);    @Override    public  List<String>  call(JobContext jobContext) throws Exception {        JavaRDD<String> stringJavaRDD = jobContext.sc().parallelize(Arrays.asList("hello", "are", "you", "spark", "java", "java", "spark", "hadoop"));        logger.info("样本数据:" + stringJavaRDD.first());        JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {            @Override            public Tuple2<String, Integer> call(String s) throws Exception {                return new Tuple2<>(s, 1);            }        }).reduceByKey(new Function2<Integer, Integer, Integer>() {            @Override            public Integer call(Integer integer, Integer integer2) throws Exception {                return integer + integer2;            }        });        List<Tuple2<String, Integer>> collect = stringIntegerJavaPairRDD.collect();        List<String> list = new ArrayList<>();        for (Tuple2<String,Integer> result:collect){            list.add("key:"+ result._1+" value:"+result._2);        }        return list;    }}

3.构建Client,上传Jar包,提交作业获取结果

package com.ud.livy;import org.apache.livy.LivyClient;import org.apache.livy.LivyClientBuilder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.File;import java.net.URI;import java.util.List;/** * Author:lcp * CreateTime:2019/5/31 10:57 * Descriptor: */public class LivyMain {    private static Logger logger = LoggerFactory.getLogger(LivyMain.class);    public static void main(String[] args) {        LivyClient client = null;        try {            client = new LivyClientBuilder()                    .setURI(new URI(";))                    .setConf("spark.debug.maxToStringFields", "10000000")                    .setConf("spark.sql.parquet.binaryAsString", "true")                    .setConf("spark.sql.parquet.writeLegacyForma", "true")                    .setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")                    .setConf("spark.kryoserializer.buffer", "256")                    .setConf("spark.yarn.queue", "llap")                    .setConf("spark.app.name", "sparkTestSpark")                    .setConf("spark.driver.memory", "2g")                    .setConf("spark.executor.memory", "5g")                    .setConf("spark.executor.instances", "3")                    .setConf("spark.executor.cores", "4")                    .setConf("spark.default.parallelism", "500")                    .build();            //注意:使用uploadJar()方法将会在每次运行程序的时候上传jar,使用addJar可加快运行速度,一般需要先把jar包上传至hdfs            client.addJar(new URI("hdfs://bigdatatest-1:8020/bigdata-backtrack-final-0.0.1-SNAPSHOT.jar"));//            client.uploadJar(new File("D:\\GitCode\\livyTest\\target\\bigdata-backtrack-final-0.0.1-SNAPSHOT.jar")).get();            List<String> results = ( List<String>) client.submit(new TestJob()).get();            System.out.println(results);        } catch (Exception e) {                logger.error("抛出异常:",e);        } finally {                if(client!=null){                    client.stop(true);                }        }    }}

4.结果展示

标签: #apache livy spark任务执行进度展示