author	Cheng Lian <lian.cs.zju@gmail.com>	2014-09-03 18:57:20 -0700
committer	Michael Armbrust <michael@databricks.com>	2014-09-03 18:57:20 -0700
commit	f48420fde58d554480cc8830d2f8c4d17618f283 (patch)
tree	bd793ba5bc1e9917f34a7f40daf697346c447393 /sql/core
parent	4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9 (diff)
[SPARK-2973][SQL] Lightweight SQL commands without distributed jobs when calling .collect()
By overriding `executeCollect()` in the physical plan classes of all commands, we can avoid kicking off a distributed job when collecting the result of a SQL command, e.g. `sql("SET").collect()`.

Previously, `Command.sideEffectResult` returned a `Seq[Any]`, and the `execute()` method in subclasses of `Command` typically converted that to a `Seq[Row]` and then parallelized it into an RDD. With this PR, `sideEffectResult` is required to return a `Seq[Row]` directly, so that `executeCollect()` can leverage it directly and be factored into the `Command` parent class.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2215 from liancheng/lightweight-commands and squashes the following commits:

3fbef60 [Cheng Lian] Factored execute() method of physical commands into parent class Command
5a0e16c [Cheng Lian] Passes test suites
e0e12e9 [Cheng Lian] Refactored Command.sideEffectResult and Command.executeCollect
995bdd8 [Cheng Lian] Cleaned up DescribeHiveTableCommand
542977c [Cheng Lian] Avoids confusion between logical and physical plans by adding package prefixes
55b2aa5 [Cheng Lian] Avoids distributed jobs when executing SQL commands
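For illustration, here is a minimal, self-contained sketch of the pattern this patch introduces. The `Row`, `SparkPlan`, and `EchoSetCommand` stand-ins below are hypothetical simplifications (in real Spark, `execute()` returns an `RDD[Row]` and `executeCollect()` is defined on `SparkPlan`); the point is how overriding `executeCollect()` lets a command answer `.collect()` from its local `sideEffectResult` without ever touching the distributed path:

    // Hypothetical, simplified stand-ins for Spark's classes; a Seq[Row]
    // stands in for the RDD[Row] that the real execute() would return.
    case class Row(values: Any*)

    trait SparkPlan {
      def execute(): Seq[Row]                               // the "distributed" path
      def executeCollect(): Array[Row] = execute().toArray  // default: go through execute()
    }

    trait Command { this: SparkPlan =>
      // Wraps the command's side effects; a lazy val, so it is evaluated exactly once.
      protected lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]

      // The key idea of the patch: answer collect() locally, skipping execute().
      override def executeCollect(): Array[Row] = sideEffectResult.toArray

      override def execute(): Seq[Row] = sideEffectResult
    }

    // Analogous to SetCommand: produces one Row describing the key/value it set.
    class EchoSetCommand(key: String, value: String) extends SparkPlan with Command {
      override protected lazy val sideEffectResult: Seq[Row] = Seq(Row(s"$key=$value"))
    }

    object Demo extends App {
      val cmd = new EchoSetCommand("spark.sql.shuffle.partitions", "10")
      cmd.executeCollect().foreach(println)  // served locally, no job launched
    }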
Diffstat (limited to 'sql/core')
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala	63
1 file changed, 23 insertions(+), 40 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 031b695169..286c6d264f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -21,11 +21,13 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
trait Command {
+ this: SparkPlan =>
+
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
@@ -35,7 +37,11 @@ trait Command {
* The `execute()` method of all the physical command classes should reference `sideEffectResult`
* so that the command can be executed eagerly right after the command query is created.
*/
- protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
+ protected[sql] lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]
+
+ override def executeCollect(): Array[Row] = sideEffectResult.toArray
+
+ override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
/**
@@ -47,17 +53,17 @@ case class SetCommand(
@transient context: SQLContext)
extends LeafNode with Command with Logging {
- override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
- Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
+ Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
} else {
context.setConf(k, v)
- Array(s"$k=$v")
+ Array(Row(s"$k=$v"))
}
// Query the value bound to key k.
@@ -73,28 +79,22 @@ case class SetCommand(
"hive-0.12.0.jar").mkString(":")
Array(
- "system:java.class.path=" + hiveJars,
- "system:sun.java.command=shark.SharkServer2")
- }
- else {
- Array(s"$k=${context.getConf(k, "<undefined>")}")
+ Row("system:java.class.path=" + hiveJars),
+ Row("system:sun.java.command=shark.SharkServer2"))
+ } else {
+ Array(Row(s"$k=${context.getConf(k, "<undefined>")}"))
}
// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
context.getAllConfs.map { case (k, v) =>
- s"$k=$v"
+ Row(s"$k=$v")
}.toSeq
case _ =>
throw new IllegalArgumentException()
}
- def execute(): RDD[Row] = {
- val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
- context.sparkContext.parallelize(rows, 1)
- }
-
override def otherCopyArgs = context :: Nil
}
@@ -113,19 +113,14 @@ case class ExplainCommand(
extends LeafNode with Command {
// Run through the optimizer to generate the physical plan.
- override protected[sql] lazy val sideEffectResult: Seq[String] = try {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = try {
// TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
val queryExecution = context.executePlan(logicalPlan)
val outputString = if (extended) queryExecution.toString else queryExecution.simpleString
- outputString.split("\n")
+ outputString.split("\n").map(Row(_))
} catch { case cause: TreeNodeException[_] =>
- ("Error occurred during query planning: \n" + cause.getMessage).split("\n")
- }
-
- def execute(): RDD[Row] = {
- val explanation = sideEffectResult.map(row => new GenericRow(Array[Any](row)))
- context.sparkContext.parallelize(explanation, 1)
+ ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}
override def otherCopyArgs = context :: Nil
@@ -144,12 +139,7 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:
} else {
context.uncacheTable(tableName)
}
- Seq.empty[Any]
- }
-
- override def execute(): RDD[Row] = {
- sideEffectResult
- context.emptyResult
+ Seq.empty[Row]
}
override def output: Seq[Attribute] = Seq.empty
@@ -163,15 +153,8 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {
- override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
- Seq(("# Registered as a temporary table", null, null)) ++
- child.output.map(field => (field.name, field.dataType.toString, null))
- }
-
- override def execute(): RDD[Row] = {
- val rows = sideEffectResult.map {
- case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
- }
- context.sparkContext.parallelize(rows, 1)
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+ Row("# Registered as a temporary table", null, null) +:
+ child.output.map(field => Row(field.name, field.dataType.toString, null))
}
}
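A quick usage note on the effect of this patch (a hedged sketch; assumes a Spark 1.x shell where a `SQLContext` named `sqlContext` is in scope):

    // After this patch, collecting a command's result is served from
    // sideEffectResult via executeCollect(), so no distributed job is kicked off.
    val confRows = sqlContext.sql("SET").collect()
    confRows.foreach(println)  // each Row holds one "key=value" string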