From c3d91da5ea8b85ca75444ec606f2e1eae376c4b2 Mon Sep 17 00:00:00 2001
From: wangfei
Date: Thu, 18 Dec 2014 20:24:56 -0800
Subject: [SPARK-4861][SQL] Refactory command in spark sql

Remove ```Command``` and use ```RunnableCommand``` instead.

Author: wangfei
Author: scwf

Closes #3712 from scwf/cmd and squashes the following commits:

51a82f2 [wangfei] fix test failure
0e03be8 [wangfei] address comments
4033bed [scwf] remove CreateTableAsSelect in hivestrategy
5d20010 [wangfei] address comments
125f542 [scwf] factory command in spark sql
---
 .../sql/catalyst/plans/logical/commands.scala      | 17 -----
 .../scala/org/apache/spark/sql/SQLContext.scala    |  2 +-
 .../spark/sql/execution/SparkStrategies.scala      | 13 ++--
 .../org/apache/spark/sql/execution/commands.scala  | 85 +++++++++-------
 .../spark/sql/hive/ExtendedHiveQlParser.scala      |  3 +-
 .../org/apache/spark/sql/hive/HiveContext.scala    | 20 +++--
 .../spark/sql/hive/HiveMetastoreCatalog.scala      | 16 +++-
 .../scala/org/apache/spark/sql/hive/HiveQl.scala   | 13 +---
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 39 ++--------
 .../scala/org/apache/spark/sql/hive/TestHive.scala |  8 +-
 .../sql/hive/execution/CreateTableAsSelect.scala   | 38 ++++------
 .../hive/execution/DescribeHiveTableCommand.scala  | 23 ++----
 .../sql/hive/execution/HiveNativeCommand.scala     | 38 ++++++++++
 .../sql/hive/execution/InsertIntoHiveTable.scala   | 18 ++---
 .../spark/sql/hive/execution/NativeCommand.scala   | 38 ----------
 .../apache/spark/sql/hive/execution/commands.scala | 39 ++++------
 .../apache/spark/sql/hive/StatisticsSuite.scala    | 12 +--
 .../sql/hive/execution/HiveComparisonTest.scala    |  5 +-
 18 files changed, 172 insertions(+), 255 deletions(-)
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
 delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index 1d513d7789..5a1863953e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -29,23 +29,6 @@ abstract class Command extends LeafNode {
   def output: Seq[Attribute] = Seq.empty
 }
 
-/**
- * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
- * commands that are passed directly to another system.
- */
-case class NativeCommand(cmd: String) extends Command {
-  override def output =
-    Seq(AttributeReference("result", StringType, nullable = false)())
-}
-
-/**
- * Commands of the form "SET [key [= value] ]".
- */
-case class DFSCommand(kv: Option[(String, Option[String])]) extends Command {
-  override def output = Seq(
-    AttributeReference("DFS output", StringType, nullable = false)())
-}
-
 /**
  *
  * Commands of the form "SET [key [= value] ]".
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ebd4cc920b..7a13302229 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -329,7 +329,7 @@ class SQLContext(@transient val sparkContext: SparkContext) def strategies: Seq[Strategy] = extraStrategies ++ ( - CommandStrategy(self) :: + CommandStrategy :: DataSourceStrategy :: TakeOrdered :: HashAggregation :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 6e04f26c84..2954d4ce7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -304,17 +304,20 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - case class CommandStrategy(context: SQLContext) extends Strategy { + case object CommandStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommand(r) :: Nil case logical.SetCommand(kv) => - Seq(execution.SetCommand(kv, plan.output)(context)) + Seq(ExecutedCommand(execution.SetCommand(kv, plan.output))) case logical.ExplainCommand(logicalPlan, extended) => - Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context)) + Seq(ExecutedCommand( + execution.ExplainCommand(logicalPlan, plan.output, extended))) case logical.CacheTableCommand(tableName, optPlan, isLazy) => - Seq(execution.CacheTableCommand(tableName, optPlan, isLazy)) + Seq(ExecutedCommand( + execution.CacheTableCommand(tableName, optPlan, isLazy))) case logical.UncacheTableCommand(tableName) => - Seq(execution.UncacheTableCommand(tableName)) + Seq(ExecutedCommand( + execution.UncacheTableCommand(tableName))) case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index afe3f3f074..b8fa4b0199 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -26,34 +26,20 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.{SQLConf, SQLContext} -// TODO: DELETE ME... -trait Command { - this: SparkPlan => - - /** - * A concrete command should override this lazy field to wrap up any side effects caused by the - * command or any other computation that should be evaluated exactly once. The value of this field - * can be used as the contents of the corresponding RDD generated from the physical plan of this - * command. - * - * The `execute()` method of all the physical command classes should reference `sideEffectResult` - * so that the command can be executed eagerly right after the command query is created. - */ - protected lazy val sideEffectResult: Seq[Row] = Seq.empty[Row] - - override def executeCollect(): Array[Row] = sideEffectResult.toArray - - override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1) -} - -// TODO: Replace command with runnable command. +/** + * A logical command that is executed for its side-effects. `RunnableCommand`s are + * wrapped in `ExecutedCommand` during execution. 
+ */ trait RunnableCommand extends logical.Command { self: Product => - def output: Seq[Attribute] def run(sqlContext: SQLContext): Seq[Row] } +/** + * A physical operator that executes the run method of a `RunnableCommand` and + * saves the result to prevent multiple executions. + */ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan { /** * A concrete command should override this lazy field to wrap up any side effects caused by the @@ -79,43 +65,41 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan { * :: DeveloperApi :: */ @DeveloperApi -case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribute])( - @transient context: SQLContext) - extends LeafNode with Command with Logging { +case class SetCommand( + kv: Option[(String, Option[String])], + override val output: Seq[Attribute]) extends RunnableCommand with Logging { - override protected lazy val sideEffectResult: Seq[Row] = kv match { + override def run(sqlContext: SQLContext) = kv match { // Configures the deprecated "mapred.reduce.tasks" property. case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) => logWarning( s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") - context.setConf(SQLConf.SHUFFLE_PARTITIONS, value) + sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value) Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value")) // Configures a single property. case Some((key, Some(value))) => - context.setConf(key, value) + sqlContext.setConf(key, value) Seq(Row(s"$key=$value")) - // Queries all key-value pairs that are set in the SQLConf of the context. Notice that different - // from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties - // while "SET -v" returns all properties.) + // Queries all key-value pairs that are set in the SQLConf of the sqlContext. + // Notice that different from Hive, here "SET -v" is an alias of "SET". + // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.) case Some(("-v", None)) | None => - context.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq + sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq // Queries the deprecated "mapred.reduce.tasks" property. case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) => logWarning( s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.") - Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}")) + Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}")) // Queries a single property. case Some((key, None)) => - Seq(Row(s"$key=${context.getConf(key, "")}")) + Seq(Row(s"$key=${sqlContext.getConf(key, "")}")) } - - override def otherCopyArgs = context :: Nil } /** @@ -128,22 +112,19 @@ case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribut */ @DeveloperApi case class ExplainCommand( - logicalPlan: LogicalPlan, output: Seq[Attribute], extended: Boolean)( - @transient context: SQLContext) - extends LeafNode with Command { + logicalPlan: LogicalPlan, + override val output: Seq[Attribute], extended: Boolean) extends RunnableCommand { // Run through the optimizer to generate the physical plan. 
- override protected lazy val sideEffectResult: Seq[Row] = try { + override def run(sqlContext: SQLContext) = try { // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. - val queryExecution = context.executePlan(logicalPlan) + val queryExecution = sqlContext.executePlan(logicalPlan) val outputString = if (extended) queryExecution.toString else queryExecution.simpleString outputString.split("\n").map(Row(_)) } catch { case cause: TreeNodeException[_] => ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } - - override def otherCopyArgs = context :: Nil } /** @@ -153,10 +134,9 @@ case class ExplainCommand( case class CacheTableCommand( tableName: String, plan: Option[LogicalPlan], - isLazy: Boolean) - extends LeafNode with Command { + isLazy: Boolean) extends RunnableCommand { - override protected lazy val sideEffectResult = { + override def run(sqlContext: SQLContext) = { import sqlContext._ plan.foreach(_.registerTempTable(tableName)) @@ -178,8 +158,9 @@ case class CacheTableCommand( * :: DeveloperApi :: */ @DeveloperApi -case class UncacheTableCommand(tableName: String) extends LeafNode with Command { - override protected lazy val sideEffectResult: Seq[Row] = { +case class UncacheTableCommand(tableName: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext) = { sqlContext.table(tableName).unpersist() Seq.empty[Row] } @@ -191,11 +172,11 @@ case class UncacheTableCommand(tableName: String) extends LeafNode with Command * :: DeveloperApi :: */ @DeveloperApi -case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( - @transient context: SQLContext) - extends LeafNode with Command { +case class DescribeCommand( + child: SparkPlan, + override val output: Seq[Attribute]) extends RunnableCommand { - override protected lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { Row("# Registered as a temporary table", null, null) +: child.output.map(field => Row(field.name, field.dataType.toString, null)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala index 430ffb2998..ebf7003ff9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala @@ -21,6 +21,7 @@ import scala.language.implicitConversions import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, SqlLexical} +import org.apache.spark.sql.hive.execution.{AddJar, AddFile, HiveNativeCommand} /** * A parser that recognizes all HiveQL constructs together with Spark SQL specific extensions. 
@@ -52,7 +53,7 @@ private[hive] class ExtendedHiveQlParser extends AbstractSparkSQLParser { protected lazy val dfs: Parser[LogicalPlan] = DFS ~> wholeInput ^^ { - case command => NativeCommand(command.trim) + case command => HiveNativeCommand(command.trim) } private lazy val addFile: Parser[LogicalPlan] = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 7de4407730..56fe27a77b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types.DecimalType import org.apache.spark.sql.catalyst.types.decimal.Decimal -import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand} -import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand +import org.apache.spark.sql.execution.{SparkPlan, ExecutedCommand, ExtractPythonUdfs, QueryExecutionException} +import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand} import org.apache.spark.sql.sources.DataSourceStrategy /** @@ -340,7 +340,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def strategies: Seq[Strategy] = extraStrategies ++ Seq( DataSourceStrategy, - CommandStrategy(self), + CommandStrategy, HiveCommandStrategy(self), TakeOrdered, ParquetOperations, @@ -369,11 +369,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { * execution is simply passed back to Hive. */ def stringResult(): Seq[String] = executedPlan match { - case describeHiveTableCommand: DescribeHiveTableCommand => + case ExecutedCommand(desc: DescribeHiveTableCommand) => // If it is a describe command for a Hive table, we want to have the output format // be similar with Hive. 
- describeHiveTableCommand.hiveString - case command: PhysicalCommand => + desc.run(self).map { + case Row(name: String, dataType: String, comment) => + Seq(name, dataType, + Option(comment.asInstanceOf[String]).getOrElse("")) + .map(s => String.format(s"%-20s", s)) + .mkString("\t") + } + case command: ExecutedCommand => command.executeCollect().map(_.head.toString) case other => @@ -386,7 +392,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def simpleString: String = logical match { - case _: NativeCommand => "" + case _: HiveNativeCommand => "" case _: SetCommand => "" case _ => super.simpleString } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d8b10b78c6..b31a3ec250 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.hive import java.io.IOException import java.util.{List => JList} +import org.apache.spark.sql.execution.SparkPlan + import scala.util.parsing.combinator.RegexParsers import org.apache.hadoop.util.ReflectionUtils @@ -286,14 +288,24 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Some(sa.getQB().getTableDesc) } - CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc) + execution.CreateTableAsSelect( + databaseName, + tableName, + child, + allowExisting, + desc) case p: LogicalPlan if p.resolved => p case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) => val (dbName, tblName) = processDatabaseAndTableName(db, tableName) val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) - CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, None) + execution.CreateTableAsSelect( + databaseName, + tableName, + child, + allowExisting, + None) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 5939276f6d..3f3d9e7cd4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.types.decimal.Decimal +import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable} /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -44,14 +45,6 @@ import scala.collection.JavaConversions._ */ private[hive] case object NativePlaceholder extends Command -private[hive] case class AddFile(filePath: String) extends Command - -private[hive] case class AddJar(path: String) extends Command - -private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command - -private[hive] case class AnalyzeTable(tableName: String) extends Command - /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. 
*/ private[hive] object HiveQl { protected val nativeCommands = Seq( @@ -239,10 +232,10 @@ private[hive] object HiveQl { try { val tree = getAst(sql) if (nativeCommands contains tree.getText) { - NativeCommand(sql) + HiveNativeCommand(sql) } else { nodeToPlan(tree) match { - case NativePlaceholder => NativeCommand(sql) + case NativePlaceholder => HiveNativeCommand(sql) case other => other } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 4ebd59db83..d3f6381b69 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,9 +17,6 @@ package org.apache.spark.sql.hive -import org.apache.hadoop.hive.ql.parse.ASTNode -import org.apache.hadoop.hive.ql.plan.CreateTableDesc - import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ @@ -28,7 +25,7 @@ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.types.StringType -import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan, PhysicalRDD} +import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.parquet.ParquetRelation @@ -177,25 +174,10 @@ private[hive] trait HiveStrategies { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) => execution.InsertIntoHiveTable( - table, partition, planLater(child), overwrite)(hiveContext) :: Nil + table, partition, planLater(child), overwrite) :: Nil case hive.InsertIntoHiveTable(table: MetastoreRelation, partition, child, overwrite) => execution.InsertIntoHiveTable( - table, partition, planLater(child), overwrite)(hiveContext) :: Nil - case logical.CreateTableAsSelect( - Some(database), tableName, child, allowExisting, Some(desc: CreateTableDesc)) => - execution.CreateTableAsSelect( - database, - tableName, - child, - allowExisting, - Some(desc)) :: Nil - case logical.CreateTableAsSelect(Some(database), tableName, child, allowExisting, None) => - execution.CreateTableAsSelect( - database, - tableName, - child, - allowExisting, - None) :: Nil + table, partition, planLater(child), overwrite) :: Nil case _ => Nil } } @@ -227,23 +209,14 @@ private[hive] trait HiveStrategies { case class HiveCommandStrategy(context: HiveContext) extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil - - case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil - - case hive.AddJar(path) => execution.AddJar(path) :: Nil - - case hive.AddFile(path) => execution.AddFile(path) :: Nil - - case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil - case describe: logical.DescribeCommand => val resolvedTable = context.executePlan(describe.table).analyzed resolvedTable match { case t: MetastoreRelation => - Seq(DescribeHiveTableCommand(t, describe.output, describe.isExtended)(context)) + ExecutedCommand( + DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil case o: LogicalPlan => - 
Seq(DescribeCommand(planLater(o), describe.output)(context)) + ExecutedCommand(DescribeCommand(planLater(o), describe.output)) :: Nil } case _ => Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index eedb57de52..b2149bd95a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -20,9 +20,6 @@ package org.apache.spark.sql.hive.test import java.io.File import java.util.{Set => JavaSet} -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.session.SessionState - import scala.collection.mutable import scala.language.implicitConversions @@ -37,10 +34,11 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.util.Utils import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan, NativeCommand} +import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive._ import org.apache.spark.sql.SQLConf +import org.apache.spark.sql.hive.execution.HiveNativeCommand /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -161,7 +159,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { abstract class QueryExecution extends super.QueryExecution { override lazy val analyzed = { val describedTables = logical match { - case NativeCommand(describedTable(tbl)) => tbl :: Nil + case HiveNativeCommand(describedTable(tbl)) => tbl :: Nil case CacheTableCommand(tbl, _, _) => tbl :: Nil case _ => Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index b83689ceab..fe21454e7f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution import org.apache.hadoop.hive.ql.plan.CreateTableDesc import org.apache.spark.annotation.Experimental -import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} -import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode} +import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.MetastoreRelation @@ -44,28 +44,23 @@ case class CreateTableAsSelect( tableName: String, query: LogicalPlan, allowExisting: Boolean, - desc: Option[CreateTableDesc]) extends LeafNode with Command { + desc: Option[CreateTableDesc]) extends RunnableCommand { - def output = Seq.empty + override def run(sqlContext: SQLContext) = { + val hiveContext = sqlContext.asInstanceOf[HiveContext] + lazy val metastoreRelation: MetastoreRelation = { + // Create Hive Table + hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc) - private[this] def sc = sqlContext.asInstanceOf[HiveContext] - - // A lazy computing of the metastoreRelation - private[this] lazy val metastoreRelation: MetastoreRelation = { - // Create Hive Table - 
sc.catalog.createTable(database, tableName, query.output, allowExisting, desc) - - // Get the Metastore Relation - sc.catalog.lookupRelation(Some(database), tableName, None) match { - case r: MetastoreRelation => r + // Get the Metastore Relation + hiveContext.catalog.lookupRelation(Some(database), tableName, None) match { + case r: MetastoreRelation => r + } } - } - - override protected[sql] lazy val sideEffectResult: Seq[Row] = { // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data // processing. - if (sc.catalog.tableExists(Some(database), tableName)) { + if (hiveContext.catalog.tableExists(Some(database), tableName)) { if (allowExisting) { // table already exists, will do nothing, to keep consistent with Hive } else { @@ -73,17 +68,12 @@ case class CreateTableAsSelect( new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName") } } else { - sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd + hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd } Seq.empty[Row] } - override def execute(): RDD[Row] = { - sideEffectResult - sparkContext.emptyRDD[Row] - } - override def argString: String = { s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]\n" + query.toString } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index 5d98834c6f..bfacc51ef5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -22,11 +22,11 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} -import org.apache.spark.sql.execution.{Command, LeafNode} +import org.apache.spark.sql.execution.{SparkPlan, RunnableCommand} import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation} import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.sql.SQLContext /** * Implementation for "describe [extended] table". @@ -36,21 +36,10 @@ import org.apache.spark.sql.hive.HiveShim @DeveloperApi case class DescribeHiveTableCommand( table: MetastoreRelation, - output: Seq[Attribute], - isExtended: Boolean)( - @transient context: HiveContext) - extends LeafNode with Command { + override val output: Seq[Attribute], + isExtended: Boolean) extends RunnableCommand { - // Strings with the format like Hive. It is used for result comparison in our unit tests. - lazy val hiveString: Seq[String] = sideEffectResult.map { - case Row(name: String, dataType: String, comment) => - Seq(name, dataType, - Option(comment.asInstanceOf[String]).getOrElse("")) - .map(s => String.format(s"%-20s", s)) - .mkString("\t") - } - - override protected lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { // Trying to mimic the format of Hive's output. But not exactly the same. 
var results: Seq[(String, String, String)] = Nil @@ -75,6 +64,4 @@ case class DescribeHiveTableCommand( Row(name, dataType, comment) } } - - override def otherCopyArgs = context :: Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala new file mode 100644 index 0000000000..8ba818af5f --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row} +import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.types.StringType + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class HiveNativeCommand(sql: String) extends RunnableCommand { + + override def output = + Seq(AttributeReference("result", StringType, nullable = false)()) + + override def run(sqlContext: SQLContext) = + sqlContext.asInstanceOf[HiveContext].runSqlHive(sql).map(Row(_)) +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 81390f6267..ca0ec15139 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -21,7 +21,6 @@ import java.util import scala.collection.JavaConversions._ -import org.apache.hadoop.hive.common.`type`.HiveVarchar import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.metastore.MetaStoreUtils @@ -31,14 +30,12 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg} import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector} import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.catalyst.types.decimal.Decimal -import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode} +import org.apache.spark.sql.execution.{UnaryNode, 
SparkPlan} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.{ ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.HiveShim._ @@ -52,10 +49,9 @@ case class InsertIntoHiveTable( table: MetastoreRelation, partition: Map[String, Option[String]], child: SparkPlan, - overwrite: Boolean) - (@transient sc: HiveContext) - extends UnaryNode with Command with HiveInspectors { + overwrite: Boolean) extends UnaryNode with HiveInspectors { + @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext] @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass @transient private lazy val hiveContext = new Context(sc.hiveconf) @transient private lazy val db = Hive.get(sc.hiveconf) @@ -66,8 +62,6 @@ case class InsertIntoHiveTable( serializer } - override def otherCopyArgs = sc :: Nil - def output = child.output def saveAsHiveFile( @@ -134,7 +128,7 @@ case class InsertIntoHiveTable( * * Note: this is run once and then kept to avoid double insertions. */ - override protected[sql] lazy val sideEffectResult: Seq[Row] = { + protected[sql] lazy val sideEffectResult: Seq[Row] = { // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. val tableDesc = table.tableDesc @@ -256,4 +250,8 @@ case class InsertIntoHiveTable( // TODO: implement hive compatibility as rules. Seq.empty[Row] } + + override def executeCollect(): Array[Row] = sideEffectResult.toArray + + override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala deleted file mode 100644 index 6930c2babd..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.execution - -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row} -import org.apache.spark.sql.execution.{Command, LeafNode} -import org.apache.spark.sql.hive.HiveContext - -/** - * :: DeveloperApi :: - */ -@DeveloperApi -case class NativeCommand( - sql: String, output: Seq[Attribute])( - @transient context: HiveContext) - extends LeafNode with Command { - - override protected lazy val sideEffectResult: Seq[Row] = context.runSqlHive(sql).map(Row(_)) - - override def otherCopyArgs = context :: Nil -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 903075edf7..6fc4153f6a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.execution.{Command, LeafNode} +import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext /** * :: DeveloperApi :: @@ -32,13 +32,10 @@ import org.apache.spark.sql.hive.HiveContext * in the Hive metastore. */ @DeveloperApi -case class AnalyzeTable(tableName: String) extends LeafNode with Command { - def hiveContext = sqlContext.asInstanceOf[HiveContext] +case class AnalyzeTable(tableName: String) extends RunnableCommand { - def output = Seq.empty - - override protected lazy val sideEffectResult: Seq[Row] = { - hiveContext.analyze(tableName) + override def run(sqlContext: SQLContext) = { + sqlContext.asInstanceOf[HiveContext].analyze(tableName) Seq.empty[Row] } } @@ -48,12 +45,12 @@ case class AnalyzeTable(tableName: String) extends LeafNode with Command { * Drops a table from the metastore and removes it if it is cached. 
*/ @DeveloperApi -case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command { - def hiveContext = sqlContext.asInstanceOf[HiveContext] - - def output = Seq.empty +case class DropTable( + tableName: String, + ifExists: Boolean) extends RunnableCommand { - override protected lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { + val hiveContext = sqlContext.asInstanceOf[HiveContext] val ifExistsClause = if (ifExists) "IF EXISTS " else "" hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") hiveContext.catalog.unregisterTable(None, tableName) @@ -65,12 +62,10 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with * :: DeveloperApi :: */ @DeveloperApi -case class AddJar(path: String) extends LeafNode with Command { - def hiveContext = sqlContext.asInstanceOf[HiveContext] +case class AddJar(path: String) extends RunnableCommand { - override def output = Seq.empty - - override protected lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { + val hiveContext = sqlContext.asInstanceOf[HiveContext] hiveContext.runSqlHive(s"ADD JAR $path") hiveContext.sparkContext.addJar(path) Seq.empty[Row] @@ -81,12 +76,10 @@ case class AddJar(path: String) extends LeafNode with Command { * :: DeveloperApi :: */ @DeveloperApi -case class AddFile(path: String) extends LeafNode with Command { - def hiveContext = sqlContext.asInstanceOf[HiveContext] - - override def output = Seq.empty +case class AddFile(path: String) extends RunnableCommand { - override protected lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { + val hiveContext = sqlContext.asInstanceOf[HiveContext] hiveContext.runSqlHive(s"ADD FILE $path") hiveContext.sparkContext.addFile(path) Seq.empty[Row] diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index a90fc023e6..ff4071d8e2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -22,10 +22,10 @@ import org.scalatest.BeforeAndAfterAll import scala.reflect.ClassTag import org.apache.spark.sql.{SQLConf, QueryTest} -import org.apache.spark.sql.catalyst.plans.logical.NativeCommand import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.hive.execution._ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { TestHive.reset() @@ -51,19 +51,19 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { assertAnalyzeCommand( "ANALYZE TABLE Table1 COMPUTE STATISTICS", - classOf[NativeCommand]) + classOf[HiveNativeCommand]) assertAnalyzeCommand( "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS", - classOf[NativeCommand]) + classOf[HiveNativeCommand]) assertAnalyzeCommand( "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan", - classOf[NativeCommand]) + classOf[HiveNativeCommand]) assertAnalyzeCommand( "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS", - classOf[NativeCommand]) + classOf[HiveNativeCommand]) assertAnalyzeCommand( "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS noscan", - classOf[NativeCommand]) + classOf[HiveNativeCommand]) assertAnalyzeCommand( "ANALYZE TABLE Table1 
COMPUTE STATISTICS nOscAn", diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 44eb4cfa59..8011f9b877 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -24,7 +24,6 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen} import org.apache.spark.Logging import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand => LogicalNativeCommand} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive.test.TestHive @@ -142,14 +141,14 @@ abstract class HiveComparisonTest // Hack: Hive simply prints the result of a SET command to screen, // and does not return it as a query answer. case _: SetCommand => Seq("0") - case LogicalNativeCommand(c) if c.toLowerCase.contains("desc") => + case HiveNativeCommand(c) if c.toLowerCase.contains("desc") => answer .filterNot(nonDeterministicLine) .map(_.replaceAll("from deserializer", "")) .map(_.replaceAll("None", "")) .map(_.trim) .filterNot(_ == "") - case _: LogicalNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "") + case _: HiveNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "") case _: ExplainCommand => answer case _: DescribeCommand => // Filter out non-deterministic lines and lines which do not have actual results but -- cgit v1.2.3
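
The heart of this patch is the split between the logical `RunnableCommand` interface (a command only declares its side effect via `run(sqlContext)`) and the single physical `ExecutedCommand` operator that invokes it exactly once and caches the resulting rows. The sketch below is a minimal, self-contained illustration of that pattern; the `Row`, `SQLContext`, and `SetCommand` definitions here are simplified stand-ins written for this example, not the actual Spark SQL classes touched by the diff above.

```scala
// Minimal sketch of the RunnableCommand / ExecutedCommand pattern, using
// simplified stand-in types (Row, SQLContext) rather than the real Spark classes.
object CommandPatternSketch {

  // Stand-in for Spark SQL's Row: the example commands return a single string column.
  case class Row(value: String)

  // Stand-in for SQLContext: just enough mutable configuration for the example.
  class SQLContext {
    private val settings = scala.collection.mutable.Map.empty[String, String]
    def setConf(key: String, value: String): Unit = settings(key) = value
    def getAllConfs: Map[String, String] = settings.toMap
  }

  // Logical side: a command only describes its side effect via run(), as in the patch.
  trait RunnableCommand {
    def run(sqlContext: SQLContext): Seq[Row]
  }

  // Physical side: one generic wrapper runs the command exactly once and caches the
  // rows, mirroring the sideEffectResult contract of the removed Command trait.
  case class ExecutedCommand(cmd: RunnableCommand, sqlContext: SQLContext) {
    lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)
    def executeCollect(): Array[Row] = sideEffectResult.toArray
  }

  // Toy analogue of SetCommand: "SET key=value" updates the conf, a bare "SET" lists it.
  case class SetCommand(kv: Option[(String, String)]) extends RunnableCommand {
    override def run(sqlContext: SQLContext): Seq[Row] = kv match {
      case Some((key, value)) =>
        sqlContext.setConf(key, value)
        Seq(Row(s"$key=$value"))
      case None =>
        sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
    }
  }

  def main(args: Array[String]): Unit = {
    val ctx = new SQLContext
    // Setting a property runs the side effect once and returns the echoed row.
    ExecutedCommand(SetCommand(Some("spark.sql.shuffle.partitions" -> "10")), ctx)
      .executeCollect().foreach(println)
    // A bare SET lists everything currently configured.
    ExecutedCommand(SetCommand(None), ctx).executeCollect().foreach(println)
  }
}
```

The same shape accounts for most of the mechanical changes in the diff: commands such as SetCommand, CacheTableCommand, DropTable, and AddJar no longer carry a `@transient` context or their own `sideEffectResult`/`execute()` plumbing, and the planner simply wraps each of them in `ExecutedCommand`.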