龙空技术网

Spark SQL逻辑计划原理

java姜浩ovo 174

前言:

今天兄弟们对“sql 逻辑运算”都比较注意,我们都需要了解一些“sql 逻辑运算”的相关文章。那么小编同时在网摘上收集了一些对于“sql 逻辑运算””的相关资讯,希望姐妹们能喜欢,姐妹们快快来了解一下吧!

Catalyst优化器是Spark引擎中非常重要的组成部分,也是近年来Spark社区项目重点投入、并且发展十分迅速的核心模块,对于Spark任务的性能提升起到了关键的基础作用。

我们知道,在Spark1.6之前开发人员是通过Spark的RDD编程接口来实现对大规模数据的分析和处理的,到了Spark1.6版本后推出了DataSet和DataFrame的编程接口,这种数据结构与RDD的主要区别在于其携带了结构化数据的Schema信息,从而可以被Spark Catalyst用来做进一步的解析和优化;而Spark SQL则是比DataSet和DataFrame编程接口更为简单易用的大数据领域语言,其用户可以是开发工程师、数据科学家、数据分析师等,并且与其他SQL语言类似,可以通过SQL引擎将SQL预先解析成一棵AST抽象语法树;同时,AST抽象语法树、DataSet及DataFrame接下来均会被Spark Catalyst优化器转换成为Unresolved LogicalPlan、Resolved LogicalPlan,Physical Plan、以及Optimized PhysicalPlan,也就是说带有schema信息的Spark分布式数据集都可以从Spark Catalyst中受益,这也是Spark任务性能得以提升的核心所在。

值得一提的是,在物理计划树的生成过程中,首先会将数据源解析成为RDD,也即在Spark SQL的物理计划执行过程中所操作的对象实际是RDD,一条Spark SQL在生成最终的物理计划后仍然会经过前面文章中所提到的生成DAG、划分Stage、并将taskset分发到特定的executor上运行等一系列的任务调度和执行过程来实现该Spark SQL的处理逻辑。

接下来,本文将着重讲解Spark SQL逻辑计划的相关实现原理,在后续的文章中会继续解析Spark SQL的物理计划。

生成Unresolved LogicalPlan

用户可以通过spark-sql等客户端来提交sql语句,在sparksession初始化时通过BaseSessionStateBuilder的build()方法始化SparkSqlParser、Analyser以及SparkOptimizer对象等:

def build(): SessionState = {  new SessionState(    session.sharedState,    conf,    experimentalMethods,    functionRegistry,    udfRegistration,    () => catalog,    sqlParser,    () => analyzer,    () => optimizer,    planner,    () => streamingQueryManager,    listenerManager,    () => resourceLoader,    createQueryExecution,    createClone,    columnarRules,    queryStagePrepRules)}

当用户程序调用SparkSession的sql接口时即开始了解析sql语句并执行对数据处理的过程:

def sql(sqlText: String): DataFrame = withActive {  val tracker = new QueryPlanningTracker  val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {    sessionState.sqlParser.parsePlan(sqlText)  }  Dataset.ofRows(self, plan, tracker)}

其中通过AbstractSqlParser的parsePlan方法将sql语句转换成抽象语法树:

override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>  astBuilder.visitSingleStatement(parser.singleStatement()) match {    case plan: LogicalPlan => plan    case _ =>      val position = Origin(None, None)      throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)  }}

1、从SqlBaseParser的singleStatement()方法开始基于ANTLR4 lib库来解析sql语句中所有的词法片段,生成一棵AST抽象语法树;

2、访问AST抽象语法树并生成Unresolved 逻辑计划树:

1)访问SingleStatementContext节点:

SingleStatementContext是整个抽象语法树的根节点,因此以AstBuilder的visitSingleStatement方法为入口来遍历抽象语法树:

override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {  visit(ctx.statement).asInstanceOf[LogicalPlan]}...public T visit(ParseTree tree) {   return tree.accept(this);}

2)根据访问者模式执行SingleStatementContext节点的accept方法:

@Overridepublic <T> T accept(ParseTreeVisitor<? extends T> visitor) {   if ( visitor instanceof SqlBaseVisitor ) return ((SqlBaseVisitor<? extends T>)visitor).visitSingleStatement(this);   else return visitor.visitChildren(this);}...@Override public T visitSingleStatement(SqlBaseParser.SingleStatementContext ctx) { return visitChildren(ctx); }

3)迭代遍历整棵AST Tree:

@Overridepublic T visitChildren(RuleNode node) {   T result = defaultResult();   int n = node.getChildCount();   for (int i=0; i<n; i++) {      if (!shouldVisitNextChild(node, result)) {         break;      }      ParseTree c = node.getChild(i);      T childResult = c.accept(this);      result = aggregateResult(result, childResult);   }   return result;}

根据以上代码,在遍历AST 树的过程中,会首先解析父节点的所有子节点,并执行子节点上的accept方法来进行解析,当所有子节点均解析为UnresolvedRelation或者Expression后,将这些结果进行聚合并返回到父节点,由此可见,AST树的遍历所采用的是后序遍历模式。

接下来以查询语句中的QuerySpecificationContext节点的解析为例进一步阐述以上过程:

如下为一条基本的sql语句:

select col1 from tabname where col2 > 10

QuerySpecificationContext节点下会产生用于扫描数据源的FromClauseContext、过滤条件对应的BooleanDefaultContext、以及投影时所需的NamedExpressionSeqContext节点。

1)FromClauseContext继续访问其子节点,当访问到TableINameContext节点时,访问到tableName的tocken时根据表名生成UnresolvedRelation:

override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {  val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)  val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId))  table.optionalMap(ctx.sample)(withSample)}

2)BooleanDefaultContext的子节点中分为三个分支:代表Reference的ValueExpressionDefaultContext、代表数值的ValueExpressionDefaultContext、以及代表运算符的ComparisonContext;

例如遍历代表数据值ValueExpressionDefaultContext及其子节点,直到访问到IntegerLiteralContext:

override def visitIntegerLiteral(ctx: IntegerLiteralContext): Literal = withOrigin(ctx) {  BigDecimal(ctx.getText) match {    case v if v.isValidInt =>      Literal(v.intValue)    case v if v.isValidLong =>      Literal(v.longValue)    case v => Literal(v.underlying())  }}

而Literal的定义如下,是一个叶子类型的Expression节点:

case class Literal (value: Any, dataType: DataType) extends LeafExpression

3)NamedExpressionSeqContext是投影节点,迭代遍历直到RegularQuerySpecificationContext节点,然后通过访问withSelectQuerySpecification方法创建出投影所需的Project Logical Plan:

override def visitRegularQuerySpecification(    ctx: RegularQuerySpecificationContext): LogicalPlan = withOrigin(ctx) {  val from = OneRowRelation().optional(ctx.fromClause) {    visitFromClause(ctx.fromClause)  }  withSelectQuerySpecification(    ctx,    ctx.selectClause,    ctx.lateralView,    ctx.whereClause,    ctx.aggregationClause,    ctx.havingClause,    ctx.windowClause,    from  )}...def createProject() = if (namedExpressions.nonEmpty) {  Project(namedExpressions, withFilter)} else {  withFilter}

总结一下以上处理过程中所涉及的类之间的关系,如下图所示:

生成Resolved LogicalPlanSpark Analyser

在SparkSession的sql方法中,对sql语句进行过Parser解析并生成Unresolved LogicalPlan之后则通过执行Dataset.ofRows(self, plan, tracker) 继续进行catalog绑定,数据源绑定的过程如下:

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)  : DataFrame = sparkSession.withActive {  val qe = new QueryExecution(sparkSession, logicalPlan, tracker)  qe.assertAnalyzed()  new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))}...def assertAnalyzed(): Unit = analyzed

由如下实现逻辑可见, analyzed变量是通过懒加载方式初始化的,通过该变量的初始方法可见Spark的catalog实现逻辑主要通过Analyser类来实现的:

lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) {  // We can't clone `logical` here, which will reset the `_analyzed` flag.  sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)}

其中,executeAndCheck方法的执行是通过Analyzer的父类RuleExecutor的execute方法来实现的:

def execute(plan: TreeType): TreeType = {...  batches.foreach { batch =>    val batchStartPlan = curPlan    var iteration = 1    var lastPlan = curPlan    var continue = true    // Run until fix point (or the max number of iterations as specified in the strategy.    while (continue) {      curPlan = batch.rules.foldLeft(curPlan) {        case (plan, rule) =>          val startTime = System.nanoTime()          val result = rule(plan)          val runTime = System.nanoTime() - startTime          val effective = !result.fastEquals(plan)          if (effective) {            queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)            queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)            planChangeLogger.logRule(rule.ruleName, plan, result)          }          ...          result      }      iteration += 1      if (iteration > batch.strategy.maxIterations) {        // Only log if this is a rule that is supposed to run more than once.        if (iteration != 2) {          val endingMsg = if (batch.strategy.maxIterationsSetting == null) {            "."          } else {            s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."          }          val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +            s"$endingMsg"          if (Utils.isTesting || batch.strategy.errorOnExceed) {            throw new TreeNodeException(curPlan, message, null)          } else {            logWarning(message)          }        }        // Check idempotence for Once batches.        if (batch.strategy == Once &&          Utils.isTesting && !blacklistedOnceBatches.contains(batch.name)) {          checkBatchIdempotence(batch, curPlan)        }        continue = false      }      if (curPlan.fastEquals(lastPlan)) {        logTrace(          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")        continue = false      }      lastPlan = curPlan    }    planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)  }  planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)  curPlan}

如上代码的主要处理过程如下:

1、遍历的Analyzer类中的batches列表:

通过batches方法获取所有的catalog绑定相关的规则,在Analyzer中包括Substitution、Hints、Resolution、UDF、Subquery等几个规则组;

以较为常见的"Resolution"规则组为例,其具有非常多的规则用于解析函数、Namespace、数据表、视图、列等信息,当然用户也可以子定义相关规则:

Batch("Resolution", fixedPoint,  ResolveTableValuedFunctions ::  ResolveNamespace(catalogManager) ::  new ResolveCatalogs(catalogManager) ::  ResolveInsertInto ::  ResolveRelations ::  ResolveTables ::  ResolveReferences ::  ResolveCreateNamedStruct ::  ResolveDeserializer ::  ResolveNewInstance ::  ResolveUpCast ::  ResolveGroupingAnalytics ::  ResolvePivot ::  ResolveOrdinalInOrderByAndGroupBy ::  ResolveAggAliasInGroupBy ::  ResolveMissingReferences ::  ...

其中,Batch类的定义如下,包括Batch名称、循环执行策略、具体的规则组集合,循环执行策略Strategy又分为Once和FixedPoint两种,即仅执行一次和固定次数:

protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

2、将每个Batch中所有的规则Rule对象实施于该Unsolved LogicalPlan,并且该Batch中规则可能要执行多轮,直到执行的批数等于batch.strategy.maxIterations或者logicalplan与上个批次的结果比没有变化,则退出执行;

其中在Spark 中的定义如下,在spark3.0中默认可最大循环100次:

protected def fixedPoint =  FixedPoint(    conf.analyzerMaxIterations,    errorOnExceed = true,    maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)...val ANALYZER_MAX_ITERATIONS = buildConf("spark.sql.analyzer.maxIterations")    .internal()    .doc("The max number of iterations the analyzer runs.")    .version("3.0.0")    .intConf    .createWithDefault(100)

接下来以将ResolveRelations(解析数据表或者视图)规则应用于Unresolved LogicalPlan的解析过程为例,支持解析UnresolvedRelation、UnresolvedTable、UnresolvedTableOrView等多种未解析的数据源:

def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp { ...  case u: UnresolvedRelation =>    lookupRelation(u.multipartIdentifier).map(resolveViews).getOrElse(u)  case u @ UnresolvedTable(identifier) =>    lookupTableOrView(identifier).map {      case v: ResolvedView =>        u.failAnalysis(s"${v.identifier.quoted} is a view not table.")      case table => table    }.getOrElse(u)  case u @ UnresolvedTableOrView(identifier) =>    lookupTableOrView(identifier).getOrElse(u)}

当解析对象为UnresolvedRelation实例时,调用lookupRelation方法来对其进行解析,通过SessionCatalog或者扩展的CatalogPlugin来获取数据源的元数据,并生成Resolved LogicalPlan:

private def lookupRelation(identifier: Seq[String]): Option[LogicalPlan] = {  expandRelationName(identifier) match {    case SessionCatalogAndIdentifier(catalog, ident) =>      lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {        case v1Table: V1Table =>          v1SessionCatalog.getRelation(v1Table.v1Table)        case table =>          SubqueryAlias(            catalog.name +: ident.asMultipartIdentifier,            DataSourceV2Relation.create(table, Some(catalog), Some(ident)))      }    ...

最常见的是SessionCatalog,作为SparkSession级别catalog接口对象,其定义如下,包括ExternalCatalog、GlobalTempViewManager、FunctionRegistry、SQLConf、Hadoop的Configuration、Parser、FunctionResourceLoader对象;其中,ExternalCatalog有两个主要的实现类:HiveExternalCatalog和InMemoryCatalog,而HiveExternalCatalog则主要应用于企业级的业务场景中:

class SessionCatalog(    externalCatalogBuilder: () => ExternalCatalog,    globalTempViewManagerBuilder: () => GlobalTempViewManager,    functionRegistry: FunctionRegistry,    conf: SQLConf,    hadoopConf: Configuration,    parser: ParserInterface,    functionResourceLoader: FunctionResourceLoader) 

如果采用默认的SessionCatalog,当需要获取数据表时则通过ExternalCatalog实例调用其对应的接口来实现:

override def loadTable(ident: Identifier): Table = {  val catalogTable = try {    catalog.getTableMetadata(ident.asTableIdentifier)  } catch {    case _: NoSuchTableException =>      throw new NoSuchTableException(ident)  }  V1Table(catalogTable)}...def getTableMetadata(name: TableIdentifier): CatalogTable = {    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))    val table = formatTableName(name.table)    requireDbExists(db)    requireTableExists(TableIdentifier(table, Some(db)))    externalCatalog.getTable(db, table) }

接下来如果采用ExternalCatalog接口的实现类HiveExternalCatalog的情况下,则通过HiveClientImpl类从Hive的metadata中类获取用户表的元数据相关信息:

private def getRawTableOption(dbName: String, tableName: String): Option[HiveTable] = {  Option(client.getTable(dbName, tableName, false /* do not throw exception */))}

另外,如需扩展的catalog范围可通过实现CatalogPlugin接口、并且配置“spark.sql.catalog.spark_catalog”参数来实现,例如在iceberg数据湖的实现中通过自定义其catalog来实现其个性化的逻辑:

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog

3、返回解析后的Resolved LogicalPlan。

以上处理逻辑中所涉及的主要的类之间的关系如下所示:

接下来仍然以前面的SQL语句(select col1 from tabname where col2 > 10)为例,简要阐述如何将一个Unresolved LogicalPlan解析成为Analyzed LogicalPlan:

1、根据Analyzer的解析规则,UnResolvedRelation节点可以应用到ResolveRelations规则,通过CatalogManger获取数据源中表的信息,得到Relation的相关列的信息并加上标号,同时创建一个针对数据表的SubqueryAlias节点;

2、针对过滤条件col2>10的过滤条件,针对列UnresolvedAttribute 可以适用到ResolveReference规则,根据第1步中得到的列信息可以进行解析;数字10可以应用到ImplicitTypeCasts 规则对该数字匹配最合适的数据类型;

3、针对Project 节点,接下来在进行下一轮解析,再次匹配到ResolveReference规则对投影列进行解析,从而将整棵树解析为Resolved LogicalPlan。

生成Optimized LogicalPlan

得到Resolved LogicalPlan之后,为了使SQL语句的执行性能更优,则需要根据一些规则进一步优化逻辑计划树,生成Optimized LogicalPlan。

本文采用的是Spark 3.0的源码,生成Optimized LogicalPlan是通过懒加载的方式被调用的,并且Optimizer类与Analyzer类一样继承了 RuleExecutor类,所有基于规则(RBO)的优化实际都是通过RuleExecutor类来执行,同样也是将所有规则构建为多个批次,并且将所有批次中规则应用于Analyzed LogicalPlan,直到树不再改变或者执行优化的循环次数超过最大限制(spark.sql.optimizer.maxIterations,默认100):

lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {  // clone the plan to avoid sharing the plan instance between different stages like analyzing,  // optimizing and planning.  val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)  // We do not want optimized plans to be re-analyzed as literals that have been constant folded  // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state  // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.  plan.setAnalyzed()  plan}...def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker): TreeType = {    QueryPlanningTracker.withTracker(tracker) {      execute(plan)    }}

逻辑计划优化规则仍然又多个Batch组成,每个Batch中包含多个具体的Rule并且可以执行一次或者固定次数。其中比较常用的优化规则有:谓词下推、常量累加、列剪枝等几种。

谓词下推将尽可能使得谓词计算靠近数据源,根据不同的场景有LimitPushDown、PushProjectionThroughUnion、PushDownPredicates等多种实现, PushDownPredicates又包含PushPredicateThroughNonJoin和PushPredicateThroughJoin;

其中,PushPredicateThroughJoin可实现将谓词计算下推至join算子的下面,从而可以提升数据表之间的join计算过程中所带来的网络、内存以及IO等性能开销:

val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {  // push the where condition down into join filter  case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition, hint)) =>    val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =      split(splitConjunctivePredicates(filterCondition), left, right)    joinType match {      case _: InnerLike =>        // push down the single side `where` condition into respective sides        val newLeft = leftFilterConditions.          reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)        val newRight = rightFilterConditions.          reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)        val (newJoinConditions, others) =          commonFilterCondition.partition(canEvaluateWithinJoin)        val newJoinCond = (newJoinConditions ++ joinCondition).reduceLeftOption(And)        val join = Join(newLeft, newRight, joinType, newJoinCond, hint)        if (others.nonEmpty) {          Filter(others.reduceLeft(And), join)        } else {          join        }      case RightOuter =>        // push down the right side only `where` condition

常量折叠是通过ConstantFolding规则来实现的,如果表达式中的算子是可以折叠的则在该阶段直接生成计算结果,以避免在实际的sql执行过程中产生逐行计算,从而可以降低CPU的计算开销:

object ConstantFolding extends Rule[LogicalPlan] {  def apply(plan: LogicalPlan): LogicalPlan = plan transform {    case q: LogicalPlan => q transformExpressionsDown {      // Skip redundant folding of literals. This rule is technically not necessary. Placing this      // here avoids running the next rule for Literal values, which would create a new Literal      // object and running eval unnecessarily.      case l: Literal => l      // Fold expressions that are foldable.      case e if e.foldable => Literal.create(e.eval(EmptyRow), e.dataType)    }  }}

列剪枝规则通过ColumnPruning规则来实现,去掉不需要处理的列,可避免从数据源读取较多的数据列、将不需要的列加载至内存中计算计算计算中、以及返回不需要数据(想象一下大宽表的情况),从而获得较大的性能收益:

object ColumnPruning extends Rule[LogicalPlan] def apply(plan: LogicalPlan): LogicalPlanprunedChild(c: LogicalPlan, allReferences: AttributeSet):LogicalPlan=    if (!c.outputSet.subsetOf(allReferences)) {    

Optimizer所涉及的主要类的关联关系如下图所示:

spark_optimizer

当所有优化规则完成对于Aanalyzed LogicalPlan的应用则可生成Optimized LogicalPlan。

本文重点讲解了Spark SQL解析为AST抽象语法树、生成Unresolved LogicalPlan、生成Resolved LogicalPlan以及Optimized LogicalPlan的过程,为接下来进一步生成物理计划Spark Plan做好了准备。

标签: #sql 逻辑运算