author     Wenchen Fan <wenchen@databricks.com>             2016-08-05 10:50:26 +0200
committer  Herman van Hovell <hvanhovell@databricks.com>    2016-08-05 10:50:26 +0200
commit     5effc016c893ce917d535cc1b5026d8e4c846721 (patch)
tree       59e28575a90ec38a17b26dad58297c5d5bfd8436 /sql/catalyst/src
parent     faaefab26ffea3a5edfeaff42db222c8cd3ff5f1 (diff)
[SPARK-16879][SQL] unify logical plans for CREATE TABLE and CTAS
## What changes were proposed in this pull request?
We currently have several logical plans for CREATE TABLE and CTAS: `CreateTableUsing`, `CreateTableUsingAsSelect`, and `CreateHiveTableAsSelectLogicalPlan`. This PR unifies them to reduce complexity and centralize the error handling.
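As a rough sketch of the direction (hypothetical names, not the exact node introduced by this PR), a single logical plan can cover both statements by making the CTAS query optional:

```scala
// Hypothetical sketch of a unified CREATE TABLE / CTAS plan node; the class
// and field names here are illustrative, not the ones added by this PR.
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

case class UnifiedCreateTable(
    tableDesc: CatalogTable,        // all table metadata, including the new `provider` field
    ignoreIfExists: Boolean,        // CREATE TABLE IF NOT EXISTS semantics
    query: Option[LogicalPlan])     // None for plain CREATE TABLE, Some(plan) for CTAS
  extends LeafNode {
  override def output: Seq[Attribute] = Seq.empty  // DDL commands produce no rows
}
```

With one node, existence checks and schema validation can live in a single analysis rule instead of being duplicated across per-statement plans.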
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #14482 from cloud-fan/table.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala            | 17
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala |  8
2 files changed, 11 insertions, 14 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 38f0bc2c4f..f7762e0f8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,8 +21,7 @@ import java.util.Date
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.StructType
@@ -112,6 +111,8 @@ case class BucketSpec(
  * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
  * future once we have a better understanding of how we want to handle skewed columns.
  *
+ * @param provider the name of the data source provider for this table, e.g. parquet, json, etc.
+ *                 Can be None if this table is a View, should be "hive" for hive serde tables.
  * @param unsupportedFeatures is a list of string descriptions of features that are used by the
  *                            underlying table but not supported by Spark SQL yet.
  */
@@ -120,6 +121,7 @@ case class CatalogTable(
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: StructType,
+    provider: Option[String] = None,
     partitionColumnNames: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     owner: String = "",
@@ -131,16 +133,6 @@ case class CatalogTable(
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty) {
 
-  // Verify that the provided columns are part of the schema
-  private val colNames = schema.map(_.name).toSet
-  private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
-    require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
-      s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
-  }
-  requireSubsetOfSchema(partitionColumnNames, "partition")
-  requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
-  requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
-
   /** schema of this table's partition columns */
   def partitionSchema: StructType = StructType(schema.filter { c =>
     partitionColumnNames.contains(c.name)
@@ -189,6 +181,7 @@ case class CatalogTable(
       s"Last Access: ${new Date(lastAccessTime).toString}",
       s"Type: ${tableType.name}",
       if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "",
+      if (provider.isDefined) s"Provider: ${provider.get}" else "",
       if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else ""
     ) ++ bucketStrings ++ Seq(
       viewOriginalText.map("Original View: " + _).getOrElse(""),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 201d39a364..54365fd978 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -552,7 +552,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       identifier = TableIdentifier("my_table", Some("db1")),
       tableType = CatalogTableType.MANAGED,
       storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
-      schema = new StructType().add("a", "int").add("b", "string")
+      schema = new StructType().add("a", "int").add("b", "string"),
+      provider = Some("hive")
     )
     catalog.createTable(table, ignoreIfExists = false)
@@ -571,7 +572,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       storage = CatalogStorageFormat(
         Some(Utils.createTempDir().getAbsolutePath),
         None, None, None, false, Map.empty),
-      schema = new StructType().add("a", "int").add("b", "string")
+      schema = new StructType().add("a", "int").add("b", "string"),
+      provider = Some("hive")
     )
     catalog.createTable(externalTable, ignoreIfExists = false)
     assert(!exists(db.locationUri, "external_table"))
@@ -589,6 +591,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
         .add("col2", "string")
         .add("a", "int")
         .add("b", "string"),
+      provider = Some("hive"),
       partitionColumnNames = Seq("a", "b")
     )
     catalog.createTable(table, ignoreIfExists = false)
@@ -692,6 +695,7 @@ abstract class CatalogTestUtils {
         .add("col2", "string")
         .add("a", "int")
         .add("b", "string"),
+      provider = Some("hive"),
       partitionColumnNames = Seq("a", "b"),
       bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
   }
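For illustration, after this change a `CatalogTable` carries its data source in the new `provider` field, mirroring the updated tests above. A minimal sketch (the table and database names here are made up):

```scala
// Building a CatalogTable with the new `provider` field. Per the scaladoc
// added in this patch: None for views, "hive" for Hive serde tables,
// otherwise the data source name (e.g. "parquet", "json").
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.StructType

val parquetTable = CatalogTable(
  identifier = TableIdentifier("events", Some("db1")),  // illustrative names
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
  schema = new StructType().add("a", "int").add("b", "string"),
  provider = Some("parquet"))
```

Making `provider` part of `CatalogTable` lets one table description serve data source tables, Hive serde tables, and views alike, which is what allows the CREATE TABLE and CTAS plans to be unified.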