diff options
author | wangfei <wangfei1@huawei.com> | 2014-12-18 20:24:56 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-12-18 20:24:56 -0800 |
commit | c3d91da5ea8b85ca75444ec606f2e1eae376c4b2 (patch) | |
tree | 3e41b59c473f6954af52b7e059049b41c99a5754 /sql/core | |
parent | ae9f128608f67cbee0a2fb24754783ee3b4f3098 (diff) | |
download | spark-c3d91da5ea8b85ca75444ec606f2e1eae376c4b2.tar.gz spark-c3d91da5ea8b85ca75444ec606f2e1eae376c4b2.tar.bz2 spark-c3d91da5ea8b85ca75444ec606f2e1eae376c4b2.zip |
[SPARK-4861][SQL] Refactor commands in Spark SQL
Remove the `Command` trait and use `RunnableCommand` instead.
Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>
Closes #3712 from scwf/cmd and squashes the following commits:
51a82f2 [wangfei] fix test failure
0e03be8 [wangfei] address comments
4033bed [scwf] remove CreateTableAsSelect in hivestrategy
5d20010 [wangfei] address comments
125f542 [scwf] factory command in spark sql
Diffstat (limited to 'sql/core')
3 files changed, 42 insertions, 58 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ebd4cc920b..7a13302229 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -329,7 +329,7 @@ class SQLContext(@transient val sparkContext: SparkContext) def strategies: Seq[Strategy] = extraStrategies ++ ( - CommandStrategy(self) :: + CommandStrategy :: DataSourceStrategy :: TakeOrdered :: HashAggregation :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 6e04f26c84..2954d4ce7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -304,17 +304,20 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - case class CommandStrategy(context: SQLContext) extends Strategy { + case object CommandStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommand(r) :: Nil case logical.SetCommand(kv) => - Seq(execution.SetCommand(kv, plan.output)(context)) + Seq(ExecutedCommand(execution.SetCommand(kv, plan.output))) case logical.ExplainCommand(logicalPlan, extended) => - Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context)) + Seq(ExecutedCommand( + execution.ExplainCommand(logicalPlan, plan.output, extended))) case logical.CacheTableCommand(tableName, optPlan, isLazy) => - Seq(execution.CacheTableCommand(tableName, optPlan, isLazy)) + Seq(ExecutedCommand( + execution.CacheTableCommand(tableName, optPlan, isLazy))) case logical.UncacheTableCommand(tableName) => - Seq(execution.UncacheTableCommand(tableName)) + Seq(ExecutedCommand( + execution.UncacheTableCommand(tableName))) case _ 
=> Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index afe3f3f074..b8fa4b0199 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -26,34 +26,20 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.{SQLConf, SQLContext} -// TODO: DELETE ME... -trait Command { - this: SparkPlan => - - /** - * A concrete command should override this lazy field to wrap up any side effects caused by the - * command or any other computation that should be evaluated exactly once. The value of this field - * can be used as the contents of the corresponding RDD generated from the physical plan of this - * command. - * - * The `execute()` method of all the physical command classes should reference `sideEffectResult` - * so that the command can be executed eagerly right after the command query is created. - */ - protected lazy val sideEffectResult: Seq[Row] = Seq.empty[Row] - - override def executeCollect(): Array[Row] = sideEffectResult.toArray - - override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1) -} - -// TODO: Replace command with runnable command. +/** + * A logical command that is executed for its side-effects. `RunnableCommand`s are + * wrapped in `ExecutedCommand` during execution. + */ trait RunnableCommand extends logical.Command { self: Product => - def output: Seq[Attribute] def run(sqlContext: SQLContext): Seq[Row] } +/** + * A physical operator that executes the run method of a `RunnableCommand` and + * saves the result to prevent multiple executions. 
+ */ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan { /** * A concrete command should override this lazy field to wrap up any side effects caused by the @@ -79,43 +65,41 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan { * :: DeveloperApi :: */ @DeveloperApi -case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribute])( - @transient context: SQLContext) - extends LeafNode with Command with Logging { +case class SetCommand( + kv: Option[(String, Option[String])], + override val output: Seq[Attribute]) extends RunnableCommand with Logging { - override protected lazy val sideEffectResult: Seq[Row] = kv match { + override def run(sqlContext: SQLContext) = kv match { // Configures the deprecated "mapred.reduce.tasks" property. case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) => logWarning( s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") - context.setConf(SQLConf.SHUFFLE_PARTITIONS, value) + sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value) Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value")) // Configures a single property. case Some((key, Some(value))) => - context.setConf(key, value) + sqlContext.setConf(key, value) Seq(Row(s"$key=$value")) - // Queries all key-value pairs that are set in the SQLConf of the context. Notice that different - // from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties - // while "SET -v" returns all properties.) + // Queries all key-value pairs that are set in the SQLConf of the sqlContext. + // Notice that different from Hive, here "SET -v" is an alias of "SET". + // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.) 
case Some(("-v", None)) | None => - context.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq + sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq // Queries the deprecated "mapred.reduce.tasks" property. case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) => logWarning( s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.") - Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}")) + Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}")) // Queries a single property. case Some((key, None)) => - Seq(Row(s"$key=${context.getConf(key, "<undefined>")}")) + Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}")) } - - override def otherCopyArgs = context :: Nil } /** @@ -128,22 +112,19 @@ case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribut */ @DeveloperApi case class ExplainCommand( - logicalPlan: LogicalPlan, output: Seq[Attribute], extended: Boolean)( - @transient context: SQLContext) - extends LeafNode with Command { + logicalPlan: LogicalPlan, + override val output: Seq[Attribute], extended: Boolean) extends RunnableCommand { // Run through the optimizer to generate the physical plan. - override protected lazy val sideEffectResult: Seq[Row] = try { + override def run(sqlContext: SQLContext) = try { // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. 
- val queryExecution = context.executePlan(logicalPlan) + val queryExecution = sqlContext.executePlan(logicalPlan) val outputString = if (extended) queryExecution.toString else queryExecution.simpleString outputString.split("\n").map(Row(_)) } catch { case cause: TreeNodeException[_] => ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } - - override def otherCopyArgs = context :: Nil } /** @@ -153,10 +134,9 @@ case class ExplainCommand( case class CacheTableCommand( tableName: String, plan: Option[LogicalPlan], - isLazy: Boolean) - extends LeafNode with Command { + isLazy: Boolean) extends RunnableCommand { - override protected lazy val sideEffectResult = { + override def run(sqlContext: SQLContext) = { import sqlContext._ plan.foreach(_.registerTempTable(tableName)) @@ -178,8 +158,9 @@ case class CacheTableCommand( * :: DeveloperApi :: */ @DeveloperApi -case class UncacheTableCommand(tableName: String) extends LeafNode with Command { - override protected lazy val sideEffectResult: Seq[Row] = { +case class UncacheTableCommand(tableName: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext) = { sqlContext.table(tableName).unpersist() Seq.empty[Row] } @@ -191,11 +172,11 @@ case class UncacheTableCommand(tableName: String) extends LeafNode with Command * :: DeveloperApi :: */ @DeveloperApi -case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( - @transient context: SQLContext) - extends LeafNode with Command { +case class DescribeCommand( + child: SparkPlan, + override val output: Seq[Attribute]) extends RunnableCommand { - override protected lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { Row("# Registered as a temporary table", null, null) +: child.output.map(field => Row(field.name, field.dataType.toString, null)) } |