aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala35
1 files changed, 33 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 5859eba408..e658e6fc4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -21,10 +21,12 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.sql.{SQLConf, SQLContext}
+// TODO: DELETE ME...
trait Command {
this: SparkPlan =>
@@ -44,6 +46,35 @@ trait Command {
override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
+// TODO: Replace command with runnable command.
+trait RunnableCommand extends logical.Command {
+ self: Product =>
+
+ def output: Seq[Attribute]
+ def run(sqlContext: SQLContext): Seq[Row]
+}
+
+case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
+ /**
+ * A concrete command should override this lazy field to wrap up any side effects caused by the
+ * command or any other computation that should be evaluated exactly once. The value of this field
+ * can be used as the contents of the corresponding RDD generated from the physical plan of this
+ * command.
+ *
+ * The `execute()` method of all the physical command classes should reference `sideEffectResult`
+ * so that the command can be executed eagerly right after the command query is created.
+ */
+ protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)
+
+ override def output = cmd.output
+
+ override def children = Nil
+
+ override def executeCollect(): Array[Row] = sideEffectResult.toArray
+
+ override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+}
+
/**
* :: DeveloperApi ::
*/