diff options
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala | 63 |
1 files changed, 23 insertions, 40 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 031b695169..286c6d264f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -21,11 +21,13 @@ import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.{Row, SQLConf, SQLContext} trait Command { + this: SparkPlan => + /** * A concrete command should override this lazy field to wrap up any side effects caused by the * command or any other computation that should be evaluated exactly once. The value of this field @@ -35,7 +37,11 @@ trait Command { * The `execute()` method of all the physical command classes should reference `sideEffectResult` * so that the command can be executed eagerly right after the command query is created. */ - protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any] + protected[sql] lazy val sideEffectResult: Seq[Row] = Seq.empty[Row] + + override def executeCollect(): Array[Row] = sideEffectResult.toArray + + override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1) } /** @@ -47,17 +53,17 @@ case class SetCommand( @transient context: SQLContext) extends LeafNode with Command with Logging { - override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match { + override protected[sql] lazy val sideEffectResult: Seq[Row] = (key, value) match { // Set value for key k. case (Some(k), Some(v)) => if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) { logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") context.setConf(SQLConf.SHUFFLE_PARTITIONS, v) - Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v") + Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")) } else { context.setConf(k, v) - Array(s"$k=$v") + Array(Row(s"$k=$v")) } // Query the value bound to key k. @@ -73,28 +79,22 @@ case class SetCommand( "hive-0.12.0.jar").mkString(":") Array( - "system:java.class.path=" + hiveJars, - "system:sun.java.command=shark.SharkServer2") - } - else { - Array(s"$k=${context.getConf(k, "<undefined>")}") + Row("system:java.class.path=" + hiveJars), + Row("system:sun.java.command=shark.SharkServer2")) + } else { + Array(Row(s"$k=${context.getConf(k, "<undefined>")}")) } // Query all key-value pairs that are set in the SQLConf of the context. case (None, None) => context.getAllConfs.map { case (k, v) => - s"$k=$v" + Row(s"$k=$v") }.toSeq case _ => throw new IllegalArgumentException() } - def execute(): RDD[Row] = { - val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) } - context.sparkContext.parallelize(rows, 1) - } - override def otherCopyArgs = context :: Nil } @@ -113,19 +113,14 @@ case class ExplainCommand( extends LeafNode with Command { // Run through the optimizer to generate the physical plan. - override protected[sql] lazy val sideEffectResult: Seq[String] = try { + override protected[sql] lazy val sideEffectResult: Seq[Row] = try { // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. val queryExecution = context.executePlan(logicalPlan) val outputString = if (extended) queryExecution.toString else queryExecution.simpleString - outputString.split("\n") + outputString.split("\n").map(Row(_)) } catch { case cause: TreeNodeException[_] => - ("Error occurred during query planning: \n" + cause.getMessage).split("\n") - } - - def execute(): RDD[Row] = { - val explanation = sideEffectResult.map(row => new GenericRow(Array[Any](row))) - context.sparkContext.parallelize(explanation, 1) + ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } override def otherCopyArgs = context :: Nil @@ -144,12 +139,7 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: } else { context.uncacheTable(tableName) } - Seq.empty[Any] - } - - override def execute(): RDD[Row] = { - sideEffectResult - context.emptyResult + Seq.empty[Row] } override def output: Seq[Attribute] = Seq.empty @@ -163,15 +153,8 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( @transient context: SQLContext) extends LeafNode with Command { - override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = { - Seq(("# Registered as a temporary table", null, null)) ++ - child.output.map(field => (field.name, field.dataType.toString, null)) - } - - override def execute(): RDD[Row] = { - val rows = sideEffectResult.map { - case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment)) - } - context.sparkContext.parallelize(rows, 1) + override protected[sql] lazy val sideEffectResult: Seq[Row] = { + Row("# Registered as a temporary table", null, null) +: + child.output.map(field => Row(field.name, field.dataType.toString, null)) } } |