From 3405cc775843a3a80d009d4f9079ba9daa2220e7 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 21 Apr 2016 21:48:48 -0700
Subject: [SPARK-14835][SQL] Remove MetastoreRelation dependency from SQLBuilder

## What changes were proposed in this pull request?

This patch removes SQLBuilder's dependency on MetastoreRelation. We should be
able to move SQLBuilder into the sql/core package after this change.

## How was this patch tested?

N/A - covered by existing tests.

Author: Reynold Xin

Closes #12594 from rxin/SPARK-14835.
---
 .../spark/sql/catalyst/catalog/interface.scala       | 20 ++++++++++++++++----
 .../scala/org/apache/spark/sql/hive/SQLBuilder.scala | 10 ++++++----
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d2294efd9a..75cbe32c16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,7 +21,8 @@ import javax.annotation.Nullable
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 
 
@@ -301,6 +302,7 @@ object ExternalCatalog {
  */
 trait CatalogRelation {
   def catalogTable: CatalogTable
+  def output: Seq[Attribute]
 }
 
 
@@ -315,11 +317,21 @@ case class SimpleCatalogRelation(
     alias: Option[String] = None)
   extends LeafNode with CatalogRelation {
 
-  // TODO: implement this
-  override def output: Seq[Attribute] = Seq.empty
-
   override def catalogTable: CatalogTable = metadata
 
+  override val output: Seq[Attribute] = {
+    val cols = catalogTable.schema
+      .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
+    (cols ++ catalogTable.partitionColumns).map { f =>
+      AttributeReference(
+        f.name,
+        DataTypeParser.parse(f.dataType),
+        // Since data can be dumped in randomly with no validation, everything is nullable.
+        nullable = true
+      )(qualifier = Some(alias.getOrElse(metadata.identifier.table)))
+    }
+  }
+
   require(metadata.identifier.database == Some(databaseName),
     "provided database does not match the one specified in the table definition")
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 86115d0e9b..3a0e22c742 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -22,8 +22,9 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{Dataset, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{CollapseProject, CombineUnions}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -41,7 +42,7 @@ import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
 class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Logging {
   require(logicalPlan.resolved, "SQLBuilder only supports resolved logical query plans")
 
-  def this(df: DataFrame) = this(df.queryExecution.analyzed, df.sqlContext)
+  def this(df: Dataset[_]) = this(df.queryExecution.analyzed, df.sqlContext)
 
   private val nextSubqueryId = new AtomicLong(0)
   private def newSubqueryName(): String = s"gen_subquery_${nextSubqueryId.getAndIncrement()}"
@@ -517,8 +518,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
       case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
         Some(SQLTable(database, table, l.output.map(_.withQualifier(None))))
 
-      case m: MetastoreRelation =>
-        Some(SQLTable(m.databaseName, m.tableName, m.output.map(_.withQualifier(None))))
+      case relation: CatalogRelation =>
+        val m = relation.catalogTable
+        Some(SQLTable(m.database, m.identifier.table, relation.output.map(_.withQualifier(None))))
 
       case _ => None
     }
-- 
cgit v1.2.3
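
For context, here is a minimal, self-contained Scala sketch of the `SimpleCatalogRelation.output` derivation added above. The `Column`, `Attribute`, and `Table` case classes are hypothetical stand-ins for the real Catalyst types (`CatalogColumn`, `AttributeReference`, `CatalogTable`), so the shapes line up but none of this is Spark API:

```scala
// Hypothetical stand-ins for the Catalyst types used in the patch.
case class Column(name: String, dataType: String)
case class Attribute(name: String, dataType: String, nullable: Boolean, qualifier: Option[String])
case class Table(name: String, schema: Seq[Column], partitionColumnNames: Seq[String])

def output(table: Table, alias: Option[String]): Seq[Attribute] = {
  // Split the schema so data columns come first and partition columns last,
  // mirroring the (cols ++ partitionColumns) ordering in the patch.
  val (partCols, dataCols) =
    table.schema.partition(c => table.partitionColumnNames.contains(c.name))
  (dataCols ++ partCols).map { c =>
    // Everything is nullable because the metastore never validates the data.
    Attribute(c.name, c.dataType, nullable = true,
      qualifier = Some(alias.getOrElse(table.name)))
  }
}

// Example: a table with one partition column ("ds") sorts it to the end.
val sales = Table("sales", Seq(Column("ds", "string"), Column("id", "int")), Seq("ds"))
output(sales, alias = None)
// => Seq(Attribute(id, int, true, Some(sales)), Attribute(ds, string, true, Some(sales)))
```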
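
Similarly, a sketch of the SQLBuilder side of the change, again with hypothetical stand-in types: by matching on the `CatalogRelation` trait rather than the concrete `MetastoreRelation` class, the table extractor no longer references anything Hive-specific, which is what makes the planned move of SQLBuilder into sql/core possible:

```scala
// Hypothetical stand-ins shaped like the Catalyst classes in the patch.
trait LogicalPlan
case class TableIdentifier(table: String, database: Option[String])
case class CatalogTable(identifier: TableIdentifier) {
  def database: String = identifier.database.getOrElse("default")
}
trait CatalogRelation { def catalogTable: CatalogTable }

// Any relation node can mix in the trait, whether it is backed by the
// Hive metastore or the in-memory catalog.
case class SimpleCatalogRelation(metadata: CatalogTable)
    extends LogicalPlan with CatalogRelation {
  override def catalogTable: CatalogTable = metadata
}

case class SQLTable(database: String, table: String)

def extractSQLTable(plan: LogicalPlan): Option[SQLTable] = plan match {
  // One trait-based case replaces the old Hive-only MetastoreRelation case.
  case relation: LogicalPlan with CatalogRelation =>
    val m = relation.catalogTable
    Some(SQLTable(m.database, m.identifier.table))
  case _ => None
}

extractSQLTable(SimpleCatalogRelation(CatalogTable(TableIdentifier("t", Some("db")))))
// => Some(SQLTable(db, t))
```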