PS: Cross-posted from blog.imaginea.com

In my previous post, I discussed the parsing step, which produced a Logical
Plan. In this post I’ll show how this Logical Plan gets transformed into its
corresponding RDD.

Every SQLExecution is associated with an id, to track it at runtime, and with a
query processing / transformation pipeline, i.e. a QueryExecution. The
QueryExecution provides access to all of the stages of query plan
transformation. Let’s go through these stages in detail.
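
A quick way to watch this pipeline is through a Dataset’s queryExecution
handle, which exposes each intermediate plan. A minimal sketch, assuming a
SparkSession named spark with a table people already registered:

val df = spark.sql("SELECT name FROM people WHERE age > 21")
val qe = df.queryExecution
println(qe.logical)       // initial logical plan from the parser
println(qe.analyzed)      // after analysis (Stage 1)
println(qe.optimizedPlan) // after Catalyst optimization (Stage 3)
println(qe.executedPlan)  // physical plan prepared for execution (Stage 4)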

Stage 1: Logical Plan to Analyzed Logical Plan (Binding)

The analyzer associated with the session state of the execution (mentioned in
my previous post) transforms the initial Logical Plan into its resolved form by
resolving tables and data sources into LogicalRelation nodes. The rules for
these transformations can be found in DataSourceStrategy.scala
(DataSourceAnalysis, FindDataSourceTable) and rules.scala
(PreprocessTableInsertion, ResolveDataSource).

org.apache.spark.sql.execution.QueryExecution

lazy val analyzed: LogicalPlan = {
  SparkSession.setActiveSession(sparkSession)
  sparkSession.sessionState.analyzer.execute(logical)
}
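
To see the binding in action, compare the parsed plan with the analyzed one. A
small sketch, again assuming a registered table people; the parsed plan carries
unresolved references, while the analyzed plan binds them to the table schema
through a LogicalRelation:

val qe = spark.sql("SELECT name FROM people").queryExecution
println(qe.logical)  // 'Project ['name] over an 'UnresolvedRelation
println(qe.analyzed) // Project [name#0] over a resolved LogicalRelation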

Stage 2: Validation and Cached Data Source Resolution

The analyzed plan is further validated, and its data source segments are
replaced with cached versions if they have already been cached.

org.apache.spark.sql.execution.QueryExecution

def assertAnalyzed(): Unit = {
  try sparkSession.sessionState.analyzer.checkAnalysis(analyzed) catch {
    case e: AnalysisException =>
      val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
      ae.setStackTrace(e.getStackTrace)
      throw ae
  }
}

def assertSupported(): Unit = {
  if (sparkSession.sessionState.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
    UnsupportedOperationChecker.checkForBatch(analyzed)
  }
}

lazy val withCachedData: LogicalPlan = {
  assertAnalyzed()
  assertSupported()
  sparkSession.sharedState.cacheManager.useCachedData(analyzed)
}
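
As an illustration (a sketch, assuming the same people table): once the table
is cached and materialized, useCachedData rewrites matching plan fragments to
read from the in-memory columnar cache, which shows up as an InMemoryRelation
in the plan.

val people = spark.table("people")
people.cache()
people.count() // an action, to materialize the cache
// queries over the table now resolve against an InMemoryRelation
println(spark.table("people").queryExecution.withCachedData)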

Stage 3: Query Optimization

A set of predefined, Catalyst-provided SQL optimization rules, along with any
additional custom rules, transforms the analyzed logical plan into the
optimized logical plan. Custom rules are experimental and can be added through
ExperimentalMethods.scala, as sketched below.

org.apache.spark.sql.execution.QueryExecution

lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)

You can find these rules in Optimizer.scala.
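
Custom rules are plugged in through the session’s experimental methods. A
minimal sketch (MyCustomRule is a hypothetical placeholder; a real rule would
pattern-match on plan nodes and rewrite them):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object MyCustomRule extends Rule[LogicalPlan] {
  // placeholder: a real rule would transform the plan here
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

spark.experimental.extraOptimizations = Seq(MyCustomRule)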

Stage 4: Spark Plan and Code Generation

The SparkPlanner, which holds a set of strategies, transforms the optimized
logical plan into a SparkPlan by replacing the logical operators with their
actual physical counterparts. Whole-stage codegen collapsing and exchange
reuse, both important for performance, also happen at this stage. Users can add
custom strategies through ExperimentalMethods.scala, as sketched after the code
below.

org.apache.spark.sql.execution.QueryExecution

lazy val sparkPlan: SparkPlan = {
  SparkSession.setActiveSession(sparkSession)
  planner.plan(ReturnAnswer(optimizedPlan)).next()
}

// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
/**
 * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
 * row format conversions as needed.
 */
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}

/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  python.ExtractPythonUDFs,
  PlanSubqueries(sparkSession),
  EnsureRequirements(sparkSession.sessionState.conf),
  CollapseCodegenStages(sparkSession.sessionState.conf),
  ReuseExchange(sparkSession.sessionState.conf))
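
Custom physical strategies follow the same pattern as custom optimizer rules. A
sketch (MyStrategy is hypothetical; a real strategy would match logical
operators and emit physical SparkPlans, returning Nil to fall through to the
built-in strategies):

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object MyStrategy extends Strategy {
  // Nil defers planning of this node to the built-in strategies
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

spark.experimental.extraStrategies = Seq(MyStrategy)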

Stage 5: Execution

The physical plan then gets transformed into an RDD for execution on the Spark runtime.

org.apache.spark.sql.execution.QueryExecution

/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
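
Any action on a DataFrame ultimately runs through this RDD. A small sketch,
assuming a SparkSession named spark:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

val rdd: RDD[InternalRow] = spark.sql("SELECT 1 AS one").queryExecution.toRdd
println(rdd.count()) // triggers the actual execution on the Spark runtime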