aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2016-04-01 22:00:24 -0700
committerYin Huai <yhuai@databricks.com>2016-04-01 22:00:24 -0700
commit27e71a2cd930ae28c82c9c3ee6476a12ea165fdf (patch)
tree8170697ae9631a2f9b307a785ef221bd3fd7b60f /sql/hive/src
parent4fc35e6f5c590feb47cbcb5b1136f2e985677b3f (diff)
downloadspark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.tar.gz
spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.tar.bz2
spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.zip
[SPARK-14244][SQL] Don't use SizeBasedWindowFunction.n created on executor side when evaluating window functions
## What changes were proposed in this pull request? `SizeBasedWindowFunction.n` is a global singleton attribute created for evaluating size based aggregate window functions like `CUME_DIST`. However, this attribute gets different expression IDs when created on both driver side and executor side. This PR adds `withPartitionSize` method to `SizeBasedWindowFunction` so that we can easily rewrite `SizeBasedWindowFunction.n` on executor side. ## How was this patch tested? A test case is added in `HiveSparkSubmitSuite`, which supports launching multi-process clusters. Author: Cheng Lian <lian@databricks.com> Closes #12040 from liancheng/spark-14244-fix-sized-window-function.
Diffstat (limited to 'sql/hive/src')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala44
2 files changed, 46 insertions, 2 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index c07c428895..5ada3d5598 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -107,7 +107,9 @@ private[hive] class HiveFunctionRegistry(
// If there is any other error, we throw an AnalysisException.
val errorMessage = s"No handler for Hive udf ${functionInfo.getFunctionClass} " +
s"because: ${throwable.getMessage}."
- throw new AnalysisException(errorMessage)
+ val analysisException = new AnalysisException(errorMessage)
+ analysisException.setStackTrace(throwable.getStackTrace)
+ throw analysisException
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 16747cab37..53dec6348f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{QueryTest, SQLContext}
+import org.apache.spark.sql.{QueryTest, Row, SQLContext}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
@@ -135,6 +135,19 @@ class HiveSparkSubmitSuite
runSparkSubmit(args)
}
+ test("SPARK-14244 fix window partition size attribute binding failure") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val args = Seq(
+ "--class", SPARK_14244.getClass.getName.stripSuffix("$"),
+ "--name", "SparkSQLConfTest",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ unusedJar.toString)
+ runSparkSubmit(args)
+ }
+
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
// This is copied from org.apache.spark.deploy.SparkSubmitSuite
private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -378,3 +391,32 @@ object SPARK_11009 extends QueryTest {
}
}
}
+
+object SPARK_14244 extends QueryTest {
+ import org.apache.spark.sql.expressions.Window
+ import org.apache.spark.sql.functions._
+
+ protected var sqlContext: SQLContext = _
+
+ def main(args: Array[String]): Unit = {
+ Utils.configTestLog4j("INFO")
+
+ val sparkContext = new SparkContext(
+ new SparkConf()
+ .set("spark.ui.enabled", "false")
+ .set("spark.sql.shuffle.partitions", "100"))
+
+ val hiveContext = new TestHiveContext(sparkContext)
+ sqlContext = hiveContext
+
+ import hiveContext.implicits._
+
+ try {
+ val window = Window.orderBy('id)
+ val df = sqlContext.range(2).select(cume_dist().over(window).as('cdist)).orderBy('cdist)
+ checkAnswer(df, Seq(Row(0.5D), Row(1.0D)))
+ } finally {
+ sparkContext.stop()
+ }
+ }
+}