path: root/sql/hive
author    Cheng Lian <lian.cs.zju@gmail.com>    2014-09-03 18:57:20 -0700
committer Michael Armbrust <michael@databricks.com>    2014-09-03 18:57:20 -0700
commit    f48420fde58d554480cc8830d2f8c4d17618f283 (patch)
tree      bd793ba5bc1e9917f34a7f40daf697346c447393 /sql/hive
parent    4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9 (diff)
[SPARK-2973][SQL] Lightweight SQL commands without distributed jobs when calling .collect()
By overriding `executeCollect()` in the physical plan classes of all commands, we can avoid kicking off a distributed job when collecting the result of a SQL command, e.g. `sql("SET").collect()`.

Previously, `Command.sideEffectResult` returned a `Seq[Any]`, and the `execute()` method in sub-classes of `Command` typically converted that to a `Seq[Row]` and then parallelized it to an RDD. With this PR, `sideEffectResult` is required to return a `Seq[Row]` directly, so that `executeCollect()` can leverage it directly and be factored into the `Command` parent class.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2215 from liancheng/lightweight-commands and squashes the following commits:

3fbef60 [Cheng Lian] Factored execute() method of physical commands to parent class Command
5a0e16c [Cheng Lian] Passes test suites
e0e12e9 [Cheng Lian] Refactored Command.sideEffectResult and Command.executeCollect
995bdd8 [Cheng Lian] Cleaned up DescribeHiveTableCommand
542977c [Cheng Lian] Avoids confusion between logical and physical plan by adding package prefixes
55b2aa5 [Cheng Lian] Avoids distributed jobs when executing SQL commands
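The shared `Command` parent that the Hive-side classes below now lean on lives in sql/core, outside this sql/hive diff. As a rough sketch of the pattern (simplified, not verbatim from the PR, and assuming SparkPlan exposes `executeCollect(): Array[Row]` and a `sqlContext` member, as the `AnalyzeTable`/`DropTable` changes below suggest), the trait looks roughly like this:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.SparkPlan

// Sketch only: stands in for org.apache.spark.sql.execution.Command.
trait CommandSketch {
  self: SparkPlan =>

  // Concrete commands (NativeCommand, DropTable, AnalyzeTable, ...) override this
  // and materialize their result locally as rows, exactly once.
  protected[sql] lazy val sideEffectResult: Seq[Row] = Seq.empty

  // sql("SET").collect() lands here, so no distributed job is launched.
  override def executeCollect(): Array[Row] = sideEffectResult.toArray

  // Only wrapped in a one-partition RDD when the command feeds a larger plan.
  override def execute(): RDD[Row] =
    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}

With that parent in place, each command in this diff only needs to define `sideEffectResult: Seq[Row]`; the per-class `execute()` overrides that used to call `parallelize` are deleted.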
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                        |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                     | 14
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala | 30
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala            | 11
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala                 | 20
5 files changed, 25 insertions(+), 54 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index d9b2bc7348..ced8397972 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -389,7 +389,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}.mkString("{", ",", "}")
case (seq: Seq[_], ArrayType(typ, _)) =>
seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
- case (map: Map[_,_], MapType(kType, vType, _)) =>
+ case (map: Map[_, _], MapType(kType, vType, _)) =>
map.map {
case (key, value) =>
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
@@ -409,7 +409,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// be similar with Hive.
describeHiveTableCommand.hiveString
case command: PhysicalCommand =>
- command.sideEffectResult.map(_.toString)
+ command.sideEffectResult.map(_.head.toString)
case other =>
val result: Seq[Seq[Any]] = toRdd.collect().toSeq
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 47e24f0dec..24abb1b5bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,17 +18,19 @@
package org.apache.spark.sql.hive
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}
+import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
+import org.apache.spark.sql.hive
+import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.{SQLContext, SchemaRDD}
import scala.collection.JavaConversions._
@@ -196,9 +198,9 @@ private[hive] trait HiveStrategies {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil
- case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
+ case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
- case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
+ case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
case describe: logical.DescribeCommand =>
val resolvedTable = context.executePlan(describe.table).analyzed
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index a40e89e0d3..317801001c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.execution.{Command, LeafNode}
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
@@ -41,26 +41,21 @@ case class DescribeHiveTableCommand(
extends LeafNode with Command {
// Strings with the format like Hive. It is used for result comparison in our unit tests.
- lazy val hiveString: Seq[String] = {
- val alignment = 20
- val delim = "\t"
-
- sideEffectResult.map {
- case (name, dataType, comment) =>
- String.format("%-" + alignment + "s", name) + delim +
- String.format("%-" + alignment + "s", dataType) + delim +
- String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
- }
+ lazy val hiveString: Seq[String] = sideEffectResult.map {
+ case Row(name: String, dataType: String, comment) =>
+ Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse("None"))
+ .map(s => String.format(s"%-20s", s))
+ .mkString("\t")
}
- override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
// Trying to mimic the format of Hive's output. But not exactly the same.
var results: Seq[(String, String, String)] = Nil
val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
results ++= columns.map(field => (field.getName, field.getType, field.getComment))
- if (!partitionColumns.isEmpty) {
+ if (partitionColumns.nonEmpty) {
val partColumnInfo =
partitionColumns.map(field => (field.getName, field.getType, field.getComment))
results ++=
@@ -74,14 +69,9 @@ case class DescribeHiveTableCommand(
results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
}
- results
- }
-
- override def execute(): RDD[Row] = {
- val rows = sideEffectResult.map {
- case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
+ results.map { case (name, dataType, comment) =>
+ Row(name, dataType, comment)
}
- context.sparkContext.parallelize(rows, 1)
}
override def otherCopyArgs = context :: Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
index fe6031678f..8f10e1ba7f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
@@ -32,16 +32,7 @@ case class NativeCommand(
@transient context: HiveContext)
extends LeafNode with Command {
- override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)
-
- override def execute(): RDD[Row] = {
- if (sideEffectResult.size == 0) {
- context.emptyResult
- } else {
- val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
- context.sparkContext.parallelize(rows, 1)
- }
- }
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = context.runSqlHive(sql).map(Row(_))
override def otherCopyArgs = context :: Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 2985169da0..a1a4aa7de7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -33,19 +33,13 @@ import org.apache.spark.sql.hive.HiveContext
*/
@DeveloperApi
case class AnalyzeTable(tableName: String) extends LeafNode with Command {
-
def hiveContext = sqlContext.asInstanceOf[HiveContext]
def output = Seq.empty
- override protected[sql] lazy val sideEffectResult = {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
hiveContext.analyze(tableName)
- Seq.empty[Any]
- }
-
- override def execute(): RDD[Row] = {
- sideEffectResult
- sparkContext.emptyRDD[Row]
+ Seq.empty[Row]
}
}
@@ -55,20 +49,14 @@ case class AnalyzeTable(tableName: String) extends LeafNode with Command {
*/
@DeveloperApi
case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command {
-
def hiveContext = sqlContext.asInstanceOf[HiveContext]
def output = Seq.empty
- override protected[sql] lazy val sideEffectResult: Seq[Any] = {
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
hiveContext.catalog.unregisterTable(None, tableName)
- Seq.empty
- }
-
- override def execute(): RDD[Row] = {
- sideEffectResult
- sparkContext.emptyRDD[Row]
+ Seq.empty[Row]
}
}