Preface
The Catalyst optimizer is a core component of the Spark engine and one of the modules the Spark community has invested in most heavily in recent years. It has evolved rapidly and provides much of the foundation for the performance gains seen in Spark jobs.
Before Spark 1.6, developers analyzed and processed large-scale data through Spark's RDD programming interface. Spark 1.6 introduced the Dataset and DataFrame APIs; the key difference from RDDs is that these data structures carry the schema of the structured data, which allows Spark Catalyst to analyze and optimize them further. Spark SQL, in turn, is an even easier-to-use domain language for big data than the Dataset and DataFrame APIs, serving developers, data scientists, and data analysts alike; as with other SQL dialects, the SQL engine first parses a statement into an AST (abstract syntax tree). The AST, as well as Dataset and DataFrame programs, is then transformed by the Spark Catalyst optimizer into an Unresolved LogicalPlan, a Resolved LogicalPlan, an Optimized LogicalPlan, and finally a Physical Plan. In other words, every Spark distributed dataset that carries schema information can benefit from Spark Catalyst, and this is at the heart of the performance improvements in Spark jobs.
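As a quick, illustrative check (assuming a SparkSession named spark and the example table used later in this article), the same query expressed through the DataFrame API and through SQL goes through the same Catalyst pipeline; explain(true) prints the parsed, analyzed, and optimized logical plans as well as the physical plan:

// Both forms produce equivalent logical plans and benefit from the same
// Catalyst optimizations; explain(extended = true) shows all plan stages.
val viaDf  = spark.table("tabname").where("col2 > 10").select("col1")
val viaSql = spark.sql("select col1 from tabname where col2 > 10")
viaDf.explain(true)
viaSql.explain(true)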
It is worth noting that while the physical plan tree is being generated, the data source is resolved into RDDs; in other words, the objects actually operated on when a Spark SQL physical plan executes are RDDs. After a Spark SQL statement has produced its final physical plan, it still goes through the task scheduling and execution process described in earlier articles: building the DAG, dividing it into stages, and dispatching task sets to specific executors that carry out the query's processing logic.
This article focuses on how Spark SQL logical plans are built; a follow-up article will cover the physical plan.
Generating the Unresolved LogicalPlan
Users can submit SQL statements through clients such as spark-sql. When a SparkSession is initialized, the build() method of BaseSessionStateBuilder initializes the SparkSqlParser, Analyzer, and SparkOptimizer objects, among others:
def build(): SessionState = {
  new SessionState(
    session.sharedState,
    conf,
    experimentalMethods,
    functionRegistry,
    udfRegistration,
    () => catalog,
    sqlParser,
    () => analyzer,
    () => optimizer,
    planner,
    () => streamingQueryManager,
    listenerManager,
    () => resourceLoader,
    createQueryExecution,
    createClone,
    columnarRules,
    queryStagePrepRules)
}
When a user program calls SparkSession's sql interface, the process of parsing the SQL statement and executing the data processing begins:
def sql(sqlText: String): DataFrame = withActive {
  val tracker = new QueryPlanningTracker
  val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
    sessionState.sqlParser.parsePlan(sqlText)
  }
  Dataset.ofRows(self, plan, tracker)
}
The parsePlan method of AbstractSqlParser converts the SQL statement into an abstract syntax tree:
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
  astBuilder.visitSingleStatement(parser.singleStatement()) match {
    case plan: LogicalPlan => plan
    case _ =>
      val position = Origin(None, None)
      throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
  }
}
1. Starting from SqlBaseParser's singleStatement() method, all lexical fragments of the SQL statement are parsed with the ANTLR4 library, producing an AST (abstract syntax tree);
2. The AST is then visited to build the Unresolved logical plan tree:
1) Visit the SingleStatementContext node:
SingleStatementContext is the root node of the entire abstract syntax tree, so AstBuilder's visitSingleStatement method is the entry point for traversing the tree:
override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
  visit(ctx.statement).asInstanceOf[LogicalPlan]
}
...
public T visit(ParseTree tree) {
  return tree.accept(this);
}
2) Following the visitor pattern, the accept method of the SingleStatementContext node is invoked:
@Override
public <T> T accept(ParseTreeVisitor<? extends T> visitor) {
  if ( visitor instanceof SqlBaseVisitor ) return ((SqlBaseVisitor<? extends T>)visitor).visitSingleStatement(this);
  else return visitor.visitChildren(this);
}
...
@Override
public T visitSingleStatement(SqlBaseParser.SingleStatementContext ctx) {
  return visitChildren(ctx);
}
3) The whole AST is traversed iteratively:
@Override
public T visitChildren(RuleNode node) {
  T result = defaultResult();
  int n = node.getChildCount();
  for (int i = 0; i < n; i++) {
    if (!shouldVisitNextChild(node, result)) {
      break;
    }
    ParseTree c = node.getChild(i);
    T childResult = c.accept(this);
    result = aggregateResult(result, childResult);
  }
  return result;
}
As the code above shows, while traversing the AST a parent node first resolves all of its children by invoking their accept methods; once every child has been resolved into an UnresolvedRelation or an Expression, the results are aggregated and returned to the parent. The traversal is therefore a post-order one.
The following uses the resolution of the QuerySpecificationContext node in a query to illustrate this process further.
Here is a basic SQL statement:
select col1 from tabname where col2 > 10
Under the QuerySpecificationContext node there are a FromClauseContext for scanning the data source, a BooleanDefaultContext for the filter condition, and a NamedExpressionSeqContext for the projection.
1) FromClauseContext keeps visiting its children; when the TableNameContext node is reached and the tableName token is visited, an UnresolvedRelation is created from the table name:
override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
  val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
  val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId))
  table.optionalMap(ctx.sample)(withSample)
}
2) The children of BooleanDefaultContext split into three branches: a ValueExpressionDefaultContext representing the column reference, a ValueExpressionDefaultContext representing the numeric value, and a ComparisonContext representing the comparison operator.
For example, traversing the ValueExpressionDefaultContext that represents the numeric value and its children eventually reaches the IntegerLiteralContext:
override def visitIntegerLiteral(ctx: IntegerLiteralContext): Literal = withOrigin(ctx) {
  BigDecimal(ctx.getText) match {
    case v if v.isValidInt =>
      Literal(v.intValue)
    case v if v.isValidLong =>
      Literal(v.longValue)
    case v => Literal(v.underlying())
  }
}
Literal, defined as follows, is a leaf Expression node:
case class Literal (value: Any, dataType: DataType) extends LeafExpression
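For instance (a tiny illustrative snippet, not part of the article's code path), Literal.apply infers the Catalyst DataType from a Scala value, which is exactly what visitIntegerLiteral produces for the 10 in the sample query:

import org.apache.spark.sql.catalyst.expressions.Literal

// Literal.apply infers the Catalyst data type from the Scala value,
// so an Int becomes a Literal with IntegerType.
val ten = Literal(10)
println(ten.dataType)   // IntegerType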
3) NamedExpressionSeqContext is the projection node. The iteration proceeds up to the RegularQuerySpecificationContext node, which then calls withSelectQuerySpecification to create the Project logical plan required for the projection:
override def visitRegularQuerySpecification(
    ctx: RegularQuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
  val from = OneRowRelation().optional(ctx.fromClause) {
    visitFromClause(ctx.fromClause)
  }
  withSelectQuerySpecification(
    ctx, ctx.selectClause, ctx.lateralView, ctx.whereClause, ctx.aggregationClause,
    ctx.havingClause, ctx.windowClause, from
  )
}
...
def createProject() = if (namedExpressions.nonEmpty) {
  Project(namedExpressions, withFilter)
} else {
  withFilter
}
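Putting the pieces together, the parser alone (no catalog lookup yet) turns the sample query into an unresolved tree roughly like the one sketched in the comments below; the exact formatting depends on the Spark version:

// The leading quote marks in the output denote unresolved nodes, e.g. roughly:
//   'Project ['col1]
//   +- 'Filter ('col2 > 10)
//      +- 'UnresolvedRelation [tabname]
val parsed = spark.sessionState.sqlParser.parsePlan(
  "select col1 from tabname where col2 > 10")
println(parsed.treeString)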
To summarize, the relationships among the classes involved in the processing above are shown in the figure below:
Generating the Resolved LogicalPlan
Spark Analyzer
In SparkSession's sql method, after the SQL statement has been parsed into an Unresolved LogicalPlan, Dataset.ofRows(self, plan, tracker) is executed to carry out catalog binding, i.e. binding the plan to its data sources, as follows:
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
  : DataFrame = sparkSession.withActive {
  val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
  qe.assertAnalyzed()
  new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
}
...
def assertAnalyzed(): Unit = analyzed
As the following implementation shows, the analyzed variable is initialized lazily, and from its initializer we can see that Spark's catalog-binding logic is mainly implemented by the Analyzer class:
lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) {
  // We can't clone `logical` here, which will reset the `_analyzed` flag.
  sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
}
The executeAndCheck method in turn runs through the execute method of RuleExecutor, the parent class of Analyzer:
def execute(plan: TreeType): TreeType = {
  ...
  batches.foreach { batch =>
    val batchStartPlan = curPlan
    var iteration = 1
    var lastPlan = curPlan
    var continue = true

    // Run until fix point (or the max number of iterations as specified in the strategy.
    while (continue) {
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val startTime = System.nanoTime()
          val result = rule(plan)
          val runTime = System.nanoTime() - startTime
          val effective = !result.fastEquals(plan)

          if (effective) {
            queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
            queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
            planChangeLogger.logRule(rule.ruleName, plan, result)
          }
          ...
          result
      }
      iteration += 1
      if (iteration > batch.strategy.maxIterations) {
        // Only log if this is a rule that is supposed to run more than once.
        if (iteration != 2) {
          val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
            "."
          } else {
            s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
          }
          val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
            s"$endingMsg"
          if (Utils.isTesting || batch.strategy.errorOnExceed) {
            throw new TreeNodeException(curPlan, message, null)
          } else {
            logWarning(message)
          }
        }
        // Check idempotence for Once batches.
        if (batch.strategy == Once &&
            Utils.isTesting && !blacklistedOnceBatches.contains(batch.name)) {
          checkBatchIdempotence(batch, curPlan)
        }
        continue = false
      }

      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
  }

  planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)

  curPlan
}
The main processing steps in the code above are:
1. Iterate over the batches list of the Analyzer class.
The batches method returns all rules related to catalog binding; in the Analyzer they are grouped into batches such as Substitution, Hints, Resolution, UDF, and Subquery.
Taking the common "Resolution" batch as an example, it contains a large number of rules for resolving functions, namespaces, tables, views, columns, and so on; users can also define their own rules (a minimal sketch of a custom rule follows this discussion):
Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: ResolveNamespace(catalogManager) :: new ResolveCatalogs(catalogManager) :: ResolveInsertInto :: ResolveRelations :: ResolveTables :: ResolveReferences :: ResolveCreateNamedStruct :: ResolveDeserializer :: ResolveNewInstance :: ResolveUpCast :: ResolveGroupingAnalytics :: ResolvePivot :: ResolveOrdinalInOrderByAndGroupBy :: ResolveAggAliasInGroupBy :: ResolveMissingReferences :: ...
The Batch class is defined as follows and consists of a batch name, an execution strategy, and the rules the batch contains. The Strategy comes in two flavors, Once and FixedPoint: run the batch exactly once, or run it repeatedly up to a maximum number of iterations (until a fixed point is reached):
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
2. Apply every Rule of each Batch to the Unresolved LogicalPlan. The rules in a batch may run for several rounds; execution stops once the number of iterations reaches batch.strategy.maxIterations or the logical plan no longer changes compared with the previous round.
The FixedPoint used by the Analyzer is defined in Spark as follows; in Spark 3.0 the default maximum is 100 iterations:
protected def fixedPoint =
  FixedPoint(
    conf.analyzerMaxIterations,
    errorOnExceed = true,
    maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
...
val ANALYZER_MAX_ITERATIONS = buildConf("spark.sql.analyzer.maxIterations")
  .internal()
  .doc("The max number of iterations the analyzer runs.")
  .version("3.0.0")
  .intConf
  .createWithDefault(100)
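For illustration only, here is a minimal, do-nothing resolution rule injected through SparkSessionExtensions (the rule name and its logic are hypothetical; a real rule would pattern-match on plan nodes and rewrite them):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A placeholder rule that returns the plan unchanged; it only demonstrates
// the extension point through which custom analysis rules are registered.
case class MyResolutionRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder()
  .master("local[*]")   // local demo only
  .withExtensions(ext => ext.injectResolutionRule(session => MyResolutionRule(session)))
  .getOrCreate()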
Next, take the ResolveRelations rule (which resolves tables and views) applied to the Unresolved LogicalPlan as an example. It handles several kinds of unresolved data sources, including UnresolvedRelation, UnresolvedTable, and UnresolvedTableOrView:
def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
  ...
  case u: UnresolvedRelation =>
    lookupRelation(u.multipartIdentifier).map(resolveViews).getOrElse(u)

  case u @ UnresolvedTable(identifier) =>
    lookupTableOrView(identifier).map {
      case v: ResolvedView =>
        u.failAnalysis(s"${v.identifier.quoted} is a view not table.")
      case table => table
    }.getOrElse(u)

  case u @ UnresolvedTableOrView(identifier) =>
    lookupTableOrView(identifier).getOrElse(u)
}
When the object being resolved is an UnresolvedRelation instance, the lookupRelation method is called to resolve it: the metadata of the data source is fetched through the SessionCatalog or an extended CatalogPlugin, and a Resolved LogicalPlan is produced:
private def lookupRelation(identifier: Seq[String]): Option[LogicalPlan] = {
  expandRelationName(identifier) match {
    case SessionCatalogAndIdentifier(catalog, ident) =>
      lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {
        case v1Table: V1Table =>
          v1SessionCatalog.getRelation(v1Table.v1Table)
        case table =>
          SubqueryAlias(
            catalog.name +: ident.asMultipartIdentifier,
            DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
      }
      ...
The most common case is the SessionCatalog, the SparkSession-level catalog interface object. Its definition, shown below, takes an ExternalCatalog, a GlobalTempViewManager, a FunctionRegistry, a SQLConf, a Hadoop Configuration, a Parser, and a FunctionResourceLoader. ExternalCatalog has two main implementations, HiveExternalCatalog and InMemoryCatalog, with HiveExternalCatalog being the one mainly used in enterprise scenarios:
class SessionCatalog(
    externalCatalogBuilder: () => ExternalCatalog,
    globalTempViewManagerBuilder: () => GlobalTempViewManager,
    functionRegistry: FunctionRegistry,
    conf: SQLConf,
    hadoopConf: Configuration,
    parser: ParserInterface,
    functionResourceLoader: FunctionResourceLoader)
With the default SessionCatalog, fetching a table goes through the corresponding interface of the ExternalCatalog instance:
override def loadTable(ident: Identifier): Table = {
  val catalogTable = try {
    catalog.getTableMetadata(ident.asTableIdentifier)
  } catch {
    case _: NoSuchTableException =>
      throw new NoSuchTableException(ident)
  }
  V1Table(catalogTable)
}
...
def getTableMetadata(name: TableIdentifier): CatalogTable = {
  val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(name.table)
  requireDbExists(db)
  requireTableExists(TableIdentifier(table, Some(db)))
  externalCatalog.getTable(db, table)
}
If the ExternalCatalog implementation in use is HiveExternalCatalog, the table's metadata is then fetched from the Hive metastore through the HiveClientImpl class:
private def getRawTableOption(dbName: String, tableName: String): Option[HiveTable] = {
  Option(client.getTable(dbName, tableName, false /* do not throw exception */))
}
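For illustration (assuming the example table tabname exists in the session catalog), the same metadata that ResolveRelations relies on can be inspected directly from user code:

import org.apache.spark.sql.catalyst.TableIdentifier

// Look up the table metadata through the session's SessionCatalog,
// or through the public spark.catalog API.
val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tabname"))
println(meta.schema)        // the schema used to resolve col1/col2
println(meta.provider)      // e.g. Some(hive), Some(parquet), ...
spark.catalog.listColumns("tabname").show()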
In addition, the set of catalogs can be extended by implementing the CatalogPlugin interface and configuring the "spark.sql.catalog.spark_catalog" parameter; for example, the Iceberg data lake defines its own catalog to implement its specific logic:
spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
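As a rough sketch of what such an extension looks like (class and field names here are invented; a practical catalog like Iceberg's additionally implements TableCatalog and related interfaces so that Spark can actually resolve tables through it):

import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Minimal skeleton of a catalog plugin: Spark instantiates it by the class
// name configured under spark.sql.catalog.<name> and calls initialize()
// with the options configured as spark.sql.catalog.<name>.<key>.
class MyCatalog extends CatalogPlugin {
  private var catalogName: String = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
    // read catalog-specific options from `options` here
  }

  override def name(): String = catalogName
}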
3. Return the resolved Resolved LogicalPlan.
The main classes involved in the logic above relate to each other as shown below:
Continuing with the earlier SQL statement (select col1 from tabname where col2 > 10), here is a brief outline of how an Unresolved LogicalPlan is turned into an Analyzed LogicalPlan:
1. According to the Analyzer's rules, the UnresolvedRelation node matches the ResolveRelations rule: the table information is fetched from the data source through the CatalogManager, the relation's columns are resolved and assigned ids, and a SubqueryAlias node is created for the table.
2. For the filter condition col2 > 10, the column's UnresolvedAttribute matches the ResolveReferences rule and is resolved using the column information obtained in step 1; the number 10 matches the ImplicitTypeCasts rule, which picks the most suitable data type for it.
3. For the Project node, the next round of analysis again matches the ResolveReferences rule and resolves the projected columns, at which point the whole tree has become a Resolved LogicalPlan.
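For reference, the analyzed plan of the sample query can be printed directly; the attribute ids and the concrete relation node in the sketch below are only indicative and depend on the actual catalog and Spark version:

// Roughly:
//   Project [col1#1]
//   +- Filter (col2#2 > 10)
//      +- SubqueryAlias ... tabname
//         +- Relation/HiveTableRelation [col1#1, col2#2, ...]
val analyzed = spark.sql("select col1 from tabname where col2 > 10")
  .queryExecution.analyzed
println(analyzed.treeString)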
Generating the Optimized LogicalPlan
Once the Resolved LogicalPlan is available, the logical plan tree is optimized further according to a set of rules so that the SQL statement executes more efficiently, producing the Optimized LogicalPlan.
This article is based on the Spark 3.0 source code. The Optimized LogicalPlan is also produced lazily, and the Optimizer class, like the Analyzer, extends RuleExecutor: all rule-based optimizations (RBO) are in fact executed through RuleExecutor. The rules are again organized into batches, and all batches are applied to the Analyzed LogicalPlan until the tree no longer changes or the number of optimization iterations exceeds the maximum (spark.sql.optimizer.maxIterations, 100 by default):
lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
  // clone the plan to avoid sharing the plan instance between different stages like analyzing,
  // optimizing and planning.
  val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
  // We do not want optimized plans to be re-analyzed as literals that have been constant folded
  // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
  // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
  plan.setAnalyzed()
  plan
}
...
def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker): TreeType = {
  QueryPlanningTracker.withTracker(tracker) {
    execute(plan)
  }
}
The logical plan optimization rules are again organized into multiple Batches; each Batch contains several concrete Rules and runs either once or up to a fixed number of iterations. Commonly used optimizations include predicate pushdown, constant folding, and column pruning.
Predicate pushdown moves predicate evaluation as close to the data source as possible. Depending on the scenario it is implemented by LimitPushDown, PushProjectionThroughUnion, PushDownPredicates, and others; PushDownPredicates in turn comprises PushPredicateThroughNonJoin and PushPredicateThroughJoin.
PushPredicateThroughJoin pushes predicate evaluation below the join operator, reducing the network, memory, and IO overhead incurred when joining tables:
val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
  // push the where condition down into join filter
  case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition, hint)) =>
    val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
      split(splitConjunctivePredicates(filterCondition), left, right)
    joinType match {
      case _: InnerLike =>
        // push down the single side `where` condition into respective sides
        val newLeft = leftFilterConditions.
          reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
        val newRight = rightFilterConditions.
          reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
        val (newJoinConditions, others) =
          commonFilterCondition.partition(canEvaluateWithinJoin)
        val newJoinCond = (newJoinConditions ++ joinCondition).reduceLeftOption(And)

        val join = Join(newLeft, newRight, joinType, newJoinCond, hint)
        if (others.nonEmpty) {
          Filter(others.reduceLeft(And), join)
        } else {
          join
        }

      case RightOuter =>
        // push down the right side only `where` condition
        ...
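A small, self-contained experiment (dataset and column names are illustrative) makes the effect visible: after optimization the filter should appear beneath the join rather than above it:

import org.apache.spark.sql.functions.col

// Build a trivial inner join with a filter on top and inspect the optimized plan.
val left  = spark.range(0, 100).toDF("id")
val right = spark.range(0, 100).toDF("id")
val q = left.join(right, Seq("id")).where(col("id") > 10)
// The (id > 10) predicate is expected to show up under the Join, on both sides.
println(q.queryExecution.optimizedPlan.treeString)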
Constant folding is implemented by the ConstantFolding rule: if an operator in an expression is foldable, its result is computed at this stage, avoiding row-by-row evaluation during actual SQL execution and thus reducing CPU cost:
object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsDown {
      // Skip redundant folding of literals. This rule is technically not necessary. Placing this
      // here avoids running the next rule for Literal values, which would create a new Literal
      // object and running eval unnecessarily.
      case l: Literal => l

      // Fold expressions that are foldable.
      case e if e.foldable => Literal.create(e.eval(EmptyRow), e.dataType)
    }
  }
}
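A quick way to observe this (illustrative snippet, assuming a SparkSession named spark): the arithmetic on literals is already evaluated in the optimized plan:

// The projection is expected to appear as (id + 3) in the optimized plan,
// rather than (id + (1 + 2)).
val folded = spark.range(5).selectExpr("id + (1 + 2) AS x")
println(folded.queryExecution.optimizedPlan.treeString)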
Column pruning, implemented by the ColumnPruning rule, removes columns that are not needed. This avoids reading extra columns from the data source, loading unneeded columns into memory for computation, and returning data that is not required (imagine a very wide table), which can yield significant performance gains:
object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = ...

  /** Applies a projection only when the child is producing unnecessary attributes */
  private def prunedChild(c: LogicalPlan, allReferences: AttributeSet): LogicalPlan =
    if (!c.outputSet.subsetOf(allReferences)) {
      Project(c.output.filter(allReferences.contains), c)
    } else {
      c
    }
}
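To see the rule at work (again assuming the example table tabname, ideally with more columns than col1 and col2), the optimized plan should show the relation reading only the columns the query needs:

import org.apache.spark.sql.functions.col

// Only col1 and col2 are required by the query, so the scan node in the
// optimized plan is expected to list just those columns, not the full schema.
val pruned = spark.table("tabname").where(col("col2") > 10).select("col1")
println(pruned.queryExecution.optimizedPlan.treeString)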
The main classes involved in the Optimizer relate to each other as shown in the figure below:
spark_optimizer
Once all optimization rules have been applied to the Analyzed LogicalPlan, the Optimized LogicalPlan is obtained.
This article walked through how a Spark SQL statement is parsed into an AST and how the Unresolved LogicalPlan, Resolved LogicalPlan, and Optimized LogicalPlan are generated, laying the groundwork for generating the physical plan (Spark Plan) in the next article.