author     wangfei <wangfei1@huawei.com>               2014-12-18 20:24:56 -0800
committer  Michael Armbrust <michael@databricks.com>   2014-12-18 20:24:56 -0800
commit     c3d91da5ea8b85ca75444ec606f2e1eae376c4b2 (patch)
tree       3e41b59c473f6954af52b7e059049b41c99a5754 /sql/core
parent     ae9f128608f67cbee0a2fb24754783ee3b4f3098 (diff)
[SPARK-4861][SQL] Refactor commands in Spark SQL
Remove ```Command``` and use ```RunnableCommand``` instead.

Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>

Closes #3712 from scwf/cmd and squashes the following commits:

51a82f2 [wangfei] fix test failure
0e03be8 [wangfei] address comments
4033bed [scwf] remove CreateTableAsSelect in hivestrategy
5d20010 [wangfei] address comments
125f542 [scwf] factory command in spark sql
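To make the shape of the refactor concrete, here is a minimal, self-contained sketch of the pattern the patch converges on. The types below are simplified stand-ins, not Spark's real classes: one logical trait (`RunnableCommand`) carries each command's behavior, and one physical operator (`ExecutedCommand`) evaluates it eagerly and caches the result, replacing the per-command `sideEffectResult` boilerplate that each physical command previously carried.

```scala
// Simplified stand-ins for Spark's classes; a sketch of the refactor's
// shape, not the actual implementation.
object CommandPatternSketch {
  type Row = Seq[Any]

  class SQLContext {
    private var conf = Map.empty[String, String]
    def setConf(k: String, v: String): Unit = conf += (k -> v)
    def getAllConfs: Map[String, String] = conf
  }

  // After the patch, every command is a logical node with a single run method.
  trait RunnableCommand {
    def run(sqlContext: SQLContext): Seq[Row]
  }

  // One physical operator wraps any RunnableCommand. The lazy val takes over
  // the role of the old per-command sideEffectResult: side effects happen at
  // most once, and repeated collects reuse the cached rows.
  case class ExecutedCommand(cmd: RunnableCommand, sqlContext: SQLContext) {
    protected lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)
    def executeCollect(): Array[Row] = sideEffectResult.toArray
  }
}
```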
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                 |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala  | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala         | 85
3 files changed, 42 insertions, 58 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ebd4cc920b..7a13302229 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -329,7 +329,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def strategies: Seq[Strategy] =
extraStrategies ++ (
- CommandStrategy(self) ::
+ CommandStrategy ::
DataSourceStrategy ::
TakeOrdered ::
HashAggregation ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 6e04f26c84..2954d4ce7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -304,17 +304,20 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
- case class CommandStrategy(context: SQLContext) extends Strategy {
+ case object CommandStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommand(r) :: Nil
case logical.SetCommand(kv) =>
- Seq(execution.SetCommand(kv, plan.output)(context))
+ Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)))
case logical.ExplainCommand(logicalPlan, extended) =>
- Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
+ Seq(ExecutedCommand(
+ execution.ExplainCommand(logicalPlan, plan.output, extended)))
case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
- Seq(execution.CacheTableCommand(tableName, optPlan, isLazy))
+ Seq(ExecutedCommand(
+ execution.CacheTableCommand(tableName, optPlan, isLazy)))
case logical.UncacheTableCommand(tableName) =>
- Seq(execution.UncacheTableCommand(tableName))
+ Seq(ExecutedCommand(
+ execution.UncacheTableCommand(tableName)))
case _ => Nil
}
}
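The planning side of the change can be summarized with a hypothetical sketch in the same simplified stand-in types as above (not Spark's real planner): because physical commands no longer capture a `SQLContext` in their constructors, the strategy has no state of its own to carry and can shrink from `case class CommandStrategy(context: SQLContext)` to a stateless `case object`.

```scala
// Hypothetical, simplified planner sketch; not Spark's real Strategy type.
object PlannerSketch {
  sealed trait LogicalPlan
  case class SetCommand(kv: Option[(String, Option[String])]) extends LogicalPlan

  trait SparkPlan
  trait RunnableCommand { def run(): Seq[String] }
  case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan

  // With no constructor parameters left, the strategy is a pure function
  // from logical plans to physical plans and can be a singleton object.
  case object CommandStrategy extends (LogicalPlan => Seq[SparkPlan]) {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case SetCommand(Some((k, Some(v)))) =>
        ExecutedCommand(new RunnableCommand {
          def run(): Seq[String] = Seq(s"$k=$v")
        }) :: Nil
      case _ => Nil
    }
  }
}
```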
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index afe3f3f074..b8fa4b0199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -26,34 +26,20 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{SQLConf, SQLContext}
-// TODO: DELETE ME...
-trait Command {
- this: SparkPlan =>
-
- /**
- * A concrete command should override this lazy field to wrap up any side effects caused by the
- * command or any other computation that should be evaluated exactly once. The value of this field
- * can be used as the contents of the corresponding RDD generated from the physical plan of this
- * command.
- *
- * The `execute()` method of all the physical command classes should reference `sideEffectResult`
- * so that the command can be executed eagerly right after the command query is created.
- */
- protected lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]
-
- override def executeCollect(): Array[Row] = sideEffectResult.toArray
-
- override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
-}
-
-// TODO: Replace command with runnable command.
+/**
+ * A logical command that is executed for its side-effects. `RunnableCommand`s are
+ * wrapped in `ExecutedCommand` during execution.
+ */
trait RunnableCommand extends logical.Command {
self: Product =>
- def output: Seq[Attribute]
def run(sqlContext: SQLContext): Seq[Row]
}
+/**
+ * A physical operator that executes the run method of a `RunnableCommand` and
+ * saves the result to prevent multiple executions.
+ */
case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
@@ -79,43 +65,41 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
* :: DeveloperApi ::
*/
@DeveloperApi
-case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribute])(
- @transient context: SQLContext)
- extends LeafNode with Command with Logging {
+case class SetCommand(
+ kv: Option[(String, Option[String])],
+ override val output: Seq[Attribute]) extends RunnableCommand with Logging {
- override protected lazy val sideEffectResult: Seq[Row] = kv match {
+ override def run(sqlContext: SQLContext) = kv match {
// Configures the deprecated "mapred.reduce.tasks" property.
case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
- context.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
+ sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value"))
// Configures a single property.
case Some((key, Some(value))) =>
- context.setConf(key, value)
+ sqlContext.setConf(key, value)
Seq(Row(s"$key=$value"))
- // Queries all key-value pairs that are set in the SQLConf of the context. Notice that different
- // from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties
- // while "SET -v" returns all properties.)
+ // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
+ // Notice that different from Hive, here "SET -v" is an alias of "SET".
+ // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.)
case Some(("-v", None)) | None =>
- context.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
+ sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
// Queries the deprecated "mapred.reduce.tasks" property.
case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
- Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}"))
+ Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}"))
// Queries a single property.
case Some((key, None)) =>
- Seq(Row(s"$key=${context.getConf(key, "<undefined>")}"))
+ Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}"))
}
-
- override def otherCopyArgs = context :: Nil
}
/**
@@ -128,22 +112,19 @@ case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribut
*/
@DeveloperApi
case class ExplainCommand(
- logicalPlan: LogicalPlan, output: Seq[Attribute], extended: Boolean)(
- @transient context: SQLContext)
- extends LeafNode with Command {
+ logicalPlan: LogicalPlan,
+ override val output: Seq[Attribute], extended: Boolean) extends RunnableCommand {
// Run through the optimizer to generate the physical plan.
- override protected lazy val sideEffectResult: Seq[Row] = try {
+ override def run(sqlContext: SQLContext) = try {
// TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
- val queryExecution = context.executePlan(logicalPlan)
+ val queryExecution = sqlContext.executePlan(logicalPlan)
val outputString = if (extended) queryExecution.toString else queryExecution.simpleString
outputString.split("\n").map(Row(_))
} catch { case cause: TreeNodeException[_] =>
("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}
-
- override def otherCopyArgs = context :: Nil
}
/**
@@ -153,10 +134,9 @@ case class ExplainCommand(
case class CacheTableCommand(
tableName: String,
plan: Option[LogicalPlan],
- isLazy: Boolean)
- extends LeafNode with Command {
+ isLazy: Boolean) extends RunnableCommand {
- override protected lazy val sideEffectResult = {
+ override def run(sqlContext: SQLContext) = {
import sqlContext._
plan.foreach(_.registerTempTable(tableName))
@@ -178,8 +158,9 @@ case class CacheTableCommand(
* :: DeveloperApi ::
*/
@DeveloperApi
-case class UncacheTableCommand(tableName: String) extends LeafNode with Command {
- override protected lazy val sideEffectResult: Seq[Row] = {
+case class UncacheTableCommand(tableName: String) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext) = {
sqlContext.table(tableName).unpersist()
Seq.empty[Row]
}
@@ -191,11 +172,11 @@ case class UncacheTableCommand(tableName: String) extends LeafNode with Command
* :: DeveloperApi ::
*/
@DeveloperApi
-case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
- @transient context: SQLContext)
- extends LeafNode with Command {
+case class DescribeCommand(
+ child: SparkPlan,
+ override val output: Seq[Attribute]) extends RunnableCommand {
- override protected lazy val sideEffectResult: Seq[Row] = {
+ override def run(sqlContext: SQLContext) = {
Row("# Registered as a temporary table", null, null) +:
child.output.map(field => Row(field.name, field.dataType.toString, null))
}