author    wangfei <wangfei1@huawei.com>            2014-12-18 20:24:56 -0800
committer Michael Armbrust <michael@databricks.com> 2014-12-18 20:24:56 -0800
commit    c3d91da5ea8b85ca75444ec606f2e1eae376c4b2 (patch)
tree      3e41b59c473f6954af52b7e059049b41c99a5754
parent    ae9f128608f67cbee0a2fb24754783ee3b4f3098 (diff)
[SPARK-4861][SQL] Refactor commands in Spark SQL
Remove `Command` and use `RunnableCommand` instead.

Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>

Closes #3712 from scwf/cmd and squashes the following commits:

51a82f2 [wangfei] fix test failure
0e03be8 [wangfei] address comments
4033bed [scwf] remove CreateTableAsSelect in hivestrategy
5d20010 [wangfei] address comments
125f542 [scwf] factory command in spark sql
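The pattern this commit converges on is visible in sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala below: a command becomes a logical plan node that implements `run(sqlContext)`, and the planner simply wraps it in `ExecutedCommand` instead of requiring a dedicated physical operator with a captured context. A minimal sketch of that pattern follows; it is not part of this patch, and `ShowConfCommand` is a hypothetical command used only for illustration (the `RunnableCommand` and `ExecutedCommand` signatures are taken from the diff below).

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.execution.{ExecutedCommand, RunnableCommand}

// A command is now just a logical node that knows how to run itself against a SQLContext.
case class ShowConfCommand(key: String) extends RunnableCommand {

  // Schema of the rows this command produces.
  override def output: Seq[Attribute] =
    Seq(AttributeReference("result", StringType, nullable = false)())

  // Side effects and result computation live here; ExecutedCommand caches the
  // result so the command runs exactly once.
  override def run(sqlContext: SQLContext): Seq[Row] =
    Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}"))
}

// Planning then reduces to one generic case in CommandStrategy:
//   case r: RunnableCommand => ExecutedCommand(r) :: Nil
```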
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala | 17
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 13
sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala | 85
sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala | 3
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 20
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 16
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 13
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 39
sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 8
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala | 38
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala | 23
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala (renamed from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala) | 76
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 18
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 39
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 12
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala | 5
17 files changed, 172 insertions(+), 255 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index 1d513d7789..5a1863953e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -30,23 +30,6 @@ abstract class Command extends LeafNode {
}
/**
- * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
- * commands that are passed directly to another system.
- */
-case class NativeCommand(cmd: String) extends Command {
- override def output =
- Seq(AttributeReference("result", StringType, nullable = false)())
-}
-
-/**
- * Commands of the form "SET [key [= value] ]".
- */
-case class DFSCommand(kv: Option[(String, Option[String])]) extends Command {
- override def output = Seq(
- AttributeReference("DFS output", StringType, nullable = false)())
-}
-
-/**
*
* Commands of the form "SET [key [= value] ]".
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ebd4cc920b..7a13302229 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -329,7 +329,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def strategies: Seq[Strategy] =
extraStrategies ++ (
- CommandStrategy(self) ::
+ CommandStrategy ::
DataSourceStrategy ::
TakeOrdered ::
HashAggregation ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 6e04f26c84..2954d4ce7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -304,17 +304,20 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
- case class CommandStrategy(context: SQLContext) extends Strategy {
+ case object CommandStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommand(r) :: Nil
case logical.SetCommand(kv) =>
- Seq(execution.SetCommand(kv, plan.output)(context))
+ Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)))
case logical.ExplainCommand(logicalPlan, extended) =>
- Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
+ Seq(ExecutedCommand(
+ execution.ExplainCommand(logicalPlan, plan.output, extended)))
case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
- Seq(execution.CacheTableCommand(tableName, optPlan, isLazy))
+ Seq(ExecutedCommand(
+ execution.CacheTableCommand(tableName, optPlan, isLazy)))
case logical.UncacheTableCommand(tableName) =>
- Seq(execution.UncacheTableCommand(tableName))
+ Seq(ExecutedCommand(
+ execution.UncacheTableCommand(tableName)))
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index afe3f3f074..b8fa4b0199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -26,34 +26,20 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{SQLConf, SQLContext}
-// TODO: DELETE ME...
-trait Command {
- this: SparkPlan =>
-
- /**
- * A concrete command should override this lazy field to wrap up any side effects caused by the
- * command or any other computation that should be evaluated exactly once. The value of this field
- * can be used as the contents of the corresponding RDD generated from the physical plan of this
- * command.
- *
- * The `execute()` method of all the physical command classes should reference `sideEffectResult`
- * so that the command can be executed eagerly right after the command query is created.
- */
- protected lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]
-
- override def executeCollect(): Array[Row] = sideEffectResult.toArray
-
- override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
-}
-
-// TODO: Replace command with runnable command.
+/**
+ * A logical command that is executed for its side-effects. `RunnableCommand`s are
+ * wrapped in `ExecutedCommand` during execution.
+ */
trait RunnableCommand extends logical.Command {
self: Product =>
- def output: Seq[Attribute]
def run(sqlContext: SQLContext): Seq[Row]
}
+/**
+ * A physical operator that executes the run method of a `RunnableCommand` and
+ * saves the result to prevent multiple executions.
+ */
case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
@@ -79,43 +65,41 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
* :: DeveloperApi ::
*/
@DeveloperApi
-case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribute])(
- @transient context: SQLContext)
- extends LeafNode with Command with Logging {
+case class SetCommand(
+ kv: Option[(String, Option[String])],
+ override val output: Seq[Attribute]) extends RunnableCommand with Logging {
- override protected lazy val sideEffectResult: Seq[Row] = kv match {
+ override def run(sqlContext: SQLContext) = kv match {
// Configures the deprecated "mapred.reduce.tasks" property.
case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
- context.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
+ sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value"))
// Configures a single property.
case Some((key, Some(value))) =>
- context.setConf(key, value)
+ sqlContext.setConf(key, value)
Seq(Row(s"$key=$value"))
- // Queries all key-value pairs that are set in the SQLConf of the context. Notice that different
- // from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties
- // while "SET -v" returns all properties.)
+ // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
+ // Notice that different from Hive, here "SET -v" is an alias of "SET".
+ // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.)
case Some(("-v", None)) | None =>
- context.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
+ sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
// Queries the deprecated "mapred.reduce.tasks" property.
case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
- Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}"))
+ Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}"))
// Queries a single property.
case Some((key, None)) =>
- Seq(Row(s"$key=${context.getConf(key, "<undefined>")}"))
+ Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}"))
}
-
- override def otherCopyArgs = context :: Nil
}
/**
@@ -128,22 +112,19 @@ case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribut
*/
@DeveloperApi
case class ExplainCommand(
- logicalPlan: LogicalPlan, output: Seq[Attribute], extended: Boolean)(
- @transient context: SQLContext)
- extends LeafNode with Command {
+ logicalPlan: LogicalPlan,
+ override val output: Seq[Attribute], extended: Boolean) extends RunnableCommand {
// Run through the optimizer to generate the physical plan.
- override protected lazy val sideEffectResult: Seq[Row] = try {
+ override def run(sqlContext: SQLContext) = try {
// TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
- val queryExecution = context.executePlan(logicalPlan)
+ val queryExecution = sqlContext.executePlan(logicalPlan)
val outputString = if (extended) queryExecution.toString else queryExecution.simpleString
outputString.split("\n").map(Row(_))
} catch { case cause: TreeNodeException[_] =>
("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}
-
- override def otherCopyArgs = context :: Nil
}
/**
@@ -153,10 +134,9 @@ case class ExplainCommand(
case class CacheTableCommand(
tableName: String,
plan: Option[LogicalPlan],
- isLazy: Boolean)
- extends LeafNode with Command {
+ isLazy: Boolean) extends RunnableCommand {
- override protected lazy val sideEffectResult = {
+ override def run(sqlContext: SQLContext) = {
import sqlContext._
plan.foreach(_.registerTempTable(tableName))
@@ -178,8 +158,9 @@ case class CacheTableCommand(
* :: DeveloperApi ::
*/
@DeveloperApi
-case class UncacheTableCommand(tableName: String) extends LeafNode with Command {
- override protected lazy val sideEffectResult: Seq[Row] = {
+case class UncacheTableCommand(tableName: String) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext) = {
sqlContext.table(tableName).unpersist()
Seq.empty[Row]
}
@@ -191,11 +172,11 @@ case class UncacheTableCommand(tableName: String) extends LeafNode with Command
* :: DeveloperApi ::
*/
@DeveloperApi
-case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
- @transient context: SQLContext)
- extends LeafNode with Command {
+case class DescribeCommand(
+ child: SparkPlan,
+ override val output: Seq[Attribute]) extends RunnableCommand {
- override protected lazy val sideEffectResult: Seq[Row] = {
+ override def run(sqlContext: SQLContext) = {
Row("# Registered as a temporary table", null, null) +:
child.output.map(field => Row(field.name, field.dataType.toString, null))
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
index 430ffb2998..ebf7003ff9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
@@ -21,6 +21,7 @@ import scala.language.implicitConversions
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, SqlLexical}
+import org.apache.spark.sql.hive.execution.{AddJar, AddFile, HiveNativeCommand}
/**
* A parser that recognizes all HiveQL constructs together with Spark SQL specific extensions.
@@ -52,7 +53,7 @@ private[hive] class ExtendedHiveQlParser extends AbstractSparkSQLParser {
protected lazy val dfs: Parser[LogicalPlan] =
DFS ~> wholeInput ^^ {
- case command => NativeCommand(command.trim)
+ case command => HiveNativeCommand(command.trim)
}
private lazy val addFile: Parser[LogicalPlan] =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 7de4407730..56fe27a77b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types.DecimalType
import org.apache.spark.sql.catalyst.types.decimal.Decimal
-import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand}
-import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
+import org.apache.spark.sql.execution.{SparkPlan, ExecutedCommand, ExtractPythonUdfs, QueryExecutionException}
+import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
import org.apache.spark.sql.sources.DataSourceStrategy
/**
@@ -340,7 +340,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
DataSourceStrategy,
- CommandStrategy(self),
+ CommandStrategy,
HiveCommandStrategy(self),
TakeOrdered,
ParquetOperations,
@@ -369,11 +369,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* execution is simply passed back to Hive.
*/
def stringResult(): Seq[String] = executedPlan match {
- case describeHiveTableCommand: DescribeHiveTableCommand =>
+ case ExecutedCommand(desc: DescribeHiveTableCommand) =>
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
- describeHiveTableCommand.hiveString
- case command: PhysicalCommand =>
+ desc.run(self).map {
+ case Row(name: String, dataType: String, comment) =>
+ Seq(name, dataType,
+ Option(comment.asInstanceOf[String]).getOrElse(""))
+ .map(s => String.format(s"%-20s", s))
+ .mkString("\t")
+ }
+ case command: ExecutedCommand =>
command.executeCollect().map(_.head.toString)
case other =>
@@ -386,7 +392,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override def simpleString: String =
logical match {
- case _: NativeCommand => "<Native command: executed by Hive>"
+ case _: HiveNativeCommand => "<Native command: executed by Hive>"
case _: SetCommand => "<SET command: executed by Hive, and noted by SQLContext>"
case _ => super.simpleString
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d8b10b78c6..b31a3ec250 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.util.{List => JList}
+import org.apache.spark.sql.execution.SparkPlan
+
import scala.util.parsing.combinator.RegexParsers
import org.apache.hadoop.util.ReflectionUtils
@@ -286,14 +288,24 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
Some(sa.getQB().getTableDesc)
}
- CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
+ execution.CreateTableAsSelect(
+ databaseName,
+ tableName,
+ child,
+ allowExisting,
+ desc)
case p: LogicalPlan if p.resolved => p
case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
- CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, None)
+ execution.CreateTableAsSelect(
+ databaseName,
+ tableName,
+ child,
+ allowExisting,
+ None)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 5939276f6d..3f3d9e7cd4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.types.decimal.Decimal
+import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable}
/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -44,14 +45,6 @@ import scala.collection.JavaConversions._
*/
private[hive] case object NativePlaceholder extends Command
-private[hive] case class AddFile(filePath: String) extends Command
-
-private[hive] case class AddJar(path: String) extends Command
-
-private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command
-
-private[hive] case class AnalyzeTable(tableName: String) extends Command
-
/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
private[hive] object HiveQl {
protected val nativeCommands = Seq(
@@ -239,10 +232,10 @@ private[hive] object HiveQl {
try {
val tree = getAst(sql)
if (nativeCommands contains tree.getText) {
- NativeCommand(sql)
+ HiveNativeCommand(sql)
} else {
nodeToPlan(tree) match {
- case NativePlaceholder => NativeCommand(sql)
+ case NativePlaceholder => HiveNativeCommand(sql)
case other => other
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 4ebd59db83..d3f6381b69 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,9 +17,6 @@
package org.apache.spark.sql.hive
-import org.apache.hadoop.hive.ql.parse.ASTNode
-import org.apache.hadoop.hive.ql.plan.CreateTableDesc
-
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
@@ -28,7 +25,7 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.types.StringType
-import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan, PhysicalRDD}
+import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
@@ -177,25 +174,10 @@ private[hive] trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
execution.InsertIntoHiveTable(
- table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+ table, partition, planLater(child), overwrite) :: Nil
case hive.InsertIntoHiveTable(table: MetastoreRelation, partition, child, overwrite) =>
execution.InsertIntoHiveTable(
- table, partition, planLater(child), overwrite)(hiveContext) :: Nil
- case logical.CreateTableAsSelect(
- Some(database), tableName, child, allowExisting, Some(desc: CreateTableDesc)) =>
- execution.CreateTableAsSelect(
- database,
- tableName,
- child,
- allowExisting,
- Some(desc)) :: Nil
- case logical.CreateTableAsSelect(Some(database), tableName, child, allowExisting, None) =>
- execution.CreateTableAsSelect(
- database,
- tableName,
- child,
- allowExisting,
- None) :: Nil
+ table, partition, planLater(child), overwrite) :: Nil
case _ => Nil
}
}
@@ -227,23 +209,14 @@ private[hive] trait HiveStrategies {
case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil
-
- case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
-
- case hive.AddJar(path) => execution.AddJar(path) :: Nil
-
- case hive.AddFile(path) => execution.AddFile(path) :: Nil
-
- case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
-
case describe: logical.DescribeCommand =>
val resolvedTable = context.executePlan(describe.table).analyzed
resolvedTable match {
case t: MetastoreRelation =>
- Seq(DescribeHiveTableCommand(t, describe.output, describe.isExtended)(context))
+ ExecutedCommand(
+ DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil
case o: LogicalPlan =>
- Seq(DescribeCommand(planLater(o), describe.output)(context))
+ ExecutedCommand(DescribeCommand(planLater(o), describe.output)) :: Nil
}
case _ => Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index eedb57de52..b2149bd95a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -20,9 +20,6 @@ package org.apache.spark.sql.hive.test
import java.io.File
import java.util.{Set => JavaSet}
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.session.SessionState
-
import scala.collection.mutable
import scala.language.implicitConversions
@@ -37,10 +34,11 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan, NativeCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive._
import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.hive.execution.HiveNativeCommand
/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -161,7 +159,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
abstract class QueryExecution extends super.QueryExecution {
override lazy val analyzed = {
val describedTables = logical match {
- case NativeCommand(describedTable(tbl)) => tbl :: Nil
+ case HiveNativeCommand(describedTable(tbl)) => tbl :: Nil
case CacheTableCommand(tbl, _, _) => tbl :: Nil
case _ => Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index b83689ceab..fe21454e7f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
import org.apache.spark.annotation.Experimental
-import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
-import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode}
+import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.MetastoreRelation
@@ -44,28 +44,23 @@ case class CreateTableAsSelect(
tableName: String,
query: LogicalPlan,
allowExisting: Boolean,
- desc: Option[CreateTableDesc]) extends LeafNode with Command {
+ desc: Option[CreateTableDesc]) extends RunnableCommand {
- def output = Seq.empty
+ override def run(sqlContext: SQLContext) = {
+ val hiveContext = sqlContext.asInstanceOf[HiveContext]
+ lazy val metastoreRelation: MetastoreRelation = {
+ // Create Hive Table
+ hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc)
- private[this] def sc = sqlContext.asInstanceOf[HiveContext]
-
- // A lazy computing of the metastoreRelation
- private[this] lazy val metastoreRelation: MetastoreRelation = {
- // Create Hive Table
- sc.catalog.createTable(database, tableName, query.output, allowExisting, desc)
-
- // Get the Metastore Relation
- sc.catalog.lookupRelation(Some(database), tableName, None) match {
- case r: MetastoreRelation => r
+ // Get the Metastore Relation
+ hiveContext.catalog.lookupRelation(Some(database), tableName, None) match {
+ case r: MetastoreRelation => r
+ }
}
- }
-
- override protected[sql] lazy val sideEffectResult: Seq[Row] = {
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
- if (sc.catalog.tableExists(Some(database), tableName)) {
+ if (hiveContext.catalog.tableExists(Some(database), tableName)) {
if (allowExisting) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
@@ -73,17 +68,12 @@ case class CreateTableAsSelect(
new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
}
} else {
- sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+ hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
}
Seq.empty[Row]
}
- override def execute(): RDD[Row] = {
- sideEffectResult
- sparkContext.emptyRDD[Row]
- }
-
override def argString: String = {
s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]\n" + query.toString
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index 5d98834c6f..bfacc51ef5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -22,11 +22,11 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
-import org.apache.spark.sql.execution.{Command, LeafNode}
+import org.apache.spark.sql.execution.{SparkPlan, RunnableCommand}
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.SQLContext
/**
* Implementation for "describe [extended] table".
@@ -36,21 +36,10 @@ import org.apache.spark.sql.hive.HiveShim
@DeveloperApi
case class DescribeHiveTableCommand(
table: MetastoreRelation,
- output: Seq[Attribute],
- isExtended: Boolean)(
- @transient context: HiveContext)
- extends LeafNode with Command {
+ override val output: Seq[Attribute],
+ isExtended: Boolean) extends RunnableCommand {
- // Strings with the format like Hive. It is used for result comparison in our unit tests.
- lazy val hiveString: Seq[String] = sideEffectResult.map {
- case Row(name: String, dataType: String, comment) =>
- Seq(name, dataType,
- Option(comment.asInstanceOf[String]).getOrElse(""))
- .map(s => String.format(s"%-20s", s))
- .mkString("\t")
- }
-
- override protected lazy val sideEffectResult: Seq[Row] = {
+ override def run(sqlContext: SQLContext) = {
// Trying to mimic the format of Hive's output. But not exactly the same.
var results: Seq[(String, String, String)] = Nil
@@ -75,6 +64,4 @@ case class DescribeHiveTableCommand(
Row(name, dataType, comment)
}
}
-
- override def otherCopyArgs = context :: Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
index 6930c2babd..8ba818af5f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
@@ -1,38 +1,38 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
-import org.apache.spark.sql.execution.{Command, LeafNode}
-import org.apache.spark.sql.hive.HiveContext
-
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
-case class NativeCommand(
- sql: String, output: Seq[Attribute])(
- @transient context: HiveContext)
- extends LeafNode with Command {
-
- override protected lazy val sideEffectResult: Seq[Row] = context.runSqlHive(sql).map(Row(_))
-
- override def otherCopyArgs = context :: Nil
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.types.StringType
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class HiveNativeCommand(sql: String) extends RunnableCommand {
+
+ override def output =
+ Seq(AttributeReference("result", StringType, nullable = false)())
+
+ override def run(sqlContext: SQLContext) =
+ sqlContext.asInstanceOf[HiveContext].runSqlHive(sql).map(Row(_))
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 81390f6267..ca0ec15139 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -21,7 +21,6 @@ import java.util
import scala.collection.JavaConversions._
-import org.apache.hadoop.hive.common.`type`.HiveVarchar
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.MetaStoreUtils
@@ -31,14 +30,12 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector}
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.types.decimal.Decimal
-import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode}
+import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.{ ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.HiveShim._
@@ -52,10 +49,9 @@ case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
child: SparkPlan,
- overwrite: Boolean)
- (@transient sc: HiveContext)
- extends UnaryNode with Command with HiveInspectors {
+ overwrite: Boolean) extends UnaryNode with HiveInspectors {
+ @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@transient private lazy val db = Hive.get(sc.hiveconf)
@@ -66,8 +62,6 @@ case class InsertIntoHiveTable(
serializer
}
- override def otherCopyArgs = sc :: Nil
-
def output = child.output
def saveAsHiveFile(
@@ -134,7 +128,7 @@ case class InsertIntoHiveTable(
*
* Note: this is run once and then kept to avoid double insertions.
*/
- override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+ protected[sql] lazy val sideEffectResult: Seq[Row] = {
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
@@ -256,4 +250,8 @@ case class InsertIntoHiveTable(
// TODO: implement hive compatibility as rules.
Seq.empty[Row]
}
+
+ override def executeCollect(): Array[Row] = sideEffectResult.toArray
+
+ override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 903075edf7..6fc4153f6a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,10 +18,10 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.{Command, LeafNode}
+import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
/**
* :: DeveloperApi ::
@@ -32,13 +32,10 @@ import org.apache.spark.sql.hive.HiveContext
* in the Hive metastore.
*/
@DeveloperApi
-case class AnalyzeTable(tableName: String) extends LeafNode with Command {
- def hiveContext = sqlContext.asInstanceOf[HiveContext]
+case class AnalyzeTable(tableName: String) extends RunnableCommand {
- def output = Seq.empty
-
- override protected lazy val sideEffectResult: Seq[Row] = {
- hiveContext.analyze(tableName)
+ override def run(sqlContext: SQLContext) = {
+ sqlContext.asInstanceOf[HiveContext].analyze(tableName)
Seq.empty[Row]
}
}
@@ -48,12 +45,12 @@ case class AnalyzeTable(tableName: String) extends LeafNode with Command {
* Drops a table from the metastore and removes it if it is cached.
*/
@DeveloperApi
-case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command {
- def hiveContext = sqlContext.asInstanceOf[HiveContext]
-
- def output = Seq.empty
+case class DropTable(
+ tableName: String,
+ ifExists: Boolean) extends RunnableCommand {
- override protected lazy val sideEffectResult: Seq[Row] = {
+ override def run(sqlContext: SQLContext) = {
+ val hiveContext = sqlContext.asInstanceOf[HiveContext]
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
hiveContext.catalog.unregisterTable(None, tableName)
@@ -65,12 +62,10 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with
* :: DeveloperApi ::
*/
@DeveloperApi
-case class AddJar(path: String) extends LeafNode with Command {
- def hiveContext = sqlContext.asInstanceOf[HiveContext]
+case class AddJar(path: String) extends RunnableCommand {
- override def output = Seq.empty
-
- override protected lazy val sideEffectResult: Seq[Row] = {
+ override def run(sqlContext: SQLContext) = {
+ val hiveContext = sqlContext.asInstanceOf[HiveContext]
hiveContext.runSqlHive(s"ADD JAR $path")
hiveContext.sparkContext.addJar(path)
Seq.empty[Row]
@@ -81,12 +76,10 @@ case class AddJar(path: String) extends LeafNode with Command {
* :: DeveloperApi ::
*/
@DeveloperApi
-case class AddFile(path: String) extends LeafNode with Command {
- def hiveContext = sqlContext.asInstanceOf[HiveContext]
-
- override def output = Seq.empty
+case class AddFile(path: String) extends RunnableCommand {
- override protected lazy val sideEffectResult: Seq[Row] = {
+ override def run(sqlContext: SQLContext) = {
+ val hiveContext = sqlContext.asInstanceOf[HiveContext]
hiveContext.runSqlHive(s"ADD FILE $path")
hiveContext.sparkContext.addFile(path)
Seq.empty[Row]
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index a90fc023e6..ff4071d8e2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -22,10 +22,10 @@ import org.scalatest.BeforeAndAfterAll
import scala.reflect.ClassTag
import org.apache.spark.sql.{SQLConf, QueryTest}
-import org.apache.spark.sql.catalyst.plans.logical.NativeCommand
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.execution._
class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
TestHive.reset()
@@ -51,19 +51,19 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
assertAnalyzeCommand(
"ANALYZE TABLE Table1 COMPUTE STATISTICS",
- classOf[NativeCommand])
+ classOf[HiveNativeCommand])
assertAnalyzeCommand(
"ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
- classOf[NativeCommand])
+ classOf[HiveNativeCommand])
assertAnalyzeCommand(
"ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
- classOf[NativeCommand])
+ classOf[HiveNativeCommand])
assertAnalyzeCommand(
"ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS",
- classOf[NativeCommand])
+ classOf[HiveNativeCommand])
assertAnalyzeCommand(
"ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS noscan",
- classOf[NativeCommand])
+ classOf[HiveNativeCommand])
assertAnalyzeCommand(
"ANALYZE TABLE Table1 COMPUTE STATISTICS nOscAn",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 44eb4cfa59..8011f9b877 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -24,7 +24,6 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand => LogicalNativeCommand}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive.test.TestHive
@@ -142,14 +141,14 @@ abstract class HiveComparisonTest
// Hack: Hive simply prints the result of a SET command to screen,
// and does not return it as a query answer.
case _: SetCommand => Seq("0")
- case LogicalNativeCommand(c) if c.toLowerCase.contains("desc") =>
+ case HiveNativeCommand(c) if c.toLowerCase.contains("desc") =>
answer
.filterNot(nonDeterministicLine)
.map(_.replaceAll("from deserializer", ""))
.map(_.replaceAll("None", ""))
.map(_.trim)
.filterNot(_ == "")
- case _: LogicalNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
+ case _: HiveNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
case _: ExplainCommand => answer
case _: DescribeCommand =>
// Filter out non-deterministic lines and lines which do not have actual results but