author     Wenchen Fan <wenchen@databricks.com>    2017-02-28 09:24:36 -0800
committer  Xiao Li <gatorsmile@gmail.com>          2017-02-28 09:24:36 -0800
commit     7c7fc30b4ae8e4ebd4ededf92240fed10481f2dd (patch)
tree       469c01ca9d14d9a37ca6d3754af1b2dccbab1c02 /sql/catalyst
parent     b405466513bcc02cadf1477b6b682ace95d81658 (diff)
[SPARK-19678][SQL] remove MetastoreRelation
## What changes were proposed in this pull request?

`MetastoreRelation` is used to represent table relations for Hive tables and provides some Hive-related information. We resolve `SimpleCatalogRelation` to `MetastoreRelation` for Hive tables, which is unnecessary since the two are essentially the same. This PR merges `SimpleCatalogRelation` and `MetastoreRelation` into a single `CatalogRelation`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17015 from cloud-fan/table-relation.
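The gist of the consolidation, as a standalone toy sketch (stand-in types, not the real Spark classes): two wrappers around the same catalog metadata collapse into one relation node that carries the metadata and its resolved columns directly.

```scala
// Toy sketch of the consolidation described above (stand-in types, not the
// real Spark classes).
case class TableMeta(database: String, table: String, dataCols: Seq[String])

// Before: two essentially identical wrappers around the same metadata; the
// analyzer resolved one into the other for Hive tables.
case class SimpleCatalogRelation(metadata: TableMeta)
case class MetastoreRelation(metadata: TableMeta)

// After: a single relation node used for all catalog tables.
case class CatalogRelation(tableMeta: TableMeta, dataCols: Seq[String])

object ConsolidationSketch extends App {
  val meta = TableMeta("db1", "tbl1", Seq("id", "value"))
  println(CatalogRelation(meta, meta.dataCols))
}
```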
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala      |  2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala      |  7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala           | 68
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala                 |  4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala |  8
5 files changed, 51 insertions, 38 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 36ab8b8527..7529f90284 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -18,10 +18,8 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.plans.UsingJoin
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0230626a66..06734891b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -592,7 +592,12 @@ class SessionCatalog(
child = parser.parsePlan(viewText))
SubqueryAlias(table, child, Some(name.copy(table = table, database = Some(db))))
} else {
- SubqueryAlias(table, SimpleCatalogRelation(metadata), None)
+ val tableRelation = CatalogRelation(
+ metadata,
+ // we assume all the columns are nullable.
+ metadata.dataSchema.asNullable.toAttributes,
+ metadata.partitionSchema.asNullable.toAttributes)
+ SubqueryAlias(table, tableRelation, None)
}
} else {
SubqueryAlias(table, tempTables(table), None)
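A minimal standalone illustration of the nullability step in the hunk above (toy `Schema`/`Field` types, not Spark's `StructType`/`Attribute`): every column is marked nullable before attributes are built, since the underlying files may contain data that was never validated.

```scala
// Toy sketch of the "assume all columns are nullable" step (stand-in types,
// not Spark's StructType/Attribute).
case class Field(name: String, nullable: Boolean)

case class Schema(fields: Seq[Field]) {
  // Mirrors the idea of StructType.asNullable: mark every field nullable.
  def asNullable: Schema = Schema(fields.map(_.copy(nullable = true)))
}

object NullableSketch extends App {
  val dataSchema = Schema(Seq(
    Field("id", nullable = false),
    Field("name", nullable = true)))
  println(dataSchema.asNullable.fields.forall(_.nullable)) // true
}
```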
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2b3b575b4c..cb939026f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.catalyst.catalog
import java.util.Date
-import scala.collection.mutable
+import com.google.common.base.Objects
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -349,36 +350,43 @@ object CatalogTypes {
/**
- * An interface that is implemented by logical plans to return the underlying catalog table.
- * If we can in the future consolidate SimpleCatalogRelation and MetastoreRelation, we should
- * probably remove this interface.
+ * A [[LogicalPlan]] that represents a table.
*/
-trait CatalogRelation {
- def catalogTable: CatalogTable
- def output: Seq[Attribute]
-}
+case class CatalogRelation(
+ tableMeta: CatalogTable,
+ dataCols: Seq[Attribute],
+ partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
+ assert(tableMeta.identifier.database.isDefined)
+ assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
+ assert(tableMeta.dataSchema.sameType(dataCols.toStructType))
+
+ // The partition columns should always appear after the data columns.
+ override def output: Seq[Attribute] = dataCols ++ partitionCols
+
+ def isPartitioned: Boolean = partitionCols.nonEmpty
+
+ override def equals(relation: Any): Boolean = relation match {
+ case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
+ case _ => false
+ }
+ override def hashCode(): Int = {
+ Objects.hashCode(tableMeta.identifier, output)
+ }
-/**
- * A [[LogicalPlan]] that wraps [[CatalogTable]].
- *
- * Note that in the future we should consolidate this and HiveCatalogRelation.
- */
-case class SimpleCatalogRelation(
- metadata: CatalogTable)
- extends LeafNode with CatalogRelation {
-
- override def catalogTable: CatalogTable = metadata
-
- override lazy val resolved: Boolean = false
-
- override val output: Seq[Attribute] = {
- val (partCols, dataCols) = metadata.schema.toAttributes
- // Since data can be dumped in randomly with no validation, everything is nullable.
- .map(_.withNullability(true).withQualifier(Some(metadata.identifier.table)))
- .partition { a =>
- metadata.partitionColumnNames.contains(a.name)
- }
- dataCols ++ partCols
+ /** Only compare table identifier. */
+ override lazy val cleanArgs: Seq[Any] = Seq(tableMeta.identifier)
+
+ override def computeStats(conf: CatalystConf): Statistics = {
+ // For data source tables, we will create a `LogicalRelation` and won't call this method; for
+ // Hive serde tables, we will always generate statistics.
+ // TODO: unify the table stats generation.
+ tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
+ throw new IllegalStateException("table stats must be specified.")
+ }
}
+
+ override def newInstance(): LogicalPlan = copy(
+ dataCols = dataCols.map(_.newInstance()),
+ partitionCols = partitionCols.map(_.newInstance()))
}
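The two behavioral points of the new node, column ordering and metadata-based equality, can be seen in a standalone toy sketch (stand-in types, not the real `CatalogRelation`/`Attribute`):

```scala
// Toy sketch of the new node's column ordering (stand-in types, not the real
// CatalogRelation/Attribute).
case class Attr(name: String, id: Long)

case class Relation(tableMeta: String, dataCols: Seq[Attr], partitionCols: Seq[Attr]) {
  // Partition columns always come after data columns, as in the diff above.
  def output: Seq[Attr] = dataCols ++ partitionCols
  def isPartitioned: Boolean = partitionCols.nonEmpty
}

object RelationSketch extends App {
  val rel = Relation(
    tableMeta = "db1.tbl1",
    dataCols = Seq(Attr("id", 1), Attr("value", 2)),
    partitionCols = Seq(Attr("dt", 3)))
  println(rel.output.map(_.name).mkString(", ")) // id, value, dt
  println(rel.isPartitioned)                     // true
}
```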
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 26697e9867..a3cc4529b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -63,7 +63,9 @@ case class TableIdentifier(table: String, database: Option[String])
}
/** A fully qualified identifier for a table (i.e., database.tableName) */
-case class QualifiedTableName(database: String, name: String)
+case class QualifiedTableName(database: String, name: String) {
+ override def toString: String = s"$database.$name"
+}
object TableIdentifier {
def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
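The `toString` override is small but changes how qualified names render in logs and error messages. A standalone copy of the case class (compiled outside Spark) behaves as follows:

```scala
// Standalone copy of the new toString behavior, same shape as the case class
// in identifiers.scala.
case class QualifiedTableName(database: String, name: String) {
  override def toString: String = s"$database.$name"
}

object QualifiedNameSketch extends App {
  // Prints "db1.tbl1" instead of the default "QualifiedTableName(db1,tbl1)".
  println(QualifiedTableName("db1", "tbl1"))
}
```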
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 44434324d3..a755231962 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -433,15 +433,15 @@ class SessionCatalogSuite extends PlanTest {
sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
// If we explicitly specify the database, we'll look up the relation in that database
- assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
- == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
+ assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
+ .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
// Otherwise, we'll first look up a temporary table with the same name
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1, None))
// Then, if that does not exist, look up the relation in the current database
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
- assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
- == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
+ assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")).children.head
+ .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
}
test("look up view relation") {