author     Michael Armbrust <michael@databricks.com>  2015-02-23 17:34:54 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-02-23 17:34:54 -0800
commit     1ed57086d402c38d95cda6c3d9d7aea806609bf9 (patch)
tree       fb92a551881535edd2bb9c8c234d901d81e10876 /sql/core
parent     48376bfe9c97bf31279918def6c6615849c88f4d (diff)
[SPARK-5873][SQL] Allow viewing of partially analyzed plans in queryExecution
Author: Michael Armbrust <michael@databricks.com>

Closes #4684 from marmbrus/explainAnalysis and squashes the following commits:

afbaa19 [Michael Armbrust] fix python
d93278c [Michael Armbrust] fix hive
e5fa0a4 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into explainAnalysis
52119f2 [Michael Armbrust] more tests
82a5431 [Michael Armbrust] fix tests
25753d2 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into explainAnalysis
aee1e6a [Michael Armbrust] fix hive
b23a844 [Michael Armbrust] newline
de8dc51 [Michael Armbrust] more comments
acf620a [Michael Armbrust] [SPARK-5873][SQL] Show partially analyzed plans in query execution
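In practical terms, the patch lets an invalid DataFrame be constructed and inspected once eager analysis is switched off; the AnalysisException only surfaces when the plan is actually executed. A minimal usage sketch (the table and column names are made up for illustration, and queryExecution is a developer-facing API):

// Hypothetical usage sketch; assumes an existing SQLContext named sqlContext and
// a registered temp table "people" that has no column called "missing".
sqlContext.setConf("spark.sql.eagerAnalysis", "false")

// Construction no longer forces analysis, so this succeeds even though
// "missing" cannot be resolved...
val df = sqlContext.sql("SELECT missing FROM people")

// ...and the partially analyzed plan can still be printed for debugging.
println(df.queryExecution)

// The failure is only raised when the plan is actually used:
// df.collect()   // throws org.apache.spark.sql.AnalysisException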
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala              |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala             | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala          | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala |  1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala    |  2
6 files changed, 21 insertions, 13 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 69e5f6a07d..27ac398063 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -117,7 +117,7 @@ class DataFrame protected[sql](
this(sqlContext, {
val qe = sqlContext.executePlan(logicalPlan)
if (sqlContext.conf.dataFrameEagerAnalysis) {
- qe.analyzed // This should force analysis and throw errors if there are any
+ qe.assertAnalyzed() // This should force analysis and throw errors if there are any
}
qe
})
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 39f6c2f4bc..a08c0f5ce3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -52,8 +52,9 @@ private[spark] object SQLConf {
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
- // Whether to perform eager analysis on a DataFrame.
- val DATAFRAME_EAGER_ANALYSIS = "spark.sql.dataframe.eagerAnalysis"
+ // Whether to perform eager analysis when constructing a dataframe.
+ // Set to false when debugging requires the ability to look at invalid query plans.
+ val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4bdaa02391..ce800e0754 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -114,7 +114,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
new Analyzer(catalog, functionRegistry, caseSensitive = true) {
override val extendedResolutionRules =
ExtractPythonUdfs ::
- sources.PreWriteCheck(catalog) ::
sources.PreInsertCastAndRename ::
Nil
}
@@ -1057,6 +1056,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
Batch("Add exchange", Once, AddExchange(self)) :: Nil
}
+ @transient
+ protected[sql] lazy val checkAnalysis = new CheckAnalysis {
+ override val extendedCheckRules = Seq(
+ sources.PreWriteCheck(catalog)
+ )
+ }
+
/**
* :: DeveloperApi ::
* The primary workflow for executing relational queries using Spark. Designed to allow easy
@@ -1064,9 +1070,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@DeveloperApi
protected[sql] class QueryExecution(val logical: LogicalPlan) {
+ def assertAnalyzed(): Unit = checkAnalysis(analyzed)
lazy val analyzed: LogicalPlan = analyzer(logical)
- lazy val withCachedData: LogicalPlan = cacheManager.useCachedData(analyzed)
+ lazy val withCachedData: LogicalPlan = {
+ assertAnalyzed
+ cacheManager.useCachedData(analyzed)
+ }
lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData)
// TODO: Don't just pick the first one...
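Taken together, this hunk separates resolution from validation: analyzed still only runs the analyzer, assertAnalyzed runs checkAnalysis over the result, and withCachedData forces that check so execution continues to fail fast. Any additional validation only needs to be a LogicalPlan => Unit that throws AnalysisException. As a purely hypothetical sketch of an extra in-tree check (the rule name and condition below are invented; only the wiring mirrors this patch):

// Hypothetical extra check, written against the same contract as PreWriteCheck:
// a LogicalPlan => Unit that throws AnalysisException on violation. Assumes it
// lives inside sql/core alongside the code above.
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

object NoEmptyProjection extends (LogicalPlan => Unit) {
  def apply(plan: LogicalPlan): Unit = plan.foreach {
    case p: Project if p.projectList.isEmpty =>
      throw new AnalysisException("Projection with no output columns.")
    case _ => // OK
  }
}

// Wired in next to the built-in check:
//   override val extendedCheckRules = Seq(
//     sources.PreWriteCheck(catalog),
//     NoEmptyProjection
//   )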
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index 36a9c0bdc4..8440581074 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -78,10 +78,10 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
/**
* A rule to do various checks before inserting into or writing to a data source table.
*/
-private[sql] case class PreWriteCheck(catalog: Catalog) extends Rule[LogicalPlan] {
+private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) {
def failAnalysis(msg: String) = { throw new AnalysisException(msg) }
- def apply(plan: LogicalPlan): LogicalPlan = {
+ def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case i @ logical.InsertIntoTable(
l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) =>
@@ -93,7 +93,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends Rule[LogicalPlan
val srcRelations = query.collect {
case LogicalRelation(src: BaseRelation) => src
}
- if (srcRelations.exists(src => src == t)) {
+ if (srcRelations.contains(t)) {
failAnalysis(
"Cannot insert overwrite into table that is also being read from.")
} else {
@@ -119,7 +119,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends Rule[LogicalPlan
val srcRelations = query.collect {
case LogicalRelation(src: BaseRelation) => src
}
- if (srcRelations.exists(src => src == dest)) {
+ if (srcRelations.contains(dest)) {
failAnalysis(
s"Cannot overwrite table $tableName that is also being read from.")
} else {
@@ -134,7 +134,5 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends Rule[LogicalPlan
case _ => // OK
}
-
- plan
}
}
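Since PreWriteCheck is now a pure check rather than a rewriting rule, it no longer returns the plan (hence the dropped trailing `plan`), and it moves from the analyzer's extendedResolutionRules into checkAnalysis.extendedCheckRules, as the SQLContext and DataSourceTest hunks show. For reference, the situation these guards reject looks roughly like this from the user side (the table name is invented and assumed to be backed by an InsertableRelation; the message comes from the failAnalysis calls above):

// Hypothetical illustration of what PreWriteCheck rejects: an INSERT OVERWRITE
// whose query also reads from the target data source table.
sqlContext.sql("INSERT OVERWRITE TABLE tenToOne SELECT a FROM tenToOne")
// => org.apache.spark.sql.AnalysisException:
//    Cannot insert overwrite into table that is also being read from.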
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
index 0ec6881d7a..91c6367371 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
@@ -30,7 +30,6 @@ abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
override protected[sql] lazy val analyzer: Analyzer =
new Analyzer(catalog, functionRegistry, caseSensitive = false) {
override val extendedResolutionRules =
- PreWriteCheck(catalog) ::
PreInsertCastAndRename ::
Nil
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 5682e5a2bc..b5b16f9546 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -205,7 +205,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
val message = intercept[AnalysisException] {
sql(
s"""
- |INSERT OVERWRITE TABLE oneToTen SELECT a FROM jt
+ |INSERT OVERWRITE TABLE oneToTen SELECT CAST(a AS INT) FROM jt
""".stripMargin)
}.getMessage
assert(