PS: Cross-posted from blog.imaginea.com

While digging through Apache Spark SQL’s source code, I realized that looking
deeper into it is a must for anyone interested in the implementation details of
a SQL-based data processing engine. Spark SQL has three major API modules: the
Data Source API, the Catalyst API and the Catalog API. This series of posts
concentrates mostly on Catalyst, correlating the query processing stages
(Parsing, Binding, Validation, Optimization, Code generation, Query execution)
to highlight all the entry points for further exploration.

PREREQUISITE: Overview of Spark Catalyst. You can start with this link.

NOTE: Code snippets in these posts refer to Spark SQL 2.0.1.

As we know, since 2.0 SparkSession is the new entry point for the DataFrame
(i.e. Dataset[Row]) API. Every SparkSession is associated with a SessionState,
and it also exposes a SQLContext wrapper for backward compatibility.

org.apache.spark.sql.SparkSession

  /**
   * State isolated across sessions, including SQL configurations, temporary tables, registered
   * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
   */
  @transient
  private[sql] lazy val sessionState: SessionState = {
    SparkSession.reflect[SessionState, SparkSession](
      SparkSession.sessionStateClassName(sparkContext.conf),
      self)
  }

  /**
   * A wrapped version of this session in the form of a [[SQLContext]], for backward compatibility.
   *
   * @since 2.0.0
   */
  @transient
  val sqlContext: SQLContext = new SQLContext(this)
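
To ground this, here is a minimal sketch (assuming Spark 2.0.x on the classpath)
of that entry point from user code. sessionState itself is private[sql], so from
the outside we only see it indirectly, for example through the wrapped SQLContext
and the session’s runtime configuration:

import org.apache.spark.sql.{SparkSession, SQLContext}

val sparkSession = SparkSession.builder()
  .master("local[*]")                  // illustrative; any master works
  .appName("catalyst-entry-points")
  .getOrCreate()

// The backward-compatible wrapper created in the snippet above.
val sqlContext: SQLContext = sparkSession.sqlContext

// Runtime configuration, backed by the SQLConf held in the SessionState.
sparkSession.conf.set("spark.sql.shuffle.partitions", "4")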

Every SessionState holds instances of SQLConf, SessionCatalog, Analyzer,
SparkOptimizer, SparkSqlParser, SparkPlanner and many more.

org.apache.spark.sql.internal.SessionState

  /**
   * Internal catalog for managing table and database states.
   */
  lazy val catalog = new SessionCatalog(
    sparkSession.sharedState.externalCatalog,
    functionResourceLoader,
    functionRegistry,
    conf,
    newHadoopConf())

  /**
   * Logical query plan analyzer for resolving unresolved attributes and relations.
   */
  lazy val analyzer: Analyzer = {
    new Analyzer(catalog, conf) {
      override val extendedResolutionRules =
        PreprocessTableInsertion(conf) ::
        new FindDataSourceTable(sparkSession) ::
        DataSourceAnalysis(conf) ::
        (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)

      override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog))
    }
  }

  /**
   * Logical query plan optimizer.
   */
  lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)

  /**
   * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
   */
  lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)

  /**
   * Planner that converts optimized logical plans to physical plans.
   */
  def planner: SparkPlanner =
    new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
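
These components line up with the query processing stages listed at the start.
As a hedged, self-contained sketch (the temp view and query are illustrative),
the intermediate result of each stage can be observed through a Dataset’s
QueryExecution, which internally calls the analyzer, optimizer and planner
shown above:

sparkSession.range(10).createOrReplaceTempView("t")
val qe = sparkSession.sql("select id from t where id > 5").queryExecution

println(qe.logical)       // output of sqlParser.parsePlan (Parsing)
println(qe.analyzed)      // output of analyzer, resolved against the SessionCatalog (Binding/Validation)
println(qe.optimizedPlan) // output of optimizer, i.e. SparkOptimizer (Optimization)
println(qe.sparkPlan)     // output of planner, i.e. SparkPlanner (physical planning)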

Parsing

Spark SQL uses ANTLR4 (see the generated SqlBaseParser, whose rules come from
SqlBase.g4) to parse SQL statements into a parse tree. The SparkSqlParser
mentioned in the snippet above extends AbstractSqlParser and uses an overridden
version of AstBuilder, namely SparkSqlAstBuilder, to transform that tree into a
LogicalPlan, Expression, TableIdentifier or DataType.

org.apache.spark.sql.catalyst.parser.AbstractSqlParser (in ParseDriver.scala)

abstract class AbstractSqlParser extends ParserInterface with Logging {

  /** Creates/Resolves DataType for a given SQL string. */
  def parseDataType(sqlText: String): DataType = parse(sqlText) { parser =>
    // TODO add this to the parser interface.
    astBuilder.visitSingleDataType(parser.singleDataType())
  }

  /** Creates Expression for a given SQL string. */
  override def parseExpression(sqlText: String): Expression = parse(sqlText) { parser =>
    astBuilder.visitSingleExpression(parser.singleExpression())
  }

  /** Creates TableIdentifier for a given SQL string. */
  override def parseTableIdentifier(sqlText: String): TableIdentifier = parse(sqlText) { parser =>
    astBuilder.visitSingleTableIdentifier(parser.singleTableIdentifier())
  }

  /** Creates LogicalPlan for a given SQL string. */
  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }
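
These entry points can be exercised directly through CatalystSqlParser, the
conf-free AbstractSqlParser implementation that lives alongside the code above
(not a stable public API, but handy for poking at the parser). A small sketch:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// An unresolved LogicalPlan: a 'Project over a 'Filter over an 'UnresolvedRelation.
val plan = CatalystSqlParser.parsePlan("SELECT name FROM sometable WHERE age > 15")
println(plan)

val expr  = CatalystSqlParser.parseExpression("age > 15")          // Catalyst Expression
val ident = CatalystSqlParser.parseTableIdentifier("db.sometable") // TableIdentifier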

Let’s take a command that you may have executed on Spark:

sparkSession.sql("select * from sometable")

SparkSession calls parsePlan (shown above) to get the LogicalPlan while
creating the DataFrame.
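
As a quick sketch of what that gives us (the temp view is registered here only
so the example is self-contained), the DataFrame’s QueryExecution keeps the
plan that parsePlan returned:

sparkSession.range(5).createOrReplaceTempView("sometable")

val df = sparkSession.sql("select * from sometable")
println(df.queryExecution.logical)  // the unresolved plan, straight from the parser
df.explain(true)                    // parsed, analyzed, optimized and physical plans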

org.apache.spark.sql.SparkSession

/**
 * Executes a SQL query using Spark, returning the result as a [[DataFrame]].
 * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
 *
 * @since 2.0.0
 */
def sql(sqlText: String): DataFrame = {
  Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}

Similarly, for something like val filteredDs = ds.filter("age > 15"), the
parser's parseExpression is called to get the equivalent Catalyst Expression
that is used to construct the resulting Dataset.

org.apache.spark.sql.Dataset

/**
 * Filters rows using the given SQL expression.
 * {{{
 *   peopleDs.filter("age > 15")
 * }}}
 *
 * @group typedrel
 * @since 1.6.0
 */
def filter(conditionExpr: String): Dataset[T] = {
  filter(Column(sparkSession.sessionState.sqlParser.parseExpression(conditionExpr)))
}
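
The string form ends up as the same Catalyst expression as the Column-based
forms. A hedged sketch, where people is an illustrative Dataset with an age
column:

import org.apache.spark.sql.functions.expr
import sparkSession.implicits._

val people = Seq(("alice", 20), ("bob", 12)).toDF("name", "age")

val byString = people.filter("age > 15")          // goes through parseExpression, as above
val byExpr   = people.filter(expr("age > 15"))    // functions.expr parses the same string
val byColumn = people.filter(people("age") > 15)  // Column built without the parser

println(byString.queryExecution.analyzed)         // all three yield an equivalent Filter node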

Likewise, parseTableIdentifier resolves the table name, with the help of the Catalog API, to create the DataFrame.

org.apache.spark.sql.SparkSession

/**
 * Returns the specified table as a [[DataFrame]].
 *
 * @since 2.0.0
 */
def table(tableName: String): DataFrame = {
  table(sessionState.sqlParser.parseTableIdentifier(tableName))
}

private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
  Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
}
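
A short sketch of that path from user code (reusing the sometable temp view
registered earlier; a qualified name such as "db.sometable" would be parsed
into a TableIdentifier with a database part):

val t = sparkSession.table("sometable")
println(t.queryExecution.logical)  // the relation looked up from the SessionCatalog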

So, every DataFrame is associated with a LogicalPlan, the initial QueryPlan
that ultimately gets transformed into the actual RDD code.
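
A final sketch of that last step, showing the physical plan chosen by the
SparkPlanner and the RDD it is compiled into (again reusing the sometable temp
view):

val result = sparkSession.sql("select * from sometable")
println(result.queryExecution.executedPlan)    // the final physical SparkPlan
val internalRows = result.queryExecution.toRdd // RDD[InternalRow] that actually runs on the cluster
println(internalRows.toDebugString)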