From f48420fde58d554480cc8830d2f8c4d17618f283 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 3 Sep 2014 18:57:20 -0700 Subject: [SPARK-2973][SQL] Lightweight SQL commands without distributed jobs when calling .collect() By overriding `executeCollect()` in physical plan classes of all commands, we can avoid to kick off a distributed job when collecting result of a SQL command, e.g. `sql("SET").collect()`. Previously, `Command.sideEffectResult` returns a `Seq[Any]`, and the `execute()` method in sub-classes of `Command` typically convert that to a `Seq[Row]` then parallelize it to an RDD. Now with this PR, `sideEffectResult` is required to return a `Seq[Row]` directly, so that `executeCollect()` can directly leverage that and be factored to the `Command` parent class. Author: Cheng Lian Closes #2215 from liancheng/lightweight-commands and squashes the following commits: 3fbef60 [Cheng Lian] Factored execute() method of physical commands to parent class Command 5a0e16c [Cheng Lian] Passes test suites e0e12e9 [Cheng Lian] Refactored Command.sideEffectResult and Command.executeCollect 995bdd8 [Cheng Lian] Cleaned up DescribeHiveTableCommand 542977c [Cheng Lian] Avoids confusion between logical and physical plan by adding package prefixes 55b2aa5 [Cheng Lian] Avoids distributed jobs when execution SQL commands --- .../org/apache/spark/sql/execution/commands.scala | 63 ++++++++-------------- .../org/apache/spark/sql/hive/HiveContext.scala | 4 +- .../org/apache/spark/sql/hive/HiveStrategies.scala | 14 ++--- .../hive/execution/DescribeHiveTableCommand.scala | 30 ++++------- .../spark/sql/hive/execution/NativeCommand.scala | 11 +--- .../apache/spark/sql/hive/execution/commands.scala | 20 ++----- 6 files changed, 48 insertions(+), 94 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 031b695169..286c6d264f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -21,11 +21,13 @@ import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.{Row, SQLConf, SQLContext} trait Command { + this: SparkPlan => + /** * A concrete command should override this lazy field to wrap up any side effects caused by the * command or any other computation that should be evaluated exactly once. The value of this field @@ -35,7 +37,11 @@ trait Command { * The `execute()` method of all the physical command classes should reference `sideEffectResult` * so that the command can be executed eagerly right after the command query is created. */ - protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any] + protected[sql] lazy val sideEffectResult: Seq[Row] = Seq.empty[Row] + + override def executeCollect(): Array[Row] = sideEffectResult.toArray + + override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1) } /** @@ -47,17 +53,17 @@ case class SetCommand( @transient context: SQLContext) extends LeafNode with Command with Logging { - override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match { + override protected[sql] lazy val sideEffectResult: Seq[Row] = (key, value) match { // Set value for key k. case (Some(k), Some(v)) => if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) { logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") context.setConf(SQLConf.SHUFFLE_PARTITIONS, v) - Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v") + Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")) } else { context.setConf(k, v) - Array(s"$k=$v") + Array(Row(s"$k=$v")) } // Query the value bound to key k. @@ -73,28 +79,22 @@ case class SetCommand( "hive-0.12.0.jar").mkString(":") Array( - "system:java.class.path=" + hiveJars, - "system:sun.java.command=shark.SharkServer2") - } - else { - Array(s"$k=${context.getConf(k, "")}") + Row("system:java.class.path=" + hiveJars), + Row("system:sun.java.command=shark.SharkServer2")) + } else { + Array(Row(s"$k=${context.getConf(k, "")}")) } // Query all key-value pairs that are set in the SQLConf of the context. case (None, None) => context.getAllConfs.map { case (k, v) => - s"$k=$v" + Row(s"$k=$v") }.toSeq case _ => throw new IllegalArgumentException() } - def execute(): RDD[Row] = { - val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) } - context.sparkContext.parallelize(rows, 1) - } - override def otherCopyArgs = context :: Nil } @@ -113,19 +113,14 @@ case class ExplainCommand( extends LeafNode with Command { // Run through the optimizer to generate the physical plan. - override protected[sql] lazy val sideEffectResult: Seq[String] = try { + override protected[sql] lazy val sideEffectResult: Seq[Row] = try { // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. val queryExecution = context.executePlan(logicalPlan) val outputString = if (extended) queryExecution.toString else queryExecution.simpleString - outputString.split("\n") + outputString.split("\n").map(Row(_)) } catch { case cause: TreeNodeException[_] => - ("Error occurred during query planning: \n" + cause.getMessage).split("\n") - } - - def execute(): RDD[Row] = { - val explanation = sideEffectResult.map(row => new GenericRow(Array[Any](row))) - context.sparkContext.parallelize(explanation, 1) + ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } override def otherCopyArgs = context :: Nil @@ -144,12 +139,7 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: } else { context.uncacheTable(tableName) } - Seq.empty[Any] - } - - override def execute(): RDD[Row] = { - sideEffectResult - context.emptyResult + Seq.empty[Row] } override def output: Seq[Attribute] = Seq.empty @@ -163,15 +153,8 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( @transient context: SQLContext) extends LeafNode with Command { - override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = { - Seq(("# Registered as a temporary table", null, null)) ++ - child.output.map(field => (field.name, field.dataType.toString, null)) - } - - override def execute(): RDD[Row] = { - val rows = sideEffectResult.map { - case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment)) - } - context.sparkContext.parallelize(rows, 1) + override protected[sql] lazy val sideEffectResult: Seq[Row] = { + Row("# Registered as a temporary table", null, null) +: + child.output.map(field => Row(field.name, field.dataType.toString, null)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index d9b2bc7348..ced8397972 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -389,7 +389,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { }.mkString("{", ",", "}") case (seq: Seq[_], ArrayType(typ, _)) => seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]") - case (map: Map[_,_], MapType(kType, vType, _)) => + case (map: Map[_, _], MapType(kType, vType, _)) => map.map { case (key, value) => toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType)) @@ -409,7 +409,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { // be similar with Hive. describeHiveTableCommand.hiveString case command: PhysicalCommand => - command.sideEffectResult.map(_.toString) + command.sideEffectResult.map(_.head.toString) case other => val result: Seq[Seq[Any]] = toRdd.collect().toSeq diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 47e24f0dec..24abb1b5bd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -18,17 +18,19 @@ package org.apache.spark.sql.hive import org.apache.spark.annotation.Experimental -import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema} -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.hive.execution._ +import org.apache.spark.sql.catalyst.types.StringType import org.apache.spark.sql.columnar.InMemoryRelation -import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan} +import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan} +import org.apache.spark.sql.hive +import org.apache.spark.sql.hive.execution._ +import org.apache.spark.sql.parquet.ParquetRelation +import org.apache.spark.sql.{SQLContext, SchemaRDD} import scala.collection.JavaConversions._ @@ -196,9 +198,9 @@ private[hive] trait HiveStrategies { case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil - case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil + case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil - case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil + case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil case describe: logical.DescribeCommand => val resolvedTable = context.executePlan(describe.table).analyzed diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index a40e89e0d3..317801001c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} import org.apache.spark.sql.execution.{Command, LeafNode} import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation} @@ -41,26 +41,21 @@ case class DescribeHiveTableCommand( extends LeafNode with Command { // Strings with the format like Hive. It is used for result comparison in our unit tests. - lazy val hiveString: Seq[String] = { - val alignment = 20 - val delim = "\t" - - sideEffectResult.map { - case (name, dataType, comment) => - String.format("%-" + alignment + "s", name) + delim + - String.format("%-" + alignment + "s", dataType) + delim + - String.format("%-" + alignment + "s", Option(comment).getOrElse("None")) - } + lazy val hiveString: Seq[String] = sideEffectResult.map { + case Row(name: String, dataType: String, comment) => + Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse("None")) + .map(s => String.format(s"%-20s", s)) + .mkString("\t") } - override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = { + override protected[sql] lazy val sideEffectResult: Seq[Row] = { // Trying to mimic the format of Hive's output. But not exactly the same. var results: Seq[(String, String, String)] = Nil val columns: Seq[FieldSchema] = table.hiveQlTable.getCols val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols results ++= columns.map(field => (field.getName, field.getType, field.getComment)) - if (!partitionColumns.isEmpty) { + if (partitionColumns.nonEmpty) { val partColumnInfo = partitionColumns.map(field => (field.getName, field.getType, field.getComment)) results ++= @@ -74,14 +69,9 @@ case class DescribeHiveTableCommand( results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, "")) } - results - } - - override def execute(): RDD[Row] = { - val rows = sideEffectResult.map { - case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment)) + results.map { case (name, dataType, comment) => + Row(name, dataType, comment) } - context.sparkContext.parallelize(rows, 1) } override def otherCopyArgs = context :: Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala index fe6031678f..8f10e1ba7f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala @@ -32,16 +32,7 @@ case class NativeCommand( @transient context: HiveContext) extends LeafNode with Command { - override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql) - - override def execute(): RDD[Row] = { - if (sideEffectResult.size == 0) { - context.emptyResult - } else { - val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r))) - context.sparkContext.parallelize(rows, 1) - } - } + override protected[sql] lazy val sideEffectResult: Seq[Row] = context.runSqlHive(sql).map(Row(_)) override def otherCopyArgs = context :: Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 2985169da0..a1a4aa7de7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -33,19 +33,13 @@ import org.apache.spark.sql.hive.HiveContext */ @DeveloperApi case class AnalyzeTable(tableName: String) extends LeafNode with Command { - def hiveContext = sqlContext.asInstanceOf[HiveContext] def output = Seq.empty - override protected[sql] lazy val sideEffectResult = { + override protected[sql] lazy val sideEffectResult: Seq[Row] = { hiveContext.analyze(tableName) - Seq.empty[Any] - } - - override def execute(): RDD[Row] = { - sideEffectResult - sparkContext.emptyRDD[Row] + Seq.empty[Row] } } @@ -55,20 +49,14 @@ case class AnalyzeTable(tableName: String) extends LeafNode with Command { */ @DeveloperApi case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command { - def hiveContext = sqlContext.asInstanceOf[HiveContext] def output = Seq.empty - override protected[sql] lazy val sideEffectResult: Seq[Any] = { + override protected[sql] lazy val sideEffectResult: Seq[Row] = { val ifExistsClause = if (ifExists) "IF EXISTS " else "" hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") hiveContext.catalog.unregisterTable(None, tableName) - Seq.empty - } - - override def execute(): RDD[Row] = { - sideEffectResult - sparkContext.emptyRDD[Row] + Seq.empty[Row] } } -- cgit v1.2.3