author    Wenchen Fan <wenchen@databricks.com>    2016-08-26 08:52:10 -0700
committer Yin Huai <yhuai@databricks.com>         2016-08-26 08:52:10 -0700
commit    28ab17922a227e8d93654d3478c0d493bfb599d5 (patch)
tree      b06cd32184549eb1a22ec9b7bd55398f7ebcf40b
parent    6063d5963fcf01768570c1a9b542be6175a3bcbc (diff)
[SPARK-17260][MINOR] move CreateTables to HiveStrategies
## What changes were proposed in this pull request?

The `CreateTables` rule turns a general `CreateTable` plan into a `CreateHiveTableAsSelectCommand` for Hive serde tables. However, this rule is logically a planner strategy, so we should move it to `HiveStrategies` to be consistent with the other DDL commands.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14825 from cloud-fan/ctas.
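For context (not part of the commit), the sketch below illustrates why the move makes sense structurally: an analyzer `Rule[LogicalPlan]` maps a logical plan to a logical plan, while a planner strategy maps a logical plan to candidate physical plans. The traits and names here are simplified stand-ins for the Catalyst types, not the real Spark classes.

```scala
// Simplified stand-ins; `FakeHiveCtasStrategy` and friends are illustrative names.
sealed trait LogicalPlan
sealed trait SparkPlan

// An analyzer rule rewrites a logical plan into another logical plan.
trait Rule[Plan] {
  def apply(plan: Plan): Plan
}

// A planner strategy maps a logical plan to candidate physical plans;
// an empty Seq means "this strategy does not apply to this node".
trait Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan]
}

case class CreateTable(provider: String) extends LogicalPlan
case class CreateHiveTableAsSelectExec() extends SparkPlan

// CTAS planning fits the second shape: it produces a physical command to run
// rather than rewriting one logical plan into another, which is why the logic
// reads more naturally as a strategy in HiveStrategies than as an analyzer rule.
object FakeHiveCtasStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case CreateTable("hive") => CreateHiveTableAsSelectExec() :: Nil
    case _                   => Nil
  }
}
```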
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 35
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala   |  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala     |  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala       | 27
4 files changed, 27 insertions(+), 37 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 701b73a4aa..ff82c7f7af 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -376,41 +376,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}
}
-
- /**
- * Creates any tables required for query execution.
- * For example, because of a CREATE TABLE X AS statement.
- */
- object CreateTables extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- // Wait until children are resolved.
- case p: LogicalPlan if !p.childrenResolved => p
-
- case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
- val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
- // add default serde
- tableDesc.withNewStorage(
- serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
- } else {
- tableDesc
- }
-
- val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc)
-
- // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
- // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
- // tables yet.
- if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
- throw new AnalysisException("" +
- "CTAS for hive serde tables does not support append or overwrite semantics.")
- }
-
- execution.CreateHiveTableAsSelectCommand(
- newTableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
- query,
- mode == SaveMode.Ignore)
- }
- }
}
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index ca8c7347f2..86d3b6de0d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -87,7 +87,6 @@ private[sql] class HiveSessionCatalog(
val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
- val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
override def refreshTable(name: TableIdentifier): Unit = {
super.refreshTable(name)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index a7cc7cc142..f3c4135da6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -61,7 +61,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.OrcConversions ::
- catalog.CreateTables ::
PreprocessDDL(conf) ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 17956ded17..fb11c849ed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
private[hive] trait HiveStrategies {
@@ -45,6 +47,31 @@ private[hive] trait HiveStrategies {
case logical.InsertIntoTable(
table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite, ifNotExists) :: Nil
+
+ case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
+ val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
+ // add default serde
+ tableDesc.withNewStorage(
+ serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+ } else {
+ tableDesc
+ }
+
+ // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
+ // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
+ // tables yet.
+ if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+ throw new AnalysisException("" +
+ "CTAS for hive serde tables does not support append or overwrite semantics.")
+ }
+
+ val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+ val cmd = CreateHiveTableAsSelectCommand(
+ newTableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
+ query,
+ mode == SaveMode.Ignore)
+ ExecutedCommandExec(cmd) :: Nil
+
case _ => Nil
}
}
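As a usage illustration (not part of the commit), the sketch below shows the kind of statement the new strategy case plans. It assumes a Hive-enabled session and an existing `source_table`; all table and app names are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object HiveCtasExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hive-ctas-example")
      .enableHiveSupport() // Hive serde tables require Hive support
      .getOrCreate()

    // No serde is specified, so the strategy fills in LazySimpleSerDe before
    // planning the query as CreateHiveTableAsSelectCommand. Append/Overwrite
    // semantics cannot be expressed through this SQL path, matching the
    // AnalysisException guard in the diff above.
    spark.sql(
      """CREATE TABLE ctas_target
        |AS SELECT id, id * 2 AS doubled FROM source_table""".stripMargin)

    spark.stop()
  }
}
```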