aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-02-05 18:07:10 -0800
committerReynold Xin <rxin@databricks.com>2015-02-05 18:07:10 -0800
commite8a5d50a96f6e7d4fce33ea19fbfc083f4351296 (patch)
treef64a871ce853a24228716a0618d9c152922ff5c8 /sql
parent85ccee81acef578ec4b40fb5f5d97b9e24314f35 (diff)
downloadspark-e8a5d50a96f6e7d4fce33ea19fbfc083f4351296.tar.gz
spark-e8a5d50a96f6e7d4fce33ea19fbfc083f4351296.tar.bz2
spark-e8a5d50a96f6e7d4fce33ea19fbfc083f4351296.zip
[SPARK-5638][SQL] Add a config flag to disable eager analysis of DataFrames
Author: Reynold Xin <rxin@databricks.com> Closes #4408 from rxin/df-config-eager and squashes the following commits: c0204cf [Reynold Xin] [SPARK-5638][SQL] Add a config flag to disable eager analysis of DataFrames.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala17
3 files changed, 23 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 58d1175135..4911443dd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -53,7 +53,9 @@ private[sql] class DataFrameImpl protected[sql](
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = {
this(sqlContext, {
val qe = sqlContext.executePlan(logicalPlan)
- qe.analyzed // This should force analysis and throw errors if there are any
+ if (sqlContext.conf.dataFrameEagerAnalysis) {
+ qe.analyzed // This should force analysis and throw errors if there are any
+ }
qe
})
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 5ef3bb022f..180f5e765f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -52,6 +52,9 @@ private[spark] object SQLConf {
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = "spark.sql.default.datasource"
+ // Whether to perform eager analysis on a DataFrame.
+ val DATAFRAME_EAGER_ANALYSIS = "spark.sql.dataframe.eagerAnalysis"
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -173,6 +176,9 @@ private[sql] class SQLConf extends Serializable {
private[spark] def defaultDataSourceName: String =
getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
+ private[spark] def dataFrameEagerAnalysis: Boolean =
+ getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 74c29459d2..77fd3165f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,19 +17,23 @@
package org.apache.spark.sql
+import scala.language.postfixOps
+
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.types._
-
-/* Implicits */
+import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext.logicalPlanToSparkQuery
import org.apache.spark.sql.test.TestSQLContext.implicits._
-import scala.language.postfixOps
class DataFrameSuite extends QueryTest {
import org.apache.spark.sql.TestData._
test("analysis error should be eagerly reported") {
+ val oldSetting = TestSQLContext.conf.dataFrameEagerAnalysis
+ // Eager analysis.
+ TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")
+
intercept[Exception] { testData.select('nonExistentName) }
intercept[Exception] {
testData.groupBy('key).agg(Map("nonExistentName" -> "sum"))
@@ -40,6 +44,13 @@ class DataFrameSuite extends QueryTest {
intercept[Exception] {
testData.groupBy($"abcd").agg(Map("key" -> "sum"))
}
+
+ // No more eager analysis once the flag is turned off
+ TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+ testData.select('nonExistentName)
+
+ // Set the flag back to original value before this test.
+ TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
}
test("table scan") {