author     Wenchen Fan <wenchen@databricks.com>           2016-08-05 10:50:26 +0200
committer  Herman van Hovell <hvanhovell@databricks.com>  2016-08-05 10:50:26 +0200
commit     5effc016c893ce917d535cc1b5026d8e4c846721 (patch)
tree       59e28575a90ec38a17b26dad58297c5d5bfd8436 /sql/hive/src
parent     faaefab26ffea3a5edfeaff42db222c8cd3ff5f1 (diff)
[SPARK-16879][SQL] unify logical plans for CREATE TABLE and CTAS
## What changes were proposed in this pull request?

We have various logical plans for CREATE TABLE and CTAS: `CreateTableUsing`, `CreateTableUsingAsSelect`, and `CreateHiveTableAsSelectLogicalPlan`. This PR unifies them into a single `CreateTable` plan to reduce complexity and centralize error handling.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14482 from cloud-fan/table.
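For orientation, here is a minimal standalone sketch of the unified plan shape this PR converges on. The constructor fields (`tableDesc`, `mode`, `query`) are inferred from the pattern matches in the diff below; the simplified types are illustrative stand-ins, not Spark's actual `CatalogTable`, `SaveMode`, or `LogicalPlan`.

```scala
// Standalone sketch of the unified CreateTable logical plan; simplified types,
// not Spark's actual classes. Field names mirror the pattern matches below.
object UnifiedCreateTableSketch {
  sealed trait SaveMode
  case object ErrorIfExists extends SaveMode
  case object Ignore extends SaveMode

  case class CatalogTable(identifier: String, provider: Option[String])
  trait LogicalPlan

  // One node covers both statements: query = None is plain CREATE TABLE,
  // query = Some(plan) is CTAS.
  case class CreateTable(
      tableDesc: CatalogTable,
      mode: SaveMode,
      query: Option[LogicalPlan]) extends LogicalPlan

  def describe(plan: LogicalPlan): String = plan match {
    case CreateTable(desc, mode, Some(_)) => s"CTAS ${desc.identifier} (mode=$mode)"
    case CreateTable(desc, _, None)       => s"CREATE TABLE ${desc.identifier}"
    case _                                => "other plan"
  }
}
```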
Diffstat (limited to 'sql/hive/src')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala   | 29
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala       |  1
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala    |  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala |  7
4 files changed, 28 insertions(+), 15 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index db970785a7..c7c1acda25 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -23,15 +23,13 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -436,23 +434,30 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
- case p: LogicalPlan if p.resolved => p
- case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) =>
- val desc = if (table.storage.serde.isEmpty) {
+ case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
+ val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
// add default serde
- table.withNewStorage(
+ tableDesc.withNewStorage(
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
} else {
- table
+ tableDesc
}
- val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc)
+
+ // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
+ // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
+ // tables yet.
+ if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+ throw new AnalysisException("" +
+ "CTAS for hive serde tables does not support append or overwrite semantics.")
+ }
execution.CreateHiveTableAsSelectCommand(
- desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
- child,
- allowExisting)
+ newTableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
+ query,
+ mode == SaveMode.Ignore)
}
}
}
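To make the new branch easier to follow, here is a minimal standalone sketch of its mode handling: hive-provider CTAS plans are rejected for `Append`/`Overwrite` and otherwise converted, with `allowExisting` recovered from `SaveMode.Ignore`. All types below are simplified stand-ins, not Spark's actual classes; only the control flow mirrors the diff.

```scala
// Standalone sketch of the mode handling in the new CreateTables branch above.
// CatalogTable and CreateHiveTableAsSelectCommand are simplified stand-ins.
object CtasModeSketch {
  sealed trait SaveMode
  case object ErrorIfExists extends SaveMode
  case object Ignore extends SaveMode
  case object Append extends SaveMode
  case object Overwrite extends SaveMode

  case class CatalogTable(name: String, serde: Option[String])
  case class CreateHiveTableAsSelectCommand(
      table: CatalogTable, query: String, allowExisting: Boolean)

  def convert(table: CatalogTable, mode: SaveMode, query: String): CreateHiveTableAsSelectCommand = {
    // Fill in the default Hive serde when none was specified, as in the diff.
    val withSerde =
      if (table.serde.isEmpty) {
        table.copy(serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
      } else table
    // Append/Overwrite are unreachable today (the SQL path only produces Ignore
    // or ErrorIfExists), but the rule rejects them defensively.
    if (mode == Append || mode == Overwrite) {
      throw new IllegalArgumentException( // stand-in for AnalysisException
        "CTAS for hive serde tables does not support append or overwrite semantics.")
    }
    // IF NOT EXISTS survives as allowExisting, recovered from SaveMode.Ignore.
    CreateHiveTableAsSelectCommand(withSerde, query, allowExisting = mode == Ignore)
  }
}
```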
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 8773993d36..e01c053ab5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -65,6 +65,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
catalog.ParquetConversions ::
catalog.OrcConversions ::
catalog.CreateTables ::
+ PreprocessDDL(conf) ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
(if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index e0c07db3b0..69a6884c7a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.types.StructType
@@ -36,8 +37,7 @@ class HiveDDLCommandSuite extends PlanTest {
private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
parser.parsePlan(sql).collect {
- case c: CreateTableCommand => (c.table, c.ifNotExists)
- case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting)
+ case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore)
}.head
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d15e11a7ff..e078b58542 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -141,6 +141,13 @@ class HiveDDLSuite
}
}
+ test("create table: partition column names exist in table definition") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")
+ }
+ assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
+ }
+
test("add/drop partitions - external table") {
val catalog = spark.sessionState.catalog
withTempDir { tmpDir =>
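The new test above exercises a duplicate-column check that lives outside this path-limited diff (part of the centralized DDL error handling this PR introduces). Below is a minimal standalone sketch of such a check, under the assumption that it reduces to a case-insensitive comparison of column names across the table schema and the partition spec; it is not Spark's exact implementation.

```scala
// Standalone sketch of a duplicate-column check like the one the test above
// exercises; assumes a simple case-insensitive name comparison.
object DuplicateColumnCheckSketch {
  def findDuplicates(tableCols: Seq[String], partitionCols: Seq[String]): Seq[String] =
    (tableCols ++ partitionCols)
      .groupBy(_.toLowerCase)
      .collect { case (_, names) if names.size > 1 => names.head }
      .toSeq

  def main(args: Array[String]): Unit = {
    // Mirrors the failing statement: CREATE TABLE tbl(a int) PARTITIONED BY (a string)
    assert(findDuplicates(Seq("a"), Seq("a")) == Seq("a"))
    assert(findDuplicates(Seq("a"), Seq("b")).isEmpty)
  }
}
```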