author     Reynold Xin <rxin@databricks.com>  2016-04-21 21:48:48 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-04-21 21:48:48 -0700
commit     3405cc775843a3a80d009d4f9079ba9daa2220e7 (patch)
tree       b71fe3ed8e29d4a89642c59ec9ba7b6ee00b4b58
parent     145433f1aaf4a58f484f98c2f1d32abd8cc95b48 (diff)
[SPARK-14835][SQL] Remove MetastoreRelation dependency from SQLBuilder
## What changes were proposed in this pull request?

This patch removes SQLBuilder's dependency on MetastoreRelation. We should be able to move SQLBuilder into the sql/core package after this change.

## How was this patch tested?

N/A - covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12594 from rxin/SPARK-14835.
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 20
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala | 10
2 files changed, 22 insertions(+), 8 deletions(-)
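The change is easiest to see in miniature. Below is a minimal, self-contained sketch of the decoupling, with simplified stand-ins for Spark's CatalogTable and Attribute types and a hypothetical qualifiedName helper: once CatalogRelation itself promises an output, SQLBuilder can work against the trait alone, never against the Hive-specific MetastoreRelation class.

// Simplified stand-in for org.apache.spark.sql.catalyst.catalog.CatalogTable.
case class CatalogTable(database: String, table: String)

// The catalyst-level abstraction this patch strengthens: it now also exposes
// the relation's output, so callers no longer need a Hive-specific subclass.
trait CatalogRelation {
  def catalogTable: CatalogTable
  def output: Seq[String] // Seq[Attribute] in the real trait
}

// Hypothetical helper illustrating the kind of question SQLBuilder asks:
// everything it needs is answerable through the trait alone.
def qualifiedName(relation: CatalogRelation): String =
  s"${relation.catalogTable.database}.${relation.catalogTable.table}"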
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d2294efd9a..75cbe32c16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,7 +21,8 @@ import javax.annotation.Nullable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
@@ -301,6 +302,7 @@ object ExternalCatalog {
*/
trait CatalogRelation {
  def catalogTable: CatalogTable
+  def output: Seq[Attribute]
}
@@ -315,11 +317,21 @@ case class SimpleCatalogRelation(
    alias: Option[String] = None)
  extends LeafNode with CatalogRelation {
-  // TODO: implement this
-  override def output: Seq[Attribute] = Seq.empty
-
  override def catalogTable: CatalogTable = metadata
+  override val output: Seq[Attribute] = {
+    val cols = catalogTable.schema
+      .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
+    (cols ++ catalogTable.partitionColumns).map { f =>
+      AttributeReference(
+        f.name,
+        DataTypeParser.parse(f.dataType),
+        // Since data can be dumped in randomly with no validation, everything is nullable.
+        nullable = true
+      )(qualifier = Some(alias.getOrElse(metadata.identifier.table)))
+    }
+  }
+
  require(metadata.identifier.database == Some(databaseName),
    "provided database does not match the one specified in the table definition")
}
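The ordering in the new output is worth spelling out: non-partition (data) columns come first, partition columns are appended last, and every attribute is marked nullable because the underlying data is never validated. A standalone sketch of just that ordering, using hypothetical column names:

object OutputOrderingSketch extends App {
  // Hypothetical table schema: (column name, data type), with dt as the partition column.
  val schema = Seq("id" -> "int", "name" -> "string", "dt" -> "string")
  val partitionColumnNames = Seq("dt")

  // Mirror the filter in SimpleCatalogRelation.output: data columns first ...
  val dataCols = schema.filterNot { case (name, _) => partitionColumnNames.contains(name) }
  // ... then the partition columns appended at the end.
  val partCols = schema.filter { case (name, _) => partitionColumnNames.contains(name) }

  println((dataCols ++ partCols).map(_._1).mkString(", ")) // prints: id, name, dt
}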
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 86115d0e9b..3a0e22c742 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -22,8 +22,9 @@ import java.util.concurrent.atomic.AtomicLong
import scala.util.control.NonFatal
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{Dataset, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{CollapseProject, CombineUnions}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -41,7 +42,7 @@ import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Logging {
  require(logicalPlan.resolved, "SQLBuilder only supports resolved logical query plans")
-  def this(df: DataFrame) = this(df.queryExecution.analyzed, df.sqlContext)
+  def this(df: Dataset[_]) = this(df.queryExecution.analyzed, df.sqlContext)
  private val nextSubqueryId = new AtomicLong(0)
  private def newSubqueryName(): String = s"gen_subquery_${nextSubqueryId.getAndIncrement()}"
@@ -517,8 +518,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Logging {
      case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
        Some(SQLTable(database, table, l.output.map(_.withQualifier(None))))
-      case m: MetastoreRelation =>
-        Some(SQLTable(m.databaseName, m.tableName, m.output.map(_.withQualifier(None))))
+      case relation: CatalogRelation =>
+        val m = relation.catalogTable
+        Some(SQLTable(m.database, m.identifier.table, relation.output.map(_.withQualifier(None))))
      case _ => None
    }
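With the constructor widened from DataFrame to Dataset[_], the builder now accepts any Dataset (in Spark 2.0, DataFrame is just an alias for Dataset[Row]). A hedged usage sketch, assuming a hypothetical table some_db.some_table and the toSQL entry point defined elsewhere in SQLBuilder.scala:

object SQLBuilderUsageSketch {
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.hive.SQLBuilder

  def regenerateSQL(sqlContext: SQLContext): String = {
    // Any analyzed Dataset works now, not just a DataFrame.
    val ds = sqlContext.sql("SELECT id, name FROM some_db.some_table") // hypothetical table
    new SQLBuilder(ds).toSQL // rebuilds SQL text from the analyzed logical plan
  }
}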