aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-09-03 18:57:20 -0700
committerMichael Armbrust <michael@databricks.com>2014-09-03 18:57:20 -0700
commitf48420fde58d554480cc8830d2f8c4d17618f283 (patch)
treebd793ba5bc1e9917f34a7f40daf697346c447393 /sql
parent4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9 (diff)
downloadspark-f48420fde58d554480cc8830d2f8c4d17618f283.tar.gz
spark-f48420fde58d554480cc8830d2f8c4d17618f283.tar.bz2
spark-f48420fde58d554480cc8830d2f8c4d17618f283.zip
[SPARK-2973][SQL] Lightweight SQL commands without distributed jobs when calling .collect()
By overriding `executeCollect()` in physical plan classes of all commands, we can avoid to kick off a distributed job when collecting result of a SQL command, e.g. `sql("SET").collect()`. Previously, `Command.sideEffectResult` returns a `Seq[Any]`, and the `execute()` method in sub-classes of `Command` typically convert that to a `Seq[Row]` then parallelize it to an RDD. Now with this PR, `sideEffectResult` is required to return a `Seq[Row]` directly, so that `executeCollect()` can directly leverage that and be factored to the `Command` parent class. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #2215 from liancheng/lightweight-commands and squashes the following commits: 3fbef60 [Cheng Lian] Factored execute() method of physical commands to parent class Command 5a0e16c [Cheng Lian] Passes test suites e0e12e9 [Cheng Lian] Refactored Command.sideEffectResult and Command.executeCollect 995bdd8 [Cheng Lian] Cleaned up DescribeHiveTableCommand 542977c [Cheng Lian] Avoids confusion between logical and physical plan by adding package prefixes 55b2aa5 [Cheng Lian] Avoids distributed jobs when execution SQL commands
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala63
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala14
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala30
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala11
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala20
6 files changed, 48 insertions, 94 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 031b695169..286c6d264f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -21,11 +21,13 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
trait Command {
+ this: SparkPlan =>
+
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
@@ -35,7 +37,11 @@ trait Command {
* The `execute()` method of all the physical command classes should reference `sideEffectResult`
* so that the command can be executed eagerly right after the command query is created.
*/
- protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
+ protected[sql] lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]
+
+ override def executeCollect(): Array[Row] = sideEffectResult.toArray
+
+ override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
/**
@@ -47,17 +53,17 @@ case class SetCommand(
@transient context: SQLContext)
extends LeafNode with Command with Logging {
- override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
- Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
+ Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
} else {
context.setConf(k, v)
- Array(s"$k=$v")
+ Array(Row(s"$k=$v"))
}
// Query the value bound to key k.
@@ -73,28 +79,22 @@ case class SetCommand(
"hive-0.12.0.jar").mkString(":")
Array(
- "system:java.class.path=" + hiveJars,
- "system:sun.java.command=shark.SharkServer2")
- }
- else {
- Array(s"$k=${context.getConf(k, "<undefined>")}")
+ Row("system:java.class.path=" + hiveJars),
+ Row("system:sun.java.command=shark.SharkServer2"))
+ } else {
+ Array(Row(s"$k=${context.getConf(k, "<undefined>")}"))
}
// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
context.getAllConfs.map { case (k, v) =>
- s"$k=$v"
+ Row(s"$k=$v")
}.toSeq
case _ =>
throw new IllegalArgumentException()
}
- def execute(): RDD[Row] = {
- val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
- context.sparkContext.parallelize(rows, 1)
- }
-
override def otherCopyArgs = context :: Nil
}
@@ -113,19 +113,14 @@ case class ExplainCommand(
extends LeafNode with Command {
// Run through the optimizer to generate the physical plan.
- override protected[sql] lazy val sideEffectResult: Seq[String] = try {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = try {
// TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
val queryExecution = context.executePlan(logicalPlan)
val outputString = if (extended) queryExecution.toString else queryExecution.simpleString
- outputString.split("\n")
+ outputString.split("\n").map(Row(_))
} catch { case cause: TreeNodeException[_] =>
- ("Error occurred during query planning: \n" + cause.getMessage).split("\n")
- }
-
- def execute(): RDD[Row] = {
- val explanation = sideEffectResult.map(row => new GenericRow(Array[Any](row)))
- context.sparkContext.parallelize(explanation, 1)
+ ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}
override def otherCopyArgs = context :: Nil
@@ -144,12 +139,7 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:
} else {
context.uncacheTable(tableName)
}
- Seq.empty[Any]
- }
-
- override def execute(): RDD[Row] = {
- sideEffectResult
- context.emptyResult
+ Seq.empty[Row]
}
override def output: Seq[Attribute] = Seq.empty
@@ -163,15 +153,8 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {
- override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
- Seq(("# Registered as a temporary table", null, null)) ++
- child.output.map(field => (field.name, field.dataType.toString, null))
- }
-
- override def execute(): RDD[Row] = {
- val rows = sideEffectResult.map {
- case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
- }
- context.sparkContext.parallelize(rows, 1)
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+ Row("# Registered as a temporary table", null, null) +:
+ child.output.map(field => Row(field.name, field.dataType.toString, null))
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index d9b2bc7348..ced8397972 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -389,7 +389,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}.mkString("{", ",", "}")
case (seq: Seq[_], ArrayType(typ, _)) =>
seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
- case (map: Map[_,_], MapType(kType, vType, _)) =>
+ case (map: Map[_, _], MapType(kType, vType, _)) =>
map.map {
case (key, value) =>
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
@@ -409,7 +409,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// be similar with Hive.
describeHiveTableCommand.hiveString
case command: PhysicalCommand =>
- command.sideEffectResult.map(_.toString)
+ command.sideEffectResult.map(_.head.toString)
case other =>
val result: Seq[Seq[Any]] = toRdd.collect().toSeq
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 47e24f0dec..24abb1b5bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,17 +18,19 @@
package org.apache.spark.sql.hive
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}
+import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
+import org.apache.spark.sql.hive
+import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.{SQLContext, SchemaRDD}
import scala.collection.JavaConversions._
@@ -196,9 +198,9 @@ private[hive] trait HiveStrategies {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil
- case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
+ case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
- case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
+ case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
case describe: logical.DescribeCommand =>
val resolvedTable = context.executePlan(describe.table).analyzed
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index a40e89e0d3..317801001c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.execution.{Command, LeafNode}
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
@@ -41,26 +41,21 @@ case class DescribeHiveTableCommand(
extends LeafNode with Command {
// Strings with the format like Hive. It is used for result comparison in our unit tests.
- lazy val hiveString: Seq[String] = {
- val alignment = 20
- val delim = "\t"
-
- sideEffectResult.map {
- case (name, dataType, comment) =>
- String.format("%-" + alignment + "s", name) + delim +
- String.format("%-" + alignment + "s", dataType) + delim +
- String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
- }
+ lazy val hiveString: Seq[String] = sideEffectResult.map {
+ case Row(name: String, dataType: String, comment) =>
+ Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse("None"))
+ .map(s => String.format(s"%-20s", s))
+ .mkString("\t")
}
- override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
// Trying to mimic the format of Hive's output. But not exactly the same.
var results: Seq[(String, String, String)] = Nil
val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
results ++= columns.map(field => (field.getName, field.getType, field.getComment))
- if (!partitionColumns.isEmpty) {
+ if (partitionColumns.nonEmpty) {
val partColumnInfo =
partitionColumns.map(field => (field.getName, field.getType, field.getComment))
results ++=
@@ -74,14 +69,9 @@ case class DescribeHiveTableCommand(
results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
}
- results
- }
-
- override def execute(): RDD[Row] = {
- val rows = sideEffectResult.map {
- case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
+ results.map { case (name, dataType, comment) =>
+ Row(name, dataType, comment)
}
- context.sparkContext.parallelize(rows, 1)
}
override def otherCopyArgs = context :: Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
index fe6031678f..8f10e1ba7f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
@@ -32,16 +32,7 @@ case class NativeCommand(
@transient context: HiveContext)
extends LeafNode with Command {
- override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)
-
- override def execute(): RDD[Row] = {
- if (sideEffectResult.size == 0) {
- context.emptyResult
- } else {
- val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
- context.sparkContext.parallelize(rows, 1)
- }
- }
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = context.runSqlHive(sql).map(Row(_))
override def otherCopyArgs = context :: Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 2985169da0..a1a4aa7de7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -33,19 +33,13 @@ import org.apache.spark.sql.hive.HiveContext
*/
@DeveloperApi
case class AnalyzeTable(tableName: String) extends LeafNode with Command {
-
def hiveContext = sqlContext.asInstanceOf[HiveContext]
def output = Seq.empty
- override protected[sql] lazy val sideEffectResult = {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
hiveContext.analyze(tableName)
- Seq.empty[Any]
- }
-
- override def execute(): RDD[Row] = {
- sideEffectResult
- sparkContext.emptyRDD[Row]
+ Seq.empty[Row]
}
}
@@ -55,20 +49,14 @@ case class AnalyzeTable(tableName: String) extends LeafNode with Command {
*/
@DeveloperApi
case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command {
-
def hiveContext = sqlContext.asInstanceOf[HiveContext]
def output = Seq.empty
- override protected[sql] lazy val sideEffectResult: Seq[Any] = {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
hiveContext.catalog.unregisterTable(None, tableName)
- Seq.empty
- }
-
- override def execute(): RDD[Row] = {
- sideEffectResult
- sparkContext.emptyRDD[Row]
+ Seq.empty[Row]
}
}