| author | Cheng Lian <lian.cs.zju@gmail.com> | 2014-06-13 12:59:48 -0700 |
|---|---|---|
| committer | Michael Armbrust <michael@databricks.com> | 2014-06-13 12:59:48 -0700 |
| commit | ac96d9657c9a9f89a455a1b671c059d896012d41 | |
| tree | 4458ab91a13169575b9625e0b6fdcd5b96e21823 /sql/core | |
| parent | 1c2fd015b05b65abc83c4874ada825deac578af8 | |
[SPARK-2094][SQL] "Exactly once" semantics for DDL and command statements
## Related JIRA issues
- Main issue:
  - [SPARK-2094](https://issues.apache.org/jira/browse/SPARK-2094): Ensure exactly once semantics for DDL/Commands
- Issues resolved as dependencies:
  - [SPARK-2081](https://issues.apache.org/jira/browse/SPARK-2081): Undefine output() from the abstract class Command and implement it in concrete subclasses
  - [SPARK-2128](https://issues.apache.org/jira/browse/SPARK-2128): No plan for DESCRIBE
  - [SPARK-1852](https://issues.apache.org/jira/browse/SPARK-1852): SparkSQL Queries with Sorts run before the user asks them to
- Other related issue:
  - [SPARK-2129](https://issues.apache.org/jira/browse/SPARK-2129): NPE thrown while lookup a view

    Two test cases, `join_view` and `mergejoin_mixed`, were removed from the `HiveCompatibilitySuite` whitelist to work around this issue.
## PR Overview
This PR defines physical plans for DDL statements and commands and wraps their side effects in a lazy field, `PhysicalCommand.sideEffectResult`, so that they are executed eagerly and exactly once. As a positive side effect, DDL statements and commands can now be turned into proper `SchemaRDD`s, letting users query the execution results.
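Concretely, the contract is a small trait introduced in `commands.scala` (shown in full in the diff below); condensed, it reads:
```
trait Command {
  /**
   * A concrete command overrides this lazy field to wrap up its side effects,
   * or any other computation that should be evaluated exactly once. Since every
   * physical command's execute() references sideEffectResult, the command runs
   * eagerly as soon as the plan is built, and later references reuse the result.
   */
  protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
}
```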
This PR defines schemas for the following DDL/commands:
- EXPLAIN command
  - `plan`: String, the plan explanation
- SET command
  - `key`: String, the key(s) of the property/properties being set or queried
  - `value`: String, the value(s) of the property/properties being queried
- Other Hive native commands
  - `result`: String, execution result returned by Hive
**NOTE**: We should refine schemas for different native commands by defining physical plans for them in the future.
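For reference, a command's schema at this layer is just its physical plan's `output` sequence of attributes. A minimal sketch of what a refined schema for a dedicated native-command plan could look like (the SHOW TABLES plan and its column name are hypothetical, not part of this PR):
```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.types.StringType

// Hypothetical: a dedicated ShowTablesCommand physical plan could expose a
// single, properly named string column instead of Hive's generic `result`.
val showTablesOutput = Seq(AttributeReference("tableName", StringType, nullable = false)())
```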
## Examples
### EXPLAIN command
Take the "EXPLAIN" command as an example, we first execute the command and obtain a `SchemaRDD` at the same time, then query the `plan` field with the schema DSL:
```
scala> loadTestTable("src")
...
scala> val q0 = hql("EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key")
...
q0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExplainCommandPhysical [plan#11:0]
 Aggregate false, [key#4], [key#4,SUM(PartialCount#6L) AS c_1#2L]
  Exchange (HashPartitioning [key#4:0], 200)
   Exchange (HashPartitioning [key#4:0], 200)
    Aggregate true, [key#4], [key#4,COUNT(1) AS PartialCount#6L]
     HiveTableScan [key#4], (MetastoreRelation default, src, None), None
scala> q0.select('plan).collect()
...
[ExplainCommandPhysical [plan#24:0]
 Aggregate false, [key#17], [key#17,SUM(PartialCount#19L) AS c_1#2L]
  Exchange (HashPartitioning [key#17:0], 200)
   Exchange (HashPartitioning [key#17:0], 200)
    Aggregate true, [key#17], [key#17,COUNT(1) AS PartialCount#19L]
     HiveTableScan [key#17], (MetastoreRelation default, src, None), None]
scala>
```
### SET command
In this example we query all the properties set in `SQLConf`, register the result as a table, and then query the table with HiveQL:
```
scala> val q1 = hql("SET")
...
q1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[7] at RDD at SchemaRDD.scala:98
== Query Plan ==
<SET command: executed by Hive, and noted by SQLContext>
scala> q1.registerAsTable("properties")
scala> hql("SELECT key, value FROM properties ORDER BY key LIMIT 10").foreach(println)
...
== Query Plan ==
TakeOrdered 10, [key#51:0 ASC]
 Project [key#51:0,value#52:1]
  SetCommandPhysical None, None, [key#55:0,value#56:1]
...
[datanucleus.autoCreateSchema,true]
[datanucleus.autoStartMechanismMode,checked]
[datanucleus.cache.level2,false]
[datanucleus.cache.level2.type,none]
[datanucleus.connectionPoolingType,BONECP]
[datanucleus.fixedDatastore,false]
[datanucleus.identifierFactory,datanucleus1]
[datanucleus.plugin.pluginRegistryBundleCheck,LOG]
[datanucleus.rdbms.useLegacyNativeValueStrategy,true]
[datanucleus.storeManagerType,rdbms]
scala>
```
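The round trip above works because `SetCommand` now emits one two-column row per property rather than a single `key=value` string (compare the `SQLQuerySuite` changes in the diff below). A self-contained sketch of the row construction it performs, using one of the pairs from the output above:
```
import org.apache.spark.sql.catalyst.expressions.GenericRow

// Mirrors SetCommand.execute in this PR: each (key, value) pair becomes one
// two-column row, matching the (key: String, value: String) schema above.
val pairs = Seq("datanucleus.autoCreateSchema" -> "true")
val rows = pairs.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
```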
### "Exactly once" semantics
Finally, an example of the "exactly once" semantics:
```
scala> val q2 = hql("CREATE TABLE t1(key INT, value STRING)")
...
q2: org.apache.spark.sql.SchemaRDD =
SchemaRDD[28] at RDD at SchemaRDD.scala:98
== Query Plan ==
<Native command: executed by Hive>
scala> table("t1")
...
res9: org.apache.spark.sql.SchemaRDD =
SchemaRDD[32] at RDD at SchemaRDD.scala:98
== Query Plan ==
HiveTableScan [key#58,value#59], (MetastoreRelation default, t1, None), None
scala> q2.collect()
...
res10: Array[org.apache.spark.sql.Row] = Array([])
scala>
```
As we can see, the CREATE TABLE command is executed eagerly, right after the `SchemaRDD` is created, and referencing the `SchemaRDD` again won't trigger a duplicate execution.
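Under the hood this is ordinary Scala `lazy val` semantics: the initializer runs at most once, on first access, and the result is cached. A minimal self-contained sketch of the pattern (the `DropTableCommand` class here is made up for illustration):
```
// Toy stand-in for a physical command plan: the side effect lives in a lazy
// field, so it runs exactly once however many times it is referenced.
class DropTableCommand(tableName: String) {
  protected lazy val sideEffectResult: Seq[Any] = {
    println(s"dropping table $tableName") // the side effect
    Seq.empty[Any]
  }

  // All consumers go through sideEffectResult, as the execute() methods do.
  def execute(): Seq[Any] = sideEffectResult
}

object ExactlyOnceDemo extends App {
  val cmd = new DropTableCommand("t1")
  cmd.execute() // prints "dropping table t1"
  cmd.execute() // prints nothing: the lazy field is already initialized
}
```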
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1071 from liancheng/exactlyOnceCommand and squashes the following commits:
d005b03 [Cheng Lian] Made "SET key=value" returns the newly set key value pair
f6c7715 [Cheng Lian] Added test cases for DDL/command statement RDDs
1d00937 [Cheng Lian] Makes SchemaRDD DSLs work for DDL/command statement RDDs
5c7e680 [Cheng Lian] Bug fix: wrong type used in pattern matching
48aa2e5 [Cheng Lian] Refined SQLContext.emptyResult as an empty RDD[Row]
cc64f32 [Cheng Lian] Renamed physical plan classes for DDL/commands
74789c1 [Cheng Lian] Fixed failing test cases
0ad343a [Cheng Lian] Added physical plan for DDL and commands to ensure the "exactly once" semantics
Diffstat (limited to 'sql/core')
7 files changed, 86 insertions, 80 deletions
```
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index b6a2f1b9d1..378ff54531 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.columnar.InMemoryRelation
@@ -147,14 +147,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def sql(sqlText: String): SchemaRDD = {
-    val result = new SchemaRDD(this, parseSql(sqlText))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL. This is so DDL commands behave as expected. This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result.queryExecution.toRdd
-    result
-  }
+  def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))

   /** Returns the specified table as a SchemaRDD */
   def table(tableName: String): SchemaRDD =
@@ -259,8 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] val planner = new SparkPlanner

   @transient
-  protected[sql] lazy val emptyResult =
-    sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
+  protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1)

   /**
    * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
@@ -280,22 +272,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected abstract class QueryExecution {
     def logical: LogicalPlan

-    def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
-      case SetCommand(key, value) =>
-        // Only this case needs to be executed eagerly. The other cases will
-        // be taken care of when the actual results are being extracted.
-        // In the case of HiveContext, sqlConf is overridden to also pass the
-        // pair into its HiveConf.
-        if (key.isDefined && value.isDefined) {
-          set(key.get, value.get)
-        }
-        // It doesn't matter what we return here, since this is only used
-        // to force the evaluation to happen eagerly. To query the results,
-        // one must use SchemaRDD operations to extract them.
-        emptyResult
-      case _ => executedPlan.execute()
-    }
-
     lazy val analyzed = analyzer(logical)
     lazy val optimizedPlan = optimizer(analyzed)
     // TODO: Don't just pick the first one...
@@ -303,12 +279,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

     /** Internal version of the RDD. Avoids copies and has no schema */
-    lazy val toRdd: RDD[Row] = {
-      logical match {
-        case s: SetCommand => eagerlyProcess(s)
-        case _ => executedPlan.execute()
-      }
-    }
+    lazy val toRdd: RDD[Row] = executedPlan.execute()

     protected def stringOrError[A](f: => A): String =
       try f.toString catch { case e: Throwable => e.toString }
@@ -330,7 +301,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * TODO: We only support primitive types, add support for nested types.
    */
   private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
-    val schema = rdd.first.map { case (fieldName, obj) =>
+    val schema = rdd.first().map { case (fieldName, obj) =>
       val dataType = obj.getClass match {
         case c: Class[_] if c == classOf[java.lang.String] => StringType
         case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 7ad8edf5a5..821ac850ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -97,7 +97,7 @@ import java.util.{Map => JMap}
 @AlphaComponent
 class SchemaRDD(
     @transient val sqlContext: SQLContext,
-    @transient protected[spark] val logicalPlan: LogicalPlan)
+    @transient val baseLogicalPlan: LogicalPlan)
   extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {

   def baseSchemaRDD = this
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 3a895e15a4..656be965a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -20,13 +20,14 @@ package org.apache.spark.sql
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkLogicalPlan

 /**
  * Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
  */
 private[sql] trait SchemaRDDLike {
   @transient val sqlContext: SQLContext
-  @transient protected[spark] val logicalPlan: LogicalPlan
+  @transient val baseLogicalPlan: LogicalPlan

   private[sql] def baseSchemaRDD: SchemaRDD

@@ -48,7 +49,17 @@ private[sql] trait SchemaRDDLike {
    */
   @transient
   @DeveloperApi
-  lazy val queryExecution = sqlContext.executePlan(logicalPlan)
+  lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
+
+  @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
+    // For various commands (like DDL) and queries with side effects, we force query optimization to
+    // happen right away to let these side effects take place eagerly.
+    case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+      queryExecution.toRdd
+      SparkLogicalPlan(queryExecution.executedPlan)
+    case _ =>
+      baseLogicalPlan
+  }

   override def toString =
     s"""${super.toString}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
index 22f57b758d..aff6ffe9f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
  */
 class JavaSchemaRDD(
     @transient val sqlContext: SQLContext,
-    @transient protected[spark] val logicalPlan: LogicalPlan)
+    @transient val baseLogicalPlan: LogicalPlan)
   extends JavaRDDLike[Row, JavaRDD[Row]]
   with SchemaRDDLike {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index a657911afe..2233216a6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution

-import org.apache.spark.sql.{SQLConf, SQLContext, execution}
+import org.apache.spark.sql.{SQLContext, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
@@ -157,7 +157,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
       case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
         InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
-      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
+      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
           if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
             (filters: Seq[Expression]) => {
@@ -186,7 +186,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           filters,
           prunePushedDownFilters,
           ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
-      }

       case _ => Nil
     }
@@ -250,12 +249,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   case class CommandStrategy(context: SQLContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.SetCommand(key, value) =>
-        Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
+        Seq(execution.SetCommand(key, value, plan.output)(context))
       case logical.ExplainCommand(child) =>
         val executedPlan = context.executePlan(child).executedPlan
-        Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
+        Seq(execution.ExplainCommand(executedPlan, plan.output)(context))
       case logical.CacheCommand(tableName, cache) =>
-        Seq(execution.CacheCommandPhysical(tableName, cache)(context))
+        Seq(execution.CacheCommand(tableName, cache)(context))
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index be26d19e66..0377290af5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -22,45 +22,69 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}

+trait Command {
+  /**
+   * A concrete command should override this lazy field to wrap up any side effects caused by the
+   * command or any other computation that should be evaluated exactly once. The value of this field
+   * can be used as the contents of the corresponding RDD generated from the physical plan of this
+   * command.
+   *
+   * The `execute()` method of all the physical command classes should reference `sideEffectResult`
+   * so that the command can be executed eagerly right after the command query is created.
+   */
+  protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
+}
+
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
-                             (@transient context: SQLContext) extends LeafNode {
-  def execute(): RDD[Row] = (key, value) match {
-    // Set value for key k; the action itself would
-    // have been performed in QueryExecution eagerly.
-    case (Some(k), Some(v)) => context.emptyResult
+case class SetCommand(
+    key: Option[String], value: Option[String], output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends LeafNode with Command {
+
+  override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
+    // Set value for key k.
+    case (Some(k), Some(v)) =>
+      context.set(k, v)
+      Array(k -> v)
+
     // Query the value bound to key k.
-    case (Some(k), None) =>
-      val resultString = context.getOption(k) match {
-        case Some(v) => s"$k=$v"
-        case None => s"$k is undefined"
-      }
-      context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1)
+    case (Some(k), _) =>
+      Array(k -> context.getOption(k).getOrElse("<undefined>"))
+
     // Query all key-value pairs that are set in the SQLConf of the context.
     case (None, None) =>
-      val pairs = context.getAll
-      val rows = pairs.map { case (k, v) =>
-        new GenericRow(Array[Any](s"$k=$v"))
-      }.toSeq
-      // Assume config parameters can fit into one split (machine) ;)
-      context.sparkContext.parallelize(rows, 1)
-    // The only other case is invalid semantics and is impossible.
-    case _ => context.emptyResult
+      context.getAll
+
+    case _ =>
+      throw new IllegalArgumentException()
   }
+
+  def execute(): RDD[Row] = {
+    val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
+    context.sparkContext.parallelize(rows, 1)
+  }
+
+  override def otherCopyArgs = context :: Nil
 }

 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
-                                 (@transient context: SQLContext) extends UnaryNode {
+case class ExplainCommand(
+    child: SparkPlan, output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends UnaryNode with Command {
+
+  // Actually "EXPLAIN" command doesn't cause any side effect.
+  override protected[sql] lazy val sideEffectResult: Seq[String] = this.toString.split("\n")
+
   def execute(): RDD[Row] = {
-    val planString = new GenericRow(Array[Any](child.toString))
-    context.sparkContext.parallelize(Seq(planString))
+    val explanation = sideEffectResult.mkString("\n")
+    context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](explanation))), 1)
   }

   override def otherCopyArgs = context :: Nil
@@ -70,19 +94,20 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
-  extends LeafNode {
+case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+  extends LeafNode with Command {

-  lazy val commandSideEffect = {
+  override protected[sql] lazy val sideEffectResult = {
     if (doCache) {
       context.cacheTable(tableName)
     } else {
       context.uncacheTable(tableName)
     }
+    Seq.empty[Any]
   }

   override def execute(): RDD[Row] = {
-    commandSideEffect
+    sideEffectResult
     context.emptyResult
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c1fc99f077..e9360b0fc7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -141,7 +141,7 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT AVG(a),b FROM largeAndSmallInts group by b"),
       Seq((2147483645.0,1),(2.0,2)))
   }
-  
+
   test("count") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM testData2"),
@@ -332,7 +332,7 @@ class SQLQuerySuite extends QueryTest {
         (3, "C"),
         (4, "D")))
   }
-  
+
   test("system function upper()") {
     checkAnswer(
       sql("SELECT n,UPPER(l) FROM lowerCaseData"),
@@ -349,7 +349,7 @@ class SQLQuerySuite extends QueryTest {
         (2, "ABC"),
         (3, null)))
   }
-  
+
   test("system function lower()") {
     checkAnswer(
       sql("SELECT N,LOWER(L) FROM upperCaseData"),
@@ -382,25 +382,25 @@ class SQLQuerySuite extends QueryTest {
     sql(s"SET $testKey=$testVal")
     checkAnswer(
       sql("SET"),
-      Seq(Seq(s"$testKey=$testVal"))
+      Seq(Seq(testKey, testVal))
     )

     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     checkAnswer(
       sql("set"),
       Seq(
-        Seq(s"$testKey=$testVal"),
-        Seq(s"${testKey + testKey}=${testVal + testVal}"))
+        Seq(testKey, testVal),
+        Seq(testKey + testKey, testVal + testVal))
     )

     // "set key"
     checkAnswer(
       sql(s"SET $testKey"),
-      Seq(Seq(s"$testKey=$testVal"))
+      Seq(Seq(testKey, testVal))
     )

     checkAnswer(
       sql(s"SET $nonexistentKey"),
-      Seq(Seq(s"$nonexistentKey is undefined"))
+      Seq(Seq(nonexistentKey, "<undefined>"))
     )
     clear()
   }
```