aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-06-13 12:59:48 -0700
committerMichael Armbrust <michael@databricks.com>2014-06-13 12:59:48 -0700
commitac96d9657c9a9f89a455a1b671c059d896012d41 (patch)
tree4458ab91a13169575b9625e0b6fdcd5b96e21823 /sql/catalyst
parent1c2fd015b05b65abc83c4874ada825deac578af8 (diff)
downloadspark-ac96d9657c9a9f89a455a1b671c059d896012d41.tar.gz
spark-ac96d9657c9a9f89a455a1b671c059d896012d41.tar.bz2
spark-ac96d9657c9a9f89a455a1b671c059d896012d41.zip
[SPARK-2094][SQL] "Exactly once" semantics for DDL and command statements
## Related JIRA issues - Main issue: - [SPARK-2094](https://issues.apache.org/jira/browse/SPARK-2094): Ensure exactly once semantics for DDL/Commands - Issues resolved as dependencies: - [SPARK-2081](https://issues.apache.org/jira/browse/SPARK-2081): Undefine output() from the abstract class Command and implement it in concrete subclasses - [SPARK-2128](https://issues.apache.org/jira/browse/SPARK-2128): No plan for DESCRIBE - [SPARK-1852](https://issues.apache.org/jira/browse/SPARK-1852): SparkSQL Queries with Sorts run before the user asks them to - Other related issue: - [SPARK-2129](https://issues.apache.org/jira/browse/SPARK-2129): NPE thrown while lookup a view Two test cases, `join_view` and `mergejoin_mixed`, within the `HiveCompatibilitySuite` are removed from the whitelist to workaround this issue. ## PR Overview This PR defines physical plans for DDL statements and commands and wraps their side effects in a lazy field `PhysicalCommand.sideEffectResult`, so that they are executed eagerly and exactly once. Also, as a positive side effect, now DDL statements and commands can be turned into proper `SchemaRDD`s and let user query the execution results. This PR defines schemas for the following DDL/commands: - EXPLAIN command - `plan`: String, the plan explanation - SET command - `key`: String, the key(s) of the propert(y/ies) being set or queried - `value`: String, the value(s) of the propert(y/ies) being queried - Other Hive native command - `result`: String, execution result returned by Hive **NOTE**: We should refine schemas for different native commands by defining physical plans for them in the future. ## Examples ### EXPLAIN command Take the "EXPLAIN" command as an example, we first execute the command and obtain a `SchemaRDD` at the same time, then query the `plan` field with the schema DSL: ``` scala> loadTestTable("src") ... scala> val q0 = hql("EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key") ... q0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98 == Query Plan == ExplainCommandPhysical [plan#11:0] Aggregate false, [key#4], [key#4,SUM(PartialCount#6L) AS c_1#2L] Exchange (HashPartitioning [key#4:0], 200) Exchange (HashPartitioning [key#4:0], 200) Aggregate true, [key#4], [key#4,COUNT(1) AS PartialCount#6L] HiveTableScan [key#4], (MetastoreRelation default, src, None), None scala> q0.select('plan).collect() ... [ExplainCommandPhysical [plan#24:0] Aggregate false, [key#17], [key#17,SUM(PartialCount#19L) AS c_1#2L] Exchange (HashPartitioning [key#17:0], 200) Exchange (HashPartitioning [key#17:0], 200) Aggregate true, [key#17], [key#17,COUNT(1) AS PartialCount#19L] HiveTableScan [key#17], (MetastoreRelation default, src, None), None] scala> ``` ### SET command In this example we query all the properties set in `SQLConf`, register the result as a table, and then query the table with HiveQL: ``` scala> val q1 = hql("SET") ... q1: org.apache.spark.sql.SchemaRDD = SchemaRDD[7] at RDD at SchemaRDD.scala:98 == Query Plan == <SET command: executed by Hive, and noted by SQLContext> scala> q1.registerAsTable("properties") scala> hql("SELECT key, value FROM properties ORDER BY key LIMIT 10").foreach(println) ... == Query Plan == TakeOrdered 10, [key#51:0 ASC] Project [key#51:0,value#52:1] SetCommandPhysical None, None, [key#55:0,value#56:1]), which has no missing parents 14/06/12 12:19:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 5 (SchemaRDD[21] at RDD at SchemaRDD.scala:98 == Query Plan == TakeOrdered 10, [key#51:0 ASC] Project [key#51:0,value#52:1] SetCommandPhysical None, None, [key#55:0,value#56:1]) ... [datanucleus.autoCreateSchema,true] [datanucleus.autoStartMechanismMode,checked] [datanucleus.cache.level2,false] [datanucleus.cache.level2.type,none] [datanucleus.connectionPoolingType,BONECP] [datanucleus.fixedDatastore,false] [datanucleus.identifierFactory,datanucleus1] [datanucleus.plugin.pluginRegistryBundleCheck,LOG] [datanucleus.rdbms.useLegacyNativeValueStrategy,true] [datanucleus.storeManagerType,rdbms] scala> ``` ### "Exactly once" semantics At last, an example of the "exactly once" semantics: ``` scala> val q2 = hql("CREATE TABLE t1(key INT, value STRING)") ... q2: org.apache.spark.sql.SchemaRDD = SchemaRDD[28] at RDD at SchemaRDD.scala:98 == Query Plan == <Native command: executed by Hive> scala> table("t1") ... res9: org.apache.spark.sql.SchemaRDD = SchemaRDD[32] at RDD at SchemaRDD.scala:98 == Query Plan == HiveTableScan [key#58,value#59], (MetastoreRelation default, t1, None), None scala> q2.collect() ... res10: Array[org.apache.spark.sql.Row] = Array([]) scala> ``` As we can see, the "CREATE TABLE" command is executed eagerly right after the `SchemaRDD` is created, and referencing the `SchemaRDD` again won't trigger a duplicated execution. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #1071 from liancheng/exactlyOnceCommand and squashes the following commits: d005b03 [Cheng Lian] Made "SET key=value" returns the newly set key value pair f6c7715 [Cheng Lian] Added test cases for DDL/command statement RDDs 1d00937 [Cheng Lian] Makes SchemaRDD DSLs work for DDL/command statement RDDs 5c7e680 [Cheng Lian] Bug fix: wrong type used in pattern matching 48aa2e5 [Cheng Lian] Refined SQLContext.emptyResult as an empty RDD[Row] cc64f32 [Cheng Lian] Renamed physical plan classes for DDL/commands 74789c1 [Cheng Lian] Fixed failing test cases 0ad343a [Cheng Lian] Added physical plan for DDL and commands to ensure the "exactly once" semantics
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala18
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala2
2 files changed, 11 insertions, 9 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index d05c965275..3299e86b85 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
import org.apache.spark.sql.catalyst.types.StringType
/**
@@ -26,23 +26,25 @@ import org.apache.spark.sql.catalyst.types.StringType
*/
abstract class Command extends LeafNode {
self: Product =>
- def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
+ def output: Seq[Attribute] = Seq.empty
}
/**
* Returned for commands supported by a given parser, but not catalyst. In general these are DDL
* commands that are passed directly to another system.
*/
-case class NativeCommand(cmd: String) extends Command
+case class NativeCommand(cmd: String) extends Command {
+ override def output =
+ Seq(BoundReference(0, AttributeReference("result", StringType, nullable = false)()))
+}
/**
* Commands of the form "SET (key) (= value)".
*/
case class SetCommand(key: Option[String], value: Option[String]) extends Command {
override def output = Seq(
- AttributeReference("key", StringType, nullable = false)(),
- AttributeReference("value", StringType, nullable = false)()
- )
+ BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
+ BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
}
/**
@@ -50,11 +52,11 @@ case class SetCommand(key: Option[String], value: Option[String]) extends Comman
* actually performing the execution.
*/
case class ExplainCommand(plan: LogicalPlan) extends Command {
- override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+ override def output =
+ Seq(BoundReference(0, AttributeReference("plan", StringType, nullable = false)()))
}
/**
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
*/
case class CacheCommand(tableName: String, doCache: Boolean) extends Command
-
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0cada785b6..1f67c80e54 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -161,7 +161,7 @@ class FilterPushdownSuite extends OptimizerTest {
comparePlans(optimized, correctAnswer)
}
-
+
test("joins: push down left outer join #1") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)