Diffstat (limited to 'sql/core')
 sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala | 63 ++++++---------
 1 file changed, 23 insertions(+), 40 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 031b695169..286c6d264f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -21,11 +21,13 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
trait Command {
+ this: SparkPlan =>
+
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
@@ -35,7 +37,11 @@ trait Command {
* The `execute()` method of all the physical command classes should reference `sideEffectResult`
* so that the command can be executed eagerly right after the command query is created.
*/
- protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
+ protected[sql] lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]
+
+ override def executeCollect(): Array[Row] = sideEffectResult.toArray
+
+ override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
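
Note on the hunk above: execute() and executeCollect() move into the Command trait itself, and a lazy val guarantees that each command's side effect runs exactly once no matter how many times the result is read. A minimal standalone sketch of the same pattern follows; Plan, EagerCommand, and SetVar are illustrative stand-ins rather than Spark types, since SparkPlan and Row need a full Spark build:

    // Standalone sketch of the lazy-val "exactly once" pattern used by Command.
    // Plan stands in for SparkPlan; Seq[String] stands in for RDD[Row].
    abstract class Plan {
      def execute(): Seq[String]
      def executeCollect(): Array[String] = execute().toArray
    }

    trait EagerCommand {
      this: Plan =>  // self-type: the trait may only be mixed into a Plan

      // First access runs the side effect and caches the result; later calls
      // to execute()/executeCollect() reuse the cached rows.
      protected lazy val sideEffectResult: Seq[String] = Seq.empty

      override def execute(): Seq[String] = sideEffectResult
    }

    class SetVar(store: collection.mutable.Map[String, String], k: String, v: String)
      extends Plan with EagerCommand {
      override protected lazy val sideEffectResult: Seq[String] = {
        store(k) = v    // the side effect
        Seq(s"$k=$v")   // the rows echoed back to the caller
      }
    }

    object Demo extends App {
      val store = collection.mutable.Map.empty[String, String]
      val cmd = new SetVar(store, "shuffle.partitions", "10")
      cmd.execute()
      cmd.executeCollect()  // the side effect does not run a second time
      assert(store("shuffle.partitions") == "10")
    }
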
/**
@@ -47,17 +53,17 @@ case class SetCommand(
@transient context: SQLContext)
extends LeafNode with Command with Logging {
- override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
- Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
+ Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
} else {
context.setConf(k, v)
- Array(s"$k=$v")
+ Array(Row(s"$k=$v"))
}
// Query the value bound to key k.
@@ -73,28 +79,22 @@ case class SetCommand(
"hive-0.12.0.jar").mkString(":")
Array(
- "system:java.class.path=" + hiveJars,
- "system:sun.java.command=shark.SharkServer2")
- }
- else {
- Array(s"$k=${context.getConf(k, "<undefined>")}")
+ Row("system:java.class.path=" + hiveJars),
+ Row("system:sun.java.command=shark.SharkServer2"))
+ } else {
+ Array(Row(s"$k=${context.getConf(k, "<undefined>")}"))
}
// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
context.getAllConfs.map { case (k, v) =>
- s"$k=$v"
+ Row(s"$k=$v")
}.toSeq
case _ =>
throw new IllegalArgumentException()
}
- def execute(): RDD[Row] = {
- val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
- context.sparkContext.parallelize(rows, 1)
- }
-
override def otherCopyArgs = context :: Nil
}
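
What the SetCommand change looks like from the user side, sketched against the Spark 1.x shell API this patch targets (illustrative, not a tested session):

    // In a Spark 1.x spark-shell, where sc is provided by the shell:
    import org.apache.spark.sql.SQLContext
    val sqlContext = new SQLContext(sc)

    // Per the doc comment on Command, the setConf side effect fires eagerly and
    // exactly once; collect() just reads the cached sideEffectResult, which now
    // already holds Rows instead of strings wrapped at execute() time.
    val rows = sqlContext.sql("SET spark.sql.shuffle.partitions=10").collect()
    rows.foreach(println)  // prints something like [spark.sql.shuffle.partitions=10]
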
@@ -113,19 +113,14 @@ case class ExplainCommand(
extends LeafNode with Command {
// Run through the optimizer to generate the physical plan.
- override protected[sql] lazy val sideEffectResult: Seq[String] = try {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = try {
// TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
val queryExecution = context.executePlan(logicalPlan)
val outputString = if (extended) queryExecution.toString else queryExecution.simpleString
- outputString.split("\n")
+ outputString.split("\n").map(Row(_))
} catch { case cause: TreeNodeException[_] =>
- ("Error occurred during query planning: \n" + cause.getMessage).split("\n")
- }
-
- def execute(): RDD[Row] = {
- val explanation = sideEffectResult.map(row => new GenericRow(Array[Any](row)))
- context.sparkContext.parallelize(explanation, 1)
+ ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}
override def otherCopyArgs = context :: Nil
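
ExplainCommand now uses the same conversion: the (possibly multi-line) plan string becomes one Row per line. The mapping is plain Scala; the plan text below is invented for illustration:

    import org.apache.spark.sql.Row

    // One Row per line of the explain output; Row(_) has the same effect as the
    // new GenericRow(Array[Any](line)) that the removed execute() used to build.
    val outputString = "== Physical Plan ==\nProject [key#0]\nFilter (value#1 > 1)"
    val asRows = outputString.split("\n").map(Row(_))  // Array of 3 one-column Rows
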
@@ -144,12 +139,7 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:
} else {
context.uncacheTable(tableName)
}
- Seq.empty[Any]
- }
-
- override def execute(): RDD[Row] = {
- sideEffectResult
- context.emptyResult
+ Seq.empty[Row]
}
override def output: Seq[Attribute] = Seq.empty
@@ -163,15 +153,8 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {
- override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
- Seq(("# Registered as a temporary table", null, null)) ++
- child.output.map(field => (field.name, field.dataType.toString, null))
- }
-
- override def execute(): RDD[Row] = {
- val rows = sideEffectResult.map {
- case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
- }
- context.sparkContext.parallelize(rows, 1)
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+ Row("# Registered as a temporary table", null, null) +:
+ child.output.map(field => Row(field.name, field.dataType.toString, null))
}
}
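
After this refactoring, a new physical command only has to override sideEffectResult; execute() and executeCollect() are inherited from Command. A hypothetical EchoCommand (not part of this patch), written as if added to this same file since LeafNode is private[sql]:

    // Hypothetical command, for illustration only: prints a message once and
    // returns it as a single-column row.
    case class EchoCommand(message: String)(@transient context: SQLContext)
      extends LeafNode with Command {

      // The only member a command needs after this refactoring; execute() and
      // executeCollect() come from the Command trait.
      override protected[sql] lazy val sideEffectResult: Seq[Row] = {
        println(message)   // the side effect, evaluated exactly once
        Seq(Row(message))
      }

      override def output: Seq[Attribute] = Seq.empty
      override def otherCopyArgs = context :: Nil
    }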