龙空技术网

Spark源码阅读:SparkSession对象spark的构造

赵帅虎 120

前言:

当前小伙伴们对“apache的session覆盖”可能比较注重,小伙伴们都需要了解一些“apache的session覆盖”的相关内容。那么小编也在网摘上收集了一些关于“apache的session覆盖””的相关资讯,希望咱们能喜欢,朋友们快快来了解一下吧!

上一篇文章Spark源码阅读:SparkSession类之spark对象的使用中,我们从读取本地文件写入hive表的例子开始,介绍了 SparkSession 对象的使用方法。spark-shell 里的 spark 对象类型是 class SparkSession,代码在 org/apache/spark/sql/SparkSession.scala 文件里可以找到。class SparkSession 有20个成员变量和10+个成员函数,构造spark对象时需要从环境变量、命令行、配置文件等多个位置读取配置。对于 spark-shell/pyspark(shell)/spark-submit 不同的使用方式,构造 spark 对象走的是同一套流程,下面我们将从一个实际的应用开始,看 SparkSession 对象是如何构造的。

注意:当我们提到实例和对象时,通常指一个事物,比如对象是一个类实例化的结果。而Scala 里对单例模式的语法支持,比如 object SparkSession,我们会直接用object,而不是“对象”这个词。

一个Spark Application

在生产环境中,我们通常会把复杂的业务逻辑封装在一个 jar 包 或者 python 包里,通过 spark-submit 的方式提交到远端的 yarn 集群上。

下面这个例子演示了如何提交一个 Spark Structured Streaming App:

$SPARK_HOME/bin/spark-submit \--class com.demo.Main \--master yarn \--name SparkStructStreamingApp \--queue root.default_streaming \--conf spark.executor.cores=4 \--conf spark.executor.memory=32g \--conf spark.driver.cores=4 \--conf spark.driver.memory=20g \--driver-java-options "-XX:+UseG1GC" \spark-struct-streaming-app.jar \kafka_source=<cluster>.<source topic> \kafka_sink=<cluster>.<sink topic> \checkpointLocation=hdfs:///checkpoint_dir

这个命令里包含了四个主要的部分:

spark-submit 命令spark应用启动参数:--class, --master, --conf, --driver-java-options 等--class 所在的 jar 包:spark-struct-streaming-app.jar 包含了业务逻辑额外的自定义配置:可以在 com.demo.Main 里对 args: Array[String] 进行解析

这个Spark Application模拟了常规的流式报表程序,它从一个Kafka topic读取数据,做一系列transform/aggregate,写入另一个 Kafka topic。下面是Main类的结构:

// 初始化SparkSession对象val spark = SparkSession  .builder  .getOrCreate()import spark.implicits._// 业务代码,仅供参考,无需过于关注细节// 读kafkaval df = spark.readStream  .format("kafka")  .options(someKafkaSourceConfigMap)  .load()// 业务逻辑val out_df = df.select(...)	.groupBy(...)  .agg(...)  // 写入另一个 kafkaout_df.writeStream  .format("kafka")  .trigger(Trigger.ProcessingTime(5 * 60 * 1000))  .options(someKafkaSinkConfigMap)  .start()  .awaitTermination()

Spark 的 DataFrame 对于流式数据和离线数据提供了统一的接口,初始化SparkSession的逻辑完全一样,对于DataFrame也可以做 select/groupBy/agg 等操作。

三年前第一次写 Spark 应用时,启动配置和代码都是从各个地方抄过来的,改吧改吧就跑起来了。但我们不妨延伸一下,SparkSession在初始化时是如何处理命令行参数的?spark-submit 参数的全集是什么?下面我们带着这两个问题一层一层往下看。

SparkSession 如何读取参数

我们从 SparkSession.builder.getOrCreate() 这行代码开始看。在 SparkSession.scala 文件中,三个类的结构如下:

SparkSession.scala

class SparkSession 是 object SparkSession 的伴生类,它可以被实例化,spark-shell 里的 spark 对象 就是它的一个实例;由于 SparkSession 里有很多状态变量,实例化非常容易出差错,所以它的实例化必须由 object SparkSession 委托给 Builder 类完成。

为了避免误用,class SparkSession 被设置成了 private,它的构造函数只能由伴生对象 object SparkSession 和 它自己调用。

class SparkSession private(    @transient val sparkContext: SparkContext,    @transient private val existingSharedState: Option[SharedState],    @transient private val parentSessionState: Option[SessionState],    @transient private[sql] val extensions: SparkSessionExtensions,    @transient private[sql] val initialSessionOptions: Map[String, String])  extends Serializable with Closeable with Logging { self =>}

对于 object SparkSession 的内部类 Builder,它主要包含两个部分:

参数存储和更新:options 用于存储参数,config 用于更新参数,另外还包装了一些快捷函数比如 appName和master构造SparkSession实例:getOrCreate 先检查是否有 thread-local 或 global 的 SparkSession,如果没有才会去创建一个新的 SparkSession 实例。

object SparkSession extends Logging {  /**   * Builder for [[SparkSession]].   */  @Stable  class Builder extends Logging {    private[this] val options = new scala.collection.mutable.HashMap[String, String]    // 快捷函数    def appName(name: String): Builder = config("spark.app.name", name)    def master(master: String): Builder = config("spark.master", master)      // config 系列函数    def config(conf: SparkConf): Builder = ...    def config(key: String, value: String): Builder = ...    def config(key: String, ...      def getOrCreate(): SparkSession = synchronized {      val sparkConf = new SparkConf()      options.foreach { case (k, v) => sparkConf.set(k, v) }      if (!sparkConf.get(EXECUTOR_ALLOW_SPARK_CONTEXT)) {        assertOnDriver()      }      // Get the session from current thread's active session.      var session = activeThreadSession.get()      if ((session ne null) && !session.sparkContext.isStopped) {        applyModifiableSettings(session)        return session      }      // Global synchronization so we will only set the default session once.      SparkSession.synchronized {        // If the current thread does not have an active session, get it from the global session.        session = defaultSession.get()        if ((session ne null) && !session.sparkContext.isStopped) {          applyModifiableSettings(session)          return session        }        // No active nor global default session. Create a new one.        val sparkContext = userSuppliedContext.getOrElse {          // set a random app name if not given.          if (!sparkConf.contains("spark.app.name")) {            sparkConf.setAppName(java.util.UUID.randomUUID().toString)          }          SparkContext.getOrCreate(sparkConf)          // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.        }        loadExtensions(extensions)        applyExtensions(          sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),          extensions)        session = new SparkSession(sparkContext, None, None, extensions, options.toMap)        setDefaultSession(session)        setActiveSession(session)        registerContextListener(sparkContext)      }      return session    }  }}

有两行读取配置的代码,逻辑如下:

// 读取默认配置(命令行提交的参数、default.conf 里的配置)val sparkConf = new SparkConf()// 代码里通过 builder 显式指定的参数options.foreach { case (k, v) => sparkConf.set(k, v) }

可以看到通过 builder 指定的参数会覆盖默认参数。我们看 SparkConf 如何读取参数:

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {  import SparkConf._  /** Create a SparkConf that loads defaults from system properties and the classpath */  def this() = this(true)  private val settings = new ConcurrentHashMap[String, String]()  @transient private lazy val reader: ConfigReader = {    val _reader = new ConfigReader(new SparkConfigProvider(settings))    _reader.bindEnv((key: String) => Option(getenv(key)))    _reader  }  if (loadDefaults) {    // silent=false,打印参数过期警告    loadFromSystemProperties(false)   }  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {    // Load any spark.* system properties    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {      set(key, value, silent)    }    this  }    // 用于更新本对象的方法  def setMaster(master: String): SparkConf = ...  def set(key: String, value: String): SparkConf = ...  }

我们发现 SparkConf 也有很多个 set 函数,用于更新状态字段 settings。创建 SparkConf 实例以后,会调用 loadFromSystemProperties 方法从 SystemProperties 读取spark相关的配置,并更新 settings。

在使用时,我们不需要显式调用这些 set 方法,而是将参数传给 spark-submit 命令或者修改配置文件 default.conf,这样不需要重复编译业务代码,非常方便。那么问题来了,SparkSubmit 类是如何处理命令行参数和配置文件参数呢?

SparkSubmit 如何解析参数

之前简单提到过 SparkSubmit.scala 里 doSubmit 的逻辑,大致逻辑是:object SparkSubmit 的 main 函数是真正的程序入口。 首先我们创建一个 private class SparkSubmit 的实例 submit,然后调用 submit.doSubmit 方法,最终触发 runMain 函数。代码结构如下:

object SparkSubmit extends CommandLineUtils with Logging {  override def main(args: Array[String]): Unit = {    val submit = new SparkSubmit() {      // override logging      // override doSubmit 方法      override def doSubmit(args: Array[String]): Unit = {      	// 增加异常处理        super.doSubmit(args)      }    }    submit.doSubmit(args)  }}/** * Main gateway of launching a Spark application. * * This program handles setting up the classpath with relevant Spark dependencies and provides * a layer over the different cluster managers and deploy modes that Spark supports. */private[spark] class SparkSubmit extends Logging {  // 从命令行解析参数  protected def parseArguments(args: Array[String]): SparkSubmitArguments = {    new SparkSubmitArguments(args)  }    def doSubmit(args: Array[String]): Unit = {    // ... 一些初始化日志的逻辑    // 从命令行解析参数    val appArgs = parseArguments(args)    appArgs.action match {      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)      case SparkSubmitAction.KILL => kill(appArgs)      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)      case SparkSubmitAction.PRINT_VERSION => printVersion()    }  }    private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {    def doRunMain(): Unit = {      // ... 省略部分逻辑      runMain(args, uninitLog)    }    doRunMain()  }    private def runMain(...): Unit = {    // 从 --class 解析出 childMainClass String    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)    // 通过 reflection 加载执行类    mainClass = Utils.classForName(childMainClass)    // 创建 SparkApplication    val app: SparkApplication = new JavaMainApplication(mainClass)    // 启动 SparkApplication    app.start(childArgs.toArray, sparkConf)  }}

runMain 函数可以拆分成四步:

准备提交环境通过反射获取包含业务逻辑的main 类,比如上面的单例类 com.demo.Main;将 main 类封装到一个 SparkApplication 里;触发 main 类里 main 方法的执行。

我们主要看第一步“准备提交环境”。prepareSubmitEnvironment 的参数 args 是 SparkSubmitArguments 的实例,然后解析出来四个变量:

子进程的运行参数:是一个字符串数组,对于上面的例子是 ["kafka_source=<cluster>.<source_topic>", "kafka_sink=<cluster>.<sink_topic>", "checkpointLocation=hdfs:///checkpoint_dir"]子进程需要的一组 classpath 路径SparkConf 对象:用于存储 system properties,但该对象并不直接用于初始化SparkSession子进程的 main class

在 prepareSubmitEnvironment 之前,参数已经被解析出来,对应的代码是:

// input 类型: Arrayval appArgs = parseArguments(args)// parseArguments 的实现override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {  new SparkSubmitArguments(args) {    override protected def logInfo(msg: => String): Unit = self.logInfo(msg)    override protected def logWarning(msg: => String): Unit = self.logWarning(msg)    override protected def logError(msg: => String): Unit = self.logError(msg)  }}

SparkSubmitArguments 存储了很多变量,比如下面图中这些:

SparkSubmitArguments.scala

这个类在实例化过程中,依次解析命令行参数、解析spark-defaults.conf、解析环境变量,最后校验这些参数的合法性,下面是对应的代码:

  // Set parameters from command line arguments  parse(args.asJava)  // Populate `sparkProperties` map from properties file  mergeDefaultSparkProperties()  // Remove keys that don't start with "spark." from `sparkProperties`.  ignoreNonSparkProperties()  // Use `sparkProperties` map along with env vars to fill in any missing parameters  loadEnvironmentArguments()  useRest = sparkProperties.getOrElse("spark.master.rest.enabled", "false").toBoolean  validateArguments()

函数 parse 是 SparkSubmitArguments 从父类 SparkSubmitOptionParser 继承而来,父类中包含了所有支持的命令行参数:

SparkSubmitOptionParser

这部分代码可以作为我们写Spark Application时的文档使用。

到这里,我们看到了 SparkSession.builder 如何从 SystemProperties 读取配置,以及 SparkSubmit 如何解析配置,但仍然缺少比较关键的一步:SparkSubmit 在哪里将配置写入SystemProperties。我们再次回到 SparkSubmit 类的 runMain 成员函数,它的最后一行代码是启动SparkApplication 子进程:

app.start(childArgs.toArray, sparkConf)

这里 app 对象的类型是 JavaMainApplication,是 SparkApplication 的一个实现:

private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {  override def start(args: Array[String], conf: SparkConf): Unit = {    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)    if (!Modifier.isStatic(mainMethod.getModifiers)) {      throw new IllegalStateException("The main method in the given main class must be static")    }  	// 将 spark conf 写入 SystemProperties    val sysProps = conf.getAll.toMap    sysProps.foreach { case (k, v) =>      sys.props(k) = v    }    mainMethod.invoke(null, args)  }}

在 JavaMainApplication 的成员函数 start 里,在执行子进程类的main方法之前,先更新 SystemProperties,保证其能读取到。

标签: #apache的session覆盖