龙空技术网

0352-Spark2Streaming读非Kerberos环境的Kafka并写数据到Kudu

Hadoop实操 47

前言:

此刻同学们对“apachekudusql”大约比较看重,咱们都想要剖析一些“apachekudusql”的相关内容。那么小编在网上网罗了一些对于“apachekudusql””的相关知识,希望各位老铁们能喜欢,同学们快快来学习一下吧!

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

Fayson的github:

提示:代码块部分可以左右滑动查看噢

1.文档编写目的

在前面的文章Fayson介绍了在Kerberos环境下《

Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu

》,本篇文章Fayson主要介绍如何使用Spark2 Streaming访问非Kerberos环境的Kafka并将接收到的数据写入Kudu。

文章概述

1.环境准备

2.Spark2Streaming示例开发

3.示例运行

4.总结

测试环境

1.CM和CDH版本为5.15

2.CDK2.2.0(Apache Kafka0.10.2)

3.Spark2.2.0

4.操作系统版本为RedHat7.4

2.环境准备

1.准备向Kakfa发送数据的脚本,关于脚本这里就不在过多的介绍前面很多文章都有介绍,具体可以参考Fayson的GitHub:

根据需要将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:

{ "occupation": "生产工作、运输工作和部分体力劳动者", "address": "台东东二路16号-8-8", "city": "长治", "marriage": "1", "sex": "1", "name": "仲淑兰", "mobile_phone_num": "13607268580", "bank_name": "广州银行31", "id": "510105197906185179", "child_num": "1", "fix_phone_num": "15004170180"}

(可左右滑动)

2.登录CM进入SPARK2服务的配置项将spark_kafka_version的kafka版本修改为0.10

修改完成后并部署客户端配置

3.Spark2Streaming示例代码

1.使用maven创建scala语言的spark2demo工程,pom.xml依赖如下

<dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-spark2_2.11</artifactId> <version>1.6.0-cdh5.14.2</version></dependency><dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>1.6.0-cdh5.14.2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0.cloudera2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0.cloudera2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0.cloudera2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.0.cloudera2</version></dependency><dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version></dependency>

(可左右滑动)

具体需要的依赖包,可以参考Fayson前面的文章《

Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu

2.在resources下创建0294.properties配置文件,内容如下:

kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092kafka.topics=kafka_kudu_topickudumaster.list=cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com

(可左右滑动)

3.创建Kafka2Spark2Kudu.scala类

package com.cloudera.streaming.nokerberosimport java.io.{File, FileInputStream}import java.util.Propertiesimport org.apache.commons.lang.StringUtilsimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.kudu.client.CreateTableOptionsimport org.apache.kudu.spark.kudu.KuduContextimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.types.{StringType, StructField, StructType}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}import scala.util.parsing.json.JSONimport scala.collection.JavaConverters._/** * package: com.cloudera.streaming.nokerberos * 使用spark2-submit的方式提交作业 spark2-submit --class com.fayson.streaming.nokerberos.Kafka2Spark2Kudu \ --master yarn \ --deploy-mode client \ --executor-memory 2g \ --executor-cores 2 \ --driver-memory 2g \ --num-executors 2 \ spark2-demo-1.0-SNAPSHOT.jar * creat_user: Fayson * email: htechinfo@163.com * creat_date: 2018/8/6 * creat_time: 下午5:05 * 公众号:Hadoop实操 */object Kafka2Spark2Kudu { Logger.getLogger("com").setLevel(Level.ERROR) //设置日志级别 var confPath: String = System.getProperty("user.dir") + File.separator + "conf/0294.properties" /** * 建表Schema定义 */ val userInfoSchema = StructType( // col name type nullable? StructField("id", StringType , false) :: StructField("name" , StringType, true ) :: StructField("sex" , StringType, true ) :: StructField("city" , StringType, true ) :: StructField("occupation" , StringType, true ) :: StructField("tel" , StringType, true ) :: StructField("fixPhoneNum" , StringType, true ) :: StructField("bankName" , StringType, true ) :: StructField("address" , StringType, true ) :: StructField("marriage" , StringType, true ) :: StructField("childNum", StringType , true ) :: Nil ) /** * 定义一个UserInfo对象 */ case class UserInfo ( id: String, name: String, sex: String, city: String, occupation: String, tel: String, fixPhoneNum: String, bankName: String, address: String, marriage: String, childNum: String ) def main(args: Array[String]): Unit = { //加载配置文件 val properties = new Properties() val file = new File(confPath) if(!file.exists()) { System.out.println(Kafka2Spark2Kudu.getClass.getClassLoader.getResource("0294.properties")) val in = Kafka2Spark2Kudu.getClass.getClassLoader.getResourceAsStream("0294.properties") properties.load(in); } else { properties.load(new FileInputStream(confPath)) } val brokers = properties.getProperty("kafka.brokers") val topics = properties.getProperty("kafka.topics") val kuduMaster = properties.getProperty("kudumaster.list") println("kafka.brokers:" + brokers) println("kafka.topics:" + topics) println("kudu.master:" + kuduMaster) if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics) || StringUtils.isEmpty(kuduMaster)) { println("未配置Kafka和KuduMaster信息") System.exit(0) } val topicsSet = topics.split(",").toSet val spark = SparkSession.builder().appName("Kafka2Spark2Kudu-nokerberos").config(new SparkConf()).getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次 val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers , "auto.offset.reset" -> "latest" , "key.deserializer" -> classOf[StringDeserializer] , "value.deserializer" -> classOf[StringDeserializer] , "group.id" -> properties.getProperty("group.id") ) val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) //引入隐式 import spark.implicits._ val kuduContext = new KuduContext(kuduMaster, spark.sparkContext) //判断表是否存在 if(!kuduContext.tableExists("user_info")) { println("create Kudu Table :{user_info}") val createTableOptions = new CreateTableOptions() createTableOptions.addHashPartitions(List("id").asJava, 8).setNumReplicas(3) kuduContext.createTable("user_info", userInfoSchema, Seq("id"), createTableOptions) } dStream.foreachRDD(rdd => { //将rdd数据重新封装为Rdd[UserInfo] val newrdd = rdd.map(line => { val jsonObj = JSON.parseFull(line.value()) val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]] new UserInfo( map.get("id").get.asInstanceOf[String], map.get("name").get.asInstanceOf[String], map.get("sex").get.asInstanceOf[String], map.get("city").get.asInstanceOf[String], map.get("occupation").get.asInstanceOf[String], map.get("mobile_phone_num").get.asInstanceOf[String], map.get("fix_phone_num").get.asInstanceOf[String], map.get("bank_name").get.asInstanceOf[String], map.get("address").get.asInstanceOf[String], map.get("marriage").get.asInstanceOf[String], map.get("child_num").get.asInstanceOf[String] ) }) //将RDD转换为DataFrame val userinfoDF = spark.sqlContext.createDataFrame(newrdd) kuduContext.upsertRows(userinfoDF, "user_info") }) ssc.start() ssc.awaitTermination() }}

(可左右滑动)

4.使用mvn命令编译工程,注意由于是scala工程编译时mvn命令要加scala:compile

mvn clean scala:compile package

5.将编译好的spark2-demo-1.0-SNAPSHOT.jar包及配置文件上传至服务器

0294.properties配置文件内容如下:

4.示例运行

1.使用spark2-submit命令向集群提交Spark2Streaming作业

spark2-submit --class com.cloudera.streaming.nokerberos.Kafka2Spark2Kudu \ --master yarn \ --deploy-mode client \ --executor-memory 2g \ --executor-cores 2 \ --driver-memory 2g \ --num-executors 2 \ spark2-demo-1.0-SNAPSHOT.jar

(可左右滑动)

通过CM查看作业是否提交成功

Spark2的UI界面

2.查看Kudu Master的UI界面,Tables列表可以看到user_info表已被创建

找到Kudu向Impala的建表语句

CREATE EXTERNAL TABLE `user_info` STORED AS KUDUTBLPROPERTIES( 'kudu.table_name' = 'user_info', 'kudu.master_addresses' = 'cdh01.fayson.com:7051,cdh02.fayson.com:7051,cdh03.fayson.com:7051')

(可左右滑动)

3.运行脚本向Kafka的kafka_kudu_topic生产消息

4.通过Hue查看数据是否已插入Kudu表

5.总结

1.本示例中Spark2Streaming读取非Kerberos环境的Kafka集群,使用的是spark-streaming-kafka0.10.0版本的依赖包,在Spark中提供两个的另外一个版本的为0.8.0版本,在选择依赖包时需要注意与Spark版本的兼容性问题,具体可以参考官网地址:

2.检查/opt/cloudera/parcels/SPARK2/lib/spark2/jars目录下是否有其它版本的spark-streaming-kafka的依赖包,如果存在需要删除,否则会出现版本冲突问题。

3.Spark2默认的kafka版本为0.9需要通过CM将默认的Kafka版本修改为0.10

GitHub地址如下:

相关阅读:

SparkStreaming读Kafka数据写HBase

SparkStreaming读Kafka数据写Kudu

Spark2Streaming读非Kerberos环境的Kafka并写数据到HBase

Spark2Streaming读Kerberos环境的Kafka并写数据到HBase

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

Spark2Streaming读Kerberos环境的Kafka并写数据到Hive

Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

标签: #apachekudusql