aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2015-08-05 09:01:45 -0700
committerYin Huai <yhuai@databricks.com>2015-08-05 09:01:45 -0700
commit23d982204bb9ef74d3b788a32ce6608116968719 (patch)
tree4faef48a8551c8da604463613c2c1b6fc9762d30 /sql/hive
parent34dcf10104460816382908b2b8eeb6c925e862bf (diff)
downloadspark-23d982204bb9ef74d3b788a32ce6608116968719.tar.gz
spark-23d982204bb9ef74d3b788a32ce6608116968719.tar.bz2
spark-23d982204bb9ef74d3b788a32ce6608116968719.zip
[SPARK-9141] [SQL] Remove project collapsing from DataFrame API
Currently we collapse successive projections that are added by `withColumn`. However, this optimization violates the constraint that adding nodes to a plan will never change its analyzed form and thus breaks caching. Instead of doing early optimization, in this PR I just fix some low-hanging slowness in the analyzer. In particular, I add a mechanism for skipping already analyzed subplans, `resolveOperators` and `resolveExpression`. Since trees are generally immutable after construction, it's safe to annotate a plan as already analyzed as any transformation will create a new tree with this bit no longer set. Together these result in a faster analyzer than before, even with added timing instrumentation. ``` Original Code [info] 3430ms [info] 2205ms [info] 1973ms [info] 1982ms [info] 1916ms Without Project Collapsing in DataFrame [info] 44610ms [info] 45977ms [info] 46423ms [info] 46306ms [info] 54723ms With analyzer optimizations [info] 6394ms [info] 4630ms [info] 4388ms [info] 4093ms [info] 4113ms With resolveOperators [info] 2495ms [info] 1380ms [info] 1685ms [info] 1414ms [info] 1240ms ``` Author: Michael Armbrust <michael@databricks.com> Closes #7920 from marmbrus/withColumnCache and squashes the following commits: 2145031 [Michael Armbrust] fix hive udfs tests 5a5a525 [Michael Armbrust] remove wrong comment 7a507d5 [Michael Armbrust] style b59d710 [Michael Armbrust] revert small change 1fa5949 [Michael Armbrust] move logic into LogicalPlan, add tests 0e2cb43 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into withColumnCache c926e24 [Michael Armbrust] naming e593a2d [Michael Armbrust] style f5a929e [Michael Armbrust] [SPARK-9141][SQL] Remove project collapsing from DataFrame API 38b1c83 [Michael Armbrust] WIP
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala5
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala5
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala3
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala8
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala2
5 files changed, 14 insertions, 9 deletions
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index d4fc6c2b6e..ab309e0a1d 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
import java.io.File
import java.util.{Locale, TimeZone}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.SQLConf
@@ -50,6 +51,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, 5)
// Enable in-memory partition pruning for testing purposes
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
+ RuleExecutor.resetTime()
}
override def afterAll() {
@@ -58,6 +60,9 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
Locale.setDefault(originalLocale)
TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
+
+ // For debugging dump some statistics about how much time was spent in various optimizer rules.
+ logWarning(RuleExecutor.dumpTimeSpent())
}
/** A list of tests deemed out of scope currently and thus completely disregarded. */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 16c186627f..6b37af99f4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -391,7 +391,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
*/
object ParquetConversions extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
- if (!plan.resolved) {
+ if (!plan.resolved || plan.analyzed) {
return plan
}
@@ -418,8 +418,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
(relation, parquetRelation, attributedRewrites)
// Read path
- case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
- if hive.convertMetastoreParquet &&
+ case relation: MetastoreRelation if hive.convertMetastoreParquet &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 8a86a87368..7182246e46 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -133,8 +133,7 @@ private[hive] case class HiveSimpleUDF(funcWrapper: HiveFunctionWrapper, childre
@transient
private lazy val conversionHelper = new ConversionHelper(method, arguments)
- @transient
- lazy val dataType = javaClassToDataType(method.getReturnType)
+ val dataType = javaClassToDataType(method.getReturnType)
@transient
lazy val returnInspector = ObjectInspectorFactory.getReflectionObjectInspector(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 7069afc9f7..10f2902e5e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -182,7 +182,7 @@ class HiveUDFSuite extends QueryTest {
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToListString(s) FROM inputTable")
}
- assert(errMsg.getMessage === "List type in java is unsupported because " +
+ assert(errMsg.getMessage contains "List type in java is unsupported because " +
"JVM type erasure makes spark fail to catch a component type in List<>;")
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString")
@@ -197,7 +197,7 @@ class HiveUDFSuite extends QueryTest {
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToListInt(s) FROM inputTable")
}
- assert(errMsg.getMessage === "List type in java is unsupported because " +
+ assert(errMsg.getMessage contains "List type in java is unsupported because " +
"JVM type erasure makes spark fail to catch a component type in List<>;")
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt")
@@ -213,7 +213,7 @@ class HiveUDFSuite extends QueryTest {
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToStringIntMap(s) FROM inputTable")
}
- assert(errMsg.getMessage === "Map type in java is unsupported because " +
+ assert(errMsg.getMessage contains "Map type in java is unsupported because " +
"JVM type erasure makes spark fail to catch key and value types in Map<>;")
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToStringIntMap")
@@ -229,7 +229,7 @@ class HiveUDFSuite extends QueryTest {
val errMsg = intercept[AnalysisException] {
sql("SELECT testUDFToIntIntMap(s) FROM inputTable")
}
- assert(errMsg.getMessage === "Map type in java is unsupported because " +
+ assert(errMsg.getMessage contains "Map type in java is unsupported because " +
"JVM type erasure makes spark fail to catch key and value types in Map<>;")
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToIntIntMap")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ff9a3694d6..1dff07a6de 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -948,6 +948,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils {
}
test("SPARK-7595: Window will cause resolve failed with self join") {
+ sql("SELECT * FROM src") // Force loading of src table.
+
checkAnswer(sql(
"""
|with