前言:
当前小伙伴们对“apache的session覆盖”可能比较注重,小伙伴们都需要了解一些“apache的session覆盖”的相关内容。那么小编也在网摘上收集了一些关于“apache的session覆盖””的相关资讯,希望咱们能喜欢,朋友们快快来了解一下吧!上一篇文章Spark源码阅读:SparkSession类之spark对象的使用中,我们从读取本地文件写入hive表的例子开始,介绍了 SparkSession 对象的使用方法。spark-shell 里的 spark 对象类型是 class SparkSession,代码在 org/apache/spark/sql/SparkSession.scala 文件里可以找到。class SparkSession 有20个成员变量和10+个成员函数,构造spark对象时需要从环境变量、命令行、配置文件等多个位置读取配置。对于 spark-shell/pyspark(shell)/spark-submit 不同的使用方式,构造 spark 对象走的是同一套流程,下面我们将从一个实际的应用开始,看 SparkSession 对象是如何构造的。
注意:当我们提到实例和对象时,通常指一个事物,比如对象是一个类实例化的结果。而Scala 里对单例模式的语法支持,比如 object SparkSession,我们会直接用object,而不是“对象”这个词。
一个Spark Application
在生产环境中,我们通常会把复杂的业务逻辑封装在一个 jar 包 或者 python 包里,通过 spark-submit 的方式提交到远端的 yarn 集群上。
下面这个例子演示了如何提交一个 Spark Structured Streaming App:
$SPARK_HOME/bin/spark-submit \--class com.demo.Main \--master yarn \--name SparkStructStreamingApp \--queue root.default_streaming \--conf spark.executor.cores=4 \--conf spark.executor.memory=32g \--conf spark.driver.cores=4 \--conf spark.driver.memory=20g \--driver-java-options "-XX:+UseG1GC" \spark-struct-streaming-app.jar \kafka_source=<cluster>.<source topic> \kafka_sink=<cluster>.<sink topic> \checkpointLocation=hdfs:///checkpoint_dir
这个命令里包含了四个主要的部分:
spark-submit 命令spark应用启动参数:--class, --master, --conf, --driver-java-options 等--class 所在的 jar 包:spark-struct-streaming-app.jar 包含了业务逻辑额外的自定义配置:可以在 com.demo.Main 里对 args: Array[String] 进行解析
这个Spark Application模拟了常规的流式报表程序,它从一个Kafka topic读取数据,做一系列transform/aggregate,写入另一个 Kafka topic。下面是Main类的结构:
// 初始化SparkSession对象val spark = SparkSession .builder .getOrCreate()import spark.implicits._// 业务代码,仅供参考,无需过于关注细节// 读kafkaval df = spark.readStream .format("kafka") .options(someKafkaSourceConfigMap) .load()// 业务逻辑val out_df = df.select(...) .groupBy(...) .agg(...) // 写入另一个 kafkaout_df.writeStream .format("kafka") .trigger(Trigger.ProcessingTime(5 * 60 * 1000)) .options(someKafkaSinkConfigMap) .start() .awaitTermination()
Spark 的 DataFrame 对于流式数据和离线数据提供了统一的接口,初始化SparkSession的逻辑完全一样,对于DataFrame也可以做 select/groupBy/agg 等操作。
三年前第一次写 Spark 应用时,启动配置和代码都是从各个地方抄过来的,改吧改吧就跑起来了。但我们不妨延伸一下,SparkSession在初始化时是如何处理命令行参数的?spark-submit 参数的全集是什么?下面我们带着这两个问题一层一层往下看。
SparkSession 如何读取参数
我们从 SparkSession.builder.getOrCreate() 这行代码开始看。在 SparkSession.scala 文件中,三个类的结构如下:
class SparkSession 是 object SparkSession 的伴生类,它可以被实例化,spark-shell 里的 spark 对象 就是它的一个实例;由于 SparkSession 里有很多状态变量,实例化非常容易出差错,所以它的实例化必须由 object SparkSession 委托给 Builder 类完成。
为了避免误用,class SparkSession 被设置成了 private,它的构造函数只能由伴生对象 object SparkSession 和 它自己调用。
class SparkSession private( @transient val sparkContext: SparkContext, @transient private val existingSharedState: Option[SharedState], @transient private val parentSessionState: Option[SessionState], @transient private[sql] val extensions: SparkSessionExtensions, @transient private[sql] val initialSessionOptions: Map[String, String]) extends Serializable with Closeable with Logging { self =>}
对于 object SparkSession 的内部类 Builder,它主要包含两个部分:
参数存储和更新:options 用于存储参数,config 用于更新参数,另外还包装了一些快捷函数比如 appName和master构造SparkSession实例:getOrCreate 先检查是否有 thread-local 或 global 的 SparkSession,如果没有才会去创建一个新的 SparkSession 实例。
object SparkSession extends Logging { /** * Builder for [[SparkSession]]. */ @Stable class Builder extends Logging { private[this] val options = new scala.collection.mutable.HashMap[String, String] // 快捷函数 def appName(name: String): Builder = config("spark.app.name", name) def master(master: String): Builder = config("spark.master", master) // config 系列函数 def config(conf: SparkConf): Builder = ... def config(key: String, value: String): Builder = ... def config(key: String, ... def getOrCreate(): SparkSession = synchronized { val sparkConf = new SparkConf() options.foreach { case (k, v) => sparkConf.set(k, v) } if (!sparkConf.get(EXECUTOR_ALLOW_SPARK_CONTEXT)) { assertOnDriver() } // Get the session from current thread's active session. var session = activeThreadSession.get() if ((session ne null) && !session.sparkContext.isStopped) { applyModifiableSettings(session) return session } // Global synchronization so we will only set the default session once. SparkSession.synchronized { // If the current thread does not have an active session, get it from the global session. session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { applyModifiableSettings(session) return session } // No active nor global default session. Create a new one. val sparkContext = userSuppliedContext.getOrElse { // set a random app name if not given. if (!sparkConf.contains("spark.app.name")) { sparkConf.setAppName(java.util.UUID.randomUUID().toString) } SparkContext.getOrCreate(sparkConf) // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions. } loadExtensions(extensions) applyExtensions( sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty), extensions) session = new SparkSession(sparkContext, None, None, extensions, options.toMap) setDefaultSession(session) setActiveSession(session) registerContextListener(sparkContext) } return session } }}
有两行读取配置的代码,逻辑如下:
// 读取默认配置(命令行提交的参数、default.conf 里的配置)val sparkConf = new SparkConf()// 代码里通过 builder 显式指定的参数options.foreach { case (k, v) => sparkConf.set(k, v) }
可以看到通过 builder 指定的参数会覆盖默认参数。我们看 SparkConf 如何读取参数:
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable { import SparkConf._ /** Create a SparkConf that loads defaults from system properties and the classpath */ def this() = this(true) private val settings = new ConcurrentHashMap[String, String]() @transient private lazy val reader: ConfigReader = { val _reader = new ConfigReader(new SparkConfigProvider(settings)) _reader.bindEnv((key: String) => Option(getenv(key))) _reader } if (loadDefaults) { // silent=false,打印参数过期警告 loadFromSystemProperties(false) } private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = { // Load any spark.* system properties for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) { set(key, value, silent) } this } // 用于更新本对象的方法 def setMaster(master: String): SparkConf = ... def set(key: String, value: String): SparkConf = ... }
我们发现 SparkConf 也有很多个 set 函数,用于更新状态字段 settings。创建 SparkConf 实例以后,会调用 loadFromSystemProperties 方法从 SystemProperties 读取spark相关的配置,并更新 settings。
在使用时,我们不需要显式调用这些 set 方法,而是将参数传给 spark-submit 命令或者修改配置文件 default.conf,这样不需要重复编译业务代码,非常方便。那么问题来了,SparkSubmit 类是如何处理命令行参数和配置文件参数呢?
SparkSubmit 如何解析参数
之前简单提到过 SparkSubmit.scala 里 doSubmit 的逻辑,大致逻辑是:object SparkSubmit 的 main 函数是真正的程序入口。 首先我们创建一个 private class SparkSubmit 的实例 submit,然后调用 submit.doSubmit 方法,最终触发 runMain 函数。代码结构如下:
object SparkSubmit extends CommandLineUtils with Logging { override def main(args: Array[String]): Unit = { val submit = new SparkSubmit() { // override logging // override doSubmit 方法 override def doSubmit(args: Array[String]): Unit = { // 增加异常处理 super.doSubmit(args) } } submit.doSubmit(args) }}/** * Main gateway of launching a Spark application. * * This program handles setting up the classpath with relevant Spark dependencies and provides * a layer over the different cluster managers and deploy modes that Spark supports. */private[spark] class SparkSubmit extends Logging { // 从命令行解析参数 protected def parseArguments(args: Array[String]): SparkSubmitArguments = { new SparkSubmitArguments(args) } def doSubmit(args: Array[String]): Unit = { // ... 一些初始化日志的逻辑 // 从命令行解析参数 val appArgs = parseArguments(args) appArgs.action match { case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) case SparkSubmitAction.KILL => kill(appArgs) case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) case SparkSubmitAction.PRINT_VERSION => printVersion() } } private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { def doRunMain(): Unit = { // ... 省略部分逻辑 runMain(args, uninitLog) } doRunMain() } private def runMain(...): Unit = { // 从 --class 解析出 childMainClass String val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args) // 通过 reflection 加载执行类 mainClass = Utils.classForName(childMainClass) // 创建 SparkApplication val app: SparkApplication = new JavaMainApplication(mainClass) // 启动 SparkApplication app.start(childArgs.toArray, sparkConf) }}
runMain 函数可以拆分成四步:
准备提交环境通过反射获取包含业务逻辑的main 类,比如上面的单例类 com.demo.Main;将 main 类封装到一个 SparkApplication 里;触发 main 类里 main 方法的执行。
我们主要看第一步“准备提交环境”。prepareSubmitEnvironment 的参数 args 是 SparkSubmitArguments 的实例,然后解析出来四个变量:
子进程的运行参数:是一个字符串数组,对于上面的例子是 ["kafka_source=<cluster>.<source_topic>", "kafka_sink=<cluster>.<sink_topic>", "checkpointLocation=hdfs:///checkpoint_dir"]子进程需要的一组 classpath 路径SparkConf 对象:用于存储 system properties,但该对象并不直接用于初始化SparkSession子进程的 main class
在 prepareSubmitEnvironment 之前,参数已经被解析出来,对应的代码是:
// input 类型: Arrayval appArgs = parseArguments(args)// parseArguments 的实现override protected def parseArguments(args: Array[String]): SparkSubmitArguments = { new SparkSubmitArguments(args) { override protected def logInfo(msg: => String): Unit = self.logInfo(msg) override protected def logWarning(msg: => String): Unit = self.logWarning(msg) override protected def logError(msg: => String): Unit = self.logError(msg) }}
SparkSubmitArguments 存储了很多变量,比如下面图中这些:
这个类在实例化过程中,依次解析命令行参数、解析spark-defaults.conf、解析环境变量,最后校验这些参数的合法性,下面是对应的代码:
// Set parameters from command line arguments parse(args.asJava) // Populate `sparkProperties` map from properties file mergeDefaultSparkProperties() // Remove keys that don't start with "spark." from `sparkProperties`. ignoreNonSparkProperties() // Use `sparkProperties` map along with env vars to fill in any missing parameters loadEnvironmentArguments() useRest = sparkProperties.getOrElse("spark.master.rest.enabled", "false").toBoolean validateArguments()
函数 parse 是 SparkSubmitArguments 从父类 SparkSubmitOptionParser 继承而来,父类中包含了所有支持的命令行参数:
这部分代码可以作为我们写Spark Application时的文档使用。
到这里,我们看到了 SparkSession.builder 如何从 SystemProperties 读取配置,以及 SparkSubmit 如何解析配置,但仍然缺少比较关键的一步:SparkSubmit 在哪里将配置写入SystemProperties。我们再次回到 SparkSubmit 类的 runMain 成员函数,它的最后一行代码是启动SparkApplication 子进程:
app.start(childArgs.toArray, sparkConf)
这里 app 对象的类型是 JavaMainApplication,是 SparkApplication 的一个实现:
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication { override def start(args: Array[String], conf: SparkConf): Unit = { val mainMethod = klass.getMethod("main", new Array[String](0).getClass) if (!Modifier.isStatic(mainMethod.getModifiers)) { throw new IllegalStateException("The main method in the given main class must be static") } // 将 spark conf 写入 SystemProperties val sysProps = conf.getAll.toMap sysProps.foreach { case (k, v) => sys.props(k) = v } mainMethod.invoke(null, args) }}
在 JavaMainApplication 的成员函数 start 里,在执行子进程类的main方法之前,先更新 SystemProperties,保证其能读取到。
标签: #apache的session覆盖