author    Cheng Hao <hao.cheng@intel.com>            2014-09-11 11:57:01 -0700
committer Michael Armbrust <michael@databricks.com>  2014-09-11 11:57:01 -0700
commit    ca83f1e2c4dfa519e44b837b6815cba3b4526d92 (patch)
tree      0f1929f65f5e4a51f25d679b59238f7b0bf47e8d
parent    1ef656ea85b4b93c7b0f3cf8042b63a0de0901cb (diff)
[SPARK-2917] [SQL] Avoid table creation in logical plan analyzing for CTAS
Author: Cheng Hao <hao.cheng@intel.com>

Closes #1846 from chenghao-intel/ctas and squashes the following commits:

56a0578 [Cheng Hao] remove the unused imports
9a57abc [Cheng Hao] Avoid table creation in logical plan analyzing
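For context, the behavior this patch changes can be exercised with a plain CTAS statement. Below is a minimal, hedged usage sketch against the Spark 1.1-era API (the hql call and the table name are illustrative, not part of this patch): before the patch, merely analyzing the logical plan created the target table in the metastore; after it, creation is deferred until the physical CreateTableAsSelect command executes.

    // Minimal CTAS usage sketch (illustrative names; Spark 1.1-era API assumed).
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    object CtasSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "ctas-sketch")
        val hiveContext = new HiveContext(sc)
        // Planning this statement no longer touches the metastore; the table
        // is created only when the command actually runs.
        hiveContext.hql("CREATE TABLE ctas_demo AS SELECT key, value FROM src")
        sc.stop()
      }
    }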
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala                             |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                 | 14
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala                               |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                       | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala        | 73
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala        |  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala              |  9
8 files changed, 104 insertions(+), 17 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 4adfb18937..5d10754c7b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -114,11 +114,12 @@ case class InsertIntoTable(
}
}
-case class InsertIntoCreatedTable(
+case class CreateTableAsSelect(
databaseName: Option[String],
tableName: String,
child: LogicalPlan) extends UnaryNode {
override def output = child.output
+ override lazy val resolved = (databaseName != None && childrenResolved)
}
case class WriteToFile(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 2f3033a5f9..e52eeb3e1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
@transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
// For various commands (like DDL) and queries with side effects, we force query optimization to
// happen right away to let these side effects take place eagerly.
- case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+ case _: Command | _: InsertIntoTable | _: CreateTableAsSelect | _: WriteToFile =>
queryExecution.toRdd
SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
case _ =>
@@ -124,7 +124,7 @@ private[sql] trait SchemaRDDLike {
*/
@Experimental
def saveAsTable(tableName: String): Unit =
- sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
+ sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan)).toRdd
/** Returns the schema as a string in the tree format.
*
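With the rename above, saveAsTable now goes through the CreateTableAsSelect logical node rather than the removed InsertIntoCreatedTable. A minimal usage sketch, assuming a HiveContext and the usual src test table (names are illustrative):

    // saveAsTable sketch: plans CreateTableAsSelect(None, "src_copy", plan).
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    object SaveAsTableSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "save-as-table-sketch")
        val hive = new HiveContext(sc)
        val result = hive.hql("SELECT key, value FROM src")
        result.saveAsTable("src_copy") // target table name is illustrative
        sc.stop()
      }
    }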
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index dfa2a7a9d2..2c0db9be57 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -54,8 +54,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
db: Option[String],
tableName: String,
alias: Option[String]): LogicalPlan = synchronized {
- val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
- val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
+ val (databaseName, tblName) = processDatabaseAndTableName(
+ db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
val table = client.getTable(databaseName, tblName)
val partitions: Seq[Partition] =
if (table.isPartitioned) {
@@ -112,17 +112,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
- case InsertIntoCreatedTable(db, tableName, child) =>
+ case CreateTableAsSelect(db, tableName, child) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
- createTable(databaseName, tblName, child.output)
-
- InsertIntoTable(
- lookupRelation(Some(databaseName), tblName, None),
- Map.empty,
- child,
- overwrite = false)
+ CreateTableAsSelect(Some(databaseName), tblName, child)
}
}
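The rewritten rule above only fills in the database name during analysis; it no longer calls createTable. The same idea in a tiny self-contained sketch (simplified stand-in types, not the actual Catalyst classes):

    // Simplified model of the new analysis behavior: resolving a CTAS node is
    // a pure rewrite with no metastore side effect.
    case class SimpleCtas(database: Option[String], table: String) {
      def resolved: Boolean = database.isDefined // mirrors the new `resolved` flag
    }

    object ResolveCtas {
      def apply(node: SimpleCtas, currentDb: String): SimpleCtas =
        if (node.resolved) node
        else node.copy(database = Some(currentDb))

      def main(args: Array[String]): Unit = {
        val r = ResolveCtas(SimpleCtas(None, "t"), "default")
        assert(r.resolved && r.database == Some("default"))
      }
    }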
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index c98287c6aa..21ecf17028 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -489,7 +489,7 @@ private[hive] object HiveQl {
val (db, tableName) = extractDbNameTableName(tableNameParts)
- InsertIntoCreatedTable(db, tableName, nodeToPlan(query))
+ CreateTableAsSelect(db, tableName, nodeToPlan(query))
// If it's not a "CREATE TABLE AS" like above, then just pass it back to Hive as a native command.
case Token("TOK_CREATETABLE", _) => NativePlaceholder
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 72cc01cdf4..43dd3d234f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -165,6 +165,16 @@ private[hive] trait HiveStrategies {
InMemoryRelation(_, _, _,
HiveTableScan(_, table, _)), partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+ case logical.CreateTableAsSelect(database, tableName, child) =>
+ val query = planLater(child)
+ CreateTableAsSelect(
+ database.get,
+ tableName,
+ query,
+ InsertIntoHiveTable(_: MetastoreRelation,
+ Map(),
+ query,
+ true)(hiveContext)) :: Nil
case _ => Nil
}
}
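Note that the strategy hands the physical operator a function of type MetastoreRelation => InsertIntoHiveTable rather than a finished node, so the insert can only be constructed after the table exists. A minimal sketch of that deferred-construction pattern (illustrative types):

    // Deferred construction: the insert operator is built from the relation
    // only after the relation has actually been created.
    case class Relation(name: String)
    case class Insert(rel: Relation)

    class DeferredCtas(createTable: () => Relation, mkInsert: Relation => Insert) {
      private lazy val relation: Relation = createTable() // side effect, at most once
      def run(): Insert = mkInsert(relation)
    }

    object DeferredCtasDemo {
      def main(args: Array[String]): Unit = {
        val ctas = new DeferredCtas(
          () => { println("creating table"); Relation("demo") },
          r => Insert(r))
        ctas.run()
        ctas.run() // "creating table" is printed only once
      }
    }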
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
new file mode 100644
index 0000000000..71ea774d77
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LowerCaseSchema
+import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode}
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.MetastoreRelation
+
+/**
+ * :: Experimental ::
+ * Create a table and insert the query result into it.
+ * @param database the database name of the new relation
+ * @param tableName the table name of the new relation
+ * @param insertIntoRelation function that builds the `InsertIntoHiveTable` for a given
+ *   `MetastoreRelation`; the query result will be inserted into that relation's table.
+ * TODO Add more table creating properties, e.g. SerDe, StorageHandler, in-memory cache etc.
+ */
+@Experimental
+case class CreateTableAsSelect(
+ database: String,
+ tableName: String,
+ query: SparkPlan,
+ insertIntoRelation: MetastoreRelation => InsertIntoHiveTable)
+ extends LeafNode with Command {
+
+ def output = Seq.empty
+
+ // Lazily compute the metastoreRelation (creating the table as a side effect)
+ private[this] lazy val metastoreRelation: MetastoreRelation = {
+ // Create the table
+ val sc = sqlContext.asInstanceOf[HiveContext]
+ sc.catalog.createTable(database, tableName, query.output, false)
+ // Get the Metastore Relation
+ sc.catalog.lookupRelation(Some(database), tableName, None) match {
+ case LowerCaseSchema(r: MetastoreRelation) => r
+ case o: MetastoreRelation => o
+ }
+ }
+
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+ insertIntoRelation(metastoreRelation).execute
+ Seq.empty[Row]
+ }
+
+ override def execute(): RDD[Row] = {
+ sideEffectResult
+ sparkContext.emptyRDD[Row]
+ }
+
+ override def argString: String = {
+ s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]\n" + query.toString
+ }
+}
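The command above leans on Scala's lazy val semantics: metastoreRelation performs its side effect (creating the table) at most once, on first access, however often sideEffectResult is consulted. A standalone sketch of that guarantee:

    // A lazy val's initializer (standing in for table creation here) runs
    // exactly once, no matter how many times the val is read.
    object LazyOnce {
      private var creations = 0
      lazy val relation: String = { creations += 1; "demo_table" }

      def main(args: Array[String]): Unit = {
        relation
        relation
        assert(creations == 1) // the side effect fired exactly once
        println(s"creations = $creations")
      }
    }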
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 39033bdeac..a284a91a91 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -53,9 +53,9 @@ case class InsertIntoHiveTable(
(@transient sc: HiveContext)
extends UnaryNode {
- val outputClass = newSerializer(table.tableDesc).getSerializedClass
- @transient private val hiveContext = new Context(sc.hiveconf)
- @transient private val db = Hive.get(sc.hiveconf)
+ @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
+ @transient private lazy val hiveContext = new Context(sc.hiveconf)
+ @transient private lazy val db = Hive.get(sc.hiveconf)
private def newSerializer(tableDesc: TableDesc): Serializer = {
val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
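Making these fields lazy means merely constructing the operator no longer touches Hive; that work is deferred to first use, which suits the new CTAS flow where the insert node is built before it is executed. A tiny illustration of the difference (illustrative classes, not the Hive API):

    // Eager val: side-effecting work happens at construction time.
    class EagerNode { val conn = { println("connecting to Hive"); "conn" } }
    // Lazy val: construction is free; the work happens only on first use of conn.
    class LazyNode  { lazy val conn = { println("connecting to Hive"); "conn" } }

    object LazyFieldDemo {
      def main(args: Array[String]): Unit = {
        val l = new LazyNode  // prints nothing
        val e = new EagerNode // prints "connecting to Hive" immediately
        println(l.conn)       // first use triggers the lazy initializer
      }
    }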
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b99caf77bc..679efe082f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.QueryTest
+
+import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.test.TestHive._
case class Nested1(f1: Nested2)
@@ -54,4 +56,11 @@ class SQLQuerySuite extends QueryTest {
sql("SELECT f1.f2.f3 FROM nested"),
1)
}
+
+ test("test CTAS") {
+ checkAnswer(sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src"), Seq.empty[Row])
+ checkAnswer(
+ sql("SELECT key, value FROM test_ctas_123 ORDER BY key"),
+ sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
+ }
}