aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
blob: cc05e1d1d799f11d50371facf7b5e6bd981c6570 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.io.File
import java.sql.Timestamp
import java.util.Date

import scala.collection.mutable.ArrayBuffer
import scala.tools.nsc.Properties

import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.scalatest.concurrent.Timeouts
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, Row, SQLContext}
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.util.{ResetSystemProperties, Utils}

/**
 * This suite tests spark-submit with applications using HiveContext.
 */
class HiveSparkSubmitSuite
  extends SparkFunSuite
  with Matchers
  with BeforeAndAfterEach
  with ResetSystemProperties
  with Timeouts {

  // TODO: rewrite these or mark them as slow tests to be run sparingly

  override def beforeEach() {
    super.beforeEach()
    System.setProperty("spark.testing", "true")
  }

  test("temporary Hive UDF: define a UDF and use it") {
    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
    val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
    val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
    val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
    val args = Seq(
      "--class", TemporaryHiveUDFTest.getClass.getName.stripSuffix("$"),
      "--name", "TemporaryHiveUDFTest",
      "--master", "local-cluster[2,1,1024]",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--driver-java-options", "-Dderby.system.durability=test",
      "--jars", jarsString,
      unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
    runSparkSubmit(args)
  }

  test("permanent Hive UDF: define a UDF and use it") {
    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
    val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
    val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
    val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
    val args = Seq(
      "--class", PermanentHiveUDFTest1.getClass.getName.stripSuffix("$"),
      "--name", "PermanentHiveUDFTest1",
      "--master", "local-cluster[2,1,1024]",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--driver-java-options", "-Dderby.system.durability=test",
      "--jars", jarsString,
      unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
    runSparkSubmit(args)
  }

  test("permanent Hive UDF: use a already defined permanent function") {
    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
    val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
    val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
    val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
    val args = Seq(
      "--class", PermanentHiveUDFTest2.getClass.getName.stripSuffix("$"),
      "--name", "PermanentHiveUDFTest2",
      "--master", "local-cluster[2,1,1024]",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--driver-java-options", "-Dderby.system.durability=test",
      "--jars", jarsString,
      unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
    runSparkSubmit(args)
  }

  test("SPARK-8368: includes jars passed in through --jars") {
    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
    val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
    val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
    val jar3 = TestHive.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
    val jar4 = TestHive.getHiveFile("hive-hcatalog-core-0.13.1.jar").getCanonicalPath
    val jarsString = Seq(jar1, jar2, jar3, jar4).map(j => j.toString).mkString(",")
    val args = Seq(
      "--class", SparkSubmitClassLoaderTest.getClass.getName.stripSuffix("$"),
      "--name", "SparkSubmitClassLoaderTest",
      "--master", "local-cluster[2,1,1024]",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--driver-java-options", "-Dderby.system.durability=test",
      "--jars", jarsString,
      unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
    runSparkSubmit(args)
  }

  test("SPARK-8020: set sql conf in spark conf") {
    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
    val args = Seq(
      "--class", SparkSQLConfTest.getClass.getName.stripSuffix("$"),
      "--name", "SparkSQLConfTest",
      "--master", "local-cluster[2,1,1024]",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--conf", "spark.sql.hive.metastore.version=0.12",
      "--conf", "spark.sql.hive.metastore.jars=maven",
      "--driver-java-options", "-Dderby.system.durability=test",
      unusedJar.toString)
    runSparkSubmit(args)
  }

  // TODO: re-enable this after rebuilding the jar (HiveContext was removed)
  ignore("SPARK-8489: MissingRequirementError during reflection") {
    // This test uses a pre-built jar to test SPARK-8489. In a nutshell, this test creates
    // a HiveContext and uses it to create a data frame from an RDD using reflection.
    // Before the fix in SPARK-8470, this results in a MissingRequirementError because
    // the HiveContext code mistakenly overrides the class loader that contains user classes.
    // For more detail, see sql/hive/src/test/resources/regression-test-SPARK-8489/*scala.
    val version = Properties.versionNumberString match {
      case v if v.startsWith("2.10") || v.startsWith("2.11") => v.substring(0, 4)
      case x => throw new Exception(s"Unsupported Scala Version: $x")
    }
    val testJar = s"sql/hive/src/test/resources/regression-test-SPARK-8489/test-$version.jar"
    val args = Seq(
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--driver-java-options", "-Dderby.system.durability=test",
      "--class", "Main",
      testJar)
    runSparkSubmit(args)
  }

  test("SPARK-9757 Persist Parquet relation with decimal column") {
    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
    val args = Seq(
      "--class", SPARK_9757.getClass.getName.stripSuffix("$"),
      "--name", "SparkSQLConfTest",
      "--master", "local-cluster[2,1,1024]",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--driver-java-options", "-Dderby.system.durability=test",
      unusedJar.toString)
    runSparkSubmit(args)
  }

  test("SPARK-11009 fix wrong result of Window function in cluster mode") {
    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
    val args = Seq(
      "--class", SPARK_11009.getClass.getName.stripSuffix("$"),
      "--name", "SparkSQLConfTest",
      "--master", "local-cluster[2,1,1024]",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--driver-java-options", "-Dderby.system.durability=test",
      unusedJar.toString)
    runSparkSubmit(args)
  }

  test("SPARK-14244 fix window partition size attribute binding failure") {
    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
    val args = Seq(
      "--class", SPARK_14244.getClass.getName.stripSuffix("$"),
      "--name", "SparkSQLConfTest",
      "--master", "local-cluster[2,1,1024]",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--driver-java-options", "-Dderby.system.durability=test",
      unusedJar.toString)
    runSparkSubmit(args)
  }

  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
  // This is copied from org.apache.spark.deploy.SparkSubmitSuite
  private def runSparkSubmit(args: Seq[String]): Unit = {
    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
    val history = ArrayBuffer.empty[String]
    val commands = Seq("./bin/spark-submit") ++ args
    val commandLine = commands.mkString("'", "' '", "'")

    val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
    val env = builder.environment()
    env.put("SPARK_TESTING", "1")
    env.put("SPARK_HOME", sparkHome)

    def captureOutput(source: String)(line: String): Unit = {
      // This test suite has some weird behaviors when executed on Jenkins:
      //
      // 1. Sometimes it gets extremely slow out of unknown reason on Jenkins.  Here we add a
      //    timestamp to provide more diagnosis information.
      // 2. Log lines are not correctly redirected to unit-tests.log as expected, so here we print
      //    them out for debugging purposes.
      val logLine = s"${new Timestamp(new Date().getTime)} - $source> $line"
      // scalastyle:off println
      println(logLine)
      // scalastyle:on println
      history += logLine
    }

    val process = builder.start()
    new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
    new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()

    try {
      val exitCode = failAfter(300.seconds) { process.waitFor() }
      if (exitCode != 0) {
        // include logs in output. Note that logging is async and may not have completed
        // at the time this exception is raised
        Thread.sleep(1000)
        val historyLog = history.mkString("\n")
        fail {
          s"""spark-submit returned with exit code $exitCode.
             |Command line: $commandLine
             |
             |$historyLog
           """.stripMargin
        }
      }
    } catch {
      case to: TestFailedDueToTimeoutException =>
        val historyLog = history.mkString("\n")
        fail(s"Timeout of $commandLine" +
            s" See the log4j logs for more detail." +
            s"\n$historyLog", to)
        case t: Throwable => throw t
    } finally {
      // Ensure we still kill the process in case it timed out
      process.destroy()
    }
  }
}

// This application is used to test defining a new Hive UDF (with an associated jar)
// and use this UDF. We need to run this test in separate JVM to make sure we
// can load the jar defined with the function.
object TemporaryHiveUDFTest extends Logging {
  def main(args: Array[String]) {
    Utils.configTestLog4j("INFO")
    val conf = new SparkConf()
    conf.set("spark.ui.enabled", "false")
    val sc = new SparkContext(conf)
    val hiveContext = new TestHiveContext(sc)

    // Load a Hive UDF from the jar.
    logInfo("Registering a temporary Hive UDF provided in a jar.")
    val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
    hiveContext.sql(
      s"""
         |CREATE TEMPORARY FUNCTION example_max
         |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
         |USING JAR '$jar'
      """.stripMargin)
    val source =
      hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
    source.registerTempTable("sourceTable")
    // Actually use the loaded UDF.
    logInfo("Using the UDF.")
    val result = hiveContext.sql(
      "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
    logInfo("Running a simple query on the table.")
    val count = result.orderBy("key", "val").count()
    if (count != 10) {
      throw new Exception(s"Result table should have 10 rows instead of $count rows")
    }
    hiveContext.sql("DROP temporary FUNCTION example_max")
    logInfo("Test finishes.")
    sc.stop()
  }
}

// This application is used to test defining a new Hive UDF (with an associated jar)
// and use this UDF. We need to run this test in separate JVM to make sure we
// can load the jar defined with the function.
object PermanentHiveUDFTest1 extends Logging {
  def main(args: Array[String]) {
    Utils.configTestLog4j("INFO")
    val conf = new SparkConf()
    conf.set("spark.ui.enabled", "false")
    val sc = new SparkContext(conf)
    val hiveContext = new TestHiveContext(sc)

    // Load a Hive UDF from the jar.
    logInfo("Registering a permanent Hive UDF provided in a jar.")
    val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
    hiveContext.sql(
      s"""
         |CREATE FUNCTION example_max
         |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
         |USING JAR '$jar'
      """.stripMargin)
    val source =
      hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
    source.registerTempTable("sourceTable")
    // Actually use the loaded UDF.
    logInfo("Using the UDF.")
    val result = hiveContext.sql(
      "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
    logInfo("Running a simple query on the table.")
    val count = result.orderBy("key", "val").count()
    if (count != 10) {
      throw new Exception(s"Result table should have 10 rows instead of $count rows")
    }
    hiveContext.sql("DROP FUNCTION example_max")
    logInfo("Test finishes.")
    sc.stop()
  }
}

// This application is used to test that a pre-defined permanent function with a jar
// resources can be used. We need to run this test in separate JVM to make sure we
// can load the jar defined with the function.
object PermanentHiveUDFTest2 extends Logging {
  def main(args: Array[String]) {
    Utils.configTestLog4j("INFO")
    val conf = new SparkConf()
    conf.set("spark.ui.enabled", "false")
    val sc = new SparkContext(conf)
    val hiveContext = new TestHiveContext(sc)
    // Load a Hive UDF from the jar.
    logInfo("Write the metadata of a permanent Hive UDF into metastore.")
    val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
    val function = CatalogFunction(
      FunctionIdentifier("example_max"),
      "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
      ("JAR" -> jar) :: Nil)
    hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false)
    val source =
      hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
    source.registerTempTable("sourceTable")
    // Actually use the loaded UDF.
    logInfo("Using the UDF.")
    val result = hiveContext.sql(
      "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
    logInfo("Running a simple query on the table.")
    val count = result.orderBy("key", "val").count()
    if (count != 10) {
      throw new Exception(s"Result table should have 10 rows instead of $count rows")
    }
    hiveContext.sql("DROP FUNCTION example_max")
    logInfo("Test finishes.")
    sc.stop()
  }
}

// This object is used for testing SPARK-8368: https://issues.apache.org/jira/browse/SPARK-8368.
// We test if we can load user jars in both driver and executors when HiveContext is used.
object SparkSubmitClassLoaderTest extends Logging {
  def main(args: Array[String]) {
    Utils.configTestLog4j("INFO")
    val conf = new SparkConf()
    conf.set("spark.ui.enabled", "false")
    val sc = new SparkContext(conf)
    val hiveContext = new TestHiveContext(sc)
    val df = hiveContext.createDataFrame((1 to 100).map(i => (i, i))).toDF("i", "j")
    logInfo("Testing load classes at the driver side.")
    // First, we load classes at driver side.
    try {
      Utils.classForName(args(0))
      Utils.classForName(args(1))
    } catch {
      case t: Throwable =>
        throw new Exception("Could not load user class from jar:\n", t)
    }
    // Second, we load classes at the executor side.
    logInfo("Testing load classes at the executor side.")
    val result = df.rdd.mapPartitions { x =>
      var exception: String = null
      try {
        Utils.classForName(args(0))
        Utils.classForName(args(1))
      } catch {
        case t: Throwable =>
          exception = t + "\n" + Utils.exceptionString(t)
          exception = exception.replaceAll("\n", "\n\t")
      }
      Option(exception).toSeq.iterator
    }.collect()
    if (result.nonEmpty) {
      throw new Exception("Could not load user class from jar:\n" + result(0))
    }

    // Load a Hive UDF from the jar.
    logInfo("Registering temporary Hive UDF provided in a jar.")
    hiveContext.sql(
      """
        |CREATE TEMPORARY FUNCTION example_max
        |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
      """.stripMargin)
    val source =
      hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
    source.registerTempTable("sourceTable")
    // Load a Hive SerDe from the jar.
    logInfo("Creating a Hive table with a SerDe provided in a jar.")
    hiveContext.sql(
      """
        |CREATE TABLE t1(key int, val string)
        |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
      """.stripMargin)
    // Actually use the loaded UDF and SerDe.
    logInfo("Writing data into the table.")
    hiveContext.sql(
      "INSERT INTO TABLE t1 SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
    logInfo("Running a simple query on the table.")
    val count = hiveContext.table("t1").orderBy("key", "val").count()
    if (count != 10) {
      throw new Exception(s"table t1 should have 10 rows instead of $count rows")
    }
    logInfo("Test finishes.")
    sc.stop()
  }
}

// This object is used for testing SPARK-8020: https://issues.apache.org/jira/browse/SPARK-8020.
// We test if we can correctly set spark sql configurations when HiveContext is used.
object SparkSQLConfTest extends Logging {
  def main(args: Array[String]) {
    Utils.configTestLog4j("INFO")
    // We override the SparkConf to add spark.sql.hive.metastore.version and
    // spark.sql.hive.metastore.jars to the beginning of the conf entry array.
    // So, if metadataHive get initialized after we set spark.sql.hive.metastore.version but
    // before spark.sql.hive.metastore.jars get set, we will see the following exception:
    // Exception in thread "main" java.lang.IllegalArgumentException: Builtin jars can only
    // be used when hive execution version == hive metastore version.
    // Execution: 0.13.1 != Metastore: 0.12. Specify a valid path to the correct hive jars
    // using $HIVE_METASTORE_JARS or change spark.sql.hive.metastore.version to 0.13.1.
    val conf = new SparkConf() {
      override def getAll: Array[(String, String)] = {
        def isMetastoreSetting(conf: String): Boolean = {
          conf == "spark.sql.hive.metastore.version" || conf == "spark.sql.hive.metastore.jars"
        }
        // If there is any metastore settings, remove them.
        val filteredSettings = super.getAll.filterNot(e => isMetastoreSetting(e._1))

        // Always add these two metastore settings at the beginning.
        ("spark.sql.hive.metastore.version" -> "0.12") +:
        ("spark.sql.hive.metastore.jars" -> "maven") +:
        filteredSettings
      }

      // For this simple test, we do not really clone this object.
      override def clone: SparkConf = this
    }
    conf.set("spark.ui.enabled", "false")
    val sc = new SparkContext(conf)
    val hiveContext = new TestHiveContext(sc)
    // Run a simple command to make sure all lazy vals in hiveContext get instantiated.
    hiveContext.tables().collect()
    sc.stop()
  }
}

object SPARK_9757 extends QueryTest {
  import org.apache.spark.sql.functions._

  protected var sqlContext: SQLContext = _

  def main(args: Array[String]): Unit = {
    Utils.configTestLog4j("INFO")

    val sparkContext = new SparkContext(
      new SparkConf()
        .set("spark.sql.hive.metastore.version", "0.13.1")
        .set("spark.sql.hive.metastore.jars", "maven")
        .set("spark.ui.enabled", "false"))

    val hiveContext = new TestHiveContext(sparkContext)
    sqlContext = hiveContext
    import hiveContext.implicits._

    val dir = Utils.createTempDir()
    dir.delete()

    try {
      {
        val df =
          hiveContext
            .range(10)
            .select(('id + 0.1) cast DecimalType(10, 3) as 'dec)
        df.write.option("path", dir.getCanonicalPath).mode("overwrite").saveAsTable("t")
        checkAnswer(hiveContext.table("t"), df)
      }

      {
        val df =
          hiveContext
            .range(10)
            .select(callUDF("struct", ('id + 0.2) cast DecimalType(10, 3)) as 'dec_struct)
        df.write.option("path", dir.getCanonicalPath).mode("overwrite").saveAsTable("t")
        checkAnswer(hiveContext.table("t"), df)
      }
    } finally {
      dir.delete()
      hiveContext.sql("DROP TABLE t")
      sparkContext.stop()
    }
  }
}

object SPARK_11009 extends QueryTest {
  import org.apache.spark.sql.functions._

  protected var sqlContext: SQLContext = _

  def main(args: Array[String]): Unit = {
    Utils.configTestLog4j("INFO")

    val sparkContext = new SparkContext(
      new SparkConf()
        .set("spark.ui.enabled", "false")
        .set("spark.sql.shuffle.partitions", "100"))

    val hiveContext = new TestHiveContext(sparkContext)
    sqlContext = hiveContext

    try {
      val df = sqlContext.range(1 << 20)
      val df2 = df.select((df("id") % 1000).alias("A"), (df("id") / 1000).alias("B"))
      val ws = Window.partitionBy(df2("A")).orderBy(df2("B"))
      val df3 = df2.select(df2("A"), df2("B"), row_number().over(ws).alias("rn")).filter("rn < 0")
      if (df3.rdd.count() != 0) {
        throw new Exception("df3 should have 0 output row.")
      }
    } finally {
      sparkContext.stop()
    }
  }
}

object SPARK_14244 extends QueryTest {
  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions._

  protected var sqlContext: SQLContext = _

  def main(args: Array[String]): Unit = {
    Utils.configTestLog4j("INFO")

    val sparkContext = new SparkContext(
      new SparkConf()
        .set("spark.ui.enabled", "false")
        .set("spark.sql.shuffle.partitions", "100"))

    val hiveContext = new TestHiveContext(sparkContext)
    sqlContext = hiveContext

    import hiveContext.implicits._

    try {
      val window = Window.orderBy('id)
      val df = sqlContext.range(2).select(cume_dist().over(window).as('cdist)).orderBy('cdist)
      checkAnswer(df, Seq(Row(0.5D), Row(1.0D)))
    } finally {
      sparkContext.stop()
    }
  }
}