path: root/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
Diffstat (limited to 'sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala  209
1 file changed, 208 insertions, 1 deletion
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 16747cab37..c5417b06a4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -31,7 +31,9 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{QueryTest, SQLContext}
+import org.apache.spark.sql.{QueryTest, Row, SQLContext}
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
@@ -55,6 +57,57 @@ class HiveSparkSubmitSuite
System.setProperty("spark.testing", "true")
}
+ test("temporary Hive UDF: define a UDF and use it") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+ val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
+ val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
+ val args = Seq(
+ "--class", TemporaryHiveUDFTest.getClass.getName.stripSuffix("$"),
+ "--name", "TemporaryHiveUDFTest",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ "--jars", jarsString,
+ unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
+ runSparkSubmit(args)
+ }
+
+ test("permanent Hive UDF: define a UDF and use it") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+ val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
+ val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
+ val args = Seq(
+ "--class", PermanentHiveUDFTest1.getClass.getName.stripSuffix("$"),
+ "--name", "PermanentHiveUDFTest1",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ "--jars", jarsString,
+ unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
+ runSparkSubmit(args)
+ }
+
+ test("permanent Hive UDF: use a already defined permanent function") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+ val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
+ val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
+ val args = Seq(
+ "--class", PermanentHiveUDFTest2.getClass.getName.stripSuffix("$"),
+ "--name", "PermanentHiveUDFTest2",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ "--jars", jarsString,
+ unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
+ runSparkSubmit(args)
+ }
+
test("SPARK-8368: includes jars passed in through --jars") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
@@ -135,6 +188,19 @@ class HiveSparkSubmitSuite
runSparkSubmit(args)
}
+ test("SPARK-14244 fix window partition size attribute binding failure") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val args = Seq(
+ "--class", SPARK_14244.getClass.getName.stripSuffix("$"),
+ "--name", "SparkSQLConfTest",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ unusedJar.toString)
+ runSparkSubmit(args)
+ }
+
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
// This is copied from org.apache.spark.deploy.SparkSubmitSuite
private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -195,6 +261,118 @@ class HiveSparkSubmitSuite
}
}
+// This application is used to test defining a temporary Hive UDF (with an associated
+// jar) and then using that UDF. It needs to run in a separate JVM to make sure the
+// jar registered with the function can be loaded.
+object TemporaryHiveUDFTest extends Logging {
+ def main(args: Array[String]) {
+ Utils.configTestLog4j("INFO")
+ val conf = new SparkConf()
+ conf.set("spark.ui.enabled", "false")
+ val sc = new SparkContext(conf)
+ val hiveContext = new TestHiveContext(sc)
+
+ // Load a Hive UDF from the jar.
+ logInfo("Registering a temporary Hive UDF provided in a jar.")
+ val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
+ hiveContext.sql(
+ s"""
+ |CREATE TEMPORARY FUNCTION example_max
+ |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
+ |USING JAR '$jar'
+ """.stripMargin)
+ val source =
+ hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
+ source.registerTempTable("sourceTable")
+ // Actually use the loaded UDF.
+ logInfo("Using the UDF.")
+ val result = hiveContext.sql(
+ "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
+ logInfo("Running a simple query on the table.")
+ val count = result.orderBy("key", "val").count()
+ if (count != 10) {
+ throw new Exception(s"Result table should have 10 rows instead of $count rows")
+ }
+ hiveContext.sql("DROP temporary FUNCTION example_max")
+ logInfo("Test finishes.")
+ sc.stop()
+ }
+}
+
+// This application is used to test defining a permanent Hive UDF (with an associated
+// jar) and then using that UDF. It needs to run in a separate JVM to make sure the
+// jar registered with the function can be loaded.
+object PermanentHiveUDFTest1 extends Logging {
+ def main(args: Array[String]) {
+ Utils.configTestLog4j("INFO")
+ val conf = new SparkConf()
+ conf.set("spark.ui.enabled", "false")
+ val sc = new SparkContext(conf)
+ val hiveContext = new TestHiveContext(sc)
+
+ // Load a Hive UDF from the jar.
+ logInfo("Registering a permanent Hive UDF provided in a jar.")
+ val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
+ hiveContext.sql(
+ s"""
+ |CREATE FUNCTION example_max
+ |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
+ |USING JAR '$jar'
+ """.stripMargin)
+ val source =
+ hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
+ source.registerTempTable("sourceTable")
+ // Actually use the loaded UDF.
+ logInfo("Using the UDF.")
+ val result = hiveContext.sql(
+ "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
+ logInfo("Running a simple query on the table.")
+ val count = result.orderBy("key", "val").count()
+ if (count != 10) {
+ throw new Exception(s"Result table should have 10 rows instead of $count rows")
+ }
+ hiveContext.sql("DROP FUNCTION example_max")
+ logInfo("Test finishes.")
+ sc.stop()
+ }
+}
+
+// This application is used to test that a pre-defined permanent function with jar
+// resources can be used. It needs to run in a separate JVM to make sure the jar
+// registered with the function can be loaded.
+object PermanentHiveUDFTest2 extends Logging {
+ def main(args: Array[String]) {
+ Utils.configTestLog4j("INFO")
+ val conf = new SparkConf()
+ conf.set("spark.ui.enabled", "false")
+ val sc = new SparkContext(conf)
+ val hiveContext = new TestHiveContext(sc)
+ // Load a Hive UDF from the jar.
+ logInfo("Write the metadata of a permanent Hive UDF into metastore.")
+ val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
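+ // Register the function's metadata (class name plus jar resource) directly through the
+ // session catalog, so it behaves like a pre-existing permanent function.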
+ val function = CatalogFunction(
+ FunctionIdentifier("example_max"),
+ "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
+ ("JAR" -> jar) :: Nil)
+ hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false)
+ val source =
+ hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
+ source.registerTempTable("sourceTable")
+ // Actually use the loaded UDF.
+ logInfo("Using the UDF.")
+ val result = hiveContext.sql(
+ "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
+ logInfo("Running a simple query on the table.")
+ val count = result.orderBy("key", "val").count()
+ if (count != 10) {
+ throw new Exception(s"Result table should have 10 rows instead of $count rows")
+ }
+ hiveContext.sql("DROP FUNCTION example_max")
+ logInfo("Test finishes.")
+ sc.stop()
+ }
+}
+
// This object is used for testing SPARK-8368: https://issues.apache.org/jira/browse/SPARK-8368.
// We test if we can load user jars in both driver and executors when HiveContext is used.
object SparkSubmitClassLoaderTest extends Logging {
@@ -378,3 +556,32 @@ object SPARK_11009 extends QueryTest {
}
}
}
+
+object SPARK_14244 extends QueryTest {
+ import org.apache.spark.sql.expressions.Window
+ import org.apache.spark.sql.functions._
+
+ protected var sqlContext: SQLContext = _
+
+ def main(args: Array[String]): Unit = {
+ Utils.configTestLog4j("INFO")
+
+ val sparkContext = new SparkContext(
+ new SparkConf()
+ .set("spark.ui.enabled", "false")
+ .set("spark.sql.shuffle.partitions", "100"))
+
+ val hiveContext = new TestHiveContext(sparkContext)
+ sqlContext = hiveContext
+
+ import hiveContext.implicits._
+
+ try {
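+ // SPARK-14244: with spark.sql.shuffle.partitions set to 100 and only two input rows,
+ // cume_dist() over an unpartitioned window exercises the partition size attribute binding.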
+ val window = Window.orderBy('id)
+ val df = sqlContext.range(2).select(cume_dist().over(window).as('cdist)).orderBy('cdist)
+ checkAnswer(df, Seq(Row(0.5D), Row(1.0D)))
+ } finally {
+ sparkContext.stop()
+ }
+ }
+}