前言:
此时看官们对“hive 导出文件”大约比较关怀,看官们都想要知道一些“hive 导出文件”的相关内容。那么小编同时在网上汇集了一些对于“hive 导出文件””的相关文章,希望小伙伴们能喜欢,兄弟们快快来学习一下吧!温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。
Fayson的github:
提示:代码块部分可以左右滑动查看噢
1.文档编写目的
在前面的文章Fayson介绍了一些关于Spark2Streaming的示例如《
Spark2Streaming读Kerberos环境的Kafka并写数据到HBase
》和《
Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu
》,本篇文章Fayson主要介绍如何使用Spark2Streaming访问Kerberos环境的Kafka并将接收到的Kafka数据写入Hive,在介绍本篇文章前,你可能需要知道:
《
如何在CDH集群启用Kerberos
》
《
如何在Redhat7.3的CDH5.14中启用Kerberos
》
《
如何在Redhat7.4的CDH5.15中启用Kerberos
》
《
如何通过Cloudera Manager为Kafka启用Kerberos及使用
》
示例架构图如下:
示例详细流程图如下:
内容概述:
1.环境准备
2.Spark2Streaming示例开发
3.示例运行
4.总结
测试环境:
1.CM5.14.3/CDH5.14.2
2.CDK2.2.0(Apache Kafka0.10.2)
3.SPARK2.2.0
4.操作系统版本为Redhat7.3
5.采用root用户进行操作
6.集群已启用Kerberos
2.环境准备
1.准备访问Kafka的Keytab文件,使用xst命令导出keytab文件
[root@cdh01 ~]# kadmin.local kadmin.local: xst -norandkey -k fayson.keytab fayson@FAYSON.COM
(可左右滑动)
使用klist命令检查导出的keytab文件是否正确
[root@cdh01 ~]# klist -ek fayson.keytab
(可左右滑动)
2.导出一个hive/adin@FAYSON.COM账号的keytab文件,该keytab用户向Yarn提交Spark作业使用
[root@cdh01 ~]# kadmin.local kadmin.local: xst -norandkey -k hive.keytab hive/admin@FAYSON.COM
(可左右滑动)
3.准备jaas.cof文件内容如下:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab" principal="fayson@FAYSON.COM";};Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab" principal="fayson@FAYSON.COM";};
(可左右滑动)
将fayson.keytab和jaas.conf文件拷贝至集群的所有节点统一的/data/disk1/0286-kafka-shell/conf目录下。
4.准备向Kerberos环境发送数据的脚本,关于脚本这里就不在过多的介绍前面很多文章都有介绍,具体可以参考Fayson的GitHub:
(可左右滑动)
根据需要将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:
{ "occupation": "生产工作、运输工作和部分体力劳动者", "address": "台东东二路16号-8-8", "city": "长治", "marriage": "1", "sex": "1", "name": "仲淑兰", "mobile_phone_num": "13607268580", "bank_name": "广州银行31", "id": "510105197906185179", "child_num": "1", "fix_phone_num": "15004170180"}
(可左右滑动)
5.登录CM进入SPARK2服务的配置项将spark_kafka_version的kafka版本修改为0.10
3.SparkStreaming示例开发
1.使用maven创建scala语言的spark2demo工程,pom.xml依赖如下
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0.cloudera2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0.cloudera2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0.cloudera2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.0.cloudera2</version></dependency><dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version></dependency>
(可左右滑动)
2.在resources下创建0291.properties配置文件,内容如下:
kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092kafka.topics=kafka_hive_topic
(可左右滑动)
3.创建Kafka2Spark2Hive.scala文件,内容如下:
package com.cloudera.streamingimport java.io.{File, FileInputStream}import java.util.Propertiesimport org.apache.commons.lang.StringUtilsimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.sql.{SaveMode, SparkSession}import org.apache.spark.sql.types.{StringType, StructField, StructType}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import scala.collection.JavaConverters._import scala.util.parsing.json.JSON/** * package: com.cloudera.streaming * describe: Kerberos环境中Spark2Streaming应用实时读取Kafka数据,解析后存入Hive * 使用spark2-submit的方式提交作业 * spark2-submit --class com.cloudera.streaming.Kafka2Spark2Hive \ --master yarn \ --deploy-mode client \ --executor-memory 2g \ --executor-cores 2 \ --driver-memory 2g \ --num-executors 2 \ --queue default \ --principal hive/admin@FAYSON.COM \ --keytab /data/disk1/spark2streaming-kafka-hive/conf/hive.keytab \ --files "/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf#jaas.conf" \ --driver-java-options "-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf" \ --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf" \ spark2-demo-1.0-SNAPSHOT.jar * creat_user: Fayson * email: htechinfo@163.com * creat_date: 2018/7/15 * creat_time: 下午4:01 * 公众号:Hadoop实操 */object Kafka2Spark2Hive { Logger.getLogger("com").setLevel(Level.ERROR) //设置日志级别 var confPath: String = System.getProperty("user.dir") + File.separator + "conf/0291.properties" /** * 建表Schema定义 */ val userInfoSchema = StructType( // col name type nullable? StructField("id", StringType , false) :: StructField("name" , StringType, true ) :: StructField("sex" , StringType, true ) :: StructField("city" , StringType, true ) :: StructField("occupation" , StringType, true ) :: StructField("tel" , StringType, true ) :: StructField("fixPhoneNum" , StringType, true ) :: StructField("bankName" , StringType, true ) :: StructField("address" , StringType, true ) :: StructField("marriage" , StringType, true ) :: StructField("childNum", StringType , true ) :: Nil ) /** * 定义一个UserInfo对象 */ case class UserInfo ( id: String, name: String, sex: String, city: String, occupation: String, tel: String, fixPhoneNum: String, bankName: String, address: String, marriage: String, childNum: String ) def main(args: Array[String]): Unit = { //加载配置文件 val properties = new Properties() val file = new File(confPath) if(!file.exists()) { System.out.println(Kafka2Spark2Hive.getClass.getClassLoader.getResource("0291.properties")) val in = Kafka2Spark2Hive.getClass.getClassLoader.getResourceAsStream("0291.properties") properties.load(in); } else { properties.load(new FileInputStream(confPath)) } val brokers = properties.getProperty("kafka.brokers") val topics = properties.getProperty("kafka.topics") println("kafka.brokers:" + brokers) println("kafka.topics:" + topics) if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics)) { println("未配置Kafka信息...") System.exit(0) } val topicsSet = topics.split(",").toSet val spark = SparkSession.builder().appName("Kafka2Spark2Hive-kerberos").config(new SparkConf()).getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次 val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers , "auto.offset.reset" -> "latest" , "security.protocol" -> "SASL_PLAINTEXT" , "sasl.kerberos.service.name" -> "kafka" , "key.deserializer" -> classOf[StringDeserializer] , "value.deserializer" -> classOf[StringDeserializer] , "group.id" -> "testgroup" ) val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) //引入隐式 import spark.implicits._ dStream.foreachRDD(rdd => { //将rdd数据重新封装为Rdd[UserInfo] val newrdd = rdd.map(line => { val jsonObj = JSON.parseFull(line.value()) val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]] new UserInfo( map.get("id").get.asInstanceOf[String], map.get("name").get.asInstanceOf[String], map.get("sex").get.asInstanceOf[String], map.get("city").get.asInstanceOf[String], map.get("occupation").get.asInstanceOf[String], map.get("mobile_phone_num").get.asInstanceOf[String], map.get("fix_phone_num").get.asInstanceOf[String], map.get("bank_name").get.asInstanceOf[String], map.get("address").get.asInstanceOf[String], map.get("marriage").get.asInstanceOf[String], map.get("child_num").get.asInstanceOf[String] ) }) //将RDD转换为DataFrame val userinfoDF = spark.sqlContext.createDataFrame(newrdd) userinfoDF.write.mode(SaveMode.Append).saveAsTable("ods_user") }) ssc.start() ssc.awaitTermination() }}
(可左右滑动)
4.使用mvn命令编译工程,注意由于是scala工程编译时mvn命令要加scala:compile
mvn clean scala:compile package
(可左右滑动)
5.将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务
在conf目录下新增0291.properties配置文件,内容如下:
4.示例运行
1.使用spark2-submit命令向集群提交SparkStreaming作业
spark2-submit --class com.cloudera.streaming.Kafka2Spark2Hive \ --master yarn \ --deploy-mode client \ --executor-memory 2g \ --executor-cores 2 \ --driver-memory 2g \ --num-executors 2 \ --queue default \ --principal hive/admin@FAYSON.COM \ --keytab /data/disk1/spark2streaming-kafka-hive/conf/hive.keytab \ --files "/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf#jaas.conf" \ --driver-java-options "-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf" \ --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf" \ spark2-demo-1.0-SNAPSHOT.jar
(可左右滑动)
通过CM查看作业是否提交成功
Spark2的UI界面
2.运行脚本向Kafka的Kafka_kudu_topic生产消息
3.登录Hue在Hive中执行Select查询user_info表中数据
5.总结
1.在前面的文章Fayson也有介绍Java访问Kerberos环境的Kafka,需要使用到jaas.conf文件,这里的jaas.conf文件Fayson通过spark2-submit的方式指定,注意我们的jaas.conf文件及keytab需要在集群的所有节点存在,因为Driver和Executor是随机在集群的节点上启动的。
2.同样在scala代码中访问Kafka是也一样需要添加Kerberos相关的配置security.protocol和sasl.kerberos.service.name参数。
3.Spark2默认的kafka版本为0.9需要通过CM将默认的Kafka版本修改为0.10
4.在文章中将接收到的Kafka数据转换成DataFrame,调用DataFrame的saveAsTable将数据保存到Hive的表中,如果Hive表不存在会默认的创建。
GitHub地址如下:
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操
标签: #hive 导出文件